Skip to main content

Overview

ROS2 (Robot Operating System 2) is a flexible framework for writing robot software. It provides tools, libraries, and conventions that aim to simplify creating complex and robust robot behavior across platforms. Understanding ROS2’s core architecture is essential for building autonomous navigation systems.

Distributed System

Multiple programs (nodes) communicate over standardized interfaces

Middleware Layer

DDS (Data Distribution Service) handles inter-process communication

Core Concepts

Nodes

Nodes are individual processes that perform specific computations. A robot system is composed of many nodes working together.
simple_node.py
import rclpy
from rclpy.node import Node

class MinimalNode(Node):
    def __init__(self):
        super().__init__('minimal_node')  # Node name
        self.get_logger().info('Node has started!')

def main(args=None):
    rclpy.init(args=args)
    node = MinimalNode()
    rclpy.spin(node)  # Keep node running
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
Example nodes in mecanum robot:
  • motor_controller_node: Controls wheel motors via ESP32
  • lidar_driver_node: Publishes laser scan data from RPLIDAR
  • imu_driver_node: Publishes IMU sensor data
  • ekf_localization_node: Fuses odometry and IMU for robot pose
  • slam_toolbox_node: Creates map using LiDAR data
  • nav2_controller_node: Autonomous path following
Node characteristics:
  • Single, focused responsibility (one node = one job)
  • Can be started/stopped independently
  • Communicate via topics, services, or actions
  • Written in Python or C++ (this course uses Python)

Topics

Topics implement publish-subscribe messaging. Nodes publish messages to topics, and other nodes subscribe to receive those messages.
┌──────────────┐      /cmd_vel      ┌──────────────────┐
│ Teleop Node  ├────────────────────►│ Motor Controller │
│ (publisher)  │  (Twist message)   │  (subscriber)    │
└──────────────┘                     └──────────────────┘

Publisher: Creates velocity commands
Subscriber: Receives commands and moves robot
Topic: Named channel (/cmd_vel)
Message Type: geometry_msgs/Twist (vx, vy, omega)
Key properties:
  • Many-to-many: Multiple publishers and subscribers allowed
  • Asynchronous: Publishers don’t wait for subscribers
  • Decoupled: Nodes don’t know about each other, only topics
  • Typed: Each topic has a specific message type
Python example:
publisher_example.py
from geometry_msgs.msg import Twist
from rclpy.node import Node

class VelocityPublisher(Node):
    def __init__(self):
        super().__init__('velocity_publisher')

        # Create publisher
        self.publisher = self.create_publisher(
            Twist,              # Message type
            '/cmd_vel',         # Topic name
            10                  # Queue size
        )

        # Publish at 10 Hz
        self.timer = self.create_timer(0.1, self.publish_velocity)

    def publish_velocity(self):
        msg = Twist()
        msg.linear.x = 0.5   # Forward 0.5 m/s
        msg.linear.y = 0.0   # No lateral movement
        msg.angular.z = 0.0  # No rotation

        self.publisher.publish(msg)
        self.get_logger().info(f'Published: vx={msg.linear.x}')
subscriber_example.py
from geometry_msgs.msg import Twist
from rclpy.node import Node

class VelocitySubscriber(Node):
    def __init__(self):
        super().__init__('velocity_subscriber')

        # Create subscriber
        self.subscription = self.create_subscription(
            Twist,              # Message type
            '/cmd_vel',         # Topic name
            self.velocity_callback,  # Callback function
            10                  # Queue size
        )

    def velocity_callback(self, msg):
        self.get_logger().info(
            f'Received: vx={msg.linear.x}, vy={msg.linear.y}, '
            f'omega={msg.angular.z}'
        )

Services

Services implement request-response communication. A client sends a request and waits for a response from the server.
┌──────────────┐                    ┌──────────────────┐
│ Client Node  ├───── Request ─────►│  Server Node     │
│              │                     │                  │
│              │◄──── Response ─────┤                  │
└──────────────┘                     └──────────────────┘

Example: Reset odometry
Request: Empty (trigger only)
Response: bool success, string message
When to use services:
  • One-time requests (e.g., “reset odometry”)
  • Configuration changes (e.g., “set PID gains”)
  • Queries (e.g., “get current position”)
Contrast with topics:
  • Topics: Continuous data streams (sensor readings, velocities)
  • Services: Occasional requests (commands, queries)
Python example:
service_server.py
from std_srvs.srv import Trigger
from rclpy.node import Node

class OdometryResetServer(Node):
    def __init__(self):
        super().__init__('odometry_reset_server')

        # Create service
        self.srv = self.create_service(
            Trigger,                     # Service type
            'reset_odometry',            # Service name
            self.reset_callback          # Callback function
        )

    def reset_callback(self, request, response):
        # Reset odometry (implementation details omitted)
        self.reset_odometry()

        response.success = True
        response.message = 'Odometry reset successfully'
        self.get_logger().info('Odometry reset')
        return response
service_client.py
from std_srvs.srv import Trigger
from rclpy.node import Node

class OdometryResetClient(Node):
    def __init__(self):
        super().__init__('odometry_reset_client')

        # Create client
        self.client = self.create_client(Trigger, 'reset_odometry')

        # Wait for service to be available
        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Waiting for service...')

    def send_request(self):
        request = Trigger.Request()

        # Call service (blocks until response received)
        future = self.client.call_async(request)
        rclpy.spin_until_future_complete(self, future)

        response = future.result()
        if response.success:
            self.get_logger().info(f'Success: {response.message}')
        else:
            self.get_logger().error(f'Failed: {response.message}')

Actions

Actions are for long-running tasks that provide feedback. Combines goal request, continuous feedback, and final result.
┌──────────────┐                    ┌──────────────────┐
│ Client Node  ├───── Goal ────────►│  Server Node     │
│              │                     │                  │
│              │◄──── Feedback ─────┤  (executing...)  │
│              │◄──── Feedback ─────┤                  │
│              │◄──── Result ───────┤  (completed)     │
└──────────────┘                     └──────────────────┘

Example: Navigate to waypoint
Goal: target_pose (x, y, theta)
Feedback: distance_remaining, estimated_time_remaining
Result: bool success, PoseStamped final_pose
When to use actions:
  • Long-running tasks (navigation, mapping)
  • Tasks requiring feedback (progress updates)
  • Cancellable operations (stop navigation mid-route)
Example: Navigate to position (simplified Nav2 action)
navigation_action_client.py
from nav2_msgs.action import NavigateToPose
from rclpy.action import ActionClient
from rclpy.node import Node

class NavigationClient(Node):
    def __init__(self):
        super().__init__('navigation_client')

        # Create action client
        self._action_client = ActionClient(
            self,
            NavigateToPose,
            'navigate_to_pose'
        )

    def send_goal(self, x, y, theta):
        goal_msg = NavigateToPose.Goal()
        goal_msg.pose.header.frame_id = 'map'
        goal_msg.pose.pose.position.x = x
        goal_msg.pose.pose.position.y = y
        # (orientation quaternion calculation omitted)

        self._action_client.wait_for_server()

        # Send goal and register callbacks
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.feedback_callback
        )

        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        self.get_logger().info(
            f'Distance remaining: {feedback.distance_remaining:.2f} m'
        )

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().error('Goal rejected')
            return

        self.get_logger().info('Goal accepted, navigating...')

        # Wait for result
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.result_callback)

    def result_callback(self, future):
        result = future.result().result
        if result.success:
            self.get_logger().info('Navigation succeeded!')
        else:
            self.get_logger().error('Navigation failed')

ROS2 Communication Patterns

Comparison Table

FeatureTopicsServicesActions
PatternPub-SubRequest-ResponseGoal-Feedback-Result
DirectionOne-wayTwo-wayTwo-way + continuous
BlockingNo (async)Yes (waits for response)Yes (can cancel)
FeedbackNoNoYes (during execution)
Use CasesSensor data, velocitiesCommands, queriesLong tasks, navigation
Examples/scan, /odom, /cmd_velreset_odom, get_posenavigate_to_pose

When to Use Each

  • Topics
  • Services
  • Actions
Use topics for:
  • Continuous data streams (sensor readings)
  • Real-time control signals (motor commands)
  • Status updates (battery level, warnings)
  • Broadcasting to multiple receivers
Mecanum robot examples:
  • /scan (LaserScan): LiDAR data at 5.5 Hz
  • /imu/data (Imu): IMU readings at 50 Hz
  • /cmd_vel (Twist): Velocity commands at 10 Hz
  • /odom (Odometry): Position estimates at 20 Hz

ROS2 Graph Architecture

Visualize how nodes communicate in the mecanum robot system:
                        ROS2 Computation Graph

┌─────────────────────────────────────────────────────────────────┐
│                         Sensors                                 │
└─────────────────────────────────────────────────────────────────┘
     │                    │                    │
     │ /scan              │ /imu/data          │ /joint_states
     ▼                    ▼                    ▼
┌─────────┐          ┌─────────┐          ┌──────────────┐
│ LiDAR   │          │  IMU    │          │ ESP32 Bridge │
│ Driver  │          │ Driver  │          │    Node      │
└─────────┘          └─────────┘          └──────────────┘
                          │                     │
                          │                     ▼
                          │              ┌──────────────┐
                          └─────────────►│ EKF Filter   │
                                          │  (robot_loc) │
                                          └──────────────┘

                                                │ /odom/filtered

┌─────────────────────────────────────────────────────────────────┐
│                     Navigation Stack                            │
│  ┌────────────┐    ┌────────────┐    ┌────────────────────┐   │
│  │ SLAM       │───►│  Map       │───►│  Path Planner      │   │
│  │ Toolbox    │    │  Server    │    │  (NavFn/Smac)      │   │
│  └────────────┘    └────────────┘    └────────────────────┘   │
│                                              │                  │
│                                              ▼                  │
│                                       ┌────────────────────┐   │
│                                       │ Local Controller   │   │
│                                       │ (DWB)              │   │
│                                       └────────────────────┘   │
└─────────────────────────────────────────────────────────────────┘

                                                │ /cmd_vel

                                       ┌────────────────────┐
                                       │ Motor Controller   │
                                       │ (mecanum_drive)    │
                                       └────────────────────┘


                                           [Motors]
Key observations:
  • Sensors publish data to topics (one-way flow)
  • EKF fuses multiple sensors (subscribes to multiple topics)
  • Navigation stack orchestrates multiple nodes
  • Final control commands go to motor controller
  • Each node is independent and can be replaced

ROS2 Quality of Service (QoS)

QoS policies control how messages are delivered between publishers and subscribers.

Common QoS Profiles

from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy

# Sensor data (lossy, fast, no history needed)
sensor_qos = QoSProfile(
    reliability=ReliabilityPolicy.BEST_EFFORT,  # Allow message loss
    history=HistoryPolicy.KEEP_LAST,            # Keep only latest
    depth=1                                      # Queue size = 1
)

# Commands (reliable, no message loss)
command_qos = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE,     # Guarantee delivery
    history=HistoryPolicy.KEEP_LAST,
    depth=10                                     # Buffer 10 messages
)

# Create publisher with QoS
self.scan_pub = self.create_publisher(
    LaserScan, '/scan', sensor_qos
)
QoS parameters:
  • Reliability: BEST_EFFORT (fast, lossy) vs. RELIABLE (slow, guaranteed)
  • History: KEEP_LAST (only recent) vs. KEEP_ALL (all messages)
  • Depth: Queue size (1 = only latest, 10+ = buffer messages)
  • Durability: VOLATILE (RAM only) vs. TRANSIENT_LOCAL (persist for late joiners)
Typical use cases:
  • Sensor data: BEST_EFFORT, KEEP_LAST(1) → Speed over reliability
  • Commands: RELIABLE, KEEP_LAST(10) → No message loss
  • Diagnostics: TRANSIENT_LOCAL → Late joiners get status updates

ROS2 Parameters

Parameters allow configuring nodes at runtime without recompiling code.
node_with_parameters.py
class ConfigurableNode(Node):
    def __init__(self):
        super().__init__('configurable_node')

        # Declare parameters with default values
        self.declare_parameter('max_speed', 0.5)  # m/s
        self.declare_parameter('pid_kp', 1.0)
        self.declare_parameter('pid_ki', 0.1)
        self.declare_parameter('pid_kd', 0.05)

        # Read parameter values
        self.max_speed = self.get_parameter('max_speed').value
        self.kp = self.get_parameter('pid_kp').value

        self.get_logger().info(f'Max speed: {self.max_speed} m/s')
        self.get_logger().info(f'PID: Kp={self.kp}')
Set parameters via command line:
ros2 run my_package configurable_node --ros-args \
    -p max_speed:=1.0 \
    -p pid_kp:=2.5
Set parameters via YAML file:
robot_params.yaml
/configurable_node:
  ros__parameters:
    max_speed: 1.0
    pid_kp: 2.5
    pid_ki: 0.2
    pid_kd: 0.1
ros2 run my_package configurable_node --ros-args \
    --params-file robot_params.yaml
List/get/set parameters at runtime:
# List all parameters
ros2 param list

# Get specific parameter
ros2 param get /configurable_node max_speed

# Set parameter (temporarily, until node restarts)
ros2 param set /configurable_node max_speed 0.8

Best Practices

Node Design

  • One node = one responsibility
  • Keep nodes small and focused
  • Use descriptive node names (lidar_driver, not node1)
  • Initialize in __init__, run in spin()

Topic Naming

  • Use lowercase with underscores: /cmd_vel, /scan
  • Namespace related topics: /imu/data, /imu/raw
  • Follow ROS conventions (REP-144)
  • Avoid generic names (/data, /output)

Message Types

  • Use standard ROS2 messages when possible
  • geometry_msgs/Twist for velocities
  • sensor_msgs/LaserScan for LiDAR
  • nav_msgs/Odometry for position
  • Create custom messages only if necessary

Error Handling

  • Log errors with self.get_logger().error()
  • Validate message content before use
  • Handle missing topics/services gracefully
  • Use try-except for critical operations

CLI Commands Reference

# List active nodes
ros2 node list

# Get node info
ros2 node info /lidar_driver

# List all topics
ros2 topic list

# See topic info (type, publishers, subscribers)
ros2 topic info /scan

# Echo topic messages (view live data)
ros2 topic echo /scan

# Publish to topic manually
ros2 topic pub /cmd_vel geometry_msgs/Twist \
    '{linear: {x: 0.5}, angular: {z: 0.0}}'

# List all services
ros2 service list

# Call service
ros2 service call /reset_odometry std_srvs/srv/Trigger

# List all actions
ros2 action list

# Send action goal
ros2 action send_goal /navigate_to_pose \
    nav2_msgs/action/NavigateToPose \
    "{pose: {header: {frame_id: 'map'}, pose: {position: {x: 1.0, y: 0.5}}}}"

Next Steps


References