rov-autonomy/tools/mock_named_value_publisher.py

387 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
mock_named_value_publisher.py
─────────────────────────────
SymbyTech ROV Autonomy — Task 11: Mock NAMED_VALUE Publisher
Runs a WebSocket server that streams ROV state variables directly into the
Cockpit data lake via the "Generic WebSocket Connections" feature introduced
in Cockpit v1.18.0.
Data path:
This script (WebSocket server, port 8765)
← Cockpit connects as client
→ Cockpit reads variable=value messages
→ Cockpit data lake populated directly
→ W1 System Health Indicator (reads rov_failsa)
→ W2 Mission Status (reads rov_ms, rov_mp)
Message format — one variable per line:
rov_failsa=0 (integer: 0=GREEN, 1=AMBER, 2=RED)
rov_ms=0 (integer: 0=IDLE, 1=RUNNING, 2=PAUSED, 3=COMPLETE, 4=ABORTED)
rov_mp=0.0 (float: 0.0 to 1.0 mission progress)
Cockpit configuration (one-time setup):
Menu > Settings > Generic WebSocket Connections
Add: ws://{{ vehicle-address }}:8765
Usage (run from inside the BlueOS VM):
python3 mock_named_value_publisher.py # auto-cycle mode
python3 mock_named_value_publisher.py --manual # keyboard control
python3 mock_named_value_publisher.py --port 8765
Install dependency (once):
pip3 install websockets --break-system-packages
Repo location:
rov-autonomy / tools / mock_named_value_publisher.py
"""
import argparse
import asyncio
import sys
import threading
import time
# --- Dependency check ---------------------------------------------------------
try:
    import websockets
    from websockets.exceptions import ConnectionClosed
except ImportError:
    # websockets is the only third-party dependency. Without it nothing in
    # this script can run, so print the install hint and exit with status 1.
    print()
    print(" ERROR: websockets is not installed.")
    print(" Fix: pip3 install websockets --break-system-packages")
    print()
    raise SystemExit(1)
# ==============================================================================
# CONFIGURATION
# ==============================================================================
# --- Variable name constants --------------------------------------------------
# Cockpit data lake keys that the W1 and W2 widgets read.
# MUST match the VARIABLE / NAME_STATE / NAME_PROG constants in each widget.
# Each key is the text sent before the "=" in every WebSocket message.
# NOTE(review): "rov_failsa" looks deliberately truncated to 10 characters
# (presumably the NAMED_VALUE name limit) — confirm before renaming.
NAME_FAILSAFE = "rov_failsa"  # W1 polls this (0=GREEN, 1=AMBER, 2=RED)
NAME_MS_STATE = "rov_ms"  # W2 polls this (0=IDLE ... 4=ABORTED)
NAME_MS_PROG = "rov_mp"  # W2 polls this (0.0 to 1.0 progress)
# --- Failsafe state values ----------------------------------------------------
# Integer codes sent as the value of NAME_FAILSAFE.
STATE_GREEN = 0  # systems nominal
STATE_AMBER = 1  # parameter degraded
STATE_RED = 2  # critical failure
# --- Mission state values -----------------------------------------------------
# Integer codes sent as the value of NAME_MS_STATE.
# COMPLETE and ABORTED are defined here but are not emitted by the auto-cycle
# or manual sequences visible in this file.
MISSION_IDLE = 0
MISSION_RUNNING = 1
MISSION_PAUSED = 2
MISSION_COMPLETE = 3
MISSION_ABORTED = 4
# --- Auto-cycle timing (seconds per state) ------------------------------------
# How long cycle_thread() holds each failsafe state before advancing.
CYCLE_HOLD = {
    STATE_GREEN: 5.0,
    STATE_AMBER: 4.0,
    STATE_RED: 4.0,
}
# --- Publish rate -------------------------------------------------------------
# Seconds publish_loop() sleeps between broadcasts to all connected Cockpit
# clients. The effective rate is slightly lower because send time is added.
PUBLISH_INTERVAL = 0.25  # nominally 4 Hz — well above Cockpit's 500 ms poll rate
# --- WebSocket server port ----------------------------------------------------
# Port used when --port is not given on the command line.
DEFAULT_PORT = 8765
# ==============================================================================
# CONSOLE COLOURS
# ==============================================================================
class C:
    """ANSI escape sequences used to colour the console status lines."""

    # Bright foreground colours, one per failsafe level.
    GREEN = "\033[92m"
    AMBER = "\033[93m"
    RED = "\033[91m"

    # Text attributes: BOLD switches emphasis on, RESET clears all styling.
    BOLD = "\033[1m"
    RESET = "\033[0m"
# Console colour escape for each failsafe state (used in status printouts).
STATE_COLOUR = {STATE_GREEN: C.GREEN, STATE_AMBER: C.AMBER, STATE_RED: C.RED}
# Human-readable console text for each failsafe state.
FAILSAFE_LABEL = {
    STATE_GREEN: "GREEN - Systems nominal",
    STATE_AMBER: "AMBER - Parameter degraded",
    STATE_RED: "RED - Critical failure",
}
# Human-readable console text for each mission state.
MISSION_LABEL = {
    MISSION_IDLE: "IDLE",
    MISSION_RUNNING: "RUNNING",
    MISSION_PAUSED: "PAUSED",
    MISSION_COMPLETE: "COMPLETE",
    MISSION_ABORTED: "ABORTED",
}
# ==============================================================================
# SHARED STATE (thread-safe via lock)
# ==============================================================================
# Guards the three variables below. They are written by the control thread
# (cycle_thread or manual_thread) through set_failsafe() / set_mission() and
# read by the asyncio side through snapshot().
_lock = threading.Lock()
_failsafe_state = STATE_GREEN  # current failsafe level (one of STATE_*)
_mission_state = MISSION_IDLE  # current mission state (one of MISSION_*)
_mission_progress = 0.0  # mission progress; set_mission() clamps it to 0.0..1.0
def set_failsafe(state: int):
    """
    Store a new failsafe level; safe to call from any thread.

    The write happens under the module lock, so snapshot() never observes
    it in the middle of an update to the shared state.

    Args:
        state: New failsafe value (STATE_GREEN, STATE_AMBER or STATE_RED).
            The value is stored as given, without validation.
    """
    global _failsafe_state
    with _lock:
        _failsafe_state = state
def set_mission(state: int, progress: float = 0.0):
    """
    Store a new mission state and progress; safe to call from any thread.

    Args:
        state: New mission state (one of the MISSION_* codes). Stored as
            given, without validation.
        progress: Mission progress. Values below 0.0 are stored as 0.0 and
            values above 1.0 as 1.0.
    """
    global _mission_state, _mission_progress
    with _lock:
        _mission_state = state
        # Clamp in two steps: cap at 1.0 first, then floor at 0.0.
        upper_bounded = min(1.0, progress)
        _mission_progress = max(0.0, upper_bounded)
def snapshot():
    """
    Return the shared state as one consistent tuple.

    Returns:
        (failsafe_state, mission_state, mission_progress), all read while
        holding the module lock so the three values belong together.
    """
    with _lock:
        current = (_failsafe_state, _mission_state, _mission_progress)
    return current
# ==============================================================================
# WEBSOCKET SERVER
# ==============================================================================
# Registry of all currently connected Cockpit clients.
# handler() adds and removes entries; publish_loop() copies the set before
# sending so the lock is never held across a network send.
_clients: set = set()
# NOTE(review): this lock is constructed at import time, before asyncio.run()
# creates the event loop. On Python < 3.10 asyncio primitives capture the
# current loop when constructed — confirm the target interpreter version.
_clients_lock = asyncio.Lock()
async def handler(websocket):
    """
    Serve a single Cockpit connection for its whole lifetime.

    The connection is registered in the client set, greeted with the three
    current values so the widgets have data immediately, and then left open.
    All later updates are sent by publish_loop(); this coroutine only waits
    for the peer to go away and then unregisters it.

    Args:
        websocket: The server-side connection object passed in by
            websockets.serve().
    """
    addr = websocket.remote_address
    print(f" [+] Cockpit connected from {addr}")
    async with _clients_lock:
        _clients.add(websocket)
    try:
        # Initial burst: one "name=value" message per variable.
        fs, ms, mp = snapshot()
        greeting = (
            f"{NAME_FAILSAFE}={fs}",
            f"{NAME_MS_STATE}={ms}",
            f"{NAME_MS_PROG}={mp:.3f}",
        )
        for line in greeting:
            await websocket.send(line)
        # Nothing more to do here; park until the connection closes.
        await websocket.wait_closed()
    except ConnectionClosed:
        # The peer left during the initial burst; treated as a normal
        # disconnect, not an error.
        pass
    finally:
        async with _clients_lock:
            _clients.discard(websocket)
        print(f" [-] Cockpit disconnected from {addr}")
async def publish_loop():
    """
    Broadcast the shared state to every connected client, forever.

    Each pass sleeps PUBLISH_INTERVAL seconds, takes one snapshot of the
    state, and sends it to each registered client as three separate
    "name=value" messages (one send call per variable). This coroutine
    never returns.
    """
    while True:
        await asyncio.sleep(PUBLISH_INTERVAL)
        fs, ms, mp = snapshot()
        frames = (
            f"{NAME_FAILSAFE}={fs}",
            f"{NAME_MS_STATE}={ms}",
            f"{NAME_MS_PROG}={mp:.3f}",
        )
        # Copy the registry under the lock, then send without holding it.
        async with _clients_lock:
            recipients = set(_clients)
        for client in recipients:
            try:
                for frame in frames:
                    await client.send(frame)
            except ConnectionClosed:
                # Skip the rest of this client's frames; handler() removes
                # the connection from the registry.
                continue
# ==============================================================================
# AUTO-CYCLE MODE
# ==============================================================================
def cycle_thread():
    """
    Endlessly step the shared state through a fixed demo sequence.

    Order: GREEN/IDLE -> AMBER/RUNNING at 50% -> RED/PAUSED at 50% -> repeat.
    Each step is held for CYCLE_HOLD[failsafe state] seconds and announced
    on the console. Intended to run in a background thread; never returns.
    """
    # One row per step: (failsafe state, mission state, mission progress).
    plan = [
        (STATE_GREEN, MISSION_IDLE, 0.00),
        (STATE_AMBER, MISSION_RUNNING, 0.50),
        (STATE_RED, MISSION_PAUSED, 0.50),
    ]
    while True:
        for fs, ms, mp in plan:
            hold = CYCLE_HOLD[fs]
            set_failsafe(fs)
            set_mission(ms, mp)
            colour = STATE_COLOUR[fs]
            print(
                f" {colour}{C.BOLD}{FAILSAFE_LABEL[fs]:<28}{C.RESET}"
                f" mission={MISSION_LABEL[ms]:<10}"
                f" progress={mp:.2f}"
                f" (hold {hold:.0f}s)"
            )
            time.sleep(hold)
# ==============================================================================
# MANUAL MODE (Linux/Mac — uses termios raw mode)
# ==============================================================================
def manual_thread():
    """
    Drive the shared state from single key presses (Linux/macOS only).

    Keys:
        0 / g   failsafe -> GREEN
        1 / a   failsafe -> AMBER
        2 / r   failsafe -> RED
        m       advance mission state (IDLE -> RUNNING -> PAUSED -> repeat)
        q       quit (Ctrl+C and end of input also quit)

    Intended to run in a background thread. On quit it interrupts the main
    thread, so the program shuts down through the same KeyboardInterrupt
    path as a real Ctrl+C, and then returns.
    """
    import _thread
    import termios
    import tty

    def getch():
        """Read one character from stdin with the terminal in raw mode."""
        fd = sys.stdin.fileno()
        old = termios.tcgetattr(fd)
        try:
            tty.setraw(fd)
            return sys.stdin.read(1)
        finally:
            # Always restore the previous terminal settings, even if the
            # read is interrupted, so the console is not left in raw mode.
            termios.tcsetattr(fd, termios.TCSADRAIN, old)

    # Key -> failsafe state. Upper- and lower-case letters are both accepted.
    failsafe_keys = {
        "0": STATE_GREEN, "g": STATE_GREEN, "G": STATE_GREEN,
        "1": STATE_AMBER, "a": STATE_AMBER, "A": STATE_AMBER,
        "2": STATE_RED, "r": STATE_RED, "R": STATE_RED,
    }

    print("\n Mode: MANUAL KEYBOARD")
    print(" 0/g -> GREEN 1/a -> AMBER 2/r -> RED m -> mission q -> quit\n")
    # Start in GREEN / RUNNING so widgets are live immediately
    set_failsafe(STATE_GREEN)
    set_mission(MISSION_RUNNING, 0.30)
    print(f" Initial: {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}")
    mission_cycle = [MISSION_IDLE, MISSION_RUNNING, MISSION_PAUSED]
    mission_idx = 1  # index of MISSION_RUNNING, matching the initial state
    while True:
        ch = getch()
        # "\x03" is Ctrl+C as delivered in raw mode. "" means stdin was
        # closed; without treating it as quit the loop would spin forever.
        if ch in ("q", "Q", "\x03", ""):
            print("\n Quit.")
            # This function runs in a worker thread that has no event loop,
            # so asyncio.get_event_loop() raises RuntimeError here instead of
            # returning the server's loop. Interrupting the main thread
            # raises KeyboardInterrupt there, which unwinds asyncio.run()
            # cleanly through the script's existing handler.
            _thread.interrupt_main()
            break
        elif ch in failsafe_keys:
            fs = failsafe_keys[ch]
            set_failsafe(fs)
            print(f" -> {STATE_COLOUR[fs]}{C.BOLD}{FAILSAFE_LABEL[fs]}{C.RESET}")
        elif ch in ("m", "M"):
            mission_idx = (mission_idx + 1) % len(mission_cycle)
            ms = mission_cycle[mission_idx]
            # Only RUNNING shows partial progress; IDLE and PAUSED reset to 0.
            mp = 0.50 if ms == MISSION_RUNNING else 0.0
            set_mission(ms, mp)
            print(f" -> mission={MISSION_LABEL[ms]} progress={mp:.2f}")
# ==============================================================================
# ARGUMENT PARSING
# ==============================================================================
def parse_args():
    """
    Parse the command line.

    Returns:
        The argparse namespace with two attributes: ``port`` (int, defaults
        to DEFAULT_PORT) and ``manual`` (bool, True when --manual is given).
    """
    parser = argparse.ArgumentParser(
        description="Task 11 — Mock NAMED_VALUE publisher via WebSocket.",
    )
    parser.add_argument(
        "--port",
        type=int,
        default=DEFAULT_PORT,
        help=f"WebSocket port to listen on (default: {DEFAULT_PORT}).",
    )
    parser.add_argument(
        "--manual",
        action="store_true",
        help="Keyboard control mode (Linux/Mac only). Default: auto-cycle.",
    )
    namespace = parser.parse_args()
    return namespace
# ==============================================================================
# MAIN
# ==============================================================================
async def main(port: int, manual: bool):
    """
    Start the control thread, then run the WebSocket server and publisher.

    Args:
        port: TCP port the WebSocket server listens on (all interfaces).
        manual: True for keyboard control, False for the automatic cycle.

    The control thread is a daemon thread, so it does not keep the process
    alive once this coroutine stops. The coroutine runs until publish_loop()
    is cancelled or raises.
    """
    if not manual:
        print("\n Mode: AUTO-CYCLE (Ctrl+C to stop)\n")
    worker = manual_thread if manual else cycle_thread
    control = threading.Thread(target=worker, daemon=True)
    control.start()
    # The server accepts connections in the background while publish_loop()
    # runs inside the context; leaving the context shuts the server down.
    async with websockets.serve(handler, "0.0.0.0", port):
        print(f" WebSocket server listening on ws://0.0.0.0:{port}")
        print(f" In Cockpit: Settings > Generic WebSocket Connections")
        print(f" Add: ws://{{{{ vehicle-address }}}}:{port}\n")
        await publish_loop()
if __name__ == "__main__":
    args = parse_args()

    # Startup banner: title, then the variables this script streams.
    print()
    print(f" {C.BOLD}SymbyTech ROV - Mock NAMED_VALUE Publisher{C.RESET}")
    print(f" Task 11 - End-to-end data path test (Cockpit v1.18.0 WebSocket)")
    print()
    print(f" Variables streamed:")
    print(f" {NAME_FAILSAFE} -> W1 System Health Indicator")
    print(f" {NAME_MS_STATE} -> W2 Mission Status")
    print(f" {NAME_MS_PROG} -> W2 Mission Progress")
    print()

    # Run until interrupted; Ctrl+C is the normal way to stop the server.
    try:
        asyncio.run(main(args.port, args.manual))
    except KeyboardInterrupt:
        print("\n Ctrl+C - stopped.")
    print()