File size: 6,146 Bytes
3aea7c6
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from typing import Dict, Optional, List
from datetime import datetime, timezone
import logging

from .models import Robot, RobotStatus, DriverJointState

logger = logging.getLogger(__name__)

class RobotManager:
    """Manages robot lifecycle and state"""
    
    def __init__(self):
        self.robots: Dict[str, Robot] = {}

    async def create_robot(self, robot_id: str, robot_type: str, name: str) -> Robot:
        """Create a new robot"""
        if robot_id in self.robots:
            raise ValueError(f"Robot {robot_id} already exists")
        
        # Create demo joint configuration for so-arm100
        joints = self._create_demo_joints(robot_type)
        
        robot = Robot(
            id=robot_id,
            name=name,
            robot_type=robot_type,
            created_at=datetime.now(timezone.utc),
            joints=joints
        )
        
        self.robots[robot_id] = robot
        logger.info(f"Created robot {robot_id} ({robot_type}) with {len(joints)} joints")
        
        return robot

    def get_robot(self, robot_id: str) -> Optional[Robot]:
        """Get robot by ID"""
        return self.robots.get(robot_id)

    async def delete_robot(self, robot_id: str):
        """Delete a robot"""
        if robot_id in self.robots:
            del self.robots[robot_id]
            logger.info(f"Deleted robot {robot_id}")

    async def set_master_connected(self, robot_id: str, connection_id: str):
        """Mark robot as having a master connection"""
        robot = self.robots.get(robot_id)
        if robot:
            robot.master_connected = True
            robot.master_connection_id = connection_id
            robot.last_command_source = "master"
            robot.last_command_time = datetime.now(timezone.utc)

    async def set_master_disconnected(self, robot_id: str):
        """Mark robot as having no master connection"""
        robot = self.robots.get(robot_id)
        if robot:
            robot.master_connected = False
            robot.master_connection_id = None
            robot.last_command_source = "none"

    async def add_slave_connection(self, robot_id: str, connection_id: str):
        """Add a slave connection to robot"""
        robot = self.robots.get(robot_id)
        if robot:
            if connection_id not in robot.slave_connections:
                robot.slave_connections.append(connection_id)
                logger.info(f"Added slave {connection_id} to robot {robot_id} ({len(robot.slave_connections)} total)")

    async def remove_slave_connection(self, robot_id: str, connection_id: str):
        """Remove a slave connection from robot"""
        robot = self.robots.get(robot_id)
        if robot:
            try:
                robot.slave_connections.remove(connection_id)
                logger.info(f"Removed slave {connection_id} from robot {robot_id} ({len(robot.slave_connections)} remaining)")
            except ValueError:
                logger.warning(f"Slave {connection_id} not found in robot {robot_id} connections")

    def get_robot_status(self, robot_id: str) -> Optional[RobotStatus]:
        """Get robot connection status"""
        robot = self.robots.get(robot_id)
        if not robot:
            return None
        
        return RobotStatus(
            robot_id=robot_id,
            master_connected=robot.master_connected,
            slave_count=len(robot.slave_connections),
            last_command_source=robot.last_command_source,
            last_command_time=robot.last_command_time,
            last_seen=datetime.now(timezone.utc)
        )

    def list_robots_with_masters(self) -> List[Robot]:
        """Get all robots that have master connections"""
        return [robot for robot in self.robots.values() if robot.master_connected]

    def list_robots_with_slaves(self) -> List[Robot]:
        """Get all robots that have slave connections"""
        return [robot for robot in self.robots.values() if robot.slave_connections]

    def _create_demo_joints(self, robot_type: str) -> List[DriverJointState]:
        """Create demo joint configuration based on robot type"""
        if robot_type == "so-arm100":
            return [
                DriverJointState(
                    name="Rotation",
                    servo_id=1,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                ),
                DriverJointState(
                    name="Pitch",
                    servo_id=2,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                ),
                DriverJointState(
                    name="Elbow",
                    servo_id=3,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                ),
                DriverJointState(
                    name="Wrist_Pitch",
                    servo_id=4,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                ),
                DriverJointState(
                    name="Wrist_Roll",
                    servo_id=5,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                ),
                DriverJointState(
                    name="Jaw",
                    servo_id=6,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                )
            ]
        else:
            # Default generic robot
            return [
                DriverJointState(
                    name="joint_1",
                    servo_id=1,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                ),
                DriverJointState(
                    name="joint_2",
                    servo_id=2,
                    type="revolute",
                    virtual_value=0.0,
                    real_value=0.0
                )
            ]