Skip to main content

Overview

Subscribers receive data from ROS2 topics. Use subscribers to process sensor data, react to commands, and monitor system state.

Basic Subscriber Template

simple_subscriber.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SimpleSubscriber(Node):
    def __init__(self):
        super().__init__('simple_subscriber')

        # Create subscriber
        self.subscription = self.create_subscription(
            String,                  # Message type
            'topic_name',            # Topic name
            self.listener_callback,  # Callback function
            10                       # Queue size
        )

    def listener_callback(self, msg):
        self.get_logger().info(f'Received: "{msg.data}"')

def main(args=None):
    rclpy.init(args=args)
    node = SimpleSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Practical Examples

1. Velocity Command Subscriber

cmd_vel_subscriber.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from geometry_msgs.msg import Twist

class CmdVelSubscriber(Node):
    def __init__(self):
        super().__init__('cmd_vel_subscriber')

        self.subscription = self.create_subscription(
            Twist,
            '/cmd_vel',
            self.cmd_vel_callback,
            10
        )

        self.get_logger().info('Velocity command subscriber started')

    def cmd_vel_callback(self, msg):
        vx = msg.linear.x
        vy = msg.linear.y
        omega = msg.angular.z

        self.get_logger().info(
            f'Received velocity: vx={vx:.2f} vy={vy:.2f} omega={omega:.2f}'
        )

        # Send to motor controller (example)
        self.send_to_motors(vx, vy, omega)

    def send_to_motors(self, vx, vy, omega):
        # Calculate wheel speeds using inverse kinematics
        wheel_radius = 0.05
        wheelbase = 0.20
        l = wheelbase / 2.0

        w_fl = (1 / wheel_radius) * (vx - vy - l * omega)
        w_fr = (1 / wheel_radius) * (vx + vy + l * omega)
        w_rl = (1 / wheel_radius) * (vx + vy - l * omega)
        w_rr = (1 / wheel_radius) * (vx - vy + l * omega)

        # Send to ESP32 via serial (simplified)
        # self.serial.write(f"CMD,{w_fl},{w_fr},{w_rl},{w_rr}\n")

def main(args=None):
    rclpy.init(args=args)
    node = CmdVelSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2. LaserScan Subscriber (Obstacle Detection)

obstacle_detector.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import LaserScan
import math

class ObstacleDetector(Node):
    def __init__(self):
        super().__init__('obstacle_detector')

        self.subscription = self.create_subscription(
            LaserScan,
            '/scan',
            self.scan_callback,
            10
        )

        self.min_distance = 0.5  # meters
        self.get_logger().info(f'Obstacle detector started (threshold: {self.min_distance}m)')

    def scan_callback(self, msg):
        # Find minimum distance in scan
        valid_ranges = [r for r in msg.ranges if msg.range_min < r < msg.range_max]

        if not valid_ranges:
            return

        min_range = min(valid_ranges)
        min_index = msg.ranges.index(min_range)

        # Calculate angle of closest obstacle
        angle = msg.angle_min + min_index * msg.angle_increment
        angle_deg = math.degrees(angle)

        if min_range < self.min_distance:
            self.get_logger().warn(
                f'Obstacle detected! Distance: {min_range:.2f}m at {angle_deg:.1f}°'
            )
        else:
            self.get_logger().info(f'Clear (closest: {min_range:.2f}m)')

    def detect_obstacles_in_front(self, msg, angle_range=30.0):
        # Check obstacles in front (±angle_range degrees)
        angle_range_rad = math.radians(angle_range)

        obstacles = []
        for i, r in enumerate(msg.ranges):
            if msg.range_min < r < msg.range_max:
                angle = msg.angle_min + i * msg.angle_increment
                if abs(angle) < angle_range_rad:
                    obstacles.append((r, angle))

        return obstacles

def main(args=None):
    rclpy.init(args=args)
    node = ObstacleDetector()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

3. Multi-Subscriber Node

robot_monitor.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import BatteryState, Imu
from nav_msgs.msg import Odometry

class RobotMonitor(Node):
    def __init__(self):
        super().__init__('robot_monitor')

        # Multiple subscribers
        self.battery_sub = self.create_subscription(
            BatteryState, '/battery', self.battery_callback, 10
        )

        self.imu_sub = self.create_subscription(
            Imu, '/imu/data', self.imu_callback, 10
        )

        self.odom_sub = self.create_subscription(
            Odometry, '/odom', self.odom_callback, 10
        )

        # State variables
        self.battery_voltage = 0.0
        self.robot_speed = 0.0
        self.imu_angular_velocity = 0.0

        # Status publishing timer
        self.timer = self.create_timer(1.0, self.publish_status)

    def battery_callback(self, msg):
        self.battery_voltage = msg.voltage

    def imu_callback(self, msg):
        self.imu_angular_velocity = msg.angular_velocity.z

    def odom_callback(self, msg):
        vx = msg.twist.twist.linear.x
        vy = msg.twist.twist.linear.y
        self.robot_speed = (vx**2 + vy**2) ** 0.5

    def publish_status(self):
        self.get_logger().info(
            f'Status: Battery={self.battery_voltage:.2f}V '
            f'Speed={self.robot_speed:.2f}m/s '
            f'Omega={self.imu_angular_velocity:.2f}rad/s'
        )

def main(args=None):
    rclpy.init(args=args)
    node = RobotMonitor()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Advanced: Subscriber with QoS

reliable_subscriber.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, ReliabilityPolicy, HistoryPolicy
from sensor_msgs.msg import LaserScan

class ReliableSubscriber(Node):
    def __init__(self):
        super().__init__('reliable_subscriber')

        # Custom QoS for reliable delivery
        qos_profile = QoSProfile(
            reliability=ReliabilityPolicy.RELIABLE,  # Guarantee delivery
            history=HistoryPolicy.KEEP_LAST,
            depth=10
        )

        self.subscription = self.create_subscription(
            LaserScan,
            '/scan',
            self.scan_callback,
            qos_profile
        )

    def scan_callback(self, msg):
        self.get_logger().info(f'Received scan with {len(msg.ranges)} points')

def main(args=None):
    rclpy.init(args=args)
    node = ReliableSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Callback Best Practices

Keep Callbacks Fast

  • Process data quickly in callback
  • Don’t block for I/O operations
  • Move heavy computation to separate thread
  • Callbacks should return within milliseconds

Thread Safety

  • Callbacks run in separate threads
  • Use locks for shared data
  • Or use class variables (atomic in Python)
  • Avoid race conditions

Error Handling

  • Wrap callbacks in try-except
  • Validate message data
  • Log errors, don’t crash
  • Handle missing fields gracefully

Data Validation

  • Check for NaN/inf values
  • Verify array sizes match expected
  • Validate timestamps
  • Ignore stale data

Common Patterns

Message Synchronizer

from message_filters import ApproximateTimeSynchronizer, Subscriber

class SynchronizedSubscriber(Node):
    def __init__(self):
        super().__init__('synchronized_subscriber')

        # Create filtered subscribers
        image_sub = Subscriber(self, Image, '/camera/image')
        depth_sub = Subscriber(self, Image, '/camera/depth')

        # Synchronize messages by timestamp
        ts = ApproximateTimeSynchronizer(
            [image_sub, depth_sub],
            queue_size=10,
            slop=0.1  # 100ms tolerance
        )
        ts.registerCallback(self.sync_callback)

    def sync_callback(self, image_msg, depth_msg):
        # Process synchronized messages
        pass

Throttled Subscriber

class ThrottledSubscriber(Node):
    def __init__(self):
        super().__init__('throttled_subscriber')

        self.subscription = self.create_subscription(
            LaserScan, '/scan', self.scan_callback, 10
        )

        self.last_process_time = self.get_clock().now()
        self.process_interval = 1.0  # Process every 1 second

    def scan_callback(self, msg):
        current_time = self.get_clock().now()
        dt = (current_time - self.last_process_time).nanoseconds / 1e9

        if dt >= self.process_interval:
            self.process_scan(msg)
            self.last_process_time = current_time

    def process_scan(self, msg):
        # Heavy processing here
        pass

Troubleshooting

Solutions:
  1. Verify topic exists:
    ros2 topic list
    
  2. Check message type matches:
    ros2 topic info /cmd_vel
    
  3. Ensure publisher is running
  4. Check QoS compatibility
Solutions:
  1. Check timestamp in message header
  2. Increase queue_size if processing slowly
  3. Use BEST_EFFORT QoS for real-time data
  4. Filter based on age:
    age = self.get_clock().now() - msg.header.stamp
    if age.nanoseconds / 1e9 > 1.0:
        return  # Too old, ignore
    
Cause: Callback processing too slowSolutions:
  1. Profile callback execution time
  2. Downsample data (every Nth message)
  3. Use throttled subscriber pattern
  4. Move computation to separate thread

Next Steps