Spaces:
Running
Running
/** | |
* Producer client for video streaming in LeRobot Arena | |
*/ | |
import { VideoClientCore } from './core.js'; | |
import type { | |
WebSocketMessage, | |
VideoConfigUpdateMessage, | |
StreamStartedMessage, | |
StreamStoppedMessage, | |
StatusUpdateMessage, | |
StreamStatsMessage, | |
ClientOptions, | |
VideoConfig, | |
WebRTCAnswerMessage, | |
WebRTCIceMessage, | |
} from './types.js'; | |
export class VideoProducer extends VideoClientCore { | |
// Multiple peer connections - one per consumer | |
private consumerConnections: Map<string, RTCPeerConnection> = new Map(); | |
constructor(baseUrl = 'http://localhost:8000', options: ClientOptions = {}) { | |
super(baseUrl, options); | |
} | |
// ============= PRODUCER CONNECTION ============= | |
async connect(workspaceId: string, roomId: string, participantId?: string): Promise<boolean> { | |
const success = await this.connectToRoom(workspaceId, roomId, 'producer', participantId); | |
if (success) { | |
// Listen for consumer join events to initiate WebRTC | |
this.on('consumer_joined', (consumerId: string) => { | |
console.info(`π― Consumer ${consumerId} joined, initiating WebRTC...`); | |
this.initiateWebRTCWithConsumer(consumerId); | |
}); | |
// Also check for existing consumers and initiate connections after a delay | |
setTimeout(() => this.connectToExistingConsumers(), 1000); | |
} | |
return success; | |
} | |
private async connectToExistingConsumers(): Promise<void> { | |
if (!this.workspaceId || !this.roomId) return; | |
try { | |
const roomInfo = await this.getRoomInfo(this.workspaceId, this.roomId); | |
for (const consumerId of roomInfo.participants.consumers) { | |
if (!this.consumerConnections.has(consumerId)) { | |
console.info(`π Connecting to existing consumer ${consumerId}`); | |
await this.initiateWebRTCWithConsumer(consumerId); | |
} | |
} | |
} catch (error) { | |
console.error('Failed to connect to existing consumers:', error); | |
} | |
} | |
private createPeerConnectionForConsumer(consumerId: string): RTCPeerConnection { | |
const config: RTCConfiguration = { | |
iceServers: this.webrtcConfig.iceServers || [ | |
{ urls: 'stun:stun.l.google.com:19302' } | |
] | |
}; | |
const peerConnection = new RTCPeerConnection(config); | |
// Add local stream tracks to this connection | |
if (this.localStream) { | |
this.localStream.getTracks().forEach(track => { | |
peerConnection.addTrack(track, this.localStream!); | |
}); | |
} | |
// Connection state changes | |
peerConnection.onconnectionstatechange = () => { | |
const state = peerConnection.connectionState; | |
console.info(`π WebRTC connection state for ${consumerId}: ${state}`); | |
if (state === 'failed' || state === 'disconnected') { | |
console.warn(`Connection to ${consumerId} failed, attempting restart...`); | |
setTimeout(() => this.restartConnectionToConsumer(consumerId), 2000); | |
} | |
}; | |
// ICE connection state | |
peerConnection.oniceconnectionstatechange = () => { | |
const state = peerConnection.iceConnectionState; | |
console.info(`π§ ICE connection state for ${consumerId}: ${state}`); | |
}; | |
// ICE candidate handling for this specific consumer | |
peerConnection.onicecandidate = (event) => { | |
if (event.candidate && this.workspaceId && this.roomId && this.participantId) { | |
this.sendWebRTCSignal(this.workspaceId, this.roomId, this.participantId, { | |
type: 'ice', | |
candidate: event.candidate.toJSON(), | |
target_consumer: consumerId, | |
} as Record<string, unknown>); | |
} | |
}; | |
// Store the connection | |
this.consumerConnections.set(consumerId, peerConnection); | |
return peerConnection; | |
} | |
private async restartConnectionToConsumer(consumerId: string): Promise<void> { | |
console.info(`π Restarting connection to consumer ${consumerId}`); | |
await this.initiateWebRTCWithConsumer(consumerId); | |
} | |
private handleConsumerLeft(consumerId: string): void { | |
const peerConnection = this.consumerConnections.get(consumerId); | |
if (peerConnection) { | |
peerConnection.close(); | |
this.consumerConnections.delete(consumerId); | |
console.info(`π§Ή Cleaned up peer connection for consumer ${consumerId}`); | |
} | |
} | |
private async restartConnectionsWithNewStream(stream: MediaStream): Promise<void> { | |
console.info('π Restarting connections with new stream...'); | |
// Close all existing connections | |
for (const [consumerId, peerConnection] of this.consumerConnections) { | |
peerConnection.close(); | |
console.info(`π§Ή Closed existing connection to consumer ${consumerId}`); | |
} | |
this.consumerConnections.clear(); | |
// Get current consumers and restart connections | |
try { | |
if (this.workspaceId && this.roomId) { | |
const roomInfo = await this.getRoomInfo(this.workspaceId, this.roomId); | |
for (const consumerId of roomInfo.participants.consumers) { | |
console.info(`π Creating new connection to consumer ${consumerId}...`); | |
await this.initiateWebRTCWithConsumer(consumerId); | |
} | |
} | |
} catch (error) { | |
console.error('Failed to restart connections:', error); | |
} | |
} | |
// ============= PRODUCER METHODS ============= | |
async startCamera(constraints?: MediaStreamConstraints): Promise<MediaStream> { | |
if (!this.connected) { | |
throw new Error('Must be connected to start camera'); | |
} | |
const stream = await this.startProducing(constraints); | |
// Store the stream and restart connections with new tracks | |
this.localStream = stream; | |
await this.restartConnectionsWithNewStream(stream); | |
// Notify about stream start | |
this.notifyStreamStarted(stream); | |
return stream; | |
} | |
override async startScreenShare(): Promise<MediaStream> { | |
if (!this.connected) { | |
throw new Error('Must be connected to start screen share'); | |
} | |
const stream = await super.startScreenShare(); | |
// Store the stream and restart connections with new tracks | |
this.localStream = stream; | |
await this.restartConnectionsWithNewStream(stream); | |
// Notify about stream start | |
this.notifyStreamStarted(stream); | |
return stream; | |
} | |
async stopStreaming(): Promise<void> { | |
if (!this.connected || !this.websocket) { | |
throw new Error('Must be connected to stop streaming'); | |
} | |
// Close all consumer connections | |
for (const [consumerId, peerConnection] of this.consumerConnections) { | |
peerConnection.close(); | |
console.info(`π§Ή Closed connection to consumer ${consumerId}`); | |
} | |
this.consumerConnections.clear(); | |
// Stop local stream | |
this.stopProducing(); | |
// Notify about stream stop | |
this.notifyStreamStopped(); | |
} | |
async updateVideoConfig(config: VideoConfig): Promise<void> { | |
if (!this.connected || !this.websocket) { | |
throw new Error('Must be connected to update video config'); | |
} | |
const message: VideoConfigUpdateMessage = { | |
type: 'video_config_update', | |
config, | |
timestamp: new Date().toISOString(), | |
}; | |
this.websocket.send(JSON.stringify(message)); | |
} | |
async sendEmergencyStop(reason = 'Emergency stop'): Promise<void> { | |
if (!this.connected || !this.websocket) { | |
throw new Error('Must be connected to send emergency stop'); | |
} | |
const message = { | |
type: 'emergency_stop' as const, | |
reason, | |
timestamp: new Date().toISOString(), | |
}; | |
this.websocket.send(JSON.stringify(message)); | |
} | |
// ============= WEBRTC NEGOTIATION ============= | |
async initiateWebRTCWithConsumer(consumerId: string): Promise<void> { | |
if (!this.workspaceId || !this.roomId || !this.participantId) { | |
console.warn('WebRTC not ready, skipping negotiation with consumer'); | |
return; | |
} | |
// Clean up existing connection if any | |
if (this.consumerConnections.has(consumerId)) { | |
const existingConn = this.consumerConnections.get(consumerId); | |
existingConn?.close(); | |
this.consumerConnections.delete(consumerId); | |
} | |
try { | |
console.info(`π Creating WebRTC offer for consumer ${consumerId}...`); | |
// Create a new peer connection specifically for this consumer | |
const peerConnection = this.createPeerConnectionForConsumer(consumerId); | |
// Create offer with this consumer's peer connection | |
const offer = await peerConnection.createOffer(); | |
await peerConnection.setLocalDescription(offer); | |
console.info(`π€ Sending WebRTC offer to consumer ${consumerId}...`); | |
// Send offer to server/consumer | |
await this.sendWebRTCSignal(this.workspaceId, this.roomId, this.participantId, { | |
type: 'offer', | |
sdp: offer.sdp, | |
target_consumer: consumerId, | |
} as Record<string, unknown>); | |
console.info(`β WebRTC offer sent to consumer ${consumerId}`); | |
} catch (error) { | |
console.error(`Failed to initiate WebRTC with consumer ${consumerId}:`, error); | |
} | |
} | |
private async handleWebRTCAnswer(message: WebRTCAnswerMessage): Promise<void> { | |
try { | |
const consumerId = message.from_consumer; | |
console.info(`π₯ Received WebRTC answer from consumer ${consumerId}`); | |
const peerConnection = this.consumerConnections.get(consumerId); | |
if (!peerConnection) { | |
console.warn(`No peer connection found for consumer ${consumerId}`); | |
return; | |
} | |
// Set remote description on the correct peer connection | |
const answer = new RTCSessionDescription({ | |
type: 'answer', | |
sdp: message.answer.sdp | |
}); | |
await peerConnection.setRemoteDescription(answer); | |
console.info(`β WebRTC negotiation completed with consumer ${consumerId}`); | |
} catch (error) { | |
console.error(`Failed to handle WebRTC answer from ${message.from_consumer}:`, error); | |
this.handleError(`Failed to handle WebRTC answer: ${error}`); | |
} | |
} | |
private async handleWebRTCIce(message: WebRTCIceMessage): Promise<void> { | |
try { | |
const consumerId = message.from_consumer; | |
if (!consumerId) { | |
console.warn('No consumer ID in ICE message'); | |
return; | |
} | |
const peerConnection = this.consumerConnections.get(consumerId); | |
if (!peerConnection) { | |
console.warn(`No peer connection found for consumer ${consumerId}`); | |
return; | |
} | |
console.info(`π₯ Received WebRTC ICE from consumer ${consumerId}`); | |
// Add ICE candidate to the correct peer connection | |
const candidate = new RTCIceCandidate(message.candidate); | |
await peerConnection.addIceCandidate(candidate); | |
console.info(`β WebRTC ICE handled with consumer ${consumerId}`); | |
} catch (error) { | |
console.error(`Failed to handle WebRTC ICE from ${message.from_consumer}:`, error); | |
this.handleError(`Failed to handle WebRTC ICE: ${error}`); | |
} | |
} | |
// ============= MESSAGE HANDLING ============= | |
protected override handleRoleSpecificMessage(message: WebSocketMessage): void { | |
switch (message.type) { | |
case 'participant_joined': | |
// Check if this is a consumer joining | |
if (message.role === 'consumer' && message.participant_id !== this.participantId) { | |
console.info(`π― Consumer ${message.participant_id} joined room`); | |
this.emit('consumer_joined', message.participant_id); | |
} | |
break; | |
case 'participant_left': | |
// Check if this is a consumer leaving | |
if (message.role === 'consumer') { | |
console.info(`π Consumer ${message.participant_id} left room`); | |
this.handleConsumerLeft(message.participant_id); | |
} | |
break; | |
case 'webrtc_answer': | |
this.handleWebRTCAnswer(message as WebRTCAnswerMessage); | |
break; | |
case 'webrtc_ice': | |
this.handleWebRTCIce(message as WebRTCIceMessage); | |
break; | |
case 'status_update': | |
this.handleStatusUpdate(message as StatusUpdateMessage); | |
break; | |
case 'stream_stats': | |
this.handleStreamStats(message as StreamStatsMessage); | |
break; | |
case 'emergency_stop': | |
console.warn(`π¨ Emergency stop: ${message.reason || 'Unknown reason'}`); | |
this.handleError(`Emergency stop: ${message.reason || 'Unknown reason'}`); | |
break; | |
case 'error': | |
console.error(`Server error: ${message.message}`); | |
this.handleError(message.message); | |
break; | |
default: | |
console.warn(`Unknown message type for producer: ${message.type}`); | |
} | |
} | |
private handleStatusUpdate(message: StatusUpdateMessage): void { | |
console.info(`π Status update: ${message.status}`, message.data); | |
this.emit('statusUpdate', message.status, message.data); | |
} | |
private handleStreamStats(message: StreamStatsMessage): void { | |
console.debug(`π Stream stats:`, message.stats); | |
this.emit('streamStats', message.stats); | |
} | |
// ============= UTILITY METHODS ============= | |
private async notifyStreamStarted(stream: MediaStream): Promise<void> { | |
if (!this.websocket) return; | |
const message: StreamStartedMessage = { | |
type: 'stream_started', | |
config: { | |
resolution: this.webrtcConfig.resolution, | |
framerate: this.webrtcConfig.framerate, | |
bitrate: this.webrtcConfig.bitrate, | |
}, | |
participant_id: this.participantId!, | |
timestamp: new Date().toISOString(), | |
}; | |
this.websocket.send(JSON.stringify(message)); | |
this.emit('streamStarted', stream); | |
} | |
private async notifyStreamStopped(): Promise<void> { | |
if (!this.websocket) return; | |
const message: StreamStoppedMessage = { | |
type: 'stream_stopped', | |
participant_id: this.participantId!, | |
timestamp: new Date().toISOString(), | |
}; | |
this.websocket.send(JSON.stringify(message)); | |
this.emit('streamStopped'); | |
} | |
/** | |
* Create a room and automatically connect as producer | |
*/ | |
static async createAndConnect( | |
baseUrl = 'http://localhost:8000', | |
workspaceId?: string, | |
roomId?: string, | |
participantId?: string | |
): Promise<VideoProducer> { | |
const producer = new VideoProducer(baseUrl); | |
const roomData = await producer.createRoom(workspaceId, roomId); | |
const connected = await producer.connect(roomData.workspaceId, roomData.roomId, participantId); | |
if (!connected) { | |
throw new Error('Failed to connect as video producer'); | |
} | |
return producer; | |
} | |
/** | |
* Get the current room ID (useful when auto-created) | |
*/ | |
get currentRoomId(): string | null { | |
return this.roomId; | |
} | |
} |