blanchon's picture
Update
63ed3a7
raw
history blame
13.4 kB
"""
Redesigned Gradio UI for Inference Server
This module provides a user-friendly, workflow-oriented interface.
"""
import subprocess
import time
from pathlib import Path
from urllib.parse import quote

import gradio as gr
import httpx
# Configuration
# Host and port of the AI server. Together they form AIServerManager's default
# server URL, and the port is also the one handed to uvicorn by start_server().
DEFAULT_SERVER_HOST = "localhost"
DEFAULT_SERVER_PORT = 8001
# Pre-filled value of the "Arena Server URL" field in the UI.
DEFAULT_ARENA_SERVER_URL = "http://localhost:8000"
class AIServerManager:
    """Manages communication with the AI Server.

    Wraps the server's HTTP API (health check, session creation and start) and
    can spawn the server itself as a local ``uvicorn`` subprocess via ``uv``.
    """

    def __init__(
        self, server_url: str = f"http://{DEFAULT_SERVER_HOST}:{DEFAULT_SERVER_PORT}"
    ):
        """Remember the server base URL; no process is started here.

        Args:
            server_url: Base URL of the AI server. Endpoint paths such as
                ``/health`` are appended directly to it.
        """
        self.server_url = server_url
        # Handle of the subprocess spawned by start_server(), if any.
        self.server_process: subprocess.Popen | None = None

    @staticmethod
    def _error_detail(response: "httpx.Response") -> str:
        """Extract a readable error detail from a non-success response.

        Falls back to the HTTP status code when the body is not a JSON object,
        so a plain-text or HTML error page does not surface as a parse error.
        """
        try:
            payload = response.json()
        except ValueError:
            payload = None
        if isinstance(payload, dict):
            return str(payload.get("detail", "Unknown error"))
        return f"HTTP {response.status_code}"

    async def check_server_health(self) -> tuple[bool, str]:
        """Check if the AI server is running and healthy.

        Returns:
            ``(healthy, message)``. ``healthy`` is True only when ``GET /health``
            answers 200; ``message`` is a human-readable status line.
        """
        try:
            async with httpx.AsyncClient(timeout=5.0) as client:
                response = await client.get(f"{self.server_url}/health")
            if response.status_code != 200:
                return False, f"❌ Server error: {response.status_code}"
            data = response.json()
            return (
                True,
                f"✅ Server running - {data['active_sessions']} sessions active",
            )
        except Exception as e:
            # NOTE: this also covers a 200 response with an unexpected body
            # (invalid JSON or missing 'active_sessions').
            return False, f"❌ Server not reachable: {e!s}"

    def start_server(self) -> str:
        """Start the AI server process using uv.

        Blocks for a few seconds to see whether the process survives startup.

        Returns:
            A human-readable status message; errors are reported in the message
            rather than raised.
        """
        if self.server_process is not None and self.server_process.poll() is None:
            return "⚠️ Server is already running"
        try:
            cmd = [
                "uv",
                "run",
                "uvicorn",
                "inference_server.main:app",
                "--host",
                "0.0.0.0",
                "--port",
                str(DEFAULT_SERVER_PORT),
                "--reload",
            ]
            # The child inherits this process's stdout/stderr. Nothing here ever
            # reads from the child, so capturing into a PIPE would eventually
            # fill the OS pipe buffer and block the server.
            self.server_process = subprocess.Popen(
                cmd,
                # Three directories above this file; presumably the project
                # root that `uv run` expects - confirm against the repo layout.
                cwd=Path(__file__).parent.parent.parent,
            )
            # Give the process a moment; an early exit means startup failed.
            time.sleep(4)
            if self.server_process.poll() is None:
                return f"🚀 AI Server started on {self.server_url}"
            return "❌ Failed to start server - check your model path and dependencies"
        except Exception as e:
            return f"❌ Error starting server: {e!s}"

    async def create_and_start_session(
        self,
        session_id: str,
        policy_path: str,
        camera_names: str,
        arena_server_url: str,
        workspace_id: str | None = None,
    ) -> str:
        """Create and immediately start an inference session.

        Args:
            session_id: Identifier of the session to create.
            policy_path: Model path forwarded to the server as ``policy_path``.
            camera_names: Comma-separated camera names; blank entries are
                dropped and ``"front"`` is used when none remain.
            arena_server_url: Forwarded to the server as ``arena_server_url``.
            workspace_id: Optional workspace ID; only sent when non-blank.

        Returns:
            A human-readable result message. Failures are described in the
            message rather than raised.
        """
        try:
            # Parse camera names
            cameras = [name.strip() for name in camera_names.split(",") if name.strip()]
            if not cameras:
                cameras = ["front"]
            request_data = {
                "session_id": session_id,
                "policy_path": policy_path,
                "camera_names": cameras,
                "arena_server_url": arena_server_url,
            }
            if workspace_id and workspace_id.strip():
                request_data["workspace_id"] = workspace_id.strip()
            # Percent-encode the ID so characters such as '?', '#' or '/' cannot
            # change which endpoint the start request hits.
            session_path = quote(session_id, safe="")
            async with httpx.AsyncClient(timeout=30.0) as client:
                # Create session
                response = await client.post(
                    f"{self.server_url}/sessions", json=request_data
                )
                if response.status_code != 200:
                    error_detail = self._error_detail(response)
                    return f"❌ Failed to create session: {error_detail}"
                data = response.json()
                # Immediately start inference
                start_response = await client.post(
                    f"{self.server_url}/sessions/{session_path}/start"
                )
                if start_response.status_code != 200:
                    error_detail = self._error_detail(start_response)
                    return f"⚠️ Session created but failed to start: {error_detail}"
                return f"""✅ Session '{session_id}' created and started!
📡 Connection Details:
• Workspace: {data["workspace_id"]}
• Camera rooms: {", ".join(f"{k}:{v}" for k, v in data["camera_room_ids"].items())}
• Joint input room: {data["joint_input_room_id"]}
• Joint output room: {data["joint_output_room_id"]}
🤖 Ready for robot control!"""
        except Exception as e:
            return f"❌ Error: {e!s}"
# Module-level manager instance (default server URL) used by the UI event
# handlers defined in create_main_interface().
server_manager = AIServerManager()
def create_main_interface() -> gr.Blocks:
    """Create the main user-friendly interface.

    Builds a three-step Gradio layout (server status, session setup, session
    control) and wires each button to a handler that talks to the AI server
    through the module-level ``server_manager``.

    Returns:
        The assembled ``gr.Blocks`` app; it is not launched here.
    """
    with gr.Blocks(title="🤖 Robot AI Control Center", theme=gr.themes.Soft()) as demo:
        gr.Markdown("""
# 🤖 Robot AI Control Center
**Control your robot with AI using ACT (Action Chunking Transformer) models**
Follow the steps below to set up real-time AI control for your robot.
""")

        # Step 1: Server Status
        with gr.Group():
            gr.Markdown("## 📡 Step 1: AI Server")
            with gr.Row():
                with gr.Column(scale=2):
                    server_status_display = gr.Textbox(
                        label="Server Status",
                        value="Checking server...",
                        interactive=False,
                        lines=2,
                    )
                with gr.Column(scale=1):
                    start_server_btn = gr.Button("🚀 Start Server", variant="primary")
                    check_health_btn = gr.Button("🔍 Check Status", variant="secondary")

        # Step 2: Robot Setup
        with gr.Group():
            gr.Markdown("## 🤖 Step 2: Set Up Robot AI")
            with gr.Row():
                with gr.Column():
                    session_id_input = gr.Textbox(
                        label="Session Name",
                        placeholder="my-robot-session",
                        value="my-robot-01",
                    )
                    policy_path_input = gr.Textbox(
                        label="AI Model Path",
                        placeholder="./checkpoints/act_so101_beyond",
                        value="./checkpoints/act_so101_beyond",
                    )
                    camera_names_input = gr.Textbox(
                        label="Camera Names (comma-separated)",
                        placeholder="front, wrist, overhead",
                        value="front",
                    )
                    arena_server_url_input = gr.Textbox(
                        label="Arena Server URL",
                        placeholder="http://localhost:8000",
                        value=DEFAULT_ARENA_SERVER_URL,
                    )
                    create_start_btn = gr.Button(
                        "🎯 Create & Start AI Control", variant="primary"
                    )
                with gr.Column():
                    setup_result = gr.Textbox(
                        label="Setup Result",
                        lines=10,
                        interactive=False,
                        placeholder="Click 'Create & Start AI Control' to begin...",
                    )

        # Control buttons
        with gr.Group():
            gr.Markdown("## 🎮 Step 3: Control Session")
            with gr.Row():
                current_session_input = gr.Textbox(
                    label="Session ID", placeholder="Enter session ID"
                )
                start_btn = gr.Button("▶️ Start", variant="primary")
                stop_btn = gr.Button("⏸️ Stop", variant="secondary")
                status_btn = gr.Button("📊 Status", variant="secondary")
            session_status_display = gr.Textbox(
                label="Session Status", lines=8, interactive=False
            )

        # Event Handlers
        def _error_detail(response) -> str:
            """Readable error detail; falls back to the status code for non-JSON bodies."""
            try:
                payload = response.json()
            except ValueError:
                payload = None
            if isinstance(payload, dict):
                return str(payload.get("detail", "Unknown error"))
            return f"HTTP {response.status_code}"

        def start_server_click():
            return server_manager.start_server()

        async def check_health_click():
            _is_healthy, message = await server_manager.check_server_health()
            return message

        async def create_start_session_click(
            session_id, policy_path, camera_names, arena_server_url
        ):
            # Normalise the ID once so the session is created under the same
            # name that the Step 3 controls will later send.
            session_id = (session_id or "").strip()
            if not session_id:
                return "⚠️ No session ID provided", session_id
            result = await server_manager.create_and_start_session(
                session_id, policy_path, camera_names, arena_server_url
            )
            # The second value pre-fills the Step 3 "Session ID" box.
            return result, session_id

        async def control_session(session_id, action):
            """Control a session (start/stop)."""
            session_id = (session_id or "").strip()
            if not session_id:
                return "⚠️ No session ID provided"
            # Percent-encode the user-typed ID so '?', '#' or '/' cannot alter
            # the request path.
            endpoint = f"/sessions/{quote(session_id, safe='')}/{action}"
            try:
                async with httpx.AsyncClient(timeout=30.0) as client:
                    response = await client.post(
                        f"{server_manager.server_url}{endpoint}"
                    )
                if response.status_code == 200:
                    result = response.json()
                    return f"✅ {result['message']}"
                return f"❌ Failed to {action}: {_error_detail(response)}"
            except Exception as e:
                return f"❌ Error: {e!s}"

        async def get_session_status(session_id):
            """Get session status."""
            session_id = (session_id or "").strip()
            if not session_id:
                return "⚠️ No session ID provided"
            session_path = quote(session_id, safe="")
            try:
                async with httpx.AsyncClient(timeout=10.0) as client:
                    response = await client.get(
                        f"{server_manager.server_url}/sessions/{session_path}"
                    )
                if response.status_code != 200:
                    return f"❌ Session not found or error: {response.status_code}"
                session = response.json()
                status_emoji = {
                    "running": "🟢",
                    "ready": "🟡",
                    "stopped": "🔴",
                    "initializing": "🟠",
                }.get(session["status"], "⚪")
                return f"""{status_emoji} Session: {session_id}
Status: {session["status"].upper()}
Model: {session["policy_path"]}
Cameras: {", ".join(session["camera_names"])}
📊 Performance:
• Inferences: {session["stats"]["inference_count"]}
• Commands sent: {session["stats"]["commands_sent"]}
• Queue: {session["stats"]["actions_in_queue"]} actions
• Errors: {session["stats"]["errors"]}
🔧 Data flow:
• Images received: {session["stats"]["images_received"]}
• Joint states received: {session["stats"]["joints_received"]}"""
            except Exception as e:
                return f"❌ Error: {e!s}"

        # Connect events
        start_server_btn.click(start_server_click, outputs=[server_status_display])
        check_health_btn.click(check_health_click, outputs=[server_status_display])
        create_start_btn.click(
            create_start_session_click,
            inputs=[
                session_id_input,
                policy_path_input,
                camera_names_input,
                arena_server_url_input,
            ],
            outputs=[setup_result, current_session_input],
        )

        # Session control buttons - create proper async wrappers
        async def start_session_click(session_id):
            return await control_session(session_id, "start")

        async def stop_session_click(session_id):
            return await control_session(session_id, "stop")

        start_btn.click(
            start_session_click,
            inputs=[current_session_input],
            outputs=[session_status_display],
        )
        stop_btn.click(
            stop_session_click,
            inputs=[current_session_input],
            outputs=[session_status_display],
        )
        status_btn.click(
            get_session_status,
            inputs=[current_session_input],
            outputs=[session_status_display],
        )

        # Auto-refresh on load
        demo.load(check_health_click, outputs=[server_status_display])

        # Add helpful instructions
        gr.Markdown("""
---
### 📖 Quick Guide:
1. **Start the Server**: Ensure the AI server is running (Step 1)
2. **Configure Your Robot**: Enter your model path and camera setup (Step 2)
3. **Create Session**: Click "Create & Start AI Control" to begin
4. **Monitor & Control**: Use Step 3 to start/stop and monitor your session
💡 **Tips**:
- Make sure your ACT model path exists before creating a session
- Camera names should match your robot's camera configuration
- Session will automatically start after creation
""")
    return demo
def launch_ui(
    server_name: str = "localhost", server_port: int = 7860, share: bool = False
) -> None:
    """Build the interface and serve it with Gradio.

    Args:
        server_name: Passed to ``launch`` as ``server_name``.
        server_port: Passed to ``launch`` as ``server_port``.
        share: Passed to ``launch`` as ``share``.
    """
    launch_options = {
        "server_name": server_name,
        "server_port": server_port,
        "share": share,
        "show_error": True,
    }
    create_main_interface().launch(**launch_options)
# Launch the UI with launch_ui()'s defaults when this module is run as a script.
if __name__ == "__main__":
    launch_ui()