import asyncio import logging import os import uuid from datetime import UTC, datetime from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import FileResponse from fastapi.staticfiles import StaticFiles from .connection_manager import ConnectionManager from .models import ( CreateRobotRequest, Robot, RobotStatus, ) from .robot_manager import RobotManager # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) app = FastAPI( title="LeRobot Arena Server", description="WebSocket-based robot control server for master-slave architecture", version="1.0.0", ) # Dynamic CORS origins based on environment cors_origins = [ "http://localhost:5173", "http://localhost:5174", "http://localhost:7860", "http://localhost:3000", "https://*.hf.space", "https://hf.space", "https://huggingface.co", ] # Add SPACE_HOST if running in Hugging Face Spaces space_host = os.environ.get("SPACE_HOST") if space_host: cors_origins.extend([ f"https://{space_host}", f"http://{space_host}", ]) logger.info(f"🌐 Running in Hugging Face Spaces: {space_host}") logger.info(f"🔒 CORS origins: {cors_origins}") # CORS middleware for web frontend app.add_middleware( CORSMiddleware, allow_origins=cors_origins, allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global managers connection_manager = ConnectionManager() robot_manager = RobotManager() # Mount static files for the frontend # Try different possible locations for the static frontend static_dir_candidates = [ os.path.join( os.path.dirname(os.path.dirname(__file__)), "..", "static-frontend" ), # From src-python/src os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static-frontend" ), # From src-python "/home/user/app/static-frontend", # HF Spaces absolute path "static-frontend", # Relative to working directory ] static_dir = None for candidate in static_dir_candidates: if os.path.exists(candidate): static_dir = candidate break if static_dir: app.mount("/static", StaticFiles(directory=static_dir), name="static") logger.info(f"📁 Serving static files from: {static_dir}") else: logger.warning(f"âš ī¸ Static directory not found in any of: {static_dir_candidates}") # Mount robot assets (URDF files, meshes, etc.) robot_assets_candidates = [ os.path.join( os.path.dirname(os.path.dirname(__file__)), "..", "..", "static" ), # From src-python/src os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static" ), # From src-python "/home/user/app/static", # HF Spaces absolute path "static", # Relative to working directory ] robot_assets_dir = None for candidate in robot_assets_candidates: if os.path.exists(candidate): robot_assets_dir = candidate break if robot_assets_dir: app.mount( "/robots", StaticFiles(directory=os.path.join(robot_assets_dir, "robots")), name="robots", ) # Mount individual favicon file properly @app.get("/favicon.png") async def get_favicon(): favicon_path = os.path.join(robot_assets_dir, "favicon.png") if os.path.exists(favicon_path): return FileResponse(favicon_path) raise HTTPException(status_code=404, detail="Favicon not found") @app.get("/favicon_1024.png") async def get_favicon_1024(): favicon_path = os.path.join(robot_assets_dir, "favicon_1024.png") if os.path.exists(favicon_path): return FileResponse(favicon_path) raise HTTPException(status_code=404, detail="Favicon not found") logger.info(f"🤖 Serving robot assets from: {robot_assets_dir}") else: logger.warning( f"âš ī¸ Robot assets directory not found in any of: {robot_assets_candidates}" ) def get_static_dir() -> str | None: """Get the static directory path""" static_dir_candidates = [ os.path.join( os.path.dirname(os.path.dirname(__file__)), "..", "static-frontend" ), # From src-python/src os.path.join( os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "static-frontend", ), # From src-python "/home/user/app/static-frontend", # HF Spaces absolute path "static-frontend", # Relative to working directory ] for candidate in static_dir_candidates: if os.path.exists(candidate): return candidate return None @app.on_event("startup") async def startup_event(): logger.info("🤖 LeRobot Arena Server starting up...") # Create some demo robots for testing await robot_manager.create_robot("demo-arm-1", "so-arm100", "Demo Arm Robot 1") await robot_manager.create_robot("demo-arm-2", "so-arm100", "Demo Arm Robot 2") logger.info("✅ Server ready for robot connections!") @app.get("/status") async def get_status(): """Get server status for health checks""" return { "message": "LeRobot Arena Server", "version": "1.0.0", "robots_connected": len(robot_manager.robots), "active_connections": connection_manager.get_connection_count(), "status": "healthy", } # ============= ROBOT MANAGEMENT API ============= @app.get("/api/robots", response_model=list[Robot]) async def list_robots(): """Get list of all available robots""" robots = list(robot_manager.robots.values()) print("🔍 DEBUG: /api/robots called") print(f"🔍 DEBUG: Found {len(robots)} robots") for robot in robots: print( f"🔍 DEBUG: Robot - ID: {robot.id}, Name: {robot.name}, Type: {robot.robot_type}" ) return robots @app.post("/api/robots", response_model=Robot) async def create_robot(request: CreateRobotRequest): """Create a new robot""" robot_id = f"robot-{uuid.uuid4().hex[:8]}" return await robot_manager.create_robot( robot_id, request.robot_type, request.name or f"Robot {robot_id}" ) @app.get("/api/robots/{robot_id}", response_model=Robot) async def get_robot(robot_id: str): """Get robot details""" robot = robot_manager.get_robot(robot_id) if not robot: raise HTTPException(status_code=404, detail="Robot not found") return robot @app.get("/api/robots/{robot_id}/status", response_model=RobotStatus) async def get_robot_status(robot_id: str): """Get robot connection status""" status = robot_manager.get_robot_status(robot_id) if not status: raise HTTPException(status_code=404, detail="Robot not found") return status @app.delete("/api/robots/{robot_id}") async def delete_robot(robot_id: str): """Delete a robot""" if not robot_manager.get_robot(robot_id): raise HTTPException(status_code=404, detail="Robot not found") await robot_manager.delete_robot(robot_id) return {"message": f"Robot {robot_id} deleted"} # ============= DEMO SEQUENCE API ============= @app.get("/api/sequences") async def list_demo_sequences(): """Get list of available demo sequences""" from .models import DEMO_SEQUENCES return [ { "id": seq.id, "name": seq.name, "total_duration": seq.total_duration, "command_count": len(seq.commands), } for seq in DEMO_SEQUENCES ] @app.post("/api/robots/{robot_id}/play-sequence/{sequence_id}") async def play_demo_sequence(robot_id: str, sequence_id: str): """Play a demo sequence on a robot""" # Check if robot exists robot = robot_manager.get_robot(robot_id) if not robot: raise HTTPException(status_code=404, detail="Robot not found") # Find the sequence from .models import DEMO_SEQUENCES sequence = next((seq for seq in DEMO_SEQUENCES if seq.id == sequence_id), None) if not sequence: raise HTTPException(status_code=404, detail="Sequence not found") # Get available connections slave_connections = connection_manager.get_slave_connections(robot_id) master_connection = connection_manager.get_master_connection(robot_id) if not slave_connections and not master_connection: raise HTTPException( status_code=400, detail="No connections available. Connect a master (for 3D visualization) or slave (for robot execution) to play sequences.", ) # Send sequence to slaves and/or master notifications_sent = 0 try: # Send to slaves if available (physical robots) if slave_connections: await broadcast_to_slaves( robot_id, { "type": "execute_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": sequence.model_dump(mode="json"), }, ) notifications_sent += len(slave_connections) logger.info( f"🤖 Sent sequence '{sequence.name}' to {len(slave_connections)} slaves" ) # Send to master if available (3D visualization) if master_connection: await broadcast_to_master( robot_id, { "type": "play_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": sequence.model_dump(mode="json"), }, ) notifications_sent += 1 logger.info( f"đŸŽŦ Sent sequence '{sequence.name}' to master for visualization" ) logger.info( f"đŸŽŦ Playing sequence '{sequence.name}' on robot {robot_id} ({notifications_sent} connections)" ) return { "message": f"Sequence '{sequence.name}' started on robot {robot_id}", "sequence": { "id": sequence.id, "name": sequence.name, "duration": sequence.total_duration, }, "slaves_notified": len(slave_connections), "master_notified": 1 if master_connection else 0, "total_notifications": notifications_sent, } except Exception as e: logger.error(f"Failed to play sequence {sequence_id} on robot {robot_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to play sequence: {e}") @app.post("/api/robots/{robot_id}/stop-sequence") async def stop_sequence(robot_id: str): """Stop any running sequence on a robot""" # Check if robot exists robot = robot_manager.get_robot(robot_id) if not robot: raise HTTPException(status_code=404, detail="Robot not found") # Check if robot has slaves slave_connections = connection_manager.get_slave_connections(robot_id) if not slave_connections: raise HTTPException(status_code=400, detail="No slave connections available") try: # Send stop command to all slaves await broadcast_to_slaves( robot_id, { "type": "stop_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": {}, }, ) logger.info(f"âšī¸ Stopped sequences on robot {robot_id}") return { "message": f"Sequences stopped on robot {robot_id}", "slaves_notified": len(slave_connections), } except Exception as e: logger.error(f"Failed to stop sequences on robot {robot_id}: {e}") raise HTTPException(status_code=500, detail=f"Failed to stop sequences: {e}") # ============= WEBSOCKET ENDPOINTS ============= @app.websocket("/ws/master/{robot_id}") async def websocket_master_endpoint(websocket: WebSocket, robot_id: str): """WebSocket endpoint for master connections (command sources)""" await websocket.accept() robot = robot_manager.get_robot(robot_id) if not robot: # Auto-create robot if it doesn't exist logger.info(f"🤖 Auto-creating robot {robot_id} for master connection") try: robot = await robot_manager.create_robot( robot_id, "so-arm100", f"Auto-created Robot {robot_id}" ) except Exception as e: logger.error(f"Failed to auto-create robot {robot_id}: {e}") await websocket.close(code=4003, reason="Failed to create robot") return connection_id = f"master-{uuid.uuid4().hex[:8]}" logger.info(f"🎮 Master connected: {connection_id} for robot {robot_id}") try: # Register master connection await connection_manager.connect_master(connection_id, robot_id, websocket) await robot_manager.set_master_connected(robot_id, connection_id) # Send initial robot state await websocket.send_json({ "type": "robot_state", "timestamp": datetime.now(UTC).isoformat(), "data": robot.model_dump(mode="json"), }) # Handle incoming messages async for message in websocket.iter_json(): await handle_master_message(connection_id, robot_id, message) except WebSocketDisconnect: logger.info(f"🔌 Master disconnected: {connection_id}") except Exception as e: logger.error(f"❌ Master connection error: {e}") finally: await connection_manager.disconnect_master(connection_id) await robot_manager.set_master_disconnected(robot_id) @app.websocket("/ws/slave/{robot_id}") async def websocket_slave_endpoint(websocket: WebSocket, robot_id: str): """WebSocket endpoint for slave connections (execution targets)""" print(f"🔍 DEBUG: Slave WebSocket connection attempt for robot {robot_id}") await websocket.accept() robot = robot_manager.get_robot(robot_id) if not robot: # Auto-create robot if it doesn't exist print(f"🔍 DEBUG: Robot {robot_id} not found, auto-creating...") logger.info(f"🤖 Auto-creating robot {robot_id} for slave connection") try: robot = await robot_manager.create_robot( robot_id, "so-arm100", f"Auto-created Robot {robot_id}" ) print(f"🔍 DEBUG: Successfully auto-created robot {robot_id}") except Exception as e: print(f"🔍 DEBUG: Failed to auto-create robot {robot_id}: {e}") logger.error(f"Failed to auto-create robot {robot_id}: {e}") await websocket.close(code=4003, reason="Failed to create robot") return else: print(f"🔍 DEBUG: Robot {robot_id} found, proceeding with connection") connection_id = f"slave-{uuid.uuid4().hex[:8]}" print(f"🔍 DEBUG: Generated slave connection ID: {connection_id}") logger.info(f"🤖 Slave connected: {connection_id} for robot {robot_id}") try: # Register slave connection await connection_manager.connect_slave(connection_id, robot_id, websocket) await robot_manager.add_slave_connection(robot_id, connection_id) print(f"🔍 DEBUG: Slave {connection_id} registered successfully") # Send initial commands if any await sync_slave_with_current_state(robot_id, websocket) print(f"🔍 DEBUG: Initial state sync sent to slave {connection_id}") # Handle incoming messages (mainly status updates) async for message in websocket.iter_json(): print(f"🔍 DEBUG: Received message from slave {connection_id}: {message}") await handle_slave_message(connection_id, robot_id, message) except WebSocketDisconnect: print(f"🔍 DEBUG: Slave {connection_id} disconnected normally") logger.info(f"🔌 Slave disconnected: {connection_id}") except Exception as e: print(f"🔍 DEBUG: Slave {connection_id} connection error: {e}") logger.error(f"❌ Slave connection error: {e}") finally: print(f"🔍 DEBUG: Cleaning up slave {connection_id}") await connection_manager.disconnect_slave(connection_id) await robot_manager.remove_slave_connection(robot_id, connection_id) # ============= MESSAGE HANDLERS ============= async def handle_master_message(connection_id: str, robot_id: str, message: dict): """Handle incoming messages from master connections""" print(f"🔍 DEBUG: Received message from master {connection_id}: {message}") try: msg_type = message.get("type") print(f"🔍 DEBUG: Message type: {msg_type}") if msg_type == "command": print("🔍 DEBUG: Processing command message") # Forward command to all slaves await broadcast_to_slaves( robot_id, { "type": "execute_command", "timestamp": datetime.now(UTC).isoformat(), "data": message.get("data"), }, ) elif msg_type == "sequence": print("🔍 DEBUG: Processing sequence message") # Forward sequence to all slaves await broadcast_to_slaves( robot_id, { "type": "execute_sequence", "timestamp": datetime.now(UTC).isoformat(), "data": message.get("data"), }, ) elif msg_type == "start_control": print("🔍 DEBUG: Processing start_control message") # Handle start control message (acknowledge it) master_ws = connection_manager.get_master_connection(robot_id) if master_ws: await master_ws.send_json({ "type": "control_started", "timestamp": datetime.now(UTC).isoformat(), "data": {"robot_id": robot_id}, }) print("🔍 DEBUG: Sent control_started acknowledgment") elif msg_type == "stop_control": print("🔍 DEBUG: Processing stop_control message") # Handle stop control message elif msg_type == "pause_control": print("🔍 DEBUG: Processing pause_control message") # Handle pause control message elif msg_type == "resume_control": print("🔍 DEBUG: Processing resume_control message") # Handle resume control message elif msg_type == "heartbeat": print("🔍 DEBUG: Processing heartbeat message") # Respond to heartbeat master_ws = connection_manager.get_master_connection(robot_id) if master_ws: await master_ws.send_json({ "type": "heartbeat_ack", "timestamp": datetime.now(UTC).isoformat(), }) print("🔍 DEBUG: Sent heartbeat_ack") else: print(f"🔍 DEBUG: Unknown message type: {msg_type}") logger.warning( f"Unknown message type from master {connection_id}: {msg_type}" ) except Exception as e: print(f"🔍 DEBUG: Error handling master message: {e}") logger.error(f"Error handling master message: {e}") async def handle_slave_message(connection_id: str, robot_id: str, message: dict): """Handle incoming messages from slave connections""" print(f"🔍 DEBUG: Processing slave message from {connection_id}: {message}") try: msg_type = message.get("type") print(f"🔍 DEBUG: Slave message type: {msg_type}") if msg_type == "status_update": print("🔍 DEBUG: Processing slave status_update") # Forward status to master await broadcast_to_master( robot_id, { "type": "slave_status", "timestamp": datetime.now(UTC).isoformat(), "slave_id": connection_id, "data": message.get("data"), }, ) elif msg_type == "joint_states": print("🔍 DEBUG: Processing slave joint_states") # Forward joint states to master await broadcast_to_master( robot_id, { "type": "joint_states", "timestamp": datetime.now(UTC).isoformat(), "slave_id": connection_id, "data": message.get("data"), }, ) elif msg_type == "error": print("🔍 DEBUG: Processing slave error") # Forward error to master await broadcast_to_master( robot_id, { "type": "slave_error", "timestamp": datetime.now(UTC).isoformat(), "slave_id": connection_id, "data": message.get("data"), }, ) else: print(f"🔍 DEBUG: Unknown slave message type: {msg_type}") logger.warning( f"Unknown message type from slave {connection_id}: {msg_type}" ) except Exception as e: print(f"🔍 DEBUG: Error handling slave message: {e}") logger.error(f"Error handling slave message: {e}") # ============= UTILITY FUNCTIONS ============= async def broadcast_to_slaves(robot_id: str, message: dict): """Broadcast message to all slaves of a robot""" slave_connections = connection_manager.get_slave_connections(robot_id) print(f"🔍 DEBUG: Broadcasting to slaves for robot {robot_id}") print(f"🔍 DEBUG: Found {len(slave_connections)} slave connections") print(f"🔍 DEBUG: Message to broadcast: {message}") if slave_connections: logger.info( f"📡 Broadcasting to {len(slave_connections)} slaves for robot {robot_id}" ) results = await asyncio.gather( *[ws.send_json(message) for ws in slave_connections], return_exceptions=True ) print(f"🔍 DEBUG: Broadcast results: {results}") async def broadcast_to_master(robot_id: str, message: dict): """Send message to master of a robot""" master_ws = connection_manager.get_master_connection(robot_id) print(f"🔍 DEBUG: Broadcasting to master for robot {robot_id}") print(f"🔍 DEBUG: Master connection found: {master_ws is not None}") print(f"🔍 DEBUG: Message to send: {message}") if master_ws: await master_ws.send_json(message) print("🔍 DEBUG: Message sent to master successfully") async def sync_slave_with_current_state(robot_id: str, websocket: WebSocket): """Send current robot state to newly connected slave""" robot = robot_manager.get_robot(robot_id) if robot: await websocket.send_json({ "type": "sync_state", "timestamp": datetime.now(UTC).isoformat(), "data": robot.model_dump(mode="json"), }) # ============= FRONTEND SERVING ROUTES (Must be last) ============= EXPOSE_FRONTEND = os.getenv("EXPOSE_FRONTEND", "false").lower() == "true" if EXPOSE_FRONTEND: # Serve static assets from the _app directory @app.get("/_app/{path:path}") async def serve_app_assets(path: str): """Serve Svelte app assets""" static_dir = get_static_dir() if not static_dir: raise HTTPException(status_code=404, detail="Frontend not found") file_path = os.path.join(static_dir, "_app", path) if os.path.exists(file_path) and os.path.isfile(file_path): return FileResponse(file_path) raise HTTPException(status_code=404, detail="File not found") @app.get("/") async def serve_frontend(): """Serve the main frontend page""" static_dir = get_static_dir() if not static_dir: return { "message": "Frontend not built. Run 'bun run build' to build the frontend." } index_file = os.path.join(static_dir, "index.html") if os.path.exists(index_file): return FileResponse(index_file) return { "message": "Frontend not built. Run 'bun run build' to build the frontend." } # Catch-all route for client-side routing (SPA fallback) - MUST BE LAST @app.get("/{path:path}") async def serve_spa_fallback(path: str): """Serve the frontend for client-side routing""" # For all paths not handled by other routes, serve the index.html (SPA) static_dir = get_static_dir() if not static_dir: raise HTTPException(status_code=404, detail="Frontend not found") index_file = os.path.join(static_dir, "index.html") if os.path.exists(index_file): return FileResponse(index_file) raise HTTPException(status_code=404, detail="Frontend not found") if __name__ == "__main__": import uvicorn """Start the LeRobot Arena server""" logging.basicConfig( level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) logger = logging.getLogger("lerobot-arena") logger.info("🚀 Starting LeRobot Arena WebSocket Server...") # Start the server on port 7860 for HF Spaces compatibility uvicorn.run( app, host="0.0.0.0", port=7860, log_level="info", reload=False, )