blanchon commited on
Commit
8344c24
·
1 Parent(s): 1395b21
.dockerignore CHANGED
@@ -1,3 +1,5 @@
 
 
1
  **/node_modules/
2
  **/.npm
3
  **/npm-debug.log*
 
1
+ **/static-frontend/
2
+
3
  **/node_modules/
4
  **/.npm
5
  **/npm-debug.log*
Dockerfile CHANGED
@@ -1,13 +1,6 @@
1
  # Stage 1: Build frontend with Bun (client library + demo)
2
  FROM oven/bun:1-alpine AS frontend-builder
3
 
4
- # Build argument for transport server URL
5
- ARG PUBLIC_TRANSPORT_SERVER_URL=https://blanchon-robothub-transportserver.hf.space/api
6
- ENV PUBLIC_TRANSPORT_SERVER_URL=${PUBLIC_TRANSPORT_SERVER_URL}
7
-
8
- ARG PORT=8000
9
- ENV PORT=${PORT}
10
-
11
  WORKDIR /app
12
 
13
  # Install git for dependencies that might need it
@@ -113,6 +106,13 @@ WORKDIR /app
113
  # Add virtual environment to PATH
114
  ENV PATH="/app/server/.venv/bin:$PATH"
115
 
 
 
 
 
 
 
 
116
  # Expose the configured port (default 8000)
117
  EXPOSE ${PORT}
118
 
 
1
  # Stage 1: Build frontend with Bun (client library + demo)
2
  FROM oven/bun:1-alpine AS frontend-builder
3
 
 
 
 
 
 
 
 
4
  WORKDIR /app
5
 
6
  # Install git for dependencies that might need it
 
106
  # Add virtual environment to PATH
107
  ENV PATH="/app/server/.venv/bin:$PATH"
108
 
109
+ # Build argument for transport server URL
110
+ ARG PUBLIC_TRANSPORT_SERVER_URL=https://blanchon-robothub-transportserver.hf.space/api
111
+ ENV PUBLIC_TRANSPORT_SERVER_URL=${PUBLIC_TRANSPORT_SERVER_URL}
112
+
113
+ ARG PORT=8000
114
+ ENV PORT=${PORT}
115
+
116
  # Expose the configured port (default 8000)
117
  EXPOSE ${PORT}
118
 
client/python/examples/video_producer_example.py CHANGED
@@ -3,7 +3,7 @@
3
  Video Producer Example - Updated for Workspace API
4
 
5
  Demonstrates how to use the RobotHub TransportServer Python video client for streaming.
6
- This example creates animated video content and streams it to the arena server.
7
  """
8
 
9
  import asyncio
 
3
  Video Producer Example - Updated for Workspace API
4
 
5
  Demonstrates how to use the RobotHub TransportServer Python video client for streaming.
6
+ This example creates animated video content and streams it to the robot hub tranport server.
7
  """
8
 
9
  import asyncio
server/__pycache__/launch_with_ui.cpython-312.pyc CHANGED
Binary files a/server/__pycache__/launch_with_ui.cpython-312.pyc and b/server/__pycache__/launch_with_ui.cpython-312.pyc differ
 
server/launch_with_ui.py CHANGED
@@ -14,7 +14,7 @@ from src.api import app as api_app
14
  logging.basicConfig(
15
  level=logging.INFO,
16
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
17
- handlers=[logging.StreamHandler(), logging.FileHandler("server.log")],
18
  )
19
 
20
  logger = logging.getLogger(__name__)
 
14
  logging.basicConfig(
15
  level=logging.INFO,
16
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
17
+ handlers=[logging.StreamHandler()],
18
  )
19
 
20
  logger = logging.getLogger(__name__)
server/launch_without_ui.py CHANGED
@@ -8,7 +8,7 @@ import uvicorn
8
  logging.basicConfig(
9
  level=logging.INFO,
10
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
11
- handlers=[logging.StreamHandler(), logging.FileHandler("server.log")],
12
  )
13
 
14
  logger = logging.getLogger(__name__)
 
8
  logging.basicConfig(
9
  level=logging.INFO,
10
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
11
+ handlers=[logging.StreamHandler()],
12
  )
13
 
14
  logger = logging.getLogger(__name__)
server/server.log DELETED
The diff for this file is too large to render. See raw diff
 
server/src/__pycache__/api.cpython-312.pyc CHANGED
Binary files a/server/src/__pycache__/api.cpython-312.pyc and b/server/src/__pycache__/api.cpython-312.pyc differ
 
server/src/api.py CHANGED
@@ -8,7 +8,7 @@ from fastapi.middleware.cors import CORSMiddleware
8
  logging.basicConfig(
9
  level=logging.INFO,
10
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
11
- handlers=[logging.StreamHandler(), logging.FileHandler("server.log")],
12
  )
13
 
14
  logger = logging.getLogger(__name__)
 
8
  logging.basicConfig(
9
  level=logging.INFO,
10
  format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
11
+ handlers=[logging.StreamHandler()],
12
  )
13
 
14
  logger = logging.getLogger(__name__)
server/src/robotics/__pycache__/core.cpython-312.pyc CHANGED
Binary files a/server/src/robotics/__pycache__/core.cpython-312.pyc and b/server/src/robotics/__pycache__/core.cpython-312.pyc differ
 
server/src/robotics/api.py CHANGED
@@ -111,6 +111,7 @@ async def send_command(workspace_id: str, room_id: str, command: JointCommand):
111
  async def get_status():
112
  """Get system status"""
113
  stats = robotics_core.get_connection_stats()
 
114
  return {
115
  "service": "robotics",
116
  "status": "active",
@@ -120,6 +121,11 @@ async def get_status():
120
  "version": "2.0.0",
121
  "supported_roles": [role.value for role in ParticipantRole],
122
  "supported_robot_types": ["so-arm100", "generic"],
 
 
 
 
 
123
  }
124
 
125
 
@@ -129,6 +135,20 @@ async def health_check():
129
  return {"status": "healthy", "service": "robotics"}
130
 
131
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
132
  # ============= WEBSOCKET ENDPOINT =============
133
 
134
 
 
111
  async def get_status():
112
  """Get system status"""
113
  stats = robotics_core.get_connection_stats()
114
+ cleanup_info = robotics_core.get_cleanup_status()
115
  return {
116
  "service": "robotics",
117
  "status": "active",
 
121
  "version": "2.0.0",
122
  "supported_roles": [role.value for role in ParticipantRole],
123
  "supported_robot_types": ["so-arm100", "generic"],
124
+ "cleanup": {
125
+ "enabled": cleanup_info["cleanup_enabled"],
126
+ "inactivity_timeout_hours": cleanup_info["inactivity_timeout_minutes"] / 60,
127
+ "cleanup_interval_minutes": cleanup_info["cleanup_interval_minutes"],
128
+ },
129
  }
130
 
131
 
 
135
  return {"status": "healthy", "service": "robotics"}
136
 
137
 
138
+ @robotics_router.get("/cleanup/status")
139
+ async def get_cleanup_status():
140
+ """Get cleanup system status and room information"""
141
+ status = robotics_core.get_cleanup_status()
142
+ return {"success": True, "cleanup_status": status}
143
+
144
+
145
+ @robotics_router.post("/cleanup/manual")
146
+ async def trigger_manual_cleanup():
147
+ """Manually trigger room cleanup"""
148
+ result = await robotics_core.manual_cleanup()
149
+ return {"success": True, "cleanup_result": result}
150
+
151
+
152
  # ============= WEBSOCKET ENDPOINT =============
153
 
154
 
server/src/robotics/core.py CHANGED
@@ -1,7 +1,7 @@
1
  import json
2
  import logging
3
  import uuid
4
- from datetime import UTC, datetime
5
 
6
  from fastapi import WebSocket, WebSocketDisconnect
7
  from typing_extensions import TypedDict
@@ -40,6 +40,10 @@ class RoboticsRoom:
40
  # State
41
  self.joints: dict[str, float] = {}
42
 
 
 
 
 
43
 
44
  class RoboticsCore:
45
  """Core robotics system - simplified and merged with workspace support"""
@@ -52,6 +56,83 @@ class RoboticsCore:
52
  str, ConnectionMetadata
53
  ] = {} # participant_id -> metadata
54
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
55
  # ============= ROOM MANAGEMENT =============
56
 
57
  def create_room(
@@ -68,6 +149,10 @@ class RoboticsCore:
68
  room = RoboticsRoom(room_id, workspace_id)
69
  self.workspaces[workspace_id][room_id] = room
70
 
 
 
 
 
71
  logger.info(f"Created room {room_id} in workspace {workspace_id}")
72
  return workspace_id, room_id
73
 
@@ -170,6 +255,7 @@ class RoboticsCore:
170
  if role == ParticipantRole.PRODUCER:
171
  if room.producer is None:
172
  room.producer = participant_id
 
173
  logger.info(
174
  f"Producer {participant_id} joined room {room_id} in workspace {workspace_id}"
175
  )
@@ -183,6 +269,7 @@ class RoboticsCore:
183
  if role == ParticipantRole.CONSUMER:
184
  if participant_id not in room.consumers:
185
  room.consumers.append(participant_id)
 
186
  logger.info(
187
  f"Consumer {participant_id} joined room {room_id} in workspace {workspace_id}"
188
  )
@@ -228,6 +315,10 @@ class RoboticsCore:
228
  room.joints[name] = value
229
  changed_joints.append(joint)
230
 
 
 
 
 
231
  return changed_joints
232
 
233
  # ============= WEBSOCKET HANDLING =============
@@ -341,6 +432,9 @@ class RoboticsCore:
341
  )
342
  self.connection_metadata[participant_id]["message_count"] += 1
343
 
 
 
 
344
  try:
345
  msg_type = MessageType(message.get("type"))
346
  except ValueError:
@@ -612,3 +706,65 @@ class RoboticsCore:
612
  )
613
 
614
  return len(changed_joints)
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
  import json
2
  import logging
3
  import uuid
4
+ from datetime import UTC, datetime, timedelta
5
 
6
  from fastapi import WebSocket, WebSocketDisconnect
7
  from typing_extensions import TypedDict
 
40
  # State
41
  self.joints: dict[str, float] = {}
42
 
43
+ # Activity tracking
44
+ self.created_at = datetime.now(tz=UTC)
45
+ self.last_activity = datetime.now(tz=UTC)
46
+
47
 
48
  class RoboticsCore:
49
  """Core robotics system - simplified and merged with workspace support"""
 
56
  str, ConnectionMetadata
57
  ] = {} # participant_id -> metadata
58
 
59
+ # Cleanup configuration
60
+ self.inactivity_timeout = timedelta(hours=1) # 1 hour of inactivity
61
+ self.cleanup_interval = timedelta(minutes=15) # Check every 15 minutes
62
+
63
+ # Start cleanup task
64
+ self._cleanup_task = None
65
+ self._start_cleanup_task()
66
+
67
+ def _start_cleanup_task(self):
68
+ """Start the background cleanup task"""
69
+ import asyncio
70
+
71
+ async def cleanup_loop():
72
+ while True:
73
+ try:
74
+ await asyncio.sleep(self.cleanup_interval.total_seconds())
75
+ await self._cleanup_inactive_rooms()
76
+ except Exception:
77
+ logger.exception("Error in cleanup task")
78
+
79
+ try:
80
+ loop = asyncio.get_event_loop()
81
+ self._cleanup_task = loop.create_task(cleanup_loop())
82
+ logger.info("Started robotics room cleanup task")
83
+ except RuntimeError:
84
+ # No event loop running yet, cleanup will start when first room is created
85
+ logger.info("No event loop running, cleanup task will start later")
86
+
87
+ async def _cleanup_inactive_rooms(self):
88
+ """Remove rooms that have been inactive for more than the timeout period"""
89
+ current_time = datetime.now(tz=UTC)
90
+ rooms_to_remove = []
91
+
92
+ for workspace_id, rooms in self.workspaces.items():
93
+ for room_id, room in rooms.items():
94
+ # Check if room has any active connections
95
+ has_active_connections = False
96
+ room_last_activity = room.last_activity
97
+
98
+ # Check all connections for this room to find most recent activity
99
+ for metadata in self.connection_metadata.values():
100
+ if (
101
+ metadata["workspace_id"] == workspace_id
102
+ and metadata["room_id"] == room_id
103
+ ):
104
+ has_active_connections = True
105
+ room_last_activity = max(
106
+ room_last_activity, metadata["last_activity"]
107
+ )
108
+
109
+ # If no active connections, use room's last activity
110
+ if not has_active_connections:
111
+ time_since_activity = current_time - room_last_activity
112
+
113
+ if time_since_activity > self.inactivity_timeout:
114
+ rooms_to_remove.append((workspace_id, room_id))
115
+ logger.info(
116
+ f"Marking room {room_id} in workspace {workspace_id} for cleanup "
117
+ f"(inactive for {time_since_activity})"
118
+ )
119
+
120
+ # Remove inactive rooms
121
+ for workspace_id, room_id in rooms_to_remove:
122
+ if self.delete_room(workspace_id, room_id):
123
+ logger.info(
124
+ f"Auto-removed inactive room {room_id} from workspace {workspace_id}"
125
+ )
126
+
127
+ if rooms_to_remove:
128
+ logger.info(f"Cleaned up {len(rooms_to_remove)} inactive robotics rooms")
129
+
130
+ def _update_room_activity(self, workspace_id: str, room_id: str):
131
+ """Update the last activity timestamp for a room"""
132
+ room = self._get_room(workspace_id, room_id)
133
+ if room:
134
+ room.last_activity = datetime.now(tz=UTC)
135
+
136
  # ============= ROOM MANAGEMENT =============
137
 
138
  def create_room(
 
149
  room = RoboticsRoom(room_id, workspace_id)
150
  self.workspaces[workspace_id][room_id] = room
151
 
152
+ # Start cleanup task if not already running
153
+ if self._cleanup_task is None:
154
+ self._start_cleanup_task()
155
+
156
  logger.info(f"Created room {room_id} in workspace {workspace_id}")
157
  return workspace_id, room_id
158
 
 
255
  if role == ParticipantRole.PRODUCER:
256
  if room.producer is None:
257
  room.producer = participant_id
258
+ self._update_room_activity(workspace_id, room_id)
259
  logger.info(
260
  f"Producer {participant_id} joined room {room_id} in workspace {workspace_id}"
261
  )
 
269
  if role == ParticipantRole.CONSUMER:
270
  if participant_id not in room.consumers:
271
  room.consumers.append(participant_id)
272
+ self._update_room_activity(workspace_id, room_id)
273
  logger.info(
274
  f"Consumer {participant_id} joined room {room_id} in workspace {workspace_id}"
275
  )
 
315
  room.joints[name] = value
316
  changed_joints.append(joint)
317
 
318
+ # Update room activity if there were changes
319
+ if changed_joints:
320
+ self._update_room_activity(workspace_id, room_id)
321
+
322
  return changed_joints
323
 
324
  # ============= WEBSOCKET HANDLING =============
 
432
  )
433
  self.connection_metadata[participant_id]["message_count"] += 1
434
 
435
+ # Update room activity
436
+ self._update_room_activity(workspace_id, room_id)
437
+
438
  try:
439
  msg_type = MessageType(message.get("type"))
440
  except ValueError:
 
706
  )
707
 
708
  return len(changed_joints)
709
+
710
+ # ============= CLEANUP MANAGEMENT =============
711
+
712
+ async def manual_cleanup(self) -> dict:
713
+ """Manually trigger room cleanup and return results"""
714
+ logger.info("Manual robotics room cleanup triggered")
715
+ rooms_before = sum(len(rooms) for rooms in self.workspaces.values())
716
+ await self._cleanup_inactive_rooms()
717
+ rooms_after = sum(len(rooms) for rooms in self.workspaces.values())
718
+
719
+ return {
720
+ "cleanup_triggered": True,
721
+ "rooms_before": rooms_before,
722
+ "rooms_after": rooms_after,
723
+ "rooms_removed": rooms_before - rooms_after,
724
+ "timestamp": datetime.now(tz=UTC).isoformat(),
725
+ }
726
+
727
+ def get_cleanup_status(self) -> dict:
728
+ """Get cleanup system status and configuration"""
729
+ current_time = datetime.now(tz=UTC)
730
+
731
+ # Calculate room ages and activity
732
+ room_info = []
733
+ for workspace_id, rooms in self.workspaces.items():
734
+ for room_id, room in rooms.items():
735
+ # Find latest activity for this room
736
+ latest_activity = room.last_activity
737
+ for metadata in self.connection_metadata.values():
738
+ if (
739
+ metadata["workspace_id"] == workspace_id
740
+ and metadata["room_id"] == room_id
741
+ ):
742
+ latest_activity = max(
743
+ latest_activity, metadata["last_activity"]
744
+ )
745
+
746
+ age = current_time - room.created_at
747
+ inactivity = current_time - latest_activity
748
+
749
+ room_info.append({
750
+ "workspace_id": workspace_id,
751
+ "room_id": room_id,
752
+ "age_minutes": age.total_seconds() / 60,
753
+ "inactivity_minutes": inactivity.total_seconds() / 60,
754
+ "has_connections": any(
755
+ metadata["workspace_id"] == workspace_id
756
+ and metadata["room_id"] == room_id
757
+ for metadata in self.connection_metadata.values()
758
+ ),
759
+ "will_be_cleaned": inactivity > self.inactivity_timeout,
760
+ })
761
+
762
+ return {
763
+ "service": "robotics",
764
+ "cleanup_enabled": self._cleanup_task is not None,
765
+ "inactivity_timeout_minutes": self.inactivity_timeout.total_seconds() / 60,
766
+ "cleanup_interval_minutes": self.cleanup_interval.total_seconds() / 60,
767
+ "total_rooms": len(room_info),
768
+ "rooms": room_info,
769
+ "timestamp": current_time.isoformat(),
770
+ }
server/src/video/__pycache__/core.cpython-312.pyc CHANGED
Binary files a/server/src/video/__pycache__/core.cpython-312.pyc and b/server/src/video/__pycache__/core.cpython-312.pyc differ
 
server/src/video/api.py CHANGED
@@ -145,6 +145,7 @@ async def get_status():
145
  """Get video service status"""
146
  total_workspaces = len(video_core.workspaces)
147
  total_rooms = sum(len(rooms) for rooms in video_core.workspaces.values())
 
148
 
149
  return {
150
  "service": "video",
@@ -157,6 +158,11 @@ async def get_status():
157
  "supported_roles": [role.value for role in ParticipantRole],
158
  "supported_encodings": [encoding.value for encoding in VideoEncoding],
159
  "recovery_policies": [policy.value for policy in RecoveryPolicy],
 
 
 
 
 
160
  }
161
 
162
 
@@ -166,6 +172,20 @@ async def health_check():
166
  return {"status": "healthy", "service": "video"}
167
 
168
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
169
  @video_router.get("/")
170
  async def video_status():
171
  """Video service main endpoint"""
@@ -181,5 +201,7 @@ async def video_status():
181
  "/video/workspaces/{workspace_id}/rooms/{room_id}/webrtc/signal - WebRTC signaling",
182
  "/video/workspaces/{workspace_id}/rooms/{room_id}/ws - WebSocket connection",
183
  "/video/status - Service status",
 
 
184
  ],
185
  }
 
145
  """Get video service status"""
146
  total_workspaces = len(video_core.workspaces)
147
  total_rooms = sum(len(rooms) for rooms in video_core.workspaces.values())
148
+ cleanup_info = video_core.get_cleanup_status()
149
 
150
  return {
151
  "service": "video",
 
158
  "supported_roles": [role.value for role in ParticipantRole],
159
  "supported_encodings": [encoding.value for encoding in VideoEncoding],
160
  "recovery_policies": [policy.value for policy in RecoveryPolicy],
161
+ "cleanup": {
162
+ "enabled": cleanup_info["cleanup_enabled"],
163
+ "inactivity_timeout_hours": cleanup_info["inactivity_timeout_minutes"] / 60,
164
+ "cleanup_interval_minutes": cleanup_info["cleanup_interval_minutes"],
165
+ },
166
  }
167
 
168
 
 
172
  return {"status": "healthy", "service": "video"}
173
 
174
 
175
+ @video_router.get("/cleanup/status")
176
+ async def get_cleanup_status():
177
+ """Get cleanup system status and room information"""
178
+ status = video_core.get_cleanup_status()
179
+ return {"success": True, "cleanup_status": status}
180
+
181
+
182
+ @video_router.post("/cleanup/manual")
183
+ async def trigger_manual_cleanup():
184
+ """Manually trigger room cleanup"""
185
+ result = await video_core.manual_cleanup()
186
+ return {"success": True, "cleanup_result": result}
187
+
188
+
189
  @video_router.get("/")
190
  async def video_status():
191
  """Video service main endpoint"""
 
201
  "/video/workspaces/{workspace_id}/rooms/{room_id}/webrtc/signal - WebRTC signaling",
202
  "/video/workspaces/{workspace_id}/rooms/{room_id}/ws - WebSocket connection",
203
  "/video/status - Service status",
204
+ "/video/cleanup/status - Cleanup system status",
205
+ "/video/cleanup/manual - Manual cleanup trigger",
206
  ],
207
  }
server/src/video/core.py CHANGED
@@ -4,7 +4,7 @@ import logging
4
  import time
5
  import uuid
6
  from collections.abc import Callable, Coroutine
7
- from datetime import UTC, datetime
8
  from fractions import Fraction
9
  from functools import lru_cache
10
 
@@ -600,6 +600,10 @@ class VideoRoom:
600
  self.start_time = datetime.now(tz=UTC)
601
  self.last_frame_time: datetime | None = None
602
 
 
 
 
 
603
 
604
  # ============= VIDEO CORE (main class) =============
605
 
@@ -616,6 +620,84 @@ class VideoCore:
616
  # Track background tasks to prevent garbage collection
617
  self.background_tasks: set = set()
618
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
619
  # ============= ROOM MANAGEMENT (same pattern as robotics) =============
620
 
621
  def create_room(
@@ -635,6 +717,11 @@ class VideoCore:
635
 
636
  room = VideoRoom(room_id, workspace_id, config, recovery_config)
637
  self.workspaces[workspace_id][room_id] = room
 
 
 
 
 
638
  logger.info(f"Created video room {room_id} in workspace {workspace_id}")
639
  return workspace_id, room_id
640
 
@@ -769,6 +856,7 @@ class VideoCore:
769
  if role == ParticipantRole.PRODUCER:
770
  if room.producer is None:
771
  room.producer = participant_id
 
772
  logger.info(
773
  f"Producer {participant_id} joined video room {room_id} in workspace {workspace_id}"
774
  )
@@ -788,6 +876,7 @@ class VideoCore:
788
  if role == ParticipantRole.CONSUMER:
789
  if participant_id not in room.consumers:
790
  room.consumers.append(participant_id)
 
791
  logger.info(
792
  f"Consumer {participant_id} joined video room {room_id} in workspace {workspace_id}"
793
  )
@@ -953,6 +1042,8 @@ class VideoCore:
953
  logger.exception(f"Error sending frame to {consumer_id}")
954
 
955
  if consumer_count > 0:
 
 
956
  logger.debug(f"Broadcasted frame to {consumer_count} consumers")
957
 
958
  return consumer_count
@@ -1053,6 +1144,9 @@ class VideoCore:
1053
  )
1054
  self.connection_metadata[participant_id]["message_count"] += 1
1055
 
 
 
 
1056
  # Handle heartbeat
1057
  if message["type"] == MessageType.HEARTBEAT:
1058
  heartbeat_ack: HeartbeatAckMessageDict = {
@@ -1319,3 +1413,67 @@ class VideoCore:
1319
  self.background_tasks.add(task)
1320
  task.add_done_callback(self.background_tasks.discard)
1321
  return task
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
4
  import time
5
  import uuid
6
  from collections.abc import Callable, Coroutine
7
+ from datetime import UTC, datetime, timedelta
8
  from fractions import Fraction
9
  from functools import lru_cache
10
 
 
600
  self.start_time = datetime.now(tz=UTC)
601
  self.last_frame_time: datetime | None = None
602
 
603
+ # Activity tracking
604
+ self.created_at = datetime.now(tz=UTC)
605
+ self.last_activity = datetime.now(tz=UTC)
606
+
607
 
608
  # ============= VIDEO CORE (main class) =============
609
 
 
620
  # Track background tasks to prevent garbage collection
621
  self.background_tasks: set = set()
622
 
623
+ # Cleanup configuration
624
+ self.inactivity_timeout = timedelta(hours=1) # 1 hour of inactivity
625
+ self.cleanup_interval = timedelta(minutes=15) # Check every 15 minutes
626
+
627
+ # Start cleanup task
628
+ self._cleanup_task = None
629
+ self._start_cleanup_task()
630
+
631
+ def _start_cleanup_task(self):
632
+ """Start the background cleanup task"""
633
+
634
+ async def cleanup_loop():
635
+ while True:
636
+ try:
637
+ await asyncio.sleep(self.cleanup_interval.total_seconds())
638
+ await self._cleanup_inactive_rooms()
639
+ except Exception:
640
+ logger.exception("Error in video cleanup task")
641
+
642
+ try:
643
+ loop = asyncio.get_event_loop()
644
+ self._cleanup_task = loop.create_task(cleanup_loop())
645
+ logger.info("Started video room cleanup task")
646
+ except RuntimeError:
647
+ # No event loop running yet, cleanup will start when first room is created
648
+ logger.info("No event loop running, video cleanup task will start later")
649
+
650
+ async def _cleanup_inactive_rooms(self):
651
+ """Remove rooms that have been inactive for more than the timeout period"""
652
+ current_time = datetime.now(tz=UTC)
653
+ rooms_to_remove = []
654
+
655
+ for workspace_id, rooms in self.workspaces.items():
656
+ for room_id, room in rooms.items():
657
+ # Check if room has any active connections
658
+ has_active_connections = False
659
+ room_last_activity = room.last_activity
660
+
661
+ # Check all connections for this room to find most recent activity
662
+ for metadata in self.connection_metadata.values():
663
+ if (
664
+ metadata.get("workspace_id") == workspace_id
665
+ and metadata.get("room_id") == room_id
666
+ ):
667
+ has_active_connections = True
668
+ if (
669
+ metadata.get("last_activity")
670
+ and metadata["last_activity"] > room_last_activity
671
+ ):
672
+ room_last_activity = metadata["last_activity"]
673
+
674
+ # If no active connections, use room's last activity
675
+ if not has_active_connections:
676
+ time_since_activity = current_time - room_last_activity
677
+
678
+ if time_since_activity > self.inactivity_timeout:
679
+ rooms_to_remove.append((workspace_id, room_id))
680
+ logger.info(
681
+ f"Marking video room {room_id} in workspace {workspace_id} for cleanup "
682
+ f"(inactive for {time_since_activity})"
683
+ )
684
+
685
+ # Remove inactive rooms
686
+ for workspace_id, room_id in rooms_to_remove:
687
+ if self.delete_room(workspace_id, room_id):
688
+ logger.info(
689
+ f"Auto-removed inactive video room {room_id} from workspace {workspace_id}"
690
+ )
691
+
692
+ if rooms_to_remove:
693
+ logger.info(f"Cleaned up {len(rooms_to_remove)} inactive video rooms")
694
+
695
+ def _update_room_activity(self, workspace_id: str, room_id: str):
696
+ """Update the last activity timestamp for a room"""
697
+ room = self._get_room(workspace_id, room_id)
698
+ if room:
699
+ room.last_activity = datetime.now(tz=UTC)
700
+
701
  # ============= ROOM MANAGEMENT (same pattern as robotics) =============
702
 
703
  def create_room(
 
717
 
718
  room = VideoRoom(room_id, workspace_id, config, recovery_config)
719
  self.workspaces[workspace_id][room_id] = room
720
+
721
+ # Start cleanup task if not already running
722
+ if self._cleanup_task is None:
723
+ self._start_cleanup_task()
724
+
725
  logger.info(f"Created video room {room_id} in workspace {workspace_id}")
726
  return workspace_id, room_id
727
 
 
856
  if role == ParticipantRole.PRODUCER:
857
  if room.producer is None:
858
  room.producer = participant_id
859
+ self._update_room_activity(workspace_id, room_id)
860
  logger.info(
861
  f"Producer {participant_id} joined video room {room_id} in workspace {workspace_id}"
862
  )
 
876
  if role == ParticipantRole.CONSUMER:
877
  if participant_id not in room.consumers:
878
  room.consumers.append(participant_id)
879
+ self._update_room_activity(workspace_id, room_id)
880
  logger.info(
881
  f"Consumer {participant_id} joined video room {room_id} in workspace {workspace_id}"
882
  )
 
1042
  logger.exception(f"Error sending frame to {consumer_id}")
1043
 
1044
  if consumer_count > 0:
1045
+ # Update room activity when frames are being broadcast
1046
+ self._update_room_activity(workspace_id, room_id)
1047
  logger.debug(f"Broadcasted frame to {consumer_count} consumers")
1048
 
1049
  return consumer_count
 
1144
  )
1145
  self.connection_metadata[participant_id]["message_count"] += 1
1146
 
1147
+ # Update room activity
1148
+ self._update_room_activity(workspace_id, room_id)
1149
+
1150
  # Handle heartbeat
1151
  if message["type"] == MessageType.HEARTBEAT:
1152
  heartbeat_ack: HeartbeatAckMessageDict = {
 
1413
  self.background_tasks.add(task)
1414
  task.add_done_callback(self.background_tasks.discard)
1415
  return task
1416
+
1417
+ # ============= CLEANUP MANAGEMENT =============
1418
+
1419
+ async def manual_cleanup(self) -> dict:
1420
+ """Manually trigger room cleanup and return results"""
1421
+ logger.info("Manual video room cleanup triggered")
1422
+ rooms_before = sum(len(rooms) for rooms in self.workspaces.values())
1423
+ await self._cleanup_inactive_rooms()
1424
+ rooms_after = sum(len(rooms) for rooms in self.workspaces.values())
1425
+
1426
+ return {
1427
+ "cleanup_triggered": True,
1428
+ "rooms_before": rooms_before,
1429
+ "rooms_after": rooms_after,
1430
+ "rooms_removed": rooms_before - rooms_after,
1431
+ "timestamp": datetime.now(tz=UTC).isoformat(),
1432
+ }
1433
+
1434
+ def get_cleanup_status(self) -> dict:
1435
+ """Get cleanup system status and configuration"""
1436
+ current_time = datetime.now(tz=UTC)
1437
+
1438
+ # Calculate room ages and activity
1439
+ room_info = []
1440
+ for workspace_id, rooms in self.workspaces.items():
1441
+ for room_id, room in rooms.items():
1442
+ # Find latest activity for this room
1443
+ latest_activity = room.last_activity
1444
+ for metadata in self.connection_metadata.values():
1445
+ if (
1446
+ metadata.get("workspace_id") == workspace_id
1447
+ and metadata.get("room_id") == room_id
1448
+ ):
1449
+ if (
1450
+ metadata.get("last_activity")
1451
+ and metadata["last_activity"] > latest_activity
1452
+ ):
1453
+ latest_activity = metadata["last_activity"]
1454
+
1455
+ age = current_time - room.created_at
1456
+ inactivity = current_time - latest_activity
1457
+
1458
+ room_info.append({
1459
+ "workspace_id": workspace_id,
1460
+ "room_id": room_id,
1461
+ "age_minutes": age.total_seconds() / 60,
1462
+ "inactivity_minutes": inactivity.total_seconds() / 60,
1463
+ "has_connections": any(
1464
+ metadata.get("workspace_id") == workspace_id
1465
+ and metadata.get("room_id") == room_id
1466
+ for metadata in self.connection_metadata.values()
1467
+ ),
1468
+ "will_be_cleaned": inactivity > self.inactivity_timeout,
1469
+ })
1470
+
1471
+ return {
1472
+ "service": "video",
1473
+ "cleanup_enabled": self._cleanup_task is not None,
1474
+ "inactivity_timeout_minutes": self.inactivity_timeout.total_seconds() / 60,
1475
+ "cleanup_interval_minutes": self.cleanup_interval.total_seconds() / 60,
1476
+ "total_rooms": len(room_info),
1477
+ "rooms": room_info,
1478
+ "timestamp": current_time.isoformat(),
1479
+ }