title: RobotHub Inference Server
emoji: 🤖
colorFrom: blue
colorTo: purple
sdk: docker
app_port: 8001
suggested_hardware: t4-small
suggested_storage: medium
short_description: Real-time ACT model inference server for robot control
tags:
- robotics
pinned: false
fullWidth: true
🤖 RobotHub Inference Server
AI-Powered Robot Control Engine for Real-time Robotics
The RobotHub Inference Server is the AI brain of the RobotHub ecosystem. It's a FastAPI server that processes real-time camera feeds and robot state data to generate precise control commands using policy models such as ACT, Pi0, SmolVLA, and Diffusion Policy.
🏗️ How It Works in the RobotHub Ecosystem
The RobotHub Inference Server is part of a complete robotics control pipeline:
┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│    RobotHub     │     │    RobotHub     │     │    RobotHub     │     │    Physical     │
│    Frontend     │────▶│ TransportServer │────▶│ InferenceServer │────▶│     Robot       │
│                 │     │                 │     │                 │     │                 │
│ • Web Interface │     │ • Video Streams │     │ • AI Models     │     │ • USB/Network   │
│ • Robot Config  │     │ • Joint States  │     │ • Real-time     │     │ • Joint Control │
│ • Monitoring    │     │ • WebRTC/WS     │     │ • Inference     │     │ • Cameras       │
└─────────────────┘     └─────────────────┘     └─────────────────┘     └─────────────────┘
         │                       ▲                       │                       │
         │                       │                       │                       │
         └───────────────────────┼───────────────────────┼───────────────────────┘
                                 │                       │
                          Status & Control        Action Commands
🔄 Data Flow
Input Sources → TransportServer:
- Camera Feeds: Real-time video from robot cameras (front, wrist, overhead, etc.)
- Joint States: Current robot joint positions and velocities
- Robot Configuration: Joint limits, kinematics, calibration data
TransportServer → Inference Server:
- Streams normalized camera images (RGB, 224x224 or custom resolution)
- Sends normalized joint positions (most joints -100 to +100, gripper 0 to +100)
- Maintains real-time communication via WebSocket/WebRTC
Inference Server → AI Processing:
- Vision Processing: Multi-camera image preprocessing and encoding
- State Encoding: Joint position normalization and history buffering
- Policy Inference: Transformer model processes visual + proprioceptive data
- Action Generation: Outputs sequence of robot joint commands
Output → Robot Execution:
- Action Chunks: Sequences of joint commands (ACT outputs 10-100 actions per inference)
- Real-time Control: 20 Hz control loop, 2 Hz inference loop (see the sketch after this list)
- Safety Monitoring: Emergency stop, joint limit checking
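To make these rates concrete, here is a minimal sketch (not the server's actual implementation) of how a buffer of action chunks can be drained by a 20 Hz control loop while a 2 Hz inference loop keeps refilling it:

import asyncio
from collections import deque

action_queue: deque = deque()  # joint-command dicts queued by the inference loop

async def inference_loop(policy, get_observation):
    """Every 0.5 s (2 Hz): run the policy once and enqueue a chunk of future actions."""
    while True:
        obs = get_observation()  # latest camera frames + joint state (hypothetical helper)
        action_queue.extend(policy.predict(obs))  # e.g. 10-100 actions for ACT (hypothetical API)
        await asyncio.sleep(0.5)

async def control_loop(send_to_robot):
    """Every 50 ms (20 Hz): pop the next queued action and send it to the robot."""
    while True:
        if action_queue:
            send_to_robot(action_queue.popleft())
        await asyncio.sleep(0.05)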
🚀 Quick Start
The server is primarily a FastAPI REST API, but includes an optional Gradio web interface for easy debugging and testing without needing to write code or use curl commands.
Option 1: Server + UI (Recommended for Testing)
# Clone and setup
git clone https://github.com/julien-blanchon/RobotHub-InferenceServer
cd RobotHub-InferenceServer
uv sync
# Launch with integrated UI (FastAPI + Gradio on same port)
python launch_simple.py
Access Points:
- 🎨 Web Interface: http://localhost:7860/ (create sessions, monitor performance)
- 📖 API Documentation: http://localhost:7860/api/docs (REST API reference)
- 🔍 Health Check: http://localhost:7860/api/health (system status; see the snippet below)
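For a quick scripted check that the server is reachable, you can query the health endpoint from Python (a small sketch using httpx; the exact response fields depend on the server version):

import httpx

# Query the FastAPI health endpoint
resp = httpx.get("http://localhost:7860/api/health", timeout=5.0)
resp.raise_for_status()
print(resp.json())  # e.g. server status and active sessions (fields may vary)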
Option 2: Server Only (Production)
# Launch FastAPI server only (no UI)
python -m inference_server.cli --server-only
# Or with custom configuration
python -m inference_server.cli --server-only --host localhost --port 8080
Access:
- 📖 API Only: http://localhost:7860/api/docs
- 🔍 Health Check: http://localhost:7860/api/health
Option 3: Docker
# Build and run
docker build -t robothub-inference-server .
docker run -p 7860:7860 \
-v /path/to/your/models:/app/checkpoints \
robothub-inference-server
🛠️ Setting Up Your Robot
1. Connect Your Hardware
You need the RobotHub TransportServer running first:
# Start the RobotHub TransportServer (dependency)
cd ../RobotHub-TransportServer
docker run -p 8000:8000 robothub-transport-server
2. Create an Inference Session
Via Web Interface (Gradio UI):
- Open http://localhost:7860/
- Enter your model path (e.g., ./checkpoints/act_pick_place_model)
- Configure camera names (e.g., front,wrist,overhead)
- Set TransportServer URL (default: http://localhost:8000)
- Click "Create & Start AI Control"
Via REST API:
import httpx

session_config = {
    "session_id": "robot_assembly_task",
    "policy_path": "./checkpoints/act_assembly_model",
    "policy_type": "act",  # or "pi0", "smolvla", "diffusion"
    "camera_names": ["front_cam", "wrist_cam"],
    "transport_server_url": "http://localhost:8000",
    "language_instruction": "Pick up the red block and place it on the blue platform"  # For SmolVLA
}

async with httpx.AsyncClient() as client:
    # Create session
    response = await client.post("http://localhost:7860/api/sessions", json=session_config)

    # Start inference
    await client.post(f"http://localhost:7860/api/sessions/{session_config['session_id']}/start")
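Once started, you can poll the session endpoint (the same one used with curl in the monitoring section below) to confirm it is running; a minimal sketch:

async with httpx.AsyncClient() as client:
    status = await client.get(
        f"http://localhost:7860/api/sessions/{session_config['session_id']}"
    )
    print(status.json())  # session details; exact fields depend on the server version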
3. Connect Robot & Cameras
The robot and cameras connect to the TransportServer, not directly to the Inference Server:
# Example: Connect robot to TransportServer
from transport_server_client import RoboticsConsumer, RoboticsProducer
from transport_server_client.video import VideoProducer

# Robot receives AI commands and executes them
joint_consumer = RoboticsConsumer('http://localhost:8000')
await joint_consumer.connect(workspace_id, joint_input_room_id)

def execute_joint_commands(commands):
    """Execute commands on your actual robot hardware"""
    for cmd in commands:
        joint_name = cmd['name']
        position = cmd['value']  # Normalized: most joints -100 to +100, gripper 0 to +100
        robot.move_joint(joint_name, position)

joint_consumer.on_joint_update(execute_joint_commands)

# Robot sends its current state back
joint_producer = RoboticsProducer('http://localhost:8000')
await joint_producer.connect(workspace_id, joint_output_room_id)

# Send current robot state periodically
await joint_producer.send_state_sync({
    'shoulder_pan_joint': current_joint_positions[0],
    'shoulder_lift_joint': current_joint_positions[1],
    # ... etc
})

# Cameras stream to TransportServer
for camera_name, camera_device in cameras.items():
    video_producer = VideoProducer('http://localhost:8000')
    await video_producer.connect(workspace_id, camera_room_ids[camera_name])
    await video_producer.start_camera(camera_device)
🎮 Supported AI Models
ACT (Action Chunking Transformer)
- Best for: Complex manipulation tasks requiring temporal coherence
- Output: Chunks of 10-100 future actions per inference
- Use case: Pick-and-place, assembly, cooking tasks
Pi0 (Vision-Language Policy)
- Best for: Tasks requiring language understanding
- Output: Single actions with language conditioning
- Use case: "Pick up the red mug", "Open the top drawer"
SmolVLA (Small Vision-Language-Action)
- Best for: Lightweight vision-language tasks
- Use case: Simple manipulation with natural language
Diffusion Policy
- Best for: High-precision continuous control
- Use case: Precise assembly, drawing, writing
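The model family is selected with the policy_type field of the session config shown earlier; as a rough guide (a sketch, with a hypothetical checkpoint path), the values map as follows:

# policy_type values for the model families above
POLICY_TYPES = {
    "ACT": "act",
    "Pi0": "pi0",
    "SmolVLA": "smolvla",
    "Diffusion Policy": "diffusion",
}

# Language-conditioned models (Pi0, SmolVLA) also take a language_instruction
session_config = {
    "session_id": "demo_language_task",
    "policy_path": "./checkpoints/my_model",  # hypothetical checkpoint path
    "policy_type": POLICY_TYPES["SmolVLA"],
    "camera_names": ["front"],
    "transport_server_url": "http://localhost:8000",
    "language_instruction": "Pick up the red mug",
}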
📊 Monitoring & Debugging
Using the Web Interface
The Gradio UI provides real-time monitoring:
- Active Sessions: View all running inference sessions
- Performance Metrics: Inference rate, control rate, camera FPS
- Action Queue: Current action buffer status
- Error Logs: Real-time error tracking
Using the REST API
# Check active sessions
curl http://localhost:7860/api/sessions
# Get detailed session info
curl http://localhost:7860/api/sessions/my_robot_session
# Stop a session
curl -X POST http://localhost:7860/api/sessions/my_robot_session/stop
# Emergency stop all sessions
curl -X POST http://localhost:7860/api/debug/emergency_stop
🔧 Configuration
Multi-Camera Setup
# Configure multiple camera angles
session_config = {
    "camera_names": ["front_cam", "wrist_cam", "overhead_cam", "side_cam"],
    # Each camera gets its own TransportServer room
}
Custom Joint Mappings
The server handles various robot joint naming conventions automatically:
- LeRobot names: shoulder_pan_joint, shoulder_lift_joint, elbow_joint, etc.
- Custom names: base_rotation, shoulder_tilt, elbow_bend, etc.
- Alternative names: joint_1, joint_2, base_joint, etc.
See src/inference_server/models/joint_config.py for the full mapping details; the sketch below illustrates the general idea.
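As an illustration only (this is not the actual contents of joint_config.py), such a mapping can be a simple dictionary from alternative names to the standard LeRobot names:

# Hypothetical name-normalization map; see joint_config.py for the real mappings
CUSTOM_TO_STANDARD = {
    "base_rotation": "shoulder_pan_joint",
    "shoulder_tilt": "shoulder_lift_joint",
    "elbow_bend": "elbow_joint",
    "joint_1": "shoulder_pan_joint",
    "joint_2": "shoulder_lift_joint",
}

def normalize_joint_name(name: str) -> str:
    """Map a custom or alternative joint name to its standard LeRobot name."""
    return CUSTOM_TO_STANDARD.get(name, name)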
🔌 Integration Examples
Standalone Python Application
import asyncio
import time

import httpx
from transport_server_client import RoboticsProducer, RoboticsConsumer
from transport_server_client.video import VideoProducer

class RobotAIController:
    def __init__(self):
        self.inference_client = httpx.AsyncClient(base_url="http://localhost:7860/api")
        self.transport_url = "http://localhost:8000"

    async def start_ai_control(self, task_description: str):
        # 1. Create inference session
        session_config = {
            "session_id": f"task_{int(time.time())}",
            "policy_path": "./checkpoints/general_manipulation_act",
            "policy_type": "act",
            "camera_names": ["front", "wrist"],
            "language_instruction": task_description
        }
        response = await self.inference_client.post("/sessions", json=session_config)
        session_data = response.json()

        # 2. Connect robot to the same workspace/rooms
        await self.connect_robot_hardware(session_data)

        # 3. Start AI inference
        await self.inference_client.post(f"/sessions/{session_config['session_id']}/start")
        print(f"🤖 AI control started for task: {task_description}")

# Usage
controller = RobotAIController()
await controller.start_ai_control("Pick up the blue cup and place it on the shelf")
🚨 Safety & Best Practices
- Emergency Stop: Built-in emergency stop via API: /sessions/{id}/stop
- Joint Limits: All joint values are normalized (most joints -100 to +100, gripper 0 to +100)
- Hardware Limits: The robot driver should enforce the actual hardware joint limits (see the sketch after this list)
- Session Timeouts: Automatic cleanup prevents runaway processes
- Error Handling: Graceful degradation when cameras disconnect
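As an example of what that enforcement might look like in your driver (a sketch only; the joint names and limits below are made up, substitute your robot's real values), normalized commands can be mapped into each joint's physical range and clamped before being sent to the hardware:

# Hypothetical per-joint hardware limits in radians; use your robot's real values
HARD_LIMITS_RAD = {
    "shoulder_pan_joint": (-3.14, 3.14),
    "shoulder_lift_joint": (-1.57, 1.57),
}

def normalized_to_radians(joint_name: str, value: float) -> float:
    """Map a normalized command (-100..+100) into the joint's hardware range and clamp it."""
    lo, hi = HARD_LIMITS_RAD[joint_name]
    target = lo + (value + 100.0) / 200.0 * (hi - lo)
    return max(lo, min(hi, target))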
🚀 Deployment
Local Development
# All services on one machine
python launch_simple.py # Inference Server with UI
Production Setup
# Server only (no UI)
python -m inference_server.cli --server-only --host localhost --port 7860
# Or with Docker
docker run -p 7860:7860 robothub-inference-server