feat: add rov_simulation package with mock_publisher and dev_stack launch

- New rov_simulation package (dev only — not for vehicle deployment)
- mock_publisher: synthetic MAVROS sensor data with 5 test scenarios
  (nominal, low_battery, comms_loss, depth_warning, all_clear)
- dev_stack.launch.py: launches mock_publisher + failsafe_monitor + mission_executor
- Scenarios trigger specific failsafe conditions for end-to-end testing
This commit is contained in:
Grant 2026-05-07 00:43:52 +00:00
parent c3a8b4bc77
commit 6aea0c24a6
6 changed files with 440 additions and 0 deletions

View File

@ -0,0 +1,70 @@
"""
Dev stack launch file Argonaut 3 Simulation.
DEV ONLY NOT FOR DEPLOYMENT TO VEHICLE HARDWARE.
Starts mock_publisher + failsafe_monitor + mission_executor together
for integrated testing without physical hardware.
Usage:
ros2 launch rov_simulation dev_stack.launch.py
ros2 launch rov_simulation dev_stack.launch.py scenario:=low_battery
ros2 launch rov_simulation dev_stack.launch.py scenario:=comms_loss
Available scenarios: nominal, low_battery, comms_loss, depth_warning, all_clear
"""
from launch import LaunchDescription
from launch.actions import DeclareLaunchArgument
from launch.substitutions import LaunchConfiguration
from launch_ros.actions import Node
from ament_index_python.packages import get_package_share_directory
import os
def generate_launch_description():
"""Launch full dev stack with selected simulation scenario."""
control_pkg = get_package_share_directory('rov_control')
mission_pkg = get_package_share_directory('rov_mission')
failsafe_config = os.path.join(control_pkg, 'config', 'failsafe.yaml')
mission_config = os.path.join(mission_pkg, 'config', 'dev.yaml')
return LaunchDescription([
# Scenario argument — passed to mock_publisher
DeclareLaunchArgument(
'scenario',
default_value='nominal',
description=(
'Simulation scenario: '
'nominal | low_battery | comms_loss | depth_warning | all_clear'
)
),
# Mock publisher — synthetic sensor data (dev only)
Node(
package='rov_simulation',
executable='mock_publisher',
name='mock_publisher',
output='screen',
parameters=[{'scenario': LaunchConfiguration('scenario')}],
),
# Failsafe monitor — reads mock sensor data, publishes /rov/failsafe
Node(
package='rov_control',
executable='failsafe_monitor',
name='failsafe_monitor',
output='screen',
parameters=[failsafe_config],
),
# Mission executor — reads failsafe state, publishes mission status
Node(
package='rov_mission',
executable='mission_executor',
name='mission_executor',
output='screen',
parameters=[mission_config],
),
])

View File

@ -0,0 +1,19 @@
<?xml version="1.0"?>
<package format="3">
<name>rov_simulation</name>
<version>0.1.0</version>
<description>Development-only simulation utilities — NOT for deployment to vehicle hardware.</description>
<maintainer email="grant@symbytech.com">grant</maintainer>
<license>Proprietary</license>
<depend>rclpy</depend>
<depend>sensor_msgs</depend>
<depend>mavros_msgs</depend>
<depend>geometry_msgs</depend>
<depend>nav_msgs</depend>
<depend>rov_interfaces</depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>

View File

@ -0,0 +1,322 @@
#!/usr/bin/env python3
"""
Mock Publisher Node Argonaut 3 Simulation
=============================================
DEV ONLY NOT FOR DEPLOYMENT TO VEHICLE HARDWARE.
Publishes synthetic sensor data to simulate a connected ArduSub vehicle.
Allows failsafe_monitor and mission_executor to be tested without
physical hardware or a live MAVROS connection.
Simulated topics (mimics MAVROS output):
/mavros/state (mavros_msgs/State) FCU connection + heartbeat
/mavros/battery (sensor_msgs/BatteryState) battery level
/mavros/global_position/rel_alt (std_msgs/Float32) depth (negative = below surface)
/rov/state (nav_msgs/Odometry) vehicle position + orientation
Scenarios (set via ROS parameter 'scenario'):
nominal healthy vehicle, normal battery, shallow depth
low_battery battery draining toward return threshold
comms_loss heartbeat stops after 10s to trigger comms failsafe
depth_warning vehicle descends toward warning depth
all_clear all GREEN, used to verify normal operation
Usage:
ros2 run rov_simulation mock_publisher --ros-args -p scenario:=low_battery
"""
import math
import rclpy
from rclpy.node import Node
from std_msgs.msg import Float32, Bool, Header
from sensor_msgs.msg import BatteryState
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Quaternion
from mavros_msgs.msg import State
# ---------------------------------------------------------------------------
# Available simulation scenarios
# ---------------------------------------------------------------------------
SCENARIOS = {
'nominal': {
'description': 'Healthy vehicle — all systems nominal',
'battery_start': 0.85, # 85% — well above all thresholds
'battery_drain': 0.0, # No drain
'depth_start_m': 5.0, # Shallow
'depth_rate_mps': 0.0, # Stationary
'comms_loss_at_s': None, # No comms loss
},
'low_battery': {
'description': 'Battery draining — triggers warning then return threshold',
'battery_start': 0.30, # Start above warning (25%)
'battery_drain': 0.001, # Drain 0.1% per second
'depth_start_m': 10.0,
'depth_rate_mps': 0.0,
'comms_loss_at_s': None,
},
'comms_loss': {
'description': 'Heartbeat stops after 10s — triggers comms failsafe',
'battery_start': 0.80,
'battery_drain': 0.0,
'depth_start_m': 8.0,
'depth_rate_mps': 0.0,
'comms_loss_at_s': 10.0, # Stop heartbeat at 10s
},
'depth_warning': {
'description': 'Vehicle descending toward warning depth limit',
'battery_start': 0.75,
'battery_drain': 0.0,
'depth_start_m': 180.0, # Start near design depth (200m)
'depth_rate_mps': 0.5, # Descend 0.5 m/s toward warning (250m)
'comms_loss_at_s': None,
},
'all_clear': {
'description': 'All GREEN — verify normal operation produces no failsafe',
'battery_start': 0.95,
'battery_drain': 0.0,
'depth_start_m': 3.0,
'depth_rate_mps': 0.0,
'comms_loss_at_s': None,
},
}
class MockPublisher(Node):
"""
Publishes synthetic MAVROS-compatible sensor data for dev testing.
Simulates a connected ArduSub vehicle without physical hardware.
"""
PUBLISH_RATE_HZ = 10 # All topics published at 10 Hz
def __init__(self):
super().__init__('mock_publisher')
# ------------------------------------------------------------------
# Parameters
# ------------------------------------------------------------------
self.declare_parameter('scenario', 'nominal')
scenario_name = self.get_parameter('scenario').value
if scenario_name not in SCENARIOS:
self.get_logger().error(
f'Unknown scenario: {scenario_name}. '
f'Valid: {list(SCENARIOS.keys())}'
)
scenario_name = 'nominal'
self._scenario = SCENARIOS[scenario_name]
self.get_logger().info(
f'MockPublisher starting — scenario: {scenario_name} '
f'({self._scenario["description"]})'
)
# ------------------------------------------------------------------
# Simulation state
# ------------------------------------------------------------------
self._elapsed_s = 0.0
self._battery_pct = self._scenario['battery_start']
self._depth_m = self._scenario['depth_start_m']
self._comms_active = True
# Simple position simulation — vehicle moves in XY circle for realism
self._pos_x = 0.0
self._pos_y = 0.0
self._orbit_radius_m = 2.0 # metres
self._orbit_rate_rps = 0.05 # radians per second
# ------------------------------------------------------------------
# Publishers (all matching MAVROS topic names and message types)
# ------------------------------------------------------------------
self._state_pub = self.create_publisher(
State, '/mavros/state', 10)
self._battery_pub = self.create_publisher(
BatteryState, '/mavros/battery', 10)
self._alt_pub = self.create_publisher(
Float32, '/mavros/global_position/rel_alt', 10)
self._odom_pub = self.create_publisher(
Odometry, '/rov/state', 10)
# ------------------------------------------------------------------
# Timer
# ------------------------------------------------------------------
self._dt = 1.0 / self.PUBLISH_RATE_HZ
self.create_timer(self._dt, self._publish_all)
self.get_logger().warn(
'*** DEV ONLY — mock_publisher is active. '
'Do not deploy to vehicle hardware. ***'
)
# ------------------------------------------------------------------
# Main publish cycle
# ------------------------------------------------------------------
def _publish_all(self):
"""Advance simulation state and publish all topics."""
self._advance_simulation()
self._publish_state()
self._publish_battery()
self._publish_altitude()
self._publish_odometry()
# ------------------------------------------------------------------
# Simulation state advancement
# ------------------------------------------------------------------
def _advance_simulation(self):
"""Advance all simulated quantities by one timestep."""
self._elapsed_s += self._dt
# Battery drain
drain = self._scenario['battery_drain'] * self._dt
self._battery_pct = max(0.0, self._battery_pct - drain)
# Depth change
self._depth_m += self._scenario['depth_rate_mps'] * self._dt
# Position orbit (simple circle for realistic /rov/state output)
angle = self._orbit_rate_rps * self._elapsed_s
self._pos_x = self._orbit_radius_m * math.cos(angle)
self._pos_y = self._orbit_radius_m * math.sin(angle)
# Comms loss scenario — stop publishing heartbeat after threshold
loss_at = self._scenario['comms_loss_at_s']
if loss_at is not None and self._elapsed_s >= loss_at:
if self._comms_active:
self.get_logger().warn(
f'[{self._elapsed_s:.1f}s] Simulating comms loss — '
'stopping FCU heartbeat'
)
self._comms_active = False
# Log key threshold crossings once
self._log_threshold_crossings()
def _log_threshold_crossings(self):
"""Log once when battery crosses each threshold."""
pct = self._battery_pct
thresholds = [
(0.25, 'WARNING (25%)'),
(0.20, 'RETURN threshold (20%)'),
(0.12, 'CRITICAL (12%)'),
(0.08, 'EMERGENCY (8%)'),
]
for threshold, label in thresholds:
attr = f'_logged_battery_{int(threshold*100)}'
if pct < threshold and not getattr(self, attr, False):
self.get_logger().warn(
f'[{self._elapsed_s:.1f}s] Battery crossed {label}: '
f'{pct*100:.1f}%'
)
setattr(self, attr, True)
# ------------------------------------------------------------------
# Topic publishers
# ------------------------------------------------------------------
def _publish_state(self):
"""
Publish FCU connection state (mavros_msgs/State).
Mimics the MAVROS /mavros/state topic.
Stops publishing when comms_loss scenario activates.
"""
if not self._comms_active:
# Do not publish — simulates lost heartbeat
return
msg = State()
msg.header.stamp = self.get_clock().now().to_msg()
msg.connected = True
msg.armed = False # Unarmed for dev testing
msg.guided = True
msg.mode = 'ALT_HOLD'
msg.system_status = 3 # MAV_STATE_STANDBY
self._state_pub.publish(msg)
def _publish_battery(self):
"""
Publish battery state (sensor_msgs/BatteryState).
Mimics the MAVROS /mavros/battery topic.
percentage field: 0.01.0.
"""
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.percentage = float(self._battery_pct)
msg.voltage = 14.8 * self._battery_pct + 12.0 # Approximate LiPo curve
msg.current = 5.0 # Amps — nominal draw
msg.charge = float('nan')
msg.capacity = float('nan')
msg.design_capacity = float('nan')
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
self._battery_pub.publish(msg)
def _publish_altitude(self):
"""
Publish relative altitude (std_msgs/Float32).
Negative value = depth below surface, matching MAVROS convention.
"""
msg = Float32()
msg.data = -abs(self._depth_m) # Always negative (depth)
self._alt_pub.publish(msg)
def _publish_odometry(self):
"""
Publish vehicle state (nav_msgs/Odometry).
Provides position for mission_executor arrival detection and
breadcrumb logging.
"""
msg = Odometry()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'map'
msg.child_frame_id = 'base_link'
# Position — orbit + depth
msg.pose.pose.position.x = self._pos_x
msg.pose.pose.position.y = self._pos_y
msg.pose.pose.position.z = -abs(self._depth_m)
# Orientation — identity quaternion (level vehicle)
msg.pose.pose.orientation.w = 1.0
msg.pose.pose.orientation.x = 0.0
msg.pose.pose.orientation.y = 0.0
msg.pose.pose.orientation.z = 0.0
# Velocity — tangential to orbit
angle = self._orbit_rate_rps * self._elapsed_s
speed = self._orbit_radius_m * self._orbit_rate_rps
msg.twist.twist.linear.x = -speed * math.sin(angle)
msg.twist.twist.linear.y = speed * math.cos(angle)
msg.twist.twist.linear.z = 0.0
self._odom_pub.publish(msg)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
def main(args=None):
"""ROS2 node entry point."""
rclpy.init(args=args)
node = MockPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
pass
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()

View File

@ -0,0 +1,29 @@
from setuptools import setup
import os
from glob import glob
package_name = 'rov_simulation'
setup(
name=package_name,
version='0.1.0',
packages=[package_name],
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
(os.path.join('share', package_name, 'launch'),
glob('launch/*.py')),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='grant',
maintainer_email='grant@symbytech.com',
description='Development-only simulation utilities — NOT for deployment',
license='Proprietary',
entry_points={
'console_scripts': [
'mock_publisher = rov_simulation.mock_publisher:main',
],
},
)