#!/usr/bin/env python3 """ mock_named_value_publisher.py ───────────────────────────── SymbyTech ROV Autonomy — Task 11: Mock NAMED_VALUE Publisher Injects fake NAMED_VALUE_INT / NAMED_VALUE_FLOAT messages into the BlueOS MAVLink bus so Cockpit widgets (W1 System Health, W2 Mission Status) show live GREEN / AMBER / RED states without real ROS2 hardware. Full data path this script exercises: This script (pymavlink udpout) → BlueOS mavlink-router (VM port 14550) → mavlink2rest (VM port 6040) → Cockpit data lake (window.cockpit.getDataLakeValue) → W1 System Health Indicator → W2 Mission Status ════════════════════════════════════════════════════════════════════════ ⚠ IMPORTANT — MAVLink NAMED_VALUE 10-char name limit ════════════════════════════════════════════════════════════════════════ The NAMED_VALUE_INT / NAMED_VALUE_FLOAT "name" field is exactly 10 bytes (null-terminated, per MAVLink spec). pymavlink silently truncates any string longer than 10 chars when it encodes the packet. Consequence: "rov_failsafe" (12 chars) → truncated → "rov_failsa" "rov_mission_state" (17 chars) → truncated → "rov_missio" "rov_mission_progress"(20 chars) → truncated → "rov_missio" (CLASH!) This script uses short, collision-free names defined in the NAME_* constants below. Before wiring the real bridge node, verify the actual keys that appear in the Cockpit data lake by loading W0 (Data Lake Inspector), then update each widget's VARIABLE / NAME constant to match. ════════════════════════════════════════════════════════════════════════ Usage ════════════════════════════════════════════════════════════════════════ # Install dependency (once) pip install pymavlink # Auto-cycle through GREEN → AMBER → RED (default) python3 mock_named_value_publisher.py # Manual keyboard control (0/g = GREEN, 1/a = AMBER, 2/r = RED) python3 mock_named_value_publisher.py --mode manual # Override the VM host (default: NAT IP 192.168.122.89) python3 mock_named_value_publisher.py --host 100.84.141.120 # Tailscale # Run on the SymbyTech server (SSH in first) — recommended # The server can reach the VM via NAT without Tailscale. ════════════════════════════════════════════════════════════════════════ Repo location ════════════════════════════════════════════════════════════════════════ rov-autonomy / tools / mock_named_value_publisher.py """ import argparse import sys import time import threading # ─── Early import check ─────────────────────────────────────────────────────── # Fail fast with a friendly message if pymavlink isn't installed. try: from pymavlink import mavutil from pymavlink.dialects.v20 import ardupilotmega as mavlink2 except ImportError: print() print(" ERROR: pymavlink is not installed.") print(" Fix: pip install pymavlink") print() sys.exit(1) # ══════════════════════════════════════════════════════════════════════════════ # CONFIGURATION — edit these to match your environment # ══════════════════════════════════════════════════════════════════════════════ # ─── MAVLink name constants ─────────────────────────────────────────────────── # These MUST be ≤10 chars. Update each corresponding widget VARIABLE once you # have confirmed the actual data lake key via W0 (Data Lake Inspector). # # Current widget VARIABLE constants (may need updating after W0 inspection): # W1: VARIABLE = 'rov_failsafe' → update to NAME_FAILSAFE below # W2: NAME_STATE = 'rov_mission_state' → update to NAME_MS_STATE below # W2: NAME_PROG = 'rov_mission_progress' → update to NAME_MS_PROG below NAME_FAILSAFE = "rov_failsa" # 10 chars — truncation of "rov_failsafe" NAME_MS_STATE = "rov_ms" # 6 chars — short form of "rov_mission_state" NAME_MS_PROG = "rov_mp" # 6 chars — short form of "rov_mission_progress" # ─── Failsafe state values — MUST match W1 widget applyState() branches ────── STATE_GREEN = 0 # Systems nominal STATE_AMBER = 1 # Parameter degraded STATE_RED = 2 # Critical failure # ─── Mission state values — MUST match W2 widget state labels ───────────────── MISSION_IDLE = 0 MISSION_RUNNING = 1 MISSION_PAUSED = 2 MISSION_COMPLETE = 3 MISSION_ABORTED = 4 # ─── Auto-cycle timing ──────────────────────────────────────────────────────── # How long (seconds) to hold each failsafe state before advancing in cycle mode CYCLE_HOLD_SECONDS = { STATE_GREEN: 5.0, STATE_AMBER: 4.0, STATE_RED: 4.0, } # How often to re-send all NAMED_VALUE messages (seconds). # Cockpit widgets poll at 500 ms; re-sending at 250 ms gives two updates per # widget poll cycle — any dropped UDP packet is covered by the next one. PUBLISH_INTERVAL = 0.25 # ─── MAVLink IDs for this script ───────────────────────────────────────────── # We impersonate a lightweight GCS node. System 255 is conventional for GCS; # component 190 is unused in ArduSub and chosen to avoid conflicts. OUR_SYSID = 255 OUR_COMPID = 190 # ─── Heartbeat interval ─────────────────────────────────────────────────────── # mavlink-router discovers endpoints by seeing heartbeats from them. # Without a heartbeat, the router may not route our NAMED_VALUE packets. HEARTBEAT_INTERVAL = 1.0 # ══════════════════════════════════════════════════════════════════════════════ # CONSOLE COLOURS — ANSI escape codes, safe on Linux/Mac # ══════════════════════════════════════════════════════════════════════════════ class C: """Thin namespace for ANSI colour codes.""" GREEN = "\033[92m" AMBER = "\033[93m" RED = "\033[91m" DIM = "\033[2m" BOLD = "\033[1m" RESET = "\033[0m" # Per-state colour lookup — used for console output only STATE_COLOUR = {STATE_GREEN: C.GREEN, STATE_AMBER: C.AMBER, STATE_RED: C.RED} # Human-readable labels for console output FAILSAFE_LABEL = { STATE_GREEN: "GREEN — Systems nominal", STATE_AMBER: "AMBER — Parameter degraded", STATE_RED: "RED — Critical failure", } MISSION_LABEL = { MISSION_IDLE: "IDLE", MISSION_RUNNING: "RUNNING", MISSION_PAUSED: "PAUSED", MISSION_COMPLETE: "COMPLETE", MISSION_ABORTED: "ABORTED", } # ══════════════════════════════════════════════════════════════════════════════ # MockPublisher # ══════════════════════════════════════════════════════════════════════════════ class MockPublisher: """ Manages a pymavlink UDP connection to the BlueOS VM and runs two background threads: 1. Heartbeat thread — sends MAVLink HEARTBEAT at 1 Hz so that mavlink-router registers this script as a known endpoint and routes its messages. 2. Publish thread — sends NAMED_VALUE_INT / NAMED_VALUE_FLOAT messages at PUBLISH_INTERVAL Hz so the Cockpit data lake stays current. The main thread controls published values via the set_* methods, which are protected by a threading.Lock so there are no race conditions. """ def __init__(self, host: str, port: int): self.host = host self.port = port self.connection = None # set in connect() # ── Shared state (main thread writes, publish thread reads) ── self._lock = threading.Lock() self._failsafe_state = STATE_GREEN self._mission_state = MISSION_IDLE self._mission_progress = 0.0 # float, 0.0–1.0 # ── Thread control ── self._running = False self._heartbeat_thread = None self._publish_thread = None # ─── Connection ─────────────────────────────────────────────────────────── def connect(self): """ Open a udpout connection to BlueOS mavlink-router. 'udpout' means pymavlink sends UDP datagrams to host:port without binding a local receive socket. That is exactly what we want — mavlink-router sees the datagrams arrive on its GCS port (14550) and routes them to all connected endpoints (including mavlink2rest). """ url = f"udpout:{self.host}:{self.port}" print(f" Connecting → {url} (sysid={OUR_SYSID} compid={OUR_COMPID})") self.connection = mavutil.mavlink_connection( url, source_system=OUR_SYSID, source_component=OUR_COMPID, ) # Small pause to let the OS open the socket before we send time.sleep(0.3) print(" Socket open.") # ─── Setters (thread-safe) ──────────────────────────────────────────────── def set_failsafe_state(self, state: int): """Set the failsafe state (STATE_GREEN / STATE_AMBER / STATE_RED).""" with self._lock: self._failsafe_state = state def set_mission_state(self, state: int, progress: float = 0.0): """ Set the mission state and optional progress value (0.0–1.0). Progress is clamped to [0.0, 1.0]. """ with self._lock: self._mission_state = state self._mission_progress = max(0.0, min(1.0, progress)) # ─── Snapshot (thread-safe read) ────────────────────────────────────────── def _snapshot(self): """Atomically read current state for publishing.""" with self._lock: return ( self._failsafe_state, self._mission_state, self._mission_progress, ) # ─── MAVLink send helpers ───────────────────────────────────────────────── def _time_boot_ms(self) -> int: """ MAVLink time_boot_ms field. We use wall-clock monotonic time wrapped to 32 bits — good enough for a mock publisher; a real node would use the autopilot's boot time. """ return int(time.monotonic() * 1000) & 0xFFFFFFFF def _send_heartbeat(self): """ Send a MAVLink HEARTBEAT so mavlink-router keeps our endpoint live. Type = GCS (6), autopilot = Generic (0). mavlink-router uses heartbeats to maintain its endpoint routing table. Without this the router may silently drop our NAMED_VALUE packets. """ self.connection.mav.heartbeat_send( mavlink2.MAV_TYPE_GCS, # type: GCS mavlink2.MAV_AUTOPILOT_GENERIC, # autopilot: generic 0, # base_mode 0, # custom_mode mavlink2.MAV_STATE_ACTIVE, # system_status ) def _send_named_values(self): """ Send the three NAMED_VALUE_FLOAT messages that feed W1 and W2. All three use NAMED_VALUE_FLOAT — Cockpit's data lake bridge maps NAMED_VALUE_FLOAT to data lake variables but silently ignores NAMED_VALUE_INT. States (0/1/2) are sent as floats (0.0/1.0/2.0); the widgets cast them back to int with Math.round() when reading. pymavlink packs the name field into exactly 10 bytes. Strings longer than 10 chars are truncated; shorter strings are null-padded. The constants NAME_FAILSAFE / NAME_MS_STATE / NAME_MS_PROG are already ≤10 chars to avoid truncation. """ t = self._time_boot_ms() fs, ms, mp = self._snapshot() # W1 data: failsafe state sent as float (0.0=GREEN, 1.0=AMBER, 2.0=RED) self.connection.mav.named_value_float_send( t, NAME_FAILSAFE.encode("utf-8"), float(fs), ) # W2 data (part 1): mission state as float (0.0=IDLE … 4.0=ABORTED) self.connection.mav.named_value_float_send( t, NAME_MS_STATE.encode("utf-8"), float(ms), ) # W2 data (part 2): mission progress (0.0–1.0) self.connection.mav.named_value_float_send( t, NAME_MS_PROG.encode("utf-8"), mp, ) # ─── Background threads ─────────────────────────────────────────────────── def _heartbeat_loop(self): """Send a heartbeat every HEARTBEAT_INTERVAL seconds.""" while self._running: try: self._send_heartbeat() except Exception as exc: # Don't crash the thread on a transient socket error print(f"\n [heartbeat error] {exc}", file=sys.stderr) time.sleep(HEARTBEAT_INTERVAL) def _publish_loop(self): """Send NAMED_VALUE messages every PUBLISH_INTERVAL seconds.""" while self._running: try: self._send_named_values() except Exception as exc: print(f"\n [publish error] {exc}", file=sys.stderr) time.sleep(PUBLISH_INTERVAL) # ─── Lifecycle ──────────────────────────────────────────────────────────── def start(self): """Start the heartbeat and publish background threads.""" self._running = True self._heartbeat_thread = threading.Thread( target=self._heartbeat_loop, name="heartbeat", daemon=True, ) self._publish_thread = threading.Thread( target=self._publish_loop, name="publish", daemon=True, ) # Send the first heartbeat synchronously before launching the publish # thread — this gives the router a chance to register our endpoint # before NAMED_VALUE packets start arriving. self._send_heartbeat() time.sleep(0.1) self._heartbeat_thread.start() self._publish_thread.start() def stop(self): """Signal threads to stop and wait for them to exit cleanly.""" self._running = False if self._heartbeat_thread: self._heartbeat_thread.join(timeout=2.0) if self._publish_thread: self._publish_thread.join(timeout=2.0) # ══════════════════════════════════════════════════════════════════════════════ # CYCLE MODE — auto-advance through GREEN → AMBER → RED → GREEN # ══════════════════════════════════════════════════════════════════════════════ def run_cycle_mode(publisher: MockPublisher): """ Automatically cycles through the three failsafe states in order. A parallel mission state cycle runs alongside so W2 also shows activity: failsafe GREEN → mission IDLE failsafe AMBER → mission RUNNING (50% progress) failsafe RED → mission PAUSED Hold time for each state is defined in CYCLE_HOLD_SECONDS. Press Ctrl+C to stop. """ print("\n Mode: AUTO-CYCLE (Ctrl+C to stop)\n") # Define the cycle sequences — indices advance in lock-step failsafe_cycle = [STATE_GREEN, STATE_AMBER, STATE_RED] mission_cycle = [ (MISSION_IDLE, 0.00), # matches GREEN (MISSION_RUNNING, 0.50), # matches AMBER — 50% through mission (MISSION_PAUSED, 0.50), # matches RED — paused mid-mission ] step = 0 while True: # Pick current state from cycle sequences fs = failsafe_cycle[step % len(failsafe_cycle)] ms, mp = mission_cycle[step % len(mission_cycle)] colour = STATE_COLOUR[fs] hold = CYCLE_HOLD_SECONDS[fs] # Push new state to publisher (publish thread picks it up within 250 ms) publisher.set_failsafe_state(fs) publisher.set_mission_state(ms, mp) # Console output print( f" {colour}{C.BOLD}{FAILSAFE_LABEL[fs]:<30}{C.RESET}" f" mission={MISSION_LABEL[ms]:<10}" f" progress={mp:.2f}" f" (hold {hold:.0f}s)" ) time.sleep(hold) step += 1 # ══════════════════════════════════════════════════════════════════════════════ # MANUAL MODE — single-key control (Linux/Mac only) # ══════════════════════════════════════════════════════════════════════════════ def _getch_unix(): """ Read a single character from stdin without requiring Enter. Uses termios/tty raw mode — Linux and macOS only. On Windows, use msvcrt.getch() instead (not implemented here). """ import termios import tty fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: tty.setraw(fd) ch = sys.stdin.read(1) finally: # Always restore terminal settings, even on exception termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) return ch def run_manual_mode(publisher: MockPublisher): """ Keyboard-driven control of the failsafe state. Keybindings: 0 / g / G → GREEN (Systems nominal) 1 / a / A → AMBER (Parameter degraded) 2 / r / R → RED (Critical failure) m / M → cycle mission state (IDLE → RUNNING → PAUSED → IDLE) q / Q → quit NOTE: Uses termios raw mode — requires Linux or macOS. On Windows, run in WSL or use cycle mode instead. """ # Check platform if sys.platform == "win32": print() print(" ERROR: manual mode uses termios (Linux/Mac only).") print(" On Windows: run inside WSL, or use --mode cycle instead.") print() sys.exit(1) print("\n Mode: MANUAL KEYBOARD (no Enter needed)") print() print(" 0 / g → GREEN — Systems nominal") print(" 1 / a → AMBER — Parameter degraded") print(" 2 / r → RED — Critical failure") print(" m → cycle mission state") print(" q → quit") print() # Start in GREEN / RUNNING at 30% so W2 immediately shows activity publisher.set_failsafe_state(STATE_GREEN) publisher.set_mission_state(MISSION_RUNNING, 0.30) print(f" Initial: {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}") print() # Simple mission state toggle cycle for the 'm' key mission_cycle = [MISSION_IDLE, MISSION_RUNNING, MISSION_PAUSED] mission_idx = 1 # start at RUNNING while True: ch = _getch_unix() if ch in ("q", "Q", "\x03"): # q or Ctrl+C print("\n Quit.") break elif ch in ("0", "g", "G"): publisher.set_failsafe_state(STATE_GREEN) print(f" → {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}") elif ch in ("1", "a", "A"): publisher.set_failsafe_state(STATE_AMBER) print(f" → {C.AMBER}{C.BOLD}{FAILSAFE_LABEL[STATE_AMBER]}{C.RESET}") elif ch in ("2", "r", "R"): publisher.set_failsafe_state(STATE_RED) print(f" → {C.RED}{C.BOLD}{FAILSAFE_LABEL[STATE_RED]}{C.RESET}") elif ch in ("m", "M"): # Advance mission state cycle mission_idx = (mission_idx + 1) % len(mission_cycle) ms = mission_cycle[mission_idx] # Give RUNNING a 50% progress, others 0% mp = 0.50 if ms == MISSION_RUNNING else 0.00 publisher.set_mission_state(ms, mp) print(f" → mission={MISSION_LABEL[ms]} progress={mp:.2f}") else: # Unknown key — print without newline so the display stays clean print(f" [unknown key {repr(ch)}]", end="\r", flush=True) # ══════════════════════════════════════════════════════════════════════════════ # ARGUMENT PARSING # ══════════════════════════════════════════════════════════════════════════════ def parse_args() -> argparse.Namespace: p = argparse.ArgumentParser( description=( "Task 11 — Mock NAMED_VALUE publisher.\n" "Injects fake failsafe states into BlueOS MAVLink bus for end-to-end widget testing." ), formatter_class=argparse.RawDescriptionHelpFormatter, ) p.add_argument( "--host", default="192.168.122.89", metavar="IP", help=( "BlueOS VM IP address. " "Default: 192.168.122.89 (NAT, from SymbyTech server). " "Use 100.84.141.120 for Tailscale access from laptop." ), ) p.add_argument( "--port", type=int, default=14550, metavar="PORT", help="MAVLink GCS UDP port on BlueOS (default: 14550).", ) p.add_argument( "--mode", choices=["cycle", "manual"], default="cycle", help=( "cycle = automatically step through GREEN/AMBER/RED (default). " "manual = keyboard control (Linux/Mac only)." ), ) return p.parse_args() # ══════════════════════════════════════════════════════════════════════════════ # MAIN # ══════════════════════════════════════════════════════════════════════════════ def print_banner(args: argparse.Namespace): """Print startup information so the operator knows what is running.""" print() print(f" {C.BOLD}SymbyTech ROV — Mock NAMED_VALUE Publisher{C.RESET}") print(f" Task 11 — End-to-end data path test") print() print(f" Target: {args.host}:{args.port} (BlueOS mavlink-router)") print(f" Mode: {args.mode}") print() print(" MAVLink messages to be injected (all NAMED_VALUE_FLOAT):") print(f" {C.DIM}{NAME_FAILSAFE:<12}{C.RESET} NAMED_VALUE_FLOAT → W1 System Health Indicator") print(f" {C.DIM}{NAME_MS_STATE:<12}{C.RESET} NAMED_VALUE_FLOAT → W2 Mission Status") print(f" {C.DIM}{NAME_MS_PROG:<12}{C.RESET} NAMED_VALUE_FLOAT → W2 Mission Progress") print() print(" ⚠ 10-char name limit applies. Use W0 (Data Lake Inspector)") print(" to confirm actual data lake keys, then update each widget's") print(" VARIABLE / NAME constant to match before committing to Gitea.") print() print(" Widget constants to check after W0 inspection:") print(f" W1 VARIABLE (currently 'rov_failsafe') → should be '{NAME_FAILSAFE}'") print(f" W2 NAME_STATE (currently 'rov_mission_state') → should be '{NAME_MS_STATE}'") print(f" W2 NAME_PROG (currently 'rov_mission_progress') → should be '{NAME_MS_PROG}'") print() def main(): args = parse_args() print_banner(args) publisher = MockPublisher(args.host, args.port) try: publisher.connect() publisher.start() print(" Threads running. First heartbeat sent.\n") if args.mode == "cycle": run_cycle_mode(publisher) elif args.mode == "manual": run_manual_mode(publisher) except KeyboardInterrupt: print("\n Ctrl+C — stopping.") except Exception as exc: print(f"\n FATAL: {exc}", file=sys.stderr) sys.exit(1) finally: publisher.stop() print(" Publisher stopped. MAVLink socket closed.") print() if __name__ == "__main__": main()