Spaces:
Sleeping
Sleeping
File size: 5,343 Bytes
8aedc84 8344c24 8aedc84 8344c24 8aedc84 8344c24 8aedc84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 |
from fastapi import APIRouter, HTTPException, WebSocket
from .core import RoboticsCore
from .models import (
CreateRoomRequest,
JointCommand,
ParticipantRole,
)
# ============= SIMPLIFIED API =============
# Router that groups every endpoint declared in this module under the
# "/robotics" URL prefix and the "robotics" tag.
robotics_router = APIRouter(prefix="/robotics", tags=["robotics"])
# Single module-level RoboticsCore instance; every handler below delegates
# to this same object.
robotics_core = RoboticsCore()
# ============= CONVENIENT ROOM ENDPOINTS =============
@robotics_router.get("/workspaces/{workspace_id}/rooms")
async def list_rooms(workspace_id: str):
    """List all robotics rooms in a workspace.

    Args:
        workspace_id: Workspace identifier taken from the URL path.

    Returns:
        A dict with ``success``, the echoed ``workspace_id``, the ``rooms``
        collection reported by the core, and ``total`` (the length of
        ``rooms``).
    """
    # Query the core exactly once so that "rooms" and "total" always describe
    # the same snapshot, and the listing work is not performed twice.
    rooms = robotics_core.list_rooms(workspace_id)
    return {
        "success": True,
        "workspace_id": workspace_id,
        "rooms": rooms,
        "total": len(rooms),
    }
@robotics_router.post("/workspaces/{workspace_id}/rooms")
async def create_room(workspace_id: str, request: CreateRoomRequest | None = None):
    """Create a new robotics room in a workspace.

    Args:
        workspace_id: Workspace identifier taken from the URL path.
        request: Optional request body. When it is present and exposes a
            ``room_id`` attribute, that value is forwarded to the core;
            otherwise ``None`` is forwarded.

    Returns:
        A dict reporting ``success`` together with the workspace id and room
        id returned by the core, plus a human-readable ``message``.
    """
    requested_room_id = (
        request.room_id if request and hasattr(request, "room_id") else None
    )

    # The core hands back a (workspace_id, room_id) pair; those returned
    # values, not the inputs, are what the response reports.
    actual_workspace_id, room_id = robotics_core.create_room(
        workspace_id, requested_room_id
    )

    return {
        "success": True,
        "workspace_id": actual_workspace_id,
        "room_id": room_id,
        "message": f"Room {room_id} created successfully in workspace {actual_workspace_id}",
    }
@robotics_router.get("/workspaces/{workspace_id}/rooms/{room_id}")
async def get_room(workspace_id: str, room_id: str):
    """Return the details of a single room.

    Args:
        workspace_id: Workspace identifier taken from the URL path.
        room_id: Room identifier taken from the URL path.

    Returns:
        ``{"success": True, "room": <info>}`` where ``<info>`` is whatever
        the core returned for the room.

    Raises:
        HTTPException: 404 when the core's answer contains an ``"error"``
            entry.
    """
    details = robotics_core.get_room_info(workspace_id, room_id)
    # The core signals failure by including an "error" entry in its result.
    if "error" not in details:
        return {"success": True, "room": details}
    raise HTTPException(status_code=404, detail="Room not found")
@robotics_router.delete("/workspaces/{workspace_id}/rooms/{room_id}")
async def delete_room(workspace_id: str, room_id: str):
    """Delete a robotics room.

    Args:
        workspace_id: Workspace identifier taken from the URL path.
        room_id: Room identifier taken from the URL path.

    Returns:
        A dict with ``success`` and a confirmation ``message``.

    Raises:
        HTTPException: 404 when the core returns a falsy result for the
            deletion.
    """
    was_deleted = robotics_core.delete_room(workspace_id, room_id)
    if not was_deleted:
        raise HTTPException(status_code=404, detail="Room not found")

    confirmation = f"Room {room_id} deleted successfully from workspace {workspace_id}"
    return {"success": True, "message": confirmation}
@robotics_router.get("/workspaces/{workspace_id}/rooms/{room_id}/state")
async def get_room_state(workspace_id: str, room_id: str):
    """Return the current state of a room as reported by the core.

    Args:
        workspace_id: Workspace identifier taken from the URL path.
        room_id: Room identifier taken from the URL path.

    Returns:
        ``{"success": True, "state": <state>}``.

    Raises:
        HTTPException: 404 when the core's answer contains an ``"error"``
            entry.
    """
    room_state = robotics_core.get_room_state(workspace_id, room_id)
    lookup_failed = "error" in room_state
    if lookup_failed:
        raise HTTPException(status_code=404, detail="Room not found")

    response = {"success": True, "state": room_state}
    return response
# ============= CONTROL ENDPOINTS =============
@robotics_router.post("/workspaces/{workspace_id}/rooms/{room_id}/command")
async def send_command(workspace_id: str, room_id: str, command: JointCommand):
    """Send joint commands to a room.

    Args:
        workspace_id: Workspace identifier taken from the URL path.
        room_id: Room identifier taken from the URL path.
        command: Request body; each entry of ``command.joints`` contributes
            its ``name``, ``value`` and ``speed`` to the payload sent to the
            core.

    Returns:
        A dict with ``success``, the echoed ids, ``joints_updated`` (the
        value returned by the core) and a confirmation ``message``.

    Raises:
        HTTPException: 404 when the room lookup reports an ``"error"``, or
            when the core raises ``ValueError`` while applying the command
            (the exception text is passed through as the error detail).
    """
    room_info = robotics_core.get_room_info(workspace_id, room_id)
    if "error" in room_info:
        raise HTTPException(status_code=404, detail="Room not found")

    # Built outside the ``try`` so that only the core call is guarded.
    joints_data = [
        {"name": j.name, "value": j.value, "speed": j.speed} for j in command.joints
    ]

    try:
        changed = await robotics_core.send_command_to_room(
            workspace_id, room_id, joints_data
        )
    except ValueError as e:
        # Surface the core's explanation instead of a bare 404; fall back to
        # the previous default text when the exception carries no message.
        raise HTTPException(status_code=404, detail=str(e) or "Not Found") from e

    return {
        "success": True,
        "workspace_id": workspace_id,
        "room_id": room_id,
        "joints_updated": changed,
        "message": "Commands sent successfully",
    }
# ============= UTILITY ENDPOINTS =============
@robotics_router.get("/status")
async def get_status():
    """Report service status, connection counts and cleanup settings.

    Returns:
        A dict combining fixed service metadata, the workspace/room/connection
        totals from the core's connection stats, the values of every
        ``ParticipantRole`` member, and a ``cleanup`` section derived from the
        core's cleanup status.
    """
    connection_stats = robotics_core.get_connection_stats()
    cleanup_state = robotics_core.get_cleanup_status()

    report = {
        "service": "robotics",
        "status": "active",
        "workspaces_count": connection_stats["total_workspaces"],
        "rooms_count": connection_stats["total_rooms"],
        "connections_count": connection_stats["total_connections"],
        "version": "2.0.0",
    }
    report["supported_roles"] = [member.value for member in ParticipantRole]
    report["supported_robot_types"] = ["so-arm100", "generic"]

    # The core reports the inactivity timeout in minutes; the response
    # exposes it in hours.
    timeout_hours = cleanup_state["inactivity_timeout_minutes"] / 60
    report["cleanup"] = {
        "enabled": cleanup_state["cleanup_enabled"],
        "inactivity_timeout_hours": timeout_hours,
        "cleanup_interval_minutes": cleanup_state["cleanup_interval_minutes"],
    }
    return report
@robotics_router.get("/health")
async def health_check():
    """Return a fixed payload indicating the robotics service is healthy."""
    health_payload = {"status": "healthy", "service": "robotics"}
    return health_payload
@robotics_router.get("/cleanup/status")
async def get_cleanup_status():
    """Return the core's cleanup status wrapped in a success envelope."""
    return {
        "success": True,
        "cleanup_status": robotics_core.get_cleanup_status(),
    }
@robotics_router.post("/cleanup/manual")
async def trigger_manual_cleanup():
    """Run the core's manual cleanup and return its result.

    Returns:
        ``{"success": True, "cleanup_result": <result>}`` where ``<result>``
        is whatever the awaited ``manual_cleanup`` call produced.
    """
    outcome = await robotics_core.manual_cleanup()
    response = {"success": True}
    response["cleanup_result"] = outcome
    return response
# ============= WEBSOCKET ENDPOINT =============
@robotics_router.websocket("/workspaces/{workspace_id}/rooms/{room_id}/ws")
async def websocket_endpoint(websocket: WebSocket, workspace_id: str, room_id: str):
    """Hand an incoming WebSocket connection for a room over to the core.

    Args:
        websocket: The incoming WebSocket connection.
        workspace_id: Workspace identifier taken from the URL path.
        room_id: Room identifier taken from the URL path.
    """
    # All connection handling is delegated to the core; nothing is done here.
    handle_connection = robotics_core.handle_websocket
    await handle_connection(websocket, workspace_id, room_id)
|