import asyncio import pytest from lerobot_arena_client import ( RoboticsProducer, create_consumer_client, create_producer_client, ) class TestIntegration: """End-to-end integration tests.""" @pytest.mark.asyncio async def test_full_producer_consumer_workflow(self): """Test complete producer-consumer workflow.""" # Create producer and room producer = await create_producer_client("http://localhost:8000") room_id = producer.room_id try: # Create consumer and connect to same room consumer = await create_consumer_client(room_id, "http://localhost:8000") try: # Set up consumer to collect messages received_states = [] received_updates = [] received_errors = [] def on_state_sync(state): received_states.append(state) def on_joint_update(joints): received_updates.append(joints) def on_error(error): received_errors.append(error) consumer.on_state_sync(on_state_sync) consumer.on_joint_update(on_joint_update) consumer.on_error(on_error) # Wait for connections to stabilize await asyncio.sleep(0.2) # Producer sends initial state initial_state = {"shoulder": 0.0, "elbow": 0.0, "wrist": 0.0} await producer.send_state_sync(initial_state) await asyncio.sleep(0.1) # Producer sends series of joint updates joint_sequences = [ [{"name": "shoulder", "value": 45.0}], [{"name": "elbow", "value": -30.0}], [{"name": "wrist", "value": 15.0}], [ {"name": "shoulder", "value": 90.0}, {"name": "elbow", "value": -60.0}, ], ] for joints in joint_sequences: await producer.send_joint_update(joints) await asyncio.sleep(0.1) # Wait for all messages to be received await asyncio.sleep(0.3) # Verify consumer received messages assert len(received_updates) >= 4 # At least the joint updates # Verify final state final_state = await consumer.get_state_sync() expected_final_state = {"shoulder": 90.0, "elbow": -60.0, "wrist": 15.0} assert final_state == expected_final_state finally: await consumer.disconnect() finally: await producer.disconnect() await producer.delete_room(room_id) @pytest.mark.asyncio async def test_multiple_consumers_same_room(self): """Test multiple consumers receiving same messages.""" producer = await create_producer_client("http://localhost:8000") room_id = producer.room_id try: # Create multiple consumers consumer1 = await create_consumer_client(room_id, "http://localhost:8000") consumer2 = await create_consumer_client(room_id, "http://localhost:8000") try: # Set up message collection for both consumers consumer1_updates = [] consumer2_updates = [] consumer1.on_joint_update( lambda joints: consumer1_updates.append(joints) ) consumer2.on_joint_update( lambda joints: consumer2_updates.append(joints) ) # Wait for connections await asyncio.sleep(0.2) # Producer sends updates test_joints = [ {"name": "joint1", "value": 10.0}, {"name": "joint2", "value": 20.0}, ] await producer.send_joint_update(test_joints) # Wait for message propagation await asyncio.sleep(0.2) # Both consumers should receive the same update assert len(consumer1_updates) >= 1 assert len(consumer2_updates) >= 1 # Verify both received same data if consumer1_updates and consumer2_updates: assert consumer1_updates[-1] == consumer2_updates[-1] finally: await consumer1.disconnect() await consumer2.disconnect() finally: await producer.disconnect() await producer.delete_room(room_id) @pytest.mark.asyncio async def test_emergency_stop_propagation(self): """Test emergency stop propagation to all consumers.""" producer = await create_producer_client("http://localhost:8000") room_id = producer.room_id try: # Create consumers consumer1 = await create_consumer_client(room_id, "http://localhost:8000") consumer2 = await create_consumer_client(room_id, "http://localhost:8000") try: # Set up error collection consumer1_errors = [] consumer2_errors = [] consumer1.on_error(lambda error: consumer1_errors.append(error)) consumer2.on_error(lambda error: consumer2_errors.append(error)) # Wait for connections await asyncio.sleep(0.2) # Producer sends emergency stop await producer.send_emergency_stop("Integration test emergency stop") # Wait for message propagation await asyncio.sleep(0.2) # Both consumers should receive emergency stop assert len(consumer1_errors) >= 1 assert len(consumer2_errors) >= 1 # Verify error messages contain emergency stop info if consumer1_errors: assert "emergency stop" in consumer1_errors[-1].lower() if consumer2_errors: assert "emergency stop" in consumer2_errors[-1].lower() finally: await consumer1.disconnect() await consumer2.disconnect() finally: await producer.disconnect() await producer.delete_room(room_id) @pytest.mark.asyncio async def test_producer_reconnection_workflow(self): """Test producer reconnecting and resuming operation.""" # Create room first temp_producer = RoboticsProducer("http://localhost:8000") room_id = await temp_producer.create_room() try: # Create consumer first consumer = await create_consumer_client(room_id, "http://localhost:8000") try: received_updates = [] consumer.on_joint_update(lambda joints: received_updates.append(joints)) # Create producer and connect producer = RoboticsProducer("http://localhost:8000") await producer.connect(room_id) # Send initial update await producer.send_state_sync({"joint1": 10.0}) await asyncio.sleep(0.1) # Disconnect producer await producer.disconnect() # Reconnect producer await producer.connect(room_id) # Send another update await producer.send_state_sync({"joint1": 20.0}) await asyncio.sleep(0.2) # Consumer should have received both updates assert len(received_updates) >= 2 await producer.disconnect() finally: await consumer.disconnect() finally: await temp_producer.delete_room(room_id) @pytest.mark.asyncio async def test_consumer_late_join(self): """Test consumer joining room after producer has sent updates.""" producer = await create_producer_client("http://localhost:8000") room_id = producer.room_id try: # Producer sends some updates before consumer joins await producer.send_state_sync({"joint1": 10.0, "joint2": 20.0}) await asyncio.sleep(0.1) await producer.send_joint_update([{"name": "joint3", "value": 30.0}]) await asyncio.sleep(0.1) # Now consumer joins consumer = await create_consumer_client(room_id, "http://localhost:8000") try: # Consumer should be able to get current state current_state = await consumer.get_state_sync() # Should contain all previously sent updates expected_state = {"joint1": 10.0, "joint2": 20.0, "joint3": 30.0} assert current_state == expected_state finally: await consumer.disconnect() finally: await producer.disconnect() await producer.delete_room(room_id) @pytest.mark.asyncio async def test_room_cleanup_on_producer_disconnect(self): """Test room state when producer disconnects.""" producer = await create_producer_client("http://localhost:8000") room_id = producer.room_id try: consumer = await create_consumer_client(room_id, "http://localhost:8000") try: # Send some state await producer.send_state_sync({"joint1": 42.0}) await asyncio.sleep(0.1) # Verify state exists state_before = await consumer.get_state_sync() assert state_before == {"joint1": 42.0} # Producer disconnects await producer.disconnect() await asyncio.sleep(0.1) # State should still be accessible to consumer state_after = await consumer.get_state_sync() assert state_after == {"joint1": 42.0} finally: await consumer.disconnect() finally: # Clean up room manually since producer disconnected temp_producer = RoboticsProducer("http://localhost:8000") await temp_producer.delete_room(room_id) @pytest.mark.asyncio async def test_high_frequency_updates(self): """Test handling high frequency updates.""" producer = await create_producer_client("http://localhost:8000") room_id = producer.room_id try: consumer = await create_consumer_client(room_id, "http://localhost:8000") try: received_updates = [] consumer.on_joint_update(lambda joints: received_updates.append(joints)) # Wait for connection await asyncio.sleep(0.1) # Send rapid updates for i in range(20): await producer.send_state_sync({"joint1": float(i), "timestamp": i}) await asyncio.sleep(0.01) # 10ms intervals # Wait for all messages await asyncio.sleep(0.5) # Should have received multiple updates # (exact number may vary due to change detection) assert len(received_updates) >= 5 # Final state should reflect last update final_state = await consumer.get_state_sync() assert final_state["joint1"] == 19.0 assert final_state["timestamp"] == 19 finally: await consumer.disconnect() finally: await producer.disconnect() await producer.delete_room(room_id)