Spaces:
Running
Running
import asyncio | |
import logging | |
import uuid | |
from datetime import UTC, datetime | |
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect | |
from fastapi.middleware.cors import CORSMiddleware | |
from .connection_manager import ConnectionManager | |
from .models import ( | |
CreateRobotRequest, | |
Robot, | |
RobotStatus, | |
) | |
from .robot_manager import RobotManager | |
# Configure logging
# The root logger is configured at import time; `logger` is this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application object; the middleware below is attached to it.
app = FastAPI(
    title="LeRobot Arena Server",
    description="WebSocket-based robot control server for master-slave architecture",
    version="1.0.0",
)

# CORS middleware for web frontend
# Only the listed local development origins are allowed; all methods and
# headers are permitted for them, and credentials are allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "http://localhost:5174",
        "http://localhost:3000",
    ],  # Add your frontend URLs
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global managers
# Module-level instances shared by every handler defined in this module.
connection_manager = ConnectionManager()
robot_manager = RobotManager()
async def startup_event():
    """Log server startup and register two demo robots with the robot manager."""
    logger.info("🤖 LeRobot Arena Server starting up...")

    # Demo robots for testing, created one after another in this order.
    demo_robots = (
        ("demo-arm-1", "so-arm100", "Demo Arm Robot 1"),
        ("demo-arm-2", "so-arm100", "Demo Arm Robot 2"),
    )
    for demo_id, demo_type, demo_name in demo_robots:
        await robot_manager.create_robot(demo_id, demo_type, demo_name)

    logger.info("✅ Server ready for robot connections!")
async def root():
    """Return the server name and version together with current robot and connection counts."""
    robot_total = len(robot_manager.robots)
    connection_total = connection_manager.get_connection_count()
    return {
        "message": "LeRobot Arena Server",
        "version": "1.0.0",
        "robots_connected": robot_total,
        "active_connections": connection_total,
    }
# ============= ROBOT MANAGEMENT API ============= | |
async def list_robots():
    """Get list of all available robots.

    Returns:
        A list holding every value currently stored in ``robot_manager.robots``.
    """
    robots = list(robot_manager.robots.values())

    # Diagnostics go through the module logger at DEBUG level rather than
    # print(), so they respect the configured log level and handlers.
    if logger.isEnabledFor(logging.DEBUG):
        logger.debug("/api/robots called")
        logger.debug("Found %d robots", len(robots))
        for robot in robots:
            logger.debug(
                "Robot - ID: %s, Name: %s, Type: %s",
                robot.id,
                robot.name,
                robot.robot_type,
            )

    return robots
async def create_robot(request: CreateRobotRequest):
    """Create a robot under a freshly generated id and return the manager's result.

    The id is ``robot-`` followed by the first 8 hex characters of a random
    UUID. When the request carries no name, ``Robot <id>`` is used instead.
    """
    short_token = uuid.uuid4().hex[:8]
    robot_id = f"robot-{short_token}"
    robot_type = request.robot_type
    display_name = request.name or f"Robot {robot_id}"
    created = await robot_manager.create_robot(robot_id, robot_type, display_name)
    return created
async def get_robot(robot_id: str):
    """Return the robot registered under ``robot_id``.

    Raises:
        HTTPException: 404 when the robot manager has no such robot.
    """
    found = robot_manager.get_robot(robot_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Robot not found")
async def get_robot_status(robot_id: str):
    """Return the connection status the robot manager reports for ``robot_id``.

    Raises:
        HTTPException: 404 when the manager returns no status for the robot.
    """
    current_status = robot_manager.get_robot_status(robot_id)
    if current_status:
        return current_status
    raise HTTPException(status_code=404, detail="Robot not found")
async def delete_robot(robot_id: str):
    """Remove a robot from the robot manager and return a confirmation message.

    Raises:
        HTTPException: 404 when the robot does not exist.
    """
    existing = robot_manager.get_robot(robot_id)
    if existing:
        await robot_manager.delete_robot(robot_id)
        return {"message": f"Robot {robot_id} deleted"}
    raise HTTPException(status_code=404, detail="Robot not found")
# ============= DEMO SEQUENCE API ============= | |
async def list_demo_sequences():
    """Return a summary dict (id, name, duration, command count) per demo sequence."""
    from .models import DEMO_SEQUENCES

    summaries = []
    for sequence in DEMO_SEQUENCES:
        summary = {
            "id": sequence.id,
            "name": sequence.name,
            "total_duration": sequence.total_duration,
            "command_count": len(sequence.commands),
        }
        summaries.append(summary)
    return summaries
async def play_demo_sequence(robot_id: str, sequence_id: str):
    """Play a demo sequence on a robot.

    The sequence is sent as ``execute_sequence`` to the robot's slave
    connections and as ``play_sequence`` to its master connection, whichever
    of the two are present.

    Args:
        robot_id: Identifier of the target robot.
        sequence_id: Identifier of an entry in ``DEMO_SEQUENCES``.

    Returns:
        A dict with a confirmation message, a short description of the
        sequence, and notification counts. The counts are the number of
        connections targeted, not confirmed deliveries.

    Raises:
        HTTPException: 404 if the robot or the sequence is unknown, 400 if
            neither a master nor any slave is connected, 500 if sending fails.
    """
    # Check if robot exists
    if not robot_manager.get_robot(robot_id):
        raise HTTPException(status_code=404, detail="Robot not found")

    # Find the sequence
    from .models import DEMO_SEQUENCES

    sequence = next((seq for seq in DEMO_SEQUENCES if seq.id == sequence_id), None)
    if not sequence:
        raise HTTPException(status_code=404, detail="Sequence not found")

    # Get available connections
    slave_connections = connection_manager.get_slave_connections(robot_id)
    master_connection = connection_manager.get_master_connection(robot_id)
    if not slave_connections and not master_connection:
        raise HTTPException(
            status_code=400,
            detail="No connections available. Connect a master (for 3D visualization) or slave (for robot execution) to play sequences.",
        )

    # Send sequence to slaves and/or master
    notifications_sent = 0
    try:
        # Serialize once; both outgoing messages carry the same sequence data.
        sequence_data = sequence.model_dump(mode="json")

        # Send to slaves if available (physical robots)
        if slave_connections:
            await broadcast_to_slaves(
                robot_id,
                {
                    "type": "execute_sequence",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": sequence_data,
                },
            )
            notifications_sent += len(slave_connections)
            logger.info(
                f"🤖 Sent sequence '{sequence.name}' to {len(slave_connections)} slaves"
            )

        # Send to master if available (3D visualization)
        if master_connection:
            await broadcast_to_master(
                robot_id,
                {
                    "type": "play_sequence",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": sequence_data,
                },
            )
            notifications_sent += 1
            logger.info(
                f"🎬 Sent sequence '{sequence.name}' to master for visualization"
            )

        logger.info(
            f"🎬 Playing sequence '{sequence.name}' on robot {robot_id} ({notifications_sent} connections)"
        )
        return {
            "message": f"Sequence '{sequence.name}' started on robot {robot_id}",
            "sequence": {
                "id": sequence.id,
                "name": sequence.name,
                "duration": sequence.total_duration,
            },
            "slaves_notified": len(slave_connections),
            "master_notified": 1 if master_connection else 0,
            "total_notifications": notifications_sent,
        }
    except Exception as e:
        # logger.exception keeps the traceback; `from e` preserves the cause.
        logger.exception(
            "Failed to play sequence %s on robot %s: %s", sequence_id, robot_id, e
        )
        raise HTTPException(
            status_code=500, detail=f"Failed to play sequence: {e}"
        ) from e
async def stop_sequence(robot_id: str):
    """Stop any running sequence on a robot.

    Sends a ``stop_sequence`` message to the robot's slave connections.

    Args:
        robot_id: Identifier of the target robot.

    Returns:
        A dict with a confirmation message and the number of slave
        connections targeted.

    Raises:
        HTTPException: 404 if the robot is unknown, 400 if it has no slave
            connections, 500 if sending fails.
    """
    # Check if robot exists
    if not robot_manager.get_robot(robot_id):
        raise HTTPException(status_code=404, detail="Robot not found")

    # Check if robot has slaves
    slave_connections = connection_manager.get_slave_connections(robot_id)
    if not slave_connections:
        raise HTTPException(status_code=400, detail="No slave connections available")

    try:
        # Send stop command to all slaves
        await broadcast_to_slaves(
            robot_id,
            {
                "type": "stop_sequence",
                "timestamp": datetime.now(UTC).isoformat(),
                "data": {},
            },
        )
        logger.info(f"⏹️ Stopped sequences on robot {robot_id}")
        return {
            "message": f"Sequences stopped on robot {robot_id}",
            "slaves_notified": len(slave_connections),
        }
    except Exception as e:
        # logger.exception keeps the traceback; `from e` preserves the cause.
        logger.exception("Failed to stop sequences on robot %s: %s", robot_id, e)
        raise HTTPException(
            status_code=500, detail=f"Failed to stop sequences: {e}"
        ) from e
# ============= WEBSOCKET ENDPOINTS ============= | |
async def websocket_master_endpoint(websocket: WebSocket, robot_id: str):
    """WebSocket endpoint for master connections (command sources).

    Accepts the socket, auto-creates the robot when it is unknown, registers
    the connection, sends an initial ``robot_state`` message, then passes each
    incoming JSON message to ``handle_master_message`` until the socket
    closes. The connection is always unregistered on exit.

    Args:
        websocket: The incoming WebSocket connection.
        robot_id: Identifier of the robot this master controls.
    """
    await websocket.accept()

    robot = robot_manager.get_robot(robot_id)
    if not robot:
        # Auto-create robot if it doesn't exist
        logger.info(f"🤖 Auto-creating robot {robot_id} for master connection")
        try:
            robot = await robot_manager.create_robot(
                robot_id, "so-arm100", f"Auto-created Robot {robot_id}"
            )
        except Exception as e:
            logger.error(f"Failed to auto-create robot {robot_id}: {e}")
            await websocket.close(code=4003, reason="Failed to create robot")
            return

    connection_id = f"master-{uuid.uuid4().hex[:8]}"
    logger.info(f"🎮 Master connected: {connection_id} for robot {robot_id}")

    try:
        # Register master connection
        await connection_manager.connect_master(connection_id, robot_id, websocket)
        await robot_manager.set_master_connected(robot_id, connection_id)

        # Send initial robot state
        await websocket.send_json({
            "type": "robot_state",
            "timestamp": datetime.now(UTC).isoformat(),
            "data": robot.model_dump(mode="json"),
        })

        # Handle incoming messages. receive_json() raises WebSocketDisconnect
        # when the peer closes, so the disconnect is actually logged below;
        # iter_json() would end the loop silently instead.
        while True:
            message = await websocket.receive_json()
            await handle_master_message(connection_id, robot_id, message)
    except WebSocketDisconnect:
        logger.info(f"🔌 Master disconnected: {connection_id}")
    except Exception as e:
        # Keep the traceback for unexpected failures.
        logger.exception("❌ Master connection error: %s", e)
    finally:
        await connection_manager.disconnect_master(connection_id)
        await robot_manager.set_master_disconnected(robot_id)
async def websocket_slave_endpoint(websocket: WebSocket, robot_id: str):
    """WebSocket endpoint for slave connections (execution targets).

    Accepts the socket, auto-creates the robot when it is unknown, registers
    the connection, sends the current robot state, then passes each incoming
    JSON message to ``handle_slave_message`` until the socket closes. The
    connection is always unregistered on exit.

    Args:
        websocket: The incoming WebSocket connection.
        robot_id: Identifier of the robot this slave executes for.
    """
    logger.debug("Slave WebSocket connection attempt for robot %s", robot_id)
    await websocket.accept()

    robot = robot_manager.get_robot(robot_id)
    if not robot:
        # Auto-create robot if it doesn't exist
        logger.info(f"🤖 Auto-creating robot {robot_id} for slave connection")
        try:
            robot = await robot_manager.create_robot(
                robot_id, "so-arm100", f"Auto-created Robot {robot_id}"
            )
            logger.debug("Successfully auto-created robot %s", robot_id)
        except Exception as e:
            logger.error(f"Failed to auto-create robot {robot_id}: {e}")
            await websocket.close(code=4003, reason="Failed to create robot")
            return
    else:
        logger.debug("Robot %s found, proceeding with connection", robot_id)

    connection_id = f"slave-{uuid.uuid4().hex[:8]}"
    logger.info(f"🤖 Slave connected: {connection_id} for robot {robot_id}")

    try:
        # Register slave connection
        await connection_manager.connect_slave(connection_id, robot_id, websocket)
        await robot_manager.add_slave_connection(robot_id, connection_id)
        logger.debug("Slave %s registered successfully", connection_id)

        # Send initial commands if any
        await sync_slave_with_current_state(robot_id, websocket)
        logger.debug("Initial state sync sent to slave %s", connection_id)

        # Handle incoming messages (mainly status updates). receive_json()
        # raises WebSocketDisconnect when the peer closes, so the disconnect
        # is actually logged below; iter_json() would end the loop silently.
        while True:
            message = await websocket.receive_json()
            logger.debug(
                "Received message from slave %s: %s", connection_id, message
            )
            await handle_slave_message(connection_id, robot_id, message)
    except WebSocketDisconnect:
        logger.info(f"🔌 Slave disconnected: {connection_id}")
    except Exception as e:
        # Keep the traceback for unexpected failures.
        logger.exception("❌ Slave connection error: %s", e)
    finally:
        logger.debug("Cleaning up slave %s", connection_id)
        await connection_manager.disconnect_slave(connection_id)
        await robot_manager.remove_slave_connection(robot_id, connection_id)
# ============= MESSAGE HANDLERS ============= | |
async def handle_master_message(connection_id: str, robot_id: str, message: dict):
    """Handle incoming messages from master connections.

    Dispatches on ``message["type"]``:

    * ``command`` / ``sequence``: forwarded to the robot's slaves as
      ``execute_command`` / ``execute_sequence``.
    * ``start_control``: acknowledged with ``control_started``.
    * ``stop_control`` / ``pause_control`` / ``resume_control``: recognized
      but no action is taken.
    * ``heartbeat``: answered with ``heartbeat_ack``.
    * anything else: logged as a warning.

    Any exception raised while handling is logged and not propagated, so one
    bad message does not end the connection loop.

    Args:
        connection_id: Identifier of the sending master connection.
        robot_id: Identifier of the robot the master is attached to.
        message: Decoded JSON message received from the master.
    """
    # Diagnostics use the module logger at DEBUG level rather than print().
    logger.debug("Received message from master %s: %s", connection_id, message)
    try:
        msg_type = message.get("type")
        logger.debug("Message type: %s", msg_type)

        if msg_type == "command":
            # Forward command to all slaves
            await broadcast_to_slaves(
                robot_id,
                {
                    "type": "execute_command",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": message.get("data"),
                },
            )
        elif msg_type == "sequence":
            # Forward sequence to all slaves
            await broadcast_to_slaves(
                robot_id,
                {
                    "type": "execute_sequence",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": message.get("data"),
                },
            )
        elif msg_type == "start_control":
            # Handle start control message (acknowledge it)
            master_ws = connection_manager.get_master_connection(robot_id)
            if master_ws:
                await master_ws.send_json({
                    "type": "control_started",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": {"robot_id": robot_id},
                })
                logger.debug("Sent control_started acknowledgment")
        elif msg_type in ("stop_control", "pause_control", "resume_control"):
            # Recognized control messages with no handling implemented yet.
            logger.debug("Processing %s message", msg_type)
        elif msg_type == "heartbeat":
            # Respond to heartbeat
            master_ws = connection_manager.get_master_connection(robot_id)
            if master_ws:
                await master_ws.send_json({
                    "type": "heartbeat_ack",
                    "timestamp": datetime.now(UTC).isoformat(),
                })
                logger.debug("Sent heartbeat_ack")
        else:
            logger.warning(
                f"Unknown message type from master {connection_id}: {msg_type}"
            )
    except Exception as e:
        # Best-effort handler: log with traceback, keep the connection alive.
        logger.exception("Error handling master message: %s", e)
async def handle_slave_message(connection_id: str, robot_id: str, message: dict):
    """Handle incoming messages from slave connections.

    ``status_update``, ``joint_states`` and ``error`` messages are forwarded
    to the robot's master as ``slave_status``, ``joint_states`` and
    ``slave_error`` respectively, tagged with the sending slave's connection
    id. Any other type is logged as a warning.

    Any exception raised while handling is logged and not propagated, so one
    bad message does not end the connection loop.

    Args:
        connection_id: Identifier of the sending slave connection.
        robot_id: Identifier of the robot the slave is attached to.
        message: Decoded JSON message received from the slave.
    """
    # Incoming slave message type -> type of the message relayed to the master.
    forwarded_types = {
        "status_update": "slave_status",
        "joint_states": "joint_states",
        "error": "slave_error",
    }

    # Diagnostics use the module logger at DEBUG level rather than print().
    logger.debug("Processing slave message from %s: %s", connection_id, message)
    try:
        msg_type = message.get("type")
        logger.debug("Slave message type: %s", msg_type)

        # Only string types can match; this also avoids hashing an arbitrary
        # (possibly unhashable) JSON value as a dict key.
        outgoing_type = (
            forwarded_types.get(msg_type) if isinstance(msg_type, str) else None
        )
        if outgoing_type is None:
            logger.warning(
                f"Unknown message type from slave {connection_id}: {msg_type}"
            )
            return

        await broadcast_to_master(
            robot_id,
            {
                "type": outgoing_type,
                "timestamp": datetime.now(UTC).isoformat(),
                "slave_id": connection_id,
                "data": message.get("data"),
            },
        )
    except Exception as e:
        # Best-effort handler: log with traceback, keep the connection alive.
        logger.exception("Error handling slave message: %s", e)
# ============= UTILITY FUNCTIONS ============= | |
async def broadcast_to_slaves(robot_id: str, message: dict):
    """Broadcast message to all slaves of a robot.

    Sends are issued concurrently. A failure on one connection does not stop
    delivery to the others and is not raised to the caller; failed sends are
    logged as warnings.

    Args:
        robot_id: Identifier of the robot whose slaves should receive the message.
        message: JSON-serializable payload passed to each connection's ``send_json``.
    """
    slave_connections = connection_manager.get_slave_connections(robot_id)
    logger.debug(
        "Broadcasting to %d slave connections for robot %s: %s",
        len(slave_connections),
        robot_id,
        message,
    )
    if not slave_connections:
        return

    logger.info(
        f"📡 Broadcasting to {len(slave_connections)} slaves for robot {robot_id}"
    )
    results = await asyncio.gather(
        *[ws.send_json(message) for ws in slave_connections], return_exceptions=True
    )

    # return_exceptions=True hands failures back as values; surface them in
    # the log instead of dropping them silently.
    failures = [outcome for outcome in results if isinstance(outcome, BaseException)]
    if failures:
        logger.warning(
            "Broadcast to robot %s failed for %d of %d slaves: %r",
            robot_id,
            len(failures),
            len(results),
            failures,
        )
async def broadcast_to_master(robot_id: str, message: dict):
    """Send message to master of a robot.

    Does nothing when the robot has no master connection. Errors raised by
    the send propagate to the caller.

    Args:
        robot_id: Identifier of the robot whose master should receive the message.
        message: JSON-serializable payload passed to the connection's ``send_json``.
    """
    master_ws = connection_manager.get_master_connection(robot_id)
    # Diagnostics use the module logger at DEBUG level rather than print().
    logger.debug(
        "Broadcasting to master for robot %s (connected: %s): %s",
        robot_id,
        master_ws is not None,
        message,
    )
    if master_ws:
        await master_ws.send_json(message)
        logger.debug("Message sent to master successfully")
async def sync_slave_with_current_state(robot_id: str, websocket: WebSocket):
    """Push a ``sync_state`` message with the robot's serialized state to a slave socket.

    Nothing is sent when the robot manager has no robot for ``robot_id``.
    """
    current = robot_manager.get_robot(robot_id)
    if not current:
        return

    payload = {
        "type": "sync_state",
        "timestamp": datetime.now(UTC).isoformat(),
        "data": current.model_dump(mode="json"),
    }
    await websocket.send_json(payload)
if __name__ == "__main__":
    # Start the LeRobot Arena server when this module is executed directly.
    import uvicorn

    # force=True is required: the root logger is already configured by the
    # basicConfig() call made when this module is loaded, and a second
    # basicConfig() without it is a no-op, so this format would never apply.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        force=True,
    )
    logger = logging.getLogger("lerobot-arena")
    logger.info("🚀 Starting LeRobot Arena WebSocket Server...")

    # Start the server
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8080,
        log_level="info",
        reload=False,  # Auto-reload on code changes is disabled
    )