Compare commits

..

5 Commits

Author SHA1 Message Date
Grant
559f60dfd8 feat(bringup): add foxglove_bridge and MCAP recording 2026-05-07 08:26:26 +00:00
Grant
6aea0c24a6 feat: add rov_simulation package with mock_publisher and dev_stack launch
- New rov_simulation package (dev only — not for vehicle deployment)
- mock_publisher: synthetic MAVROS sensor data with 5 test scenarios
  (nominal, low_battery, comms_loss, depth_warning, all_clear)
- dev_stack.launch.py: launches mock_publisher + failsafe_monitor + mission_executor
- Scenarios trigger specific failsafe conditions for end-to-end testing
2026-05-07 00:43:52 +00:00
Grant
c3a8b4bc77 fix: mission launch file now correctly loads env-specific config at launch time 2026-05-07 00:29:02 +00:00
Grant
4b1766255a feat: implement mission executor with breadcrumb system and failsafe integration
- Full mission state machine: IDLE/RUNNING/PAUSED/COMPLETE/ABORTED
- Breadcrumb buffer: 1800 entries at 2s interval (1hr coverage)
- Failsafe integration: HOVER=pause, RETURN_TO_SAFE=abort+breadcrumb, EMERGENCY=immediate abort
- Waypoint loading from YAML mission file via LOAD command
- Arrival detection with configurable radius
- Image capture loiter with configurable delay
- Simple proportional navigation toward waypoints (Phase 3+ replaces with full planner)
- Entry point recorded at mission start for safe zone reference
2026-05-07 00:26:12 +00:00
Grant
9f7a775427 feat: implement full failsafe monitor per design v2.0
- Updated FailsafeStatus.msg with assessment state, FSM state, thruster_anomaly flag
- Implemented GREEN/AMBER/RED continuous self-assessment
- Dynamic assessment rates: 5Hz/20Hz/50Hz
- Full FSM: NORMAL->HOLD_AND_RECOVER->RETURN_TO_SAFE->EMERGENCY_SURFACE
- Priority-ordered failsafe conditions (8 levels per design spec)
- Tiered battery thresholds: warning/return/critical/emergency
- Conditional comms loss response based on assessment state
- Conditional manual abort response based on assessment state
- All thresholds configurable via failsafe.yaml
2026-05-07 00:01:54 +00:00
14 changed files with 1674 additions and 198 deletions

View File

@ -0,0 +1,74 @@
"""
foxglove_mcap.launch.py
Launches foxglove_bridge (WebSocket :8765) and MCAP bag recording.
Included automatically by rov_full.launch.py.
Standalone:
ros2 launch rov_bringup foxglove_mcap.launch.py
ros2 launch rov_bringup foxglove_mcap.launch.py record:=false
Foxglove connection from Edge PC:
Foxglove Studio -> Open Connection -> Foxglove WebSocket -> ws://rov-brain.local:8765
Storage:
Bags written to /data/bags/ on NVMe. Create before first launch:
sudo mkdir -p /data/bags && sudo chown ubuntu:ubuntu /data/bags
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument, ExecuteProcess, LogInfo
from launch.conditions import IfCondition
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
def generate_launch_description():
bag_output_arg = DeclareLaunchArgument(
'bag_output_dir',
default_value='/data/bags/dive',
description='Base path for MCAP bag. Timestamp appended automatically.',
)
record_arg = DeclareLaunchArgument(
'record',
default_value='true',
description='Set false to run foxglove_bridge without recording.',
)
# Exposes all ROS2 topics over WebSocket
# Edge PC connects via Foxglove Studio -> ws://rov-brain.local:8765
foxglove_bridge_node = Node(
package='foxglove_bridge',
executable='foxglove_bridge',
name='foxglove_bridge',
parameters=[{
'port': 8765,
'address': '0.0.0.0', # all interfaces — required for tether access
'tls': False,
'send_buffer_limit': 10000000, # 10MB — headroom for image topics
'max_update_ms': 100, # 10Hz cap — prevents WebSocket flooding
}],
output='screen',
)
# Records all topics to NVMe in MCAP format (Foxglove native)
# Output dir is auto-timestamped, e.g.: /data/bags/dive_2026_05_07-14_22_05/
mcap_recorder = ExecuteProcess(
cmd=[
'ros2', 'bag', 'record',
'--all',
'--storage', 'mcap',
'--output', LaunchConfiguration('bag_output_dir'),
'--max-bag-size', '0', # no size limit — one bag per dive
],
output='screen',
condition=IfCondition(LaunchConfiguration('record')),
)
return LaunchDescription([
bag_output_arg,
record_arg,
LogInfo(msg='foxglove_bridge: connect Foxglove Studio to ws://<RPi5-IP>:8765'),
foxglove_bridge_node,
mcap_recorder,
])

View File

@ -79,4 +79,11 @@ def generate_launch_description():
os.path.join(mission_pkg, 'launch', 'mission.launch.py')
),
),
# Foxglove bridge + MCAP recording (Prong 2 + 3)
IncludeLaunchDescription(
PythonLaunchDescriptionSource(
os.path.join(bringup_pkg, 'launch', 'foxglove_mcap.launch.py')
),
),
])

View File

@ -14,4 +14,5 @@
<export>
<build_type>ament_cmake</build_type>
</export>
<exec_depend>foxglove_bridge</exec_depend>
</package>

View File

@ -1,8 +1,40 @@
# Failsafe configuration
# Tune thresholds before any pool or field testing
# Failsafe Monitor Configuration — Argonaut 3
# See ROV_Failsafe_Design_v2.0 for rationale behind all values.
# Tune marked values after pool testing before field deployment.
failsafe_monitor:
ros__parameters:
low_battery_threshold: 0.20 # Surface when battery below 20%
max_depth_m: 50.0 # Emergency surface if depth exceeded
comms_timeout_s: 5.0 # Surface after 5s comms loss
# --- Comms ---
# Time without FCU heartbeat before comms loss triggers.
# Default 2s per design spec. May need increasing in high-latency tether.
# TUNE: verify in pool — 2s may generate false positives.
comms_timeout_s: 2.0
# Hold timeouts before escalating to RETURN_TO_SAFE on comms loss.
comms_hold_green_s: 60.0 # Hold 60s if GREEN before returning
comms_hold_amber_s: 30.0 # Hold 30s if AMBER before returning
# Hold timeout on manual abort before auto-return (GREEN state only).
abort_hold_green_s: 120.0
# --- Battery thresholds (fractions 0.0-1.0) ---
# All values configurable via web UI in production (battery.yaml).
# TUNE: measure empirically per battery type. Do not assume.
battery_warning_pct: 0.25 # Alert, no new panels started
battery_return_pct: 0.20 # Return to safe after current panel
battery_critical_pct: 0.12 # Abandon current task, return immediately
battery_emergency_pct: 0.08 # Emergency surface, no other action
# --- Depth limits (metres, positive = depth below surface) ---
# NOTE: Standard BlueROV2 Heavy rated to 100m.
# Values below target hardware-upgraded configuration.
# Validate against actual build specification before field use.
depth_design_m: 200.0 # Log + alert, continue mission
depth_warning_m: 250.0 # Halt descent, hold, alert operator
depth_safety_m: 300.0 # RETURN_TO_SAFE immediately
# --- Thruster anomaly ---
# Single thruster drawing > this ratio vs peer average triggers AMBER.
# TUNE: establish normal baseline in calm water before setting threshold.
thruster_peer_ratio: 1.5 # 150% of peer average

View File

@ -1,134 +1,550 @@
#!/usr/bin/env python3
"""
Failsafe monitor node.
Monitors all safety-critical conditions and publishes failsafe state.
This node must be running at all times during autonomous operation.
Failsafe Monitor Node Argonaut 3
===================================
Implements continuous self-assessment per ROV_Failsafe_Design_v2.0.
Conditions monitored:
- Comms loss (heartbeat timeout)
- Low battery (from MAVROS battery state)
- Maximum depth exceeded
- Obstacle detection (future proximity sensors)
- Manual abort command
Assessment states (GREEN/AMBER/RED) are evaluated continuously.
Assessment rate adapts dynamically:
- GREEN (all nominal): 5 Hz
- AMBER (degraded): 20 Hz
- RED (critical): 50 Hz
State machine states:
NORMAL -> ASSESSING -> HOLD_AND_RECOVER -> RETURN_TO_SAFE -> EMERGENCY_SURFACE
Priority order (highest wins when multiple conditions active):
1. EMERGENCY_SURFACE (manual trigger)
2. Critical/emergency battery
3. Safety depth limit (300m)
4. Obstacle detected in RED state
5. Comms loss (assessed response)
6. Manual abort (assessed response)
7. Thruster anomaly (AMBER downgrade)
8. Low battery warning (informational)
Topics subscribed:
/mavros/battery (sensor_msgs/BatteryState) battery level
/mavros/state (mavros_msgs/State) FCU connection state
/rov/mission/abort (std_msgs/Bool) manual abort trigger
/mavros/battery (sensor_msgs/BatteryState)
/mavros/state (mavros_msgs/State)
/mavros/global_position/rel_alt (std_msgs/Float32) altitude/depth proxy
/rov/mission/abort (std_msgs/Bool)
Topics published:
/rov/failsafe (rov_interfaces/FailsafeStatus) current failsafe state
/rov/failsafe (rov_interfaces/FailsafeStatus)
"""
import rclpy
from rclpy.node import Node
from rclpy.duration import Duration
from std_msgs.msg import Bool
from std_msgs.msg import Bool, Float32
from sensor_msgs.msg import BatteryState
from mavros_msgs.msg import State
from rov_interfaces.msg import FailsafeStatus
# ---------------------------------------------------------------------------
# Internal state enumerations (not exposed in ROS messages directly)
# ---------------------------------------------------------------------------
class AssessmentState:
"""Continuous self-assessment result states."""
GREEN = FailsafeStatus.ASSESSMENT_GREEN # All systems nominal
AMBER = FailsafeStatus.ASSESSMENT_AMBER # Degraded but functional
RED = FailsafeStatus.ASSESSMENT_RED # Incapable of safe autonomous action
class FSMState:
"""Failsafe state machine states."""
NORMAL = FailsafeStatus.STATE_NORMAL
ASSESSING = FailsafeStatus.STATE_ASSESSING
HOLD_AND_RECOVER = FailsafeStatus.STATE_HOLD_AND_RECOVER
RETURN_TO_SAFE = FailsafeStatus.STATE_RETURN_TO_SAFE
EMERGENCY_SURFACE = FailsafeStatus.STATE_EMERGENCY_SURFACE
MISSION_COMPLETE = FailsafeStatus.STATE_MISSION_COMPLETE
ABORTED = FailsafeStatus.STATE_ABORTED
# ---------------------------------------------------------------------------
# Assessment rates (seconds between evaluations per design spec)
# ---------------------------------------------------------------------------
RATE_GREEN = 1.0 / 5.0 # 5 Hz — normal operation
RATE_AMBER = 1.0 / 20.0 # 20 Hz — degraded
RATE_RED = 1.0 / 50.0 # 50 Hz — failsafe active
class FailsafeMonitor(Node):
"""Monitors safety conditions and publishes failsafe status."""
"""
Monitors all safety-critical conditions and publishes failsafe state.
Implements the Argonaut 3 failsafe design (v2.0) in full.
"""
def __init__(self):
super().__init__('failsafe_monitor')
# --- Parameters ---
self.declare_parameter('low_battery_threshold', 0.20) # 20% remaining
self.declare_parameter('max_depth_m', 50.0) # metres
self.declare_parameter('comms_timeout_s', 5.0) # seconds
# ------------------------------------------------------------------
# Parameters (loaded from failsafe.yaml via ros__parameters)
# ------------------------------------------------------------------
self.low_batt_thresh = self.get_parameter('low_battery_threshold').value
self.max_depth = self.get_parameter('max_depth_m').value
self.comms_timeout = self.get_parameter('comms_timeout_s').value
# Comms
self.declare_parameter('comms_timeout_s', 2.0)
# Battery thresholds (fractions 0.01.0)
self.declare_parameter('battery_warning_pct', 0.25) # Alert, no new panels
self.declare_parameter('battery_return_pct', 0.20) # Return to safe
self.declare_parameter('battery_critical_pct', 0.12) # Immediate return
self.declare_parameter('battery_emergency_pct', 0.08) # Emergency surface
# Depth limits (metres — positive = depth below surface)
self.declare_parameter('depth_design_m', 200.0) # Log + alert, continue
self.declare_parameter('depth_warning_m', 250.0) # Halt descent, hold
self.declare_parameter('depth_safety_m', 300.0) # Return to safe
# Hold timeouts before escalating to RETURN_TO_SAFE
self.declare_parameter('comms_hold_green_s', 60.0) # Hold 60s if GREEN
self.declare_parameter('comms_hold_amber_s', 30.0) # Hold 30s if AMBER
self.declare_parameter('abort_hold_green_s', 120.0) # Hold 120s on abort
# Thruster anomaly detection
self.declare_parameter('thruster_peer_ratio', 1.5) # 150% of peer average
self._load_parameters()
# ------------------------------------------------------------------
# Internal state
self.last_heartbeat = self.get_clock().now()
self.battery_percent = 1.0
self.fcu_connected = False
# ------------------------------------------------------------------
# --- Subscribers ---
self.battery_sub = self.create_subscription(
BatteryState, '/mavros/battery', self.battery_callback, 10)
self.state_sub = self.create_subscription(
State, '/mavros/state', self.state_callback, 10)
self.abort_sub = self.create_subscription(
Bool, '/rov/mission/abort', self.abort_callback, 10)
self.assessment_state = AssessmentState.GREEN
self.fsm_state = FSMState.NORMAL
# --- Publishers ---
self.failsafe_pub = self.create_publisher(FailsafeStatus, '/rov/failsafe', 10)
# Sensor readings
self.battery_percent = 1.0 # 0.01.0
self.fcu_connected = False
self.current_depth_m = 0.0
# --- Timer: evaluate failsafe conditions at 10Hz ---
self.timer = self.create_timer(0.1, self.evaluate_failsafe)
# Timing
self.last_heartbeat = self.get_clock().now()
self.comms_loss_start = None # When comms loss began
self.hold_recover_start = None # When HOLD_AND_RECOVER began
self.abort_hold_start = None # When abort hold began
self.get_logger().info('FailsafeMonitor node started')
# Active condition flags
self.flag_comms_loss = False
self.flag_low_battery = False
self.flag_critical_battery = False
self.flag_emergency_battery = False
self.flag_depth_exceeded = False
self.flag_obstacle = False
self.flag_manual_abort = False
self.flag_thruster_anomaly = False
def battery_callback(self, msg: BatteryState):
"""Update battery state."""
# ------------------------------------------------------------------
# Subscribers
# ------------------------------------------------------------------
self.create_subscription(
BatteryState, '/mavros/battery',
self._battery_callback, 10)
self.create_subscription(
State, '/mavros/state',
self._state_callback, 10)
self.create_subscription(
Float32, '/mavros/global_position/rel_alt',
self._altitude_callback, 10)
self.create_subscription(
Bool, '/rov/mission/abort',
self._abort_callback, 10)
# ------------------------------------------------------------------
# Publishers
# ------------------------------------------------------------------
self.failsafe_pub = self.create_publisher(
FailsafeStatus, '/rov/failsafe', 10)
# ------------------------------------------------------------------
# Initial timer — starts at GREEN rate (5 Hz)
# ------------------------------------------------------------------
self._current_rate = RATE_GREEN
self._eval_timer = self.create_timer(RATE_GREEN, self._evaluate)
self.get_logger().info(
'FailsafeMonitor started — '
f'comms_timeout={self._comms_timeout_s}s '
f'battery_return={self._battery_return_pct*100:.0f}% '
f'depth_safety={self._depth_safety_m}m'
)
# ------------------------------------------------------------------
# Parameter loading
# ------------------------------------------------------------------
def _load_parameters(self):
"""Load all parameters from the ROS parameter server."""
self._comms_timeout_s = self.get_parameter('comms_timeout_s').value
self._battery_warning_pct = self.get_parameter('battery_warning_pct').value
self._battery_return_pct = self.get_parameter('battery_return_pct').value
self._battery_critical_pct = self.get_parameter('battery_critical_pct').value
self._battery_emerg_pct = self.get_parameter('battery_emergency_pct').value
self._depth_design_m = self.get_parameter('depth_design_m').value
self._depth_warning_m = self.get_parameter('depth_warning_m').value
self._depth_safety_m = self.get_parameter('depth_safety_m').value
self._comms_hold_green_s = self.get_parameter('comms_hold_green_s').value
self._comms_hold_amber_s = self.get_parameter('comms_hold_amber_s').value
self._abort_hold_green_s = self.get_parameter('abort_hold_green_s').value
self._thruster_peer_ratio = self.get_parameter('thruster_peer_ratio').value
# ------------------------------------------------------------------
# Subscriber callbacks
# ------------------------------------------------------------------
def _battery_callback(self, msg: BatteryState):
"""Update battery percentage from MAVROS."""
# MAVROS BatteryState.percentage is 0.01.0
self.battery_percent = msg.percentage
def state_callback(self, msg: State):
"""Update FCU connection state and heartbeat time."""
def _state_callback(self, msg: State):
"""Update FCU connection state and record last heartbeat time."""
self.fcu_connected = msg.connected
if msg.connected:
self.last_heartbeat = self.get_clock().now()
# Reset comms loss tracking on each received heartbeat
self.last_heartbeat = self.get_clock().now()
self.comms_loss_start = None
def abort_callback(self, msg: Bool):
"""Handle manual abort command."""
def _altitude_callback(self, msg: Float32):
"""
Update current depth from relative altitude.
rel_alt is positive above home negate for depth below surface.
"""
self.current_depth_m = -msg.data if msg.data < 0.0 else 0.0
def _abort_callback(self, msg: Bool):
"""Handle manual abort command from operator."""
if msg.data:
self.get_logger().warn('Manual abort received')
self.publish_failsafe(
manual_abort=True,
action=FailsafeStatus.ACTION_SURFACE,
message='Manual abort commanded'
)
self.flag_manual_abort = True
self.get_logger().warn('Manual abort received from operator')
def evaluate_failsafe(self):
"""Evaluate all failsafe conditions and publish status."""
# ------------------------------------------------------------------
# Main evaluation loop — rate adapts to assessment state
# ------------------------------------------------------------------
def _evaluate(self):
"""
Main evaluation cycle. Called by timer.
1. Assess all sensor inputs -> GREEN/AMBER/RED
2. Adapt assessment rate to match state
3. Apply priority-ordered failsafe logic
4. Publish status
"""
self._assess_conditions()
self._adapt_rate()
self._apply_failsafe_priority()
self._publish_status()
# ------------------------------------------------------------------
# Step 1: Continuous self-assessment -> GREEN / AMBER / RED
# ------------------------------------------------------------------
def _assess_conditions(self):
"""
Evaluate all monitored parameters and derive assessment state.
GREEN: all nominal, 5+ parameters assessed.
AMBER: core navigation intact, <= 1 parameter marginal.
RED: navigation sensor failure, >= 2 marginal, or any safety-critical lost.
"""
now = self.get_clock().now()
elapsed = (now - self.last_heartbeat).nanoseconds / 1e9
comms_loss = elapsed > self.comms_timeout
low_battery = self.battery_percent < self.low_batt_thresh
# --- Comms ---
elapsed_s = (now - self.last_heartbeat).nanoseconds / 1e9
self.flag_comms_loss = elapsed_s > self._comms_timeout_s
if self.flag_comms_loss and self.comms_loss_start is None:
self.comms_loss_start = now
if comms_loss:
self.publish_failsafe(
comms_loss=True,
action=FailsafeStatus.ACTION_SURFACE,
message=f'Comms lost for {elapsed:.1f}s'
)
elif low_battery:
self.publish_failsafe(
low_battery=True,
action=FailsafeStatus.ACTION_SURFACE,
message=f'Low battery: {self.battery_percent*100:.0f}%'
# --- Battery ---
pct = self.battery_percent
self.flag_emergency_battery = pct < self._battery_emerg_pct
self.flag_critical_battery = pct < self._battery_critical_pct
self.flag_low_battery = pct < self._battery_return_pct
# --- Depth ---
d = self.current_depth_m
self.flag_depth_exceeded = d > self._depth_safety_m
# --- Obstacle (placeholder — Phase 3+ when sonar is fitted) ---
# self.flag_obstacle = <sonar_node data>
# --- Derive assessment state ---
marginal_count = sum([
self.flag_comms_loss,
self.flag_low_battery,
self.flag_depth_exceeded,
self.flag_obstacle,
self.flag_thruster_anomaly,
])
if self.flag_emergency_battery or self.flag_depth_exceeded or marginal_count >= 2:
self.assessment_state = AssessmentState.RED
elif marginal_count == 1:
self.assessment_state = AssessmentState.AMBER
else:
self.assessment_state = AssessmentState.GREEN
# ------------------------------------------------------------------
# Step 2: Adapt timer rate to assessment state
# ------------------------------------------------------------------
def _adapt_rate(self):
"""
Change evaluation rate based on current assessment state.
Timer is recreated only when rate actually changes.
"""
target_rate = {
AssessmentState.GREEN: RATE_GREEN,
AssessmentState.AMBER: RATE_AMBER,
AssessmentState.RED: RATE_RED,
}[self.assessment_state]
if abs(target_rate - self._current_rate) > 1e-6:
self._eval_timer.cancel()
self._eval_timer = self.create_timer(target_rate, self._evaluate)
self._current_rate = target_rate
self.get_logger().info(
f'Assessment rate changed to {1.0/target_rate:.0f} Hz '
f'(state={self._assessment_state_name()})'
)
def publish_failsafe(self, comms_loss=False, low_battery=False,
depth_exceeded=False, obstacle_detected=False,
manual_abort=False,
action=FailsafeStatus.ACTION_NONE, message=''):
"""Publish a failsafe status message."""
# ------------------------------------------------------------------
# Step 3: Priority-ordered failsafe logic
# ------------------------------------------------------------------
def _apply_failsafe_priority(self):
"""
Apply failsafe conditions in strict priority order per design spec.
Higher priority conditions override lower priority ones.
"""
now = self.get_clock().now()
# Priority 1: Manual emergency surface (hardcoded, no assessment)
# (Handled separately — no current ROS trigger for P1 emergency.
# Manual abort at P6 covers operator intent. Reserved for future.)
# Priority 2: Critical / emergency battery
if self.flag_emergency_battery:
self._transition(FSMState.EMERGENCY_SURFACE,
FailsafeStatus.ACTION_EMERGENCY_SURFACE,
f'Emergency battery: {self.battery_percent*100:.0f}%')
return
if self.flag_critical_battery:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Critical battery: {self.battery_percent*100:.0f}%')
return
# Priority 3: Safety depth limit (300m)
if self.flag_depth_exceeded:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Safety depth exceeded: {self.current_depth_m:.1f}m')
return
# Priority 4: Obstacle in RED state
if self.flag_obstacle and self.assessment_state == AssessmentState.RED:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
'Obstacle detected in RED state')
return
# Priority 5: Comms loss — conditional on assessment state
if self.flag_comms_loss:
self._handle_comms_loss(now)
return
# Priority 6: Manual abort — conditional on assessment state
if self.flag_manual_abort:
self._handle_manual_abort(now)
return
# Priority 7: Thruster anomaly — AMBER downgrade, informational
if self.flag_thruster_anomaly:
# Does not change FSM state, already handled in assessment
self.get_logger().warn('Thruster anomaly detected — AMBER state')
return
# Priority 8: Low battery warning — informational only
if self.battery_percent < self._battery_warning_pct:
self.get_logger().warn(
f'Battery warning: {self.battery_percent*100:.0f}% — '
'complete current task, no new panels'
)
# All clear — return to NORMAL if previously in HOLD_AND_RECOVER
if self.fsm_state == FSMState.HOLD_AND_RECOVER:
self.get_logger().info('Conditions cleared — returning to NORMAL')
self.fsm_state = FSMState.NORMAL
self.hold_recover_start = None
def _handle_comms_loss(self, now):
"""
Comms loss response conditional on assessment state per design spec.
GREEN: hold 60s then RETURN_TO_SAFE.
AMBER: hold 30s then RETURN_TO_SAFE.
RED: RETURN_TO_SAFE immediately.
"""
if self.assessment_state == AssessmentState.RED:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
'Comms loss in RED state — immediate return')
return
# Enter HOLD_AND_RECOVER if not already there
if self.fsm_state != FSMState.HOLD_AND_RECOVER:
self._transition(FSMState.HOLD_AND_RECOVER,
FailsafeStatus.ACTION_HOVER,
'Comms loss — holding, attempting reconnection')
self.hold_recover_start = now
return
# Check hold timeout
hold_limit = (
self._comms_hold_green_s
if self.assessment_state == AssessmentState.GREEN
else self._comms_hold_amber_s
)
hold_elapsed = (now - self.hold_recover_start).nanoseconds / 1e9
if hold_elapsed > hold_limit:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Comms lost for {hold_elapsed:.0f}s — returning to safe')
def _handle_manual_abort(self, now):
"""
Manual abort response conditional on assessment state per design spec.
GREEN: hold position 120s awaiting operator instruction, then RETURN_TO_SAFE.
AMBER/RED: RETURN_TO_SAFE immediately.
"""
if self.assessment_state != AssessmentState.GREEN:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Manual abort in {self._assessment_state_name()} state')
return
if self.fsm_state != FSMState.HOLD_AND_RECOVER:
self._transition(FSMState.HOLD_AND_RECOVER,
FailsafeStatus.ACTION_HOVER,
'Manual abort — holding, awaiting operator instruction')
self.abort_hold_start = now
return
if self.abort_hold_start is not None:
elapsed = (now - self.abort_hold_start).nanoseconds / 1e9
if elapsed > self._abort_hold_green_s:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'No operator instruction after {elapsed:.0f}s — returning to safe')
# ------------------------------------------------------------------
# State machine transition
# ------------------------------------------------------------------
def _transition(self, new_state: int, action: int, message: str):
"""
Transition the FSM to a new state. Logs the transition.
Only logs if state actually changes to avoid log spam.
"""
if self.fsm_state != new_state:
self.get_logger().warn(
f'FSM: {self._fsm_state_name(self.fsm_state)} -> '
f'{self._fsm_state_name(new_state)} | {message}'
)
self.fsm_state = new_state
# ------------------------------------------------------------------
# Step 4: Publish current status
# ------------------------------------------------------------------
def _publish_status(self):
"""Publish the current failsafe status to /rov/failsafe."""
msg = FailsafeStatus()
msg.header.stamp = self.get_clock().now().to_msg()
msg.comms_loss = comms_loss
msg.low_battery = low_battery
msg.depth_exceeded = depth_exceeded
msg.obstacle_detected = obstacle_detected
msg.manual_abort = manual_abort
msg.action = action
msg.message = message
msg.header.stamp = self.get_clock().now().to_msg()
msg.assessment_state = self.assessment_state
msg.failsafe_state = self.fsm_state
# Trigger flags
msg.comms_loss = self.flag_comms_loss
msg.low_battery = self.flag_low_battery
msg.depth_exceeded = self.flag_depth_exceeded
msg.obstacle_detected = self.flag_obstacle
msg.manual_abort = self.flag_manual_abort
msg.thruster_anomaly = self.flag_thruster_anomaly
# Telemetry
msg.battery_percent = float(self.battery_percent)
# Derive action from FSM state
msg.action = {
FSMState.NORMAL: FailsafeStatus.ACTION_NONE,
FSMState.ASSESSING: FailsafeStatus.ACTION_NONE,
FSMState.HOLD_AND_RECOVER: FailsafeStatus.ACTION_HOVER,
FSMState.RETURN_TO_SAFE: FailsafeStatus.ACTION_RETURN_TO_SAFE,
FSMState.EMERGENCY_SURFACE: FailsafeStatus.ACTION_EMERGENCY_SURFACE,
FSMState.MISSION_COMPLETE: FailsafeStatus.ACTION_NONE,
FSMState.ABORTED: FailsafeStatus.ACTION_NONE,
}.get(self.fsm_state, FailsafeStatus.ACTION_NONE)
msg.message = (
f'Assessment: {self._assessment_state_name()} | '
f'State: {self._fsm_state_name(self.fsm_state)} | '
f'Battery: {self.battery_percent*100:.0f}%'
)
self.failsafe_pub.publish(msg)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _assessment_state_name(self) -> str:
"""Return human-readable assessment state name."""
return {
AssessmentState.GREEN: 'GREEN',
AssessmentState.AMBER: 'AMBER',
AssessmentState.RED: 'RED',
}.get(self.assessment_state, 'UNKNOWN')
def _fsm_state_name(self, state: int) -> str:
"""Return human-readable FSM state name."""
return {
FSMState.NORMAL: 'NORMAL',
FSMState.ASSESSING: 'ASSESSING',
FSMState.HOLD_AND_RECOVER: 'HOLD_AND_RECOVER',
FSMState.RETURN_TO_SAFE: 'RETURN_TO_SAFE',
FSMState.EMERGENCY_SURFACE: 'EMERGENCY_SURFACE',
FSMState.MISSION_COMPLETE: 'MISSION_COMPLETE',
FSMState.ABORTED: 'ABORTED',
}.get(state, 'UNKNOWN')
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None):
"""ROS2 node entry point."""
rclpy.init(args=args)
node = FailsafeMonitor()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':

View File

@ -1,12 +1,39 @@
std_msgs/Header header
# Assessment state (continuous self-assessment result)
uint8 assessment_state
uint8 ASSESSMENT_GREEN=0
uint8 ASSESSMENT_AMBER=1
uint8 ASSESSMENT_RED=2
# Failsafe state machine state
uint8 failsafe_state
uint8 STATE_NORMAL=0
uint8 STATE_ASSESSING=1
uint8 STATE_HOLD_AND_RECOVER=2
uint8 STATE_RETURN_TO_SAFE=3
uint8 STATE_EMERGENCY_SURFACE=4
uint8 STATE_MISSION_COMPLETE=5
uint8 STATE_ABORTED=6
# Active trigger flags
bool comms_loss
bool low_battery
bool depth_exceeded
bool obstacle_detected
bool manual_abort
bool thruster_anomaly
# Commanded action
uint8 action
uint8 ACTION_NONE=0
uint8 ACTION_HOVER=1
uint8 ACTION_SURFACE=2
uint8 ACTION_EMERGENCY_SURFACE=3
uint8 ACTION_RETURN_TO_SAFE=2
uint8 ACTION_SURFACE=3
uint8 ACTION_EMERGENCY_SURFACE=4
# Battery detail
float32 battery_percent
# Human-readable status
string message

View File

@ -1,30 +1,52 @@
"""Mission executor launch file."""
"""
Mission executor launch file.
Selects config based on env argument at launch time.
Usage:
ros2 launch rov_mission mission.launch.py # dev config (default)
ros2 launch rov_mission mission.launch.py env:=field # field config
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.actions import DeclareLaunchArgument, OpaqueFunction
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
def _launch_with_config(context, *args, **kwargs):
"""Resolve env argument and return node with correct config file."""
pkg_dir = get_package_share_directory('rov_mission')
env_arg = DeclareLaunchArgument(
'env', default_value='dev',
description='Deployment environment: dev, field'
)
# Evaluate env argument at launch time
env = LaunchConfiguration('env').perform(context)
env = LaunchConfiguration('env')
config_file = os.path.join(pkg_dir, 'config', f'{env}.yaml')
return LaunchDescription([
env_arg,
if not os.path.exists(config_file):
raise FileNotFoundError(
f'Config file not found: {config_file}. '
f'Valid environments: dev, field'
)
return [
Node(
package='rov_mission',
executable='mission_executor',
name='mission_executor',
output='screen',
# Config selected by env argument at launch time
parameters=[config_file],
)
]
def generate_launch_description():
"""Launch mission executor with environment-specific config."""
return LaunchDescription([
DeclareLaunchArgument(
'env',
default_value='dev',
description='Deployment environment: dev, field'
),
OpaqueFunction(function=_launch_with_config),
])

View File

@ -1,118 +1,390 @@
#!/usr/bin/env python3
"""
Mission executor node.
Manages inspection mission state machine.
Sequences waypoints, triggers image capture, handles replanning.
Mission Executor Node Argonaut 3
====================================
Manages inspection mission state machine per ROV_Failsafe_Design_v2.0.
States: IDLE RUNNING (PAUSED | COMPLETE | ABORTED)
Sequences inspection waypoints, triggers image capture, maintains the
breadcrumb position buffer used by the failsafe return-to-safe path,
and responds correctly to each failsafe action level.
States: IDLE -> RUNNING -> PAUSED | COMPLETE | ABORTED
Failsafe integration:
ACTION_HOVER -> pause mission, hold position
ACTION_RETURN_TO_SAFE -> abort mission, return via breadcrumb path
ACTION_EMERGENCY_SURFACE -> abort mission, emergency surface immediately
Breadcrumb system:
Position logged every 2s from mission start.
Circular buffer of 1800 entries (1 hour of operation).
Used by return-to-safe to reverse the entry path safely.
Topics subscribed:
/rov/state (nav_msgs/Odometry) current position
/rov/failsafe (rov_interfaces/FailsafeStatus) safety override
/rov/state (nav_msgs/Odometry) current position
/rov/failsafe (rov_interfaces/FailsafeStatus) safety override
Topics published:
/rov/cmd_vel (geometry_msgs/Twist) velocity setpoints to controller
/rov/mission/status (rov_interfaces/MissionStatus) current mission state
/rov/cmd_vel (geometry_msgs/Twist) velocity setpoints
/rov/mission/status (rov_interfaces/MissionStatus) mission state
/rov/mission/waypoint (rov_interfaces/InspectionWaypoint) current target
Services provided:
/rov/mission/command (rov_interfaces/MissionCommand) start/pause/abort/resume
/rov/mission/command (rov_interfaces/MissionCommand) start/pause/abort/resume
"""
import math
import yaml
import collections
from dataclasses import dataclass, field
from typing import List, Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Twist, Point
from nav_msgs.msg import Odometry
from std_msgs.msg import Header
from rov_interfaces.msg import MissionStatus, FailsafeStatus, InspectionWaypoint
from rov_interfaces.srv import MissionCommand
class MissionExecutor(Node):
"""Executes inspection missions from a waypoint list."""
# ---------------------------------------------------------------------------
# Breadcrumb entry
# ---------------------------------------------------------------------------
# Mission states
STATE_IDLE = 'IDLE'
STATE_RUNNING = 'RUNNING'
STATE_PAUSED = 'PAUSED'
@dataclass
class Breadcrumb:
"""Single position record for the return-to-safe path."""
timestamp_s: float # ROS time in seconds
x: float # metres
y: float # metres
z: float # metres (negative = depth)
heading_deg: float # degrees
confidence: float # 0.01.0 (placeholder until full state estimation)
# ---------------------------------------------------------------------------
# Breadcrumb buffer
# ---------------------------------------------------------------------------
class BreadcrumbBuffer:
"""
Circular buffer of position records logged during mission.
1800 entries at 2s interval = 1 hour of operation.
Older entries are discarded as new ones are added.
"""
CAPACITY = 1800 # entries
LOG_INTERVAL_S = 2.0 # seconds between entries
MIN_CONFIDENCE = 0.3 # entries below this are flagged during reversal
def __init__(self):
self._buffer: collections.deque = collections.deque(maxlen=self.CAPACITY)
self._last_log_time: float = 0.0
def update(self, now_s: float, position: Point, heading_deg: float,
confidence: float = 1.0):
"""Log a breadcrumb if the log interval has elapsed."""
if now_s - self._last_log_time >= self.LOG_INTERVAL_S:
self._buffer.append(Breadcrumb(
timestamp_s=now_s,
x=position.x,
y=position.y,
z=position.z,
heading_deg=heading_deg,
confidence=confidence,
))
self._last_log_time = now_s
def get_return_path(self) -> List[Breadcrumb]:
"""
Return the breadcrumb buffer in reverse order for path reversal.
Entries below MIN_CONFIDENCE are filtered skipped in favour of
the next reliable entry to reduce navigation uncertainty.
"""
reliable = [b for b in self._buffer if b.confidence >= self.MIN_CONFIDENCE]
return list(reversed(reliable))
def clear(self):
"""Clear all entries — called at mission start."""
self._buffer.clear()
self._last_log_time = 0.0
@property
def entry_count(self) -> int:
"""Number of entries currently in buffer."""
return len(self._buffer)
# ---------------------------------------------------------------------------
# Mission executor node
# ---------------------------------------------------------------------------
class MissionExecutor(Node):
"""
Executes inspection missions from a waypoint sequence.
Integrates with the failsafe monitor and maintains breadcrumb buffer.
"""
# Internal mission states
STATE_IDLE = 'IDLE'
STATE_RUNNING = 'RUNNING'
STATE_PAUSED = 'PAUSED'
STATE_COMPLETE = 'COMPLETE'
STATE_ABORTED = 'ABORTED'
STATE_ABORTED = 'ABORTED'
def __init__(self):
super().__init__('mission_executor')
self.state = self.STATE_IDLE
self.waypoints = [] # List of InspectionWaypoint
self.current_wp_index = 0
self.mission_id = ''
self.current_position = None
self.failsafe_active = False
# ------------------------------------------------------------------
# Parameters (from dev.yaml / field.yaml)
# ------------------------------------------------------------------
# --- Subscribers ---
self.state_sub = self.create_subscription(
Odometry, '/rov/state', self.state_callback, 10)
self.failsafe_sub = self.create_subscription(
FailsafeStatus, '/rov/failsafe', self.failsafe_callback, 10)
self.declare_parameter('waypoint_arrival_radius_m', 0.5)
self.declare_parameter('waypoint_timeout_s', 60.0)
self.declare_parameter('image_capture_delay_s', 1.0)
# --- Publishers ---
self.cmd_pub = self.create_publisher(Twist, '/rov/cmd_vel', 10)
self.status_pub = self.create_publisher(MissionStatus, '/rov/mission/status', 10)
self._arrival_radius = self.get_parameter('waypoint_arrival_radius_m').value
self._wp_timeout_s = self.get_parameter('waypoint_timeout_s').value
self._capture_delay_s = self.get_parameter('image_capture_delay_s').value
# --- Services ---
self.cmd_srv = self.create_service(
MissionCommand, '/rov/mission/command', self.command_callback)
# ------------------------------------------------------------------
# Mission state
# ------------------------------------------------------------------
# --- Timer: mission loop at 10Hz ---
self.timer = self.create_timer(0.1, self.mission_loop)
self.state = self.STATE_IDLE
self.mission_id = ''
self.waypoints: List[InspectionWaypoint] = []
self.current_wp_index = 0
self.get_logger().info('MissionExecutor node started')
# Entry point — set when mission starts, used for safe zone reference
self.entry_point: Optional[Point] = None
def state_callback(self, msg: Odometry):
"""Update current position."""
# Operation mode — affects return-to-safe path choice
self.operation_mode = 'tethered' # 'tethered' or 'untethered'
# ------------------------------------------------------------------
# Sensor state
# ------------------------------------------------------------------
self.current_position: Optional[Point] = None
self.current_heading_deg: float = 0.0
# ------------------------------------------------------------------
# Failsafe state
# ------------------------------------------------------------------
self.last_failsafe_action = FailsafeStatus.ACTION_NONE
self.last_assessment_state = FailsafeStatus.ASSESSMENT_GREEN
# ------------------------------------------------------------------
# Breadcrumb buffer
# ------------------------------------------------------------------
self.breadcrumbs = BreadcrumbBuffer()
# ------------------------------------------------------------------
# Waypoint timing
# ------------------------------------------------------------------
self.wp_start_time: Optional[float] = None # When current wp navigation began
self.capture_start_time: Optional[float] = None # When image capture loiter began
self.capturing_image: bool = False
# ------------------------------------------------------------------
# Subscribers
# ------------------------------------------------------------------
self.create_subscription(
Odometry, '/rov/state',
self._state_callback, 10)
self.create_subscription(
FailsafeStatus, '/rov/failsafe',
self._failsafe_callback, 10)
# ------------------------------------------------------------------
# Publishers
# ------------------------------------------------------------------
self.cmd_pub = self.create_publisher(
Twist, '/rov/cmd_vel', 10)
self.status_pub = self.create_publisher(
MissionStatus, '/rov/mission/status', 10)
self.waypoint_pub = self.create_publisher(
InspectionWaypoint, '/rov/mission/waypoint', 10)
# ------------------------------------------------------------------
# Service
# ------------------------------------------------------------------
self.create_service(
MissionCommand, '/rov/mission/command',
self._command_callback)
# ------------------------------------------------------------------
# Timers
# ------------------------------------------------------------------
# Mission loop at 10 Hz
self.create_timer(0.1, self._mission_loop)
# Status publisher at 2 Hz
self.create_timer(0.5, self._publish_status)
self.get_logger().info('MissionExecutor started')
# ------------------------------------------------------------------
# Subscriber callbacks
# ------------------------------------------------------------------
def _state_callback(self, msg: Odometry):
"""Update current position and heading from state estimator."""
self.current_position = msg.pose.pose.position
def failsafe_callback(self, msg: FailsafeStatus):
"""Handle failsafe — abort mission if active."""
was_active = self.failsafe_active
self.failsafe_active = (msg.action != FailsafeStatus.ACTION_NONE)
if self.failsafe_active and not was_active:
self.get_logger().warn('Failsafe triggered — aborting mission')
self.state = self.STATE_ABORTED
# Extract heading from quaternion (yaw only — simplified)
q = msg.pose.pose.orientation
siny_cosp = 2.0 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
self.current_heading_deg = math.degrees(math.atan2(siny_cosp, cosy_cosp))
def command_callback(self, request, response):
"""Handle mission command service calls."""
# Log breadcrumb if mission is active
if self.state == self.STATE_RUNNING and self.current_position is not None:
now_s = self.get_clock().now().nanoseconds / 1e9
self.breadcrumbs.update(
now_s,
self.current_position,
self.current_heading_deg,
)
def _failsafe_callback(self, msg: FailsafeStatus):
"""
Respond to failsafe actions per design spec priority.
Different actions require different mission responses.
"""
action = msg.action
assessment_state = msg.assessment_state
# Store for use in mission loop
self.last_failsafe_action = action
self.last_assessment_state = assessment_state
# --- Emergency surface: abort immediately, no controlled return ---
if action == FailsafeStatus.ACTION_EMERGENCY_SURFACE:
if self.state not in (self.STATE_ABORTED, self.STATE_COMPLETE):
self.get_logger().error(
'EMERGENCY SURFACE commanded — aborting mission immediately')
self._abort_mission('Emergency surface — failsafe')
return
# --- Return to safe: abort mission, initiate breadcrumb reversal ---
if action == FailsafeStatus.ACTION_RETURN_TO_SAFE:
if self.state not in (self.STATE_ABORTED, self.STATE_COMPLETE):
self.get_logger().warn(
f'RETURN TO SAFE commanded — aborting mission '
f'(breadcrumbs: {self.breadcrumbs.entry_count})')
self._abort_mission('Return to safe — failsafe')
# Breadcrumb path handed off to motion controller here
# (Phase 3+ — motion controller implements path reversal)
return
# --- Hover: pause mission, hold position ---
if action == FailsafeStatus.ACTION_HOVER:
if self.state == self.STATE_RUNNING:
self.get_logger().warn('HOVER commanded — pausing mission')
self.state = self.STATE_PAUSED
self._stop_motion()
return
# --- No action: if paused by failsafe and now clear, resume ---
if action == FailsafeStatus.ACTION_NONE:
if (self.state == self.STATE_PAUSED and
assessment_state == FailsafeStatus.ASSESSMENT_GREEN):
self.get_logger().info(
'Failsafe cleared (GREEN) — resuming mission')
self.state = self.STATE_RUNNING
# ------------------------------------------------------------------
# Service callback — mission commands
# ------------------------------------------------------------------
def _command_callback(self, request, response):
"""
Handle mission commands: START, PAUSE, RESUME, ABORT, LOAD.
LOAD accepts a YAML waypoint file path via parameters[0].
"""
cmd = request.command.upper()
if cmd == 'START':
if self.state == self.STATE_IDLE:
self.mission_id = request.mission_id
self.current_wp_index = 0
self.state = self.STATE_RUNNING
response.success = True
response.message = f'Mission {self.mission_id} started'
else:
if self.state != self.STATE_IDLE:
response.success = False
response.message = f'Cannot start — current state: {self.state}'
response.message = f'Cannot START — current state: {self.state}'
return response
if not self.waypoints:
response.success = False
response.message = 'Cannot START — no waypoints loaded'
return response
# Record entry point at mission start
self.entry_point = (
self.current_position if self.current_position is not None
else Point()
)
# Set operation mode from parameters if provided
if request.parameters:
self.operation_mode = request.parameters[0]
self.breadcrumbs.clear()
self.current_wp_index = 0
self.mission_id = request.mission_id
self.state = self.STATE_RUNNING
self.wp_start_time = self.get_clock().now().nanoseconds / 1e9
self.get_logger().info(
f'Mission {self.mission_id} started — '
f'{len(self.waypoints)} waypoints, '
f'mode={self.operation_mode}'
)
response.success = True
response.message = f'Mission {self.mission_id} started'
elif cmd == 'LOAD':
# Load waypoints from YAML file path in parameters[0]
if not request.parameters:
response.success = False
response.message = 'LOAD requires parameters[0] = YAML file path'
return response
loaded, message = self._load_waypoints(request.parameters[0])
response.success = loaded
response.message = message
elif cmd == 'PAUSE':
if self.state == self.STATE_RUNNING:
self.state = self.STATE_PAUSED
response.success = True
response.message = 'Mission paused'
else:
if self.state != self.STATE_RUNNING:
response.success = False
response.message = f'Cannot pause — current state: {self.state}'
response.message = f'Cannot PAUSE — current state: {self.state}'
return response
self.state = self.STATE_PAUSED
self._stop_motion()
response.success = True
response.message = 'Mission paused'
elif cmd == 'RESUME':
if self.state == self.STATE_PAUSED:
self.state = self.STATE_RUNNING
response.success = True
response.message = 'Mission resumed'
else:
if self.state != self.STATE_PAUSED:
response.success = False
response.message = f'Cannot resume — current state: {self.state}'
response.message = f'Cannot RESUME — current state: {self.state}'
return response
self.state = self.STATE_RUNNING
self.wp_start_time = self.get_clock().now().nanoseconds / 1e9
response.success = True
response.message = 'Mission resumed'
elif cmd == 'ABORT':
self.state = self.STATE_ABORTED
self.stop_motion()
self._abort_mission('Operator abort command')
response.success = True
response.message = 'Mission aborted'
@ -122,10 +394,12 @@ class MissionExecutor(Node):
return response
def mission_loop(self):
"""Main mission execution loop — called at 10Hz."""
self.publish_status()
# ------------------------------------------------------------------
# Mission execution loop — 10 Hz
# ------------------------------------------------------------------
def _mission_loop(self):
"""Main mission execution loop."""
if self.state != self.STATE_RUNNING:
return
@ -137,46 +411,229 @@ class MissionExecutor(Node):
if self.current_wp_index >= len(self.waypoints):
self.get_logger().info('All waypoints reached — mission complete')
self.state = self.STATE_COMPLETE
self._stop_motion()
return
# TODO: navigate to current waypoint
# TODO: check arrival condition
# TODO: trigger image capture if required
# TODO: advance to next waypoint on arrival
if self.current_position is None:
self.get_logger().warn('No position estimate — waiting')
return
def stop_motion(self):
"""Publish zero velocity to stop the ROV."""
self.cmd_pub.publish(Twist()) # All zeros = stop
current_wp = self.waypoints[self.current_wp_index]
now_s = self.get_clock().now().nanoseconds / 1e9
def publish_status(self):
# --- Publish current waypoint target ---
self.waypoint_pub.publish(current_wp)
# --- Check waypoint timeout ---
if (self.wp_start_time is not None and
now_s - self.wp_start_time > self._wp_timeout_s):
self.get_logger().warn(
f'Waypoint {current_wp.waypoint_id} timed out after '
f'{self._wp_timeout_s}s — advancing to next'
)
self._advance_waypoint(now_s)
return
# --- Check arrival at waypoint ---
distance = self._distance_to(current_wp.position)
if distance > self._arrival_radius:
# Not yet arrived — navigate toward waypoint
self._navigate_toward(current_wp)
return
# --- Arrived at waypoint ---
self._stop_motion()
if current_wp.capture_image and not self.capturing_image:
# Start image capture loiter
self.capturing_image = True
self.capture_start_time = now_s
self.get_logger().info(
f'Arrived at {current_wp.waypoint_id}'
f'capturing image (loiter {current_wp.loiter_time_s}s)'
)
return
if self.capturing_image:
# Wait for loiter time before advancing
elapsed_capture = now_s - self.capture_start_time
if elapsed_capture < current_wp.loiter_time_s + self._capture_delay_s:
return
self.capturing_image = False
self.get_logger().info(
f'Image capture complete at {current_wp.waypoint_id}')
# Advance to next waypoint
self._advance_waypoint(now_s)
# ------------------------------------------------------------------
# Navigation helpers
# ------------------------------------------------------------------
def _navigate_toward(self, waypoint: InspectionWaypoint):
"""
Publish proportional velocity command toward the waypoint.
Simple P-controller replaced by full motion planner in Phase 3+.
Max speed: 0.3 m/s in XY, 0.1 m/s in Z.
"""
MAX_XY = 0.3 # m/s
MAX_Z = 0.1 # m/s
KP = 0.5 # proportional gain
dx = waypoint.position.x - self.current_position.x
dy = waypoint.position.y - self.current_position.y
dz = waypoint.position.z - self.current_position.z
# Clamp to max speeds
vx = max(-MAX_XY, min(MAX_XY, KP * dx))
vy = max(-MAX_XY, min(MAX_XY, KP * dy))
vz = max(-MAX_Z, min(MAX_Z, KP * dz))
cmd = Twist()
cmd.linear.x = vx
cmd.linear.y = vy
cmd.linear.z = vz
self.cmd_pub.publish(cmd)
def _stop_motion(self):
"""Publish zero velocity to halt the ROV."""
self.cmd_pub.publish(Twist()) # All zeros
def _distance_to(self, target: Point) -> float:
"""Euclidean distance from current position to target."""
if self.current_position is None:
return float('inf')
dx = target.x - self.current_position.x
dy = target.y - self.current_position.y
dz = target.z - self.current_position.z
return math.sqrt(dx*dx + dy*dy + dz*dz)
def _advance_waypoint(self, now_s: float):
"""Move to the next waypoint in the sequence."""
self.current_wp_index += 1
self.wp_start_time = now_s
self.capturing_image = False
if self.current_wp_index < len(self.waypoints):
next_wp = self.waypoints[self.current_wp_index]
self.get_logger().info(
f'Advancing to waypoint {self.current_wp_index}: '
f'{next_wp.waypoint_id}'
)
def _abort_mission(self, reason: str):
"""Abort the mission and stop motion."""
self.state = self.STATE_ABORTED
self._stop_motion()
self.get_logger().warn(f'Mission aborted: {reason}')
# ------------------------------------------------------------------
# Waypoint loading
# ------------------------------------------------------------------
def _load_waypoints(self, yaml_path: str):
"""
Load InspectionWaypoint list from a YAML mission file.
Expected format:
waypoints:
- id: "WP1"
x: 1.0
y: 2.0
z: -5.0
heading_deg: 90.0
standoff_m: 0.5
loiter_time_s: 3.0
capture_image: true
panel_id: "P1"
"""
try:
with open(yaml_path, 'r') as f:
data = yaml.safe_load(f)
if 'waypoints' not in data:
return False, f'No waypoints key in {yaml_path}'
self.waypoints = []
for entry in data['waypoints']:
wp = InspectionWaypoint()
wp.header.stamp = self.get_clock().now().to_msg()
wp.waypoint_id = str(entry.get('id', ''))
wp.position.x = float(entry.get('x', 0.0))
wp.position.y = float(entry.get('y', 0.0))
wp.position.z = float(entry.get('z', 0.0))
wp.heading_deg = float(entry.get('heading_deg', 0.0))
wp.standoff_m = float(entry.get('standoff_m', 0.5))
wp.loiter_time_s = float(entry.get('loiter_time_s', 2.0))
wp.capture_image = bool(entry.get('capture_image', False))
wp.panel_id = str(entry.get('panel_id', ''))
self.waypoints.append(wp)
msg = f'Loaded {len(self.waypoints)} waypoints from {yaml_path}'
self.get_logger().info(msg)
return True, msg
except FileNotFoundError:
return False, f'Mission file not found: {yaml_path}'
except Exception as e:
return False, f'Failed to load waypoints: {str(e)}'
# ------------------------------------------------------------------
# Status publishing — 2 Hz
# ------------------------------------------------------------------
def _publish_status(self):
"""Publish current mission status."""
msg = MissionStatus()
msg = MissionStatus()
msg.header.stamp = self.get_clock().now().to_msg()
msg.mission_id = self.mission_id
msg.current_task = f'Waypoint {self.current_wp_index}/{len(self.waypoints)}'
msg.message = self.state
if len(self.waypoints) > 0:
msg.progress_percent = (self.current_wp_index / len(self.waypoints)) * 100.0
msg.mission_id = self.mission_id
# Map state string to uint8
state_map = {
self.STATE_IDLE: MissionStatus.STATE_IDLE,
self.STATE_RUNNING: MissionStatus.STATE_RUNNING,
self.STATE_PAUSED: MissionStatus.STATE_PAUSED,
msg.state = {
self.STATE_IDLE: MissionStatus.STATE_IDLE,
self.STATE_RUNNING: MissionStatus.STATE_RUNNING,
self.STATE_PAUSED: MissionStatus.STATE_PAUSED,
self.STATE_COMPLETE: MissionStatus.STATE_COMPLETE,
self.STATE_ABORTED: MissionStatus.STATE_ABORTED,
}
msg.state = state_map.get(self.state, MissionStatus.STATE_IDLE)
self.STATE_ABORTED: MissionStatus.STATE_ABORTED,
}.get(self.state, MissionStatus.STATE_IDLE)
# Progress
total = len(self.waypoints)
msg.progress_percent = (
(self.current_wp_index / total * 100.0) if total > 0 else 0.0
)
# Current task description
if self.state == self.STATE_RUNNING and total > 0:
wp_id = (self.waypoints[self.current_wp_index].waypoint_id
if self.current_wp_index < total else 'complete')
msg.current_task = f'WP {self.current_wp_index + 1}/{total}: {wp_id}'
else:
msg.current_task = self.state
msg.message = (
f'{self.state} | '
f'WP {self.current_wp_index}/{total} | '
f'Breadcrumbs: {self.breadcrumbs.entry_count}'
)
self.status_pub.publish(msg)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None):
"""ROS2 node entry point."""
rclpy.init(args=args)
node = MissionExecutor()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':

View File

@ -0,0 +1,70 @@
"""
Dev stack launch file Argonaut 3 Simulation.
DEV ONLY NOT FOR DEPLOYMENT TO VEHICLE HARDWARE.
Starts mock_publisher + failsafe_monitor + mission_executor together
for integrated testing without physical hardware.
Usage:
ros2 launch rov_simulation dev_stack.launch.py
ros2 launch rov_simulation dev_stack.launch.py scenario:=low_battery
ros2 launch rov_simulation dev_stack.launch.py scenario:=comms_loss
Available scenarios: nominal, low_battery, comms_loss, depth_warning, all_clear
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
"""Launch full dev stack with selected simulation scenario."""
control_pkg = get_package_share_directory('rov_control')
mission_pkg = get_package_share_directory('rov_mission')
failsafe_config = os.path.join(control_pkg, 'config', 'failsafe.yaml')
mission_config = os.path.join(mission_pkg, 'config', 'dev.yaml')
return LaunchDescription([
# Scenario argument — passed to mock_publisher
DeclareLaunchArgument(
'scenario',
default_value='nominal',
description=(
'Simulation scenario: '
'nominal | low_battery | comms_loss | depth_warning | all_clear'
)
),
# Mock publisher — synthetic sensor data (dev only)
Node(
package='rov_simulation',
executable='mock_publisher',
name='mock_publisher',
output='screen',
parameters=[{'scenario': LaunchConfiguration('scenario')}],
),
# Failsafe monitor — reads mock sensor data, publishes /rov/failsafe
Node(
package='rov_control',
executable='failsafe_monitor',
name='failsafe_monitor',
output='screen',
parameters=[failsafe_config],
),
# Mission executor — reads failsafe state, publishes mission status
Node(
package='rov_mission',
executable='mission_executor',
name='mission_executor',
output='screen',
parameters=[mission_config],
),
])

View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<package format="3">
<name>rov_simulation</name>
<version>0.1.0</version>
<description>Development-only simulation utilities — NOT for deployment to vehicle hardware.</description>
<maintainer email="grant@symbytech.com">grant</maintainer>
<license>Proprietary</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>mavros_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>rov_interfaces</depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""
Mock Publisher Node Argonaut 3 Simulation
=============================================
DEV ONLY NOT FOR DEPLOYMENT TO VEHICLE HARDWARE.
Publishes synthetic sensor data to simulate a connected ArduSub vehicle.
Allows failsafe_monitor and mission_executor to be tested without
physical hardware or a live MAVROS connection.
Simulated topics (mimics MAVROS output):
/mavros/state (mavros_msgs/State) FCU connection + heartbeat
/mavros/battery (sensor_msgs/BatteryState) battery level
/mavros/global_position/rel_alt (std_msgs/Float32) depth (negative = below surface)
/rov/state (nav_msgs/Odometry) vehicle position + orientation
Scenarios (set via ROS parameter 'scenario'):
nominal healthy vehicle, normal battery, shallow depth
low_battery battery draining toward return threshold
comms_loss heartbeat stops after 10s to trigger comms failsafe
depth_warning vehicle descends toward warning depth
all_clear all GREEN, used to verify normal operation
Usage:
ros2 run rov_simulation mock_publisher --ros-args -p scenario:=low_battery
"""
import math
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, Bool, Header
from sensor_msgs.msg import BatteryState
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Quaternion
from mavros_msgs.msg import State
# ---------------------------------------------------------------------------
# Available simulation scenarios
# ---------------------------------------------------------------------------
SCENARIOS = {
'nominal': {
'description': 'Healthy vehicle — all systems nominal',
'battery_start': 0.85, # 85% — well above all thresholds
'battery_drain': 0.0, # No drain
'depth_start_m': 5.0, # Shallow
'depth_rate_mps': 0.0, # Stationary
'comms_loss_at_s': None, # No comms loss
},
'low_battery': {
'description': 'Battery draining — triggers warning then return threshold',
'battery_start': 0.30, # Start above warning (25%)
'battery_drain': 0.001, # Drain 0.1% per second
'depth_start_m': 10.0,
'depth_rate_mps': 0.0,
'comms_loss_at_s': None,
},
'comms_loss': {
'description': 'Heartbeat stops after 10s — triggers comms failsafe',
'battery_start': 0.80,
'battery_drain': 0.0,
'depth_start_m': 8.0,
'depth_rate_mps': 0.0,
'comms_loss_at_s': 10.0, # Stop heartbeat at 10s
},
'depth_warning': {
'description': 'Vehicle descending toward warning depth limit',
'battery_start': 0.75,
'battery_drain': 0.0,
'depth_start_m': 180.0, # Start near design depth (200m)
'depth_rate_mps': 0.5, # Descend 0.5 m/s toward warning (250m)
'comms_loss_at_s': None,
},
'all_clear': {
'description': 'All GREEN — verify normal operation produces no failsafe',
'battery_start': 0.95,
'battery_drain': 0.0,
'depth_start_m': 3.0,
'depth_rate_mps': 0.0,
'comms_loss_at_s': None,
},
}
class MockPublisher(Node):
"""
Publishes synthetic MAVROS-compatible sensor data for dev testing.
Simulates a connected ArduSub vehicle without physical hardware.
"""
PUBLISH_RATE_HZ = 10 # All topics published at 10 Hz
def __init__(self):
super().__init__('mock_publisher')
# ------------------------------------------------------------------
# Parameters
# ------------------------------------------------------------------
self.declare_parameter('scenario', 'nominal')
scenario_name = self.get_parameter('scenario').value
if scenario_name not in SCENARIOS:
self.get_logger().error(
f'Unknown scenario: {scenario_name}. '
f'Valid: {list(SCENARIOS.keys())}'
)
scenario_name = 'nominal'
self._scenario = SCENARIOS[scenario_name]
self.get_logger().info(
f'MockPublisher starting — scenario: {scenario_name} '
f'({self._scenario["description"]})'
)
# ------------------------------------------------------------------
# Simulation state
# ------------------------------------------------------------------
self._elapsed_s = 0.0
self._battery_pct = self._scenario['battery_start']
self._depth_m = self._scenario['depth_start_m']
self._comms_active = True
# Simple position simulation — vehicle moves in XY circle for realism
self._pos_x = 0.0
self._pos_y = 0.0
self._orbit_radius_m = 2.0 # metres
self._orbit_rate_rps = 0.05 # radians per second
# ------------------------------------------------------------------
# Publishers (all matching MAVROS topic names and message types)
# ------------------------------------------------------------------
self._state_pub = self.create_publisher(
State, '/mavros/state', 10)
self._battery_pub = self.create_publisher(
BatteryState, '/mavros/battery', 10)
self._alt_pub = self.create_publisher(
Float32, '/mavros/global_position/rel_alt', 10)
self._odom_pub = self.create_publisher(
Odometry, '/rov/state', 10)
# ------------------------------------------------------------------
# Timer
# ------------------------------------------------------------------
self._dt = 1.0 / self.PUBLISH_RATE_HZ
self.create_timer(self._dt, self._publish_all)
self.get_logger().warn(
'*** DEV ONLY — mock_publisher is active. '
'Do not deploy to vehicle hardware. ***'
)
# ------------------------------------------------------------------
# Main publish cycle
# ------------------------------------------------------------------
def _publish_all(self):
"""Advance simulation state and publish all topics."""
self._advance_simulation()
self._publish_state()
self._publish_battery()
self._publish_altitude()
self._publish_odometry()
# ------------------------------------------------------------------
# Simulation state advancement
# ------------------------------------------------------------------
def _advance_simulation(self):
"""Advance all simulated quantities by one timestep."""
self._elapsed_s += self._dt
# Battery drain
drain = self._scenario['battery_drain'] * self._dt
self._battery_pct = max(0.0, self._battery_pct - drain)
# Depth change
self._depth_m += self._scenario['depth_rate_mps'] * self._dt
# Position orbit (simple circle for realistic /rov/state output)
angle = self._orbit_rate_rps * self._elapsed_s
self._pos_x = self._orbit_radius_m * math.cos(angle)
self._pos_y = self._orbit_radius_m * math.sin(angle)
# Comms loss scenario — stop publishing heartbeat after threshold
loss_at = self._scenario['comms_loss_at_s']
if loss_at is not None and self._elapsed_s >= loss_at:
if self._comms_active:
self.get_logger().warn(
f'[{self._elapsed_s:.1f}s] Simulating comms loss — '
'stopping FCU heartbeat'
)
self._comms_active = False
# Log key threshold crossings once
self._log_threshold_crossings()
def _log_threshold_crossings(self):
"""Log once when battery crosses each threshold."""
pct = self._battery_pct
thresholds = [
(0.25, 'WARNING (25%)'),
(0.20, 'RETURN threshold (20%)'),
(0.12, 'CRITICAL (12%)'),
(0.08, 'EMERGENCY (8%)'),
]
for threshold, label in thresholds:
attr = f'_logged_battery_{int(threshold*100)}'
if pct < threshold and not getattr(self, attr, False):
self.get_logger().warn(
f'[{self._elapsed_s:.1f}s] Battery crossed {label}: '
f'{pct*100:.1f}%'
)
setattr(self, attr, True)
# ------------------------------------------------------------------
# Topic publishers
# ------------------------------------------------------------------
def _publish_state(self):
"""
Publish FCU connection state (mavros_msgs/State).
Mimics the MAVROS /mavros/state topic.
Stops publishing when comms_loss scenario activates.
"""
if not self._comms_active:
# Do not publish — simulates lost heartbeat
return
msg = State()
msg.header.stamp = self.get_clock().now().to_msg()
msg.connected = True
msg.armed = False # Unarmed for dev testing
msg.guided = True
msg.mode = 'ALT_HOLD'
msg.system_status = 3 # MAV_STATE_STANDBY
self._state_pub.publish(msg)
def _publish_battery(self):
"""
Publish battery state (sensor_msgs/BatteryState).
Mimics the MAVROS /mavros/battery topic.
percentage field: 0.01.0.
"""
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.percentage = float(self._battery_pct)
msg.voltage = 14.8 * self._battery_pct + 12.0 # Approximate LiPo curve
msg.current = 5.0 # Amps — nominal draw
msg.charge = float('nan')
msg.capacity = float('nan')
msg.design_capacity = float('nan')
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._battery_pub.publish(msg)
def _publish_altitude(self):
"""
Publish relative altitude (std_msgs/Float32).
Negative value = depth below surface, matching MAVROS convention.
"""
msg = Float32()
msg.data = -abs(self._depth_m) # Always negative (depth)
self._alt_pub.publish(msg)
def _publish_odometry(self):
"""
Publish vehicle state (nav_msgs/Odometry).
Provides position for mission_executor arrival detection and
breadcrumb logging.
"""
msg = Odometry()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'map'
msg.child_frame_id = 'base_link'
# Position — orbit + depth
msg.pose.pose.position.x = self._pos_x
msg.pose.pose.position.y = self._pos_y
msg.pose.pose.position.z = -abs(self._depth_m)
# Orientation — identity quaternion (level vehicle)
msg.pose.pose.orientation.w = 1.0
msg.pose.pose.orientation.x = 0.0
msg.pose.pose.orientation.y = 0.0
msg.pose.pose.orientation.z = 0.0
# Velocity — tangential to orbit
angle = self._orbit_rate_rps * self._elapsed_s
speed = self._orbit_radius_m * self._orbit_rate_rps
msg.twist.twist.linear.x = -speed * math.sin(angle)
msg.twist.twist.linear.y = speed * math.cos(angle)
msg.twist.twist.linear.z = 0.0
self._odom_pub.publish(msg)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None):
"""ROS2 node entry point."""
rclpy.init(args=args)
node = MockPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,29 @@
from setuptools import setup
import os
from glob import glob
package_name = 'rov_simulation'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='grant',
maintainer_email='grant@symbytech.com',
description='Development-only simulation utilities — NOT for deployment',
license='Proprietary',
entry_points={
'console_scripts': [
'mock_publisher = rov_simulation.mock_publisher:main',
],
},
)