feat: implement full failsafe monitor per design v2.0

- Updated FailsafeStatus.msg with assessment state, FSM state, thruster_anomaly flag
- Implemented GREEN/AMBER/RED continuous self-assessment
- Dynamic assessment rates: 5Hz/20Hz/50Hz
- Full FSM: NORMAL->HOLD_AND_RECOVER->RETURN_TO_SAFE->EMERGENCY_SURFACE
- Priority-ordered failsafe conditions (8 levels per design spec)
- Tiered battery thresholds: warning/return/critical/emergency
- Conditional comms loss response based on assessment state
- Conditional manual abort response based on assessment state
- All thresholds configurable via failsafe.yaml
This commit is contained in:
Grant 2026-05-07 00:01:54 +00:00
parent 31ddc2c7eb
commit 9f7a775427
3 changed files with 564 additions and 89 deletions

View File

@ -1,8 +1,40 @@
# Failsafe configuration # Failsafe Monitor Configuration — Argonaut 3
# Tune thresholds before any pool or field testing # See ROV_Failsafe_Design_v2.0 for rationale behind all values.
# Tune marked values after pool testing before field deployment.
failsafe_monitor: failsafe_monitor:
ros__parameters: ros__parameters:
low_battery_threshold: 0.20 # Surface when battery below 20%
max_depth_m: 50.0 # Emergency surface if depth exceeded # --- Comms ---
comms_timeout_s: 5.0 # Surface after 5s comms loss # Time without FCU heartbeat before comms loss triggers.
# Default 2s per design spec. May need increasing in high-latency tether.
# TUNE: verify in pool — 2s may generate false positives.
comms_timeout_s: 2.0
# Hold timeouts before escalating to RETURN_TO_SAFE on comms loss.
comms_hold_green_s: 60.0 # Hold 60s if GREEN before returning
comms_hold_amber_s: 30.0 # Hold 30s if AMBER before returning
# Hold timeout on manual abort before auto-return (GREEN state only).
abort_hold_green_s: 120.0
# --- Battery thresholds (fractions 0.0-1.0) ---
# All values configurable via web UI in production (battery.yaml).
# TUNE: measure empirically per battery type. Do not assume.
battery_warning_pct: 0.25 # Alert, no new panels started
battery_return_pct: 0.20 # Return to safe after current panel
battery_critical_pct: 0.12 # Abandon current task, return immediately
battery_emergency_pct: 0.08 # Emergency surface, no other action
# --- Depth limits (metres, positive = depth below surface) ---
# NOTE: Standard BlueROV2 Heavy rated to 100m.
# Values below target hardware-upgraded configuration.
# Validate against actual build specification before field use.
depth_design_m: 200.0 # Log + alert, continue mission
depth_warning_m: 250.0 # Halt descent, hold, alert operator
depth_safety_m: 300.0 # RETURN_TO_SAFE immediately
# --- Thruster anomaly ---
# Single thruster drawing > this ratio vs peer average triggers AMBER.
# TUNE: establish normal baseline in calm water before setting threshold.
thruster_peer_ratio: 1.5 # 150% of peer average

View File

@ -1,132 +1,548 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
""" """
Failsafe monitor node. Failsafe Monitor Node Argonaut 3
Monitors all safety-critical conditions and publishes failsafe state. ===================================
This node must be running at all times during autonomous operation. Implements continuous self-assessment per ROV_Failsafe_Design_v2.0.
Conditions monitored: Assessment states (GREEN/AMBER/RED) are evaluated continuously.
- Comms loss (heartbeat timeout) Assessment rate adapts dynamically:
- Low battery (from MAVROS battery state) - GREEN (all nominal): 5 Hz
- Maximum depth exceeded - AMBER (degraded): 20 Hz
- Obstacle detection (future proximity sensors) - RED (critical): 50 Hz
- Manual abort command
State machine states:
NORMAL -> ASSESSING -> HOLD_AND_RECOVER -> RETURN_TO_SAFE -> EMERGENCY_SURFACE
Priority order (highest wins when multiple conditions active):
1. EMERGENCY_SURFACE (manual trigger)
2. Critical/emergency battery
3. Safety depth limit (300m)
4. Obstacle detected in RED state
5. Comms loss (assessed response)
6. Manual abort (assessed response)
7. Thruster anomaly (AMBER downgrade)
8. Low battery warning (informational)
Topics subscribed: Topics subscribed:
/mavros/battery (sensor_msgs/BatteryState) battery level /mavros/battery (sensor_msgs/BatteryState)
/mavros/state (mavros_msgs/State) FCU connection state /mavros/state (mavros_msgs/State)
/rov/mission/abort (std_msgs/Bool) manual abort trigger /mavros/global_position/rel_alt (std_msgs/Float32) altitude/depth proxy
/rov/mission/abort (std_msgs/Bool)
Topics published: Topics published:
/rov/failsafe (rov_interfaces/FailsafeStatus) current failsafe state /rov/failsafe (rov_interfaces/FailsafeStatus)
""" """
import rclpy import rclpy
from rclpy.node import Node from rclpy.node import Node
from rclpy.duration import Duration from rclpy.duration import Duration
from std_msgs.msg import Bool from std_msgs.msg import Bool, Float32
from sensor_msgs.msg import BatteryState from sensor_msgs.msg import BatteryState
from mavros_msgs.msg import State from mavros_msgs.msg import State
from rov_interfaces.msg import FailsafeStatus from rov_interfaces.msg import FailsafeStatus
# ---------------------------------------------------------------------------
# Internal state enumerations (not exposed in ROS messages directly)
# ---------------------------------------------------------------------------
class AssessmentState:
"""Continuous self-assessment result states."""
GREEN = FailsafeStatus.ASSESSMENT_GREEN # All systems nominal
AMBER = FailsafeStatus.ASSESSMENT_AMBER # Degraded but functional
RED = FailsafeStatus.ASSESSMENT_RED # Incapable of safe autonomous action
class FSMState:
"""Failsafe state machine states."""
NORMAL = FailsafeStatus.STATE_NORMAL
ASSESSING = FailsafeStatus.STATE_ASSESSING
HOLD_AND_RECOVER = FailsafeStatus.STATE_HOLD_AND_RECOVER
RETURN_TO_SAFE = FailsafeStatus.STATE_RETURN_TO_SAFE
EMERGENCY_SURFACE = FailsafeStatus.STATE_EMERGENCY_SURFACE
MISSION_COMPLETE = FailsafeStatus.STATE_MISSION_COMPLETE
ABORTED = FailsafeStatus.STATE_ABORTED
# ---------------------------------------------------------------------------
# Assessment rates (seconds between evaluations per design spec)
# ---------------------------------------------------------------------------
RATE_GREEN = 1.0 / 5.0 # 5 Hz — normal operation
RATE_AMBER = 1.0 / 20.0 # 20 Hz — degraded
RATE_RED = 1.0 / 50.0 # 50 Hz — failsafe active
class FailsafeMonitor(Node): class FailsafeMonitor(Node):
"""Monitors safety conditions and publishes failsafe status.""" """
Monitors all safety-critical conditions and publishes failsafe state.
Implements the Argonaut 3 failsafe design (v2.0) in full.
"""
def __init__(self): def __init__(self):
super().__init__('failsafe_monitor') super().__init__('failsafe_monitor')
# --- Parameters --- # ------------------------------------------------------------------
self.declare_parameter('low_battery_threshold', 0.20) # 20% remaining # Parameters (loaded from failsafe.yaml via ros__parameters)
self.declare_parameter('max_depth_m', 50.0) # metres # ------------------------------------------------------------------
self.declare_parameter('comms_timeout_s', 5.0) # seconds
self.low_batt_thresh = self.get_parameter('low_battery_threshold').value # Comms
self.max_depth = self.get_parameter('max_depth_m').value self.declare_parameter('comms_timeout_s', 2.0)
self.comms_timeout = self.get_parameter('comms_timeout_s').value
# Battery thresholds (fractions 0.01.0)
self.declare_parameter('battery_warning_pct', 0.25) # Alert, no new panels
self.declare_parameter('battery_return_pct', 0.20) # Return to safe
self.declare_parameter('battery_critical_pct', 0.12) # Immediate return
self.declare_parameter('battery_emergency_pct', 0.08) # Emergency surface
# Depth limits (metres — positive = depth below surface)
self.declare_parameter('depth_design_m', 200.0) # Log + alert, continue
self.declare_parameter('depth_warning_m', 250.0) # Halt descent, hold
self.declare_parameter('depth_safety_m', 300.0) # Return to safe
# Hold timeouts before escalating to RETURN_TO_SAFE
self.declare_parameter('comms_hold_green_s', 60.0) # Hold 60s if GREEN
self.declare_parameter('comms_hold_amber_s', 30.0) # Hold 30s if AMBER
self.declare_parameter('abort_hold_green_s', 120.0) # Hold 120s on abort
# Thruster anomaly detection
self.declare_parameter('thruster_peer_ratio', 1.5) # 150% of peer average
self._load_parameters()
# ------------------------------------------------------------------
# Internal state # Internal state
self.last_heartbeat = self.get_clock().now() # ------------------------------------------------------------------
self.battery_percent = 1.0
self.assessment_state = AssessmentState.GREEN
self.fsm_state = FSMState.NORMAL
# Sensor readings
self.battery_percent = 1.0 # 0.01.0
self.fcu_connected = False self.fcu_connected = False
self.current_depth_m = 0.0
# --- Subscribers --- # Timing
self.battery_sub = self.create_subscription( self.last_heartbeat = self.get_clock().now()
BatteryState, '/mavros/battery', self.battery_callback, 10) self.comms_loss_start = None # When comms loss began
self.state_sub = self.create_subscription( self.hold_recover_start = None # When HOLD_AND_RECOVER began
State, '/mavros/state', self.state_callback, 10) self.abort_hold_start = None # When abort hold began
self.abort_sub = self.create_subscription(
Bool, '/rov/mission/abort', self.abort_callback, 10)
# --- Publishers --- # Active condition flags
self.failsafe_pub = self.create_publisher(FailsafeStatus, '/rov/failsafe', 10) self.flag_comms_loss = False
self.flag_low_battery = False
self.flag_critical_battery = False
self.flag_emergency_battery = False
self.flag_depth_exceeded = False
self.flag_obstacle = False
self.flag_manual_abort = False
self.flag_thruster_anomaly = False
# --- Timer: evaluate failsafe conditions at 10Hz --- # ------------------------------------------------------------------
self.timer = self.create_timer(0.1, self.evaluate_failsafe) # Subscribers
# ------------------------------------------------------------------
self.get_logger().info('FailsafeMonitor node started') self.create_subscription(
BatteryState, '/mavros/battery',
self._battery_callback, 10)
def battery_callback(self, msg: BatteryState): self.create_subscription(
"""Update battery state.""" State, '/mavros/state',
self._state_callback, 10)
self.create_subscription(
Float32, '/mavros/global_position/rel_alt',
self._altitude_callback, 10)
self.create_subscription(
Bool, '/rov/mission/abort',
self._abort_callback, 10)
# ------------------------------------------------------------------
# Publishers
# ------------------------------------------------------------------
self.failsafe_pub = self.create_publisher(
FailsafeStatus, '/rov/failsafe', 10)
# ------------------------------------------------------------------
# Initial timer — starts at GREEN rate (5 Hz)
# ------------------------------------------------------------------
self._current_rate = RATE_GREEN
self._eval_timer = self.create_timer(RATE_GREEN, self._evaluate)
self.get_logger().info(
'FailsafeMonitor started — '
f'comms_timeout={self._comms_timeout_s}s '
f'battery_return={self._battery_return_pct*100:.0f}% '
f'depth_safety={self._depth_safety_m}m'
)
# ------------------------------------------------------------------
# Parameter loading
# ------------------------------------------------------------------
def _load_parameters(self):
"""Load all parameters from the ROS parameter server."""
self._comms_timeout_s = self.get_parameter('comms_timeout_s').value
self._battery_warning_pct = self.get_parameter('battery_warning_pct').value
self._battery_return_pct = self.get_parameter('battery_return_pct').value
self._battery_critical_pct = self.get_parameter('battery_critical_pct').value
self._battery_emerg_pct = self.get_parameter('battery_emergency_pct').value
self._depth_design_m = self.get_parameter('depth_design_m').value
self._depth_warning_m = self.get_parameter('depth_warning_m').value
self._depth_safety_m = self.get_parameter('depth_safety_m').value
self._comms_hold_green_s = self.get_parameter('comms_hold_green_s').value
self._comms_hold_amber_s = self.get_parameter('comms_hold_amber_s').value
self._abort_hold_green_s = self.get_parameter('abort_hold_green_s').value
self._thruster_peer_ratio = self.get_parameter('thruster_peer_ratio').value
# ------------------------------------------------------------------
# Subscriber callbacks
# ------------------------------------------------------------------
def _battery_callback(self, msg: BatteryState):
"""Update battery percentage from MAVROS."""
# MAVROS BatteryState.percentage is 0.01.0
self.battery_percent = msg.percentage self.battery_percent = msg.percentage
def state_callback(self, msg: State): def _state_callback(self, msg: State):
"""Update FCU connection state and heartbeat time.""" """Update FCU connection state and record last heartbeat time."""
self.fcu_connected = msg.connected self.fcu_connected = msg.connected
if msg.connected: if msg.connected:
# Reset comms loss tracking on each received heartbeat
self.last_heartbeat = self.get_clock().now() self.last_heartbeat = self.get_clock().now()
self.comms_loss_start = None
def abort_callback(self, msg: Bool): def _altitude_callback(self, msg: Float32):
"""Handle manual abort command.""" """
Update current depth from relative altitude.
rel_alt is positive above home negate for depth below surface.
"""
self.current_depth_m = -msg.data if msg.data < 0.0 else 0.0
def _abort_callback(self, msg: Bool):
"""Handle manual abort command from operator."""
if msg.data: if msg.data:
self.get_logger().warn('Manual abort received') self.flag_manual_abort = True
self.publish_failsafe( self.get_logger().warn('Manual abort received from operator')
manual_abort=True,
action=FailsafeStatus.ACTION_SURFACE,
message='Manual abort commanded'
)
def evaluate_failsafe(self): # ------------------------------------------------------------------
"""Evaluate all failsafe conditions and publish status.""" # Main evaluation loop — rate adapts to assessment state
# ------------------------------------------------------------------
def _evaluate(self):
"""
Main evaluation cycle. Called by timer.
1. Assess all sensor inputs -> GREEN/AMBER/RED
2. Adapt assessment rate to match state
3. Apply priority-ordered failsafe logic
4. Publish status
"""
self._assess_conditions()
self._adapt_rate()
self._apply_failsafe_priority()
self._publish_status()
# ------------------------------------------------------------------
# Step 1: Continuous self-assessment -> GREEN / AMBER / RED
# ------------------------------------------------------------------
def _assess_conditions(self):
"""
Evaluate all monitored parameters and derive assessment state.
GREEN: all nominal, 5+ parameters assessed.
AMBER: core navigation intact, <= 1 parameter marginal.
RED: navigation sensor failure, >= 2 marginal, or any safety-critical lost.
"""
now = self.get_clock().now() now = self.get_clock().now()
elapsed = (now - self.last_heartbeat).nanoseconds / 1e9
comms_loss = elapsed > self.comms_timeout # --- Comms ---
low_battery = self.battery_percent < self.low_batt_thresh elapsed_s = (now - self.last_heartbeat).nanoseconds / 1e9
self.flag_comms_loss = elapsed_s > self._comms_timeout_s
if self.flag_comms_loss and self.comms_loss_start is None:
self.comms_loss_start = now
if comms_loss: # --- Battery ---
self.publish_failsafe( pct = self.battery_percent
comms_loss=True, self.flag_emergency_battery = pct < self._battery_emerg_pct
action=FailsafeStatus.ACTION_SURFACE, self.flag_critical_battery = pct < self._battery_critical_pct
message=f'Comms lost for {elapsed:.1f}s' self.flag_low_battery = pct < self._battery_return_pct
)
elif low_battery: # --- Depth ---
self.publish_failsafe( d = self.current_depth_m
low_battery=True, self.flag_depth_exceeded = d > self._depth_safety_m
action=FailsafeStatus.ACTION_SURFACE,
message=f'Low battery: {self.battery_percent*100:.0f}%' # --- Obstacle (placeholder — Phase 3+ when sonar is fitted) ---
# self.flag_obstacle = <sonar_node data>
# --- Derive assessment state ---
marginal_count = sum([
self.flag_comms_loss,
self.flag_low_battery,
self.flag_depth_exceeded,
self.flag_obstacle,
self.flag_thruster_anomaly,
])
if self.flag_emergency_battery or self.flag_depth_exceeded or marginal_count >= 2:
self.assessment_state = AssessmentState.RED
elif marginal_count == 1:
self.assessment_state = AssessmentState.AMBER
else:
self.assessment_state = AssessmentState.GREEN
# ------------------------------------------------------------------
# Step 2: Adapt timer rate to assessment state
# ------------------------------------------------------------------
def _adapt_rate(self):
"""
Change evaluation rate based on current assessment state.
Timer is recreated only when rate actually changes.
"""
target_rate = {
AssessmentState.GREEN: RATE_GREEN,
AssessmentState.AMBER: RATE_AMBER,
AssessmentState.RED: RATE_RED,
}[self.assessment_state]
if abs(target_rate - self._current_rate) > 1e-6:
self._eval_timer.cancel()
self._eval_timer = self.create_timer(target_rate, self._evaluate)
self._current_rate = target_rate
self.get_logger().info(
f'Assessment rate changed to {1.0/target_rate:.0f} Hz '
f'(state={self._assessment_state_name()})'
) )
def publish_failsafe(self, comms_loss=False, low_battery=False, # ------------------------------------------------------------------
depth_exceeded=False, obstacle_detected=False, # Step 3: Priority-ordered failsafe logic
manual_abort=False, # ------------------------------------------------------------------
action=FailsafeStatus.ACTION_NONE, message=''):
"""Publish a failsafe status message.""" def _apply_failsafe_priority(self):
"""
Apply failsafe conditions in strict priority order per design spec.
Higher priority conditions override lower priority ones.
"""
now = self.get_clock().now()
# Priority 1: Manual emergency surface (hardcoded, no assessment)
# (Handled separately — no current ROS trigger for P1 emergency.
# Manual abort at P6 covers operator intent. Reserved for future.)
# Priority 2: Critical / emergency battery
if self.flag_emergency_battery:
self._transition(FSMState.EMERGENCY_SURFACE,
FailsafeStatus.ACTION_EMERGENCY_SURFACE,
f'Emergency battery: {self.battery_percent*100:.0f}%')
return
if self.flag_critical_battery:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Critical battery: {self.battery_percent*100:.0f}%')
return
# Priority 3: Safety depth limit (300m)
if self.flag_depth_exceeded:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Safety depth exceeded: {self.current_depth_m:.1f}m')
return
# Priority 4: Obstacle in RED state
if self.flag_obstacle and self.assessment_state == AssessmentState.RED:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
'Obstacle detected in RED state')
return
# Priority 5: Comms loss — conditional on assessment state
if self.flag_comms_loss:
self._handle_comms_loss(now)
return
# Priority 6: Manual abort — conditional on assessment state
if self.flag_manual_abort:
self._handle_manual_abort(now)
return
# Priority 7: Thruster anomaly — AMBER downgrade, informational
if self.flag_thruster_anomaly:
# Does not change FSM state, already handled in assessment
self.get_logger().warn('Thruster anomaly detected — AMBER state')
return
# Priority 8: Low battery warning — informational only
if self.battery_percent < self._battery_warning_pct:
self.get_logger().warn(
f'Battery warning: {self.battery_percent*100:.0f}% — '
'complete current task, no new panels'
)
# All clear — return to NORMAL if previously in HOLD_AND_RECOVER
if self.fsm_state == FSMState.HOLD_AND_RECOVER:
self.get_logger().info('Conditions cleared — returning to NORMAL')
self.fsm_state = FSMState.NORMAL
self.hold_recover_start = None
def _handle_comms_loss(self, now):
"""
Comms loss response conditional on assessment state per design spec.
GREEN: hold 60s then RETURN_TO_SAFE.
AMBER: hold 30s then RETURN_TO_SAFE.
RED: RETURN_TO_SAFE immediately.
"""
if self.assessment_state == AssessmentState.RED:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
'Comms loss in RED state — immediate return')
return
# Enter HOLD_AND_RECOVER if not already there
if self.fsm_state != FSMState.HOLD_AND_RECOVER:
self._transition(FSMState.HOLD_AND_RECOVER,
FailsafeStatus.ACTION_HOVER,
'Comms loss — holding, attempting reconnection')
self.hold_recover_start = now
return
# Check hold timeout
hold_limit = (
self._comms_hold_green_s
if self.assessment_state == AssessmentState.GREEN
else self._comms_hold_amber_s
)
hold_elapsed = (now - self.hold_recover_start).nanoseconds / 1e9
if hold_elapsed > hold_limit:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Comms lost for {hold_elapsed:.0f}s — returning to safe')
def _handle_manual_abort(self, now):
"""
Manual abort response conditional on assessment state per design spec.
GREEN: hold position 120s awaiting operator instruction, then RETURN_TO_SAFE.
AMBER/RED: RETURN_TO_SAFE immediately.
"""
if self.assessment_state != AssessmentState.GREEN:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'Manual abort in {self._assessment_state_name()} state')
return
if self.fsm_state != FSMState.HOLD_AND_RECOVER:
self._transition(FSMState.HOLD_AND_RECOVER,
FailsafeStatus.ACTION_HOVER,
'Manual abort — holding, awaiting operator instruction')
self.abort_hold_start = now
return
if self.abort_hold_start is not None:
elapsed = (now - self.abort_hold_start).nanoseconds / 1e9
if elapsed > self._abort_hold_green_s:
self._transition(FSMState.RETURN_TO_SAFE,
FailsafeStatus.ACTION_RETURN_TO_SAFE,
f'No operator instruction after {elapsed:.0f}s — returning to safe')
# ------------------------------------------------------------------
# State machine transition
# ------------------------------------------------------------------
def _transition(self, new_state: int, action: int, message: str):
"""
Transition the FSM to a new state. Logs the transition.
Only logs if state actually changes to avoid log spam.
"""
if self.fsm_state != new_state:
self.get_logger().warn(
f'FSM: {self._fsm_state_name(self.fsm_state)} -> '
f'{self._fsm_state_name(new_state)} | {message}'
)
self.fsm_state = new_state
# ------------------------------------------------------------------
# Step 4: Publish current status
# ------------------------------------------------------------------
def _publish_status(self):
"""Publish the current failsafe status to /rov/failsafe."""
msg = FailsafeStatus() msg = FailsafeStatus()
msg.header.stamp = self.get_clock().now().to_msg() msg.header.stamp = self.get_clock().now().to_msg()
msg.comms_loss = comms_loss msg.assessment_state = self.assessment_state
msg.low_battery = low_battery msg.failsafe_state = self.fsm_state
msg.depth_exceeded = depth_exceeded
msg.obstacle_detected = obstacle_detected # Trigger flags
msg.manual_abort = manual_abort msg.comms_loss = self.flag_comms_loss
msg.action = action msg.low_battery = self.flag_low_battery
msg.message = message msg.depth_exceeded = self.flag_depth_exceeded
msg.obstacle_detected = self.flag_obstacle
msg.manual_abort = self.flag_manual_abort
msg.thruster_anomaly = self.flag_thruster_anomaly
# Telemetry
msg.battery_percent = float(self.battery_percent)
# Derive action from FSM state
msg.action = {
FSMState.NORMAL: FailsafeStatus.ACTION_NONE,
FSMState.ASSESSING: FailsafeStatus.ACTION_NONE,
FSMState.HOLD_AND_RECOVER: FailsafeStatus.ACTION_HOVER,
FSMState.RETURN_TO_SAFE: FailsafeStatus.ACTION_RETURN_TO_SAFE,
FSMState.EMERGENCY_SURFACE: FailsafeStatus.ACTION_EMERGENCY_SURFACE,
FSMState.MISSION_COMPLETE: FailsafeStatus.ACTION_NONE,
FSMState.ABORTED: FailsafeStatus.ACTION_NONE,
}.get(self.fsm_state, FailsafeStatus.ACTION_NONE)
msg.message = (
f'Assessment: {self._assessment_state_name()} | '
f'State: {self._fsm_state_name(self.fsm_state)} | '
f'Battery: {self.battery_percent*100:.0f}%'
)
self.failsafe_pub.publish(msg) self.failsafe_pub.publish(msg)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _assessment_state_name(self) -> str:
"""Return human-readable assessment state name."""
return {
AssessmentState.GREEN: 'GREEN',
AssessmentState.AMBER: 'AMBER',
AssessmentState.RED: 'RED',
}.get(self.assessment_state, 'UNKNOWN')
def _fsm_state_name(self, state: int) -> str:
"""Return human-readable FSM state name."""
return {
FSMState.NORMAL: 'NORMAL',
FSMState.ASSESSING: 'ASSESSING',
FSMState.HOLD_AND_RECOVER: 'HOLD_AND_RECOVER',
FSMState.RETURN_TO_SAFE: 'RETURN_TO_SAFE',
FSMState.EMERGENCY_SURFACE: 'EMERGENCY_SURFACE',
FSMState.MISSION_COMPLETE: 'MISSION_COMPLETE',
FSMState.ABORTED: 'ABORTED',
}.get(state, 'UNKNOWN')
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None): def main(args=None):
"""ROS2 node entry point."""
rclpy.init(args=args) rclpy.init(args=args)
node = FailsafeMonitor() node = FailsafeMonitor()
try:
rclpy.spin(node) rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node() node.destroy_node()
rclpy.shutdown() rclpy.shutdown()

View File

@ -1,12 +1,39 @@
std_msgs/Header header std_msgs/Header header
# Assessment state (continuous self-assessment result)
uint8 assessment_state
uint8 ASSESSMENT_GREEN=0
uint8 ASSESSMENT_AMBER=1
uint8 ASSESSMENT_RED=2
# Failsafe state machine state
uint8 failsafe_state
uint8 STATE_NORMAL=0
uint8 STATE_ASSESSING=1
uint8 STATE_HOLD_AND_RECOVER=2
uint8 STATE_RETURN_TO_SAFE=3
uint8 STATE_EMERGENCY_SURFACE=4
uint8 STATE_MISSION_COMPLETE=5
uint8 STATE_ABORTED=6
# Active trigger flags
bool comms_loss bool comms_loss
bool low_battery bool low_battery
bool depth_exceeded bool depth_exceeded
bool obstacle_detected bool obstacle_detected
bool manual_abort bool manual_abort
bool thruster_anomaly
# Commanded action
uint8 action uint8 action
uint8 ACTION_NONE=0 uint8 ACTION_NONE=0
uint8 ACTION_HOVER=1 uint8 ACTION_HOVER=1
uint8 ACTION_SURFACE=2 uint8 ACTION_RETURN_TO_SAFE=2
uint8 ACTION_EMERGENCY_SURFACE=3 uint8 ACTION_SURFACE=3
uint8 ACTION_EMERGENCY_SURFACE=4
# Battery detail
float32 battery_percent
# Human-readable status
string message string message