Skip to main content

Chapter 3: Implementing Your First ROS 2 System

Hands-On: Building a Humanoid Control Architecture

Let's transition from theory to practice. We'll build a complete ROS 2 system where an AI agent (simulated LLM) communicates with motor controllers through ROS 2 primitives.

Setting Up Your ROS 2 Workspace

# Create workspace directory structure
mkdir -p ~/ros2_humanoid_ws/src
cd ~/ros2_humanoid_ws/src

# Create a new ROS 2 Python package
ros2 pkg create --build-type ament_python humanoid_controller

# Navigate into the package
cd humanoid_controller
mkdir -p humanoid_controller launch urdf rviz

Chapter 3.1: Building Your First Publisher Node

Create humanoid_controller/arm_command_node.py:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import random
import time

class ArmCommandNode(Node):
def __init__(self):
super().__init__('arm_command_node')
self.publisher_ = self.create_publisher(
String,
'arm_commands',
10
)
timer_period = 2.0
self.timer = self.create_timer(timer_period, self.publish_command)
self.get_logger().info('Arm Command Node initialized')

def publish_command(self):
commands = ['wave', 'reach_forward', 'grab', 'rest', 'point']
selected = random.choice(commands)
msg = String()
msg.data = f'COMMAND:{selected}:TIMESTAMP:{time.time()}'
self.publisher_.publish(msg)
self.get_logger().info(f'Published: {msg.data}')

def main(args=None):
rclpy.init(args=args)
node = ArmCommandNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Chapter 3.2: Creating a Subscriber + Publisher Bridge

Create humanoid_controller/motor_controller_node.py:

import rclpy
from rclpy.node import Node
from std_msgs.msg import String, Float32MultiArray

class MotorControllerNode(Node):
def __init__(self):
super().__init__('motor_controller_node')

# Subscribe to arm commands
self.subscription = self.create_subscription(
String,
'arm_commands',
self.command_callback,
10
)

# Publish motor signals
self.motor_publisher = self.create_publisher(
Float32MultiArray,
'motor_signals',
10
)

self.get_logger().info('Motor Controller Node ready')

def command_callback(self, msg):
self.get_logger().info(f'Received command: {msg.data}')

# Parse command and translate to motor signals
if 'wave' in msg.data:
self.execute_wave()
elif 'reach_forward' in msg.data:
self.execute_reach_forward()
elif 'grab' in msg.data:
self.execute_grab()

def execute_wave(self):
motor_msg = Float32MultiArray()
motor_msg.data = [45.0, 30.0, 20.0] # Example: shoulder, elbow, wrist angles
self.motor_publisher.publish(motor_msg)
self.get_logger().info('Executing: Wave motion')

def execute_reach_forward(self):
motor_msg = Float32MultiArray()
motor_msg.data = [0.0, 90.0, 0.0]
self.motor_publisher.publish(motor_msg)
self.get_logger().info('Executing: Reach forward')

def execute_grab(self):
motor_msg = Float32MultiArray()
motor_msg.data = [30.0, 60.0, -45.0]
self.motor_publisher.publish(motor_msg)
self.get_logger().info('Executing: Grab object')

def main(args=None):
rclpy.init(args=args)
node = MotorControllerNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Chapter 3.3: Service Implementation - Status Requests

Create humanoid_controller/status_service_node.py:

import rclpy
from rclpy.node import Node
from std_srvs.srv import Trigger

class StatusServiceNode(Node):
def __init__(self):
super().__init__('status_service_node')
self.service = self.create_service(
Trigger,
'get_robot_status',
self.status_callback
)
self.robot_ready = True
self.get_logger().info('Status Service Node ready')

def status_callback(self, request, response):
response.success = self.robot_ready
response.message = f'Robot Status: {"READY" if self.robot_ready else "ERROR"}'
self.get_logger().info(f'Status requested: {response.message}')
return response

def main(args=None):
rclpy.init(args=args)
node = StatusServiceNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()

if __name__ == '__main__':
main()

Chapter 3.4: Building and Running

Update setup.py:

from setuptools import setup, find_packages

setup(
name='humanoid_controller',
version='0.0.1',
packages=find_packages(exclude=['test']),
data_files=[
('share/humanoid_controller', ['package.xml']),
],
install_requires=['setuptools'],
entry_points={
'console_scripts': [
'arm_command = humanoid_controller.arm_command_node:main',
'motor_controller = humanoid_controller.motor_controller_node:main',
'status_service = humanoid_controller.status_service_node:main',
],
},
)

Build and test:

cd ~/ros2_humanoid_ws
colcon build
source install/setup.bash

# Terminal 1: Run arm command publisher
ros2 run humanoid_controller arm_command

# Terminal 2: Run motor controller subscriber
ros2 run humanoid_controller motor_controller

# Terminal 3: Monitor communication
ros2 topic echo arm_commands