Spaces:
Running
Running
File size: 7,425 Bytes
02eac4b |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
import asyncio
import pytest
from lerobot_arena_client import (
RoboticsConsumer,
RoboticsProducer,
create_client,
create_consumer_client,
create_producer_client,
)
class TestFactoryFunctions:
"""Test factory and convenience functions."""
def test_create_client_producer(self):
"""Test creating producer client via factory."""
client = create_client("producer", "http://localhost:8000")
assert isinstance(client, RoboticsProducer)
assert client.base_url == "http://localhost:8000"
assert not client.is_connected()
def test_create_client_consumer(self):
"""Test creating consumer client via factory."""
client = create_client("consumer", "http://localhost:8000")
assert isinstance(client, RoboticsConsumer)
assert client.base_url == "http://localhost:8000"
assert not client.is_connected()
def test_create_client_invalid_role(self):
"""Test creating client with invalid role."""
with pytest.raises(ValueError, match="Invalid role"):
create_client("invalid_role", "http://localhost:8000")
def test_create_client_default_url(self):
"""Test creating client with default URL."""
producer = create_client("producer")
consumer = create_client("consumer")
assert producer.base_url == "http://localhost:8000"
assert consumer.base_url == "http://localhost:8000"
@pytest.mark.asyncio
async def test_create_producer_client_auto_room(self):
"""Test creating producer client with auto room creation."""
producer = await create_producer_client("http://localhost:8000")
try:
assert isinstance(producer, RoboticsProducer)
assert producer.is_connected()
assert producer.room_id is not None
assert producer.role == "producer"
# Should be able to send commands immediately
await producer.send_state_sync({"test": 123.0})
finally:
room_id = producer.room_id
await producer.disconnect()
if room_id:
await producer.delete_room(room_id)
@pytest.mark.asyncio
async def test_create_producer_client_specific_room(self):
"""Test creating producer client with specific room."""
# First create a room
temp_producer = RoboticsProducer("http://localhost:8000")
room_id = await temp_producer.create_room()
try:
producer = await create_producer_client("http://localhost:8000", room_id)
assert isinstance(producer, RoboticsProducer)
assert producer.is_connected()
assert producer.room_id == room_id
assert producer.role == "producer"
await producer.disconnect()
finally:
await temp_producer.delete_room(room_id)
@pytest.mark.asyncio
async def test_create_consumer_client(self):
"""Test creating consumer client."""
# First create a room
temp_producer = RoboticsProducer("http://localhost:8000")
room_id = await temp_producer.create_room()
try:
consumer = await create_consumer_client(room_id, "http://localhost:8000")
assert isinstance(consumer, RoboticsConsumer)
assert consumer.is_connected()
assert consumer.room_id == room_id
assert consumer.role == "consumer"
# Should be able to get state immediately
state = await consumer.get_state_sync()
assert isinstance(state, dict)
await consumer.disconnect()
finally:
await temp_producer.delete_room(room_id)
@pytest.mark.asyncio
async def test_create_producer_consumer_pair(self):
"""Test creating producer-consumer pair using convenience functions."""
producer = await create_producer_client("http://localhost:8000")
room_id = producer.room_id
try:
consumer = await create_consumer_client(room_id, "http://localhost:8000")
# Both should be connected to same room
assert producer.room_id == consumer.room_id
assert producer.is_connected()
assert consumer.is_connected()
# Test communication
received_updates = []
def on_joint_update(joints):
received_updates.append(joints)
consumer.on_joint_update(on_joint_update)
# Give some time for connection to stabilize
await asyncio.sleep(0.1)
# Send update from producer
await producer.send_state_sync({"test_joint": 42.0})
# Wait for message
await asyncio.sleep(0.2)
# Consumer should have received update
assert len(received_updates) >= 1
await consumer.disconnect()
finally:
await producer.disconnect()
if room_id:
await producer.delete_room(room_id)
@pytest.mark.asyncio
async def test_convenience_functions_with_default_url(self):
"""Test convenience functions with default URL."""
producer = await create_producer_client()
room_id = producer.room_id
try:
assert producer.base_url == "http://localhost:8000"
assert producer.is_connected()
consumer = await create_consumer_client(room_id)
try:
assert consumer.base_url == "http://localhost:8000"
assert consumer.is_connected()
finally:
await consumer.disconnect()
finally:
await producer.disconnect()
if room_id:
await producer.delete_room(room_id)
@pytest.mark.asyncio
async def test_multiple_convenience_producers(self):
"""Test creating multiple producers via convenience function."""
producer1 = await create_producer_client("http://localhost:8000")
producer2 = await create_producer_client("http://localhost:8000")
try:
# Both should be connected to different rooms
assert producer1.room_id != producer2.room_id
assert producer1.is_connected()
assert producer2.is_connected()
# Both should work independently
await producer1.send_state_sync({"joint1": 10.0})
await producer2.send_state_sync({"joint2": 20.0})
finally:
room1 = producer1.room_id
room2 = producer2.room_id
await producer1.disconnect()
await producer2.disconnect()
if room1:
await producer1.delete_room(room1)
if room2:
await producer2.delete_room(room2)
@pytest.mark.asyncio
async def test_create_consumer_nonexistent_room(self):
"""Test creating consumer with nonexistent room."""
fake_room_id = "nonexistent-room-12345"
try:
consumer = await create_consumer_client(
fake_room_id, "http://localhost:8000"
)
# If this succeeds, the server creates room automatically
# or connection fails silently
if consumer.is_connected():
await consumer.disconnect()
except Exception:
# Expected behavior - connection should fail
pass
|