Spaces:
Sleeping
Sleeping
File size: 13,891 Bytes
1e7b565 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 |
/**
* Integration tests for producer-consumer interactions - equivalent to Python's test_integration.py
*/
import { test, expect, describe, beforeEach, afterEach } from "bun:test";
import { robotics } from "../src/index";
import { TEST_SERVER_URL, TestRoomManager, MessageCollector, sleep } from "./setup";
const { RoboticsProducer, createConsumerClient, createProducerClient } = robotics;
describe("Integration", () => {
let roomManager: TestRoomManager;
beforeEach(() => {
roomManager = new TestRoomManager();
});
afterEach(async () => {
// Cleanup handled by individual tests
});
test("full producer consumer workflow", async () => {
// Create producer and room
const producer = await createProducerClient(TEST_SERVER_URL);
const producerInfo = producer.getConnectionInfo();
const workspaceId = producerInfo.workspace_id!;
const roomId = producerInfo.room_id!;
roomManager.addRoom(workspaceId, roomId);
try {
// Create consumer and connect to same room
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
// Set up consumer to collect messages
const stateCollector = new MessageCollector<Record<string, number>>(1);
const updateCollector = new MessageCollector(4);
const errorCollector = new MessageCollector<string>(1);
consumer.onStateSync(stateCollector.collect);
consumer.onJointUpdate(updateCollector.collect);
consumer.onError(errorCollector.collect);
// Wait for connections to stabilize
await sleep(200);
// Producer sends initial state
const initialState = { shoulder: 0.0, elbow: 0.0, wrist: 0.0 };
await producer.sendStateSync(initialState);
await sleep(100);
// Producer sends series of joint updates
const jointSequences = [
[{ name: "shoulder", value: 45.0 }],
[{ name: "elbow", value: -30.0 }],
[{ name: "wrist", value: 15.0 }],
[
{ name: "shoulder", value: 90.0 },
{ name: "elbow", value: -60.0 }
]
];
for (const joints of jointSequences) {
await producer.sendJointUpdate(joints);
await sleep(100);
}
// Wait for all messages to be received
const receivedUpdates = await updateCollector.waitForMessages(3000);
// Verify consumer received messages
expect(receivedUpdates.length).toBeGreaterThanOrEqual(4);
// Verify final state
const finalState = await consumer.getStateSyncAsync();
const expectedFinalState = { shoulder: 90.0, elbow: -60.0, wrist: 15.0 };
expect(finalState).toEqual(expectedFinalState);
} finally {
await consumer.disconnect();
}
} finally {
await producer.disconnect();
await roomManager.cleanup(producer);
}
});
test("multiple consumers same room", async () => {
const producer = await createProducerClient(TEST_SERVER_URL);
const producerInfo = producer.getConnectionInfo();
const workspaceId = producerInfo.workspace_id!;
const roomId = producerInfo.room_id!;
roomManager.addRoom(workspaceId, roomId);
try {
// Create multiple consumers
const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
// Set up message collection for both consumers
const consumer1Collector = new MessageCollector(1);
const consumer2Collector = new MessageCollector(1);
consumer1.onJointUpdate(consumer1Collector.collect);
consumer2.onJointUpdate(consumer2Collector.collect);
// Wait for connections
await sleep(200);
// Producer sends updates
const testJoints = [
{ name: "joint1", value: 10.0 },
{ name: "joint2", value: 20.0 }
];
await producer.sendJointUpdate(testJoints);
// Wait for message propagation
const consumer1Updates = await consumer1Collector.waitForMessages(2000);
const consumer2Updates = await consumer2Collector.waitForMessages(2000);
// Both consumers should receive the same update
expect(consumer1Updates.length).toBeGreaterThanOrEqual(1);
expect(consumer2Updates.length).toBeGreaterThanOrEqual(1);
// Verify both received same data
if (consumer1Updates.length > 0 && consumer2Updates.length > 0) {
expect(consumer1Updates[consumer1Updates.length - 1]).toEqual(
consumer2Updates[consumer2Updates.length - 1]
);
}
} finally {
await consumer1.disconnect();
await consumer2.disconnect();
}
} finally {
await producer.disconnect();
await roomManager.cleanup(producer);
}
});
test("emergency stop propagation", async () => {
const producer = await createProducerClient(TEST_SERVER_URL);
const producerInfo = producer.getConnectionInfo();
const workspaceId = producerInfo.workspace_id!;
const roomId = producerInfo.room_id!;
roomManager.addRoom(workspaceId, roomId);
try {
// Create consumers
const consumer1 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
const consumer2 = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
// Set up error collection
const consumer1ErrorCollector = new MessageCollector<string>(1);
const consumer2ErrorCollector = new MessageCollector<string>(1);
consumer1.onError(consumer1ErrorCollector.collect);
consumer2.onError(consumer2ErrorCollector.collect);
// Wait for connections
await sleep(200);
// Producer sends emergency stop
await producer.sendEmergencyStop("Integration test emergency stop");
// Wait for message propagation
const consumer1Errors = await consumer1ErrorCollector.waitForMessages(2000);
const consumer2Errors = await consumer2ErrorCollector.waitForMessages(2000);
// Both consumers should receive emergency stop
expect(consumer1Errors.length).toBeGreaterThanOrEqual(1);
expect(consumer2Errors.length).toBeGreaterThanOrEqual(1);
// Verify error messages contain emergency stop info
if (consumer1Errors.length > 0) {
expect(consumer1Errors[consumer1Errors.length - 1].toLowerCase()).toContain("emergency stop");
}
if (consumer2Errors.length > 0) {
expect(consumer2Errors[consumer2Errors.length - 1].toLowerCase()).toContain("emergency stop");
}
} finally {
await consumer1.disconnect();
await consumer2.disconnect();
}
} finally {
await producer.disconnect();
await roomManager.cleanup(producer);
}
});
test("producer reconnection workflow", async () => {
// Create room first
const tempProducer = new RoboticsProducer(TEST_SERVER_URL);
const { workspaceId, roomId } = await tempProducer.createRoom();
roomManager.addRoom(workspaceId, roomId);
try {
// Create consumer first
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
const updateCollector = new MessageCollector(2);
consumer.onJointUpdate(updateCollector.collect);
// Create producer and connect
const producer = new RoboticsProducer(TEST_SERVER_URL);
await producer.connect(workspaceId, roomId);
// Send initial update
await producer.sendStateSync({ joint1: 10.0 });
await sleep(100);
// Disconnect producer
await producer.disconnect();
// Reconnect producer
await producer.connect(workspaceId, roomId);
// Send another update
await producer.sendStateSync({ joint1: 20.0 });
await sleep(200);
// Consumer should have received both updates
const receivedUpdates = await updateCollector.waitForMessages(3000);
expect(receivedUpdates.length).toBeGreaterThanOrEqual(2);
await producer.disconnect();
} finally {
await consumer.disconnect();
}
} finally {
await roomManager.cleanup(tempProducer);
}
});
test("consumer late join", async () => {
const producer = await createProducerClient(TEST_SERVER_URL);
const producerInfo = producer.getConnectionInfo();
const workspaceId = producerInfo.workspace_id!;
const roomId = producerInfo.room_id!;
roomManager.addRoom(workspaceId, roomId);
try {
// Producer sends some updates before consumer joins
await producer.sendStateSync({ joint1: 10.0, joint2: 20.0 });
await sleep(100);
await producer.sendJointUpdate([{ name: "joint3", value: 30.0 }]);
await sleep(100);
// Now consumer joins
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
// Consumer should be able to get current state
const currentState = await consumer.getStateSyncAsync();
// Should contain all previously sent updates
const expectedState = { joint1: 10.0, joint2: 20.0, joint3: 30.0 };
expect(currentState).toEqual(expectedState);
} finally {
await consumer.disconnect();
}
} finally {
await producer.disconnect();
await roomManager.cleanup(producer);
}
});
test("room cleanup on producer disconnect", async () => {
const producer = await createProducerClient(TEST_SERVER_URL);
const producerInfo = producer.getConnectionInfo();
const workspaceId = producerInfo.workspace_id!;
const roomId = producerInfo.room_id!;
roomManager.addRoom(workspaceId, roomId);
try {
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
// Send some state
await producer.sendStateSync({ joint1: 42.0 });
await sleep(100);
// Verify state exists
const stateBefore = await consumer.getStateSyncAsync();
expect(stateBefore).toEqual({ joint1: 42.0 });
// Producer disconnects
await producer.disconnect();
await sleep(100);
// State should still be accessible to consumer
const stateAfter = await consumer.getStateSyncAsync();
expect(stateAfter).toEqual({ joint1: 42.0 });
} finally {
await consumer.disconnect();
}
} finally {
// Room cleanup handled by roomManager since producer disconnected
}
});
test("high frequency updates", async () => {
const producer = await createProducerClient(TEST_SERVER_URL);
const producerInfo = producer.getConnectionInfo();
const workspaceId = producerInfo.workspace_id!;
const roomId = producerInfo.room_id!;
roomManager.addRoom(workspaceId, roomId);
try {
const consumer = await createConsumerClient(workspaceId, roomId, TEST_SERVER_URL);
try {
const updateCollector = new MessageCollector(5);
consumer.onJointUpdate(updateCollector.collect);
// Wait for connection
await sleep(100);
// Send rapid updates
for (let i = 0; i < 20; i++) {
await producer.sendStateSync({ joint1: i, timestamp: i });
await sleep(10); // 10ms intervals
}
// Wait for all messages
const receivedUpdates = await updateCollector.waitForMessages(5000);
// Should have received multiple updates
// (exact number may vary due to change detection)
expect(receivedUpdates.length).toBeGreaterThanOrEqual(5);
// Final state should reflect last update
const finalState = await consumer.getStateSyncAsync();
expect(finalState.joint1).toBe(19);
expect(finalState.timestamp).toBe(19);
} finally {
await consumer.disconnect();
}
} finally {
await producer.disconnect();
await roomManager.cleanup(producer);
}
});
}); |