RobotTransportServer / client /python /examples /producer_consumer_demo.py
blanchon's picture
Initial commit
02eac4b
#!/usr/bin/env python3
"""
Producer-Consumer Demo - LeRobot Arena
This example demonstrates:
- Producer and multiple consumers working together
- Real-time joint updates
- Emergency stop functionality
- State synchronization
- Connection management
"""
import asyncio
import logging
import random
from lerobot_arena_client import RoboticsConsumer, RoboticsProducer
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DemoConsumer:
"""Demo consumer that logs all received messages."""
def __init__(self, name: str, room_id: str):
self.name = name
self.room_id = room_id
self.consumer = RoboticsConsumer("http://localhost:8000")
self.update_count = 0
self.state_count = 0
async def setup(self):
"""Setup consumer with callbacks."""
def on_joint_update(joints):
self.update_count += 1
logger.info(
f"[{self.name}] Joint update #{self.update_count}: {len(joints)} joints"
)
def on_state_sync(state):
self.state_count += 1
logger.info(
f"[{self.name}] State sync #{self.state_count}: {len(state)} joints"
)
def on_error(error_msg):
logger.error(f"[{self.name}] ERROR: {error_msg}")
def on_connected():
logger.info(f"[{self.name}] Connected!")
def on_disconnected():
logger.info(f"[{self.name}] Disconnected!")
self.consumer.on_joint_update(on_joint_update)
self.consumer.on_state_sync(on_state_sync)
self.consumer.on_error(on_error)
self.consumer.on_connected(on_connected)
self.consumer.on_disconnected(on_disconnected)
async def connect(self):
"""Connect to room."""
success = await self.consumer.connect(self.room_id, f"demo-{self.name}")
if success:
logger.info(f"[{self.name}] Successfully connected to room {self.room_id}")
else:
logger.error(f"[{self.name}] Failed to connect to room {self.room_id}")
return success
async def disconnect(self):
"""Disconnect from room."""
if self.consumer.is_connected():
await self.consumer.disconnect()
logger.info(
f"[{self.name}] Final stats: {self.update_count} updates, {self.state_count} states"
)
async def simulate_robot_movement(producer: RoboticsProducer):
"""Simulate realistic robot movement."""
# Define some realistic joint ranges for a robotic arm
joints = {
"base": {"current": 0.0, "target": 0.0, "min": -180, "max": 180},
"shoulder": {"current": 0.0, "target": 0.0, "min": -90, "max": 90},
"elbow": {"current": 0.0, "target": 0.0, "min": -135, "max": 135},
"wrist": {"current": 0.0, "target": 0.0, "min": -180, "max": 180},
}
logger.info("[Producer] Starting robot movement simulation...")
for step in range(20): # 20 movement steps
# Occasionally set new random targets
if step % 5 == 0:
for joint_name, joint_data in joints.items():
joint_data["target"] = random.uniform(
joint_data["min"], joint_data["max"]
)
logger.info(f"[Producer] Step {step + 1}: New targets set")
# Move each joint towards its target
joint_updates = []
for joint_name, joint_data in joints.items():
current = joint_data["current"]
target = joint_data["target"]
# Simple movement: move 10% towards target each step
diff = target - current
move = diff * 0.1
new_value = current + move
joint_data["current"] = new_value
joint_updates.append({"name": joint_name, "value": new_value})
# Send the joint updates
await producer.send_joint_update(joint_updates)
# Add some delay for realistic movement
await asyncio.sleep(0.5)
logger.info("[Producer] Movement simulation completed")
async def main():
"""Main demo function."""
logger.info("=== LeRobot Arena Producer-Consumer Demo ===")
# Create producer
producer = RoboticsProducer("http://localhost:8000")
# Setup producer callbacks
def on_producer_error(error_msg):
logger.error(f"[Producer] ERROR: {error_msg}")
def on_producer_connected():
logger.info("[Producer] Connected!")
def on_producer_disconnected():
logger.info("[Producer] Disconnected!")
producer.on_error(on_producer_error)
producer.on_connected(on_producer_connected)
producer.on_disconnected(on_producer_disconnected)
try:
# Create room and connect producer
room_id = await producer.create_room()
logger.info(f"Created room: {room_id}")
success = await producer.connect(room_id, "robot-controller")
if not success:
logger.error("Failed to connect producer!")
return
# Create multiple consumers
consumers = []
consumer_names = ["visualizer", "logger", "safety-monitor"]
for name in consumer_names:
consumer = DemoConsumer(name, room_id)
await consumer.setup()
consumers.append(consumer)
# Connect all consumers
logger.info("Connecting consumers...")
for consumer in consumers:
await consumer.connect()
await asyncio.sleep(0.1) # Small delay between connections
# Send initial state
logger.info("[Producer] Sending initial state...")
initial_state = {"base": 0.0, "shoulder": 0.0, "elbow": 0.0, "wrist": 0.0}
await producer.send_state_sync(initial_state)
await asyncio.sleep(1)
# Start robot movement simulation
movement_task = asyncio.create_task(simulate_robot_movement(producer))
# Let it run for a bit
await asyncio.sleep(5)
# Demonstrate emergency stop
logger.info("🚨 [Producer] Sending emergency stop!")
await producer.send_emergency_stop(
"Demo emergency stop - testing safety systems"
)
# Wait for movement to complete
await movement_task
# Final state check
logger.info("=== Final Demo Summary ===")
for consumer in consumers:
logger.info(
f"[{consumer.name}] Received {consumer.update_count} updates, {consumer.state_count} states"
)
logger.info("Demo completed successfully!")
except Exception as e:
logger.error(f"Demo error: {e}")
finally:
# Cleanup
logger.info("Cleaning up...")
# Disconnect all consumers
for consumer in consumers:
await consumer.disconnect()
# Disconnect producer
if producer.is_connected():
await producer.disconnect()
logger.info("Demo cleanup completed")
if __name__ == "__main__":
asyncio.run(main())