blanchon's picture
Initial commit
02eac4b

LeRobot Arena Python Client

Python client library for the LeRobot Arena robotics API with separate Producer and Consumer classes.

Installation

pip install -e .

Or with development dependencies:

pip install -e ".[dev]"

Basic Usage

Producer (Controller) Example

import asyncio
from lerobot_arena_client import RoboticsProducer

async def main():
    # Create producer client
    producer = RoboticsProducer('http://localhost:8000')
    
    # List available rooms
    rooms = await producer.list_rooms()
    print('Available rooms:', rooms)
    
    # Create new room and connect
    room_id = await producer.create_room()
    await producer.connect(room_id)
    
    # Send initial state
    await producer.send_state_sync({
        'shoulder': 45.0,
        'elbow': -20.0
    })
    
    # Send joint updates (only changed values will be forwarded!)
    await producer.send_joint_update([
        {'name': 'shoulder', 'value': 45.0},
        {'name': 'elbow', 'value': -20.0}
    ])
    
    # Handle errors
    producer.on_error(lambda err: print(f'Error: {err}'))
    
    # Disconnect
    await producer.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Consumer (Robot Executor) Example

import asyncio
from lerobot_arena_client import RoboticsConsumer

async def main():
    consumer = RoboticsConsumer('http://localhost:8000')
    
    # Connect to existing room
    room_id = "your-room-id"
    await consumer.connect(room_id)
    
    # Get initial state
    initial_state = await consumer.get_state_sync()
    print('Initial state:', initial_state)
    
    # Set up event handlers
    def on_state_sync(state):
        print('State sync:', state)
    
    def on_joint_update(joints):
        print('Execute joints:', joints)
        # Execute on actual robot hardware
        for joint in joints:
            print(f"Moving {joint['name']} to {joint['value']}")
    
    def on_error(error):
        print(f'Error: {error}')
    
    # Register callbacks
    consumer.on_state_sync(on_state_sync)
    consumer.on_joint_update(on_joint_update)
    consumer.on_error(on_error)
    
    # Keep running
    try:
        await asyncio.sleep(60)  # Run for 60 seconds
    finally:
        await consumer.disconnect()

if __name__ == "__main__":
    asyncio.run(main())

Factory Function Usage

import asyncio
from lerobot_arena_client import create_client

async def main():
    # Create clients using factory function
    producer = create_client("producer", "http://localhost:8000")
    consumer = create_client("consumer", "http://localhost:8000")
    
    # Or use convenience functions
    from lerobot_arena_client import create_producer_client, create_consumer_client
    
    # Quick producer setup (auto-creates room and connects)
    producer = await create_producer_client('http://localhost:8000')
    print(f"Producer connected to room: {producer.room_id}")
    
    # Quick consumer setup (connects to existing room)
    consumer = await create_consumer_client(producer.room_id, 'http://localhost:8000')
    
    # Use context managers for automatic cleanup
    async with RoboticsProducer('http://localhost:8000') as producer:
        room_id = await producer.create_room()
        await producer.connect(room_id)
        await producer.send_state_sync({'joint1': 10.0})

if __name__ == "__main__":
    asyncio.run(main())

Advanced Example: Producer-Consumer Pair

import asyncio
from lerobot_arena_client import RoboticsProducer, RoboticsConsumer

async def run_producer(room_id: str):
    async with RoboticsProducer() as producer:
        await producer.connect(room_id)
        
        # Simulate sending commands
        for i in range(10):
            await producer.send_state_sync({
                'joint1': i * 10.0,
                'joint2': i * -5.0
            })
            await asyncio.sleep(1)

async def run_consumer(room_id: str):
    async with RoboticsConsumer() as consumer:
        await consumer.connect(room_id)
        
        def handle_joint_update(joints):
            print(f"🤖 Executing: {joints}")
            # Your robot control code here
        
        consumer.on_joint_update(handle_joint_update)
        
        # Keep listening
        await asyncio.sleep(15)

async def main():
    # Create room
    producer = RoboticsProducer()
    room_id = await producer.create_room()
    
    # Run producer and consumer concurrently
    await asyncio.gather(
        run_producer(room_id),
        run_consumer(room_id)
    )

if __name__ == "__main__":
    asyncio.run(main())

API Reference

RoboticsProducer

Connection Methods:

  • connect(room_id, participant_id=None) - Connect as producer to room

Control Methods:

  • send_joint_update(joints) - Send joint updates
  • send_state_sync(state) - Send state synchronization (dict format)
  • send_emergency_stop(reason) - Send emergency stop

Event Callbacks:

  • on_error(callback) - Set error callback
  • on_connected(callback) - Set connection callback
  • on_disconnected(callback) - Set disconnection callback

RoboticsConsumer

Connection Methods:

  • connect(room_id, participant_id=None) - Connect as consumer to room

State Methods:

  • get_state_sync() - Get current state synchronously

Event Callbacks:

  • on_state_sync(callback) - Set state sync callback
  • on_joint_update(callback) - Set joint update callback
  • on_error(callback) - Set error callback
  • on_connected(callback) - Set connection callback
  • on_disconnected(callback) - Set disconnection callback

RoboticsClientCore (Base Class)

REST API Methods:

  • list_rooms() - List all available rooms
  • create_room(room_id=None) - Create a new room
  • delete_room(room_id) - Delete a room
  • get_room_state(room_id) - Get current room state
  • get_room_info(room_id) - Get basic room information

Utility Methods:

  • send_heartbeat() - Send heartbeat to server
  • is_connected() - Check connection status
  • get_connection_info() - Get connection details
  • disconnect() - Disconnect from room

Factory Functions

  • create_client(role, base_url) - Create client by role ("producer" or "consumer")
  • create_producer_client(base_url, room_id=None) - Create connected producer
  • create_consumer_client(room_id, base_url) - Create connected consumer

Requirements

  • Python 3.12+
  • aiohttp>=3.9.0
  • websockets>=12.0

Migration from v1

The old RoboticsClient is still available for backward compatibility but is now an alias to RoboticsClientCore. For new code, use the specific RoboticsProducer or RoboticsConsumer classes for better type safety and cleaner APIs.