import asyncio import logging import uuid from datetime import UTC, datetime from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from .connection_manager import ConnectionManager from .models import ( CreateRobotRequest, Robot, RobotStatus, ) from .robot_manager import RobotManager # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI( title="LeRobot Arena Server", description="WebSocket-based robot control server for master-slave architecture", version="1.0.0", ) # CORS middleware for web frontend app.add_middleware( CORSMiddleware, allow_origins=[ "http://localhost:5173", "http://localhost:5174", "http://localhost:3000", ], # Add your frontend URLs allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global managers connection_manager = ConnectionManager() robot_manager = RobotManager() @app.on_event("startup") async def startup_event(): logger.info("🤖 LeRobot Arena Server starting up...") # Create some demo robots for testing await robot_manager.create_robot("demo-arm-1", "so-arm100", "Demo Arm Robot 1") await robot_manager.create_robot("demo-arm-2", "so-arm100", "Demo Arm Robot 2") logger.info("✅ Server ready for robot connections!") @app.get("/") async def root(): return { "message": "LeRobot Arena Server", "version": "1.0.0", "robots_connected": len(robot_manager.robots), "active_connections": connection_manager.get_connection_count(), } # ============= ROBOT MANAGEMENT API ============= @app.get("/api/robots", response_model=list[Robot]) async def list_robots(): """Get list of all available robots""" robots = list(robot_manager.robots.values()) print("🔍 DEBUG: /api/robots called") print(f"🔍 DEBUG: Found {len(robots)} robots") for robot in robots: print( f"🔍 DEBUG: Robot - ID: {robot.id}, Name: {robot.name}, Type: {robot.robot_type}" ) return robots @app.post("/api/robots", response_model=Robot) async def create_robot(request: CreateRobotRequest): """Create a new robot""" robot_id = f"robot-{uuid.uuid4().hex[:8]}" return await robot_manager.create_robot( robot_id, request.robot_type, request.name or f"Robot {robot_id}" ) @app.get("/api/robots/{robot_id}", response_model=Robot) async def get_robot(robot_id: str): """Get robot details""" robot = robot_manager.get_robot(robot_id) if not robot: raise HTTPException(status_code=404, detail="Robot not found") return robot @app.get("/api/robots/{robot_id}/status", response_model=RobotStatus) async def get_robot_status(robot_id: str): """Get robot connection status""" status = robot_manager.get_robot_status(robot_id) if not status: raise HTTPException(status_code=404, detail="Robot not found") return status @app.delete("/api/robots/{robot_id}") async def delete_robot(robot_id: str): """Delete a robot""" if not robot_manager.get_robot(robot_id): raise HTTPException(status_code=404, detail="Robot not found") await robot_manager.delete_robot(robot_id) return {"message": f"Robot {robot_id} deleted"} # ============= DEMO SEQUENCE API ============= @app.get("/api/sequences") async def list_demo_sequences(): """Get list of available demo sequences""" from .models import DEMO_SEQUENCES return [ { "id": seq.id, "name": seq.name, "total_duration": seq.total_duration, "command_count": len(seq.commands), } for seq in DEMO_SEQUENCES ] @app.post("/api/robots/{robot_id}/play-sequence/{sequence_id}") async def play_demo_sequence(robot_id: str, sequence_id: str): """Play a demo sequence on a robot""" # Check if robot exists robot = robot_manager.get_robot(robot_id) if not robot: raise HTTPException(status_code=404, detail="Robot not found") # Find the sequence from .models import DEMO_SEQUENCES sequence = next((seq for seq in DEMO_SEQUENCES if seq.id == sequence_id), None) if not sequence: raise HTTPException(status_code=404, detail="Sequence not found") # Get available connections slave_connections = connection_manager.get_slave_connections(robot_id) master_connection = connection_manager.get_master_connection(robot_id) if not slave_connections and not master_connection: raise HTTPException( status_code=400, detail="No connections available. Connect a master (for 3D visualization) or slave (for robot execution) to play sequences.", ) # Send sequence to slaves and/or master notifications_sent = 0 try: # Send to slaves if available (physical robots) if slave_connections: await broadcast_to_slaves( robot_id, { "type": "execute_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": sequence.model_dump(mode="json"), }, ) notifications_sent += len(slave_connections) logger.info( f"🤖 Sent sequence '{sequence.name}' to {len(slave_connections)} slaves" ) # Send to master if available (3D visualization) if master_connection: await broadcast_to_master( robot_id, { "type": "play_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": sequence.model_dump(mode="json"), }, ) notifications_sent += 1 logger.info( f"đŸŽŦ Sent sequence '{sequence.name}' to master for visualization" ) logger.info( f"đŸŽŦ Playing sequence '{sequence.name}' on robot {robot_id} ({notifications_sent} connections)" ) return { "message": f"Sequence '{sequence.name}' started on robot {robot_id}", "sequence": { "id": sequence.id, "name": sequence.name, "duration": sequence.total_duration, }, "slaves_notified": len(slave_connections), "master_notified": 1 if master_connection else 0, "total_notifications": notifications_sent, } except Exception as e: logger.error(f"Failed to play sequence {sequence_id} on robot {robot_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to play sequence: {e}") @app.post("/api/robots/{robot_id}/stop-sequence") async def stop_sequence(robot_id: str): """Stop any running sequence on a robot""" # Check if robot exists robot = robot_manager.get_robot(robot_id) if not robot: raise HTTPException(status_code=404, detail="Robot not found") # Check if robot has slaves slave_connections = connection_manager.get_slave_connections(robot_id) if not slave_connections: raise HTTPException(status_code=400, detail="No slave connections available") try: # Send stop command to all slaves await broadcast_to_slaves( robot_id, { "type": "stop_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": {}, }, ) logger.info(f"âšī¸ Stopped sequences on robot {robot_id}") return { "message": f"Sequences stopped on robot {robot_id}", "slaves_notified": len(slave_connections), } except Exception as e: logger.error(f"Failed to stop sequences on robot {robot_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to stop sequences: {e}") # ============= WEBSOCKET ENDPOINTS ============= @app.websocket("/ws/master/{robot_id}") async def websocket_master_endpoint(websocket: WebSocket, robot_id: str): """WebSocket endpoint for master connections (command sources)""" await websocket.accept() robot = robot_manager.get_robot(robot_id) if not robot: # Auto-create robot if it doesn't exist logger.info(f"🤖 Auto-creating robot {robot_id} for master connection") try: robot = await robot_manager.create_robot( robot_id, "so-arm100", f"Auto-created Robot {robot_id}" ) except Exception as e: logger.error(f"Failed to auto-create robot {robot_id}: {e}") await websocket.close(code=4003, reason="Failed to create robot") return connection_id = f"master-{uuid.uuid4().hex[:8]}" logger.info(f"🎮 Master connected: {connection_id} for robot {robot_id}") try: # Register master connection await connection_manager.connect_master(connection_id, robot_id, websocket) await robot_manager.set_master_connected(robot_id, connection_id) # Send initial robot state await websocket.send_json({ "type": "robot_state", "timestamp": datetime.now(UTC).isoformat(), "data": robot.model_dump(mode="json"), }) # Handle incoming messages async for message in websocket.iter_json(): await handle_master_message(connection_id, robot_id, message) except WebSocketDisconnect: logger.info(f"🔌 Master disconnected: {connection_id}") except Exception as e: logger.error(f"❌ Master connection error: {e}") finally: await connection_manager.disconnect_master(connection_id) await robot_manager.set_master_disconnected(robot_id) @app.websocket("/ws/slave/{robot_id}") async def websocket_slave_endpoint(websocket: WebSocket, robot_id: str): """WebSocket endpoint for slave connections (execution targets)""" print(f"🔍 DEBUG: Slave WebSocket connection attempt for robot {robot_id}") await websocket.accept() robot = robot_manager.get_robot(robot_id) if not robot: # Auto-create robot if it doesn't exist print(f"🔍 DEBUG: Robot {robot_id} not found, auto-creating...") logger.info(f"🤖 Auto-creating robot {robot_id} for slave connection") try: robot = await robot_manager.create_robot( robot_id, "so-arm100", f"Auto-created Robot {robot_id}" ) print(f"🔍 DEBUG: Successfully auto-created robot {robot_id}") except Exception as e: print(f"🔍 DEBUG: Failed to auto-create robot {robot_id}: {e}") logger.error(f"Failed to auto-create robot {robot_id}: {e}") await websocket.close(code=4003, reason="Failed to create robot") return else: print(f"🔍 DEBUG: Robot {robot_id} found, proceeding with connection") connection_id = f"slave-{uuid.uuid4().hex[:8]}" print(f"🔍 DEBUG: Generated slave connection ID: {connection_id}") logger.info(f"🤖 Slave connected: {connection_id} for robot {robot_id}") try: # Register slave connection await connection_manager.connect_slave(connection_id, robot_id, websocket) await robot_manager.add_slave_connection(robot_id, connection_id) print(f"🔍 DEBUG: Slave {connection_id} registered successfully") # Send initial commands if any await sync_slave_with_current_state(robot_id, websocket) print(f"🔍 DEBUG: Initial state sync sent to slave {connection_id}") # Handle incoming messages (mainly status updates) async for message in websocket.iter_json(): print(f"🔍 DEBUG: Received message from slave {connection_id}: {message}") await handle_slave_message(connection_id, robot_id, message) except WebSocketDisconnect: print(f"🔍 DEBUG: Slave {connection_id} disconnected normally") logger.info(f"🔌 Slave disconnected: {connection_id}") except Exception as e: print(f"🔍 DEBUG: Slave {connection_id} connection error: {e}") logger.error(f"❌ Slave connection error: {e}") finally: print(f"🔍 DEBUG: Cleaning up slave {connection_id}") await connection_manager.disconnect_slave(connection_id) await robot_manager.remove_slave_connection(robot_id, connection_id) # ============= MESSAGE HANDLERS ============= async def handle_master_message(connection_id: str, robot_id: str, message: dict): """Handle incoming messages from master connections""" print(f"🔍 DEBUG: Received message from master {connection_id}: {message}") try: msg_type = message.get("type") print(f"🔍 DEBUG: Message type: {msg_type}") if msg_type == "command": print("🔍 DEBUG: Processing command message") # Forward command to all slaves await broadcast_to_slaves( robot_id, { "type": "execute_command", "timestamp": datetime.now(UTC).isoformat(), "data": message.get("data"), }, ) elif msg_type == "sequence": print("🔍 DEBUG: Processing sequence message") # Forward sequence to all slaves await broadcast_to_slaves( robot_id, { "type": "execute_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": message.get("data"), }, ) elif msg_type == "start_control": print("🔍 DEBUG: Processing start_control message") # Handle start control message (acknowledge it) master_ws = connection_manager.get_master_connection(robot_id) if master_ws: await master_ws.send_json({ "type": "control_started", "timestamp": datetime.now(UTC).isoformat(), "data": {"robot_id": robot_id}, }) print("🔍 DEBUG: Sent control_started acknowledgment") elif msg_type == "stop_control": print("🔍 DEBUG: Processing stop_control message") # Handle stop control message elif msg_type == "pause_control": print("🔍 DEBUG: Processing pause_control message") # Handle pause control message elif msg_type == "resume_control": print("🔍 DEBUG: Processing resume_control message") # Handle resume control message elif msg_type == "heartbeat": print("🔍 DEBUG: Processing heartbeat message") # Respond to heartbeat master_ws = connection_manager.get_master_connection(robot_id) if master_ws: await master_ws.send_json({ "type": "heartbeat_ack", "timestamp": datetime.now(UTC).isoformat(), }) print("🔍 DEBUG: Sent heartbeat_ack") else: print(f"🔍 DEBUG: Unknown message type: {msg_type}") logger.warning( f"Unknown message type from master {connection_id}: {msg_type}" ) except Exception as e: print(f"🔍 DEBUG: Error handling master message: {e}") logger.error(f"Error handling master message: {e}") async def handle_slave_message(connection_id: str, robot_id: str, message: dict): """Handle incoming messages from slave connections""" print(f"🔍 DEBUG: Processing slave message from {connection_id}: {message}") try: msg_type = message.get("type") print(f"🔍 DEBUG: Slave message type: {msg_type}") if msg_type == "status_update": print("🔍 DEBUG: Processing slave status_update") # Forward status to master await broadcast_to_master( robot_id, { "type": "slave_status", "timestamp": datetime.now(UTC).isoformat(), "slave_id": connection_id, "data": message.get("data"), }, ) elif msg_type == "joint_states": print("🔍 DEBUG: Processing slave joint_states") # Forward joint states to master await broadcast_to_master( robot_id, { "type": "joint_states", "timestamp": datetime.now(UTC).isoformat(), "slave_id": connection_id, "data": message.get("data"), }, ) elif msg_type == "error": print("🔍 DEBUG: Processing slave error") # Forward error to master await broadcast_to_master( robot_id, { "type": "slave_error", "timestamp": datetime.now(UTC).isoformat(), "slave_id": connection_id, "data": message.get("data"), }, ) else: print(f"🔍 DEBUG: Unknown slave message type: {msg_type}") logger.warning( f"Unknown message type from slave {connection_id}: {msg_type}" ) except Exception as e: print(f"🔍 DEBUG: Error handling slave message: {e}") logger.error(f"Error handling slave message: {e}") # ============= UTILITY FUNCTIONS ============= async def broadcast_to_slaves(robot_id: str, message: dict): """Broadcast message to all slaves of a robot""" slave_connections = connection_manager.get_slave_connections(robot_id) print(f"🔍 DEBUG: Broadcasting to slaves for robot {robot_id}") print(f"🔍 DEBUG: Found {len(slave_connections)} slave connections") print(f"🔍 DEBUG: Message to broadcast: {message}") if slave_connections: logger.info( f"📡 Broadcasting to {len(slave_connections)} slaves for robot {robot_id}" ) results = await asyncio.gather( *[ws.send_json(message) for ws in slave_connections], return_exceptions=True ) print(f"🔍 DEBUG: Broadcast results: {results}") async def broadcast_to_master(robot_id: str, message: dict): """Send message to master of a robot""" master_ws = connection_manager.get_master_connection(robot_id) print(f"🔍 DEBUG: Broadcasting to master for robot {robot_id}") print(f"🔍 DEBUG: Master connection found: {master_ws is not None}") print(f"🔍 DEBUG: Message to send: {message}") if master_ws: await master_ws.send_json(message) print("🔍 DEBUG: Message sent to master successfully") async def sync_slave_with_current_state(robot_id: str, websocket: WebSocket): """Send current robot state to newly connected slave""" robot = robot_manager.get_robot(robot_id) if robot: await websocket.send_json({ "type": "sync_state", "timestamp": datetime.now(UTC).isoformat(), "data": robot.model_dump(mode="json"), }) if __name__ == "__main__": import uvicorn """Start the LeRobot Arena server""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("lerobot-arena") logger.info("🚀 Starting LeRobot Arena WebSocket Server...") # Start the server uvicorn.run( app, host="0.0.0.0", port=8080, log_level="info", reload=False, # Auto-reload on code changes )