---
title: RobotHub Inference Server
emoji: 🤖
colorFrom: blue
colorTo: purple
sdk: docker
app_port: 8001
suggested_hardware: t4-small
suggested_storage: medium
short_description: Real-time ACT model inference server for robot control
tags:
  - robotics
pinned: false
fullWidth: true
---

# 🤖 RobotHub Inference Server

**AI-Powered Robot Control Engine for Real-time Robotics**

The RobotHub Inference Server is the **AI brain** of the RobotHub ecosystem. It is a FastAPI server that processes real-time camera feeds and robot state data to generate precise control commands using transformer models such as ACT, Pi0, SmolVLA, and Diffusion Policies.

## 🏗️ How It Works in the RobotHub Ecosystem

The RobotHub Inference Server is part of a complete robotics control pipeline:

```
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│    RobotHub     │    │    RobotHub     │    │    RobotHub     │    │    Physical     │
│    Frontend     │───▶│ TransportServer │───▶│ InferenceServer │───▶│      Robot      │
│                 │    │                 │    │                 │    │                 │
│ • Web Interface │    │ • Video Streams │    │ • AI Models     │    │ • USB/Network   │
│ • Robot Config  │    │ • Joint States  │    │ • Real-time     │    │ • Joint Control │
│ • Monitoring    │    │ • WebRTC/WS     │    │ • Inference     │    │ • Cameras       │
└─────────────────┘    └─────────────────┘    └─────────────────┘    └─────────────────┘
         │                      ▲                      │                      │
         │                      │                      │                      │
         └──────────────────────┼──────────────────────┼──────────────────────┘
                                │                      │
                        Status & Control        Action Commands
```

### 🔄 Data Flow

1. **Input Sources** → **TransportServer**:
   - **Camera Feeds**: Real-time video from robot cameras (front, wrist, overhead, etc.)
   - **Joint States**: Current robot joint positions and velocities
   - **Robot Configuration**: Joint limits, kinematics, calibration data

2. **TransportServer** → **Inference Server**:
   - Streams normalized camera images (RGB, 224x224 or custom resolution)
   - Sends normalized joint positions (most joints -100 to +100, gripper 0 to +100)
   - Maintains real-time communication via WebSocket/WebRTC

3. **Inference Server** → **AI Processing**:
   - **Vision Processing**: Multi-camera image preprocessing and encoding
   - **State Encoding**: Joint position normalization and history buffering
   - **Policy Inference**: Transformer model processes visual + proprioceptive data
   - **Action Generation**: Outputs a sequence of robot joint commands

4. **Output** → **Robot Execution**:
   - **Action Chunks**: Sequences of joint commands (ACT outputs 10-100 actions per inference)
   - **Real-time Control**: 20 Hz control loop, 2 Hz inference loop (see the sketch below)
   - **Safety Monitoring**: Emergency stop, joint limit checking
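The split between the slow inference loop and the fast control loop is what makes action chunking practical: each inference call returns a chunk of future actions, and the control loop drains that buffer while the next chunk is being computed. The sketch below only illustrates the idea; it is not the server's internal code, and `run_inference` / `send_to_robot` are placeholder callables you would supply yourself.

```python
import asyncio
from collections import deque

action_queue: deque = deque()

async def inference_loop(run_inference, period_s: float = 0.5):
    """~2 Hz: each pass appends a fresh chunk of future joint commands."""
    while True:
        chunk = await run_inference()  # e.g. 10-100 actions from an ACT policy
        action_queue.extend(chunk)
        await asyncio.sleep(period_s)

async def control_loop(send_to_robot, period_s: float = 0.05):
    """~20 Hz: pop one queued action per tick and forward it to the robot."""
    while True:
        if action_queue:
            await send_to_robot(action_queue.popleft())
        await asyncio.sleep(period_s)
```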
## 🚀 Quick Start

The server is primarily a **FastAPI REST API**, but includes an optional **Gradio web interface** for easy debugging and testing without needing to write code or use curl commands.

### Option 1: Server + UI (Recommended for Testing)

```bash
# Clone and setup
git clone https://github.com/julien-blanchon/RobotHub-InferenceServer
cd RobotHub-InferenceServer
uv sync

# Launch with integrated UI (FastAPI + Gradio on the same port)
python launch_simple.py
```

**Access Points:**

- 🎨 **Web Interface**: http://localhost:7860/ (create sessions, monitor performance)
- 📖 **API Documentation**: http://localhost:7860/api/docs (REST API reference)
- 🔍 **Health Check**: http://localhost:7860/api/health (system status)

### Option 2: Server Only (Production)

```bash
# Launch the FastAPI server only (no UI)
python -m inference_server.cli --server-only

# Or with custom configuration
python -m inference_server.cli --server-only --host localhost --port 8080
```

**Access:**

- 📖 **API Only**: http://localhost:7860/api/docs
- 🔍 **Health Check**: http://localhost:7860/api/health

### Option 3: Docker

```bash
# Build and run
docker build -t robothub-inference-server .
docker run -p 7860:7860 robothub-inference-server
```

## 🛠️ Setting Up Your Robot

### 1. **Connect Your Hardware**

You need the RobotHub TransportServer running first:

```bash
# Start the RobotHub TransportServer (dependency)
cd ../RobotHub-TransportServer
docker run -p 8000:8000 robothub-transport-server
```

### 2. **Create an Inference Session**

**Via Web Interface (Gradio UI):**

1. Open http://localhost:7860/
2. Enter your **model path** (e.g., `./checkpoints/act_pick_place_model`)
3. Configure **camera names** (e.g., `front,wrist,overhead`)
4. Set the **TransportServer URL** (default: `http://localhost:8000`)
5. Click **"Create & Start AI Control"**

**Via REST API:**

```python
import httpx

session_config = {
    "session_id": "robot_assembly_task",
    "policy_path": "./checkpoints/act_assembly_model",
    "policy_type": "act",  # or "pi0", "smolvla", "diffusion"
    "camera_names": ["front_cam", "wrist_cam"],
    "transport_server_url": "http://localhost:8000",
    "language_instruction": "Pick up the red block and place it on the blue platform",  # For SmolVLA
}

async with httpx.AsyncClient() as client:
    # Create session
    response = await client.post("http://localhost:7860/api/sessions", json=session_config)

    # Start inference
    await client.post(f"http://localhost:7860/api/sessions/{session_config['session_id']}/start")
```
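Before connecting any hardware, it can help to confirm the session was actually created. A minimal sketch using the per-session endpoint that also appears in the Monitoring section below (the exact response fields depend on the server version, so this simply prints whatever the server returns):

```python
import asyncio
import httpx

async def check_session(session_id: str = "robot_assembly_task"):
    async with httpx.AsyncClient(base_url="http://localhost:7860/api") as client:
        response = await client.get(f"/sessions/{session_id}")
        response.raise_for_status()
        print(response.json())  # session status as reported by the server

asyncio.run(check_session())
```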
### 3. **Connect Robot & Cameras**

The robot and cameras connect to the **TransportServer**, not directly to the Inference Server:

```python
# Example: Connect robot to TransportServer
from transport_server_client import RoboticsConsumer, RoboticsProducer
from transport_server_client.video import VideoProducer

# Robot receives AI commands and executes them
joint_consumer = RoboticsConsumer('http://localhost:8000')
await joint_consumer.connect(workspace_id, joint_input_room_id)

def execute_joint_commands(commands):
    """Execute commands on your actual robot hardware"""
    for cmd in commands:
        joint_name = cmd['name']
        position = cmd['value']  # Normalized: most joints -100 to +100, gripper 0 to +100
        robot.move_joint(joint_name, position)

joint_consumer.on_joint_update(execute_joint_commands)

# Robot sends its current state back
joint_producer = RoboticsProducer('http://localhost:8000')
await joint_producer.connect(workspace_id, joint_output_room_id)

# Send current robot state periodically
await joint_producer.send_state_sync({
    'shoulder_pan_joint': current_joint_positions[0],
    'shoulder_lift_joint': current_joint_positions[1],
    # ... etc.
})

# Cameras stream to TransportServer
for camera_name, camera_device in cameras.items():
    video_producer = VideoProducer('http://localhost:8000')
    await video_producer.connect(workspace_id, camera_room_ids[camera_name])
    await video_producer.start_camera(camera_device)
```

## 🎮 Supported AI Models

### **ACT (Action Chunking Transformer)**
- **Best for**: Complex manipulation tasks requiring temporal coherence
- **Output**: Chunks of 10-100 future actions per inference
- **Use case**: Pick-and-place, assembly, cooking tasks

### **Pi0 (Vision-Language Policy)**
- **Best for**: Tasks requiring language understanding
- **Output**: Single actions with language conditioning
- **Use case**: "Pick up the red mug", "Open the top drawer"

### **SmolVLA (Small Vision-Language-Action)**
- **Best for**: Lightweight vision-language tasks
- **Use case**: Simple manipulation with natural language

### **Diffusion Policy**
- **Best for**: High-precision continuous control
- **Use case**: Precise assembly, drawing, writing

## 📊 Monitoring & Debugging

### Using the Web Interface

The Gradio UI provides real-time monitoring:

- **Active Sessions**: View all running inference sessions
- **Performance Metrics**: Inference rate, control rate, camera FPS
- **Action Queue**: Current action buffer status
- **Error Logs**: Real-time error tracking

### Using the REST API

```bash
# Check active sessions
curl http://localhost:7860/api/sessions

# Get detailed session info
curl http://localhost:7860/api/sessions/my_robot_session

# Stop a session
curl -X POST http://localhost:7860/api/sessions/my_robot_session/stop

# Emergency stop all sessions
curl -X POST http://localhost:7860/api/debug/emergency_stop
```

## 🔧 Configuration

### Multi-Camera Setup

```python
# Configure multiple camera angles
session_config = {
    "camera_names": ["front_cam", "wrist_cam", "overhead_cam", "side_cam"],
    # Each camera gets its own TransportServer room
}
```

### Custom Joint Mappings

The server handles various robot joint naming conventions automatically:

- **LeRobot names**: `shoulder_pan_joint`, `shoulder_lift_joint`, `elbow_joint`, etc.
- **Custom names**: `base_rotation`, `shoulder_tilt`, `elbow_bend`, etc.
- **Alternative names**: `joint_1`, `joint_2`, `base_joint`, etc.

See `src/inference_server/models/joint_config.py` for full mapping details.
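As a rough illustration of what such a mapping layer does, here is a simplified, hypothetical alias table built from the names listed above; the real table lives in `joint_config.py` and covers more joints and conventions:

```python
# Hypothetical, simplified alias table mapping robot-specific joint names
# onto one canonical name per joint (illustration only, not the server's code).
JOINT_ALIASES = {
    # canonical: shoulder_pan_joint
    "shoulder_pan_joint": "shoulder_pan_joint",   # LeRobot name
    "base_rotation": "shoulder_pan_joint",        # custom name
    "joint_1": "shoulder_pan_joint",              # alternative name
    # canonical: shoulder_lift_joint
    "shoulder_lift_joint": "shoulder_lift_joint",
    "shoulder_tilt": "shoulder_lift_joint",
    "joint_2": "shoulder_lift_joint",
}

def canonical_joint_name(name: str) -> str:
    """Resolve whatever the robot calls a joint to the server's canonical name."""
    if name not in JOINT_ALIASES:
        raise ValueError(f"Unknown joint name: {name}")
    return JOINT_ALIASES[name]
```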
## 🔌 Integration Examples

### **Standalone Python Application**

```python
import asyncio
import time

import httpx
from transport_server_client import RoboticsProducer, RoboticsConsumer
from transport_server_client.video import VideoProducer


class RobotAIController:
    def __init__(self):
        self.inference_client = httpx.AsyncClient(base_url="http://localhost:7860/api")
        self.transport_url = "http://localhost:8000"

    async def start_ai_control(self, task_description: str):
        # 1. Create inference session
        session_config = {
            "session_id": f"task_{int(time.time())}",
            "policy_path": "./checkpoints/general_manipulation_act",
            "policy_type": "act",
            "camera_names": ["front", "wrist"],
            "language_instruction": task_description,
        }
        response = await self.inference_client.post("/sessions", json=session_config)
        session_data = response.json()

        # 2. Connect robot to the same workspace/rooms
        # (connect_robot_hardware is application-specific and not shown here)
        await self.connect_robot_hardware(session_data)

        # 3. Start AI inference
        await self.inference_client.post(f"/sessions/{session_config['session_id']}/start")
        print(f"🤖 AI control started for task: {task_description}")


# Usage
async def main():
    controller = RobotAIController()
    await controller.start_ai_control("Pick up the blue cup and place it on the shelf")

asyncio.run(main())
```

## 🚨 Safety & Best Practices

- **Emergency Stop**: Built-in emergency stop via the API: `/sessions/{id}/stop`
- **Joint Limits**: All joint values are normalized (most joints -100 to +100, gripper 0 to +100)
- **Hardware Limits**: The robot driver should enforce the actual hardware joint limits
- **Session Timeouts**: Automatic cleanup prevents runaway processes
- **Error Handling**: Graceful degradation when cameras disconnect

## 🚀 Deployment

### **Local Development**

```bash
# All services on one machine
python launch_simple.py  # Inference Server with UI
```

### **Production Setup**

```bash
# Server only (no UI)
python -m inference_server.cli --server-only --host localhost --port 7860

# Or with Docker
docker run -p 7860:7860 robothub-inference-server
```
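Whichever deployment mode you choose, a quick way to verify the server is up is to hit the health and sessions endpoints documented above (a minimal sketch assuming the default port 7860 used throughout this README):

```python
import httpx

# Smoke test: the health endpoint should respond once the server is running
print(httpx.get("http://localhost:7860/api/health").json())

# List whatever inference sessions are currently active
print(httpx.get("http://localhost:7860/api/sessions").json())
```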