# LeRobot Arena Python Client

Python client library for the LeRobot Arena robotics API with separate Producer and Consumer classes.

## Installation

```bash
pip install -e .
```

Or with development dependencies:

```bash
pip install -e ".[dev]"
```

## Basic Usage

### Producer (Controller) Example

```python
import asyncio
from lerobot_arena_client import RoboticsProducer


async def main():
    # Create producer client
    producer = RoboticsProducer('http://localhost:8000')

    # Register the error handler first so failures in later calls are reported
    producer.on_error(lambda err: print(f'Error: {err}'))

    # List available rooms
    rooms = await producer.list_rooms()
    print('Available rooms:', rooms)

    # Create new room and connect
    room_id = await producer.create_room()
    await producer.connect(room_id)

    # Send initial state
    await producer.send_state_sync({
        'shoulder': 45.0,
        'elbow': -20.0
    })

    # Send joint updates (only changed values will be forwarded!)
    await producer.send_joint_update([
        {'name': 'shoulder', 'value': 45.0},
        {'name': 'elbow', 'value': -20.0}
    ])

    # Disconnect
    await producer.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
```
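
The example above uses two payload shapes: `send_state_sync` takes a `{name: value}` dict, while `send_joint_update` takes a list of `{'name': ..., 'value': ...}` entries. The sketch below converts between them and illustrates what "only changed values" could mean on the client side. Both helpers (`state_to_joint_updates`, `changed_joints`) and the `tolerance` parameter are hypothetical and not part of the library; the server's actual filtering logic is not documented here.

```python
from typing import Dict, List, Union

JointValue = float
JointUpdate = Dict[str, Union[str, JointValue]]


def state_to_joint_updates(state: Dict[str, JointValue]) -> List[JointUpdate]:
    """Convert a {name: value} state dict into the list shape used for joint updates."""
    return [{'name': name, 'value': value} for name, value in state.items()]


def changed_joints(
    previous: Dict[str, JointValue],
    current: Dict[str, JointValue],
    tolerance: float = 1e-6,  # hypothetical threshold, not a library setting
) -> Dict[str, JointValue]:
    """Return only the joints that are new or whose value moved by more than tolerance."""
    changed = {}
    for name, value in current.items():
        old = previous.get(name)
        if old is None or abs(value - old) > tolerance:
            changed[name] = value
    return changed
```

A producer loop could keep the last state it sent and pass `state_to_joint_updates(changed_joints(last, new))` to `send_joint_update`, skipping the call when the result is empty.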

### Consumer (Robot Executor) Example

```python
import asyncio
from lerobot_arena_client import RoboticsConsumer


async def main():
    consumer = RoboticsConsumer('http://localhost:8000')

    # Connect to existing room
    room_id = "your-room-id"
    await consumer.connect(room_id)

    # Get initial state
    initial_state = await consumer.get_state_sync()
    print('Initial state:', initial_state)

    # Set up event handlers
    def on_state_sync(state):
        print('State sync:', state)

    def on_joint_update(joints):
        print('Execute joints:', joints)
        # Execute on actual robot hardware
        for joint in joints:
            print(f"Moving {joint['name']} to {joint['value']}")

    def on_error(error):
        print(f'Error: {error}')

    # Register callbacks
    consumer.on_state_sync(on_state_sync)
    consumer.on_joint_update(on_joint_update)
    consumer.on_error(on_error)

    # Keep running
    try:
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        await consumer.disconnect()


if __name__ == "__main__":
    asyncio.run(main())
```
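
If moving the hardware takes time, it may be preferable to keep the `on_joint_update` callback short and do the work in a separate coroutine. The sketch below hands updates to an `asyncio.Queue`. It assumes the library invokes callbacks synchronously on the event loop thread; if they arrive from another thread, `loop.call_soon_threadsafe` would be needed instead. `make_queue_callback`, `drain`, and `demo` are hypothetical names, and `demo` feeds the callback by hand rather than through a real consumer.

```python
import asyncio
from typing import Callable, List, Tuple


def make_queue_callback(queue: asyncio.Queue) -> Callable[[list], None]:
    """Build a callback suitable for consumer.on_joint_update(...)."""
    def on_joint_update(joints: list) -> None:
        # Non-blocking hand-off; the queue is unbounded, so this never raises QueueFull.
        queue.put_nowait(joints)
    return on_joint_update


async def drain(queue: asyncio.Queue, handle: Callable[[str, float], None],
                idle_timeout: float) -> int:
    """Apply queued updates until none arrive for idle_timeout seconds."""
    batches = 0
    while True:
        try:
            joints = await asyncio.wait_for(queue.get(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return batches
        for joint in joints:
            handle(joint['name'], joint['value'])
        batches += 1


async def demo() -> List[Tuple[str, float]]:
    """Feed the callback manually, standing in for a connected consumer."""
    moves: List[Tuple[str, float]] = []
    queue: asyncio.Queue = asyncio.Queue()
    callback = make_queue_callback(queue)
    callback([{'name': 'shoulder', 'value': 45.0}])
    callback([{'name': 'elbow', 'value': -20.0}])
    await drain(queue, lambda name, value: moves.append((name, value)), idle_timeout=0.05)
    return moves
```

With a real consumer, `consumer.on_joint_update(make_queue_callback(queue))` would replace the manual calls in `demo`.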

### Factory Function Usage

```python
import asyncio
from lerobot_arena_client import (
    RoboticsProducer,
    create_client,
    create_consumer_client,
    create_producer_client,
)


async def main():
    # Create clients using factory function
    producer = create_client("producer", "http://localhost:8000")
    consumer = create_client("consumer", "http://localhost:8000")

    # Or use convenience functions
    # Quick producer setup (auto-creates room and connects)
    producer = await create_producer_client('http://localhost:8000')
    print(f"Producer connected to room: {producer.room_id}")

    # Quick consumer setup (connects to existing room)
    consumer = await create_consumer_client(producer.room_id, 'http://localhost:8000')

    # Disconnect the quick-setup clients before reusing the names below
    await consumer.disconnect()
    await producer.disconnect()

    # Use context managers for automatic cleanup
    async with RoboticsProducer('http://localhost:8000') as producer:
        room_id = await producer.create_room()
        await producer.connect(room_id)
        await producer.send_state_sync({'joint1': 10.0})


if __name__ == "__main__":
    asyncio.run(main())
```

### Advanced Example: Producer-Consumer Pair

```python
import asyncio
from lerobot_arena_client import RoboticsProducer, RoboticsConsumer


async def run_producer(room_id: str):
    async with RoboticsProducer() as producer:
        await producer.connect(room_id)

        # Simulate sending commands
        for i in range(10):
            await producer.send_state_sync({
                'joint1': i * 10.0,
                'joint2': i * -5.0
            })
            await asyncio.sleep(1)


async def run_consumer(room_id: str):
    async with RoboticsConsumer() as consumer:
        await consumer.connect(room_id)

        def handle_joint_update(joints):
            print(f"🤖 Executing: {joints}")
            # Your robot control code here

        consumer.on_joint_update(handle_joint_update)

        # Keep listening
        await asyncio.sleep(15)


async def main():
    # Create room
    producer = RoboticsProducer()
    room_id = await producer.create_room()

    # Run producer and consumer concurrently
    await asyncio.gather(
        run_producer(room_id),
        run_consumer(room_id)
    )


if __name__ == "__main__":
    asyncio.run(main())
```
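
Because `asyncio.gather` starts both coroutines at once, the producer in the example above may send its first state before the consumer has registered its callback. One way to order the startup is an `asyncio.Event` that the consumer sets once it is ready. The sketch below shows only that coordination pattern: an `asyncio.Queue` stands in for the server, and `consumer_task`, `producer_task`, and `demo` are hypothetical names, not library functions.

```python
import asyncio
from typing import List, Optional


async def consumer_task(ready: asyncio.Event, inbox: asyncio.Queue,
                        received: List[dict]) -> None:
    # Stand-in for: await consumer.connect(room_id); consumer.on_joint_update(...)
    ready.set()  # signal that the listener is in place
    while True:
        state: Optional[dict] = await inbox.get()
        if state is None:  # sentinel: producer is done
            return
        received.append(state)


async def producer_task(ready: asyncio.Event, inbox: asyncio.Queue) -> None:
    await ready.wait()  # do not send until the consumer is listening
    for i in range(3):
        # Stand-in for: await producer.send_state_sync({...})
        await inbox.put({'joint1': i * 10.0})
    await inbox.put(None)


async def demo() -> List[dict]:
    ready = asyncio.Event()
    inbox: asyncio.Queue = asyncio.Queue()
    received: List[dict] = []
    await asyncio.gather(
        producer_task(ready, inbox),
        consumer_task(ready, inbox, received),
    )
    return received
```

In the real example, the same `ready` event could be passed to `run_producer` and `run_consumer`, with the consumer calling `ready.set()` right after `on_joint_update` is registered.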

## API Reference

### RoboticsProducer

**Connection Methods:**

- `connect(room_id, participant_id=None)` - Connect as producer to room

**Control Methods:**

- `send_joint_update(joints)` - Send joint updates
- `send_state_sync(state)` - Send state synchronization (dict format)
- `send_emergency_stop(reason)` - Send emergency stop

**Event Callbacks:**

- `on_error(callback)` - Set error callback
- `on_connected(callback)` - Set connection callback
- `on_disconnected(callback)` - Set disconnection callback
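
`send_emergency_stop(reason)` is not shown in the usage examples. A minimal sketch of one way to use it is a wrapper that sends a stop whenever the control loop raises. It assumes `send_emergency_stop` is awaitable like the other `send_*` methods. `run_with_emergency_stop` is a hypothetical helper, and `FakeProducer` is a stand-in so the sketch runs without a server.

```python
import asyncio
from typing import Awaitable, Callable, List


async def run_with_emergency_stop(producer, control_loop: Callable[[object], Awaitable[None]]) -> None:
    """Run control_loop(producer); on any Exception, send an emergency stop and re-raise."""
    try:
        await control_loop(producer)
    except Exception as exc:
        # Task cancellation (asyncio.CancelledError) is a BaseException and is not caught here.
        await producer.send_emergency_stop(f'control loop failed: {exc}')
        raise


class FakeProducer:
    """Stand-in exposing only the method this sketch needs."""

    def __init__(self) -> None:
        self.stops: List[str] = []

    async def send_emergency_stop(self, reason: str) -> None:
        self.stops.append(reason)


async def failing_loop(producer) -> None:
    raise RuntimeError('sensor timeout')


async def demo() -> List[str]:
    producer = FakeProducer()
    try:
        await run_with_emergency_stop(producer, failing_loop)
    except RuntimeError:
        pass  # the original error still propagates to the caller
    return producer.stops
```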

### RoboticsConsumer

**Connection Methods:**

- `connect(room_id, participant_id=None)` - Connect as consumer to room

**State Methods:**

- `get_state_sync()` - Get the current state (awaitable; "sync" refers to state synchronization, not a blocking call)

**Event Callbacks:**

- `on_state_sync(callback)` - Set state sync callback
- `on_joint_update(callback)` - Set joint update callback
- `on_error(callback)` - Set error callback
- `on_connected(callback)` - Set connection callback
- `on_disconnected(callback)` - Set disconnection callback

### RoboticsClientCore (Base Class)

**REST API Methods:**

- `list_rooms()` - List all available rooms
- `create_room(room_id=None)` - Create a new room
- `delete_room(room_id)` - Delete a room
- `get_room_state(room_id)` - Get current room state
- `get_room_info(room_id)` - Get basic room information

**Utility Methods:**

- `send_heartbeat()` - Send heartbeat to server
- `is_connected()` - Check connection status
- `get_connection_info()` - Get connection details
- `disconnect()` - Disconnect from room
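
The utility methods can be combined into a background keep-alive task. The sketch below assumes `is_connected()` is a plain synchronous method returning a bool and `send_heartbeat()` is awaitable; neither is confirmed by this reference, so check the library before relying on it. `heartbeat_loop` and `FakeClient` are hypothetical names, and the interval is an arbitrary choice.

```python
import asyncio


async def heartbeat_loop(client, interval: float) -> int:
    """Send a heartbeat every `interval` seconds while connected; return the number sent."""
    sent = 0
    while client.is_connected():  # assumed synchronous, see lead-in
        await client.send_heartbeat()
        sent += 1
        await asyncio.sleep(interval)
    return sent


class FakeClient:
    """Stand-in that reports a dropped connection after a fixed number of heartbeats."""

    def __init__(self, beats_before_drop: int) -> None:
        self._remaining = beats_before_drop

    def is_connected(self) -> bool:
        return self._remaining > 0

    async def send_heartbeat(self) -> None:
        self._remaining -= 1


async def demo() -> int:
    return await heartbeat_loop(FakeClient(3), interval=0.01)
```

With a real client, the loop would typically run as `asyncio.create_task(heartbeat_loop(client, interval))` next to the main work and be cancelled before `disconnect()`.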

### Factory Functions

- `create_client(role, base_url)` - Create client by role ("producer" or "consumer")
- `create_producer_client(base_url, room_id=None)` - Create connected producer
- `create_consumer_client(room_id, base_url)` - Create connected consumer

## Requirements

- Python 3.12+
- aiohttp>=3.9.0
- websockets>=12.0

## Migration from v1

The old `RoboticsClient` is still available for backward compatibility, but it is now an alias of `RoboticsClientCore`. For new code, use the specific `RoboticsProducer` or `RoboticsConsumer` classes for better type safety and cleaner APIs.