Spaces:
Sleeping
Sleeping
from fastapi import APIRouter, HTTPException, WebSocket | |
from .core import RoboticsCore | |
from .models import ( | |
CreateRoomRequest, | |
JointCommand, | |
ParticipantRole, | |
) | |
# ============= SIMPLIFIED API =============
# Router that groups the robotics endpoints under the "/robotics" prefix
# and the "robotics" tag.
# NOTE(review): no route decorators are visible on the handlers in this view;
# confirm where they are registered on this router.
robotics_router = APIRouter(prefix="/robotics", tags=["robotics"])

# Module-level core instance that every handler in this module delegates to.
robotics_core = RoboticsCore()
# ============= CONVENIENT ROOM ENDPOINTS =============
async def list_rooms(workspace_id: str):
    """List all robotics rooms in a workspace.

    Args:
        workspace_id: Identifier of the workspace whose rooms are listed.

    Returns:
        A dict with ``success``, the echoed ``workspace_id``, the ``rooms``
        value returned by ``robotics_core.list_rooms`` and its length as
        ``total``.
    """
    # Query the core once so "rooms" and "total" always describe the same
    # snapshot and the lookup is not performed twice.
    rooms = robotics_core.list_rooms(workspace_id)
    return {
        "success": True,
        "workspace_id": workspace_id,
        "rooms": rooms,
        "total": len(rooms),
    }
async def create_room(workspace_id: str, request: CreateRoomRequest | None = None):
    """Create a new robotics room in a workspace.

    Args:
        workspace_id: Identifier of the workspace the room is created in.
        request: Optional request body; when it is truthy and has a
            ``room_id`` attribute, that value is forwarded to the core.

    Returns:
        A dict with ``success``, the ``workspace_id`` and ``room_id``
        reported back by ``robotics_core.create_room``, and a confirmation
        ``message``.
    """
    # A missing/falsy request, or one without a ``room_id`` attribute,
    # forwards ``None`` as the requested id.
    requested_id = getattr(request, "room_id", None) if request else None

    actual_workspace_id, room_id = robotics_core.create_room(
        workspace_id, requested_id
    )
    return {
        "success": True,
        "workspace_id": actual_workspace_id,
        "room_id": room_id,
        "message": f"Room {room_id} created successfully in workspace {actual_workspace_id}",
    }
async def get_room(workspace_id: str, room_id: str):
    """Return the details of a single room.

    Args:
        workspace_id: Identifier of the workspace containing the room.
        room_id: Identifier of the room to look up.

    Returns:
        A dict with ``success`` and the room info under ``room``.

    Raises:
        HTTPException: 404 when the info returned by the core contains an
            ``"error"`` entry.
    """
    details = robotics_core.get_room_info(workspace_id, room_id)
    if "error" not in details:
        return {"success": True, "room": details}
    raise HTTPException(status_code=404, detail="Room not found")
async def delete_room(workspace_id: str, room_id: str):
    """Delete a robotics room.

    Args:
        workspace_id: Identifier of the workspace containing the room.
        room_id: Identifier of the room to delete.

    Returns:
        A dict with ``success`` and a confirmation ``message``.

    Raises:
        HTTPException: 404 when ``robotics_core.delete_room`` returns a
            falsy result.
    """
    was_deleted = robotics_core.delete_room(workspace_id, room_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Room not found")
    return {
        "success": True,
        "message": f"Room {room_id} deleted successfully from workspace {workspace_id}",
    }
async def get_room_state(workspace_id: str, room_id: str):
    """Return the current state of a room as reported by the core.

    Args:
        workspace_id: Identifier of the workspace containing the room.
        room_id: Identifier of the room whose state is requested.

    Returns:
        A dict with ``success`` and the core's state under ``state``.

    Raises:
        HTTPException: 404 when the state returned by the core contains an
            ``"error"`` entry.
    """
    current_state = robotics_core.get_room_state(workspace_id, room_id)
    if "error" not in current_state:
        return {"success": True, "state": current_state}
    raise HTTPException(status_code=404, detail="Room not found")
# ============= CONTROL ENDPOINTS =============
async def send_command(workspace_id: str, room_id: str, command: JointCommand):
    """Send joint commands to a room.

    Args:
        workspace_id: Identifier of the workspace containing the room.
        room_id: Identifier of the target room.
        command: Command whose ``joints`` entries each provide ``name``,
            ``value`` and ``speed``.

    Returns:
        A dict with ``success``, the echoed ``workspace_id`` and ``room_id``,
        the core's result under ``joints_updated``, and a ``message``.

    Raises:
        HTTPException: 404 when the room info contains an ``"error"`` entry,
            or when ``robotics_core.send_command_to_room`` raises
            ``ValueError``.
    """
    room_info = robotics_core.get_room_info(workspace_id, room_id)
    if "error" in room_info:
        raise HTTPException(status_code=404, detail="Room not found")

    # Built outside the ``try`` so only the core call is guarded.
    joints_data = [
        {"name": j.name, "value": j.value, "speed": j.speed} for j in command.joints
    ]

    try:
        changed = await robotics_core.send_command_to_room(
            workspace_id, room_id, joints_data
        )
    except ValueError as e:
        # Give this 404 a detail like every other 404 in this module; prefer
        # the core's own message and fall back to the shared wording.
        raise HTTPException(
            status_code=404, detail=str(e) or "Room not found"
        ) from e

    return {
        "success": True,
        "workspace_id": workspace_id,
        "room_id": room_id,
        "joints_updated": changed,
        "message": "Commands sent successfully",
    }
# ============= UTILITY ENDPOINTS =============
async def get_status():
    """Report service status, connection counts and cleanup settings.

    Returns:
        A dict combining fixed service metadata, the counts read from
        ``robotics_core.get_connection_stats()``, the values of
        ``ParticipantRole``, and a ``cleanup`` section derived from
        ``robotics_core.get_cleanup_status()``.
    """
    connection_stats = robotics_core.get_connection_stats()
    cleanup_state = robotics_core.get_cleanup_status()

    role_values = [member.value for member in ParticipantRole]

    # The core reports the inactivity timeout in minutes; it is exposed here
    # in hours.
    timeout_hours = cleanup_state["inactivity_timeout_minutes"] / 60
    cleanup_section = {
        "enabled": cleanup_state["cleanup_enabled"],
        "inactivity_timeout_hours": timeout_hours,
        "cleanup_interval_minutes": cleanup_state["cleanup_interval_minutes"],
    }

    return {
        "service": "robotics",
        "status": "active",
        "workspaces_count": connection_stats["total_workspaces"],
        "rooms_count": connection_stats["total_rooms"],
        "connections_count": connection_stats["total_connections"],
        "version": "2.0.0",
        "supported_roles": role_values,
        "supported_robot_types": ["so-arm100", "generic"],
        "cleanup": cleanup_section,
    }
async def health_check():
    """Return a fixed payload indicating the robotics service is healthy."""
    payload = {"status": "healthy", "service": "robotics"}
    return payload
async def get_cleanup_status():
    """Return the cleanup status reported by the core.

    Returns:
        A dict with ``success`` and the result of
        ``robotics_core.get_cleanup_status()`` under ``cleanup_status``.
    """
    return {
        "success": True,
        "cleanup_status": robotics_core.get_cleanup_status(),
    }
async def trigger_manual_cleanup():
    """Run a manual cleanup through the core and return its result.

    Returns:
        A dict with ``success`` and the awaited result of
        ``robotics_core.manual_cleanup()`` under ``cleanup_result``.
    """
    cleanup_outcome = await robotics_core.manual_cleanup()
    response = {"success": True, "cleanup_result": cleanup_outcome}
    return response
# ============= WEBSOCKET ENDPOINT =============
async def websocket_endpoint(websocket: WebSocket, workspace_id: str, room_id: str):
    """Hand a WebSocket connection for a room over to the core.

    Args:
        websocket: The WebSocket connection to serve.
        workspace_id: Identifier of the workspace containing the room.
        room_id: Identifier of the room the connection is for.
    """
    # All connection handling is delegated; this function awaits until the
    # core's handler returns.
    handler = robotics_core.handle_websocket
    await handler(websocket, workspace_id, room_id)