Skip to main content

Overview

Publishers send data to ROS2 topics. Use publishers for sensor data, robot status, and computed values that other nodes need.

Basic Publisher Template

simple_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class SimplePublisher(Node):
    def __init__(self):
        super().__init__('simple_publisher')

        # Create publisher
        self.publisher = self.create_publisher(
            String,          # Message type
            'topic_name',    # Topic name
            10               # Queue size
        )

        # Publish at 1 Hz
        self.timer = self.create_timer(1.0, self.timer_callback)
        self.count = 0

    def timer_callback(self):
        msg = String()
        msg.data = f'Hello World: {self.count}'
        self.publisher.publish(msg)
        self.get_logger().info(f'Published: "{msg.data}"')
        self.count += 1

def main(args=None):
    rclpy.init(args=args)
    node = SimplePublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Practical Examples

1. IMU Data Publisher

imu_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
import smbus2
import time

class IMUPublisher(Node):
    def __init__(self):
        super().__init__('imu_publisher')

        self.publisher = self.create_publisher(Imu, '/imu/data', 10)
        self.timer = self.create_timer(0.02, self.publish_imu)  # 50 Hz

        # Initialize I2C (ICM-20948)
        self.bus = smbus2.SMBus(1)
        self.imu_address = 0x68
        self.init_imu()

    def init_imu(self):
        # Wake up IMU, set sample rate, etc.
        self.bus.write_byte_data(self.imu_address, 0x6B, 0x00)
        time.sleep(0.1)

    def read_imu(self):
        # Read accelerometer and gyroscope (simplified)
        accel_x = self.read_word_2c(0x3B)
        accel_y = self.read_word_2c(0x3D)
        accel_z = self.read_word_2c(0x3F)
        gyro_x = self.read_word_2c(0x43)
        gyro_y = self.read_word_2c(0x45)
        gyro_z = self.read_word_2c(0x47)

        # Convert to SI units
        ax = accel_x / 16384.0 * 9.81  # m/s²
        ay = accel_y / 16384.0 * 9.81
        az = accel_z / 16384.0 * 9.81
        gx = gyro_x / 131.0 * 3.14159 / 180.0  # rad/s
        gy = gyro_y / 131.0 * 3.14159 / 180.0
        gz = gyro_z / 131.0 * 3.14159 / 180.0

        return ax, ay, az, gx, gy, gz

    def read_word_2c(self, addr):
        high = self.bus.read_byte_data(self.imu_address, addr)
        low = self.bus.read_byte_data(self.imu_address, addr + 1)
        val = (high << 8) + low
        if val >= 0x8000:
            return -((65535 - val) + 1)
        else:
            return val

    def publish_imu(self):
        ax, ay, az, gx, gy, gz = self.read_imu()

        msg = Imu()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.header.frame_id = 'imu_link'

        msg.angular_velocity.x = gx
        msg.angular_velocity.y = gy
        msg.angular_velocity.z = gz

        msg.linear_acceleration.x = ax
        msg.linear_acceleration.y = ay
        msg.linear_acceleration.z = az

        # Orientation (not computed, set covariance to -1)
        msg.orientation_covariance[0] = -1.0

        self.publisher.publish(msg)

def main(args=None):
    rclpy.init(args=args)
    node = IMUPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2. Battery Monitor Publisher

battery_monitor.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import BatteryState

class BatteryMonitor(Node):
    def __init__(self):
        super().__init__('battery_monitor')

        self.publisher = self.create_publisher(BatteryState, '/battery', 10)
        self.timer = self.create_timer(1.0, self.publish_battery)  # 1 Hz

    def read_voltage(self):
        # Read from ADC (example: through serial or GPIO)
        # This is placeholder; actual implementation depends on hardware
        return 11.1  # Example: 3S LiPo at full charge

    def publish_battery(self):
        voltage = self.read_voltage()

        msg = BatteryState()
        msg.header.stamp = self.get_clock().now().to_msg()
        msg.voltage = voltage
        msg.current = float('nan')  # Not measured
        msg.percentage = self.voltage_to_percentage(voltage)
        msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
        msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_GOOD

        self.publisher.publish(msg)

        if voltage < 9.6:  # 3S LiPo critical (3.2V/cell)
            self.get_logger().warn(f'Low battery: {voltage:.2f}V')

    def voltage_to_percentage(self, voltage):
        # 3S LiPo: 12.6V (full) to 9.0V (empty)
        voltage_min = 9.0
        voltage_max = 12.6
        percentage = (voltage - voltage_min) / (voltage_max - voltage_min) * 100.0
        return max(0.0, min(100.0, percentage))

def main(args=None):
    rclpy.init(args=args)
    node = BatteryMonitor()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

3. Odometry Publisher

odometry_publisher.py
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from geometry_msgs.msg import TransformStamped
from tf2_ros import TransformBroadcaster
import math

class OdometryPublisher(Node):
    def __init__(self):
        super().__init__('odometry_publisher')

        self.odom_pub = self.create_publisher(Odometry, '/odom', 10)
        self.tf_broadcaster = TransformBroadcaster(self)

        self.timer = self.create_timer(0.05, self.publish_odometry)  # 20 Hz

        # Robot state
        self.x = 0.0
        self.y = 0.0
        self.theta = 0.0
        self.vx = 0.0
        self.vy = 0.0
        self.omega = 0.0

    def update_odometry(self, vx, vy, omega, dt):
        # Update velocities
        self.vx = vx
        self.vy = vy
        self.omega = omega

        # Update pose
        delta_x = (vx * math.cos(self.theta) - vy * math.sin(self.theta)) * dt
        delta_y = (vx * math.sin(self.theta) + vy * math.cos(self.theta)) * dt
        delta_theta = omega * dt

        self.x += delta_x
        self.y += delta_y
        self.theta += delta_theta

        # Normalize theta
        self.theta = math.atan2(math.sin(self.theta), math.cos(self.theta))

    def publish_odometry(self):
        current_time = self.get_clock().now()

        # Create odometry message
        odom = Odometry()
        odom.header.stamp = current_time.to_msg()
        odom.header.frame_id = 'odom'
        odom.child_frame_id = 'base_link'

        # Position
        odom.pose.pose.position.x = self.x
        odom.pose.pose.position.y = self.y
        odom.pose.pose.position.z = 0.0

        # Orientation (convert theta to quaternion)
        odom.pose.pose.orientation.z = math.sin(self.theta / 2.0)
        odom.pose.pose.orientation.w = math.cos(self.theta / 2.0)

        # Velocity
        odom.twist.twist.linear.x = self.vx
        odom.twist.twist.linear.y = self.vy
        odom.twist.twist.angular.z = self.omega

        # Publish odometry
        self.odom_pub.publish(odom)

        # Broadcast TF
        t = TransformStamped()
        t.header.stamp = current_time.to_msg()
        t.header.frame_id = 'odom'
        t.child_frame_id = 'base_link'
        t.transform.translation.x = self.x
        t.transform.translation.y = self.y
        t.transform.translation.z = 0.0
        t.transform.rotation = odom.pose.pose.orientation

        self.tf_broadcaster.sendTransform(t)

def main(args=None):
    rclpy.init(args=args)
    node = OdometryPublisher()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Best Practices

Message Types

  • Use standard ROS2 messages when possible
  • sensor_msgs for sensors
  • nav_msgs for navigation
  • geometry_msgs for poses/velocities

Publishing Rate

  • Match sensor rate (IMU: 50Hz, LiDAR: 5Hz)
  • Don’t publish faster than needed
  • Use timers for consistent rates
  • Balance performance vs. latency

Topic Names

  • Use descriptive names: /imu/data, /battery
  • Follow REP-144 conventions
  • Use namespaces for organization
  • Avoid generic names: /data, /output

Error Handling

  • Validate data before publishing
  • Handle sensor failures gracefully
  • Log warnings for invalid data
  • Set appropriate covariance matrices

Next Steps