LeRobot-Arena / src /lib /robot /drivers /RemoteServerSlave.ts
blanchon's picture
Mostly UI Update
18b0fa5
import type {
SlaveDriver,
DriverJointState,
ConnectionStatus,
RobotCommand,
RemoteServerSlaveConfig,
StateUpdateCallback,
StatusChangeCallback,
UnsubscribeFn
} from "$lib/types/robotDriver";
import { getWebSocketConfig, getCommunicationConfig } from "$lib/configs/performanceConfig";
/**
* Remote Server Slave Driver
* Connects to FastAPI WebSocket server as a slave to receive and execute commands
*/
export class RemoteServerSlave implements SlaveDriver {
readonly type = "slave" as const;
readonly id: string;
readonly name: string;
private _status: ConnectionStatus = { isConnected: false };
private config: RemoteServerSlaveConfig;
// Joint states
private jointStates: DriverJointState[] = [];
// WebSocket connection
private ws?: WebSocket;
private reconnectAttempts = 0;
private maxReconnectAttempts = getWebSocketConfig().MAX_RECONNECT_ATTEMPTS;
private reconnectDelay = getWebSocketConfig().INITIAL_RECONNECT_DELAY_MS;
private heartbeatInterval?: number;
// Event callbacks
private stateCallbacks: StateUpdateCallback[] = [];
private statusCallbacks: StatusChangeCallback[] = [];
constructor(config: RemoteServerSlaveConfig, initialJointStates: DriverJointState[]) {
this.config = config;
this.id = `remote-slave-${config.robotId}-${Date.now()}`;
this.name = `Remote Server Slave (${config.robotId})`;
// Initialize joint states
this.jointStates = initialJointStates.map((state) => ({ ...state }));
console.log(
`Created RemoteServerSlave for robot ${config.robotId} with ${this.jointStates.length} joints`
);
}
get status(): ConnectionStatus {
return this._status;
}
async connect(): Promise<void> {
console.log(`Connecting ${this.name} to ${this.config.url}...`);
try {
// Build WebSocket URL
const wsUrl = this.buildWebSocketUrl();
// Create WebSocket connection
this.ws = new WebSocket(wsUrl);
// Set up event handlers
this.setupWebSocketHandlers();
// Wait for connection
await this.waitForConnection();
this._status = {
isConnected: true,
lastConnected: new Date(),
error: undefined
};
this.notifyStatusChange();
console.log(`${this.name} connected successfully`);
// Send initial status
await this.sendStatusUpdate();
} catch (error) {
this._status = {
isConnected: false,
error: `Connection failed: ${error}`
};
this.notifyStatusChange();
throw error;
}
}
async disconnect(): Promise<void> {
console.log(`Disconnecting ${this.name}...`);
this.stopHeartbeat();
if (this.ws) {
this.ws.close();
this.ws = undefined;
}
this._status = { isConnected: false };
this.notifyStatusChange();
console.log(`${this.name} disconnected`);
}
async executeCommand(command: RobotCommand): Promise<void> {
if (!this._status.isConnected) {
throw new Error("Cannot execute command: slave not connected");
}
console.log(`RemoteServerSlave executing command with ${command.joints.length} joint updates`);
try {
// Update joint states (simulate execution)
for (const jointUpdate of command.joints) {
const joint = this.jointStates.find((j) => j.name === jointUpdate.name);
if (joint) {
joint.virtualValue = jointUpdate.value;
joint.realValue = jointUpdate.value; // For remote slaves, assume perfect execution
}
}
// Notify state update
this.notifyStateUpdate();
// Send joint states to server (so master can receive updates)
await this.sendJointStates();
// Send status update to server
await this.sendStatusUpdate();
} catch (error) {
console.error("Error executing command:", error);
await this.sendError(`Command execution failed: ${error}`);
throw error;
}
}
async executeCommands(commands: RobotCommand[]): Promise<void> {
console.log(`RemoteServerSlave executing batch of ${commands.length} commands`);
for (const command of commands) {
await this.executeCommand(command);
// Use optimized delay between commands
if (commands.length > 1) {
await new Promise((resolve) =>
setTimeout(resolve, getCommunicationConfig().BATCH_COMMAND_DELAY_MS)
);
}
}
}
async readJointStates(): Promise<DriverJointState[]> {
return [...this.jointStates];
}
async writeJointState(jointName: string, value: number): Promise<void> {
const command: RobotCommand = {
timestamp: Date.now(),
joints: [{ name: jointName, value }]
};
await this.executeCommand(command);
}
async writeJointStates(updates: { jointName: string; value: number }[]): Promise<void> {
const command: RobotCommand = {
timestamp: Date.now(),
joints: updates.map((update) => ({ name: update.jointName, value: update.value }))
};
await this.executeCommand(command);
}
// Event subscription methods
onStateUpdate(callback: StateUpdateCallback): UnsubscribeFn {
this.stateCallbacks.push(callback);
return () => {
const index = this.stateCallbacks.indexOf(callback);
if (index >= 0) {
this.stateCallbacks.splice(index, 1);
}
};
}
onStatusChange(callback: StatusChangeCallback): UnsubscribeFn {
this.statusCallbacks.push(callback);
return () => {
const index = this.statusCallbacks.indexOf(callback);
if (index >= 0) {
this.statusCallbacks.splice(index, 1);
}
};
}
// Private methods
private buildWebSocketUrl(): string {
const baseUrl = this.config.url.replace(/^http/, "ws");
return `${baseUrl}/ws/slave/${this.config.robotId}`;
}
private setupWebSocketHandlers(): void {
if (!this.ws) return;
this.ws.onopen = () => {
console.log(`WebSocket connected for slave ${this.config.robotId}`);
this.reconnectAttempts = 0; // Reset on successful connection
this.startHeartbeat();
};
this.ws.onmessage = (event) => {
try {
const message = JSON.parse(event.data);
this.handleServerMessage(message);
} catch (error) {
console.error("Failed to parse WebSocket message:", error);
}
};
this.ws.onclose = (event) => {
console.log(`WebSocket closed for slave ${this.config.robotId}:`, event.code, event.reason);
this.handleDisconnection();
};
this.ws.onerror = (error) => {
console.error(`WebSocket error for slave ${this.config.robotId}:`, error);
this._status = {
isConnected: false,
error: `WebSocket error: ${error}`
};
this.notifyStatusChange();
};
}
private async waitForConnection(): Promise<void> {
if (!this.ws) throw new Error("WebSocket not created");
return new Promise((resolve, reject) => {
const timeout = setTimeout(() => {
reject(new Error("Connection timeout"));
}, getWebSocketConfig().CONNECTION_TIMEOUT_MS);
if (this.ws!.readyState === WebSocket.OPEN) {
clearTimeout(timeout);
resolve();
return;
}
this.ws!.onopen = () => {
clearTimeout(timeout);
resolve();
};
this.ws!.onerror = (error) => {
clearTimeout(timeout);
reject(error);
};
});
}
private async sendMessage(message: Record<string, unknown>): Promise<void> {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
throw new Error("WebSocket not connected");
}
this.ws.send(JSON.stringify(message));
}
private handleServerMessage(message: Record<string, unknown>): void {
const { type, data } = message;
switch (type) {
case "execute_command":
if (data) {
this.executeCommand(data as RobotCommand).catch((error) => {
console.error("Failed to execute command from server:", error);
});
}
break;
case "execute_sequence":
if (data && typeof data === "object" && "commands" in data) {
const sequence = data as { commands: RobotCommand[] };
this.executeCommands(sequence.commands).catch((error) => {
console.error("Failed to execute sequence from server:", error);
});
}
break;
case "play_sequence":
if (data && typeof data === "object" && "commands" in data) {
console.log(`Playing sequence from server on remote slave ${this.config.robotId}`);
const sequence = data as { commands: RobotCommand[] };
this.executeCommands(sequence.commands).catch((error) => {
console.error("Failed to play sequence from server:", error);
});
}
break;
case "stop_sequence":
console.log(`Stopping sequences on remote slave ${this.config.robotId}`);
// For a simple slave, we don't track running sequences, so just log
break;
case "status_request":
this.sendStatusUpdate().catch((error) => {
console.error("Failed to send status update:", error);
});
break;
default:
console.warn(`Unknown message type from server: ${type}`);
}
}
private handleDisconnection(): void {
this._status = { isConnected: false };
this.notifyStatusChange();
this.stopHeartbeat();
// Attempt reconnection if not manually disconnected
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.attemptReconnection();
}
}
private async attemptReconnection(): Promise<void> {
this.reconnectAttempts++;
const maxDelay = getWebSocketConfig().MAX_RECONNECT_DELAY_MS;
const delay = Math.min(
this.reconnectDelay * Math.pow(1.5, this.reconnectAttempts - 1),
maxDelay
);
console.log(
`Attempting slave reconnection ${this.reconnectAttempts}/${this.maxReconnectAttempts} in ${delay}ms...`
);
setTimeout(async () => {
try {
await this.connect();
} catch (error) {
console.error(`Slave reconnection attempt ${this.reconnectAttempts} failed:`, error);
}
}, delay);
}
private startHeartbeat(): void {
this.heartbeatInterval = setInterval(async () => {
if (this._status.isConnected && this.ws) {
try {
await this.sendStatusUpdate();
} catch (error) {
console.error("Failed to send heartbeat status:", error);
}
}
}, getWebSocketConfig().HEARTBEAT_INTERVAL_MS);
}
private stopHeartbeat(): void {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = undefined;
}
}
private async sendStatusUpdate(): Promise<void> {
if (!this._status.isConnected) return;
try {
await this.sendMessage({
type: "status_update",
timestamp: new Date().toISOString(),
data: {
slave_id: this.id,
robot_id: this.config.robotId,
is_connected: this._status.isConnected,
joint_count: this.jointStates.length
}
});
} catch (error) {
console.error("Failed to send status update:", error);
}
}
private async sendJointStates(): Promise<void> {
if (!this._status.isConnected) return;
try {
await this.sendMessage({
type: "joint_states",
timestamp: new Date().toISOString(),
data: {
slave_id: this.id,
robot_id: this.config.robotId,
joints: this.jointStates.map((joint) => ({
name: joint.name,
virtual_value: joint.virtualValue,
real_value: joint.realValue
}))
}
});
} catch (error) {
console.error("Failed to send joint states:", error);
}
}
private async sendError(errorMessage: string): Promise<void> {
if (!this._status.isConnected) return;
try {
await this.sendMessage({
type: "error",
timestamp: new Date().toISOString(),
data: {
slave_id: this.id,
robot_id: this.config.robotId,
error: errorMessage
}
});
} catch (error) {
console.error("Failed to send error message:", error);
}
}
private notifyStateUpdate(): void {
this.stateCallbacks.forEach((callback) => {
try {
callback([...this.jointStates]);
} catch (error) {
console.error("Error in state update callback:", error);
}
});
}
private notifyStatusChange(): void {
this.statusCallbacks.forEach((callback) => {
try {
callback(this._status);
} catch (error) {
console.error("Error in status change callback:", error);
}
});
}
}