blanchon's picture
squash: initial commit
3aea7c6
raw
history blame
19.8 kB
import asyncio
import logging
import uuid
from datetime import UTC, datetime
from fastapi import FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from .connection_manager import ConnectionManager
from .models import (
CreateRobotRequest,
Robot,
RobotStatus,
)
from .robot_manager import RobotManager
# Logging: root logger at INFO, plus a module-scoped logger for this file
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# The FastAPI application object that all routes below attach to
app = FastAPI(
    version="1.0.0",
    title="LeRobot Arena Server",
    description="WebSocket-based robot control server for master-slave architecture",
)

# Let the web frontend call this API cross-origin.
# Add your frontend URLs to the origin list as needed.
app.add_middleware(
    CORSMiddleware,
    allow_methods=["*"],
    allow_headers=["*"],
    allow_credentials=True,
    allow_origins=[
        "http://localhost:5173",
        "http://localhost:5174",
        "http://localhost:3000",
    ],
)

# Module-level manager instances shared by the routes and WebSocket handlers below
connection_manager = ConnectionManager()
robot_manager = RobotManager()
@app.on_event("startup")
async def startup_event():
    """Log the startup banner and register two demo robots for testing."""
    logger.info("🤖 LeRobot Arena Server starting up...")

    # (robot id, display name) pairs; both demo robots use the "so-arm100" type
    demo_robots = (
        ("demo-arm-1", "Demo Arm Robot 1"),
        ("demo-arm-2", "Demo Arm Robot 2"),
    )
    for demo_id, demo_name in demo_robots:
        await robot_manager.create_robot(demo_id, "so-arm100", demo_name)

    logger.info("✅ Server ready for robot connections!")
@app.get("/")
async def root():
    """Return the server banner along with robot and connection counts."""
    robot_total = len(robot_manager.robots)
    connection_total = connection_manager.get_connection_count()
    banner = {
        "message": "LeRobot Arena Server",
        "version": "1.0.0",
        "robots_connected": robot_total,
        "active_connections": connection_total,
    }
    return banner
# ============= ROBOT MANAGEMENT API =============
@app.get("/api/robots", response_model=list[Robot])
async def list_robots():
    """Get list of all available robots.

    Returns:
        A list built from the current values of ``robot_manager.robots``.
    """
    robots = list(robot_manager.robots.values())

    # Diagnostics go through the module logger at DEBUG level (instead of
    # print) so they respect the logging configuration and can be silenced.
    logger.debug("/api/robots called, found %d robots", len(robots))
    for robot in robots:
        logger.debug(
            "Robot - ID: %s, Name: %s, Type: %s",
            robot.id,
            robot.name,
            robot.robot_type,
        )

    return robots
@app.post("/api/robots", response_model=Robot)
async def create_robot(request: CreateRobotRequest):
    """Register a new robot under a freshly generated ID and return it."""
    # The ID suffix is the first 8 hex characters of a random UUID4
    suffix = uuid.uuid4().hex[:8]
    robot_id = f"robot-{suffix}"
    robot_type = request.robot_type
    # A falsy name in the request falls back to a name derived from the ID
    display_name = request.name or f"Robot {robot_id}"
    created = await robot_manager.create_robot(robot_id, robot_type, display_name)
    return created
@app.get("/api/robots/{robot_id}", response_model=Robot)
async def get_robot(robot_id: str):
    """Look up one robot by ID; respond with 404 when the lookup is falsy."""
    found = robot_manager.get_robot(robot_id)
    if found:
        return found
    raise HTTPException(status_code=404, detail="Robot not found")
@app.get("/api/robots/{robot_id}/status", response_model=RobotStatus)
async def get_robot_status(robot_id: str):
    """Return the robot's connection status; respond with 404 when none is found."""
    current_status = robot_manager.get_robot_status(robot_id)
    if current_status:
        return current_status
    raise HTTPException(status_code=404, detail="Robot not found")
@app.delete("/api/robots/{robot_id}")
async def delete_robot(robot_id: str):
    """Remove a robot by ID; respond with 404 when the robot is not found."""
    existing = robot_manager.get_robot(robot_id)
    if existing:
        await robot_manager.delete_robot(robot_id)
        return {"message": f"Robot {robot_id} deleted"}
    raise HTTPException(status_code=404, detail="Robot not found")
# ============= DEMO SEQUENCE API =============
@app.get("/api/sequences")
async def list_demo_sequences():
    """Summarize each demo sequence: id, name, total duration and command count."""
    from .models import DEMO_SEQUENCES

    summaries = []
    for sequence in DEMO_SEQUENCES:
        summary = {
            "id": sequence.id,
            "name": sequence.name,
            "total_duration": sequence.total_duration,
            "command_count": len(sequence.commands),
        }
        summaries.append(summary)
    return summaries
@app.post("/api/robots/{robot_id}/play-sequence/{sequence_id}")
async def play_demo_sequence(robot_id: str, sequence_id: str):
    """Play a demo sequence on a robot.

    The sequence is sent as ``execute_sequence`` to the robot's slave
    connections and as ``play_sequence`` to its master connection, using
    whichever of the two are currently registered.

    Args:
        robot_id: ID of the target robot.
        sequence_id: ID of an entry in ``DEMO_SEQUENCES``.

    Returns:
        A dict describing the started sequence and how many connections
        were notified.

    Raises:
        HTTPException: 404 if the robot or the sequence is not found, 400 if
            neither a master nor a slave is connected, 500 if sending fails.
    """
    # Check if robot exists
    if not robot_manager.get_robot(robot_id):
        raise HTTPException(status_code=404, detail="Robot not found")

    # Find the sequence
    from .models import DEMO_SEQUENCES

    sequence = next((seq for seq in DEMO_SEQUENCES if seq.id == sequence_id), None)
    if not sequence:
        raise HTTPException(status_code=404, detail="Sequence not found")

    # Get available connections
    slave_connections = connection_manager.get_slave_connections(robot_id)
    master_connection = connection_manager.get_master_connection(robot_id)
    if not slave_connections and not master_connection:
        raise HTTPException(
            status_code=400,
            detail="No connections available. Connect a master (for 3D visualization) or slave (for robot execution) to play sequences.",
        )

    # Send sequence to slaves and/or master
    notifications_sent = 0
    try:
        # Serialize once; both outgoing messages carry the same sequence data.
        sequence_data = sequence.model_dump(mode="json")

        # Send to slaves if available (physical robots)
        if slave_connections:
            await broadcast_to_slaves(
                robot_id,
                {
                    "type": "execute_sequence",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": sequence_data,
                },
            )
            notifications_sent += len(slave_connections)
            logger.info(
                f"🤖 Sent sequence '{sequence.name}' to {len(slave_connections)} slaves"
            )

        # Send to master if available (3D visualization)
        if master_connection:
            await broadcast_to_master(
                robot_id,
                {
                    "type": "play_sequence",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": sequence_data,
                },
            )
            notifications_sent += 1
            logger.info(
                f"🎬 Sent sequence '{sequence.name}' to master for visualization"
            )

        logger.info(
            f"🎬 Playing sequence '{sequence.name}' on robot {robot_id} ({notifications_sent} connections)"
        )
        return {
            "message": f"Sequence '{sequence.name}' started on robot {robot_id}",
            "sequence": {
                "id": sequence.id,
                "name": sequence.name,
                "duration": sequence.total_duration,
            },
            "slaves_notified": len(slave_connections),
            "master_notified": 1 if master_connection else 0,
            "total_notifications": notifications_sent,
        }
    except Exception as e:
        # logger.exception records the traceback; `from e` keeps the cause
        # explicitly attached to the HTTP error.
        logger.exception(
            f"Failed to play sequence {sequence_id} on robot {robot_id}: {e}"
        )
        raise HTTPException(
            status_code=500, detail=f"Failed to play sequence: {e}"
        ) from e
@app.post("/api/robots/{robot_id}/stop-sequence")
async def stop_sequence(robot_id: str):
    """Stop any running sequence on a robot.

    Sends a ``stop_sequence`` message to the robot's slave connections.

    Args:
        robot_id: ID of the target robot.

    Returns:
        A dict with a confirmation message and the number of slave
        connections the stop command was addressed to.

    Raises:
        HTTPException: 404 if the robot is not found, 400 if it has no slave
            connections, 500 if sending the stop command fails.
    """
    # Check if robot exists
    if not robot_manager.get_robot(robot_id):
        raise HTTPException(status_code=404, detail="Robot not found")

    # Check if robot has slaves
    slave_connections = connection_manager.get_slave_connections(robot_id)
    if not slave_connections:
        raise HTTPException(status_code=400, detail="No slave connections available")

    try:
        # Send stop command to all slaves
        await broadcast_to_slaves(
            robot_id,
            {
                "type": "stop_sequence",
                "timestamp": datetime.now(UTC).isoformat(),
                "data": {},
            },
        )
        logger.info(f"⏹️ Stopped sequences on robot {robot_id}")
        return {
            "message": f"Sequences stopped on robot {robot_id}",
            "slaves_notified": len(slave_connections),
        }
    except Exception as e:
        # logger.exception records the traceback; `from e` keeps the cause
        # explicitly attached to the HTTP error.
        logger.exception(f"Failed to stop sequences on robot {robot_id}: {e}")
        raise HTTPException(
            status_code=500, detail=f"Failed to stop sequences: {e}"
        ) from e
# ============= WEBSOCKET ENDPOINTS =============
@app.websocket("/ws/master/{robot_id}")
async def websocket_master_endpoint(websocket: WebSocket, robot_id: str):
    """WebSocket endpoint for master connections (command sources).

    Accepts the socket, auto-creates the robot if it does not exist,
    registers the connection with both managers, sends the initial
    ``robot_state`` message, and then hands every incoming JSON message to
    ``handle_master_message`` until the connection ends.

    Args:
        websocket: The incoming WebSocket connection.
        robot_id: ID of the robot this master wants to control.
    """
    await websocket.accept()

    robot = robot_manager.get_robot(robot_id)
    if not robot:
        # Auto-create robot if it doesn't exist
        logger.info(f"🤖 Auto-creating robot {robot_id} for master connection")
        try:
            robot = await robot_manager.create_robot(
                robot_id, "so-arm100", f"Auto-created Robot {robot_id}"
            )
        except Exception as e:
            logger.error(f"Failed to auto-create robot {robot_id}: {e}")
            await websocket.close(code=4003, reason="Failed to create robot")
            return

    connection_id = f"master-{uuid.uuid4().hex[:8]}"
    logger.info(f"🎮 Master connected: {connection_id} for robot {robot_id}")

    try:
        # Register master connection
        await connection_manager.connect_master(connection_id, robot_id, websocket)
        await robot_manager.set_master_connected(robot_id, connection_id)

        # Send initial robot state
        await websocket.send_json({
            "type": "robot_state",
            "timestamp": datetime.now(UTC).isoformat(),
            "data": robot.model_dump(mode="json"),
        })

        # Handle incoming messages
        async for message in websocket.iter_json():
            await handle_master_message(connection_id, robot_id, message)
    except WebSocketDisconnect:
        logger.info(f"🔌 Master disconnected: {connection_id}")
    except Exception as e:
        # Include the traceback: this is the only record of an unexpected failure.
        logger.exception(f"❌ Master connection error: {e}")
    finally:
        # Nested try/finally so the robot is always marked master-less, even
        # if unregistering the connection itself raises.
        try:
            await connection_manager.disconnect_master(connection_id)
        finally:
            await robot_manager.set_master_disconnected(robot_id)
@app.websocket("/ws/slave/{robot_id}")
async def websocket_slave_endpoint(websocket: WebSocket, robot_id: str):
    """WebSocket endpoint for slave connections (execution targets).

    Accepts the socket, auto-creates the robot if it does not exist,
    registers the connection with both managers, sends the current robot
    state, and then hands every incoming JSON message to
    ``handle_slave_message`` until the connection ends.

    Args:
        websocket: The incoming WebSocket connection.
        robot_id: ID of the robot this slave executes commands for.
    """
    logger.debug("Slave WebSocket connection attempt for robot %s", robot_id)
    await websocket.accept()

    robot = robot_manager.get_robot(robot_id)
    if not robot:
        # Auto-create robot if it doesn't exist
        logger.info(f"🤖 Auto-creating robot {robot_id} for slave connection")
        try:
            robot = await robot_manager.create_robot(
                robot_id, "so-arm100", f"Auto-created Robot {robot_id}"
            )
            logger.debug("Successfully auto-created robot %s", robot_id)
        except Exception as e:
            logger.error(f"Failed to auto-create robot {robot_id}: {e}")
            await websocket.close(code=4003, reason="Failed to create robot")
            return
    else:
        logger.debug("Robot %s found, proceeding with connection", robot_id)

    connection_id = f"slave-{uuid.uuid4().hex[:8]}"
    logger.info(f"🤖 Slave connected: {connection_id} for robot {robot_id}")

    try:
        # Register slave connection
        await connection_manager.connect_slave(connection_id, robot_id, websocket)
        await robot_manager.add_slave_connection(robot_id, connection_id)
        logger.debug("Slave %s registered successfully", connection_id)

        # Send the current robot state to the new slave
        await sync_slave_with_current_state(robot_id, websocket)
        logger.debug("Initial state sync sent to slave %s", connection_id)

        # Handle incoming messages (mainly status updates)
        async for message in websocket.iter_json():
            logger.debug("Received message from slave %s: %s", connection_id, message)
            await handle_slave_message(connection_id, robot_id, message)
    except WebSocketDisconnect:
        logger.info(f"🔌 Slave disconnected: {connection_id}")
    except Exception as e:
        # Include the traceback: this is the only record of an unexpected failure.
        logger.exception(f"❌ Slave connection error: {e}")
    finally:
        logger.debug("Cleaning up slave %s", connection_id)
        # Nested try/finally so the robot's slave list is always updated,
        # even if unregistering the connection itself raises.
        try:
            await connection_manager.disconnect_slave(connection_id)
        finally:
            await robot_manager.remove_slave_connection(robot_id, connection_id)
# ============= MESSAGE HANDLERS =============
async def handle_master_message(connection_id: str, robot_id: str, message: dict):
    """Handle incoming messages from master connections.

    Dispatches on ``message["type"]``:

    * ``command`` / ``sequence`` are forwarded to the robot's slaves as
      ``execute_command`` / ``execute_sequence``.
    * ``start_control`` is acknowledged with ``control_started``.
    * ``heartbeat`` is answered with ``heartbeat_ack``.
    * ``stop_control``, ``pause_control`` and ``resume_control`` are
      recognized but trigger no action.
    * Anything else is logged as a warning.

    Errors are logged and swallowed so that one bad message does not end
    the caller's receive loop.

    Args:
        connection_id: ID of the master connection that sent the message.
        robot_id: ID of the robot the master is attached to.
        message: The decoded JSON message.
    """
    logger.debug("Received message from master %s: %s", connection_id, message)
    try:
        msg_type = message.get("type")

        if msg_type == "command":
            # Forward command to all slaves
            await broadcast_to_slaves(
                robot_id,
                {
                    "type": "execute_command",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": message.get("data"),
                },
            )
        elif msg_type == "sequence":
            # Forward sequence to all slaves
            await broadcast_to_slaves(
                robot_id,
                {
                    "type": "execute_sequence",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": message.get("data"),
                },
            )
        elif msg_type == "start_control":
            # Acknowledge the start of control to the master
            master_ws = connection_manager.get_master_connection(robot_id)
            if master_ws:
                await master_ws.send_json({
                    "type": "control_started",
                    "timestamp": datetime.now(UTC).isoformat(),
                    "data": {"robot_id": robot_id},
                })
                logger.debug("Sent control_started acknowledgment")
        elif msg_type in ("stop_control", "pause_control", "resume_control"):
            # Recognized control messages; no server-side action is taken.
            logger.debug("Processing %s message", msg_type)
        elif msg_type == "heartbeat":
            # Respond to heartbeat
            master_ws = connection_manager.get_master_connection(robot_id)
            if master_ws:
                await master_ws.send_json({
                    "type": "heartbeat_ack",
                    "timestamp": datetime.now(UTC).isoformat(),
                })
                logger.debug("Sent heartbeat_ack")
        else:
            logger.warning(
                f"Unknown message type from master {connection_id}: {msg_type}"
            )
    except Exception as e:
        # Best-effort handling: log with traceback and keep the connection alive.
        logger.exception(f"Error handling master message: {e}")
async def handle_slave_message(connection_id: str, robot_id: str, message: dict):
    """Handle incoming messages from slave connections.

    ``status_update``, ``joint_states`` and ``error`` messages are forwarded
    to the robot's master as ``slave_status``, ``joint_states`` and
    ``slave_error`` respectively, tagged with the sending slave's
    connection ID. Any other type is logged as a warning.

    Errors are logged and swallowed so that one bad message does not end
    the caller's receive loop.

    Args:
        connection_id: ID of the slave connection that sent the message.
        robot_id: ID of the robot the slave is attached to.
        message: The decoded JSON message.
    """
    # Incoming slave message type -> message type forwarded to the master
    forwarded_types = {
        "status_update": "slave_status",
        "joint_states": "joint_states",
        "error": "slave_error",
    }

    logger.debug("Processing slave message from %s: %s", connection_id, message)
    try:
        msg_type = message.get("type")
        # Only string types are looked up, so an unhashable "type" value is
        # reported as unknown rather than raising from the dict lookup.
        outgoing_type = (
            forwarded_types.get(msg_type) if isinstance(msg_type, str) else None
        )

        if outgoing_type is None:
            logger.warning(
                f"Unknown message type from slave {connection_id}: {msg_type}"
            )
            return

        await broadcast_to_master(
            robot_id,
            {
                "type": outgoing_type,
                "timestamp": datetime.now(UTC).isoformat(),
                "slave_id": connection_id,
                "data": message.get("data"),
            },
        )
    except Exception as e:
        # Best-effort handling: log with traceback and keep the connection alive.
        logger.exception(f"Error handling slave message: {e}")
# ============= UTILITY FUNCTIONS =============
async def broadcast_to_slaves(robot_id: str, message: dict):
    """Broadcast message to all slaves of a robot.

    Sends run concurrently. A failure on one connection does not prevent
    delivery to the others; each failure is logged as a warning instead of
    being raised.

    Args:
        robot_id: ID of the robot whose slave connections receive the message.
        message: JSON-serializable payload passed to each ``send_json``.
    """
    slave_connections = connection_manager.get_slave_connections(robot_id)
    if not slave_connections:
        logger.debug("No slave connections for robot %s; nothing to send", robot_id)
        return

    logger.info(
        f"📡 Broadcasting to {len(slave_connections)} slaves for robot {robot_id}"
    )
    logger.debug("Message to broadcast: %s", message)

    # return_exceptions=True collects per-connection errors as results
    # rather than aborting the whole broadcast on the first failure.
    results = await asyncio.gather(
        *(ws.send_json(message) for ws in slave_connections),
        return_exceptions=True,
    )

    # Surface failed sends; previously they were only printed and otherwise lost.
    for outcome in results:
        if isinstance(outcome, BaseException):
            logger.warning(
                "Failed to send message to a slave of robot %s: %r",
                robot_id,
                outcome,
            )
async def broadcast_to_master(robot_id: str, message: dict):
    """Send message to master of a robot.

    Does nothing (beyond a debug log) when the robot has no master
    connection registered.

    Args:
        robot_id: ID of the robot whose master receives the message.
        message: JSON-serializable payload passed to ``send_json``.
    """
    master_ws = connection_manager.get_master_connection(robot_id)
    if not master_ws:
        logger.debug("No master connection for robot %s; message not sent", robot_id)
        return

    logger.debug("Sending to master of robot %s: %s", robot_id, message)
    await master_ws.send_json(message)
    logger.debug("Message sent to master successfully")
async def sync_slave_with_current_state(robot_id: str, websocket: WebSocket):
    """Push the robot's current state to a slave as a ``sync_state`` message.

    Nothing is sent when the robot lookup comes back falsy.
    """
    current = robot_manager.get_robot(robot_id)
    if not current:
        return

    payload = {
        "type": "sync_state",
        "timestamp": datetime.now(UTC).isoformat(),
        "data": current.model_dump(mode="json"),
    }
    await websocket.send_json(payload)
if __name__ == "__main__":
    # Start the LeRobot Arena server when this module is run directly.
    import uvicorn

    # logging.basicConfig() was already called at import time, so the root
    # logger already has a handler and a plain second call would be silently
    # ignored. force=True replaces the existing handlers so the format below
    # actually takes effect.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        force=True,
    )
    # Rebinds the module-level logger so log records carry a descriptive
    # name instead of "__main__".
    logger = logging.getLogger("lerobot-arena")
    logger.info("🚀 Starting LeRobot Arena WebSocket Server...")

    # Start the server
    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8080,
        log_level="info",
        reload=False,  # Auto-reload on code changes is disabled
    )