Overview
Publishers send data to ROS2 topics. Use publishers for sensor data, robot status, and computed values that other nodes need.Basic Publisher Template
simple_publisher.py
Copy
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class SimplePublisher(Node):
def __init__(self):
super().__init__('simple_publisher')
# Create publisher
self.publisher = self.create_publisher(
String, # Message type
'topic_name', # Topic name
10 # Queue size
)
# Publish at 1 Hz
self.timer = self.create_timer(1.0, self.timer_callback)
self.count = 0
def timer_callback(self):
msg = String()
msg.data = f'Hello World: {self.count}'
self.publisher.publish(msg)
self.get_logger().info(f'Published: "{msg.data}"')
self.count += 1
def main(args=None):
rclpy.init(args=args)
node = SimplePublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Practical Examples
1. IMU Data Publisher
imu_publisher.py
Copy
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Imu
import smbus2
import time
class IMUPublisher(Node):
def __init__(self):
super().__init__('imu_publisher')
self.publisher = self.create_publisher(Imu, '/imu/data', 10)
self.timer = self.create_timer(0.02, self.publish_imu) # 50 Hz
# Initialize I2C (ICM-20948)
self.bus = smbus2.SMBus(1)
self.imu_address = 0x68
self.init_imu()
def init_imu(self):
# Wake up IMU, set sample rate, etc.
self.bus.write_byte_data(self.imu_address, 0x6B, 0x00)
time.sleep(0.1)
def read_imu(self):
# Read accelerometer and gyroscope (simplified)
accel_x = self.read_word_2c(0x3B)
accel_y = self.read_word_2c(0x3D)
accel_z = self.read_word_2c(0x3F)
gyro_x = self.read_word_2c(0x43)
gyro_y = self.read_word_2c(0x45)
gyro_z = self.read_word_2c(0x47)
# Convert to SI units
ax = accel_x / 16384.0 * 9.81 # m/s²
ay = accel_y / 16384.0 * 9.81
az = accel_z / 16384.0 * 9.81
gx = gyro_x / 131.0 * 3.14159 / 180.0 # rad/s
gy = gyro_y / 131.0 * 3.14159 / 180.0
gz = gyro_z / 131.0 * 3.14159 / 180.0
return ax, ay, az, gx, gy, gz
def read_word_2c(self, addr):
high = self.bus.read_byte_data(self.imu_address, addr)
low = self.bus.read_byte_data(self.imu_address, addr + 1)
val = (high << 8) + low
if val >= 0x8000:
return -((65535 - val) + 1)
else:
return val
def publish_imu(self):
ax, ay, az, gx, gy, gz = self.read_imu()
msg = Imu()
msg.header.stamp = self.get_clock().now().to_msg()
msg.header.frame_id = 'imu_link'
msg.angular_velocity.x = gx
msg.angular_velocity.y = gy
msg.angular_velocity.z = gz
msg.linear_acceleration.x = ax
msg.linear_acceleration.y = ay
msg.linear_acceleration.z = az
# Orientation (not computed, set covariance to -1)
msg.orientation_covariance[0] = -1.0
self.publisher.publish(msg)
def main(args=None):
rclpy.init(args=args)
node = IMUPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
2. Battery Monitor Publisher
battery_monitor.py
Copy
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import BatteryState
class BatteryMonitor(Node):
def __init__(self):
super().__init__('battery_monitor')
self.publisher = self.create_publisher(BatteryState, '/battery', 10)
self.timer = self.create_timer(1.0, self.publish_battery) # 1 Hz
def read_voltage(self):
# Read from ADC (example: through serial or GPIO)
# This is placeholder; actual implementation depends on hardware
return 11.1 # Example: 3S LiPo at full charge
def publish_battery(self):
voltage = self.read_voltage()
msg = BatteryState()
msg.header.stamp = self.get_clock().now().to_msg()
msg.voltage = voltage
msg.current = float('nan') # Not measured
msg.percentage = self.voltage_to_percentage(voltage)
msg.power_supply_status = BatteryState.POWER_SUPPLY_STATUS_DISCHARGING
msg.power_supply_health = BatteryState.POWER_SUPPLY_HEALTH_GOOD
self.publisher.publish(msg)
if voltage < 9.6: # 3S LiPo critical (3.2V/cell)
self.get_logger().warn(f'Low battery: {voltage:.2f}V')
def voltage_to_percentage(self, voltage):
# 3S LiPo: 12.6V (full) to 9.0V (empty)
voltage_min = 9.0
voltage_max = 12.6
percentage = (voltage - voltage_min) / (voltage_max - voltage_min) * 100.0
return max(0.0, min(100.0, percentage))
def main(args=None):
rclpy.init(args=args)
node = BatteryMonitor()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
3. Odometry Publisher
odometry_publisher.py
Copy
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from nav_msgs.msg import Odometry
from geometry_msgs.msg import TransformStamped
from tf2_ros import TransformBroadcaster
import math
class OdometryPublisher(Node):
def __init__(self):
super().__init__('odometry_publisher')
self.odom_pub = self.create_publisher(Odometry, '/odom', 10)
self.tf_broadcaster = TransformBroadcaster(self)
self.timer = self.create_timer(0.05, self.publish_odometry) # 20 Hz
# Robot state
self.x = 0.0
self.y = 0.0
self.theta = 0.0
self.vx = 0.0
self.vy = 0.0
self.omega = 0.0
def update_odometry(self, vx, vy, omega, dt):
# Update velocities
self.vx = vx
self.vy = vy
self.omega = omega
# Update pose
delta_x = (vx * math.cos(self.theta) - vy * math.sin(self.theta)) * dt
delta_y = (vx * math.sin(self.theta) + vy * math.cos(self.theta)) * dt
delta_theta = omega * dt
self.x += delta_x
self.y += delta_y
self.theta += delta_theta
# Normalize theta
self.theta = math.atan2(math.sin(self.theta), math.cos(self.theta))
def publish_odometry(self):
current_time = self.get_clock().now()
# Create odometry message
odom = Odometry()
odom.header.stamp = current_time.to_msg()
odom.header.frame_id = 'odom'
odom.child_frame_id = 'base_link'
# Position
odom.pose.pose.position.x = self.x
odom.pose.pose.position.y = self.y
odom.pose.pose.position.z = 0.0
# Orientation (convert theta to quaternion)
odom.pose.pose.orientation.z = math.sin(self.theta / 2.0)
odom.pose.pose.orientation.w = math.cos(self.theta / 2.0)
# Velocity
odom.twist.twist.linear.x = self.vx
odom.twist.twist.linear.y = self.vy
odom.twist.twist.angular.z = self.omega
# Publish odometry
self.odom_pub.publish(odom)
# Broadcast TF
t = TransformStamped()
t.header.stamp = current_time.to_msg()
t.header.frame_id = 'odom'
t.child_frame_id = 'base_link'
t.transform.translation.x = self.x
t.transform.translation.y = self.y
t.transform.translation.z = 0.0
t.transform.rotation = odom.pose.pose.orientation
self.tf_broadcaster.sendTransform(t)
def main(args=None):
rclpy.init(args=args)
node = OdometryPublisher()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Best Practices
Message Types
- Use standard ROS2 messages when possible
- sensor_msgs for sensors
- nav_msgs for navigation
- geometry_msgs for poses/velocities
Publishing Rate
- Match sensor rate (IMU: 50Hz, LiDAR: 5Hz)
- Don’t publish faster than needed
- Use timers for consistent rates
- Balance performance vs. latency
Topic Names
- Use descriptive names:
/imu/data,/battery - Follow REP-144 conventions
- Use namespaces for organization
- Avoid generic names:
/data,/output
Error Handling
- Validate data before publishing
- Handle sensor failures gracefully
- Log warnings for invalid data
- Set appropriate covariance matrices