File size: 7,547 Bytes
02eac4b
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python3
"""
Basic Video Producer Example

Demonstrates how to use the LeRobot Arena Python video client for streaming.
This example creates animated video content and streams it to the arena server.
"""

import asyncio
import logging
import time

import numpy as np

# Import the video client
from lerobot_arena_client.video import (
    VideoProducer,
    create_producer_client,
)


async def animated_frame_source() -> np.ndarray | None:
    """Create animated frames with colorful patterns"""
    # Create a colorful animated frame
    height, width = 480, 640
    frame_count = int(time.time() * 30) % 1000  # 30 fps simulation

    # Generate animated RGB channels using vectorized operations
    time_factor = frame_count * 0.1

    # Create colorful animated patterns
    y_coords, x_coords = np.meshgrid(np.arange(width), np.arange(height), indexing="xy")

    r = (128 + 127 * np.sin(time_factor + x_coords * 0.01)).astype(np.uint8)
    g = (128 + 127 * np.sin(time_factor + y_coords * 0.01)).astype(np.uint8)
    b = (128 + 127 * np.sin(time_factor) * np.ones((height, width))).astype(np.uint8)

    # Stack into RGB frame
    frame = np.stack([r, g, b], axis=2)

    # Add a moving circle for visual feedback
    center_x = int(320 + 200 * np.sin(frame_count * 0.05))
    center_y = int(240 + 100 * np.cos(frame_count * 0.05))

    # Create circle mask
    circle_mask = (x_coords - center_x) ** 2 + (y_coords - center_y) ** 2 < 50**2
    frame[circle_mask] = [255, 255, 0]  # Yellow circle

    # Add frame counter text overlay
    import cv2

    cv2.putText(
        frame,
        f"Frame {frame_count}",
        (20, 50),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.5,
        (0, 0, 0),
        3,
    )  # Black outline
    cv2.putText(
        frame,
        f"Frame {frame_count}",
        (20, 50),
        cv2.FONT_HERSHEY_SIMPLEX,
        1.5,
        (255, 255, 255),
        2,
    )  # White text

    return frame


async def main():
    """Run the animated-stream demo end to end.

    Configures logging, builds a ``VideoProducer`` pointed at
    ``http://localhost:8000``, registers logging callbacks for its events,
    creates a room, connects to it, streams frames from
    ``animated_frame_source`` for 30 seconds and then stops the stream.

    Any exception is logged with its traceback rather than propagated, and
    the producer is disconnected in all cases once it has been constructed.
    """
    # Configure logging
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )
    logger = logging.getLogger(__name__)

    logger.info("🚀 Starting LeRobot Arena Video Producer Example")

    # Bound before the try block so the finally clause can test it directly
    # instead of probing locals() for the name.
    producer = None
    try:
        # Create video producer with configuration
        producer = VideoProducer(base_url="http://localhost:8000")

        # Set up event handlers
        producer.on_connected(lambda: logger.info("✅ Connected to video server"))
        producer.on_disconnected(
            lambda: logger.info("👋 Disconnected from video server")
        )
        producer.on_error(lambda error: logger.error(f"❌ Error: {error}"))

        producer.on_status_update(
            lambda status, data: logger.info(f"📊 Status: {status}")
        )
        producer.on_stream_stats(
            lambda stats: logger.debug(f"📈 Stats: {stats.average_fps:.1f}fps")
        )

        # Create a room and connect
        room_id = await producer.create_room()
        logger.info(f"🏠 Created room: {room_id}")

        connected = await producer.connect(room_id)
        if not connected:
            logger.error("❌ Failed to connect to room")
            return

        logger.info(f"✅ Connected as producer to room: {room_id}")

        # Start custom video stream with animated content
        logger.info("🎬 Starting animated video stream...")
        await producer.start_custom_stream(animated_frame_source)

        logger.info("📺 Video streaming started!")
        logger.info(f"🔗 Consumers can connect to room: {room_id}")
        logger.info(
            f"📱 Use JS consumer: http://localhost:5173/consumer?room={room_id}"
        )

        # Stream for demo duration
        duration = 30  # Stream for 30 seconds
        logger.info(f"⏱️  Streaming for {duration} seconds...")

        for elapsed in range(duration):
            # Report before sleeping so the figure matches the time that is
            # actually left at the moment the message is emitted.
            if elapsed % 5 == 0:
                logger.info(
                    f"📡 Streaming... {duration - elapsed} seconds remaining"
                )
            await asyncio.sleep(1)

        logger.info("🛑 Stopping video stream...")
        await producer.stop_streaming()

    except Exception as e:
        # logger.exception records the traceback through the logging system.
        logger.exception(f"❌ Unexpected error: {e}")
    finally:
        # Clean up
        logger.info("🧹 Cleaning up...")
        if producer is not None:
            await producer.disconnect()
        logger.info("✅ Video producer example completed")


async def camera_example():
    """Stream from a camera for 30 seconds, if one is available.

    Obtains a producer from ``create_producer_client``, lists the camera
    devices it reports, and streams from the first one listed. When no camera
    is reported a warning is logged and nothing is streamed. Exceptions are
    logged together with a troubleshooting hint, and the producer is
    disconnected in all cases once it has been created.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info("📷 Starting Camera Video Producer Example")

    # Bound before the try block so the finally clause can test it directly
    # instead of probing locals() for the name.
    producer = None
    try:
        # Create producer using factory function
        producer = await create_producer_client(base_url="http://localhost:8000")

        room_id = producer.current_room_id
        logger.info(f"🏠 Connected to room: {room_id}")

        # Get available cameras
        cameras = await producer.get_camera_devices()
        if not cameras:
            logger.warning("⚠️  No cameras found")
            return

        logger.info("📹 Available cameras:")
        for camera in cameras:
            resolution = camera["resolution"]
            logger.info(
                f"  Device {camera['device_id']}: {camera['name']} "
                f"({resolution['width']}x{resolution['height']})"
            )

        # Stream from a device that was actually enumerated rather than
        # assuming that device 0 exists.
        # NOTE(review): assumes start_camera accepts the device_id values
        # returned by get_camera_devices -- confirm against the client API.
        device_id = cameras[0]["device_id"]
        logger.info("📷 Starting camera stream...")
        await producer.start_camera(device_id=device_id)

        logger.info("📺 Camera streaming started!")
        logger.info(f"🔗 Consumers can connect to room: {room_id}")

        # Stream for demo duration
        await asyncio.sleep(30)

    except Exception as e:
        logger.error(f"❌ Camera error: {e}")
        logger.info("💡 Make sure your camera is available and not used by other apps")
    finally:
        if producer is not None:
            await producer.disconnect()


async def screen_share_example():
    """Run the screen-share demo for 20 seconds.

    Obtains a producer from ``create_producer_client`` with its default
    arguments, starts a screen share on it and keeps it running for 20
    seconds. Exceptions are logged, and the producer is disconnected in all
    cases once it has been created.
    """
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    logger.info("🖥️  Starting Screen Share Example")

    # Bound before the try block so the finally clause can test it directly
    # instead of probing locals() for the name.
    producer = None
    try:
        producer = await create_producer_client()
        room_id = producer.current_room_id

        logger.info("🖥️  Starting screen share...")
        await producer.start_screen_share()

        logger.info(f"📺 Screen sharing started! Room: {room_id}")

        # Share for demo duration
        await asyncio.sleep(20)

    except Exception as e:
        logger.error(f"❌ Screen share error: {e}")
    finally:
        if producer is not None:
            await producer.disconnect()


if __name__ == "__main__":
    import sys

    # Map each command-line mode to the coroutine function that implements it.
    examples = {
        "animated": main,
        "camera": camera_example,
        "screen": screen_share_example,
    }

    # With no argument the animated example runs by default.
    mode = sys.argv[1] if len(sys.argv) > 1 else "animated"
    example = examples.get(mode)

    if example is None:
        print("Usage:")
        print("  python video_producer_example.py animated   # Animated content")
        print("  python video_producer_example.py camera     # Camera stream")
        print("  python video_producer_example.py screen     # Screen share")
        # Exit non-zero so shells and scripts can detect the bad invocation.
        sys.exit(2)

    asyncio.run(example())