from typing import Dict, Optional, List from datetime import datetime, timezone import logging from .models import Robot, RobotStatus, DriverJointState logger = logging.getLogger(__name__) class RobotManager: """Manages robot lifecycle and state""" def __init__(self): self.robots: Dict[str, Robot] = {} async def create_robot(self, robot_id: str, robot_type: str, name: str) -> Robot: """Create a new robot""" if robot_id in self.robots: raise ValueError(f"Robot {robot_id} already exists") # Create demo joint configuration for so-arm100 joints = self._create_demo_joints(robot_type) robot = Robot( id=robot_id, name=name, robot_type=robot_type, created_at=datetime.now(timezone.utc), joints=joints ) self.robots[robot_id] = robot logger.info(f"Created robot {robot_id} ({robot_type}) with {len(joints)} joints") return robot def get_robot(self, robot_id: str) -> Optional[Robot]: """Get robot by ID""" return self.robots.get(robot_id) async def delete_robot(self, robot_id: str): """Delete a robot""" if robot_id in self.robots: del self.robots[robot_id] logger.info(f"Deleted robot {robot_id}") async def set_master_connected(self, robot_id: str, connection_id: str): """Mark robot as having a master connection""" robot = self.robots.get(robot_id) if robot: robot.master_connected = True robot.master_connection_id = connection_id robot.last_command_source = "master" robot.last_command_time = datetime.now(timezone.utc) async def set_master_disconnected(self, robot_id: str): """Mark robot as having no master connection""" robot = self.robots.get(robot_id) if robot: robot.master_connected = False robot.master_connection_id = None robot.last_command_source = "none" async def add_slave_connection(self, robot_id: str, connection_id: str): """Add a slave connection to robot""" robot = self.robots.get(robot_id) if robot: if connection_id not in robot.slave_connections: robot.slave_connections.append(connection_id) logger.info(f"Added slave {connection_id} to robot {robot_id} ({len(robot.slave_connections)} total)") async def remove_slave_connection(self, robot_id: str, connection_id: str): """Remove a slave connection from robot""" robot = self.robots.get(robot_id) if robot: try: robot.slave_connections.remove(connection_id) logger.info(f"Removed slave {connection_id} from robot {robot_id} ({len(robot.slave_connections)} remaining)") except ValueError: logger.warning(f"Slave {connection_id} not found in robot {robot_id} connections") def get_robot_status(self, robot_id: str) -> Optional[RobotStatus]: """Get robot connection status""" robot = self.robots.get(robot_id) if not robot: return None return RobotStatus( robot_id=robot_id, master_connected=robot.master_connected, slave_count=len(robot.slave_connections), last_command_source=robot.last_command_source, last_command_time=robot.last_command_time, last_seen=datetime.now(timezone.utc) ) def list_robots_with_masters(self) -> List[Robot]: """Get all robots that have master connections""" return [robot for robot in self.robots.values() if robot.master_connected] def list_robots_with_slaves(self) -> List[Robot]: """Get all robots that have slave connections""" return [robot for robot in self.robots.values() if robot.slave_connections] def _create_demo_joints(self, robot_type: str) -> List[DriverJointState]: """Create demo joint configuration based on robot type""" if robot_type == "so-arm100": return [ DriverJointState( name="Rotation", servo_id=1, type="revolute", virtual_value=0.0, real_value=0.0 ), DriverJointState( name="Pitch", servo_id=2, type="revolute", virtual_value=0.0, real_value=0.0 ), DriverJointState( name="Elbow", servo_id=3, type="revolute", virtual_value=0.0, real_value=0.0 ), DriverJointState( name="Wrist_Pitch", servo_id=4, type="revolute", virtual_value=0.0, real_value=0.0 ), DriverJointState( name="Wrist_Roll", servo_id=5, type="revolute", virtual_value=0.0, real_value=0.0 ), DriverJointState( name="Jaw", servo_id=6, type="revolute", virtual_value=0.0, real_value=0.0 ) ] else: # Default generic robot return [ DriverJointState( name="joint_1", servo_id=1, type="revolute", virtual_value=0.0, real_value=0.0 ), DriverJointState( name="joint_2", servo_id=2, type="revolute", virtual_value=0.0, real_value=0.0 ) ]