387 lines
12 KiB
Python
387 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
mock_named_value_publisher.py
|
||
─────────────────────────────
|
||
SymbyTech ROV Autonomy — Task 11: Mock NAMED_VALUE Publisher
|
||
|
||
Runs a WebSocket server that streams ROV state variables directly into the
|
||
Cockpit data lake via the "Generic WebSocket Connections" feature introduced
|
||
in Cockpit v1.18.0.
|
||
|
||
Data path:
|
||
This script (WebSocket server, port 8765)
|
||
← Cockpit connects as client
|
||
→ Cockpit reads variable=value messages
|
||
→ Cockpit data lake populated directly
|
||
→ W1 System Health Indicator (reads rov_failsa)
|
||
→ W2 Mission Status (reads rov_ms, rov_mp)
|
||
|
||
Message format — one variable per line:
|
||
rov_failsa=0 (integer: 0=GREEN, 1=AMBER, 2=RED)
|
||
rov_ms=0 (integer: 0=IDLE, 1=RUNNING, 2=PAUSED, 3=COMPLETE, 4=ABORTED)
|
||
rov_mp=0.0 (float: 0.0 to 1.0 mission progress)
|
||
|
||
Cockpit configuration (one-time setup):
|
||
Menu > Settings > Generic WebSocket Connections
|
||
Add: ws://{{ vehicle-address }}:8765
|
||
|
||
Usage (run from inside the BlueOS VM):
|
||
python3 mock_named_value_publisher.py # auto-cycle mode
|
||
python3 mock_named_value_publisher.py --manual # keyboard control
|
||
python3 mock_named_value_publisher.py --port 8765
|
||
|
||
Install dependency (once):
|
||
pip3 install websockets --break-system-packages
|
||
|
||
Repo location:
|
||
rov-autonomy / tools / mock_named_value_publisher.py
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import sys
|
||
import threading
|
||
import time
|
||
|
||
# --- Dependency check ---------------------------------------------------------
|
||
try:
|
||
import websockets
|
||
from websockets.exceptions import ConnectionClosed
|
||
except ImportError:
|
||
print()
|
||
print(" ERROR: websockets is not installed.")
|
||
print(" Fix: pip3 install websockets --break-system-packages")
|
||
print()
|
||
sys.exit(1)
|
||
|
||
|
||
# ==============================================================================
|
||
# CONFIGURATION
|
||
# ==============================================================================
|
||
|
||
# --- Variable name constants --------------------------------------------------
|
||
# These are the Cockpit data lake keys that W1 and W2 read from.
|
||
# MUST match the VARIABLE / NAME_STATE / NAME_PROG constants in each widget.
|
||
NAME_FAILSAFE = "rov_failsa" # W1 polls this (0=GREEN, 1=AMBER, 2=RED)
|
||
NAME_MS_STATE = "rov_ms" # W2 polls this (0=IDLE … 4=ABORTED)
|
||
NAME_MS_PROG = "rov_mp" # W2 polls this (0.0–1.0 progress)
|
||
|
||
# --- Failsafe state values ----------------------------------------------------
|
||
STATE_GREEN = 0
|
||
STATE_AMBER = 1
|
||
STATE_RED = 2
|
||
|
||
# --- Mission state values -----------------------------------------------------
|
||
MISSION_IDLE = 0
|
||
MISSION_RUNNING = 1
|
||
MISSION_PAUSED = 2
|
||
MISSION_COMPLETE = 3
|
||
MISSION_ABORTED = 4
|
||
|
||
# --- Auto-cycle timing (seconds per state) ------------------------------------
|
||
CYCLE_HOLD = {
|
||
STATE_GREEN: 5.0,
|
||
STATE_AMBER: 4.0,
|
||
STATE_RED: 4.0,
|
||
}
|
||
|
||
# --- Publish rate -------------------------------------------------------------
|
||
# How often to send updated values to all connected Cockpit clients.
|
||
PUBLISH_INTERVAL = 0.25 # 4 Hz — well above Cockpit's 500 ms poll rate
|
||
|
||
# --- WebSocket server port ----------------------------------------------------
|
||
DEFAULT_PORT = 8765
|
||
|
||
|
||
# ==============================================================================
|
||
# CONSOLE COLOURS
|
||
# ==============================================================================
|
||
|
||
class C:
|
||
GREEN = "\033[92m"
|
||
AMBER = "\033[93m"
|
||
RED = "\033[91m"
|
||
BOLD = "\033[1m"
|
||
RESET = "\033[0m"
|
||
|
||
STATE_COLOUR = {STATE_GREEN: C.GREEN, STATE_AMBER: C.AMBER, STATE_RED: C.RED}
|
||
|
||
FAILSAFE_LABEL = {
|
||
STATE_GREEN: "GREEN - Systems nominal",
|
||
STATE_AMBER: "AMBER - Parameter degraded",
|
||
STATE_RED: "RED - Critical failure",
|
||
}
|
||
|
||
MISSION_LABEL = {
|
||
MISSION_IDLE: "IDLE",
|
||
MISSION_RUNNING: "RUNNING",
|
||
MISSION_PAUSED: "PAUSED",
|
||
MISSION_COMPLETE: "COMPLETE",
|
||
MISSION_ABORTED: "ABORTED",
|
||
}
|
||
|
||
|
||
# ==============================================================================
|
||
# SHARED STATE (thread-safe via lock)
|
||
# ==============================================================================
|
||
|
||
_lock = threading.Lock()
|
||
_failsafe_state = STATE_GREEN
|
||
_mission_state = MISSION_IDLE
|
||
_mission_progress = 0.0
|
||
|
||
|
||
def set_failsafe(state: int):
|
||
"""Set the failsafe state from any thread."""
|
||
global _failsafe_state
|
||
with _lock:
|
||
_failsafe_state = state
|
||
|
||
|
||
def set_mission(state: int, progress: float = 0.0):
|
||
"""Set the mission state and progress from any thread."""
|
||
global _mission_state, _mission_progress
|
||
with _lock:
|
||
_mission_state = state
|
||
_mission_progress = max(0.0, min(1.0, progress))
|
||
|
||
|
||
def snapshot():
|
||
"""Read current state atomically."""
|
||
with _lock:
|
||
return _failsafe_state, _mission_state, _mission_progress
|
||
|
||
|
||
# ==============================================================================
|
||
# WEBSOCKET SERVER
|
||
# ==============================================================================
|
||
|
||
# Registry of all currently connected Cockpit clients
|
||
_clients: set = set()
|
||
_clients_lock = asyncio.Lock()
|
||
|
||
|
||
async def handler(websocket):
|
||
"""
|
||
Handle one Cockpit client connection.
|
||
|
||
Cockpit connects to us as a client. On connect we send the current state
|
||
immediately so the widget shows something right away, then we rely on the
|
||
publish loop to keep values current.
|
||
"""
|
||
addr = websocket.remote_address
|
||
print(f" [+] Cockpit connected from {addr}")
|
||
|
||
async with _clients_lock:
|
||
_clients.add(websocket)
|
||
|
||
try:
|
||
# Send current state immediately on connect so widgets light up fast
|
||
fs, ms, mp = snapshot()
|
||
await websocket.send(f"{NAME_FAILSAFE}={fs}")
|
||
await websocket.send(f"{NAME_MS_STATE}={ms}")
|
||
await websocket.send(f"{NAME_MS_PROG}={mp:.3f}")
|
||
|
||
# Hold the connection open — the publish loop does the heavy lifting
|
||
await websocket.wait_closed()
|
||
|
||
except ConnectionClosed:
|
||
pass # Normal disconnect — not an error
|
||
|
||
finally:
|
||
async with _clients_lock:
|
||
_clients.discard(websocket)
|
||
print(f" [-] Cockpit disconnected from {addr}")
|
||
|
||
|
||
async def publish_loop():
|
||
"""
|
||
Broadcast current state to all connected clients every PUBLISH_INTERVAL.
|
||
|
||
Each message is variableName=value on a separate send call.
|
||
Cockpit maps each message to a data lake variable by the key before the =.
|
||
"""
|
||
while True:
|
||
await asyncio.sleep(PUBLISH_INTERVAL)
|
||
|
||
fs, ms, mp = snapshot()
|
||
messages = [
|
||
f"{NAME_FAILSAFE}={fs}",
|
||
f"{NAME_MS_STATE}={ms}",
|
||
f"{NAME_MS_PROG}={mp:.3f}",
|
||
]
|
||
|
||
# Snapshot the client set to avoid holding the lock during sends
|
||
async with _clients_lock:
|
||
current_clients = set(_clients)
|
||
|
||
for ws in current_clients:
|
||
try:
|
||
for msg in messages:
|
||
await ws.send(msg)
|
||
except ConnectionClosed:
|
||
pass # Will be cleaned up in the handler
|
||
|
||
|
||
# ==============================================================================
|
||
# AUTO-CYCLE MODE
|
||
# ==============================================================================
|
||
|
||
def cycle_thread():
|
||
"""
|
||
Background thread that cycles the failsafe state:
|
||
GREEN (5s) -> AMBER (4s) -> RED (4s) -> repeat.
|
||
|
||
A matching mission state cycle runs in lock-step.
|
||
"""
|
||
failsafe_seq = [STATE_GREEN, STATE_AMBER, STATE_RED]
|
||
mission_seq = [
|
||
(MISSION_IDLE, 0.00), # GREEN -> IDLE
|
||
(MISSION_RUNNING, 0.50), # AMBER -> RUNNING at 50%
|
||
(MISSION_PAUSED, 0.50), # RED -> PAUSED at 50%
|
||
]
|
||
|
||
step = 0
|
||
while True:
|
||
fs = failsafe_seq[step % 3]
|
||
ms, mp = mission_seq[step % 3]
|
||
hold = CYCLE_HOLD[fs]
|
||
|
||
set_failsafe(fs)
|
||
set_mission(ms, mp)
|
||
|
||
colour = STATE_COLOUR[fs]
|
||
print(
|
||
f" {colour}{C.BOLD}{FAILSAFE_LABEL[fs]:<28}{C.RESET}"
|
||
f" mission={MISSION_LABEL[ms]:<10}"
|
||
f" progress={mp:.2f}"
|
||
f" (hold {hold:.0f}s)"
|
||
)
|
||
|
||
time.sleep(hold)
|
||
step += 1
|
||
|
||
|
||
# ==============================================================================
|
||
# MANUAL MODE (Linux/Mac — uses termios raw mode)
|
||
# ==============================================================================
|
||
|
||
def manual_thread():
|
||
"""
|
||
Keyboard-driven control.
|
||
|
||
Keys:
|
||
0 / g GREEN
|
||
1 / a AMBER
|
||
2 / r RED
|
||
m cycle mission state
|
||
q quit
|
||
"""
|
||
import termios
|
||
import tty
|
||
|
||
def getch():
|
||
fd = sys.stdin.fileno()
|
||
old = termios.tcgetattr(fd)
|
||
try:
|
||
tty.setraw(fd)
|
||
return sys.stdin.read(1)
|
||
finally:
|
||
termios.tcsetattr(fd, termios.TCSADRAIN, old)
|
||
|
||
print("\n Mode: MANUAL KEYBOARD")
|
||
print(" 0/g -> GREEN 1/a -> AMBER 2/r -> RED m -> mission q -> quit\n")
|
||
|
||
# Start in GREEN / RUNNING so widgets are live immediately
|
||
set_failsafe(STATE_GREEN)
|
||
set_mission(MISSION_RUNNING, 0.30)
|
||
print(f" Initial: {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}")
|
||
|
||
mission_cycle = [MISSION_IDLE, MISSION_RUNNING, MISSION_PAUSED]
|
||
mission_idx = 1
|
||
|
||
while True:
|
||
ch = getch()
|
||
if ch in ("q", "Q", "\x03"):
|
||
print("\n Quit.")
|
||
# Signal the event loop to stop
|
||
asyncio.get_event_loop().call_soon_threadsafe(
|
||
asyncio.get_event_loop().stop
|
||
)
|
||
break
|
||
elif ch in ("0", "g", "G"):
|
||
set_failsafe(STATE_GREEN)
|
||
print(f" -> {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}")
|
||
elif ch in ("1", "a", "A"):
|
||
set_failsafe(STATE_AMBER)
|
||
print(f" -> {C.AMBER}{C.BOLD}{FAILSAFE_LABEL[STATE_AMBER]}{C.RESET}")
|
||
elif ch in ("2", "r", "R"):
|
||
set_failsafe(STATE_RED)
|
||
print(f" -> {C.RED}{C.BOLD}{FAILSAFE_LABEL[STATE_RED]}{C.RESET}")
|
||
elif ch in ("m", "M"):
|
||
mission_idx = (mission_idx + 1) % 3
|
||
ms = mission_cycle[mission_idx]
|
||
mp = 0.50 if ms == MISSION_RUNNING else 0.0
|
||
set_mission(ms, mp)
|
||
print(f" -> mission={MISSION_LABEL[ms]} progress={mp:.2f}")
|
||
|
||
|
||
# ==============================================================================
|
||
# ARGUMENT PARSING
|
||
# ==============================================================================
|
||
|
||
def parse_args():
|
||
p = argparse.ArgumentParser(
|
||
description="Task 11 — Mock NAMED_VALUE publisher via WebSocket."
|
||
)
|
||
p.add_argument(
|
||
"--port", type=int, default=DEFAULT_PORT,
|
||
help=f"WebSocket port to listen on (default: {DEFAULT_PORT})."
|
||
)
|
||
p.add_argument(
|
||
"--manual", action="store_true",
|
||
help="Keyboard control mode (Linux/Mac only). Default: auto-cycle."
|
||
)
|
||
return p.parse_args()
|
||
|
||
|
||
# ==============================================================================
|
||
# MAIN
|
||
# ==============================================================================
|
||
|
||
async def main(port: int, manual: bool):
|
||
# Start the control thread (cycle or manual) as a daemon
|
||
if manual:
|
||
t = threading.Thread(target=manual_thread, daemon=True)
|
||
else:
|
||
print("\n Mode: AUTO-CYCLE (Ctrl+C to stop)\n")
|
||
t = threading.Thread(target=cycle_thread, daemon=True)
|
||
t.start()
|
||
|
||
# Run the WebSocket server and publish loop concurrently
|
||
async with websockets.serve(handler, "0.0.0.0", port):
|
||
print(f" WebSocket server listening on ws://0.0.0.0:{port}")
|
||
print(f" In Cockpit: Settings > Generic WebSocket Connections")
|
||
print(f" Add: ws://{{{{ vehicle-address }}}}:{port}\n")
|
||
await publish_loop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
args = parse_args()
|
||
|
||
print()
|
||
print(f" {C.BOLD}SymbyTech ROV - Mock NAMED_VALUE Publisher{C.RESET}")
|
||
print(f" Task 11 - End-to-end data path test (Cockpit v1.18.0 WebSocket)")
|
||
print()
|
||
print(f" Variables streamed:")
|
||
print(f" {NAME_FAILSAFE} -> W1 System Health Indicator")
|
||
print(f" {NAME_MS_STATE} -> W2 Mission Status")
|
||
print(f" {NAME_MS_PROG} -> W2 Mission Progress")
|
||
print()
|
||
|
||
try:
|
||
asyncio.run(main(args.port, args.manual))
|
||
except KeyboardInterrupt:
|
||
print("\n Ctrl+C - stopped.")
|
||
print()
|