Spaces:
Sleeping
Sleeping
File size: 5,949 Bytes
8aedc84 ec2d8f0 8aedc84 51f51c3 3367d1b 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 3367d1b 8aedc84 51f51c3 8aedc84 3367d1b 8aedc84 51f51c3 8aedc84 3367d1b 8aedc84 51f51c3 8aedc84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 |
import pytest
import pytest_asyncio
from transport_server_client import RoboticsProducer
@pytest_asyncio.fixture
async def producer():
    """Yield a ``RoboticsProducer`` bound to the local REST endpoint.

    The client is built fresh for every test that requests this fixture and
    is handed over as-is; no teardown is performed after the test finishes.
    """
    yield RoboticsProducer("http://localhost:8000")
class TestRestAPI:
    """Test REST API functionality.

    Each test drives the ``producer`` fixture. Any room a test creates is
    removed in a ``finally`` block so that a failing assertion cannot leave
    the room behind for later tests or later runs.
    """

    @pytest.mark.asyncio
    async def test_list_rooms_empty(self, producer):
        """Test listing rooms for a freshly generated workspace ID.

        Only the return type is asserted here.
        """
        # Use a temporary workspace ID
        workspace_id = producer.generate_workspace_id()
        rooms = await producer.list_rooms(workspace_id)

        assert isinstance(rooms, list)

    @pytest.mark.asyncio
    async def test_create_room(self, producer):
        """Test creating a room with server-chosen IDs."""
        workspace_id, room_id = await producer.create_room()

        try:
            assert isinstance(workspace_id, str)
            assert isinstance(room_id, str)
            assert len(workspace_id) > 0
            assert len(room_id) > 0
        finally:
            # Cleanup runs even when an assertion above fails.
            await producer.delete_room(workspace_id, room_id)

    @pytest.mark.asyncio
    async def test_create_room_with_id(self, producer):
        """Test creating a room with specific workspace and room IDs."""
        custom_room_id = "test-room-123"
        custom_workspace_id = "test-workspace-456"

        workspace_id, room_id = await producer.create_room(
            custom_workspace_id, custom_room_id
        )

        try:
            assert workspace_id == custom_workspace_id
            assert room_id == custom_room_id
        finally:
            # The IDs are fixed, so a leaked room would collide with the next
            # run; delete whatever the call actually returned.
            await producer.delete_room(workspace_id, room_id)

    @pytest.mark.asyncio
    async def test_list_rooms_with_rooms(self, producer):
        """Test listing rooms when at least one room exists."""
        # Create a test room
        workspace_id, room_id = await producer.create_room()

        try:
            rooms = await producer.list_rooms(workspace_id)

            assert isinstance(rooms, list)
            assert len(rooms) >= 1

            # Check if our room is in the list
            rooms_by_id = {room["id"]: room for room in rooms}
            assert room_id in rooms_by_id

            # Verify room structure
            test_room = rooms_by_id[room_id]
            assert "participants" in test_room
            assert "joints_count" in test_room
        finally:
            await producer.delete_room(workspace_id, room_id)

    @pytest.mark.asyncio
    async def test_get_room_info(self, producer):
        """Test getting room information for an existing room."""
        workspace_id, room_id = await producer.create_room()

        try:
            room_info = await producer.get_room_info(workspace_id, room_id)

            assert isinstance(room_info, dict)
            assert room_info["id"] == room_id
            assert room_info["workspace_id"] == workspace_id
            assert "participants" in room_info
            assert "joints_count" in room_info
            assert "has_producer" in room_info
            assert "active_consumers" in room_info
        finally:
            await producer.delete_room(workspace_id, room_id)

    @pytest.mark.asyncio
    async def test_get_room_state(self, producer):
        """Test getting room state for an existing room."""
        workspace_id, room_id = await producer.create_room()

        try:
            room_state = await producer.get_room_state(workspace_id, room_id)

            assert isinstance(room_state, dict)
            assert "room_id" in room_state
            assert "workspace_id" in room_state
            assert "joints" in room_state
            assert "participants" in room_state
            assert "timestamp" in room_state
            assert room_state["room_id"] == room_id
            assert room_state["workspace_id"] == workspace_id
        finally:
            await producer.delete_room(workspace_id, room_id)

    @pytest.mark.asyncio
    async def test_delete_room(self, producer):
        """Test that deleting a room removes it from the room listing."""
        workspace_id, room_id = await producer.create_room()
        deleted = False

        try:
            # Verify room exists
            rooms_before = await producer.list_rooms(workspace_id)
            room_ids_before = [room["id"] for room in rooms_before]
            assert room_id in room_ids_before

            # Delete room
            deleted = await producer.delete_room(workspace_id, room_id)
            assert deleted is True

            # Verify room is deleted
            rooms_after = await producer.list_rooms(workspace_id)
            room_ids_after = [room["id"] for room in rooms_after]
            assert room_id not in room_ids_after
        finally:
            # If the test failed before the room was actually deleted, make
            # one more attempt so the room does not outlive the test.
            if not deleted:
                await producer.delete_room(workspace_id, room_id)

    @pytest.mark.asyncio
    async def test_delete_nonexistent_room(self, producer):
        """Test that deleting a room that doesn't exist reports ``False``."""
        workspace_id = producer.generate_workspace_id()
        fake_room_id = "nonexistent-room-id"

        success = await producer.delete_room(workspace_id, fake_room_id)

        assert success is False

    @pytest.mark.asyncio
    async def test_get_room_info_nonexistent(self, producer):
        """Test getting info for a room that doesn't exist.

        The call should raise an exception or return error info; both are
        accepted. What is rejected is a response that claims to describe the
        nonexistent room.
        """
        workspace_id = producer.generate_workspace_id()
        fake_room_id = "nonexistent-room-id"

        try:
            room_info = await producer.get_room_info(workspace_id, fake_room_id)
        except Exception:
            # Expected behavior for nonexistent room
            return

        # No exception was raised. This check sits outside the ``try`` so a
        # failing assertion is not swallowed by the handler above.
        # NOTE(review): the exact error payload shape is not visible here;
        # this only rules out a record carrying the fake room's ID.
        if isinstance(room_info, dict):
            assert room_info.get("id") != fake_room_id

    @pytest.mark.asyncio
    async def test_get_room_state_nonexistent(self, producer):
        """Test getting state for a room that doesn't exist.

        The call should raise an exception or return error info; both are
        accepted. What is rejected is a response that claims to be the state
        of the nonexistent room.
        """
        workspace_id = producer.generate_workspace_id()
        fake_room_id = "nonexistent-room-id"

        try:
            room_state = await producer.get_room_state(workspace_id, fake_room_id)
        except Exception:
            # Expected behavior for nonexistent room
            return

        # No exception was raised. This check sits outside the ``try`` so a
        # failing assertion is not swallowed by the handler above.
        # NOTE(review): the exact error payload shape is not visible here;
        # this only rules out a state record carrying the fake room's ID.
        if isinstance(room_state, dict):
            assert room_state.get("room_id") != fake_room_id
|