File size: 13,891 Bytes
1e7b565
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
/**
 * Integration tests for producer-consumer interactions - equivalent to Python's test_integration.py
 */
import { test, expect, describe, beforeEach, afterEach } from "bun:test";
import { robotics } from "../src/index";
import { TEST_SERVER_URL, TestRoomManager, MessageCollector, sleep } from "./setup";

const { RoboticsProducer, createConsumerClient, createProducerClient } = robotics;

describe("Integration", () => {
    let roomManager: TestRoomManager;

    beforeEach(() => {
        roomManager = new TestRoomManager();
    });

    afterEach(async () => {
        // Cleanup handled by individual tests
    });

    test("full producer consumer workflow", async () => {
        // Create producer and room
        const producer = await createProducerClient(TEST_SERVER_URL);
        const producerInfo = producer.getConnectionInfo();
        const workspaceId = producerInfo.workspace_id!;
        const roomId = producerInfo.room_id!;
        roomManager.addRoom(workspaceId, roomId);

        try {
            // Create consumer and connect to same room
            const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                // Set up consumer to collect messages
                const stateCollector = new MessageCollector<Record<string, number>>(1);
                const updateCollector = new MessageCollector(4);
                const errorCollector = new MessageCollector<string>(1);

                consumer.onStateSync(stateCollector.collect);
                consumer.onJointUpdate(updateCollector.collect);
                consumer.onError(errorCollector.collect);

                // Wait for connections to stabilize
                await sleep(200);

                // Producer sends initial state
                const initialState = { shoulder: 0.0, elbow: 0.0, wrist: 0.0 };
                await producer.sendStateSync(initialState);
                await sleep(100);

                // Producer sends series of joint updates
                const jointSequences = [
                    [{ name: "shoulder", value: 45.0 }],
                    [{ name: "elbow", value: -30.0 }],
                    [{ name: "wrist", value: 15.0 }],
                    [
                        { name: "shoulder", value: 90.0 },
                        { name: "elbow", value: -60.0 }
                    ]
                ];

                for (const joints of jointSequences) {
                    await producer.sendJointUpdate(joints);
                    await sleep(100);
                }

                // Wait for all messages to be received
                const receivedUpdates = await updateCollector.waitForMessages(3000);

                // Verify consumer received messages
                expect(receivedUpdates.length).toBeGreaterThanOrEqual(4);

                // Verify final state
                const finalState = await consumer.getStateSyncAsync();
                const expectedFinalState = { shoulder: 90.0, elbow: -60.0, wrist: 15.0 };
                expect(finalState).toEqual(expectedFinalState);

            } finally {
                await consumer.disconnect();
            }
        } finally {
            await producer.disconnect();
            await roomManager.cleanup(producer);
        }
    });

    test("multiple consumers same room", async () => {
        const producer = await createProducerClient(TEST_SERVER_URL);
        const producerInfo = producer.getConnectionInfo();
        const workspaceId = producerInfo.workspace_id!;
        const roomId = producerInfo.room_id!;
        roomManager.addRoom(workspaceId, roomId);

        try {
            // Create multiple consumers
            const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
            const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                // Set up message collection for both consumers
                const consumer1Collector = new MessageCollector(1);
                const consumer2Collector = new MessageCollector(1);

                consumer1.onJointUpdate(consumer1Collector.collect);
                consumer2.onJointUpdate(consumer2Collector.collect);

                // Wait for connections
                await sleep(200);

                // Producer sends updates
                const testJoints = [
                    { name: "joint1", value: 10.0 },
                    { name: "joint2", value: 20.0 }
                ];
                await producer.sendJointUpdate(testJoints);

                // Wait for message propagation
                const consumer1Updates = await consumer1Collector.waitForMessages(2000);
                const consumer2Updates = await consumer2Collector.waitForMessages(2000);

                // Both consumers should receive the same update
                expect(consumer1Updates.length).toBeGreaterThanOrEqual(1);
                expect(consumer2Updates.length).toBeGreaterThanOrEqual(1);

                // Verify both received same data
                if (consumer1Updates.length > 0 && consumer2Updates.length > 0) {
                    expect(consumer1Updates[consumer1Updates.length - 1]).toEqual(
                        consumer2Updates[consumer2Updates.length - 1]
                    );
                }

            } finally {
                await consumer1.disconnect();
                await consumer2.disconnect();
            }
        } finally {
            await producer.disconnect();
            await roomManager.cleanup(producer);
        }
    });

    test("emergency stop propagation", async () => {
        const producer = await createProducerClient(TEST_SERVER_URL);
        const producerInfo = producer.getConnectionInfo();
        const workspaceId = producerInfo.workspace_id!;
        const roomId = producerInfo.room_id!;
        roomManager.addRoom(workspaceId, roomId);

        try {
            // Create consumers
            const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
            const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                // Set up error collection
                const consumer1ErrorCollector = new MessageCollector<string>(1);
                const consumer2ErrorCollector = new MessageCollector<string>(1);

                consumer1.onError(consumer1ErrorCollector.collect);
                consumer2.onError(consumer2ErrorCollector.collect);

                // Wait for connections
                await sleep(200);

                // Producer sends emergency stop
                await producer.sendEmergencyStop("Integration test emergency stop");

                // Wait for message propagation
                const consumer1Errors = await consumer1ErrorCollector.waitForMessages(2000);
                const consumer2Errors = await consumer2ErrorCollector.waitForMessages(2000);

                // Both consumers should receive emergency stop
                expect(consumer1Errors.length).toBeGreaterThanOrEqual(1);
                expect(consumer2Errors.length).toBeGreaterThanOrEqual(1);

                // Verify error messages contain emergency stop info
                if (consumer1Errors.length > 0) {
                    expect(consumer1Errors[consumer1Errors.length - 1].toLowerCase()).toContain("emergency stop");
                }
                if (consumer2Errors.length > 0) {
                    expect(consumer2Errors[consumer2Errors.length - 1].toLowerCase()).toContain("emergency stop");
                }

            } finally {
                await consumer1.disconnect();
                await consumer2.disconnect();
            }
        } finally {
            await producer.disconnect();
            await roomManager.cleanup(producer);
        }
    });

    test("producer reconnection workflow", async () => {
        // Create room first
        const tempProducer = new RoboticsProducer(TEST_SERVER_URL);
        const { workspaceId, roomId } = await tempProducer.createRoom();
        roomManager.addRoom(workspaceId, roomId);

        try {
            // Create consumer first
            const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                const updateCollector = new MessageCollector(2);
                consumer.onJointUpdate(updateCollector.collect);

                // Create producer and connect
                const producer = new RoboticsProducer(TEST_SERVER_URL);
                await producer.connect(workspaceId, roomId);

                // Send initial update
                await producer.sendStateSync({ joint1: 10.0 });
                await sleep(100);

                // Disconnect producer
                await producer.disconnect();

                // Reconnect producer
                await producer.connect(workspaceId, roomId);

                // Send another update
                await producer.sendStateSync({ joint1: 20.0 });
                await sleep(200);

                // Consumer should have received both updates
                const receivedUpdates = await updateCollector.waitForMessages(3000);
                expect(receivedUpdates.length).toBeGreaterThanOrEqual(2);

                await producer.disconnect();

            } finally {
                await consumer.disconnect();
            }
        } finally {
            await roomManager.cleanup(tempProducer);
        }
    });

    test("consumer late join", async () => {
        const producer = await createProducerClient(TEST_SERVER_URL);
        const producerInfo = producer.getConnectionInfo();
        const workspaceId = producerInfo.workspace_id!;
        const roomId = producerInfo.room_id!;
        roomManager.addRoom(workspaceId, roomId);

        try {
            // Producer sends some updates before consumer joins
            await producer.sendStateSync({ joint1: 10.0, joint2: 20.0 });
            await sleep(100);

            await producer.sendJointUpdate([{ name: "joint3", value: 30.0 }]);
            await sleep(100);

            // Now consumer joins
            const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                // Consumer should be able to get current state
                const currentState = await consumer.getStateSyncAsync();

                // Should contain all previously sent updates
                const expectedState = { joint1: 10.0, joint2: 20.0, joint3: 30.0 };
                expect(currentState).toEqual(expectedState);

            } finally {
                await consumer.disconnect();
            }
        } finally {
            await producer.disconnect();
            await roomManager.cleanup(producer);
        }
    });

    test("room cleanup on producer disconnect", async () => {
        const producer = await createProducerClient(TEST_SERVER_URL);
        const producerInfo = producer.getConnectionInfo();
        const workspaceId = producerInfo.workspace_id!;
        const roomId = producerInfo.room_id!;
        roomManager.addRoom(workspaceId, roomId);

        try {
            const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                // Send some state
                await producer.sendStateSync({ joint1: 42.0 });
                await sleep(100);

                // Verify state exists
                const stateBefore = await consumer.getStateSyncAsync();
                expect(stateBefore).toEqual({ joint1: 42.0 });

                // Producer disconnects
                await producer.disconnect();
                await sleep(100);

                // State should still be accessible to consumer
                const stateAfter = await consumer.getStateSyncAsync();
                expect(stateAfter).toEqual({ joint1: 42.0 });

            } finally {
                await consumer.disconnect();
            }
        } finally {
            // Room cleanup handled by roomManager since producer disconnected
        }
    });

    test("high frequency updates", async () => {
        const producer = await createProducerClient(TEST_SERVER_URL);
        const producerInfo = producer.getConnectionInfo();
        const workspaceId = producerInfo.workspace_id!;
        const roomId = producerInfo.room_id!;
        roomManager.addRoom(workspaceId, roomId);

        try {
            const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);

            try {
                const updateCollector = new MessageCollector(5);
                consumer.onJointUpdate(updateCollector.collect);

                // Wait for connection
                await sleep(100);

                // Send rapid updates
                for (let i = 0; i < 20; i++) {
                    await producer.sendStateSync({ joint1: i, timestamp: i });
                    await sleep(10); // 10ms intervals
                }

                // Wait for all messages
                const receivedUpdates = await updateCollector.waitForMessages(5000);

                // Should have received multiple updates
                // (exact number may vary due to change detection)
                expect(receivedUpdates.length).toBeGreaterThanOrEqual(5);

                // Final state should reflect last update
                const finalState = await consumer.getStateSyncAsync();
                expect(finalState.joint1).toBe(19);
                expect(finalState.timestamp).toBe(19);

            } finally {
                await consumer.disconnect();
            }
        } finally {
            await producer.disconnect();
            await roomManager.cleanup(producer);
        }
    });
});