# LeRobot Arena Python Client Python client library for the LeRobot Arena robotics API with separate Producer and Consumer classes. ## Installation ```bash pip install -e . ``` Or with development dependencies: ```bash pip install -e ".[dev]" ``` ## Basic Usage ### Producer (Controller) Example ```python import asyncio from lerobot_arena_client import RoboticsProducer async def main(): # Create producer client producer = RoboticsProducer('http://localhost:8000') # List available rooms rooms = await producer.list_rooms() print('Available rooms:', rooms) # Create new room and connect room_id = await producer.create_room() await producer.connect(room_id) # Send initial state await producer.send_state_sync({ 'shoulder': 45.0, 'elbow': -20.0 }) # Send joint updates (only changed values will be forwarded!) await producer.send_joint_update([ {'name': 'shoulder', 'value': 45.0}, {'name': 'elbow', 'value': -20.0} ]) # Handle errors producer.on_error(lambda err: print(f'Error: {err}')) # Disconnect await producer.disconnect() if __name__ == "__main__": asyncio.run(main()) ``` ### Consumer (Robot Executor) Example ```python import asyncio from lerobot_arena_client import RoboticsConsumer async def main(): consumer = RoboticsConsumer('http://localhost:8000') # Connect to existing room room_id = "your-room-id" await consumer.connect(room_id) # Get initial state initial_state = await consumer.get_state_sync() print('Initial state:', initial_state) # Set up event handlers def on_state_sync(state): print('State sync:', state) def on_joint_update(joints): print('Execute joints:', joints) # Execute on actual robot hardware for joint in joints: print(f"Moving {joint['name']} to {joint['value']}") def on_error(error): print(f'Error: {error}') # Register callbacks consumer.on_state_sync(on_state_sync) consumer.on_joint_update(on_joint_update) consumer.on_error(on_error) # Keep running try: await asyncio.sleep(60) # Run for 60 seconds finally: await consumer.disconnect() if __name__ == "__main__": asyncio.run(main()) ``` ### Factory Function Usage ```python import asyncio from lerobot_arena_client import create_client async def main(): # Create clients using factory function producer = create_client("producer", "http://localhost:8000") consumer = create_client("consumer", "http://localhost:8000") # Or use convenience functions from lerobot_arena_client import create_producer_client, create_consumer_client # Quick producer setup (auto-creates room and connects) producer = await create_producer_client('http://localhost:8000') print(f"Producer connected to room: {producer.room_id}") # Quick consumer setup (connects to existing room) consumer = await create_consumer_client(producer.room_id, 'http://localhost:8000') # Use context managers for automatic cleanup async with RoboticsProducer('http://localhost:8000') as producer: room_id = await producer.create_room() await producer.connect(room_id) await producer.send_state_sync({'joint1': 10.0}) if __name__ == "__main__": asyncio.run(main()) ``` ### Advanced Example: Producer-Consumer Pair ```python import asyncio from lerobot_arena_client import RoboticsProducer, RoboticsConsumer async def run_producer(room_id: str): async with RoboticsProducer() as producer: await producer.connect(room_id) # Simulate sending commands for i in range(10): await producer.send_state_sync({ 'joint1': i * 10.0, 'joint2': i * -5.0 }) await asyncio.sleep(1) async def run_consumer(room_id: str): async with RoboticsConsumer() as consumer: await consumer.connect(room_id) def handle_joint_update(joints): print(f"🤖 Executing: {joints}") # Your robot control code here consumer.on_joint_update(handle_joint_update) # Keep listening await asyncio.sleep(15) async def main(): # Create room producer = RoboticsProducer() room_id = await producer.create_room() # Run producer and consumer concurrently await asyncio.gather( run_producer(room_id), run_consumer(room_id) ) if __name__ == "__main__": asyncio.run(main()) ``` ## API Reference ### RoboticsProducer **Connection Methods:** - `connect(room_id, participant_id=None)` - Connect as producer to room **Control Methods:** - `send_joint_update(joints)` - Send joint updates - `send_state_sync(state)` - Send state synchronization (dict format) - `send_emergency_stop(reason)` - Send emergency stop **Event Callbacks:** - `on_error(callback)` - Set error callback - `on_connected(callback)` - Set connection callback - `on_disconnected(callback)` - Set disconnection callback ### RoboticsConsumer **Connection Methods:** - `connect(room_id, participant_id=None)` - Connect as consumer to room **State Methods:** - `get_state_sync()` - Get current state synchronously **Event Callbacks:** - `on_state_sync(callback)` - Set state sync callback - `on_joint_update(callback)` - Set joint update callback - `on_error(callback)` - Set error callback - `on_connected(callback)` - Set connection callback - `on_disconnected(callback)` - Set disconnection callback ### RoboticsClientCore (Base Class) **REST API Methods:** - `list_rooms()` - List all available rooms - `create_room(room_id=None)` - Create a new room - `delete_room(room_id)` - Delete a room - `get_room_state(room_id)` - Get current room state - `get_room_info(room_id)` - Get basic room information **Utility Methods:** - `send_heartbeat()` - Send heartbeat to server - `is_connected()` - Check connection status - `get_connection_info()` - Get connection details - `disconnect()` - Disconnect from room ### Factory Functions - `create_client(role, base_url)` - Create client by role ("producer" or "consumer") - `create_producer_client(base_url, room_id=None)` - Create connected producer - `create_consumer_client(room_id, base_url)` - Create connected consumer ## Requirements - Python 3.12+ - aiohttp>=3.9.0 - websockets>=12.0 ## Migration from v1 The old `RoboticsClient` is still available for backward compatibility but is now an alias to `RoboticsClientCore`. For new code, use the specific `RoboticsProducer` or `RoboticsConsumer` classes for better type safety and cleaner APIs.