#!/usr/bin/env python3 """ mock_named_value_publisher.py ───────────────────────────── SymbyTech ROV Autonomy — Task 11: Mock NAMED_VALUE Publisher Runs a WebSocket server that streams ROV state variables directly into the Cockpit data lake via the "Generic WebSocket Connections" feature introduced in Cockpit v1.18.0. Data path: This script (WebSocket server, port 8765) ← Cockpit connects as client → Cockpit reads variable=value messages → Cockpit data lake populated directly → W1 System Health Indicator (reads rov_failsa) → W2 Mission Status (reads rov_ms, rov_mp) Message format — one variable per line: rov_failsa=0 (integer: 0=GREEN, 1=AMBER, 2=RED) rov_ms=0 (integer: 0=IDLE, 1=RUNNING, 2=PAUSED, 3=COMPLETE, 4=ABORTED) rov_mp=0.0 (float: 0.0 to 1.0 mission progress) Cockpit configuration (one-time setup): Menu > Settings > Generic WebSocket Connections Add: ws://{{ vehicle-address }}:8765 Usage (run from inside the BlueOS VM): python3 mock_named_value_publisher.py # auto-cycle mode python3 mock_named_value_publisher.py --manual # keyboard control python3 mock_named_value_publisher.py --port 8765 Install dependency (once): pip3 install websockets --break-system-packages Repo location: rov-autonomy / tools / mock_named_value_publisher.py """ import argparse import asyncio import sys import threading import time # --- Dependency check --------------------------------------------------------- try: import websockets from websockets.exceptions import ConnectionClosed except ImportError: print() print(" ERROR: websockets is not installed.") print(" Fix: pip3 install websockets --break-system-packages") print() sys.exit(1) # ============================================================================== # CONFIGURATION # ============================================================================== # --- Variable name constants -------------------------------------------------- # These are the Cockpit data lake keys that W1 and W2 read from. # MUST match the VARIABLE / NAME_STATE / NAME_PROG constants in each widget. NAME_FAILSAFE = "rov_failsa" # W1 polls this (0=GREEN, 1=AMBER, 2=RED) NAME_MS_STATE = "rov_ms" # W2 polls this (0=IDLE … 4=ABORTED) NAME_MS_PROG = "rov_mp" # W2 polls this (0.0–1.0 progress) # --- Failsafe state values ---------------------------------------------------- STATE_GREEN = 0 STATE_AMBER = 1 STATE_RED = 2 # --- Mission state values ----------------------------------------------------- MISSION_IDLE = 0 MISSION_RUNNING = 1 MISSION_PAUSED = 2 MISSION_COMPLETE = 3 MISSION_ABORTED = 4 # --- Auto-cycle timing (seconds per state) ------------------------------------ CYCLE_HOLD = { STATE_GREEN: 5.0, STATE_AMBER: 4.0, STATE_RED: 4.0, } # --- Publish rate ------------------------------------------------------------- # How often to send updated values to all connected Cockpit clients. PUBLISH_INTERVAL = 0.25 # 4 Hz — well above Cockpit's 500 ms poll rate # --- WebSocket server port ---------------------------------------------------- DEFAULT_PORT = 8765 # ============================================================================== # CONSOLE COLOURS # ============================================================================== class C: GREEN = "\033[92m" AMBER = "\033[93m" RED = "\033[91m" BOLD = "\033[1m" RESET = "\033[0m" STATE_COLOUR = {STATE_GREEN: C.GREEN, STATE_AMBER: C.AMBER, STATE_RED: C.RED} FAILSAFE_LABEL = { STATE_GREEN: "GREEN - Systems nominal", STATE_AMBER: "AMBER - Parameter degraded", STATE_RED: "RED - Critical failure", } MISSION_LABEL = { MISSION_IDLE: "IDLE", MISSION_RUNNING: "RUNNING", MISSION_PAUSED: "PAUSED", MISSION_COMPLETE: "COMPLETE", MISSION_ABORTED: "ABORTED", } # ============================================================================== # SHARED STATE (thread-safe via lock) # ============================================================================== _lock = threading.Lock() _failsafe_state = STATE_GREEN _mission_state = MISSION_IDLE _mission_progress = 0.0 def set_failsafe(state: int): """Set the failsafe state from any thread.""" global _failsafe_state with _lock: _failsafe_state = state def set_mission(state: int, progress: float = 0.0): """Set the mission state and progress from any thread.""" global _mission_state, _mission_progress with _lock: _mission_state = state _mission_progress = max(0.0, min(1.0, progress)) def snapshot(): """Read current state atomically.""" with _lock: return _failsafe_state, _mission_state, _mission_progress # ============================================================================== # WEBSOCKET SERVER # ============================================================================== # Registry of all currently connected Cockpit clients _clients: set = set() _clients_lock = asyncio.Lock() async def handler(websocket): """ Handle one Cockpit client connection. Cockpit connects to us as a client. On connect we send the current state immediately so the widget shows something right away, then we rely on the publish loop to keep values current. """ addr = websocket.remote_address print(f" [+] Cockpit connected from {addr}") async with _clients_lock: _clients.add(websocket) try: # Send current state immediately on connect so widgets light up fast fs, ms, mp = snapshot() await websocket.send(f"{NAME_FAILSAFE}={fs}") await websocket.send(f"{NAME_MS_STATE}={ms}") await websocket.send(f"{NAME_MS_PROG}={mp:.3f}") # Hold the connection open — the publish loop does the heavy lifting await websocket.wait_closed() except ConnectionClosed: pass # Normal disconnect — not an error finally: async with _clients_lock: _clients.discard(websocket) print(f" [-] Cockpit disconnected from {addr}") async def publish_loop(): """ Broadcast current state to all connected clients every PUBLISH_INTERVAL. Each message is variableName=value on a separate send call. Cockpit maps each message to a data lake variable by the key before the =. """ while True: await asyncio.sleep(PUBLISH_INTERVAL) fs, ms, mp = snapshot() messages = [ f"{NAME_FAILSAFE}={fs}", f"{NAME_MS_STATE}={ms}", f"{NAME_MS_PROG}={mp:.3f}", ] # Snapshot the client set to avoid holding the lock during sends async with _clients_lock: current_clients = set(_clients) for ws in current_clients: try: for msg in messages: await ws.send(msg) except ConnectionClosed: pass # Will be cleaned up in the handler # ============================================================================== # AUTO-CYCLE MODE # ============================================================================== def cycle_thread(): """ Background thread that cycles the failsafe state: GREEN (5s) -> AMBER (4s) -> RED (4s) -> repeat. A matching mission state cycle runs in lock-step. """ failsafe_seq = [STATE_GREEN, STATE_AMBER, STATE_RED] mission_seq = [ (MISSION_IDLE, 0.00), # GREEN -> IDLE (MISSION_RUNNING, 0.50), # AMBER -> RUNNING at 50% (MISSION_PAUSED, 0.50), # RED -> PAUSED at 50% ] step = 0 while True: fs = failsafe_seq[step % 3] ms, mp = mission_seq[step % 3] hold = CYCLE_HOLD[fs] set_failsafe(fs) set_mission(ms, mp) colour = STATE_COLOUR[fs] print( f" {colour}{C.BOLD}{FAILSAFE_LABEL[fs]:<28}{C.RESET}" f" mission={MISSION_LABEL[ms]:<10}" f" progress={mp:.2f}" f" (hold {hold:.0f}s)" ) time.sleep(hold) step += 1 # ============================================================================== # MANUAL MODE (Linux/Mac — uses termios raw mode) # ============================================================================== def manual_thread(): """ Keyboard-driven control. Keys: 0 / g GREEN 1 / a AMBER 2 / r RED m cycle mission state q quit """ import termios import tty def getch(): fd = sys.stdin.fileno() old = termios.tcgetattr(fd) try: tty.setraw(fd) return sys.stdin.read(1) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old) print("\n Mode: MANUAL KEYBOARD") print(" 0/g -> GREEN 1/a -> AMBER 2/r -> RED m -> mission q -> quit\n") # Start in GREEN / RUNNING so widgets are live immediately set_failsafe(STATE_GREEN) set_mission(MISSION_RUNNING, 0.30) print(f" Initial: {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}") mission_cycle = [MISSION_IDLE, MISSION_RUNNING, MISSION_PAUSED] mission_idx = 1 while True: ch = getch() if ch in ("q", "Q", "\x03"): print("\n Quit.") # Signal the event loop to stop asyncio.get_event_loop().call_soon_threadsafe( asyncio.get_event_loop().stop ) break elif ch in ("0", "g", "G"): set_failsafe(STATE_GREEN) print(f" -> {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}") elif ch in ("1", "a", "A"): set_failsafe(STATE_AMBER) print(f" -> {C.AMBER}{C.BOLD}{FAILSAFE_LABEL[STATE_AMBER]}{C.RESET}") elif ch in ("2", "r", "R"): set_failsafe(STATE_RED) print(f" -> {C.RED}{C.BOLD}{FAILSAFE_LABEL[STATE_RED]}{C.RESET}") elif ch in ("m", "M"): mission_idx = (mission_idx + 1) % 3 ms = mission_cycle[mission_idx] mp = 0.50 if ms == MISSION_RUNNING else 0.0 set_mission(ms, mp) print(f" -> mission={MISSION_LABEL[ms]} progress={mp:.2f}") # ============================================================================== # ARGUMENT PARSING # ============================================================================== def parse_args(): p = argparse.ArgumentParser( description="Task 11 — Mock NAMED_VALUE publisher via WebSocket." ) p.add_argument( "--port", type=int, default=DEFAULT_PORT, help=f"WebSocket port to listen on (default: {DEFAULT_PORT})." ) p.add_argument( "--manual", action="store_true", help="Keyboard control mode (Linux/Mac only). Default: auto-cycle." ) return p.parse_args() # ============================================================================== # MAIN # ============================================================================== async def main(port: int, manual: bool): # Start the control thread (cycle or manual) as a daemon if manual: t = threading.Thread(target=manual_thread, daemon=True) else: print("\n Mode: AUTO-CYCLE (Ctrl+C to stop)\n") t = threading.Thread(target=cycle_thread, daemon=True) t.start() # Run the WebSocket server and publish loop concurrently async with websockets.serve(handler, "0.0.0.0", port): print(f" WebSocket server listening on ws://0.0.0.0:{port}") print(f" In Cockpit: Settings > Generic WebSocket Connections") print(f" Add: ws://{{{{ vehicle-address }}}}:{port}\n") await publish_loop() if __name__ == "__main__": args = parse_args() print() print(f" {C.BOLD}SymbyTech ROV - Mock NAMED_VALUE Publisher{C.RESET}") print(f" Task 11 - End-to-end data path test (Cockpit v1.18.0 WebSocket)") print() print(f" Variables streamed:") print(f" {NAME_FAILSAFE} -> W1 System Health Indicator") print(f" {NAME_MS_STATE} -> W2 Mission Status") print(f" {NAME_MS_PROG} -> W2 Mission Progress") print() try: asyncio.run(main(args.port, args.manual)) except KeyboardInterrupt: print("\n Ctrl+C - stopped.") print()