---
title: RobotHub Inference Server
emoji: 🤖
colorFrom: blue
colorTo: purple
sdk: docker
app_port: 8001
suggested_hardware: t4-small
suggested_storage: medium
short_description: Real-time ACT model inference server for robot control
tags:
- robotics
pinned: false
fullWidth: true
---
# 🤖 RobotHub Inference Server
**AI-Powered Robot Control Engine for Real-time Robotics**
The RobotHub Inference Server is the **AI brain** of the RobotHub ecosystem. It's a FastAPI server that processes real-time camera feeds and robot state data to generate precise control commands using transformer models like ACT, Pi0, SmolVLA, and Diffusion Policies.
## 🏗️ How It Works in the RobotHub Ecosystem
The RobotHub Inference Server is part of a complete robotics control pipeline:
```
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ RobotHub │ │ RobotHub │ │ RobotHub │ │ Physical │
│ Frontend │───▶│ TransportServer│───▶│ InferenceServer│───▶│ Robot │
│ │ │ │ │ │ │ │
│ • Web Interface │ │ • Video Streams │ │ • AI Models │ │ • USB/Network │
│ • Robot Config │ │ • Joint States │ │ • Real-time │ │ • Joint Control │
│ • Monitoring │ │ • WebRTC/WS │ │ • Inference │ │ • Cameras │
└─────────────────┘ └─────────────────┘ └─────────────────┘ └─────────────────┘
│ ▲ │ │
│ │ │ │
└────────────────────────┼────────────────────────┼────────────────────────┘
│ │
Status & Control Action Commands
```
### 🔄 Data Flow
1. **Input Sources** → TransportServer:
- **Camera Feeds**: Real-time video from robot cameras (front, wrist, overhead, etc.)
- **Joint States**: Current robot joint positions and velocities
- **Robot Configuration**: Joint limits, kinematics, calibration data
2. **TransportServer** → **Inference Server**:
- Streams normalized camera images (RGB, 224x224 or custom resolution)
- Sends normalized joint positions (most joints -100 to +100, gripper 0 to +100)
- Maintains real-time communication via WebSocket/WebRTC
3. **Inference Server** → **AI Processing**:
- **Vision Processing**: Multi-camera image preprocessing and encoding
- **State Encoding**: Joint position normalization and history buffering
- **Policy Inference**: Transformer model processes visual + proprioceptive data
- **Action Generation**: Outputs sequence of robot joint commands
4. **Output** → **Robot Execution**:
- **Action Chunks**: Sequences of joint commands (ACT outputs 10-100 actions per inference)
   - **Real-time Control**: 20Hz control loop, 2Hz inference loop (see the sketch after this list)
- **Safety Monitoring**: Emergency stop, joint limit checking
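The key timing idea is that each inference produces a chunk of future actions, and the faster control loop drains that buffer between inferences. The sketch below only illustrates this relationship; the names used here (`predict_chunk`, `get_observation`, `send_joint_commands`) are hypothetical and are not the server's internal API.

```python
# Illustrative only: how an action-chunk policy (e.g. ACT) can feed a 20 Hz
# control loop while inference itself runs at ~2 Hz.
import asyncio
from collections import deque

action_queue: deque = deque()

async def inference_loop(policy, get_observation, rate_hz: float = 2.0):
    """Run the policy periodically and refill the action queue with a fresh chunk."""
    while True:
        obs = get_observation()            # latest camera images + joint state
        chunk = policy.predict_chunk(obs)  # e.g. 10-100 future joint commands
        action_queue.clear()               # drop stale actions from the last chunk
        action_queue.extend(chunk)
        await asyncio.sleep(1.0 / rate_hz)

async def control_loop(send_joint_commands, rate_hz: float = 20.0):
    """Pop one action per tick and send it to the robot."""
    while True:
        if action_queue:
            send_joint_commands(action_queue.popleft())
        await asyncio.sleep(1.0 / rate_hz)
```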
## 🚀 Quick Start
The server is primarily a **FastAPI REST API**, but includes an optional **Gradio web interface** for easy debugging and testing without needing to write code or use curl commands.
### Option 1: Server + UI (Recommended for Testing)
```bash
# Clone and setup
git clone https://github.com/julien-blanchon/RobotHub-InferenceServer
cd RobotHub-InferenceServer
uv sync
# Launch with integrated UI (FastAPI + Gradio on same port)
python launch_simple.py
```
**Access Points:**
- 🎨 **Web Interface**: http://localhost:7860/ (create sessions, monitor performance)
- 📖 **API Documentation**: http://localhost:7860/api/docs (REST API reference)
- 🔍 **Health Check**: http://localhost:7860/api/health (system status; see the snippet below)
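To verify the server from a script rather than a browser, you can query the health endpoint listed above. This is a minimal check that assumes the endpoint returns JSON; the exact response fields may vary by version.

```python
import httpx

# Hit the health endpoint and print whatever status payload comes back
response = httpx.get("http://localhost:7860/api/health", timeout=5.0)
response.raise_for_status()
print(response.json())
```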
### Option 2: Server Only (Production)
```bash
# Launch FastAPI server only (no UI)
python -m inference_server.cli --server-only
# Or with custom configuration
python -m inference_server.cli --server-only --host localhost --port 8080
```
**Access:**
- 📖 **API Only**: http://localhost:7860/api/docs
- 🔍 **Health Check**: http://localhost:7860/api/health
### Option 3: Docker
```bash
# Build and run
docker build -t robothub-inference-server .
docker run -p 7860:7860 \
-v /path/to/your/models:/app/checkpoints \
robothub-inference-server
```
## 🛠️ Setting Up Your Robot
### 1. **Connect Your Hardware**
You need the RobotHub TransportServer running first:
```bash
# Start the RobotHub TransportServer (dependency)
cd ../RobotHub-TransportServer
docker run -p 8000:8000 robothub-transport-server
```
### 2. **Create an Inference Session**
**Via Web Interface (Gradio UI):**
1. Open http://localhost:7860/
2. Enter your **model path** (e.g., `./checkpoints/act_pick_place_model`)
3. Configure **camera names** (e.g., `front,wrist,overhead`)
4. Set **TransportServer URL** (default: `http://localhost:8000`)
5. Click **"Create & Start AI Control"**
**Via REST API:**
```python
import httpx
session_config = {
"session_id": "robot_assembly_task",
"policy_path": "./checkpoints/act_assembly_model",
"policy_type": "act", # or "pi0", "smolvla", "diffusion"
"camera_names": ["front_cam", "wrist_cam"],
"transport_server_url": "http://localhost:8000",
"language_instruction": "Pick up the red block and place it on the blue platform" # For SmolVLA
}
async with httpx.AsyncClient() as client:
# Create session
response = await client.post("http://localhost:7860/api/sessions", json=session_config)
# Start inference
await client.post(f"http://localhost:7860/api/sessions/{session_config['session_id']}/start")
```
### 3. **Connect Robot & Cameras**
The robot and cameras connect to the **TransportServer**, not directly to the Inference Server:
```python
# Example: Connect robot to TransportServer
from transport_server_client import RoboticsConsumer, RoboticsProducer
from transport_server_client.video import VideoProducer
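# Note: workspace_id and the room IDs used below come from the inference
# session you created in the previous step; they are not hard-coded values.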
# Robot receives AI commands and executes them
joint_consumer = RoboticsConsumer('http://localhost:8000')
await joint_consumer.connect(workspace_id, joint_input_room_id)
def execute_joint_commands(commands):
"""Execute commands on your actual robot hardware"""
for cmd in commands:
joint_name = cmd['name']
position = cmd['value'] # Normalized: most joints -100 to +100, gripper 0 to +100
robot.move_joint(joint_name, position)
joint_consumer.on_joint_update(execute_joint_commands)
# Robot sends its current state back
joint_producer = RoboticsProducer('http://localhost:8000')
await joint_producer.connect(workspace_id, joint_output_room_id)
# Send current robot state periodically
await joint_producer.send_state_sync({
'shoulder_pan_joint': current_joint_positions[0],
'shoulder_lift_joint': current_joint_positions[1],
# ... etc
})
# Cameras stream to TransportServer
for camera_name, camera_device in cameras.items():
video_producer = VideoProducer('http://localhost:8000')
await video_producer.connect(workspace_id, camera_room_ids[camera_name])
await video_producer.start_camera(camera_device)
```
## 🎮 Supported AI Models
### **ACT (Action Chunking Transformer)**
- **Best for**: Complex manipulation tasks requiring temporal coherence
- **Output**: Chunks of 10-100 future actions per inference
- **Use case**: Pick-and-place, assembly, cooking tasks
### **Pi0 (Vision-Language Policy)**
- **Best for**: Tasks requiring language understanding
- **Output**: Single actions with language conditioning
- **Use case**: "Pick up the red mug", "Open the top drawer"
### **SmolVLA (Small Vision-Language-Action)**
- **Best for**: Lightweight vision-language tasks
- **Use case**: Simple manipulation with natural language
### **Diffusion Policy**
- **Best for**: High-precision continuous control
- **Use case**: Precise assembly, drawing, writing
## 📊 Monitoring & Debugging
### Using the Web Interface
The Gradio UI provides real-time monitoring:
- **Active Sessions**: View all running inference sessions
- **Performance Metrics**: Inference rate, control rate, camera FPS
- **Action Queue**: Current action buffer status
- **Error Logs**: Real-time error tracking
### Using the REST API
```bash
# Check active sessions
curl http://localhost:7860/api/sessions
# Get detailed session info
curl http://localhost:7860/api/sessions/my_robot_session
# Stop a session
curl -X POST http://localhost:7860/api/sessions/my_robot_session/stop
# Emergency stop all sessions
curl -X POST http://localhost:7860/api/debug/emergency_stop
```
## 🔧 Configuration
### Multi-Camera Setup
```python
# Configure multiple camera angles
session_config = {
"camera_names": ["front_cam", "wrist_cam", "overhead_cam", "side_cam"],
# Each camera gets its own TransportServer room
}
```
### Custom Joint Mappings
The server handles various robot joint naming conventions automatically:
- **LeRobot names**: `shoulder_pan_joint`, `shoulder_lift_joint`, `elbow_joint`, etc.
- **Custom names**: `base_rotation`, `shoulder_tilt`, `elbow_bend`, etc.
- **Alternative names**: `joint_1`, `joint_2`, `base_joint`, etc.
See `src/inference_server/models/joint_config.py` for full mapping details.
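The actual mapping lives in `joint_config.py`; the sketch below only illustrates the idea of collapsing different naming conventions onto one canonical set, with an alias table invented for this example.

```python
# Illustration only; the real aliases are defined in
# src/inference_server/models/joint_config.py and may differ.
JOINT_ALIASES = {
    "shoulder_pan_joint": "shoulder_pan",   # LeRobot-style name
    "base_rotation": "shoulder_pan",        # custom name
    "joint_1": "shoulder_pan",              # generic name
    "shoulder_lift_joint": "shoulder_lift",
    "shoulder_tilt": "shoulder_lift",
    "joint_2": "shoulder_lift",
}

def normalize_joint_names(state: dict[str, float]) -> dict[str, float]:
    """Map whatever names the robot reports onto one canonical naming scheme."""
    return {JOINT_ALIASES.get(name, name): value for name, value in state.items()}
```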
## 🔌 Integration Examples
### **Standalone Python Application**
```python
import asyncio
import time
from transport_server_client import RoboticsProducer, RoboticsConsumer
from transport_server_client.video import VideoProducer
import httpx
class RobotAIController:
def __init__(self):
self.inference_client = httpx.AsyncClient(base_url="http://localhost:7860/api")
self.transport_url = "http://localhost:8000"
async def start_ai_control(self, task_description: str):
# 1. Create inference session
session_config = {
"session_id": f"task_{int(time.time())}",
"policy_path": "./checkpoints/general_manipulation_act",
"policy_type": "act",
"camera_names": ["front", "wrist"],
"language_instruction": task_description
}
response = await self.inference_client.post("/sessions", json=session_config)
session_data = response.json()
# 2. Connect robot to the same workspace/rooms
await self.connect_robot_hardware(session_data)
# 3. Start AI inference
await self.inference_client.post(f"/sessions/{session_config['session_id']}/start")
print(f"🤖 AI control started for task: {task_description}")
# Usage
async def main():
    controller = RobotAIController()
    await controller.start_ai_control("Pick up the blue cup and place it on the shelf")

asyncio.run(main())
```
## 🚨 Safety & Best Practices
- **Emergency Stop**: Built-in emergency stop via API: `/sessions/{id}/stop`
- **Joint Limits**: All joint values are normalized (most joints -100 to +100, gripper 0 to +100)
- **Hardware Limits**: Your robot driver should enforce the actual hardware joint limits (see the sketch after this list)
- **Session Timeouts**: Automatic cleanup prevents runaway processes
- **Error Handling**: Graceful degradation when cameras disconnect
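Because the server emits normalized joint values, the conversion to real hardware units belongs in your robot driver. The helper below is a hypothetical sketch of that step, with example limits; it is not part of the Inference Server.

```python
# Hypothetical robot-driver helper: clamp normalized commands from the server
# and convert them to this robot's hardware joint range before moving anything.
HARDWARE_LIMITS_DEG = {          # per-joint hardware range, example values only
    "shoulder_pan": (-110.0, 110.0),
    "gripper": (0.0, 45.0),
}

def to_hardware_degrees(joint_name: str, normalized: float) -> float:
    low, high = HARDWARE_LIMITS_DEG[joint_name]
    if joint_name == "gripper":                      # gripper: 0 to +100
        fraction = max(0.0, min(100.0, normalized)) / 100.0
    else:                                            # other joints: -100 to +100
        fraction = (max(-100.0, min(100.0, normalized)) + 100.0) / 200.0
    return low + fraction * (high - low)
```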
## 🚀 Deployment
### **Local Development**
```bash
# All services on one machine
python launch_simple.py # Inference Server with UI
```
### **Production Setup**
```bash
# Server only (no UI)
python -m inference_server.cli --server-only --host localhost --port 7860
# Or with Docker
docker run -p 7860:7860 robothub-inference-server
```