Spaces:
Sleeping
Sleeping
/** | |
* Integration tests for producer-consumer interactions - equivalent to Python's test_integration.py | |
*/ | |
import { test, expect, describe, beforeEach, afterEach } from "bun:test"; | |
import { robotics } from "../src/index"; | |
import { TEST_SERVER_URL, TestRoomManager, MessageCollector, sleep } from "./setup"; | |
const { RoboticsProducer, createConsumerClient, createProducerClient } = robotics; | |
describe("Integration", () => { | |
let roomManager: TestRoomManager; | |
beforeEach(() => { | |
roomManager = new TestRoomManager(); | |
}); | |
afterEach(async () => { | |
// Cleanup handled by individual tests | |
}); | |
test("full producer consumer workflow", async () => { | |
// Create producer and room | |
const producer = await createProducerClient(TEST_SERVER_URL); | |
const producerInfo = producer.getConnectionInfo(); | |
const workspaceId = producerInfo.workspace_id!; | |
const roomId = producerInfo.room_id!; | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
// Create consumer and connect to same room | |
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
// Set up consumer to collect messages | |
const stateCollector = new MessageCollector<Record<string, number>>(1); | |
const updateCollector = new MessageCollector(4); | |
const errorCollector = new MessageCollector<string>(1); | |
consumer.onStateSync(stateCollector.collect); | |
consumer.onJointUpdate(updateCollector.collect); | |
consumer.onError(errorCollector.collect); | |
// Wait for connections to stabilize | |
await sleep(200); | |
// Producer sends initial state | |
const initialState = { shoulder: 0.0, elbow: 0.0, wrist: 0.0 }; | |
await producer.sendStateSync(initialState); | |
await sleep(100); | |
// Producer sends series of joint updates | |
const jointSequences = [ | |
[{ name: "shoulder", value: 45.0 }], | |
[{ name: "elbow", value: -30.0 }], | |
[{ name: "wrist", value: 15.0 }], | |
[ | |
{ name: "shoulder", value: 90.0 }, | |
{ name: "elbow", value: -60.0 } | |
] | |
]; | |
for (const joints of jointSequences) { | |
await producer.sendJointUpdate(joints); | |
await sleep(100); | |
} | |
// Wait for all messages to be received | |
const receivedUpdates = await updateCollector.waitForMessages(3000); | |
// Verify consumer received messages | |
expect(receivedUpdates.length).toBeGreaterThanOrEqual(4); | |
// Verify final state | |
const finalState = await consumer.getStateSyncAsync(); | |
const expectedFinalState = { shoulder: 90.0, elbow: -60.0, wrist: 15.0 }; | |
expect(finalState).toEqual(expectedFinalState); | |
} finally { | |
await consumer.disconnect(); | |
} | |
} finally { | |
await producer.disconnect(); | |
await roomManager.cleanup(producer); | |
} | |
}); | |
test("multiple consumers same room", async () => { | |
const producer = await createProducerClient(TEST_SERVER_URL); | |
const producerInfo = producer.getConnectionInfo(); | |
const workspaceId = producerInfo.workspace_id!; | |
const roomId = producerInfo.room_id!; | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
// Create multiple consumers | |
const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
// Set up message collection for both consumers | |
const consumer1Collector = new MessageCollector(1); | |
const consumer2Collector = new MessageCollector(1); | |
consumer1.onJointUpdate(consumer1Collector.collect); | |
consumer2.onJointUpdate(consumer2Collector.collect); | |
// Wait for connections | |
await sleep(200); | |
// Producer sends updates | |
const testJoints = [ | |
{ name: "joint1", value: 10.0 }, | |
{ name: "joint2", value: 20.0 } | |
]; | |
await producer.sendJointUpdate(testJoints); | |
// Wait for message propagation | |
const consumer1Updates = await consumer1Collector.waitForMessages(2000); | |
const consumer2Updates = await consumer2Collector.waitForMessages(2000); | |
// Both consumers should receive the same update | |
expect(consumer1Updates.length).toBeGreaterThanOrEqual(1); | |
expect(consumer2Updates.length).toBeGreaterThanOrEqual(1); | |
// Verify both received same data | |
if (consumer1Updates.length > 0 && consumer2Updates.length > 0) { | |
expect(consumer1Updates[consumer1Updates.length - 1]).toEqual( | |
consumer2Updates[consumer2Updates.length - 1] | |
); | |
} | |
} finally { | |
await consumer1.disconnect(); | |
await consumer2.disconnect(); | |
} | |
} finally { | |
await producer.disconnect(); | |
await roomManager.cleanup(producer); | |
} | |
}); | |
test("emergency stop propagation", async () => { | |
const producer = await createProducerClient(TEST_SERVER_URL); | |
const producerInfo = producer.getConnectionInfo(); | |
const workspaceId = producerInfo.workspace_id!; | |
const roomId = producerInfo.room_id!; | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
// Create consumers | |
const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
// Set up error collection | |
const consumer1ErrorCollector = new MessageCollector<string>(1); | |
const consumer2ErrorCollector = new MessageCollector<string>(1); | |
consumer1.onError(consumer1ErrorCollector.collect); | |
consumer2.onError(consumer2ErrorCollector.collect); | |
// Wait for connections | |
await sleep(200); | |
// Producer sends emergency stop | |
await producer.sendEmergencyStop("Integration test emergency stop"); | |
// Wait for message propagation | |
const consumer1Errors = await consumer1ErrorCollector.waitForMessages(2000); | |
const consumer2Errors = await consumer2ErrorCollector.waitForMessages(2000); | |
// Both consumers should receive emergency stop | |
expect(consumer1Errors.length).toBeGreaterThanOrEqual(1); | |
expect(consumer2Errors.length).toBeGreaterThanOrEqual(1); | |
// Verify error messages contain emergency stop info | |
if (consumer1Errors.length > 0) { | |
expect(consumer1Errors[consumer1Errors.length - 1].toLowerCase()).toContain("emergency stop"); | |
} | |
if (consumer2Errors.length > 0) { | |
expect(consumer2Errors[consumer2Errors.length - 1].toLowerCase()).toContain("emergency stop"); | |
} | |
} finally { | |
await consumer1.disconnect(); | |
await consumer2.disconnect(); | |
} | |
} finally { | |
await producer.disconnect(); | |
await roomManager.cleanup(producer); | |
} | |
}); | |
test("producer reconnection workflow", async () => { | |
// Create room first | |
const tempProducer = new RoboticsProducer(TEST_SERVER_URL); | |
const { workspaceId, roomId } = await tempProducer.createRoom(); | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
// Create consumer first | |
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
const updateCollector = new MessageCollector(2); | |
consumer.onJointUpdate(updateCollector.collect); | |
// Create producer and connect | |
const producer = new RoboticsProducer(TEST_SERVER_URL); | |
await producer.connect(workspaceId, roomId); | |
// Send initial update | |
await producer.sendStateSync({ joint1: 10.0 }); | |
await sleep(100); | |
// Disconnect producer | |
await producer.disconnect(); | |
// Reconnect producer | |
await producer.connect(workspaceId, roomId); | |
// Send another update | |
await producer.sendStateSync({ joint1: 20.0 }); | |
await sleep(200); | |
// Consumer should have received both updates | |
const receivedUpdates = await updateCollector.waitForMessages(3000); | |
expect(receivedUpdates.length).toBeGreaterThanOrEqual(2); | |
await producer.disconnect(); | |
} finally { | |
await consumer.disconnect(); | |
} | |
} finally { | |
await roomManager.cleanup(tempProducer); | |
} | |
}); | |
test("consumer late join", async () => { | |
const producer = await createProducerClient(TEST_SERVER_URL); | |
const producerInfo = producer.getConnectionInfo(); | |
const workspaceId = producerInfo.workspace_id!; | |
const roomId = producerInfo.room_id!; | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
// Producer sends some updates before consumer joins | |
await producer.sendStateSync({ joint1: 10.0, joint2: 20.0 }); | |
await sleep(100); | |
await producer.sendJointUpdate([{ name: "joint3", value: 30.0 }]); | |
await sleep(100); | |
// Now consumer joins | |
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
// Consumer should be able to get current state | |
const currentState = await consumer.getStateSyncAsync(); | |
// Should contain all previously sent updates | |
const expectedState = { joint1: 10.0, joint2: 20.0, joint3: 30.0 }; | |
expect(currentState).toEqual(expectedState); | |
} finally { | |
await consumer.disconnect(); | |
} | |
} finally { | |
await producer.disconnect(); | |
await roomManager.cleanup(producer); | |
} | |
}); | |
test("room cleanup on producer disconnect", async () => { | |
const producer = await createProducerClient(TEST_SERVER_URL); | |
const producerInfo = producer.getConnectionInfo(); | |
const workspaceId = producerInfo.workspace_id!; | |
const roomId = producerInfo.room_id!; | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
// Send some state | |
await producer.sendStateSync({ joint1: 42.0 }); | |
await sleep(100); | |
// Verify state exists | |
const stateBefore = await consumer.getStateSyncAsync(); | |
expect(stateBefore).toEqual({ joint1: 42.0 }); | |
// Producer disconnects | |
await producer.disconnect(); | |
await sleep(100); | |
// State should still be accessible to consumer | |
const stateAfter = await consumer.getStateSyncAsync(); | |
expect(stateAfter).toEqual({ joint1: 42.0 }); | |
} finally { | |
await consumer.disconnect(); | |
} | |
} finally { | |
// Room cleanup handled by roomManager since producer disconnected | |
} | |
}); | |
test("high frequency updates", async () => { | |
const producer = await createProducerClient(TEST_SERVER_URL); | |
const producerInfo = producer.getConnectionInfo(); | |
const workspaceId = producerInfo.workspace_id!; | |
const roomId = producerInfo.room_id!; | |
roomManager.addRoom(workspaceId, roomId); | |
try { | |
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); | |
try { | |
const updateCollector = new MessageCollector(5); | |
consumer.onJointUpdate(updateCollector.collect); | |
// Wait for connection | |
await sleep(100); | |
// Send rapid updates | |
for (let i = 0; i < 20; i++) { | |
await producer.sendStateSync({ joint1: i, timestamp: i }); | |
await sleep(10); // 10ms intervals | |
} | |
// Wait for all messages | |
const receivedUpdates = await updateCollector.waitForMessages(5000); | |
// Should have received multiple updates | |
// (exact number may vary due to change detection) | |
expect(receivedUpdates.length).toBeGreaterThanOrEqual(5); | |
// Final state should reflect last update | |
const finalState = await consumer.getStateSyncAsync(); | |
expect(finalState.joint1).toBe(19); | |
expect(finalState.timestamp).toBe(19); | |
} finally { | |
await consumer.disconnect(); | |
} | |
} finally { | |
await producer.disconnect(); | |
await roomManager.cleanup(producer); | |
} | |
}); | |
}); |