# Project EmbodiedGen
#
# Copyright (c) 2025 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.
import json
import logging
import os
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Literal

import mplib
import numpy as np
import sapien.core as sapien
import sapien.physx as physx
import torch
from mani_skill.agents.base_agent import BaseAgent
from mani_skill.envs.scene import ManiSkillScene
from mani_skill.examples.motionplanning.panda.utils import (
    compute_grasp_info_by_obb,
)
from mani_skill.utils.geometry.trimesh_utils import get_component_mesh
from PIL import Image, ImageColor
from scipy.spatial.transform import Rotation as R

from embodied_gen.data.utils import DiffrastRender
from embodied_gen.utils.enum import LayoutInfo, Scene3DItemEnum
from embodied_gen.utils.geometry import quaternion_multiply
from embodied_gen.utils.log import logger

COLORMAP = list(set(ImageColor.colormap.values()))
COLOR_PALETTE = np.array(
    [ImageColor.getrgb(c) for c in COLORMAP], dtype=np.uint8
)

SIM_COORD_ALIGN = np.array(
    [
        [1.0, 0.0, 0.0, 0.0],
        [0.0, -1.0, 0.0, 0.0],
        [0.0, 0.0, -1.0, 0.0],
        [0.0, 0.0, 0.0, 1.0],
    ]
)  # Used to align the SAPIEN/MuJoCo coordinate system with the world coordinate system.
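
# Illustrative note (not from the original source): SIM_COORD_ALIGN is a
# homogeneous transform that negates the Y and Z axes. A hedged usage sketch,
# with `pose_4x4` as a hypothetical 4x4 pose matrix taken from the simulator:
#
#     aligned_pose = pose_4x4 @ SIM_COORD_ALIGN
#
# Whether the alignment is applied on the left or the right depends on the
# convention of the consuming code; see the EmbodiedGen call sites for the
# exact usage.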

__all__ = [
    "SIM_COORD_ALIGN",
    "FrankaPandaGrasper",
    "load_assets_from_layout_file",
    "load_mani_skill_robot",
    "render_images",
]


def load_actor_from_urdf(
    scene: ManiSkillScene | sapien.Scene,
    file_path: str,
    pose: sapien.Pose,
    env_idx: int = None,
    use_static: bool = False,
    update_mass: bool = False,
) -> sapien.pysapien.Entity:
    """Create a single actor in the scene from a simplified URDF file.

    The URDF is expected to reference one visual mesh, one collision mesh,
    and `gazebo/mu1`, `gazebo/mu2` friction tags under the collision node.
    """
    tree = ET.parse(file_path)
    root = tree.getroot()
    node_name = root.get("name")
    file_dir = os.path.dirname(file_path)
    visual_file = root.find('.//visual/geometry/mesh').get("filename")
    collision_file = root.find('.//collision/geometry/mesh').get("filename")
    visual_file = os.path.join(file_dir, visual_file)
    collision_file = os.path.join(file_dir, collision_file)
    static_fric = root.find('.//collision/gazebo/mu1').text
    dynamic_fric = root.find('.//collision/gazebo/mu2').text
    material = physx.PhysxMaterial(
        static_friction=np.clip(float(static_fric), 0.1, 0.7),
        dynamic_friction=np.clip(float(dynamic_fric), 0.1, 0.6),
        restitution=0.05,
    )
    builder = scene.create_actor_builder()
    body_type = "static" if use_static else "dynamic"
    builder.set_physx_body_type(body_type)
    builder.add_multiple_convex_collisions_from_file(
        collision_file if body_type == "dynamic" else visual_file,
        material=material,
        # decomposition="coacd",
        # decomposition_params=dict(
        #     threshold=0.05, max_convex_hull=64, verbose=False
        # ),
    )
    builder.add_visual_from_file(visual_file)
    builder.set_initial_pose(pose)
    if isinstance(scene, ManiSkillScene) and env_idx is not None:
        builder.set_scene_idxs([env_idx])
    actor = builder.build(name=f"{node_name}-{env_idx}")
    if update_mass and hasattr(actor.components[1], "mass"):
        node_mass = float(root.find('.//inertial/mass').get("value"))
        actor.components[1].set_mass(node_mass)
    return actor
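
# Example usage (hedged sketch; the URDF path and pose below are hypothetical
# and not part of this module):
#
#     scene = sapien.Scene()
#     actor = load_actor_from_urdf(
#         scene,
#         "outputs/apple/apple.urdf",
#         sapien.Pose(p=[0.1, 0.0, 0.8], q=[1, 0, 0, 0]),
#         use_static=False,
#     )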


def load_assets_from_layout_file(
    scene: ManiSkillScene | sapien.Scene,
    layout: LayoutInfo | str,
    z_offset: float = 0.0,
    init_quat: list[float] = [0, 0, 0, 1],
    env_idx: int = None,
) -> dict[str, sapien.pysapien.Entity]:
    """Load assets from `EmbodiedGen` layout-gen output and create actors in the scene.

    Args:
        scene (sapien.Scene | ManiSkillScene): The SAPIEN or ManiSkill scene to load assets into.
        layout (LayoutInfo | str): The layout information data, or a path to its JSON file.
        z_offset (float): Offset to apply to the Z-coordinate of non-context objects.
        init_quat (list[float]): Initial quaternion (x, y, z, w) for orientation adjustment.
        env_idx (int): Environment index for multi-environment setup.

    Returns:
        dict[str, sapien.pysapien.Entity]: Mapping from layout node name to the created actor.
    """
    if isinstance(layout, str) and layout.endswith(".json"):
        layout = LayoutInfo.from_dict(json.load(open(layout, "r")))

    actors = dict()
    for node in layout.assets:
        file_dir = layout.assets[node]
        file_name = f"{node.replace(' ', '_')}.urdf"
        urdf_file = os.path.join(file_dir, file_name)
        if layout.objs_mapping[node] == Scene3DItemEnum.BACKGROUND.value:
            continue
        position = layout.position[node].copy()
        if layout.objs_mapping[node] != Scene3DItemEnum.CONTEXT.value:
            position[2] += z_offset
        use_static = (
            layout.relation.get(Scene3DItemEnum.CONTEXT.value, None) == node
        )
        # Combine the initial quaternion with the object's quaternion.
        x, y, z, qx, qy, qz, qw = position
        qx, qy, qz, qw = quaternion_multiply([qx, qy, qz, qw], init_quat)
        actor = load_actor_from_urdf(
            scene,
            urdf_file,
            sapien.Pose(p=[x, y, z], q=[qw, qx, qy, qz]),
            env_idx,
            use_static=use_static,
            update_mass=False,
        )
        actors[node] = actor

    return actors
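
# Example usage (hedged sketch; the layout path below is hypothetical):
#
#     scene = sapien.Scene()
#     actors = load_assets_from_layout_file(
#         scene,
#         "outputs/layout/layout.json",
#         z_offset=0.005,
#     )
#     # `actors` maps each layout node name to its created entity.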


def load_mani_skill_robot(
    scene: sapien.Scene | ManiSkillScene,
    layout: LayoutInfo | str,
    control_freq: int = 20,
    robot_init_qpos_noise: float = 0.0,
    control_mode: str = "pd_joint_pos",
    backend_str: tuple[str, str] = ("cpu", "gpu"),
) -> BaseAgent:
    """Load the ManiSkill robot agent named in the layout and place it at the layout pose.

    Falls back to the `panda` agent if the requested robot is not registered in ManiSkill.
    """
    from mani_skill.agents import REGISTERED_AGENTS
    from mani_skill.envs.scene import ManiSkillScene
    from mani_skill.envs.utils.system.backend import (
        parse_sim_and_render_backend,
    )

    if isinstance(layout, str) and layout.endswith(".json"):
        layout = LayoutInfo.from_dict(json.load(open(layout, "r")))

    robot_name = layout.relation[Scene3DItemEnum.ROBOT.value]
    x, y, z, qx, qy, qz, qw = layout.position[robot_name]
    delta_z = 0.002  # Add a small offset to avoid collision.
    pose = sapien.Pose([x, y, z + delta_z], [qw, qx, qy, qz])
    if robot_name not in REGISTERED_AGENTS:
        logger.warning(
            f"Robot `{robot_name}` is not registered, choose from {list(REGISTERED_AGENTS.keys())}; using `panda` instead."
        )
        robot_name = "panda"
    ROBOT_CLS = REGISTERED_AGENTS[robot_name].agent_cls
    backend = parse_sim_and_render_backend(*backend_str)
    if isinstance(scene, sapien.Scene):
        scene = ManiSkillScene([scene], device=backend_str[0], backend=backend)
    robot = ROBOT_CLS(
        scene=scene,
        control_freq=control_freq,
        control_mode=control_mode,
        initial_pose=pose,
    )
    # Set the initial joint angles in radians (joint0 to joint6 plus 2 gripper fingers).
    qpos = np.array(
        [
            0.0,
            np.pi / 8,
            0,
            -np.pi * 3 / 8,
            0,
            np.pi * 3 / 4,
            np.pi / 4,
            0.04,
            0.04,
        ]
    )
    qpos = (
        np.random.normal(
            0, robot_init_qpos_noise, (len(scene.sub_scenes), len(qpos))
        )
        + qpos
    )
    qpos[:, -2:] = 0.04
    robot.reset(qpos)
    robot.init_qpos = robot.robot.qpos
    robot.controller.controllers["gripper"].reset()

    return robot
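
# Example usage (hedged sketch, assuming a layout JSON whose `relation` entry
# names a registered ManiSkill robot; the path below is hypothetical):
#
#     scene = sapien.Scene()
#     scene.set_timestep(1 / 100)
#     agent = load_mani_skill_robot(scene, "outputs/layout/layout.json")
#     print(agent.robot.qpos)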


def render_images(
    camera: sapien.render.RenderCameraComponent,
    render_keys: list[
        Literal[
            "Color",
            "Segmentation",
            "Normal",
            "Mask",
            "Depth",
            "Foreground",
        ]
    ] = None,
) -> dict[str, Image.Image]:
    """Render images from a given SAPIEN camera.

    Args:
        camera (sapien.render.RenderCameraComponent): The camera to render from.
        render_keys (list[str]): Types of images to render (e.g., Color, Segmentation).

    Returns:
        dict[str, Image.Image]: Dictionary of rendered images.
    """
    if render_keys is None:
        render_keys = [
            "Color",
            "Segmentation",
            "Normal",
            "Mask",
            "Depth",
            "Foreground",
        ]

    results: dict[str, Image.Image] = {}
    if "Color" in render_keys:
        color = camera.get_picture("Color")
        color_rgb = (np.clip(color[..., :3], 0, 1) * 255).astype(np.uint8)
        results["Color"] = Image.fromarray(color_rgb)
    if "Mask" in render_keys:
        # Fetch the color picture here so the alpha channel is available even
        # when "Color" was not requested.
        color = camera.get_picture("Color")
        alpha = (np.clip(color[..., 3], 0, 1) * 255).astype(np.uint8)
        results["Mask"] = Image.fromarray(alpha)
    if "Segmentation" in render_keys:
        seg_labels = camera.get_picture("Segmentation")
        label0 = seg_labels[..., 0].astype(np.uint8)
        seg_color = COLOR_PALETTE[label0]
        results["Segmentation"] = Image.fromarray(seg_color)
    if "Foreground" in render_keys:
        seg_labels = camera.get_picture("Segmentation")
        label0 = seg_labels[..., 0]
        mask = np.where((label0 > 1), 255, 0).astype(np.uint8)
        color = camera.get_picture("Color")
        color_rgb = (np.clip(color[..., :3], 0, 1) * 255).astype(np.uint8)
        foreground = np.concatenate([color_rgb, mask[..., None]], axis=-1)
        results["Foreground"] = Image.fromarray(foreground)
    if "Normal" in render_keys:
        normal = camera.get_picture("Normal")[..., :3]
        normal_img = (((normal + 1) / 2) * 255).astype(np.uint8)
        results["Normal"] = Image.fromarray(normal_img)
    if "Depth" in render_keys:
        position_map = camera.get_picture("Position")
        depth = -position_map[..., 2]
        # Depth normalization masks by the color alpha channel; fetch it here
        # so this branch does not rely on "Color" being requested first.
        color = camera.get_picture("Color")
        alpha = torch.tensor(color[..., 3], dtype=torch.float32)
        norm_depth = DiffrastRender.normalize_map_by_mask(
            torch.tensor(depth), alpha
        )
        depth_img = (norm_depth * 255).to(torch.uint8).numpy()
        results["Depth"] = Image.fromarray(depth_img)

    return results
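
# Example usage (hedged sketch; `camera` is assumed to be a mounted camera on
# an already-built scene, e.g. created via SapienSceneManager.create_camera
# below):
#
#     scene.update_render()
#     camera.take_picture()
#     images = render_images(camera, render_keys=["Color", "Depth"])
#     images["Color"].save("color.png")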


class SapienSceneManager:
    """A helper class to manage a SAPIEN simulation scene, its cameras, and actors."""

    def __init__(
        self, sim_freq: int, ray_tracing: bool, device: str = "cuda"
    ) -> None:
        self.sim_freq = sim_freq
        self.ray_tracing = ray_tracing
        self.device = device
        self.renderer = sapien.SapienRenderer()
        self.scene = self._setup_scene()
        self.cameras: list[sapien.render.RenderCameraComponent] = []
        self.actors: dict[str, sapien.pysapien.Entity] = {}

    def _setup_scene(self) -> sapien.Scene:
        """Set up the SAPIEN scene with lighting and ground."""
        # Ray tracing settings
        if self.ray_tracing:
            sapien.render.set_camera_shader_dir("rt")
            sapien.render.set_ray_tracing_samples_per_pixel(64)
            sapien.render.set_ray_tracing_path_depth(10)
            sapien.render.set_ray_tracing_denoiser("oidn")
        scene = sapien.Scene()
        scene.set_timestep(1 / self.sim_freq)
        # Add lighting
        scene.set_ambient_light([0.2, 0.2, 0.2])
        scene.add_directional_light(
            direction=[0, 1, -1],
            color=[1.5, 1.45, 1.4],
            shadow=True,
            shadow_map_size=2048,
        )
        scene.add_directional_light(
            direction=[0, -0.5, 1], color=[0.8, 0.8, 0.85], shadow=False
        )
        scene.add_directional_light(
            direction=[0, -1, 1], color=[1.0, 1.0, 1.0], shadow=False
        )
        ground_material = self.renderer.create_material()
        ground_material.base_color = [0.5, 0.5, 0.5, 1]  # rgba, gray
        ground_material.roughness = 0.7
        ground_material.metallic = 0.0
        scene.add_ground(0, render_material=ground_material)
        return scene

    def step_action(
        self,
        agent: BaseAgent,
        action: torch.Tensor,
        cameras: list[sapien.render.RenderCameraComponent],
        render_keys: list[str],
        sim_steps_per_control: int = 1,
    ) -> dict:
        """Apply an action to the agent, step the physics, and render frames from each camera."""
        agent.set_action(action)
        frames = defaultdict(list)
        for _ in range(sim_steps_per_control):
            self.scene.step()
            self.scene.update_render()
            for camera in cameras:
                camera.take_picture()
                images = render_images(camera, render_keys=render_keys)
                frames[camera.name].append(images)

        return frames

    def create_camera(
        self,
        cam_name: str,
        pose: sapien.Pose,
        image_hw: tuple[int, int],
        fovy_deg: float,
    ) -> sapien.render.RenderCameraComponent:
        """Create a single camera in the scene.

        Args:
            cam_name (str): Name of the camera.
            pose (sapien.Pose): Camera pose, p=(x, y, z), q=(w, x, y, z).
            image_hw (tuple[int, int]): Image resolution (height, width) for the camera.
            fovy_deg (float): Field of view in degrees for the camera.

        Returns:
            sapien.render.RenderCameraComponent: The created camera.
        """
        cam_actor = self.scene.create_actor_builder().build_kinematic()
        cam_actor.set_pose(pose)
        camera = self.scene.add_mounted_camera(
            name=cam_name,
            mount=cam_actor,
            pose=sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0]),
            width=image_hw[1],
            height=image_hw[0],
            fovy=np.deg2rad(fovy_deg),
            near=0.01,
            far=100,
        )
        self.cameras.append(camera)
        return camera

    def initialize_circular_cameras(
        self,
        num_cameras: int,
        radius: float,
        height: float,
        target_pt: list[float],
        image_hw: tuple[int, int],
        fovy_deg: float,
    ) -> list[sapien.render.RenderCameraComponent]:
        """Initialize multiple cameras arranged in a circle.

        Args:
            num_cameras (int): Number of cameras to create.
            radius (float): Radius of the camera circle.
            height (float): Fixed Z-coordinate of the cameras.
            target_pt (list[float]): 3D point (x, y, z) that the cameras look at.
            image_hw (tuple[int, int]): Image resolution (height, width) for cameras.
            fovy_deg (float): Field of view in degrees for cameras.

        Returns:
            list[sapien.render.RenderCameraComponent]: List of created cameras.
        """
        angle_step = 2 * np.pi / num_cameras
        world_up_vec = np.array([0.0, 0.0, 1.0])
        target_pt = np.array(target_pt)
        for i in range(num_cameras):
            angle = i * angle_step
            cam_x = radius * np.cos(angle)
            cam_y = radius * np.sin(angle)
            cam_z = height
            eye_pos = [cam_x, cam_y, cam_z]
            forward_vec = target_pt - eye_pos
            forward_vec = forward_vec / np.linalg.norm(forward_vec)
            temp_right_vec = np.cross(forward_vec, world_up_vec)
            if np.linalg.norm(temp_right_vec) < 1e-6:
                temp_right_vec = np.array([1.0, 0.0, 0.0])
            if np.abs(np.dot(temp_right_vec, forward_vec)) > 0.99:
                temp_right_vec = np.array([0.0, 1.0, 0.0])
            right_vec = temp_right_vec / np.linalg.norm(temp_right_vec)
            up_vec = np.cross(right_vec, forward_vec)
            rotation_matrix = np.array([forward_vec, -right_vec, up_vec]).T
            rot = R.from_matrix(rotation_matrix)
            scipy_quat = rot.as_quat()  # (x, y, z, w)
            quat = [
                scipy_quat[3],
                scipy_quat[0],
                scipy_quat[1],
                scipy_quat[2],
            ]  # (w, x, y, z)
            self.create_camera(
                f"camera_{i}",
                sapien.Pose(p=eye_pos, q=quat),
                image_hw,
                fovy_deg,
            )

        return self.cameras
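
# Example usage (hedged sketch; the layout path below is hypothetical):
#
#     manager = SapienSceneManager(sim_freq=100, ray_tracing=False)
#     manager.actors = load_assets_from_layout_file(
#         manager.scene, "outputs/layout/layout.json"
#     )
#     cameras = manager.initialize_circular_cameras(
#         num_cameras=4,
#         radius=1.5,
#         height=1.2,
#         target_pt=[0.0, 0.0, 0.8],
#         image_hw=(480, 640),
#         fovy_deg=60,
#     )
#     manager.scene.update_render()
#     for cam in cameras:
#         cam.take_picture()
#         frames = render_images(cam, render_keys=["Color"])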


class FrankaPandaGrasper(object):
    """Motion-planning helper that computes grasp action sequences for a Franka Panda agent."""

    def __init__(
        self,
        agent: BaseAgent,
        control_freq: float,
        joint_vel_limits: float = 2.0,
        joint_acc_limits: float = 1.0,
        finger_length: float = 0.025,
    ) -> None:
        self.agent = agent
        self.robot = agent.robot
        self.control_freq = control_freq
        self.control_timestep = 1 / control_freq
        self.joint_vel_limits = joint_vel_limits
        self.joint_acc_limits = joint_acc_limits
        self.finger_length = finger_length
        self.planners = self._setup_planner()

    def _setup_planner(self) -> list[mplib.Planner]:
        # Build one motion planner per sub-scene, each with its base set to the
        # corresponding robot base pose.
        planners = []
        for pose in self.robot.pose:
            link_names = [link.get_name() for link in self.robot.get_links()]
            joint_names = [
                joint.get_name() for joint in self.robot.get_active_joints()
            ]
            planner = mplib.Planner(
                urdf=self.agent.urdf_path,
                srdf=self.agent.urdf_path.replace(".urdf", ".srdf"),
                user_link_names=link_names,
                user_joint_names=joint_names,
                move_group="panda_hand_tcp",
                joint_vel_limits=np.ones(7) * self.joint_vel_limits,
                joint_acc_limits=np.ones(7) * self.joint_acc_limits,
            )
            planner.set_base_pose(pose.raw_pose[0].tolist())
            planners.append(planner)

        return planners

    def control_gripper(
        self,
        gripper_state: Literal[-1, 1],
        n_step: int = 10,
        env_idx: int = 0,
    ) -> np.ndarray:
        """Hold the current arm joints and open (1) or close (-1) the gripper for `n_step` steps."""
        qpos = self.robot.get_qpos()[env_idx, :-2].cpu().numpy()
        actions = []
        for _ in range(n_step):
            action = np.hstack([qpos, gripper_state])[None, ...]
            actions.append(action)

        return np.concatenate(actions, axis=0)

    def move_to_pose(
        self,
        pose: sapien.Pose,
        control_timestep: float,
        gripper_state: Literal[-1, 1],
        use_point_cloud: bool = False,
        n_max_step: int = 100,
        action_key: str = "position",
        env_idx: int = 0,
    ) -> np.ndarray:
        """Plan a joint trajectory to the target pose; returns the action sequence or None if planning fails."""
        result = self.planners[env_idx].plan_qpos_to_pose(
            np.concatenate([pose.p, pose.q]),
            self.robot.get_qpos().cpu().numpy()[0],
            time_step=control_timestep,
            use_point_cloud=use_point_cloud,
        )
        # Fall back to screw-motion planning if sampling-based planning fails.
        if result["status"] != "Success":
            result = self.planners[env_idx].plan_screw(
                np.concatenate([pose.p, pose.q]),
                self.robot.get_qpos().cpu().numpy()[0],
                time_step=control_timestep,
                use_point_cloud=use_point_cloud,
            )
            if result["status"] != "Success":
                return None

        # Subsample the trajectory so it stays within `n_max_step` waypoints.
        sample_ratio = (len(result[action_key]) // n_max_step) + 1
        result[action_key] = result[action_key][::sample_ratio]
        n_step = len(result[action_key])
        actions = []
        for i in range(n_step):
            qpos = result[action_key][i]
            action = np.hstack([qpos, gripper_state])[None, ...]
            actions.append(action)

        return np.concatenate(actions, axis=0)

    def compute_grasp_action(
        self,
        actor: sapien.pysapien.Entity,
        reach_target_only: bool = True,
        offset: tuple[float, float, float] = (0, 0, -0.05),
        env_idx: int = 0,
    ) -> np.ndarray:
        """Compute a grasp action sequence for the given actor from its oriented bounding box."""
        physx_rigid = actor.components[1]
        mesh = get_component_mesh(physx_rigid, to_world_frame=True)
        obb = mesh.bounding_box_oriented
        approaching = np.array([0, 0, -1])
        tcp_pose = self.agent.tcp.pose[env_idx]
        target_closing = (
            tcp_pose.to_transformation_matrix()[0, :3, 1].cpu().numpy()
        )
        grasp_info = compute_grasp_info_by_obb(
            obb,
            approaching=approaching,
            target_closing=target_closing,
            depth=self.finger_length,
        )
        closing, center = grasp_info["closing"], grasp_info["center"]
        raw_tcp_pose = tcp_pose.sp
        grasp_pose = self.agent.build_grasp_pose(approaching, closing, center)
        reach_pose = grasp_pose * sapien.Pose(p=offset)
        grasp_pose = grasp_pose * sapien.Pose(p=[0, 0, 0.01])

        reach_actions = self.move_to_pose(
            reach_pose,
            self.control_timestep,
            gripper_state=1,
            env_idx=env_idx,
        )
        if reach_actions is None:
            logger.warning(
                f"Failed to reach the grasp pose for node `{actor.name}`, skipping grasping."
            )
            return None

        actions = [reach_actions]
        if not reach_target_only:
            grasp_actions = self.move_to_pose(
                grasp_pose,
                self.control_timestep,
                gripper_state=1,
                env_idx=env_idx,
            )
            actions.append(grasp_actions)
            close_actions = self.control_gripper(
                gripper_state=-1,
                env_idx=env_idx,
            )
            actions.append(close_actions)
            back_actions = self.move_to_pose(
                raw_tcp_pose,
                self.control_timestep,
                gripper_state=-1,
                env_idx=env_idx,
            )
            actions.append(back_actions)

        return np.concatenate(actions, axis=0)
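
# Example usage (hedged sketch; builds on the `load_mani_skill_robot` and
# `SapienSceneManager` examples above, and the layout path and target actor
# name are hypothetical):
#
#     agent = load_mani_skill_robot(manager.scene, "outputs/layout/layout.json")
#     grasper = FrankaPandaGrasper(agent, control_freq=20)
#     actions = grasper.compute_grasp_action(manager.actors["apple"])
#     if actions is not None:
#         for action in actions:
#             manager.step_action(agent, torch.tensor(action), cameras, ["Color"])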