/**
 * Tests for RoboticsConsumer - equivalent to Python's test_consumer.py
 */
import { test, expect, describe, beforeEach, afterEach } from "bun:test";
import { robotics } from "../src/index";
import {
  TEST_SERVER_URL,
  TestRoomManager,
  MessageCollector,
  sleep,
  assertIsConnected,
  assertIsDisconnected,
} from "./setup";

const { RoboticsProducer, RoboticsConsumer } = robotics;

/**
 * Short pause (in milliseconds) the tests insert after connecting or
 * disconnecting, before sending data or asserting on callback flags.
 */
const SETTLE_MS = 100;

/**
 * Disconnects `client` only if it currently reports an open connection.
 *
 * @param client - Any object exposing `isConnected()` and `disconnect()`.
 * @returns A promise that settles once `disconnect()` has been awaited, or
 *   immediately when the client was not connected.
 */
async function disconnectIfConnected(client: {
  isConnected(): boolean;
  disconnect(): unknown;
}): Promise<void> {
  if (client.isConnected()) {
    await client.disconnect();
  }
}

describe("RoboticsConsumer", () => {
  let consumer: robotics.RoboticsConsumer;
  let producer: robotics.RoboticsProducer;
  let roomManager: TestRoomManager;

  // Every test starts with a fresh, unconnected consumer/producer pair and an
  // empty room registry.
  beforeEach(() => {
    consumer = new RoboticsConsumer(TEST_SERVER_URL);
    producer = new RoboticsProducer(TEST_SERVER_URL);
    roomManager = new TestRoomManager();
  });

  // Teardown keeps the original order (consumer, producer, room cleanup), but
  // the nested try/finally guarantees that a failing disconnect cannot skip
  // the later steps and leave registered rooms behind.
  afterEach(async () => {
    try {
      await disconnectIfConnected(consumer);
    } finally {
      try {
        await disconnectIfConnected(producer);
      } finally {
        await roomManager.cleanup(producer);
      }
    }
  });

  test("consumer connection", async () => {
    // Create room first
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    expect(consumer.isConnected()).toBe(false);

    const success = await consumer.connect(workspaceId, roomId);
    expect(success).toBe(true);
    assertIsConnected(consumer, workspaceId, roomId);

    const info = consumer.getConnectionInfo();
    expect(info.role).toBe("consumer");

    await consumer.disconnect();
    assertIsDisconnected(consumer);
  });

  test("consumer connection info", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    await consumer.connect(workspaceId, roomId);

    const info = consumer.getConnectionInfo();
    expect(info.connected).toBe(true);
    expect(info.room_id).toBe(roomId);
    expect(info.workspace_id).toBe(workspaceId);
    expect(info.role).toBe("consumer");
    expect(info.participant_id).toBeTruthy();
    expect(info.base_url).toBe(TEST_SERVER_URL);
  });

  test("get state sync", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    await consumer.connect(workspaceId, roomId);

    const state = await consumer.getStateSyncAsync();
    expect(typeof state).toBe("object");
    // Initial state should be empty
    expect(Object.keys(state)).toHaveLength(0);
  });

  test("consumer callbacks setup", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    let connectedCalled = false;
    let disconnectedCalled = false;

    // These three handlers are registered only to exercise the registration
    // API; this test never asserts that they fire, so they are no-ops.
    consumer.onStateSync(() => {});
    consumer.onJointUpdate(() => {});
    consumer.onError(() => {});

    consumer.onConnected(() => {
      connectedCalled = true;
    });
    consumer.onDisconnected(() => {
      disconnectedCalled = true;
    });

    // Connect and test connection callbacks
    await consumer.connect(workspaceId, roomId);
    await sleep(SETTLE_MS);
    expect(connectedCalled).toBe(true);

    await consumer.disconnect();
    await sleep(SETTLE_MS);
    expect(disconnectedCalled).toBe(true);
  });

  test("multiple consumers", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    const consumer1 = new RoboticsConsumer(TEST_SERVER_URL);
    const consumer2 = new RoboticsConsumer(TEST_SERVER_URL);

    try {
      // Both consumers should be able to connect
      const success1 = await consumer1.connect(workspaceId, roomId);
      const success2 = await consumer2.connect(workspaceId, roomId);

      expect(success1).toBe(true);
      expect(success2).toBe(true);
      expect(consumer1.isConnected()).toBe(true);
      expect(consumer2.isConnected()).toBe(true);
    } finally {
      // These consumers are local to the test, so afterEach does not know
      // about them. Nesting ensures consumer2 is still disconnected even if
      // disconnecting consumer1 fails.
      try {
        await disconnectIfConnected(consumer1);
      } finally {
        await disconnectIfConnected(consumer2);
      }
    }
  });

  test("consumer receive state sync", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    const updateCollector = new MessageCollector(1);
    consumer.onJointUpdate(updateCollector.collect);

    await producer.connect(workspaceId, roomId);
    await consumer.connect(workspaceId, roomId);

    // Give some time for connection to stabilize
    await sleep(SETTLE_MS);

    // Producer sends state sync (which gets converted to joint updates)
    await producer.sendStateSync({ shoulder: 45.0, elbow: -20.0 });

    // Wait for message to be received
    const receivedUpdates = await updateCollector.waitForMessages(2000);

    // Consumer should have received the joint updates from the state sync
    expect(receivedUpdates.length).toBeGreaterThanOrEqual(1);
  });

  test("consumer receive joint updates", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    const updateCollector = new MessageCollector(1);
    consumer.onJointUpdate(updateCollector.collect);

    await producer.connect(workspaceId, roomId);
    await consumer.connect(workspaceId, roomId);

    // Give some time for connection to stabilize
    await sleep(SETTLE_MS);

    // Producer sends joint updates
    const testJoints = [
      { name: "shoulder", value: 45.0 },
      { name: "elbow", value: -20.0 },
    ];
    await producer.sendJointUpdate(testJoints);

    // Wait for message to be received
    const receivedUpdates = await updateCollector.waitForMessages(2000);

    // Consumer should have received the joint update. The assertion above
    // the indexing already fails the test on an empty list, so no extra
    // length guard is needed before reading the last element.
    expect(receivedUpdates.length).toBeGreaterThanOrEqual(1);
    const receivedJoints = receivedUpdates[receivedUpdates.length - 1];
    expect(Array.isArray(receivedJoints)).toBe(true);
    expect(receivedJoints).toHaveLength(2);
  });

  test("consumer multiple updates", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    const updateCollector = new MessageCollector(3);
    consumer.onJointUpdate(updateCollector.collect);

    await producer.connect(workspaceId, roomId);
    await consumer.connect(workspaceId, roomId);

    // Give some time for connection to stabilize
    await sleep(SETTLE_MS);

    // Send multiple updates
    for (let i = 0; i < 5; i++) {
      await producer.sendStateSync({ joint1: i * 10, joint2: i * -5 });
      await sleep(50);
    }

    // Wait for all messages to be received
    const receivedUpdates = await updateCollector.waitForMessages(3000);

    // Should have received multiple updates
    expect(receivedUpdates.length).toBeGreaterThanOrEqual(3);
  });

  test("consumer emergency stop", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    const errorCollector = new MessageCollector(1);
    consumer.onError(errorCollector.collect);

    await producer.connect(workspaceId, roomId);
    await consumer.connect(workspaceId, roomId);

    // Give some time for connection to stabilize
    await sleep(SETTLE_MS);

    // Producer sends emergency stop
    await producer.sendEmergencyStop("Test emergency stop");

    // Wait for message to be received
    const receivedErrors = await errorCollector.waitForMessages(2000);

    // Consumer should have received emergency stop as error. The length
    // assertion fails the test first if nothing arrived.
    expect(receivedErrors.length).toBeGreaterThanOrEqual(1);
    const lastError = receivedErrors[receivedErrors.length - 1];
    expect(lastError.toLowerCase()).toContain("emergency stop");
  });

  test("custom participant id", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    const customId = "custom-consumer-456";
    await consumer.connect(workspaceId, roomId, customId);

    const info = consumer.getConnectionInfo();
    expect(info.participant_id).toBe(customId);
  });

  test("get state without connection", async () => {
    expect(consumer.isConnected()).toBe(false);

    await expect(consumer.getStateSyncAsync()).rejects.toThrow(
      "Must be connected"
    );
  });

  test("consumer reconnection", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    // First connection
    await consumer.connect(workspaceId, roomId);
    expect(consumer.isConnected()).toBe(true);

    await consumer.disconnect();
    expect(consumer.isConnected()).toBe(false);

    // Reconnect to same room
    await consumer.connect(workspaceId, roomId);
    expect(consumer.isConnected()).toBe(true);
    expect(consumer.getConnectionInfo().room_id).toBe(roomId);
    expect(consumer.getConnectionInfo().workspace_id).toBe(workspaceId);
  });

  test("consumer state after producer updates", async () => {
    const { workspaceId, roomId } = await producer.createRoom();
    roomManager.addRoom(workspaceId, roomId);

    await producer.connect(workspaceId, roomId);
    await consumer.connect(workspaceId, roomId);

    // Give some time for connection to stabilize
    await sleep(SETTLE_MS);

    // Producer sends some state updates
    await producer.sendStateSync({
      shoulder: 45.0,
      elbow: -20.0,
      wrist: 10.0,
    });

    // Wait for state to propagate
    await sleep(200);

    // Consumer should be able to get updated state
    const state = await consumer.getStateSyncAsync();
    expect(typeof state).toBe("object");

    // State should contain the joints we sent. The check is deliberately
    // skipped when the returned state is empty, so an empty state does not
    // fail this test.
    const expectedJoints = new Set(["shoulder", "elbow", "wrist"]);
    if (Object.keys(state).length > 0) {
      expect(new Set(Object.keys(state))).toEqual(expectedJoints);
    }
  });
});