Spaces:
Sleeping
Sleeping
""" | |
Joint configuration and mapping utilities for RobotHub Inference Server. | |
This module handles joint data parsing and normalization between different | |
robot configurations and the standardized training data format. | |
""" | |
from typing import ClassVar | |
import numpy as np | |
class JointConfig: | |
"""Joint configuration and mapping utilities.""" | |
# Joint name mapping from AI server names to standard names | |
AI_TO_STANDARD_NAMES: ClassVar = { | |
"Rotation": "shoulder_pan", | |
"Pitch": "shoulder_lift", | |
"Elbow": "elbow_flex", | |
"Wrist_Pitch": "wrist_flex", | |
"Wrist_Roll": "wrist_roll", | |
"Jaw": "gripper", | |
} | |
# Standard joint names in order | |
STANDARD_JOINT_NAMES: ClassVar = [ | |
"shoulder_pan", | |
"shoulder_lift", | |
"elbow_flex", | |
"wrist_flex", | |
"wrist_roll", | |
"gripper", | |
] | |
# AI server joint names in order | |
AI_JOINT_NAMES: ClassVar = [ | |
"Rotation", | |
"Pitch", | |
"Elbow", | |
"Wrist_Pitch", | |
"Wrist_Roll", | |
"Jaw", | |
] | |
# Normalization ranges for robot joints | |
# Most joints: [-100, 100], Gripper: [0, 100] | |
ROBOT_NORMALIZATION_RANGES: ClassVar = { | |
"shoulder_pan": (-100, 100), | |
"shoulder_lift": (-100, 100), | |
"elbow_flex": (-100, 100), | |
"wrist_flex": (-100, 100), | |
"wrist_roll": (-100, 100), | |
"gripper": (0, 100), | |
} | |
def parse_joint_data(cls, joints_data, policy_type: str = "act") -> list[float]: | |
""" | |
Parse joint data from Transport Server message into standard order. | |
Args: | |
joints_data: Joint data from Transport Server message | |
policy_type: Type of policy (for logging purposes) | |
Returns: | |
List of 6 normalized joint values in standard order | |
""" | |
# Handle different possible data formats | |
joint_dict = joints_data.data if hasattr(joints_data, "data") else joints_data | |
if not isinstance(joint_dict, dict): | |
return [0.0] * 6 | |
# Extract joint values in standard order | |
joint_values = [] | |
for standard_name in cls.STANDARD_JOINT_NAMES: | |
value = 0.0 # Default value | |
# Try standard name first | |
if standard_name in joint_dict: | |
value = float(joint_dict[standard_name]) | |
else: | |
# Try AI name | |
for ai_name, std_name in cls.AI_TO_STANDARD_NAMES.items(): | |
if std_name == standard_name and ai_name in joint_dict: | |
value = float(joint_dict[ai_name]) | |
break | |
joint_values.append(value) | |
return joint_values | |
def create_joint_commands(cls, action_values: np.ndarray | list) -> list[dict]: | |
""" | |
Create joint command messages from action values. | |
Args: | |
action_values: Array of 6 joint values in standard order | |
Returns: | |
List of joint command dictionaries with AI server names | |
""" | |
if len(action_values) != 6: | |
msg = f"Expected 6 joint values, got {len(action_values)}" | |
raise ValueError(msg) | |
commands = [] | |
for i, ai_name in enumerate(cls.AI_JOINT_NAMES): | |
commands.append({ | |
"name": ai_name, | |
"value": float(action_values[i]), | |
"index": i, | |
}) | |
return commands | |
def validate_joint_values(cls, joint_values: np.ndarray) -> np.ndarray: | |
""" | |
Validate and clamp joint values to their normalized limits. | |
Args: | |
joint_values: Array of joint values | |
Returns: | |
Clamped joint values | |
""" | |
if len(joint_values) != 6: | |
# Pad or truncate to 6 values | |
padded = np.zeros(6, dtype=np.float32) | |
n = min(len(joint_values), 6) | |
padded[:n] = joint_values[:n] | |
joint_values = padded | |
# Clamp to normalized limits | |
for i, standard_name in enumerate(cls.STANDARD_JOINT_NAMES): | |
min_val, max_val = cls.ROBOT_NORMALIZATION_RANGES[standard_name] | |
joint_values[i] = np.clip(joint_values[i], min_val, max_val) | |
return joint_values | |