Chapter 4: Building ROS 2 Nodes with Python
Python has emerged as a preferred language for rapid robotics development due to its simplicity, extensive ecosystem, and powerful machine learning libraries. This chapter explores advanced techniques for building robust ROS 2 nodes using Python, from basic pub-sub patterns to complex multi-threaded architectures. We'll examine how to leverage Python's strengths while meeting the real-time requirements of robotic systems, creating nodes that are both powerful and reliable.
What You'll Learn
By the end of this chapter, you'll be able to:
- Implement sophisticated ROS 2 nodes using rclpy
- Apply multi-threading and async patterns for concurrent operations
- Design reusable node components and architectures
- Implement comprehensive error handling and logging
- Optimize performance for real-time robotic applications
- Debug and profile Python-based ROS 2 systems
Advanced ROS 2 Python Patterns
Component-Based Node Architecture
Building complex robotic systems requires modular, reusable components that can be combined and reconfigured easily:
# component_base.py - Base class for reusable components
from abc import ABC, abstractmethod
from rclpy.node import Node
from typing import Dict, Any, Optional
import threading
import time
class RobotComponent(ABC):
    """Abstract base for pluggable robot components.

    A component is owned by a host Node and driven externally through the
    initialize() / update() / shutdown() lifecycle hooks, which concrete
    subclasses must implement.  Enable state and status reporting are
    provided here for all components.
    """

    def __init__(self, node: Node, component_name: str):
        # Host node: used for logging here, and for pub/sub setup in subclasses.
        self.node = node
        self.name = component_name
        self.enabled = True
        # Seed the update timestamp at construction time.
        self.last_update_time = time.time()
        self.update_rate = 10.0  # Hz

    @abstractmethod
    def initialize(self) -> bool:
        """Set up the component; return True on success."""
        pass

    @abstractmethod
    def update(self, dt: float) -> bool:
        """Advance component state by dt seconds; return True on success."""
        pass

    @abstractmethod
    def shutdown(self) -> bool:
        """Release resources cleanly; return True on success."""
        pass

    def set_enabled(self, enabled: bool):
        """Toggle the component and log the transition."""
        self.enabled = enabled
        self.node.get_logger().info(f"Component {self.name}: {'enabled' if enabled else 'disabled'}")

    def get_status(self) -> Dict[str, Any]:
        """Return a snapshot of this component's bookkeeping fields."""
        snapshot = {
            'name': self.name,
            'enabled': self.enabled,
            'last_update': self.last_update_time,
            'update_rate': self.update_rate,
        }
        return snapshot
# NavigationComponent example
class NavigationComponent(RobotComponent):
    """Advanced navigation component with SLAM integration.

    Publishes velocity commands on 'cmd_vel', consumes 'odom' and 'map',
    and delegates goal execution to a 'navigate_to_pose' action server.

    NOTE(review): Twist, Odometry, OccupancyGrid, ActionClient,
    NavigateToPose, Pose and AStarPlanner are not imported in this
    listing — confirm they are imported elsewhere in the module.
    """

    def __init__(self, node: Node):
        super().__init__(node, 'navigation')
        self.current_goal = None        # target Pose, or None when idle
        self.planning_frequency = 5.0  # Hz
        self.path = []                  # planned waypoints for the active goal
        self.path_index = 0             # index of the next waypoint in self.path

    def initialize(self) -> bool:
        """Create pub/sub endpoints, the action client, and the planner."""
        try:
            # Publishers
            self.cmd_vel_pub = self.node.create_publisher(
                Twist, 'cmd_vel', 10)
            # Subscribers
            self.odom_sub = self.node.create_subscription(
                Odometry, 'odom', self.odometry_callback, 10)
            self.map_sub = self.node.create_subscription(
                OccupancyGrid, 'map', self.map_callback, 10)
            # Action client for navigation
            self.nav_action_client = ActionClient(
                self.node, NavigateToPose, 'navigate_to_pose')
            # Initialize path planner
            self.path_planner = AStarPlanner()
            self.node.get_logger().info("Navigation component initialized")
            return True
        except Exception as e:
            self.node.get_logger().error(f"Navigation init failed: {e}")
            return False

    def update(self, dt: float) -> bool:
        """Advance path following / replanning for the active goal.

        NOTE(review): follow_path, replan_path, check_obstacles and
        handle_obstacle_avoidance are called below but are not defined in
        this listing — confirm they exist elsewhere.
        """
        if not self.enabled:
            return True
        try:
            # Update path following if active goal exists
            if self.current_goal and self.path:
                self.follow_path(dt)
            elif self.current_goal:
                self.replan_path()
            # Check for obstacles
            if self.check_obstacles():
                self.handle_obstacle_avoidance()
            return True
        except Exception as e:
            self.node.get_logger().error(f"Navigation update error: {e}")
            return False

    def shutdown(self) -> bool:
        """Stop the robot and cancel any outstanding navigation goal."""
        try:
            # Stop robot: a default-constructed Twist is all zeros
            stop_msg = Twist()
            self.cmd_vel_pub.publish(stop_msg)
            # Cancel any ongoing navigation
            if self.nav_action_client.server_is_ready():
                self.nav_action_client.cancel_all_goals()
            self.node.get_logger().info("Navigation component shutdown")
            return True
        except Exception as e:
            self.node.get_logger().error(f"Navigation shutdown error: {e}")
            return False

    def navigate_to_pose(self, pose: Pose):
        """Start asynchronous navigation toward `pose` in the 'map' frame.

        NOTE(review): goal_response_callback is registered below but not
        defined in this listing — confirm it exists elsewhere.
        """
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.header.frame_id = "map"
        goal_msg.pose.pose = pose
        self.current_goal = pose
        # Send goal to action server
        future = self.nav_action_client.send_goal_async(goal_msg)
        future.add_done_callback(self.goal_response_callback)

    def odometry_callback(self, msg: Odometry):
        """Cache the latest pose and velocity from odometry."""
        self.current_pose = msg.pose.pose
        self.current_velocity = msg.twist.twist

    def map_callback(self, msg: OccupancyGrid):
        """Cache the latest map and forward it to the path planner."""
        self.current_map = msg
        self.path_planner.update_map(msg)
# Component manager node
class ComponentManagerNode(Node):
    """Node that manages multiple robot components.

    A single daemon thread calls update() on every enabled registered
    component at roughly 10 Hz; services allow clients to enable/disable
    components and query their status.

    NOTE(review): rclpy, SetEnabled, GetComponentStatus and
    ComponentStatus are referenced below but are not imported in this
    listing — confirm the module imports them elsewhere.
    """

    def __init__(self):
        super().__init__('component_manager')
        # name -> component; populated via register_component()
        self.components: Dict[str, RobotComponent] = {}
        # Reserved for per-component threads (not used in this listing).
        self.component_threads: Dict[str, threading.Thread] = {}
        self.running = True  # cleared by shutdown() to stop the update loop
        # Services for component management
        self.enable_service = self.create_service(
            SetEnabled, 'enable_component', self.enable_component_callback)
        self.status_service = self.create_service(
            GetComponentStatus, 'get_component_status', self.get_status_callback)
        # Start update thread
        self.update_thread = threading.Thread(target=self.update_loop, daemon=True)
        self.update_thread.start()
        self.get_logger().info("Component manager initialized")

    def register_component(self, component: RobotComponent):
        """Register a component; it is added only if initialize() succeeds."""
        if component.initialize():
            self.components[component.name] = component
            self.get_logger().info(f"Registered component: {component.name}")
        else:
            self.get_logger().error(f"Failed to initialize component: {component.name}")

    def update_loop(self):
        """Main update loop for all components (runs on the daemon thread).

        NOTE(review): the loop runs at a fixed 10 Hz and ignores each
        component's update_rate field; self.components may also be
        mutated by register_component() while this thread iterates it —
        confirm both are acceptable.
        """
        last_time = time.time()
        while self.running and rclpy.ok():
            current_time = time.time()
            dt = current_time - last_time  # seconds since the previous tick
            last_time = current_time
            # Update all components
            for component in self.components.values():
                if component.enabled:
                    component.update(dt)
            # Maintain 10 Hz update rate: sleep for whatever is left of 100 ms
            elapsed = time.time() - current_time
            sleep_time = max(0, 0.1 - elapsed)
            time.sleep(sleep_time)

    def enable_component_callback(self, request, response):
        """Service handler: enable or disable a component by name."""
        component_name = request.component_name
        if component_name in self.components:
            self.components[component_name].set_enabled(request.enabled)
            response.success = True
            response.message = f"Component {component_name} {'enabled' if request.enabled else 'disabled'}"
        else:
            response.success = False
            response.message = f"Component {component_name} not found"
        return response

    def get_status_callback(self, request, response):
        """Service handler: report status of every registered component."""
        response.statuses = []
        for component in self.components.values():
            status = ComponentStatus()
            status.name = component.name
            status.enabled = component.enabled
            status.last_update = component.last_update_time
            response.statuses.append(status)
        return response

    def shutdown(self):
        """Graceful shutdown: stop the update loop, then all components."""
        self.running = False
        self.update_thread.join(timeout=1.0)
        # Shutdown all components
        for component in self.components.values():
            component.shutdown()
        super().destroy_node()
Async ROS 2 Programming
Python's asyncio can be leveraged for I/O-bound operations in robotics:
# async_robot_node.py
import asyncio
import rclpy
from rclpy.node import Node
from rclpy.executors import AsyncIOExecutor
import aiohttp
import json
class AsyncRobotNode(Node):
    """ROS 2 node with async capabilities for web services and ML inference.

    Images from 'camera/image_raw' are queued and forwarded to an external
    HTTP inference service; parsed detections are published on 'detections'.
    Background asyncio tasks handle inference, API health checks, and
    performance monitoring.

    NOTE(review): rclpy, Image, DetectionArray, Detection and
    BoundingBox2D are referenced below but not imported in this listing —
    confirm the module imports them elsewhere.
    """

    def __init__(self):
        super().__init__('async_robot_node')
        # Async components
        self.http_session = None  # aiohttp session; created in initialize_async()
        # Bounded queue so a slow inference backend drops frames instead of
        # growing memory without limit.
        self.inference_queue = asyncio.Queue(maxsize=100)
        self.web_api_base = "http://robot-api:8080"
        # ROS components
        self.image_sub = self.create_subscription(
            Image, 'camera/image_raw', self.image_callback, 10)
        self.detection_pub = self.create_publisher(
            DetectionArray, 'detections', 10)
        # Initialize async tasks
        self.async_tasks = []  # populated by initialize_async()

    async def initialize_async(self):
        """Create the HTTP session and start the background tasks."""
        # Create HTTP session for API calls
        self.http_session = aiohttp.ClientSession()
        # Start background tasks
        self.async_tasks = [
            asyncio.create_task(self.inference_processor()),
            asyncio.create_task(self.api_health_monitor()),
            asyncio.create_task(self.performance_monitor())
        ]
        self.get_logger().info("Async components initialized")

    def image_callback(self, msg):
        """Queue an incoming image for asynchronous inference.

        NOTE(review): asyncio.create_task() requires a running event loop
        in the calling thread; if this callback fires on a plain ROS
        executor thread it will raise — confirm the executor integrates
        with asyncio.
        """
        try:
            # Queue image for inference; drop the frame if the queue is full
            if not self.inference_queue.full():
                asyncio.create_task(
                    self.queue_image_for_inference(msg))
            else:
                self.get_logger().warn("Inference queue full, dropping frame")
        except Exception as e:
            self.get_logger().error(f"Image callback error: {e}")

    async def queue_image_for_inference(self, msg):
        """Put an image message onto the inference queue."""
        await self.inference_queue.put(msg)

    async def inference_processor(self):
        """Consume queued images, run inference, publish the detections."""
        while rclpy.ok():
            try:
                # Get image from queue; the timeout lets the loop re-check rclpy.ok()
                image_msg = await asyncio.wait_for(
                    self.inference_queue.get(), timeout=1.0)
                # Perform inference
                detections = await self.perform_inference(image_msg)
                # Publish results
                self.detection_pub.publish(detections)
            except asyncio.TimeoutError:
                # No images available, continue
                continue
            except Exception as e:
                self.get_logger().error(f"Inference processing error: {e}")

    async def perform_inference(self, image_msg):
        """POST an image to the inference service and parse the reply.

        Returns an empty DetectionArray on any failure so the publisher
        always receives a valid message.
        """
        try:
            # Convert image to bytes
            image_bytes = self.image_to_bytes(image_msg)
            # Send to inference API
            async with self.http_session.post(
                f"{self.web_api_base}/inference",
                data=image_bytes,
                headers={"Content-Type": "application/octet-stream"}
            ) as response:
                if response.status == 200:
                    results = await response.json()
                    return self.parse_inference_results(results, image_msg.header)
                else:
                    self.get_logger().error(f"Inference API error: {response.status}")
                    return DetectionArray()
        except Exception as e:
            self.get_logger().error(f"Inference request failed: {e}")
            return DetectionArray()

    async def api_health_monitor(self):
        """Poll the inference service's /health endpoint every 5 seconds."""
        while rclpy.ok():
            try:
                async with self.http_session.get(
                    f"{self.web_api_base}/health",
                    timeout=aiohttp.ClientTimeout(total=2.0)
                ) as response:
                    if response.status == 200:
                        health = await response.json()
                        if not health.get('healthy', False):
                            self.get_logger().warn("Inference API unhealthy")
                    else:
                        self.get_logger().error(f"Health check failed: {response.status}")
            except Exception as e:
                self.get_logger().error(f"Health monitor error: {e}")
            # Check every 5 seconds
            await asyncio.sleep(5.0)

    async def performance_monitor(self):
        """Log warnings when CPU, memory, or queue depth exceed thresholds.

        NOTE(review): psutil is a third-party dependency — confirm it is
        declared in the package manifest.
        """
        import psutil
        while rclpy.ok():
            try:
                # Get system metrics
                cpu_usage = psutil.cpu_percent()
                memory_usage = psutil.virtual_memory().percent
                # Get queue size
                queue_size = self.inference_queue.qsize()
                # Log if thresholds exceeded
                if cpu_usage > 80:
                    self.get_logger().warn(f"High CPU usage: {cpu_usage:.1f}%")
                if memory_usage > 80:
                    self.get_logger().warn(f"High memory usage: {memory_usage:.1f}%")
                if queue_size > 50:
                    self.get_logger().warn(f"High queue size: {queue_size}")
            except Exception as e:
                self.get_logger().error(f"Performance monitor error: {e}")
            await asyncio.sleep(1.0)

    def image_to_bytes(self, image_msg):
        """Convert a ROS image message to JPEG-encoded bytes."""
        # Implementation depends on image encoding
        import cv2
        from cv_bridge import CvBridge
        bridge = CvBridge()
        cv_image = bridge.imgmsg_to_cv2(image_msg)
        _, buffer = cv2.imencode('.jpg', cv_image)
        return buffer.tobytes()

    def parse_inference_results(self, results, header):
        """Convert the inference API's JSON reply into a DetectionArray.

        Assumes `results` is a dict with a 'detections' list of dicts —
        TODO confirm against the inference service's response schema.
        """
        detection_array = DetectionArray()
        detection_array.header = header
        for result in results.get('detections', []):
            detection = Detection()
            detection.class_id = result.get('class_id', 0)
            detection.confidence = result.get('confidence', 0.0)
            detection.bbox = BoundingBox2D()
            detection.bbox.center.position.x = result.get('center_x', 0)
            detection.bbox.center.position.y = result.get('center_y', 0)
            detection.bbox.size_x = result.get('width', 0)
            detection.bbox.size_y = result.get('height', 0)
            detection_array.detections.append(detection)
        return detection_array

    async def shutdown_async(self):
        """Cancel background tasks and close the HTTP session."""
        # Cancel all tasks
        for task in self.async_tasks:
            task.cancel()
        # Wait for tasks to complete (swallowing CancelledError)
        await asyncio.gather(*self.async_tasks, return_exceptions=True)
        # Close HTTP session
        if self.http_session:
            await self.http_session.close()
async def main():
    """Async entry point: create the node and spin it on an asyncio executor."""
    rclpy.init()
    try:
        # Create node
        node = AsyncRobotNode()
        # Initialize async components (HTTP session + background tasks)
        await node.initialize_async()
        # Create executor
        # NOTE(review): AsyncIOExecutor is not part of standard rclpy
        # releases — confirm where this executor class comes from.
        executor = AsyncIOExecutor()
        try:
            await executor.spin(node)
        except KeyboardInterrupt:
            pass
        # Shutdown
        await node.shutdown_async()
        node.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    asyncio.run(main())
Multi-Threading Strategies
Python threading for CPU-bound operations in robotics:
# threaded_robot_node.py
import threading
import queue
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import Optional
@dataclass
class SensorReading:
    """A single timestamped reading from one sensor."""
    timestamp: float  # wall-clock time (time.time()) when the reading was taken
    sensor_id: str    # logical sensor name, e.g. 'lidar' or 'camera'
    data: dict        # payload extracted from the sensor message
class ThreadedRobotNode(Node):
    """Node using multiple threads for concurrent processing.

    Sensor callbacks enqueue readings; a background thread farms them out
    to a ThreadPoolExecutor; a ~20 Hz decision loop consumes the results;
    a monitor thread reports queue/thread health every 5 seconds.

    NOTE(review): rclpy and Twist are referenced below but not imported
    in this listing, and self.cmd_vel_pub is used in execute_commands()
    but never created here — confirm both come from elsewhere.
    """

    def __init__(self):
        super().__init__('threaded_robot_node')
        # Thread pools
        self.sensor_thread_pool = ThreadPoolExecutor(
            max_workers=4, thread_name_prefix='sensor')
        self.processing_thread_pool = ThreadPoolExecutor(
            max_workers=2, thread_name_prefix='process')
        # Data queues for thread communication
        self.sensor_queue = queue.Queue(maxsize=100)
        # NOTE(review): processing_queue is monitored but never written to
        # in this listing — confirm intended use.
        self.processing_queue = queue.Queue(maxsize=50)
        # Shared data with locks
        self.sensors_data = {}               # sensor_id -> {'timestamp', 'future'}
        self.sensors_lock = threading.RLock()  # guards sensors_data
        # Control flags
        self.running = True            # cleared by shutdown()
        self.processing_active = True  # cleared by shutdown()
        # Start worker threads
        self.start_worker_threads()
        self.get_logger().info("Threaded robot node initialized")

    def start_worker_threads(self):
        """Start the three background daemon threads."""
        # Sensor processing thread
        self.sensor_thread = threading.Thread(
            target=self.sensor_processing_loop,
            name='SensorProcessor',
            daemon=True
        )
        self.sensor_thread.start()
        # Main processing thread
        self.processing_thread = threading.Thread(
            target=self.main_processing_loop,
            name='MainProcessor',
            daemon=True
        )
        self.processing_thread.start()
        # Monitoring thread
        self.monitor_thread = threading.Thread(
            target=self.monitoring_loop,
            name='Monitor',
            daemon=True
        )
        self.monitor_thread.start()

    def sensor_callback(self, msg, sensor_id: str):
        """Wrap an incoming message in a SensorReading and enqueue it.

        NOTE(review): extract_sensor_data() is called here but not defined
        in this listing — confirm it exists elsewhere.
        """
        try:
            # Quick processing in callback
            reading = SensorReading(
                timestamp=time.time(),
                sensor_id=sensor_id,
                data=self.extract_sensor_data(msg)
            )
            # Queue for detailed processing; drop if full to keep the callback fast
            if not self.sensor_queue.full():
                self.sensor_queue.put_nowait(reading)
            else:
                self.get_logger().warn(f"Sensor queue full for {sensor_id}")
        except Exception as e:
            self.get_logger().error(f"Sensor callback error for {sensor_id}: {e}")

    def sensor_processing_loop(self):
        """Drain the sensor queue and dispatch readings to the pool."""
        while self.running and rclpy.ok():
            try:
                # Get sensor reading; timeout lets the loop re-check self.running
                reading = self.sensor_queue.get(timeout=1.0)
                # Submit to thread pool for processing
                future = self.sensor_thread_pool.submit(
                    self.process_sensor_reading, reading)
                # Tag the future with its sensor for later result collection
                future.sensor_id = reading.sensor_id
                # Update shared data (thread-safe)
                with self.sensors_lock:
                    self.sensors_data[reading.sensor_id] = {
                        'timestamp': reading.timestamp,
                        'future': future
                    }
                self.sensor_queue.task_done()
            except queue.Empty:
                # No sensor data available
                continue
            except Exception as e:
                self.get_logger().error(f"Sensor processing error: {e}")

    def process_sensor_reading(self, reading: SensorReading):
        """Process one reading on a pool worker; returns a result dict or None."""
        try:
            # Simulate intensive processing
            time.sleep(0.01)
            # Apply sensor-specific processing
            if reading.sensor_id == 'lidar':
                return self.process_lidar_data(reading.data)
            elif reading.sensor_id == 'camera':
                return self.process_camera_data(reading.data)
            else:
                return self.process_generic_sensor(reading.data)
        except Exception as e:
            self.get_logger().error(f"Error processing {reading.sensor_id}: {e}")
            return None

    def process_lidar_data(self, data):
        """Process LiDAR point cloud (placeholder returning fixed values)."""
        # Placeholder for LiDAR processing
        return {
            'obstacles': 5,
            'free_space': 0.8,
            'min_distance': 2.5
        }

    def process_camera_data(self, data):
        """Process camera image (placeholder returning fixed values)."""
        # Placeholder for vision processing
        return {
            'objects_detected': 3,
            'faces_found': 1,
            'quality_score': 0.9
        }

    def process_generic_sensor(self, data):
        """Fallback processing for sensors without a dedicated handler."""
        return {'processed': True, 'value': data.get('value', 0)}

    def main_processing_loop(self):
        """Decision loop: collect results, decide, act — targeting 20 Hz."""
        loop_rate = 20  # 20 Hz
        loop_period = 1.0 / loop_rate
        last_loop_time = time.time()  # NOTE(review): assigned but never read
        while self.processing_active and rclpy.ok():
            loop_start = time.time()
            try:
                # Collect processed sensor data
                sensor_states = self.collect_sensor_states()
                # Make decisions based on sensor data
                decisions = self.make_decisions(sensor_states)
                # Execute commands
                self.execute_commands(decisions)
                # Maintain loop rate
                loop_duration = time.time() - loop_start
                sleep_time = max(0, loop_period - loop_duration)
                time.sleep(sleep_time)
            except Exception as e:
                self.get_logger().error(f"Main processing error: {e}")

    def collect_sensor_states(self):
        """Gather results from all completed per-sensor futures."""
        states = {}
        with self.sensors_lock:
            for sensor_id, sensor_info in self.sensors_data.items():
                future = sensor_info.get('future')
                if future and future.done():
                    try:
                        states[sensor_id] = future.result()
                    except Exception as e:
                        self.get_logger().error(f"Error getting {sensor_id} result: {e}")
        return states

    def make_decisions(self, sensor_states):
        """Derive high-level decisions (stop flag, velocity, actions)."""
        decisions = {
            'emergency_stop': False,
            'velocity': Twist(),
            'actions': []
        }
        # Check for obstacles: stop if anything is within 1 m
        lidar_data = sensor_states.get('lidar', {})
        if lidar_data.get('min_distance', float('inf')) < 1.0:
            decisions['emergency_stop'] = True
            self.get_logger().warn("Emergency stop: obstacle too close")
        # Check camera for people
        camera_data = sensor_states.get('camera', {})
        if camera_data.get('faces_found', 0) > 0:
            decisions['actions'].append('wave_hello')
        return decisions

    def execute_commands(self, decisions):
        """Publish velocity commands derived from the decisions dict.

        NOTE(review): relies on self.cmd_vel_pub, which is not created in
        this listing — confirm it is set up elsewhere.
        """
        if decisions['emergency_stop']:
            stop_msg = Twist()
            self.cmd_vel_pub.publish(stop_msg)
        else:
            self.cmd_vel_pub.publish(decisions['velocity'])

    def monitoring_loop(self):
        """Report queue depths and pool activity every 5 seconds."""
        while self.running and rclpy.ok():
            try:
                # Check queue sizes
                sensor_queue_size = self.sensor_queue.qsize()
                processing_queue_size = self.processing_queue.qsize()
                # Check thread pool activity
                # NOTE(review): _threads is a private ThreadPoolExecutor
                # attribute and counts created (not necessarily busy) threads.
                sensor_active = len(self.sensor_thread_pool._threads)
                processing_active = len(self.processing_thread_pool._threads)
                # Log warnings for issues
                if sensor_queue_size > 80:
                    self.get_logger().warn(f"High sensor queue: {sensor_queue_size}")
                if processing_queue_size > 40:
                    self.get_logger().warn(f"High processing queue: {processing_queue_size}")
                # Periodic status report
                self.get_logger().info(
                    f"Queues - Sensor: {sensor_queue_size}, "
                    f"Processing: {processing_queue_size}, "
                    f"Threads - Sensor: {sensor_active}, "
                    f"Processing: {processing_active}"
                )
            except Exception as e:
                self.get_logger().error(f"Monitor error: {e}")
            time.sleep(5.0)  # Monitor every 5 seconds

    def shutdown(self):
        """Graceful shutdown: stop loops, join threads, drain pools."""
        self.get_logger().info("Shutting down threaded node...")
        # Stop loops
        self.running = False
        self.processing_active = False
        # Wait for threads to complete
        self.sensor_thread.join(timeout=2.0)
        self.processing_thread.join(timeout=2.0)
        self.monitor_thread.join(timeout=1.0)
        # Shutdown thread pools
        self.sensor_thread_pool.shutdown(wait=True)
        self.processing_thread_pool.shutdown(wait=True)
        self.get_logger().info("Threaded node shutdown complete")
def main():
    """Entry point: spin ThreadedRobotNode with SIGINT-driven shutdown.

    Fixes relative to the original listing:
    - rclpy and MultiThreadedExecutor were referenced but never imported
      in this listing; they are imported here at function scope.
    - rclpy.shutdown() was called both in the signal handler and in the
      ``finally`` block; the second call raises in current rclpy, so the
      final call is now guarded with rclpy.ok().
    """
    import signal
    import rclpy
    from rclpy.executors import MultiThreadedExecutor

    rclpy.init()
    try:
        node = ThreadedRobotNode()

        def signal_handler(sig, frame):
            """Stop worker threads, then shut down rclpy so spin() returns."""
            print("\nShutting down...")
            node.shutdown()
            rclpy.shutdown()

        signal.signal(signal.SIGINT, signal_handler)

        # Spin node on a multi-threaded executor so callbacks can overlap.
        executor = MultiThreadedExecutor(num_threads=4)
        executor.add_node(node)
        executor.spin()
    except Exception as e:
        print(f"Node error: {e}")
    finally:
        # Only shut down if the signal handler has not already done so.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
Debugging and Profiling
Comprehensive Node Diagnostics
# debug_node.py
import cProfile
import pstats
import io
import tracemalloc
import linecache
import os
from rclpy.node import Node
class DebuggableNode(Node):
    """Node with built-in debugging and profiling capabilities.

    Features are opt-in via ROS parameters declared in __init__:
    'enable_debug' (verbose call tracing), 'enable_profiling' (cProfile),
    and 'enable_memory_profiling' (tracemalloc).
    """

    def __init__(self, node_name):
        super().__init__(node_name)
        # Feature flags; overwritten from parameters in initialize_debug_features()
        self.debug_enabled = False
        self.profile_enabled = False
        self.memory_profiling_enabled = False
        # Profilers
        self.profiler = None               # cProfile.Profile when profiling is on
        self.stats_buffer = io.StringIO()  # sink for pstats text output
        # Debug parameters
        self.declare_parameter('enable_debug', False)
        self.declare_parameter('enable_profiling', False)
        self.declare_parameter('enable_memory_profiling', False)
        # Initialize debug features
        self.initialize_debug_features()

    def initialize_debug_features(self):
        """Enable debugging features according to the declared parameters."""
        self.debug_enabled = self.get_parameter('enable_debug').value
        self.profile_enabled = self.get_parameter('enable_profiling').value
        self.memory_profiling_enabled = self.get_parameter('enable_memory_profiling').value
        if self.debug_enabled:
            self.get_logger().info("Debug mode enabled")
        if self.profile_enabled:
            self.start_profiling()
        if self.memory_profiling_enabled:
            tracemalloc.start()
            self.get_logger().info("Memory profiling enabled")

    def start_profiling(self):
        """Start CPU profiling with cProfile."""
        self.profiler = cProfile.Profile()
        self.profiler.enable()
        self.get_logger().info("CPU profiling started")

    def stop_profiling(self):
        """Stop CPU profiling and log the top functions by cumulative time."""
        if self.profiler:
            self.profiler.disable()
            # Get stats
            stats = pstats.Stats(self.profiler, stream=self.stats_buffer)
            stats.sort_stats('cumulative')
            stats.print_stats(20)  # Top 20 functions
            self.get_logger().info(
                f"Profile results:\n{self.stats_buffer.getvalue()}")

    def debug_function_call(self, func_name: str, *args, **kwargs):
        """Log the caller's file/line and arguments; no-op unless debug mode.

        Returns None in all cases.
        """
        if not self.debug_enabled:
            return None
        # Get calling location.  Imported here because `inspect` is not
        # imported at module level in this listing; the import costs
        # nothing when debug mode is off.
        import inspect
        frame = inspect.currentframe().f_back
        filename = os.path.basename(frame.f_code.co_filename)
        line_number = frame.f_lineno
        # Bug fix: `filename` was computed but unused and the log message
        # hard-coded "(unknown)"; it now reports the real caller file.
        self.get_logger().debug(
            f"Calling {func_name} from {filename}:{line_number} "
            f"with args={args}, kwargs={kwargs}")

    def check_memory_usage(self):
        """Report tracemalloc usage; warn when peak exceeds 100 MB."""
        if not self.memory_profiling_enabled:
            return
        current, peak = tracemalloc.get_traced_memory()
        current_mb = current / (1024 * 1024)
        peak_mb = peak / (1024 * 1024)
        self.get_logger().debug(
            f"Memory usage: {current_mb:.2f} MB (Peak: {peak_mb:.2f} MB)")
        # Warning if memory usage is high
        if peak_mb > 100:  # More than 100MB
            self.get_logger().warn(f"High memory usage: {peak_mb:.2f} MB")

    def get_top_memory_allocations(self, n: int = 10):
        """Return the top-n allocation stats, or [] when profiling is off."""
        if not self.memory_profiling_enabled:
            return []
        snapshot = tracemalloc.take_snapshot()
        top_stats = snapshot.statistics('lineno')
        return top_stats[:n]

    def print_memory_stats(self):
        """Log per-line memory statistics for the top allocations."""
        if not self.memory_profiling_enabled:
            return
        top_allocations = self.get_top_memory_allocations()
        self.get_logger().info("Top memory allocations:")
        for i, stat in enumerate(top_allocations, 1):
            frame = stat.traceback[0]
            self.get_logger().info(
                f"#{i}: {frame.filename}:{frame.lineno}: "
                f"{stat.size / 1024:.1f} KiB")
# Usage example
class MyRobotNode(DebuggableNode):
    """Example robot node exercising the DebuggableNode facilities."""

    def __init__(self):
        super().__init__('my_robot_node')
        self.timer = self.create_timer(0.1, self.timer_callback)
        self.callback_count = 0

    def timer_callback(self):
        """10 Hz tick: trace the call, do a little work, report every 100th."""
        self.debug_function_call('timer_callback')
        self.callback_count += 1
        # Simulate some work
        result = sum(list(range(1000)))
        if self.callback_count % 100 == 0:
            self.get_logger().info(f"Processed {self.callback_count} callbacks")
            self.check_memory_usage()

    def some_intensive_function(self, n: int):
        """Sum of squares below n — a deliberate hot spot for the profiler."""
        return sum(i * i for i in range(n))

    def on_shutdown(self):
        """Flush profiling and memory reports when the node stops."""
        if self.profile_enabled:
            self.stop_profiling()
        if self.memory_profiling_enabled:
            self.print_memory_stats()
            tracemalloc.stop()
Knowledge Check
Conceptual Questions
1. Explain the benefits of component-based architecture in ROS 2 nodes.
2. Compare thread-based vs. async approaches for concurrent operations in robotics.
3. How does Python's GIL affect ROS 2 node performance, and what are the workarounds?
4. Describe best practices for error handling in multi-threaded ROS 2 nodes.
5. Why is profiling important for robotic applications, and how can it be implemented?
Practical Exercises
1. Create a component-based node with sensor, planning, and control components.
2. Implement an async service client that calls external APIs for decision making.
3. Build a multi-threaded node that processes camera images while maintaining control loops.
4. Add comprehensive logging and performance monitoring to an existing ROS 2 node.
Advanced Problems
1. Design a thread-safe shared memory system for high-bandwidth sensor data.
2. Implement a debugging dashboard that visualizes node performance in real-time.
3. Create a node lifecycle manager that handles graceful shutdown and restart scenarios.
Summary
Key Takeaways:
- Component-based architecture enables modular, reusable robot software
- Async programming is valuable for I/O-bound operations and external API calls
- Multi-threading requires careful synchronization and error handling
- Debugging and profiling tools are essential for production robotics
- Proper resource management prevents memory leaks and performance degradation
Next Steps: In the next chapter, we'll explore ROS 2 launch systems and parameter management, learning how to orchestrate complex multi-node robotic systems efficiently.
Further Reading
- Effective Python by Brett Slatkin
- Fluent Python by Luciano Ramalho
- Python Concurrency with asyncio by Matthew Fowler
- High Performance Python by Micha Gorelick and Ian Ozsvald