File size: 11,059 Bytes
1e7b565
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
/**
 * Tests for RoboticsConsumer - equivalent to Python's test_consumer.py
 */
import { test, expect, describe, beforeEach, afterEach } from "bun:test";
import { robotics } from "../src/index";
import { TEST_SERVER_URL, TestRoomManager, MessageCollector, sleep, assertIsConnected, assertIsDisconnected } from "./setup";

const { RoboticsProducer, RoboticsConsumer } = robotics;

describe("RoboticsConsumer", () => {
    let consumer: robotics.RoboticsConsumer;
    let producer: robotics.RoboticsProducer;
    let roomManager: TestRoomManager;

    beforeEach(() => {
        consumer = new RoboticsConsumer(TEST_SERVER_URL);
        producer = new RoboticsProducer(TEST_SERVER_URL);
        roomManager = new TestRoomManager();
    });

    afterEach(async () => {
        if (consumer.isConnected()) {
            await consumer.disconnect();
        }
        if (producer.isConnected()) {
            await producer.disconnect();
        }
        await roomManager.cleanup(producer);
    });

    test("consumer connection", async () => {
        // Create room first
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        expect(consumer.isConnected()).toBe(false);

        const success = await consumer.connect(workspaceId, roomId);
        expect(success).toBe(true);
        assertIsConnected(consumer, workspaceId, roomId);

        const info = consumer.getConnectionInfo();
        expect(info.role).toBe("consumer");

        await consumer.disconnect();
        assertIsDisconnected(consumer);
    });

    test("consumer connection info", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        const info = consumer.getConnectionInfo();
        expect(info.connected).toBe(true);
        expect(info.room_id).toBe(roomId);
        expect(info.workspace_id).toBe(workspaceId);
        expect(info.role).toBe("consumer");
        expect(info.participant_id).toBeTruthy();
        expect(info.base_url).toBe(TEST_SERVER_URL);
    });

    test("get state sync", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        const state = await consumer.getStateSyncAsync();
        expect(typeof state).toBe("object");
        // Initial state should be empty
        expect(Object.keys(state)).toHaveLength(0);
    });

    test("consumer callbacks setup", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        let stateSyncCalled = false;
        let jointUpdateCalled = false;
        let errorCalled = false;
        let connectedCalled = false;
        let disconnectedCalled = false;

        consumer.onStateSync((state) => {
            stateSyncCalled = true;
        });

        consumer.onJointUpdate((joints) => {
            jointUpdateCalled = true;
        });

        consumer.onError((error) => {
            errorCalled = true;
        });

        consumer.onConnected(() => {
            connectedCalled = true;
        });

        consumer.onDisconnected(() => {
            disconnectedCalled = true;
        });

        // Connect and test connection callbacks
        await consumer.connect(workspaceId, roomId);
        await sleep(100);
        expect(connectedCalled).toBe(true);

        await consumer.disconnect();
        await sleep(100);
        expect(disconnectedCalled).toBe(true);
    });

    test("multiple consumers", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        const consumer1 = new RoboticsConsumer(TEST_SERVER_URL);
        const consumer2 = new RoboticsConsumer(TEST_SERVER_URL);

        try {
            // Both consumers should be able to connect
            const success1 = await consumer1.connect(workspaceId, roomId);
            const success2 = await consumer2.connect(workspaceId, roomId);

            expect(success1).toBe(true);
            expect(success2).toBe(true);
            expect(consumer1.isConnected()).toBe(true);
            expect(consumer2.isConnected()).toBe(true);
        } finally {
            if (consumer1.isConnected()) {
                await consumer1.disconnect();
            }
            if (consumer2.isConnected()) {
                await consumer2.disconnect();
            }
        }
    });

    test("consumer receive state sync", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        const updateCollector = new MessageCollector(1);
        consumer.onJointUpdate(updateCollector.collect);

        await producer.connect(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        // Give some time for connection to stabilize
        await sleep(100);

        // Producer sends state sync (which gets converted to joint updates)
        await producer.sendStateSync({ shoulder: 45.0, elbow: -20.0 });

        // Wait for message to be received
        const receivedUpdates = await updateCollector.waitForMessages(2000);

        // Consumer should have received the joint updates from the state sync
        expect(receivedUpdates.length).toBeGreaterThanOrEqual(1);
    });

    test("consumer receive joint updates", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        const updateCollector = new MessageCollector(1);
        consumer.onJointUpdate(updateCollector.collect);

        await producer.connect(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        // Give some time for connection to stabilize
        await sleep(100);

        // Producer sends joint updates
        const testJoints = [
            { name: "shoulder", value: 45.0 },
            { name: "elbow", value: -20.0 }
        ];
        await producer.sendJointUpdate(testJoints);

        // Wait for message to be received
        const receivedUpdates = await updateCollector.waitForMessages(2000);

        // Consumer should have received the joint update
        expect(receivedUpdates.length).toBeGreaterThanOrEqual(1);
        if (receivedUpdates.length > 0) {
            const receivedJoints = receivedUpdates[receivedUpdates.length - 1];
            expect(Array.isArray(receivedJoints)).toBe(true);
            expect(receivedJoints).toHaveLength(2);
        }
    });

    test("consumer multiple updates", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        const updateCollector = new MessageCollector(3);
        consumer.onJointUpdate(updateCollector.collect);

        await producer.connect(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        // Give some time for connection to stabilize
        await sleep(100);

        // Send multiple updates
        for (let i = 0; i < 5; i++) {
            await producer.sendStateSync({
                joint1: i * 10,
                joint2: i * -5
            });
            await sleep(50);
        }

        // Wait for all messages to be received
        const receivedUpdates = await updateCollector.waitForMessages(3000);

        // Should have received multiple updates
        expect(receivedUpdates.length).toBeGreaterThanOrEqual(3);
    });

    test("consumer emergency stop", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        const errorCollector = new MessageCollector<string>(1);
        consumer.onError(errorCollector.collect);

        await producer.connect(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        // Give some time for connection to stabilize
        await sleep(100);

        // Producer sends emergency stop
        await producer.sendEmergencyStop("Test emergency stop");

        // Wait for message to be received
        const receivedErrors = await errorCollector.waitForMessages(2000);

        // Consumer should have received emergency stop as error
        expect(receivedErrors.length).toBeGreaterThanOrEqual(1);
        if (receivedErrors.length > 0) {
            expect(receivedErrors[receivedErrors.length - 1].toLowerCase()).toContain("emergency stop");
        }
    });

    test("custom participant id", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);
        const customId = "custom-consumer-456";

        await consumer.connect(workspaceId, roomId, customId);

        const info = consumer.getConnectionInfo();
        expect(info.participant_id).toBe(customId);
    });

    test("get state without connection", async () => {
        expect(consumer.isConnected()).toBe(false);

        await expect(consumer.getStateSyncAsync())
            .rejects.toThrow("Must be connected");
    });

    test("consumer reconnection", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        // First connection
        await consumer.connect(workspaceId, roomId);
        expect(consumer.isConnected()).toBe(true);

        await consumer.disconnect();
        expect(consumer.isConnected()).toBe(false);

        // Reconnect to same room
        await consumer.connect(workspaceId, roomId);
        expect(consumer.isConnected()).toBe(true);
        expect(consumer.getConnectionInfo().room_id).toBe(roomId);
        expect(consumer.getConnectionInfo().workspace_id).toBe(workspaceId);
    });

    test("consumer state after producer updates", async () => {
        const { workspaceId, roomId } = await producer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        await producer.connect(workspaceId, roomId);
        await consumer.connect(workspaceId, roomId);

        // Give some time for connection to stabilize
        await sleep(100);

        // Producer sends some state updates
        await producer.sendStateSync({
            shoulder: 45.0,
            elbow: -20.0,
            wrist: 10.0
        });

        // Wait for state to propagate
        await sleep(200);

        // Consumer should be able to get updated state
        const state = await consumer.getStateSyncAsync();
        expect(typeof state).toBe("object");

        // State should contain the joints we sent
        const expectedJoints = new Set(["shoulder", "elbow", "wrist"]);
        if (Object.keys(state).length > 0) { // Only check if state is not empty
            expect(new Set(Object.keys(state))).toEqual(expectedJoints);
        }
    });
});