Spaces:
Running
Running
#!/usr/bin/env python3 | |
""" | |
Producer-Consumer Demo - LeRobot Arena | |
This example demonstrates: | |
- Producer and multiple consumers working together | |
- Real-time joint updates | |
- Emergency stop functionality | |
- State synchronization | |
- Connection management | |
""" | |
import asyncio | |
import logging | |
import random | |
from lerobot_arena_client import RoboticsConsumer, RoboticsProducer | |
# Setup logging | |
logging.basicConfig(level=logging.INFO) | |
logger = logging.getLogger(__name__) | |
class DemoConsumer: | |
"""Demo consumer that logs all received messages.""" | |
def __init__(self, name: str, room_id: str): | |
self.name = name | |
self.room_id = room_id | |
self.consumer = RoboticsConsumer("http://localhost:8000") | |
self.update_count = 0 | |
self.state_count = 0 | |
async def setup(self): | |
"""Setup consumer with callbacks.""" | |
def on_joint_update(joints): | |
self.update_count += 1 | |
logger.info( | |
f"[{self.name}] Joint update #{self.update_count}: {len(joints)} joints" | |
) | |
def on_state_sync(state): | |
self.state_count += 1 | |
logger.info( | |
f"[{self.name}] State sync #{self.state_count}: {len(state)} joints" | |
) | |
def on_error(error_msg): | |
logger.error(f"[{self.name}] ERROR: {error_msg}") | |
def on_connected(): | |
logger.info(f"[{self.name}] Connected!") | |
def on_disconnected(): | |
logger.info(f"[{self.name}] Disconnected!") | |
self.consumer.on_joint_update(on_joint_update) | |
self.consumer.on_state_sync(on_state_sync) | |
self.consumer.on_error(on_error) | |
self.consumer.on_connected(on_connected) | |
self.consumer.on_disconnected(on_disconnected) | |
async def connect(self): | |
"""Connect to room.""" | |
success = await self.consumer.connect(self.room_id, f"demo-{self.name}") | |
if success: | |
logger.info(f"[{self.name}] Successfully connected to room {self.room_id}") | |
else: | |
logger.error(f"[{self.name}] Failed to connect to room {self.room_id}") | |
return success | |
async def disconnect(self): | |
"""Disconnect from room.""" | |
if self.consumer.is_connected(): | |
await self.consumer.disconnect() | |
logger.info( | |
f"[{self.name}] Final stats: {self.update_count} updates, {self.state_count} states" | |
) | |
async def simulate_robot_movement(producer: RoboticsProducer): | |
"""Simulate realistic robot movement.""" | |
# Define some realistic joint ranges for a robotic arm | |
joints = { | |
"base": {"current": 0.0, "target": 0.0, "min": -180, "max": 180}, | |
"shoulder": {"current": 0.0, "target": 0.0, "min": -90, "max": 90}, | |
"elbow": {"current": 0.0, "target": 0.0, "min": -135, "max": 135}, | |
"wrist": {"current": 0.0, "target": 0.0, "min": -180, "max": 180}, | |
} | |
logger.info("[Producer] Starting robot movement simulation...") | |
for step in range(20): # 20 movement steps | |
# Occasionally set new random targets | |
if step % 5 == 0: | |
for joint_name, joint_data in joints.items(): | |
joint_data["target"] = random.uniform( | |
joint_data["min"], joint_data["max"] | |
) | |
logger.info(f"[Producer] Step {step + 1}: New targets set") | |
# Move each joint towards its target | |
joint_updates = [] | |
for joint_name, joint_data in joints.items(): | |
current = joint_data["current"] | |
target = joint_data["target"] | |
# Simple movement: move 10% towards target each step | |
diff = target - current | |
move = diff * 0.1 | |
new_value = current + move | |
joint_data["current"] = new_value | |
joint_updates.append({"name": joint_name, "value": new_value}) | |
# Send the joint updates | |
await producer.send_joint_update(joint_updates) | |
# Add some delay for realistic movement | |
await asyncio.sleep(0.5) | |
logger.info("[Producer] Movement simulation completed") | |
async def main(): | |
"""Main demo function.""" | |
logger.info("=== LeRobot Arena Producer-Consumer Demo ===") | |
# Create producer | |
producer = RoboticsProducer("http://localhost:8000") | |
# Setup producer callbacks | |
def on_producer_error(error_msg): | |
logger.error(f"[Producer] ERROR: {error_msg}") | |
def on_producer_connected(): | |
logger.info("[Producer] Connected!") | |
def on_producer_disconnected(): | |
logger.info("[Producer] Disconnected!") | |
producer.on_error(on_producer_error) | |
producer.on_connected(on_producer_connected) | |
producer.on_disconnected(on_producer_disconnected) | |
try: | |
# Create room and connect producer | |
room_id = await producer.create_room() | |
logger.info(f"Created room: {room_id}") | |
success = await producer.connect(room_id, "robot-controller") | |
if not success: | |
logger.error("Failed to connect producer!") | |
return | |
# Create multiple consumers | |
consumers = [] | |
consumer_names = ["visualizer", "logger", "safety-monitor"] | |
for name in consumer_names: | |
consumer = DemoConsumer(name, room_id) | |
await consumer.setup() | |
consumers.append(consumer) | |
# Connect all consumers | |
logger.info("Connecting consumers...") | |
for consumer in consumers: | |
await consumer.connect() | |
await asyncio.sleep(0.1) # Small delay between connections | |
# Send initial state | |
logger.info("[Producer] Sending initial state...") | |
initial_state = {"base": 0.0, "shoulder": 0.0, "elbow": 0.0, "wrist": 0.0} | |
await producer.send_state_sync(initial_state) | |
await asyncio.sleep(1) | |
# Start robot movement simulation | |
movement_task = asyncio.create_task(simulate_robot_movement(producer)) | |
# Let it run for a bit | |
await asyncio.sleep(5) | |
# Demonstrate emergency stop | |
logger.info("🚨 [Producer] Sending emergency stop!") | |
await producer.send_emergency_stop( | |
"Demo emergency stop - testing safety systems" | |
) | |
# Wait for movement to complete | |
await movement_task | |
# Final state check | |
logger.info("=== Final Demo Summary ===") | |
for consumer in consumers: | |
logger.info( | |
f"[{consumer.name}] Received {consumer.update_count} updates, {consumer.state_count} states" | |
) | |
logger.info("Demo completed successfully!") | |
except Exception as e: | |
logger.error(f"Demo error: {e}") | |
finally: | |
# Cleanup | |
logger.info("Cleaning up...") | |
# Disconnect all consumers | |
for consumer in consumers: | |
await consumer.disconnect() | |
# Disconnect producer | |
if producer.is_connected(): | |
await producer.disconnect() | |
logger.info("Demo cleanup completed") | |
if __name__ == "__main__": | |
asyncio.run(main()) | |