Spaces:
Sleeping
Sleeping
File size: 2,655 Bytes
8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 ec2d8f0 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 51f51c3 8aedc84 3367d1b 8aedc84 51f51c3 8aedc84 51f51c3 3367d1b 51f51c3 8aedc84 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 |
#!/usr/bin/env python3
"""
Basic Producer Example - RobotHub TransportServer

This example demonstrates:
- Creating a room with workspace
- Connecting as a producer
- Sending joint updates
- Basic error handling
"""
import asyncio
import logging
from transport_server_client import RoboticsProducer
# Set up logging: configure the root logger at INFO level so the example's
# progress messages are emitted.
logging.basicConfig(level=logging.INFO)
# Module-level logger, named after this module, used by everything below.
logger = logging.getLogger(__name__)
async def main():
    """Run the basic producer example from start to finish.

    Creates a room, connects to it as a producer, sends one joint update,
    one state sync and one heartbeat, waits briefly, then disconnects and
    deletes the room it created.

    Any ``Exception`` raised while the example runs is logged and not
    re-raised. The cleanup in ``finally`` runs whether or not the example
    succeeded, and each cleanup step is guarded separately so that a failed
    disconnect cannot prevent the room from being deleted.
    """
    # Create producer client
    producer = RoboticsProducer("http://localhost:8000")

    # Callbacks that simply log what the producer reports
    def on_error(error_msg):
        logger.error(f"Producer error: {error_msg}")

    def on_connected():
        logger.info("Producer connected!")

    def on_disconnected():
        logger.info("Producer disconnected!")

    producer.on_error(on_error)
    producer.on_connected(on_connected)
    producer.on_disconnected(on_disconnected)

    # Pre-bind the ids so the cleanup code can tell whether a room was ever
    # created without having to inspect locals().
    workspace_id = None
    room_id = None

    try:
        # Create a room and connect
        workspace_id, room_id = await producer.create_room()
        logger.info(f"Created room: {room_id}")
        logger.info(f"Workspace ID: {workspace_id}")

        # Connect as producer
        success = await producer.connect(workspace_id, room_id)
        if not success:
            logger.error("Failed to connect!")
            # The finally clause below still runs and removes the room.
            return
        logger.info("Connected as producer!")

        # Send some joint updates
        joints = [
            {"name": "shoulder", "value": 45.0},
            {"name": "elbow", "value": -20.0},
            {"name": "wrist", "value": 10.0},
        ]
        logger.info("Sending joint updates...")
        await producer.send_joint_update(joints)

        # Send state sync (converted to joint updates)
        state = {"shoulder": 90.0, "elbow": -45.0, "wrist": 0.0}
        logger.info("Sending state sync...")
        await producer.send_state_sync(state)

        # Send a heartbeat
        await producer.send_heartbeat()

        # Keep alive for a bit
        await asyncio.sleep(2)
        logger.info("Example completed successfully!")
    except Exception:
        logger.exception("Error")
    finally:
        # Always disconnect and cleanup. The disconnect gets its own guard:
        # if it raised out of this clause, the room deletion below would be
        # skipped and the room would be left behind.
        try:
            if producer.is_connected():
                await producer.disconnect()
                logger.info("Disconnected")
        except Exception:
            logger.exception("Failed to disconnect")

        # Clean up the room we created (only if create_room got that far)
        if workspace_id is not None and room_id is not None:
            try:
                await producer.delete_room(workspace_id, room_id)
                logger.info("Room cleaned up")
            except Exception:
                logger.exception("Failed to clean up room")
# Run the example only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    asyncio.run(main())
|