/** * Integration tests for producer-consumer interactions - equivalent to Python's test_integration.py */ import { test, expect, describe, beforeEach, afterEach } from "bun:test"; import { robotics } from "../src/index"; import { TEST_SERVER_URL, TestRoomManager, MessageCollector, sleep } from "./setup"; const { RoboticsProducer, createConsumerClient, createProducerClient } = robotics; describe("Integration", () => { let roomManager: TestRoomManager; beforeEach(() => { roomManager = new TestRoomManager(); }); afterEach(async () => { // Cleanup handled by individual tests }); test("full producer consumer workflow", async () => { // Create producer and room const producer = await createProducerClient(TEST_SERVER_URL); const producerInfo = producer.getConnectionInfo(); const workspaceId = producerInfo.workspace_id!; const roomId = producerInfo.room_id!; roomManager.addRoom(workspaceId, roomId); try { // Create consumer and connect to same room const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { // Set up consumer to collect messages const stateCollector = new MessageCollector>(1); const updateCollector = new MessageCollector(4); const errorCollector = new MessageCollector(1); consumer.onStateSync(stateCollector.collect); consumer.onJointUpdate(updateCollector.collect); consumer.onError(errorCollector.collect); // Wait for connections to stabilize await sleep(200); // Producer sends initial state const initialState = { shoulder: 0.0, elbow: 0.0, wrist: 0.0 }; await producer.sendStateSync(initialState); await sleep(100); // Producer sends series of joint updates const jointSequences = [ [{ name: "shoulder", value: 45.0 }], [{ name: "elbow", value: -30.0 }], [{ name: "wrist", value: 15.0 }], [ { name: "shoulder", value: 90.0 }, { name: "elbow", value: -60.0 } ] ]; for (const joints of jointSequences) { await producer.sendJointUpdate(joints); await sleep(100); } // Wait for all messages to be received const receivedUpdates = await updateCollector.waitForMessages(3000); // Verify consumer received messages expect(receivedUpdates.length).toBeGreaterThanOrEqual(4); // Verify final state const finalState = await consumer.getStateSyncAsync(); const expectedFinalState = { shoulder: 90.0, elbow: -60.0, wrist: 15.0 }; expect(finalState).toEqual(expectedFinalState); } finally { await consumer.disconnect(); } } finally { await producer.disconnect(); await roomManager.cleanup(producer); } }); test("multiple consumers same room", async () => { const producer = await createProducerClient(TEST_SERVER_URL); const producerInfo = producer.getConnectionInfo(); const workspaceId = producerInfo.workspace_id!; const roomId = producerInfo.room_id!; roomManager.addRoom(workspaceId, roomId); try { // Create multiple consumers const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { // Set up message collection for both consumers const consumer1Collector = new MessageCollector(1); const consumer2Collector = new MessageCollector(1); consumer1.onJointUpdate(consumer1Collector.collect); consumer2.onJointUpdate(consumer2Collector.collect); // Wait for connections await sleep(200); // Producer sends updates const testJoints = [ { name: "joint1", value: 10.0 }, { name: "joint2", value: 20.0 } ]; await producer.sendJointUpdate(testJoints); // Wait for message propagation const consumer1Updates = await consumer1Collector.waitForMessages(2000); const consumer2Updates = await consumer2Collector.waitForMessages(2000); // Both consumers should receive the same update expect(consumer1Updates.length).toBeGreaterThanOrEqual(1); expect(consumer2Updates.length).toBeGreaterThanOrEqual(1); // Verify both received same data if (consumer1Updates.length > 0 && consumer2Updates.length > 0) { expect(consumer1Updates[consumer1Updates.length - 1]).toEqual( consumer2Updates[consumer2Updates.length - 1] ); } } finally { await consumer1.disconnect(); await consumer2.disconnect(); } } finally { await producer.disconnect(); await roomManager.cleanup(producer); } }); test("emergency stop propagation", async () => { const producer = await createProducerClient(TEST_SERVER_URL); const producerInfo = producer.getConnectionInfo(); const workspaceId = producerInfo.workspace_id!; const roomId = producerInfo.room_id!; roomManager.addRoom(workspaceId, roomId); try { // Create consumers const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { // Set up error collection const consumer1ErrorCollector = new MessageCollector(1); const consumer2ErrorCollector = new MessageCollector(1); consumer1.onError(consumer1ErrorCollector.collect); consumer2.onError(consumer2ErrorCollector.collect); // Wait for connections await sleep(200); // Producer sends emergency stop await producer.sendEmergencyStop("Integration test emergency stop"); // Wait for message propagation const consumer1Errors = await consumer1ErrorCollector.waitForMessages(2000); const consumer2Errors = await consumer2ErrorCollector.waitForMessages(2000); // Both consumers should receive emergency stop expect(consumer1Errors.length).toBeGreaterThanOrEqual(1); expect(consumer2Errors.length).toBeGreaterThanOrEqual(1); // Verify error messages contain emergency stop info if (consumer1Errors.length > 0) { expect(consumer1Errors[consumer1Errors.length - 1].toLowerCase()).toContain("emergency stop"); } if (consumer2Errors.length > 0) { expect(consumer2Errors[consumer2Errors.length - 1].toLowerCase()).toContain("emergency stop"); } } finally { await consumer1.disconnect(); await consumer2.disconnect(); } } finally { await producer.disconnect(); await roomManager.cleanup(producer); } }); test("producer reconnection workflow", async () => { // Create room first const tempProducer = new RoboticsProducer(TEST_SERVER_URL); const { workspaceId, roomId } = await tempProducer.createRoom(); roomManager.addRoom(workspaceId, roomId); try { // Create consumer first const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { const updateCollector = new MessageCollector(2); consumer.onJointUpdate(updateCollector.collect); // Create producer and connect const producer = new RoboticsProducer(TEST_SERVER_URL); await producer.connect(workspaceId, roomId); // Send initial update await producer.sendStateSync({ joint1: 10.0 }); await sleep(100); // Disconnect producer await producer.disconnect(); // Reconnect producer await producer.connect(workspaceId, roomId); // Send another update await producer.sendStateSync({ joint1: 20.0 }); await sleep(200); // Consumer should have received both updates const receivedUpdates = await updateCollector.waitForMessages(3000); expect(receivedUpdates.length).toBeGreaterThanOrEqual(2); await producer.disconnect(); } finally { await consumer.disconnect(); } } finally { await roomManager.cleanup(tempProducer); } }); test("consumer late join", async () => { const producer = await createProducerClient(TEST_SERVER_URL); const producerInfo = producer.getConnectionInfo(); const workspaceId = producerInfo.workspace_id!; const roomId = producerInfo.room_id!; roomManager.addRoom(workspaceId, roomId); try { // Producer sends some updates before consumer joins await producer.sendStateSync({ joint1: 10.0, joint2: 20.0 }); await sleep(100); await producer.sendJointUpdate([{ name: "joint3", value: 30.0 }]); await sleep(100); // Now consumer joins const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { // Consumer should be able to get current state const currentState = await consumer.getStateSyncAsync(); // Should contain all previously sent updates const expectedState = { joint1: 10.0, joint2: 20.0, joint3: 30.0 }; expect(currentState).toEqual(expectedState); } finally { await consumer.disconnect(); } } finally { await producer.disconnect(); await roomManager.cleanup(producer); } }); test("room cleanup on producer disconnect", async () => { const producer = await createProducerClient(TEST_SERVER_URL); const producerInfo = producer.getConnectionInfo(); const workspaceId = producerInfo.workspace_id!; const roomId = producerInfo.room_id!; roomManager.addRoom(workspaceId, roomId); try { const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { // Send some state await producer.sendStateSync({ joint1: 42.0 }); await sleep(100); // Verify state exists const stateBefore = await consumer.getStateSyncAsync(); expect(stateBefore).toEqual({ joint1: 42.0 }); // Producer disconnects await producer.disconnect(); await sleep(100); // State should still be accessible to consumer const stateAfter = await consumer.getStateSyncAsync(); expect(stateAfter).toEqual({ joint1: 42.0 }); } finally { await consumer.disconnect(); } } finally { // Room cleanup handled by roomManager since producer disconnected } }); test("high frequency updates", async () => { const producer = await createProducerClient(TEST_SERVER_URL); const producerInfo = producer.getConnectionInfo(); const workspaceId = producerInfo.workspace_id!; const roomId = producerInfo.room_id!; roomManager.addRoom(workspaceId, roomId); try { const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL); try { const updateCollector = new MessageCollector(5); consumer.onJointUpdate(updateCollector.collect); // Wait for connection await sleep(100); // Send rapid updates for (let i = 0; i < 20; i++) { await producer.sendStateSync({ joint1: i, timestamp: i }); await sleep(10); // 10ms intervals } // Wait for all messages const receivedUpdates = await updateCollector.waitForMessages(5000); // Should have received multiple updates // (exact number may vary due to change detection) expect(receivedUpdates.length).toBeGreaterThanOrEqual(5); // Final state should reflect last update const finalState = await consumer.getStateSyncAsync(); expect(finalState.joint1).toBe(19); expect(finalState.timestamp).toBe(19); } finally { await consumer.disconnect(); } } finally { await producer.disconnect(); await roomManager.cleanup(producer); } }); });