---
title: RobotHub Inference Server
emoji: 🤖
colorFrom: blue
colorTo: purple
sdk: docker
app_port: 8001
suggested_hardware: t4-small
suggested_storage: medium
short_description: Real-time ACT model inference server for robot control
tags:
  - robotics
pinned: false
fullWidth: true
---

🤖 RobotHub Inference Server

AI-Powered Robot Control Engine for Real-time Robotics

The RobotHub Inference Server is the AI brain of the RobotHub ecosystem. It's a FastAPI server that processes real-time camera feeds and robot state data to generate precise control commands using transformer models like ACT, Pi0, SmolVLA, and Diffusion Policies.

๐Ÿ—๏ธ How It Works in the RobotHub Ecosystem

The RobotHub Inference Server is part of a complete robotics control pipeline:

┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  RobotHub       │    │  RobotHub       │    │  RobotHub       │    │  Physical       │
│  Frontend       │───▶│  TransportServer│───▶│  InferenceServer│───▶│  Robot          │
│                 │    │                 │    │                 │    │                 │
│ • Web Interface │    │ • Video Streams │    │ • AI Models     │    │ • USB/Network   │
│ • Robot Config  │    │ • Joint States  │    │ • Real-time     │    │ • Joint Control │
│ • Monitoring    │    │ • WebRTC/WS     │    │ • Inference     │    │ • Cameras       │
└─────────────────┘    └─────────────────┘    └─────────────────┘    └─────────────────┘
        │                        ▲                        │                        │
        │                        │                        │                        │
        └────────────────────────┼────────────────────────┼────────────────────────┘
                                 │                        │
                            Status & Control         Action Commands

🔄 Data Flow

  1. Input Sources → TransportServer:

    • Camera Feeds: Real-time video from robot cameras (front, wrist, overhead, etc.)
    • Joint States: Current robot joint positions and velocities
    • Robot Configuration: Joint limits, kinematics, calibration data
  2. TransportServer → Inference Server:

    • Streams normalized camera images (RGB, 224x224 or custom resolution)
    • Sends normalized joint positions (most joints -100 to +100, gripper 0 to +100)
    • Maintains real-time communication via WebSocket/WebRTC
  3. Inference Server → AI Processing:

    • Vision Processing: Multi-camera image preprocessing and encoding
    • State Encoding: Joint position normalization and history buffering
    • Policy Inference: Transformer model processes visual + proprioceptive data
    • Action Generation: Outputs sequence of robot joint commands
  4. Output → Robot Execution:

    • Action Chunks: Sequences of joint commands (ACT outputs 10-100 actions per inference)
    • Real-time Control: 20 Hz control loop fed by a 2 Hz inference loop (see the sketch after this list)
    • Safety Monitoring: Emergency stop, joint limit checking
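
To make the timing concrete, here is a minimal illustrative sketch of how an action-chunk queue decouples the two loops. It is not the server's actual implementation; policy, get_observation, and send_joint_commands are placeholders you would supply.

import asyncio
from collections import deque

action_queue = deque()

async def inference_loop(policy, get_observation):
    """Every 0.5 s (2 Hz): run the policy once and refill the queue with a chunk of actions."""
    while True:
        observation = get_observation()                     # latest camera frames + joint state
        action_queue.extend(policy.predict(observation))    # e.g. 10-100 future joint commands
        await asyncio.sleep(0.5)

async def control_loop(send_joint_commands):
    """Every 50 ms (20 Hz): pop the next queued action and send it to the robot."""
    while True:
        if action_queue:
            send_joint_commands(action_queue.popleft())
        await asyncio.sleep(0.05)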

🚀 Quick Start

The server is primarily a FastAPI REST API, but includes an optional Gradio web interface for easy debugging and testing without needing to write code or use curl commands.

Option 1: Server + UI (Recommended for Testing)

# Clone and setup
git clone https://github.com/julien-blanchon/RobotHub-InferenceServer
cd RobotHub-InferenceServer
uv sync

# Launch with integrated UI (FastAPI + Gradio on same port)
python launch_simple.py

Access Points:

  • Web UI (Gradio): http://localhost:7860/
  • REST API: http://localhost:7860/api/ (served on the same port as the UI)

Option 2: Server Only (Production)

# Launch FastAPI server only (no UI)
python -m inference_server.cli --server-only

# Or with custom configuration
python -m inference_server.cli --server-only --host localhost --port 8080

Access:

  • REST API: http://<host>:<port>/api/ (e.g. http://localhost:8080/api/ with the custom configuration above)

Option 3: Docker

# Build and run
docker build -t robothub-inference-server .
docker run -p 7860:7860 \
  -v /path/to/your/models:/app/checkpoints \
  robothub-inference-server

๐Ÿ› ๏ธ Setting Up Your Robot

1. Connect Your Hardware

You need the RobotHub TransportServer running first:

# Start the RobotHub TransportServer (dependency)
cd ../RobotHub-TransportServer
docker run -p 8000:8000 robothub-transport-server

2. Create an Inference Session

Via Web Interface (Gradio UI):

  1. Open http://localhost:7860/
  2. Enter your model path (e.g., ./checkpoints/act_pick_place_model)
  3. Configure camera names (e.g., front,wrist,overhead)
  4. Set TransportServer URL (default: http://localhost:8000)
  5. Click "Create & Start AI Control"

Via REST API:

import httpx

session_config = {
    "session_id": "robot_assembly_task",
    "policy_path": "./checkpoints/act_assembly_model",
    "policy_type": "act",  # or "pi0", "smolvla", "diffusion"
    "camera_names": ["front_cam", "wrist_cam"],
    "transport_server_url": "http://localhost:8000",
    "language_instruction": "Pick up the red block and place it on the blue platform"  # For SmolVLA
}

async with httpx.AsyncClient() as client:
    # Create session
    response = await client.post("http://localhost:7860/api/sessions", json=session_config)
    
    # Start inference
    await client.post(f"http://localhost:7860/api/sessions/{session_config['session_id']}/start")

3. Connect Robot & Cameras

The robot and cameras connect to the TransportServer, not directly to the Inference Server:

# Example: Connect robot to TransportServer
from transport_server_client import RoboticsConsumer, RoboticsProducer
from transport_server_client.video import VideoProducer

# Robot receives AI commands and executes them
joint_consumer = RoboticsConsumer('http://localhost:8000')
await joint_consumer.connect(workspace_id, joint_input_room_id)

def execute_joint_commands(commands):
    """Execute commands on your actual robot hardware"""
    for cmd in commands:
        joint_name = cmd['name']
        position = cmd['value']  # Normalized: most joints -100 to +100, gripper 0 to +100
        robot.move_joint(joint_name, position)

joint_consumer.on_joint_update(execute_joint_commands)

# Robot sends its current state back
joint_producer = RoboticsProducer('http://localhost:8000')
await joint_producer.connect(workspace_id, joint_output_room_id)

# Send current robot state periodically
await joint_producer.send_state_sync({
    'shoulder_pan_joint': current_joint_positions[0],
    'shoulder_lift_joint': current_joint_positions[1],
    # ... etc
})

# Cameras stream to TransportServer
for camera_name, camera_device in cameras.items():
    video_producer = VideoProducer('http://localhost:8000')
    await video_producer.connect(workspace_id, camera_room_ids[camera_name])
    await video_producer.start_camera(camera_device)
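
(Note: the workspace_id and camera/joint room IDs used above identify the TransportServer rooms the inference session reads from and writes to; they are typically taken from the response returned when the session is created in step 2, so the robot, cameras, and AI all share the same rooms.)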

🎮 Supported AI Models

ACT (Action Chunking Transformer)

  • Best for: Complex manipulation tasks requiring temporal coherence
  • Output: Chunks of 10-100 future actions per inference
  • Use case: Pick-and-place, assembly, cooking tasks

Pi0 (Vision-Language Policy)

  • Best for: Tasks requiring language understanding
  • Output: Single actions with language conditioning
  • Use case: "Pick up the red mug", "Open the top drawer"

SmolVLA (Small Vision-Language-Action)

  • Best for: Lightweight vision-language tasks
  • Use case: Simple manipulation with natural language

Diffusion Policy

  • Best for: High-precision continuous control
  • Use case: Precise assembly, drawing, writing
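
The model family is selected per session via the policy_type field shown earlier. A hypothetical pair of configs (checkpoint paths are placeholders) illustrates the difference between a purely visual ACT policy and a language-conditioned SmolVLA policy:

# ACT: visual policy, outputs action chunks, no language instruction needed
act_session = {
    "session_id": "act_demo",
    "policy_path": "./checkpoints/act_pick_place_model",   # placeholder path
    "policy_type": "act",
    "camera_names": ["front", "wrist"],
}

# SmolVLA: vision-language-action policy, conditioned on a natural-language instruction
smolvla_session = {
    "session_id": "smolvla_demo",
    "policy_path": "./checkpoints/smolvla_model",           # placeholder path
    "policy_type": "smolvla",
    "camera_names": ["front"],
    "language_instruction": "Pick up the red mug",
}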

📊 Monitoring & Debugging

Using the Web Interface

The Gradio UI provides real-time monitoring:

  • Active Sessions: View all running inference sessions
  • Performance Metrics: Inference rate, control rate, camera FPS
  • Action Queue: Current action buffer status
  • Error Logs: Real-time error tracking

Using the REST API

# Check active sessions
curl http://localhost:7860/api/sessions

# Get detailed session info
curl http://localhost:7860/api/sessions/my_robot_session

# Stop a session
curl -X POST http://localhost:7860/api/sessions/my_robot_session/stop

# Emergency stop all sessions
curl -X POST http://localhost:7860/api/debug/emergency_stop
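
The same endpoints can be polled from Python for scripted monitoring; a small hypothetical watcher (the endpoint is the one listed above, the poll interval is arbitrary):

import asyncio
import httpx

async def watch_sessions(base_url="http://localhost:7860/api", interval=5.0):
    """Periodically print the active-session list returned by the server."""
    async with httpx.AsyncClient(base_url=base_url) as client:
        while True:
            response = await client.get("/sessions")
            response.raise_for_status()
            print(response.json())   # raw session/status payload
            await asyncio.sleep(interval)

# asyncio.run(watch_sessions())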

🔧 Configuration

Multi-Camera Setup

# Configure multiple camera angles
session_config = {
    "camera_names": ["front_cam", "wrist_cam", "overhead_cam", "side_cam"],
    # Each camera gets its own TransportServer room
}

Custom Joint Mappings

The server handles various robot joint naming conventions automatically:

  • LeRobot names: shoulder_pan_joint, shoulder_lift_joint, elbow_joint, etc.
  • Custom names: base_rotation, shoulder_tilt, elbow_bend, etc.
  • Alternative names: joint_1, joint_2, base_joint, etc.

See src/inference_server/models/joint_config.py for full mapping details.
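
Conceptually, the mapping is an alias table from whatever names your robot reports to the canonical joint names the policy expects. An illustrative sketch (not the actual contents of joint_config.py; the canonical names here are assumptions):

# Illustrative alias table: external joint names -> canonical policy joint names
JOINT_ALIASES = {
    "shoulder_pan_joint": "shoulder_pan",   # LeRobot-style name
    "base_rotation": "shoulder_pan",        # custom name
    "joint_1": "shoulder_pan",              # generic name
    "shoulder_lift_joint": "shoulder_lift",
    "shoulder_tilt": "shoulder_lift",
    "joint_2": "shoulder_lift",
}

def canonical_joint_name(name: str) -> str:
    """Fall back to the original name when no alias is defined."""
    return JOINT_ALIASES.get(name, name)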

🔌 Integration Examples

Standalone Python Application

import asyncio
import time

import httpx
from transport_server_client import RoboticsProducer, RoboticsConsumer
from transport_server_client.video import VideoProducer

class RobotAIController:
    def __init__(self):
        self.inference_client = httpx.AsyncClient(base_url="http://localhost:7860/api")
        self.transport_url = "http://localhost:8000"
        
    async def start_ai_control(self, task_description: str):
        # 1. Create inference session
        session_config = {
            "session_id": f"task_{int(time.time())}",
            "policy_path": "./checkpoints/general_manipulation_act",
            "policy_type": "act",
            "camera_names": ["front", "wrist"],
            "language_instruction": task_description
        }
        
        response = await self.inference_client.post("/sessions", json=session_config)
        session_data = response.json()
        
        # 2. Connect robot to the same workspace/rooms
        await self.connect_robot_hardware(session_data)
        
        # 3. Start AI inference
        await self.inference_client.post(f"/sessions/{session_config['session_id']}/start")
        
        print(f"๐Ÿค– AI control started for task: {task_description}")

# Usage
controller = RobotAIController()
await controller.start_ai_control("Pick up the blue cup and place it on the shelf")

🚨 Safety & Best Practices

  • Emergency Stop: Built-in emergency stop via API: /sessions/{id}/stop
  • Joint Limits: All joint values are normalized (most joints -100 to +100, gripper 0 to +100)
  • Hardware Limits: your robot driver should enforce the actual hardware joint limits when executing commands (see the sketch after this list)
  • Session Timeouts: Automatic cleanup prevents runaway processes
  • Error Handling: Graceful degradation when cameras disconnect
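
On the robot side, the driver typically denormalizes these values into its real joint ranges and clamps them there. A hedged sketch (the linear mapping and the example range are illustrative, not part of the server):

def denormalize(value, low, high, is_gripper=False):
    """Map a normalized command (-100..+100, or 0..+100 for the gripper) to the hardware range [low, high]."""
    fraction = value / 100.0 if is_gripper else (value + 100.0) / 200.0
    target = low + fraction * (high - low)
    return max(low, min(high, target))   # enforce the hardware joint limits

# Example: normalized 50 on a joint whose real range is -1.57..+1.57 rad -> ~0.79 rad
print(denormalize(50, -1.57, 1.57))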

🚀 Deployment

Local Development

# All services on one machine
python launch_simple.py  # Inference Server with UI

Production Setup

# Server only (no UI)
python -m inference_server.cli --server-only --host localhost --port 7860

# Or with Docker
docker run -p 7860:7860 robothub-inference-server