rov-autonomy/tools/mock_named_value_publisher.py

617 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
mock_named_value_publisher.py
─────────────────────────────
SymbyTech ROV Autonomy — Task 11: Mock NAMED_VALUE Publisher
Injects fake NAMED_VALUE_INT / NAMED_VALUE_FLOAT messages into the BlueOS
MAVLink bus so Cockpit widgets (W1 System Health, W2 Mission Status) show
live GREEN / AMBER / RED states without real ROS2 hardware.
Full data path this script exercises:
This script (pymavlink udpout)
→ BlueOS mavlink-router (VM port 14550)
→ mavlink2rest (VM port 6040)
→ Cockpit data lake (window.cockpit.getDataLakeValue)
→ W1 System Health Indicator
→ W2 Mission Status
════════════════════════════════════════════════════════════════════════
⚠ IMPORTANT — MAVLink NAMED_VALUE 10-char name limit
════════════════════════════════════════════════════════════════════════
The NAMED_VALUE_INT / NAMED_VALUE_FLOAT "name" field is exactly
10 bytes (null-terminated, per MAVLink spec). pymavlink silently
truncates any string longer than 10 chars when it encodes the packet.
Consequence:
"rov_failsafe" (12 chars) → truncated → "rov_failsa"
"rov_mission_state" (17 chars) → truncated → "rov_missio"
"rov_mission_progress"(20 chars) → truncated → "rov_missio" (CLASH!)
This script uses short, collision-free names defined in the NAME_*
constants below. Before wiring the real bridge node, verify the
actual keys that appear in the Cockpit data lake by loading W0
(Data Lake Inspector), then update each widget's VARIABLE / NAME
constant to match.
════════════════════════════════════════════════════════════════════════
Usage
════════════════════════════════════════════════════════════════════════
# Install dependency (once)
pip install pymavlink
# Auto-cycle through GREEN → AMBER → RED (default)
python3 mock_named_value_publisher.py
# Manual keyboard control (0/g = GREEN, 1/a = AMBER, 2/r = RED)
python3 mock_named_value_publisher.py --mode manual
# Override the VM host (default: NAT IP 192.168.122.89)
python3 mock_named_value_publisher.py --host 100.84.141.120 # Tailscale
# Run on the SymbyTech server (SSH in first) — recommended
# The server can reach the VM via NAT without Tailscale.
════════════════════════════════════════════════════════════════════════
Repo location
════════════════════════════════════════════════════════════════════════
rov-autonomy / tools / mock_named_value_publisher.py
"""
import argparse
import sys
import time
import threading
# ─── Early import check ───────────────────────────────────────────────────────
# Fail fast with a friendly message if pymavlink isn't installed.
try:
from pymavlink import mavutil
from pymavlink.dialects.v20 import ardupilotmega as mavlink2
except ImportError:
print()
print(" ERROR: pymavlink is not installed.")
print(" Fix: pip install pymavlink")
print()
sys.exit(1)
# ══════════════════════════════════════════════════════════════════════════════
# CONFIGURATION — edit these to match your environment
# ══════════════════════════════════════════════════════════════════════════════
# ─── MAVLink name constants ───────────────────────────────────────────────────
# These MUST be ≤10 chars. Update each corresponding widget VARIABLE once you
# have confirmed the actual data lake key via W0 (Data Lake Inspector).
#
# Current widget VARIABLE constants (may need updating after W0 inspection):
# W1: VARIABLE = 'rov_failsafe' → update to NAME_FAILSAFE below
# W2: NAME_STATE = 'rov_mission_state' → update to NAME_MS_STATE below
# W2: NAME_PROG = 'rov_mission_progress' → update to NAME_MS_PROG below
NAME_FAILSAFE = "rov_failsa" # 10 chars — truncation of "rov_failsafe"
NAME_MS_STATE = "rov_ms" # 6 chars — short form of "rov_mission_state"
NAME_MS_PROG = "rov_mp" # 6 chars — short form of "rov_mission_progress"
# ─── Failsafe state values — MUST match W1 widget applyState() branches ──────
STATE_GREEN = 0 # Systems nominal
STATE_AMBER = 1 # Parameter degraded
STATE_RED = 2 # Critical failure
# ─── Mission state values — MUST match W2 widget state labels ─────────────────
MISSION_IDLE = 0
MISSION_RUNNING = 1
MISSION_PAUSED = 2
MISSION_COMPLETE = 3
MISSION_ABORTED = 4
# ─── Auto-cycle timing ────────────────────────────────────────────────────────
# How long (seconds) to hold each failsafe state before advancing in cycle mode
CYCLE_HOLD_SECONDS = {
STATE_GREEN: 5.0,
STATE_AMBER: 4.0,
STATE_RED: 4.0,
}
# How often to re-send all NAMED_VALUE messages (seconds).
# Cockpit widgets poll at 500 ms; re-sending at 250 ms gives two updates per
# widget poll cycle — any dropped UDP packet is covered by the next one.
PUBLISH_INTERVAL = 0.25
# ─── MAVLink IDs for this script ─────────────────────────────────────────────
# We impersonate sysid 1 (autopilot) so mavlink-router forwards our packets
# to mavlink2rest. Component 195 is unused in ArduSub — no collision risk.
OUR_SYSID = 1
OUR_COMPID = 195
# ─── Heartbeat interval ───────────────────────────────────────────────────────
# mavlink-router discovers endpoints by seeing heartbeats from them.
# Without a heartbeat, the router may not route our NAMED_VALUE packets.
HEARTBEAT_INTERVAL = 1.0
# ══════════════════════════════════════════════════════════════════════════════
# CONSOLE COLOURS — ANSI escape codes, safe on Linux/Mac
# ══════════════════════════════════════════════════════════════════════════════
class C:
"""Thin namespace for ANSI colour codes."""
GREEN = "\033[92m"
AMBER = "\033[93m"
RED = "\033[91m"
DIM = "\033[2m"
BOLD = "\033[1m"
RESET = "\033[0m"
# Per-state colour lookup — used for console output only
STATE_COLOUR = {STATE_GREEN: C.GREEN, STATE_AMBER: C.AMBER, STATE_RED: C.RED}
# Human-readable labels for console output
FAILSAFE_LABEL = {
STATE_GREEN: "GREEN — Systems nominal",
STATE_AMBER: "AMBER — Parameter degraded",
STATE_RED: "RED — Critical failure",
}
MISSION_LABEL = {
MISSION_IDLE: "IDLE",
MISSION_RUNNING: "RUNNING",
MISSION_PAUSED: "PAUSED",
MISSION_COMPLETE: "COMPLETE",
MISSION_ABORTED: "ABORTED",
}
# ══════════════════════════════════════════════════════════════════════════════
# MockPublisher
# ══════════════════════════════════════════════════════════════════════════════
class MockPublisher:
"""
Manages a pymavlink UDP connection to the BlueOS VM and runs two
background threads:
1. Heartbeat thread — sends MAVLink HEARTBEAT at 1 Hz so that
mavlink-router registers this script as a
known endpoint and routes its messages.
2. Publish thread — sends NAMED_VALUE_INT / NAMED_VALUE_FLOAT
messages at PUBLISH_INTERVAL Hz so the
Cockpit data lake stays current.
The main thread controls published values via the set_* methods, which
are protected by a threading.Lock so there are no race conditions.
"""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.connection = None # set in connect()
# ── Shared state (main thread writes, publish thread reads) ──
self._lock = threading.Lock()
self._failsafe_state = STATE_GREEN
self._mission_state = MISSION_IDLE
self._mission_progress = 0.0 # float, 0.01.0
# ── Thread control ──
self._running = False
self._heartbeat_thread = None
self._publish_thread = None
# ─── Connection ───────────────────────────────────────────────────────────
def connect(self):
"""
Open a udpout connection to BlueOS mavlink-router.
'udpout' means pymavlink sends UDP datagrams to host:port without
binding a local receive socket. That is exactly what we want —
mavlink-router sees the datagrams arrive on its GCS port (14550)
and routes them to all connected endpoints (including mavlink2rest).
"""
url = f"udpout:{self.host}:{self.port}"
print(f" Connecting → {url} (sysid={OUR_SYSID} compid={OUR_COMPID})")
self.connection = mavutil.mavlink_connection(
url,
source_system=OUR_SYSID,
source_component=OUR_COMPID,
)
# Small pause to let the OS open the socket before we send
time.sleep(0.3)
print(" Socket open.")
# ─── Setters (thread-safe) ────────────────────────────────────────────────
def set_failsafe_state(self, state: int):
"""Set the failsafe state (STATE_GREEN / STATE_AMBER / STATE_RED)."""
with self._lock:
self._failsafe_state = state
def set_mission_state(self, state: int, progress: float = 0.0):
"""
Set the mission state and optional progress value (0.01.0).
Progress is clamped to [0.0, 1.0].
"""
with self._lock:
self._mission_state = state
self._mission_progress = max(0.0, min(1.0, progress))
# ─── Snapshot (thread-safe read) ──────────────────────────────────────────
def _snapshot(self):
"""Atomically read current state for publishing."""
with self._lock:
return (
self._failsafe_state,
self._mission_state,
self._mission_progress,
)
# ─── MAVLink send helpers ─────────────────────────────────────────────────
def _time_boot_ms(self) -> int:
"""
MAVLink time_boot_ms field. We use wall-clock monotonic time wrapped
to 32 bits — good enough for a mock publisher; a real node would use
the autopilot's boot time.
"""
return int(time.monotonic() * 1000) & 0xFFFFFFFF
def _send_heartbeat(self):
"""
Send a MAVLink HEARTBEAT so mavlink-router keeps our endpoint live.
Type = GCS (6), autopilot = Generic (0).
mavlink-router uses heartbeats to maintain its endpoint routing table.
Without this the router may silently drop our NAMED_VALUE packets.
"""
self.connection.mav.heartbeat_send(
mavlink2.MAV_TYPE_GCS, # type: GCS
mavlink2.MAV_AUTOPILOT_GENERIC, # autopilot: generic
0, # base_mode
0, # custom_mode
mavlink2.MAV_STATE_ACTIVE, # system_status
)
def _send_named_values(self):
"""
Send the three NAMED_VALUE_FLOAT messages that feed W1 and W2.
All three use NAMED_VALUE_FLOAT — Cockpit's data lake bridge maps
NAMED_VALUE_FLOAT to data lake variables but silently ignores
NAMED_VALUE_INT. States (0/1/2) are sent as floats (0.0/1.0/2.0);
the widgets cast them back to int with Math.round() when reading.
pymavlink packs the name field into exactly 10 bytes. Strings
longer than 10 chars are truncated; shorter strings are null-padded.
The constants NAME_FAILSAFE / NAME_MS_STATE / NAME_MS_PROG are
already ≤10 chars to avoid truncation.
"""
t = self._time_boot_ms()
fs, ms, mp = self._snapshot()
# W1 data: failsafe state sent as float (0.0=GREEN, 1.0=AMBER, 2.0=RED)
self.connection.mav.named_value_float_send(
t,
NAME_FAILSAFE.encode("utf-8"),
float(fs),
)
# W2 data (part 1): mission state as float (0.0=IDLE … 4.0=ABORTED)
self.connection.mav.named_value_float_send(
t,
NAME_MS_STATE.encode("utf-8"),
float(ms),
)
# W2 data (part 2): mission progress (0.01.0)
self.connection.mav.named_value_float_send(
t,
NAME_MS_PROG.encode("utf-8"),
mp,
)
# ─── Background threads ───────────────────────────────────────────────────
def _heartbeat_loop(self):
"""Send a heartbeat every HEARTBEAT_INTERVAL seconds."""
while self._running:
try:
self._send_heartbeat()
except Exception as exc:
# Don't crash the thread on a transient socket error
print(f"\n [heartbeat error] {exc}", file=sys.stderr)
time.sleep(HEARTBEAT_INTERVAL)
def _publish_loop(self):
"""Send NAMED_VALUE messages every PUBLISH_INTERVAL seconds."""
while self._running:
try:
self._send_named_values()
except Exception as exc:
print(f"\n [publish error] {exc}", file=sys.stderr)
time.sleep(PUBLISH_INTERVAL)
# ─── Lifecycle ────────────────────────────────────────────────────────────
def start(self):
"""Start the heartbeat and publish background threads."""
self._running = True
self._heartbeat_thread = threading.Thread(
target=self._heartbeat_loop,
name="heartbeat",
daemon=True,
)
self._publish_thread = threading.Thread(
target=self._publish_loop,
name="publish",
daemon=True,
)
# Send the first heartbeat synchronously before launching the publish
# thread — this gives the router a chance to register our endpoint
# before NAMED_VALUE packets start arriving.
self._send_heartbeat()
time.sleep(0.1)
self._heartbeat_thread.start()
self._publish_thread.start()
def stop(self):
"""Signal threads to stop and wait for them to exit cleanly."""
self._running = False
if self._heartbeat_thread:
self._heartbeat_thread.join(timeout=2.0)
if self._publish_thread:
self._publish_thread.join(timeout=2.0)
# ══════════════════════════════════════════════════════════════════════════════
# CYCLE MODE — auto-advance through GREEN → AMBER → RED → GREEN
# ══════════════════════════════════════════════════════════════════════════════
def run_cycle_mode(publisher: MockPublisher):
"""
Automatically cycles through the three failsafe states in order.
A parallel mission state cycle runs alongside so W2 also shows activity:
failsafe GREEN → mission IDLE
failsafe AMBER → mission RUNNING (50% progress)
failsafe RED → mission PAUSED
Hold time for each state is defined in CYCLE_HOLD_SECONDS.
Press Ctrl+C to stop.
"""
print("\n Mode: AUTO-CYCLE (Ctrl+C to stop)\n")
# Define the cycle sequences — indices advance in lock-step
failsafe_cycle = [STATE_GREEN, STATE_AMBER, STATE_RED]
mission_cycle = [
(MISSION_IDLE, 0.00), # matches GREEN
(MISSION_RUNNING, 0.50), # matches AMBER — 50% through mission
(MISSION_PAUSED, 0.50), # matches RED — paused mid-mission
]
step = 0
while True:
# Pick current state from cycle sequences
fs = failsafe_cycle[step % len(failsafe_cycle)]
ms, mp = mission_cycle[step % len(mission_cycle)]
colour = STATE_COLOUR[fs]
hold = CYCLE_HOLD_SECONDS[fs]
# Push new state to publisher (publish thread picks it up within 250 ms)
publisher.set_failsafe_state(fs)
publisher.set_mission_state(ms, mp)
# Console output
print(
f" {colour}{C.BOLD}{FAILSAFE_LABEL[fs]:<30}{C.RESET}"
f" mission={MISSION_LABEL[ms]:<10}"
f" progress={mp:.2f}"
f" (hold {hold:.0f}s)"
)
time.sleep(hold)
step += 1
# ══════════════════════════════════════════════════════════════════════════════
# MANUAL MODE — single-key control (Linux/Mac only)
# ══════════════════════════════════════════════════════════════════════════════
def _getch_unix():
"""
Read a single character from stdin without requiring Enter.
Uses termios/tty raw mode — Linux and macOS only.
On Windows, use msvcrt.getch() instead (not implemented here).
"""
import termios
import tty
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setraw(fd)
ch = sys.stdin.read(1)
finally:
# Always restore terminal settings, even on exception
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
return ch
def run_manual_mode(publisher: MockPublisher):
"""
Keyboard-driven control of the failsafe state.
Keybindings:
0 / g / G → GREEN (Systems nominal)
1 / a / A → AMBER (Parameter degraded)
2 / r / R → RED (Critical failure)
m / M → cycle mission state (IDLE → RUNNING → PAUSED → IDLE)
q / Q → quit
NOTE: Uses termios raw mode — requires Linux or macOS.
On Windows, run in WSL or use cycle mode instead.
"""
# Check platform
if sys.platform == "win32":
print()
print(" ERROR: manual mode uses termios (Linux/Mac only).")
print(" On Windows: run inside WSL, or use --mode cycle instead.")
print()
sys.exit(1)
print("\n Mode: MANUAL KEYBOARD (no Enter needed)")
print()
print(" 0 / g → GREEN — Systems nominal")
print(" 1 / a → AMBER — Parameter degraded")
print(" 2 / r → RED — Critical failure")
print(" m → cycle mission state")
print(" q → quit")
print()
# Start in GREEN / RUNNING at 30% so W2 immediately shows activity
publisher.set_failsafe_state(STATE_GREEN)
publisher.set_mission_state(MISSION_RUNNING, 0.30)
print(f" Initial: {C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}")
print()
# Simple mission state toggle cycle for the 'm' key
mission_cycle = [MISSION_IDLE, MISSION_RUNNING, MISSION_PAUSED]
mission_idx = 1 # start at RUNNING
while True:
ch = _getch_unix()
if ch in ("q", "Q", "\x03"):
# q or Ctrl+C
print("\n Quit.")
break
elif ch in ("0", "g", "G"):
publisher.set_failsafe_state(STATE_GREEN)
print(f"{C.GREEN}{C.BOLD}{FAILSAFE_LABEL[STATE_GREEN]}{C.RESET}")
elif ch in ("1", "a", "A"):
publisher.set_failsafe_state(STATE_AMBER)
print(f"{C.AMBER}{C.BOLD}{FAILSAFE_LABEL[STATE_AMBER]}{C.RESET}")
elif ch in ("2", "r", "R"):
publisher.set_failsafe_state(STATE_RED)
print(f"{C.RED}{C.BOLD}{FAILSAFE_LABEL[STATE_RED]}{C.RESET}")
elif ch in ("m", "M"):
# Advance mission state cycle
mission_idx = (mission_idx + 1) % len(mission_cycle)
ms = mission_cycle[mission_idx]
# Give RUNNING a 50% progress, others 0%
mp = 0.50 if ms == MISSION_RUNNING else 0.00
publisher.set_mission_state(ms, mp)
print(f" → mission={MISSION_LABEL[ms]} progress={mp:.2f}")
else:
# Unknown key — print without newline so the display stays clean
print(f" [unknown key {repr(ch)}]", end="\r", flush=True)
# ══════════════════════════════════════════════════════════════════════════════
# ARGUMENT PARSING
# ══════════════════════════════════════════════════════════════════════════════
def parse_args() -> argparse.Namespace:
p = argparse.ArgumentParser(
description=(
"Task 11 — Mock NAMED_VALUE publisher.\n"
"Injects fake failsafe states into BlueOS MAVLink bus for end-to-end widget testing."
),
formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument(
"--host",
default="192.168.122.89",
metavar="IP",
help=(
"BlueOS VM IP address. "
"Default: 192.168.122.89 (NAT, from SymbyTech server). "
"Use 100.84.141.120 for Tailscale access from laptop."
),
)
p.add_argument(
"--port",
type=int,
default=14550,
metavar="PORT",
help="MAVLink GCS UDP port on BlueOS (default: 14550).",
)
p.add_argument(
"--mode",
choices=["cycle", "manual"],
default="cycle",
help=(
"cycle = automatically step through GREEN/AMBER/RED (default). "
"manual = keyboard control (Linux/Mac only)."
),
)
return p.parse_args()
# ══════════════════════════════════════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════════════════════════════════════
def print_banner(args: argparse.Namespace):
"""Print startup information so the operator knows what is running."""
print()
print(f" {C.BOLD}SymbyTech ROV — Mock NAMED_VALUE Publisher{C.RESET}")
print(f" Task 11 — End-to-end data path test")
print()
print(f" Target: {args.host}:{args.port} (BlueOS mavlink-router)")
print(f" Mode: {args.mode}")
print()
print(" MAVLink messages to be injected (all NAMED_VALUE_FLOAT):")
print(f" {C.DIM}{NAME_FAILSAFE:<12}{C.RESET} NAMED_VALUE_FLOAT → W1 System Health Indicator")
print(f" {C.DIM}{NAME_MS_STATE:<12}{C.RESET} NAMED_VALUE_FLOAT → W2 Mission Status")
print(f" {C.DIM}{NAME_MS_PROG:<12}{C.RESET} NAMED_VALUE_FLOAT → W2 Mission Progress")
print()
print(" ⚠ 10-char name limit applies. Use W0 (Data Lake Inspector)")
print(" to confirm actual data lake keys, then update each widget's")
print(" VARIABLE / NAME constant to match before committing to Gitea.")
print()
print(" Widget constants to check after W0 inspection:")
print(f" W1 VARIABLE (currently 'rov_failsafe') → should be '{NAME_FAILSAFE}'")
print(f" W2 NAME_STATE (currently 'rov_mission_state') → should be '{NAME_MS_STATE}'")
print(f" W2 NAME_PROG (currently 'rov_mission_progress') → should be '{NAME_MS_PROG}'")
print()
def main():
args = parse_args()
print_banner(args)
publisher = MockPublisher(args.host, args.port)
try:
publisher.connect()
publisher.start()
print(" Threads running. First heartbeat sent.\n")
if args.mode == "cycle":
run_cycle_mode(publisher)
elif args.mode == "manual":
run_manual_mode(publisher)
except KeyboardInterrupt:
print("\n Ctrl+C — stopping.")
except Exception as exc:
print(f"\n FATAL: {exc}", file=sys.stderr)
sys.exit(1)
finally:
publisher.stop()
print(" Publisher stopped. MAVLink socket closed.")
print()
if __name__ == "__main__":
main()