feat: implement mission executor with breadcrumb system and failsafe integration

- Full mission state machine: IDLE/RUNNING/PAUSED/COMPLETE/ABORTED
- Breadcrumb buffer: 1800 entries at 2s interval (1hr coverage)
- Failsafe integration: HOVER=pause, RETURN_TO_SAFE=abort+breadcrumb, EMERGENCY=immediate abort
- Waypoint loading from YAML mission file via LOAD command
- Arrival detection with configurable radius
- Image capture loiter with configurable delay
- Simple proportional navigation toward waypoints (Phase 3+ replaces with full planner)
- Entry point recorded at mission start for safe zone reference
This commit is contained in:
Grant 2026-05-07 00:26:12 +00:00
parent 9f7a775427
commit 4b1766255a

View File

@ -1,35 +1,132 @@
#!/usr/bin/env python3
"""
Mission executor node.
Manages inspection mission state machine.
Sequences waypoints, triggers image capture, handles replanning.
Mission Executor Node Argonaut 3
====================================
Manages inspection mission state machine per ROV_Failsafe_Design_v2.0.
States: IDLE RUNNING (PAUSED | COMPLETE | ABORTED)
Sequences inspection waypoints, triggers image capture, maintains the
breadcrumb position buffer used by the failsafe return-to-safe path,
and responds correctly to each failsafe action level.
States: IDLE -> RUNNING -> PAUSED | COMPLETE | ABORTED
Failsafe integration:
ACTION_HOVER -> pause mission, hold position
ACTION_RETURN_TO_SAFE -> abort mission, return via breadcrumb path
ACTION_EMERGENCY_SURFACE -> abort mission, emergency surface immediately
Breadcrumb system:
Position logged every 2s from mission start.
Circular buffer of 1800 entries (1 hour of operation).
Used by return-to-safe to reverse the entry path safely.
Topics subscribed:
/rov/state (nav_msgs/Odometry) current position
/rov/failsafe (rov_interfaces/FailsafeStatus) safety override
Topics published:
/rov/cmd_vel (geometry_msgs/Twist) velocity setpoints to controller
/rov/mission/status (rov_interfaces/MissionStatus) current mission state
/rov/cmd_vel (geometry_msgs/Twist) velocity setpoints
/rov/mission/status (rov_interfaces/MissionStatus) mission state
/rov/mission/waypoint (rov_interfaces/InspectionWaypoint) current target
Services provided:
/rov/mission/command (rov_interfaces/MissionCommand) start/pause/abort/resume
"""
import math
import yaml
import collections
from dataclasses import dataclass, field
from typing import List, Optional
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist
from geometry_msgs.msg import Twist, Point
from nav_msgs.msg import Odometry
from std_msgs.msg import Header
from rov_interfaces.msg import MissionStatus, FailsafeStatus, InspectionWaypoint
from rov_interfaces.srv import MissionCommand
class MissionExecutor(Node):
"""Executes inspection missions from a waypoint list."""
# ---------------------------------------------------------------------------
# Breadcrumb entry
# ---------------------------------------------------------------------------
# Mission states
@dataclass
class Breadcrumb:
"""Single position record for the return-to-safe path."""
timestamp_s: float # ROS time in seconds
x: float # metres
y: float # metres
z: float # metres (negative = depth)
heading_deg: float # degrees
confidence: float # 0.01.0 (placeholder until full state estimation)
# ---------------------------------------------------------------------------
# Breadcrumb buffer
# ---------------------------------------------------------------------------
class BreadcrumbBuffer:
"""
Circular buffer of position records logged during mission.
1800 entries at 2s interval = 1 hour of operation.
Older entries are discarded as new ones are added.
"""
CAPACITY = 1800 # entries
LOG_INTERVAL_S = 2.0 # seconds between entries
MIN_CONFIDENCE = 0.3 # entries below this are flagged during reversal
def __init__(self):
self._buffer: collections.deque = collections.deque(maxlen=self.CAPACITY)
self._last_log_time: float = 0.0
def update(self, now_s: float, position: Point, heading_deg: float,
confidence: float = 1.0):
"""Log a breadcrumb if the log interval has elapsed."""
if now_s - self._last_log_time >= self.LOG_INTERVAL_S:
self._buffer.append(Breadcrumb(
timestamp_s=now_s,
x=position.x,
y=position.y,
z=position.z,
heading_deg=heading_deg,
confidence=confidence,
))
self._last_log_time = now_s
def get_return_path(self) -> List[Breadcrumb]:
"""
Return the breadcrumb buffer in reverse order for path reversal.
Entries below MIN_CONFIDENCE are filtered skipped in favour of
the next reliable entry to reduce navigation uncertainty.
"""
reliable = [b for b in self._buffer if b.confidence >= self.MIN_CONFIDENCE]
return list(reversed(reliable))
def clear(self):
"""Clear all entries — called at mission start."""
self._buffer.clear()
self._last_log_time = 0.0
@property
def entry_count(self) -> int:
"""Number of entries currently in buffer."""
return len(self._buffer)
# ---------------------------------------------------------------------------
# Mission executor node
# ---------------------------------------------------------------------------
class MissionExecutor(Node):
"""
Executes inspection missions from a waypoint sequence.
Integrates with the failsafe monitor and maintains breadcrumb buffer.
"""
# Internal mission states
STATE_IDLE = 'IDLE'
STATE_RUNNING = 'RUNNING'
STATE_PAUSED = 'PAUSED'
@ -39,80 +136,255 @@ class MissionExecutor(Node):
def __init__(self):
super().__init__('mission_executor')
# ------------------------------------------------------------------
# Parameters (from dev.yaml / field.yaml)
# ------------------------------------------------------------------
self.declare_parameter('waypoint_arrival_radius_m', 0.5)
self.declare_parameter('waypoint_timeout_s', 60.0)
self.declare_parameter('image_capture_delay_s', 1.0)
self._arrival_radius = self.get_parameter('waypoint_arrival_radius_m').value
self._wp_timeout_s = self.get_parameter('waypoint_timeout_s').value
self._capture_delay_s = self.get_parameter('image_capture_delay_s').value
# ------------------------------------------------------------------
# Mission state
# ------------------------------------------------------------------
self.state = self.STATE_IDLE
self.waypoints = [] # List of InspectionWaypoint
self.current_wp_index = 0
self.mission_id = ''
self.current_position = None
self.failsafe_active = False
self.waypoints: List[InspectionWaypoint] = []
self.current_wp_index = 0
# --- Subscribers ---
self.state_sub = self.create_subscription(
Odometry, '/rov/state', self.state_callback, 10)
self.failsafe_sub = self.create_subscription(
FailsafeStatus, '/rov/failsafe', self.failsafe_callback, 10)
# Entry point — set when mission starts, used for safe zone reference
self.entry_point: Optional[Point] = None
# --- Publishers ---
self.cmd_pub = self.create_publisher(Twist, '/rov/cmd_vel', 10)
self.status_pub = self.create_publisher(MissionStatus, '/rov/mission/status', 10)
# Operation mode — affects return-to-safe path choice
self.operation_mode = 'tethered' # 'tethered' or 'untethered'
# --- Services ---
self.cmd_srv = self.create_service(
MissionCommand, '/rov/mission/command', self.command_callback)
# ------------------------------------------------------------------
# Sensor state
# ------------------------------------------------------------------
# --- Timer: mission loop at 10Hz ---
self.timer = self.create_timer(0.1, self.mission_loop)
self.current_position: Optional[Point] = None
self.current_heading_deg: float = 0.0
self.get_logger().info('MissionExecutor node started')
# ------------------------------------------------------------------
# Failsafe state
# ------------------------------------------------------------------
def state_callback(self, msg: Odometry):
"""Update current position."""
self.last_failsafe_action = FailsafeStatus.ACTION_NONE
self.last_assessment_state = FailsafeStatus.ASSESSMENT_GREEN
# ------------------------------------------------------------------
# Breadcrumb buffer
# ------------------------------------------------------------------
self.breadcrumbs = BreadcrumbBuffer()
# ------------------------------------------------------------------
# Waypoint timing
# ------------------------------------------------------------------
self.wp_start_time: Optional[float] = None # When current wp navigation began
self.capture_start_time: Optional[float] = None # When image capture loiter began
self.capturing_image: bool = False
# ------------------------------------------------------------------
# Subscribers
# ------------------------------------------------------------------
self.create_subscription(
Odometry, '/rov/state',
self._state_callback, 10)
self.create_subscription(
FailsafeStatus, '/rov/failsafe',
self._failsafe_callback, 10)
# ------------------------------------------------------------------
# Publishers
# ------------------------------------------------------------------
self.cmd_pub = self.create_publisher(
Twist, '/rov/cmd_vel', 10)
self.status_pub = self.create_publisher(
MissionStatus, '/rov/mission/status', 10)
self.waypoint_pub = self.create_publisher(
InspectionWaypoint, '/rov/mission/waypoint', 10)
# ------------------------------------------------------------------
# Service
# ------------------------------------------------------------------
self.create_service(
MissionCommand, '/rov/mission/command',
self._command_callback)
# ------------------------------------------------------------------
# Timers
# ------------------------------------------------------------------
# Mission loop at 10 Hz
self.create_timer(0.1, self._mission_loop)
# Status publisher at 2 Hz
self.create_timer(0.5, self._publish_status)
self.get_logger().info('MissionExecutor started')
# ------------------------------------------------------------------
# Subscriber callbacks
# ------------------------------------------------------------------
def _state_callback(self, msg: Odometry):
"""Update current position and heading from state estimator."""
self.current_position = msg.pose.pose.position
def failsafe_callback(self, msg: FailsafeStatus):
"""Handle failsafe — abort mission if active."""
was_active = self.failsafe_active
self.failsafe_active = (msg.action != FailsafeStatus.ACTION_NONE)
if self.failsafe_active and not was_active:
self.get_logger().warn('Failsafe triggered — aborting mission')
self.state = self.STATE_ABORTED
# Extract heading from quaternion (yaw only — simplified)
q = msg.pose.pose.orientation
siny_cosp = 2.0 * (q.w * q.z + q.x * q.y)
cosy_cosp = 1.0 - 2.0 * (q.y * q.y + q.z * q.z)
self.current_heading_deg = math.degrees(math.atan2(siny_cosp, cosy_cosp))
def command_callback(self, request, response):
"""Handle mission command service calls."""
# Log breadcrumb if mission is active
if self.state == self.STATE_RUNNING and self.current_position is not None:
now_s = self.get_clock().now().nanoseconds / 1e9
self.breadcrumbs.update(
now_s,
self.current_position,
self.current_heading_deg,
)
def _failsafe_callback(self, msg: FailsafeStatus):
"""
Respond to failsafe actions per design spec priority.
Different actions require different mission responses.
"""
action = msg.action
assessment_state = msg.assessment_state
# Store for use in mission loop
self.last_failsafe_action = action
self.last_assessment_state = assessment_state
# --- Emergency surface: abort immediately, no controlled return ---
if action == FailsafeStatus.ACTION_EMERGENCY_SURFACE:
if self.state not in (self.STATE_ABORTED, self.STATE_COMPLETE):
self.get_logger().error(
'EMERGENCY SURFACE commanded — aborting mission immediately')
self._abort_mission('Emergency surface — failsafe')
return
# --- Return to safe: abort mission, initiate breadcrumb reversal ---
if action == FailsafeStatus.ACTION_RETURN_TO_SAFE:
if self.state not in (self.STATE_ABORTED, self.STATE_COMPLETE):
self.get_logger().warn(
f'RETURN TO SAFE commanded — aborting mission '
f'(breadcrumbs: {self.breadcrumbs.entry_count})')
self._abort_mission('Return to safe — failsafe')
# Breadcrumb path handed off to motion controller here
# (Phase 3+ — motion controller implements path reversal)
return
# --- Hover: pause mission, hold position ---
if action == FailsafeStatus.ACTION_HOVER:
if self.state == self.STATE_RUNNING:
self.get_logger().warn('HOVER commanded — pausing mission')
self.state = self.STATE_PAUSED
self._stop_motion()
return
# --- No action: if paused by failsafe and now clear, resume ---
if action == FailsafeStatus.ACTION_NONE:
if (self.state == self.STATE_PAUSED and
assessment_state == FailsafeStatus.ASSESSMENT_GREEN):
self.get_logger().info(
'Failsafe cleared (GREEN) — resuming mission')
self.state = self.STATE_RUNNING
# ------------------------------------------------------------------
# Service callback — mission commands
# ------------------------------------------------------------------
def _command_callback(self, request, response):
"""
Handle mission commands: START, PAUSE, RESUME, ABORT, LOAD.
LOAD accepts a YAML waypoint file path via parameters[0].
"""
cmd = request.command.upper()
if cmd == 'START':
if self.state == self.STATE_IDLE:
self.mission_id = request.mission_id
if self.state != self.STATE_IDLE:
response.success = False
response.message = f'Cannot START — current state: {self.state}'
return response
if not self.waypoints:
response.success = False
response.message = 'Cannot START — no waypoints loaded'
return response
# Record entry point at mission start
self.entry_point = (
self.current_position if self.current_position is not None
else Point()
)
# Set operation mode from parameters if provided
if request.parameters:
self.operation_mode = request.parameters[0]
self.breadcrumbs.clear()
self.current_wp_index = 0
self.mission_id = request.mission_id
self.state = self.STATE_RUNNING
self.wp_start_time = self.get_clock().now().nanoseconds / 1e9
self.get_logger().info(
f'Mission {self.mission_id} started — '
f'{len(self.waypoints)} waypoints, '
f'mode={self.operation_mode}'
)
response.success = True
response.message = f'Mission {self.mission_id} started'
else:
elif cmd == 'LOAD':
# Load waypoints from YAML file path in parameters[0]
if not request.parameters:
response.success = False
response.message = f'Cannot start — current state: {self.state}'
response.message = 'LOAD requires parameters[0] = YAML file path'
return response
loaded, message = self._load_waypoints(request.parameters[0])
response.success = loaded
response.message = message
elif cmd == 'PAUSE':
if self.state == self.STATE_RUNNING:
if self.state != self.STATE_RUNNING:
response.success = False
response.message = f'Cannot PAUSE — current state: {self.state}'
return response
self.state = self.STATE_PAUSED
self._stop_motion()
response.success = True
response.message = 'Mission paused'
else:
response.success = False
response.message = f'Cannot pause — current state: {self.state}'
elif cmd == 'RESUME':
if self.state == self.STATE_PAUSED:
if self.state != self.STATE_PAUSED:
response.success = False
response.message = f'Cannot RESUME — current state: {self.state}'
return response
self.state = self.STATE_RUNNING
self.wp_start_time = self.get_clock().now().nanoseconds / 1e9
response.success = True
response.message = 'Mission resumed'
else:
response.success = False
response.message = f'Cannot resume — current state: {self.state}'
elif cmd == 'ABORT':
self.state = self.STATE_ABORTED
self.stop_motion()
self._abort_mission('Operator abort command')
response.success = True
response.message = 'Mission aborted'
@ -122,10 +394,12 @@ class MissionExecutor(Node):
return response
def mission_loop(self):
"""Main mission execution loop — called at 10Hz."""
self.publish_status()
# ------------------------------------------------------------------
# Mission execution loop — 10 Hz
# ------------------------------------------------------------------
def _mission_loop(self):
"""Main mission execution loop."""
if self.state != self.STATE_RUNNING:
return
@ -137,44 +411,227 @@ class MissionExecutor(Node):
if self.current_wp_index >= len(self.waypoints):
self.get_logger().info('All waypoints reached — mission complete')
self.state = self.STATE_COMPLETE
self._stop_motion()
return
# TODO: navigate to current waypoint
# TODO: check arrival condition
# TODO: trigger image capture if required
# TODO: advance to next waypoint on arrival
if self.current_position is None:
self.get_logger().warn('No position estimate — waiting')
return
def stop_motion(self):
"""Publish zero velocity to stop the ROV."""
self.cmd_pub.publish(Twist()) # All zeros = stop
current_wp = self.waypoints[self.current_wp_index]
now_s = self.get_clock().now().nanoseconds / 1e9
def publish_status(self):
# --- Publish current waypoint target ---
self.waypoint_pub.publish(current_wp)
# --- Check waypoint timeout ---
if (self.wp_start_time is not None and
now_s - self.wp_start_time > self._wp_timeout_s):
self.get_logger().warn(
f'Waypoint {current_wp.waypoint_id} timed out after '
f'{self._wp_timeout_s}s — advancing to next'
)
self._advance_waypoint(now_s)
return
# --- Check arrival at waypoint ---
distance = self._distance_to(current_wp.position)
if distance > self._arrival_radius:
# Not yet arrived — navigate toward waypoint
self._navigate_toward(current_wp)
return
# --- Arrived at waypoint ---
self._stop_motion()
if current_wp.capture_image and not self.capturing_image:
# Start image capture loiter
self.capturing_image = True
self.capture_start_time = now_s
self.get_logger().info(
f'Arrived at {current_wp.waypoint_id}'
f'capturing image (loiter {current_wp.loiter_time_s}s)'
)
return
if self.capturing_image:
# Wait for loiter time before advancing
elapsed_capture = now_s - self.capture_start_time
if elapsed_capture < current_wp.loiter_time_s + self._capture_delay_s:
return
self.capturing_image = False
self.get_logger().info(
f'Image capture complete at {current_wp.waypoint_id}')
# Advance to next waypoint
self._advance_waypoint(now_s)
# ------------------------------------------------------------------
# Navigation helpers
# ------------------------------------------------------------------
def _navigate_toward(self, waypoint: InspectionWaypoint):
"""
Publish proportional velocity command toward the waypoint.
Simple P-controller replaced by full motion planner in Phase 3+.
Max speed: 0.3 m/s in XY, 0.1 m/s in Z.
"""
MAX_XY = 0.3 # m/s
MAX_Z = 0.1 # m/s
KP = 0.5 # proportional gain
dx = waypoint.position.x - self.current_position.x
dy = waypoint.position.y - self.current_position.y
dz = waypoint.position.z - self.current_position.z
# Clamp to max speeds
vx = max(-MAX_XY, min(MAX_XY, KP * dx))
vy = max(-MAX_XY, min(MAX_XY, KP * dy))
vz = max(-MAX_Z, min(MAX_Z, KP * dz))
cmd = Twist()
cmd.linear.x = vx
cmd.linear.y = vy
cmd.linear.z = vz
self.cmd_pub.publish(cmd)
def _stop_motion(self):
"""Publish zero velocity to halt the ROV."""
self.cmd_pub.publish(Twist()) # All zeros
def _distance_to(self, target: Point) -> float:
"""Euclidean distance from current position to target."""
if self.current_position is None:
return float('inf')
dx = target.x - self.current_position.x
dy = target.y - self.current_position.y
dz = target.z - self.current_position.z
return math.sqrt(dx*dx + dy*dy + dz*dz)
def _advance_waypoint(self, now_s: float):
"""Move to the next waypoint in the sequence."""
self.current_wp_index += 1
self.wp_start_time = now_s
self.capturing_image = False
if self.current_wp_index < len(self.waypoints):
next_wp = self.waypoints[self.current_wp_index]
self.get_logger().info(
f'Advancing to waypoint {self.current_wp_index}: '
f'{next_wp.waypoint_id}'
)
def _abort_mission(self, reason: str):
"""Abort the mission and stop motion."""
self.state = self.STATE_ABORTED
self._stop_motion()
self.get_logger().warn(f'Mission aborted: {reason}')
# ------------------------------------------------------------------
# Waypoint loading
# ------------------------------------------------------------------
def _load_waypoints(self, yaml_path: str):
"""
Load InspectionWaypoint list from a YAML mission file.
Expected format:
waypoints:
- id: "WP1"
x: 1.0
y: 2.0
z: -5.0
heading_deg: 90.0
standoff_m: 0.5
loiter_time_s: 3.0
capture_image: true
panel_id: "P1"
"""
try:
with open(yaml_path, 'r') as f:
data = yaml.safe_load(f)
if 'waypoints' not in data:
return False, f'No waypoints key in {yaml_path}'
self.waypoints = []
for entry in data['waypoints']:
wp = InspectionWaypoint()
wp.header.stamp = self.get_clock().now().to_msg()
wp.waypoint_id = str(entry.get('id', ''))
wp.position.x = float(entry.get('x', 0.0))
wp.position.y = float(entry.get('y', 0.0))
wp.position.z = float(entry.get('z', 0.0))
wp.heading_deg = float(entry.get('heading_deg', 0.0))
wp.standoff_m = float(entry.get('standoff_m', 0.5))
wp.loiter_time_s = float(entry.get('loiter_time_s', 2.0))
wp.capture_image = bool(entry.get('capture_image', False))
wp.panel_id = str(entry.get('panel_id', ''))
self.waypoints.append(wp)
msg = f'Loaded {len(self.waypoints)} waypoints from {yaml_path}'
self.get_logger().info(msg)
return True, msg
except FileNotFoundError:
return False, f'Mission file not found: {yaml_path}'
except Exception as e:
return False, f'Failed to load waypoints: {str(e)}'
# ------------------------------------------------------------------
# Status publishing — 2 Hz
# ------------------------------------------------------------------
def _publish_status(self):
"""Publish current mission status."""
msg = MissionStatus()
msg.header.stamp = self.get_clock().now().to_msg()
msg.mission_id = self.mission_id
msg.current_task = f'Waypoint {self.current_wp_index}/{len(self.waypoints)}'
msg.message = self.state
if len(self.waypoints) > 0:
msg.progress_percent = (self.current_wp_index / len(self.waypoints)) * 100.0
# Map state string to uint8
state_map = {
msg.state = {
self.STATE_IDLE: MissionStatus.STATE_IDLE,
self.STATE_RUNNING: MissionStatus.STATE_RUNNING,
self.STATE_PAUSED: MissionStatus.STATE_PAUSED,
self.STATE_COMPLETE: MissionStatus.STATE_COMPLETE,
self.STATE_ABORTED: MissionStatus.STATE_ABORTED,
}
msg.state = state_map.get(self.state, MissionStatus.STATE_IDLE)
}.get(self.state, MissionStatus.STATE_IDLE)
# Progress
total = len(self.waypoints)
msg.progress_percent = (
(self.current_wp_index / total * 100.0) if total > 0 else 0.0
)
# Current task description
if self.state == self.STATE_RUNNING and total > 0:
wp_id = (self.waypoints[self.current_wp_index].waypoint_id
if self.current_wp_index < total else 'complete')
msg.current_task = f'WP {self.current_wp_index + 1}/{total}: {wp_id}'
else:
msg.current_task = self.state
msg.message = (
f'{self.state} | '
f'WP {self.current_wp_index}/{total} | '
f'Breadcrumbs: {self.breadcrumbs.entry_count}'
)
self.status_pub.publish(msg)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None):
"""ROS2 node entry point."""
rclpy.init(args=args)
node = MissionExecutor()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()