Spaces:
Sleeping
Sleeping
""" | |
Redesigned Gradio UI for Inference Server | |
This module provides a user-friendly, workflow-oriented interface. | |
""" | |
import subprocess | |
import time | |
from pathlib import Path | |
import gradio as gr | |
import httpx | |
# Configuration | |
DEFAULT_SERVER_HOST = "localhost" | |
DEFAULT_SERVER_PORT = 8001 | |
DEFAULT_ARENA_SERVER_URL = "http://localhost:8000" | |
class AIServerManager: | |
"""Manages communication with the AI Server.""" | |
def __init__( | |
self, server_url: str = f"http://{DEFAULT_SERVER_HOST}:{DEFAULT_SERVER_PORT}" | |
): | |
self.server_url = server_url | |
self.server_process: subprocess.Popen | None = None | |
async def check_server_health(self) -> tuple[bool, str]: | |
"""Check if the AI server is running and healthy.""" | |
try: | |
async with httpx.AsyncClient(timeout=5.0) as client: | |
response = await client.get(f"{self.server_url}/health") | |
if response.status_code == 200: | |
data = response.json() | |
return ( | |
True, | |
f"✅ Server running - {data['active_sessions']} sessions active", | |
) | |
return False, f"❌ Server error: {response.status_code}" | |
except Exception as e: | |
return False, f"❌ Server not reachable: {e!s}" | |
def start_server(self) -> str: | |
"""Start the AI server process using uv.""" | |
if self.server_process and self.server_process.poll() is None: | |
return "⚠️ Server is already running" | |
try: | |
cmd = [ | |
"uv", | |
"run", | |
"uvicorn", | |
"inference_server.main:app", | |
"--host", | |
"0.0.0.0", | |
"--port", | |
str(DEFAULT_SERVER_PORT), | |
"--reload", | |
] | |
self.server_process = subprocess.Popen( | |
cmd, | |
cwd=Path(__file__).parent.parent.parent, | |
stdout=subprocess.PIPE, | |
stderr=subprocess.STDOUT, | |
text=True, | |
) | |
time.sleep(4) | |
if self.server_process.poll() is None: | |
return f"🚀 AI Server started on {self.server_url}" | |
return "❌ Failed to start server - check your model path and dependencies" | |
except Exception as e: | |
return f"❌ Error starting server: {e!s}" | |
async def create_and_start_session( | |
self, | |
session_id: str, | |
policy_path: str, | |
camera_names: str, | |
arena_server_url: str, | |
workspace_id: str | None = None, | |
) -> str: | |
"""Create and immediately start an inference session.""" | |
try: | |
# Parse camera names | |
cameras = [name.strip() for name in camera_names.split(",") if name.strip()] | |
if not cameras: | |
cameras = ["front"] | |
request_data = { | |
"session_id": session_id, | |
"policy_path": policy_path, | |
"camera_names": cameras, | |
"arena_server_url": arena_server_url, | |
} | |
if workspace_id and workspace_id.strip(): | |
request_data["workspace_id"] = workspace_id.strip() | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
# Create session | |
response = await client.post( | |
f"{self.server_url}/sessions", json=request_data | |
) | |
if response.status_code != 200: | |
error_detail = response.json().get("detail", "Unknown error") | |
return f"❌ Failed to create session: {error_detail}" | |
data = response.json() | |
# Immediately start inference | |
start_response = await client.post( | |
f"{self.server_url}/sessions/{session_id}/start" | |
) | |
if start_response.status_code != 200: | |
error_detail = start_response.json().get("detail", "Unknown error") | |
return f"⚠️ Session created but failed to start: {error_detail}" | |
return f"""✅ Session '{session_id}' created and started! | |
📡 Connection Details: | |
• Workspace: {data["workspace_id"]} | |
• Camera rooms: {", ".join(f"{k}:{v}" for k, v in data["camera_room_ids"].items())} | |
• Joint input room: {data["joint_input_room_id"]} | |
• Joint output room: {data["joint_output_room_id"]} | |
🤖 Ready for robot control!""" | |
except Exception as e: | |
return f"❌ Error: {e!s}" | |
# Initialize the server manager | |
server_manager = AIServerManager() | |
def create_main_interface() -> gr.Blocks: | |
"""Create the main user-friendly interface.""" | |
with gr.Blocks(title="🤖 Robot AI Control Center", theme=gr.themes.Soft()) as demo: | |
gr.Markdown(""" | |
# 🤖 Robot AI Control Center | |
**Control your robot with AI using ACT (Action Chunking Transformer) models** | |
Follow the steps below to set up real-time AI control for your robot. | |
""") | |
# Step 1: Server Status | |
with gr.Group(): | |
gr.Markdown("## 📡 Step 1: AI Server") | |
with gr.Row(): | |
with gr.Column(scale=2): | |
server_status_display = gr.Textbox( | |
label="Server Status", | |
value="Checking server...", | |
interactive=False, | |
lines=2, | |
) | |
with gr.Column(scale=1): | |
start_server_btn = gr.Button("🚀 Start Server", variant="primary") | |
check_health_btn = gr.Button("🔍 Check Status", variant="secondary") | |
# Step 2: Robot Setup | |
with gr.Group(): | |
gr.Markdown("## 🤖 Step 2: Set Up Robot AI") | |
with gr.Row(): | |
with gr.Column(): | |
session_id_input = gr.Textbox( | |
label="Session Name", | |
placeholder="my-robot-session", | |
value="my-robot-01", | |
) | |
policy_path_input = gr.Textbox( | |
label="AI Model Path", | |
placeholder="./checkpoints/act_so101_beyond", | |
value="./checkpoints/act_so101_beyond", | |
) | |
camera_names_input = gr.Textbox( | |
label="Camera Names (comma-separated)", | |
placeholder="front, wrist, overhead", | |
value="front", | |
) | |
arena_server_url_input = gr.Textbox( | |
label="Arena Server URL", | |
placeholder="http://localhost:8000", | |
value=DEFAULT_ARENA_SERVER_URL, | |
) | |
create_start_btn = gr.Button( | |
"🎯 Create & Start AI Control", variant="primary" | |
) | |
with gr.Column(): | |
setup_result = gr.Textbox( | |
label="Setup Result", | |
lines=10, | |
interactive=False, | |
placeholder="Click 'Create & Start AI Control' to begin...", | |
) | |
# Control buttons | |
with gr.Group(): | |
gr.Markdown("## 🎮 Step 3: Control Session") | |
with gr.Row(): | |
current_session_input = gr.Textbox( | |
label="Session ID", placeholder="Enter session ID" | |
) | |
start_btn = gr.Button("▶️ Start", variant="primary") | |
stop_btn = gr.Button("⏸️ Stop", variant="secondary") | |
status_btn = gr.Button("📊 Status", variant="secondary") | |
session_status_display = gr.Textbox( | |
label="Session Status", lines=8, interactive=False | |
) | |
# Event Handlers | |
def start_server_click(): | |
return server_manager.start_server() | |
async def check_health_click(): | |
_is_healthy, message = await server_manager.check_server_health() | |
return message | |
async def create_start_session_click( | |
session_id, policy_path, camera_names, arena_server_url | |
): | |
result = await server_manager.create_and_start_session( | |
session_id, policy_path, camera_names, arena_server_url | |
) | |
return result, session_id | |
async def control_session(session_id, action): | |
"""Control a session (start/stop).""" | |
if not session_id.strip(): | |
return "⚠️ No session ID provided" | |
try: | |
async with httpx.AsyncClient(timeout=30.0) as client: | |
endpoint = f"/sessions/{session_id}/{action}" | |
response = await client.post( | |
f"{server_manager.server_url}{endpoint}" | |
) | |
if response.status_code == 200: | |
result = response.json() | |
return f"✅ {result['message']}" | |
error_detail = response.json().get("detail", "Unknown error") | |
return f"❌ Failed to {action}: {error_detail}" | |
except Exception as e: | |
return f"❌ Error: {e!s}" | |
async def get_session_status(session_id): | |
"""Get session status.""" | |
if not session_id.strip(): | |
return "⚠️ No session ID provided" | |
try: | |
async with httpx.AsyncClient(timeout=10.0) as client: | |
response = await client.get( | |
f"{server_manager.server_url}/sessions/{session_id}" | |
) | |
if response.status_code == 200: | |
session = response.json() | |
status_emoji = { | |
"running": "🟢", | |
"ready": "🟡", | |
"stopped": "🔴", | |
"initializing": "🟠", | |
}.get(session["status"], "⚪") | |
return f"""{status_emoji} Session: {session_id} | |
Status: {session["status"].upper()} | |
Model: {session["policy_path"]} | |
Cameras: {", ".join(session["camera_names"])} | |
📊 Performance: | |
• Inferences: {session["stats"]["inference_count"]} | |
• Commands sent: {session["stats"]["commands_sent"]} | |
• Queue: {session["stats"]["actions_in_queue"]} actions | |
• Errors: {session["stats"]["errors"]} | |
🔧 Data flow: | |
• Images received: {session["stats"]["images_received"]} | |
• Joint states received: {session["stats"]["joints_received"]}""" | |
return f"❌ Session not found or error: {response.status_code}" | |
except Exception as e: | |
return f"❌ Error: {e!s}" | |
# Connect events | |
start_server_btn.click(start_server_click, outputs=[server_status_display]) | |
check_health_btn.click(check_health_click, outputs=[server_status_display]) | |
create_start_btn.click( | |
create_start_session_click, | |
inputs=[ | |
session_id_input, | |
policy_path_input, | |
camera_names_input, | |
arena_server_url_input, | |
], | |
outputs=[setup_result, current_session_input], | |
) | |
# Session control buttons - create proper async wrappers | |
async def start_session_click(session_id): | |
return await control_session(session_id, "start") | |
async def stop_session_click(session_id): | |
return await control_session(session_id, "stop") | |
start_btn.click( | |
start_session_click, | |
inputs=[current_session_input], | |
outputs=[session_status_display], | |
) | |
stop_btn.click( | |
stop_session_click, | |
inputs=[current_session_input], | |
outputs=[session_status_display], | |
) | |
status_btn.click( | |
get_session_status, | |
inputs=[current_session_input], | |
outputs=[session_status_display], | |
) | |
# Auto-refresh on load | |
demo.load(check_health_click, outputs=[server_status_display]) | |
# Add helpful instructions | |
gr.Markdown(""" | |
--- | |
### 📖 Quick Guide: | |
1. **Start the Server**: Ensure the AI server is running (Step 1) | |
2. **Configure Your Robot**: Enter your model path and camera setup (Step 2) | |
3. **Create Session**: Click "Create & Start AI Control" to begin | |
4. **Monitor & Control**: Use Step 3 to start/stop and monitor your session | |
💡 **Tips**: | |
- Make sure your ACT model path exists before creating a session | |
- Camera names should match your robot's camera configuration | |
- Session will automatically start after creation | |
""") | |
return demo | |
def launch_ui( | |
server_name: str = "localhost", server_port: int = 7860, share: bool = False | |
) -> None: | |
"""Launch the redesigned UI.""" | |
demo = create_main_interface() | |
demo.launch( | |
server_name=server_name, server_port=server_port, share=share, show_error=True | |
) | |
if __name__ == "__main__": | |
launch_ui() | |