# Project EmbodiedGen
#
# Copyright (c) 2025 Horizon Robotics. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied. See the License for the specific language governing
# permissions and limitations under the License.

import json
import logging
import os
import xml.etree.ElementTree as ET
from collections import defaultdict
from typing import Literal
import mplib
import numpy as np
import sapien.core as sapien
import sapien.physx as physx
import torch
from mani_skill.agents.base_agent import BaseAgent
from mani_skill.envs.scene import ManiSkillScene
from mani_skill.examples.motionplanning.panda.utils import (
compute_grasp_info_by_obb,
)
from mani_skill.utils.geometry.trimesh_utils import get_component_mesh
from PIL import Image, ImageColor
from scipy.spatial.transform import Rotation as R
from embodied_gen.data.utils import DiffrastRender
from embodied_gen.utils.enum import LayoutInfo, Scene3DItemEnum
from embodied_gen.utils.geometry import quaternion_multiply
from embodied_gen.utils.log import logger

COLORMAP = list(set(ImageColor.colormap.values()))
COLOR_PALETTE = np.array(
[ImageColor.getrgb(c) for c in COLORMAP], dtype=np.uint8
)

SIM_COORD_ALIGN = np.array(
[
[1.0, 0.0, 0.0, 0.0],
[0.0, -1.0, 0.0, 0.0],
[0.0, 0.0, -1.0, 0.0],
[0.0, 0.0, 0.0, 1.0],
]
)  # Aligns the SAPIEN/MuJoCo coordinate system with the world coordinate system.
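
# Hedged usage sketch (pure numpy): applying the alignment to a hypothetical
# 4x4 camera-to-world pose matrix; which side it multiplies on depends on the
# convention of the pose being converted.
#
#   cam_pose = np.eye(4)  # hypothetical camera-to-world pose
#   aligned = cam_pose @ SIM_COORD_ALIGN  # flip the Y/Z axes of the frame
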
__all__ = [
"SIM_COORD_ALIGN",
"FrankaPandaGrasper",
"load_assets_from_layout_file",
"load_mani_skill_robot",
"render_images",
]


def load_actor_from_urdf(
    scene: ManiSkillScene | sapien.Scene,
    file_path: str,
    pose: sapien.Pose,
    env_idx: int | None = None,
    use_static: bool = False,
    update_mass: bool = False,
) -> sapien.pysapien.Entity:
    """Build a single actor in the scene from an `EmbodiedGen` URDF file.

    Visual and collision meshes plus friction coefficients are read from the
    URDF; the actor is built as static or dynamic, and its mass can optionally
    be overridden from the URDF's inertial element.
    """
tree = ET.parse(file_path)
root = tree.getroot()
node_name = root.get("name")
file_dir = os.path.dirname(file_path)
visual_file = root.find('.//visual/geometry/mesh').get("filename")
collision_file = root.find('.//collision/geometry/mesh').get("filename")
visual_file = os.path.join(file_dir, visual_file)
collision_file = os.path.join(file_dir, collision_file)
static_fric = root.find('.//collision/gazebo/mu1').text
dynamic_fric = root.find('.//collision/gazebo/mu2').text
material = physx.PhysxMaterial(
static_friction=np.clip(float(static_fric), 0.1, 0.7),
dynamic_friction=np.clip(float(dynamic_fric), 0.1, 0.6),
restitution=0.05,
)
builder = scene.create_actor_builder()
body_type = "static" if use_static else "dynamic"
builder.set_physx_body_type(body_type)
builder.add_multiple_convex_collisions_from_file(
collision_file if body_type == "dynamic" else visual_file,
material=material,
# decomposition="coacd",
# decomposition_params=dict(
# threshold=0.05, max_convex_hull=64, verbose=False
# ),
)
builder.add_visual_from_file(visual_file)
builder.set_initial_pose(pose)
    if isinstance(scene, ManiSkillScene) and env_idx is not None:
        builder.set_scene_idxs([env_idx])
    suffix = "" if env_idx is None else f"-{env_idx}"
    actor = builder.build(name=f"{node_name}{suffix}")
    if update_mass and hasattr(actor.components[1], "mass"):
        # components[1] is assumed to be the actor's physx rigid body component.
        node_mass = float(root.find(".//inertial/mass").get("value"))
        actor.components[1].set_mass(node_mass)
return actor
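

# Hedged usage sketch: building one actor from an EmbodiedGen-style URDF in a
# plain SAPIEN scene. `asset/banana.urdf` is a hypothetical path.
#
#   scene = sapien.Scene()
#   actor = load_actor_from_urdf(
#       scene, "asset/banana.urdf", sapien.Pose(p=[0, 0, 0.1], q=[1, 0, 0, 0])
#   )
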
def load_assets_from_layout_file(
    scene: ManiSkillScene | sapien.Scene,
    layout: LayoutInfo | str,
    z_offset: float = 0.0,
    init_quat: list[float] = [0, 0, 0, 1],
    env_idx: int | None = None,
) -> dict[str, sapien.pysapien.Entity]:
    """Load assets from `EmbodiedGen` layout-gen output and create actors in the scene.

    Args:
        scene (ManiSkillScene | sapien.Scene): The SAPIEN or ManiSkill scene to load assets into.
        layout (LayoutInfo | str): The layout information data or a path to its JSON file.
        z_offset (float): Offset applied to the Z-coordinate of non-context objects.
        init_quat (list[float]): Initial quaternion (x, y, z, w) used to adjust each object's orientation.
        env_idx (int | None): Environment index for multi-environment setups.

    Returns:
        dict[str, sapien.pysapien.Entity]: Mapping from node name to the created actor.
    """
    if isinstance(layout, str) and layout.endswith(".json"):
        with open(layout, "r") as f:
            layout = LayoutInfo.from_dict(json.load(f))
actors = dict()
for node in layout.assets:
file_dir = layout.assets[node]
file_name = f"{node.replace(' ', '_')}.urdf"
urdf_file = os.path.join(file_dir, file_name)
if layout.objs_mapping[node] == Scene3DItemEnum.BACKGROUND.value:
continue
position = layout.position[node].copy()
if layout.objs_mapping[node] != Scene3DItemEnum.CONTEXT.value:
position[2] += z_offset
use_static = (
layout.relation.get(Scene3DItemEnum.CONTEXT.value, None) == node
)
# Combine initial quaternion with object quaternion
x, y, z, qx, qy, qz, qw = position
qx, qy, qz, qw = quaternion_multiply([qx, qy, qz, qw], init_quat)
actor = load_actor_from_urdf(
scene,
urdf_file,
sapien.Pose(p=[x, y, z], q=[qw, qx, qy, qz]),
env_idx,
use_static=use_static,
update_mass=False,
)
actors[node] = actor
return actors
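

# Hedged usage sketch: populating a scene from a layout JSON produced by
# EmbodiedGen layout-gen. `outputs/layout.json` is a hypothetical path.
#
#   scene = sapien.Scene()
#   actors = load_assets_from_layout_file(scene, "outputs/layout.json")
#   for name, actor in actors.items():
#       print(name, actor.pose)
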
def load_mani_skill_robot(
    scene: sapien.Scene | ManiSkillScene,
    layout: LayoutInfo | str,
    control_freq: int = 20,
    robot_init_qpos_noise: float = 0.0,
    control_mode: str = "pd_joint_pos",
    backend_str: tuple[str, str] = ("cpu", "gpu"),
) -> BaseAgent:
    """Create a ManiSkill robot agent posed according to the layout.

    Args:
        scene (sapien.Scene | ManiSkillScene): Scene to load the robot into.
        layout (LayoutInfo | str): Layout data or a path to its JSON file.
        control_freq (int): Controller frequency in Hz.
        robot_init_qpos_noise (float): Std of Gaussian noise added to the
            initial joint positions.
        control_mode (str): ManiSkill control mode, e.g. `pd_joint_pos`.
        backend_str (tuple[str, str]): (simulation, render) backend names.

    Returns:
        BaseAgent: The initialized robot agent.
    """
from mani_skill.agents import REGISTERED_AGENTS
from mani_skill.envs.scene import ManiSkillScene
from mani_skill.envs.utils.system.backend import (
parse_sim_and_render_backend,
)
    if isinstance(layout, str) and layout.endswith(".json"):
        with open(layout, "r") as f:
            layout = LayoutInfo.from_dict(json.load(f))
robot_name = layout.relation[Scene3DItemEnum.ROBOT.value]
x, y, z, qx, qy, qz, qw = layout.position[robot_name]
delta_z = 0.002 # Add small offset to avoid collision.
pose = sapien.Pose([x, y, z + delta_z], [qw, qx, qy, qz])
    if robot_name not in REGISTERED_AGENTS:
        logger.warning(
            f"Robot `{robot_name}` is not registered; choose from "
            f"{list(REGISTERED_AGENTS.keys())}. Falling back to `panda`."
        )
        robot_name = "panda"
ROBOT_CLS = REGISTERED_AGENTS[robot_name].agent_cls
backend = parse_sim_and_render_backend(*backend_str)
if isinstance(scene, sapien.Scene):
scene = ManiSkillScene([scene], device=backend_str[0], backend=backend)
robot = ROBOT_CLS(
scene=scene,
control_freq=control_freq,
control_mode=control_mode,
initial_pose=pose,
)
    # Set the initial joint positions in radians (joint0..joint6 plus two gripper fingers).
qpos = np.array(
[
0.0,
np.pi / 8,
0,
-np.pi * 3 / 8,
0,
np.pi * 3 / 4,
np.pi / 4,
0.04,
0.04,
]
)
qpos = (
np.random.normal(
0, robot_init_qpos_noise, (len(scene.sub_scenes), len(qpos))
)
+ qpos
)
qpos[:, -2:] = 0.04
robot.reset(qpos)
robot.init_qpos = robot.robot.qpos
robot.controller.controllers["gripper"].reset()
return robot
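

# Hedged usage sketch: loading the robot described in a layout JSON into a
# SAPIEN scene. `outputs/layout.json` is a hypothetical path.
#
#   scene = sapien.Scene()
#   load_assets_from_layout_file(scene, "outputs/layout.json")
#   agent = load_mani_skill_robot(scene, "outputs/layout.json", control_freq=20)
#   print(agent.robot.qpos)
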
def render_images(
    camera: sapien.render.RenderCameraComponent,
    render_keys: list[
        Literal[
            "Color",
            "Segmentation",
            "Normal",
            "Mask",
            "Depth",
            "Foreground",
        ]
    ]
    | None = None,
) -> dict[str, Image.Image]:
    """Render images from a given SAPIEN camera.

    Args:
        camera (sapien.render.RenderCameraComponent): The camera to render from.
        render_keys (list[str] | None): Types of images to render (e.g., Color,
            Segmentation). Defaults to all supported types.

    Returns:
        dict[str, Image.Image]: Rendered images keyed by type.
    """
if render_keys is None:
render_keys = [
"Color",
"Segmentation",
"Normal",
"Mask",
"Depth",
"Foreground",
]
results: dict[str, Image.Image] = {}
if "Color" in render_keys:
color = camera.get_picture("Color")
color_rgb = (np.clip(color[..., :3], 0, 1) * 255).astype(np.uint8)
results["Color"] = Image.fromarray(color_rgb)
if "Mask" in render_keys:
alpha = (np.clip(color[..., 3], 0, 1) * 255).astype(np.uint8)
results["Mask"] = Image.fromarray(alpha)
if "Segmentation" in render_keys:
seg_labels = camera.get_picture("Segmentation")
label0 = seg_labels[..., 0].astype(np.uint8)
seg_color = COLOR_PALETTE[label0]
results["Segmentation"] = Image.fromarray(seg_color)
if "Foreground" in render_keys:
seg_labels = camera.get_picture("Segmentation")
label0 = seg_labels[..., 0]
mask = np.where((label0 > 1), 255, 0).astype(np.uint8)
color = camera.get_picture("Color")
color_rgb = (np.clip(color[..., :3], 0, 1) * 255).astype(np.uint8)
foreground = np.concatenate([color_rgb, mask[..., None]], axis=-1)
results["Foreground"] = Image.fromarray(foreground)
if "Normal" in render_keys:
normal = camera.get_picture("Normal")[..., :3]
normal_img = (((normal + 1) / 2) * 255).astype(np.uint8)
results["Normal"] = Image.fromarray(normal_img)
if "Depth" in render_keys:
position_map = camera.get_picture("Position")
depth = -position_map[..., 2]
alpha = torch.tensor(color[..., 3], dtype=torch.float32)
norm_depth = DiffrastRender.normalize_map_by_mask(
torch.tensor(depth), alpha
)
depth_img = (norm_depth * 255).to(torch.uint8).numpy()
results["Depth"] = Image.fromarray(depth_img)
return results
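

# Hedged usage sketch: rendering from a camera after updating the renderer.
#
#   scene.update_render()
#   camera.take_picture()
#   images = render_images(camera, render_keys=["Color", "Depth"])
#   images["Color"].save("color.png")
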
class SapienSceneManager:
    """Manage a SAPIEN scene: renderer, lighting, ground, cameras, and actors."""

    def __init__(
        self, sim_freq: int, ray_tracing: bool, device: str = "cuda"
    ) -> None:
self.sim_freq = sim_freq
self.ray_tracing = ray_tracing
self.device = device
self.renderer = sapien.SapienRenderer()
self.scene = self._setup_scene()
self.cameras: list[sapien.render.RenderCameraComponent] = []
self.actors: dict[str, sapien.pysapien.Entity] = {}

    def _setup_scene(self) -> sapien.Scene:
"""Set up the SAPIEN scene with lighting and ground."""
# Ray tracing settings
if self.ray_tracing:
sapien.render.set_camera_shader_dir("rt")
sapien.render.set_ray_tracing_samples_per_pixel(64)
sapien.render.set_ray_tracing_path_depth(10)
sapien.render.set_ray_tracing_denoiser("oidn")
scene = sapien.Scene()
scene.set_timestep(1 / self.sim_freq)
# Add lighting
scene.set_ambient_light([0.2, 0.2, 0.2])
scene.add_directional_light(
direction=[0, 1, -1],
color=[1.5, 1.45, 1.4],
shadow=True,
shadow_map_size=2048,
)
scene.add_directional_light(
direction=[0, -0.5, 1], color=[0.8, 0.8, 0.85], shadow=False
)
scene.add_directional_light(
direction=[0, -1, 1], color=[1.0, 1.0, 1.0], shadow=False
)
ground_material = self.renderer.create_material()
ground_material.base_color = [0.5, 0.5, 0.5, 1] # rgba, gray
ground_material.roughness = 0.7
ground_material.metallic = 0.0
scene.add_ground(0, render_material=ground_material)
return scene

    def step_action(
        self,
        agent: BaseAgent,
        action: torch.Tensor,
        cameras: list[sapien.render.RenderCameraComponent],
        render_keys: list[str],
        sim_steps_per_control: int = 1,
    ) -> dict[str, list[dict[str, Image.Image]]]:
        """Apply one control action, step the physics, and render each camera.

        Returns:
            dict: Mapping from camera name to a list of rendered image dicts,
                one entry per simulation sub-step.
        """
agent.set_action(action)
frames = defaultdict(list)
for _ in range(sim_steps_per_control):
self.scene.step()
self.scene.update_render()
for camera in cameras:
camera.take_picture()
images = render_images(camera, render_keys=render_keys)
frames[camera.name].append(images)
return frames
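
    # Hedged usage sketch: holding the current arm pose with the gripper open
    # while recording color frames (`manager` is a SapienSceneManager and
    # `agent` comes from `load_mani_skill_robot`).
    #
    #   qpos = agent.robot.get_qpos()[:, :-2]  # 7 arm joints for panda
    #   action = torch.cat([qpos, torch.ones_like(qpos[:, :1])], dim=1)
    #   frames = manager.step_action(
    #       agent, action, manager.cameras, ["Color"], sim_steps_per_control=5
    #   )
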
def create_camera(
self,
cam_name: str,
pose: sapien.Pose,
image_hw: tuple[int, int],
fovy_deg: float,
) -> sapien.render.RenderCameraComponent:
"""Create a single camera in the scene.
Args:
cam_name (str): Name of the camera.
pose (sapien.Pose): Camera pose p=(x, y, z), q=(w, x, y, z)
image_hw (Tuple[int, int]): Image resolution (height, width) for cameras.
fovy_deg (float): Field of view in degrees for cameras.
Returns:
sapien.render.RenderCameraComponent: The created camera.
"""
cam_actor = self.scene.create_actor_builder().build_kinematic()
cam_actor.set_pose(pose)
camera = self.scene.add_mounted_camera(
name=cam_name,
mount=cam_actor,
pose=sapien.Pose(p=[0, 0, 0], q=[1, 0, 0, 0]),
width=image_hw[1],
height=image_hw[0],
fovy=np.deg2rad(fovy_deg),
near=0.01,
far=100,
)
self.cameras.append(camera)
return camera

    def initialize_circular_cameras(
        self,
        num_cameras: int,
        radius: float,
        height: float,
        target_pt: list[float],
        image_hw: tuple[int, int],
        fovy_deg: float,
    ) -> list[sapien.render.RenderCameraComponent]:
        """Initialize multiple cameras arranged in a circle, all looking at a target.

        Args:
            num_cameras (int): Number of cameras to create.
            radius (float): Radius of the camera circle.
            height (float): Fixed Z-coordinate of the cameras.
            target_pt (list[float]): 3D point (x, y, z) that the cameras look at.
            image_hw (tuple[int, int]): Image resolution (height, width).
            fovy_deg (float): Vertical field of view in degrees.

        Returns:
            list[sapien.render.RenderCameraComponent]: The created cameras.
        """
angle_step = 2 * np.pi / num_cameras
world_up_vec = np.array([0.0, 0.0, 1.0])
target_pt = np.array(target_pt)
for i in range(num_cameras):
angle = i * angle_step
cam_x = radius * np.cos(angle)
cam_y = radius * np.sin(angle)
cam_z = height
eye_pos = [cam_x, cam_y, cam_z]
forward_vec = target_pt - eye_pos
forward_vec = forward_vec / np.linalg.norm(forward_vec)
temp_right_vec = np.cross(forward_vec, world_up_vec)
if np.linalg.norm(temp_right_vec) < 1e-6:
temp_right_vec = np.array([1.0, 0.0, 0.0])
if np.abs(np.dot(temp_right_vec, forward_vec)) > 0.99:
temp_right_vec = np.array([0.0, 1.0, 0.0])
right_vec = temp_right_vec / np.linalg.norm(temp_right_vec)
up_vec = np.cross(right_vec, forward_vec)
            # Columns (forward, -right, up) follow SAPIEN's x-forward,
            # y-left, z-up camera convention.
            rotation_matrix = np.array([forward_vec, -right_vec, up_vec]).T
rot = R.from_matrix(rotation_matrix)
scipy_quat = rot.as_quat() # (x, y, z, w)
quat = [
scipy_quat[3],
scipy_quat[0],
scipy_quat[1],
scipy_quat[2],
] # (w, x, y, z)
self.create_camera(
f"camera_{i}",
sapien.Pose(p=eye_pos, q=quat),
image_hw,
fovy_deg,
)
return self.cameras
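

# Hedged usage sketch: a circular multi-view render of a generated layout.
# `outputs/layout.json` is a hypothetical path.
#
#   manager = SapienSceneManager(sim_freq=100, ray_tracing=False)
#   load_assets_from_layout_file(manager.scene, "outputs/layout.json")
#   cameras = manager.initialize_circular_cameras(
#       num_cameras=8, radius=1.5, height=1.2, target_pt=[0, 0, 0.5],
#       image_hw=(480, 640), fovy_deg=60,
#   )
#   manager.scene.update_render()
#   for camera in cameras:
#       camera.take_picture()
#       render_images(camera, render_keys=["Color"])["Color"].save(
#           f"{camera.name}.png"
#       )
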
class FrankaPandaGrasper(object):
    """Plan and execute grasp motions for a Franka Panda agent via mplib."""

    def __init__(
self,
agent: BaseAgent,
control_freq: float,
joint_vel_limits: float = 2.0,
joint_acc_limits: float = 1.0,
finger_length: float = 0.025,
) -> None:
self.agent = agent
self.robot = agent.robot
self.control_freq = control_freq
self.control_timestep = 1 / control_freq
self.joint_vel_limits = joint_vel_limits
self.joint_acc_limits = joint_acc_limits
self.finger_length = finger_length
self.planners = self._setup_planner()

    def _setup_planner(self) -> list[mplib.Planner]:
        """Create one mplib planner per parallel environment."""
planners = []
for pose in self.robot.pose:
link_names = [link.get_name() for link in self.robot.get_links()]
joint_names = [
joint.get_name() for joint in self.robot.get_active_joints()
]
planner = mplib.Planner(
urdf=self.agent.urdf_path,
srdf=self.agent.urdf_path.replace(".urdf", ".srdf"),
user_link_names=link_names,
user_joint_names=joint_names,
move_group="panda_hand_tcp",
joint_vel_limits=np.ones(7) * self.joint_vel_limits,
joint_acc_limits=np.ones(7) * self.joint_acc_limits,
)
planner.set_base_pose(pose.raw_pose[0].tolist())
planners.append(planner)
return planners

    def control_gripper(
        self,
        gripper_state: Literal[-1, 1],
        n_step: int = 10,
        env_idx: int = 0,
    ) -> np.ndarray:
        """Hold the current arm joints and command the gripper for `n_step` steps."""
        qpos = self.robot.get_qpos()[env_idx, :-2].cpu().numpy()
        actions = []
        for _ in range(n_step):
            action = np.hstack([qpos, gripper_state])[None, ...]
            actions.append(action)
        return np.concatenate(actions, axis=0)

    def move_to_pose(
        self,
        pose: sapien.Pose,
        control_timestep: float,
        gripper_state: Literal[-1, 1],
        use_point_cloud: bool = False,
        n_max_step: int = 100,
        action_key: str = "position",
        env_idx: int = 0,
    ) -> np.ndarray | None:
        """Plan a trajectory to `pose` and return per-step actions.

        Falls back to screw-motion planning when the first attempt fails;
        returns None if both planners fail.
        """
result = self.planners[env_idx].plan_qpos_to_pose(
np.concatenate([pose.p, pose.q]),
self.robot.get_qpos().cpu().numpy()[0],
time_step=control_timestep,
use_point_cloud=use_point_cloud,
)
if result["status"] != "Success":
result = self.planners[env_idx].plan_screw(
np.concatenate([pose.p, pose.q]),
self.robot.get_qpos().cpu().numpy()[0],
time_step=control_timestep,
use_point_cloud=use_point_cloud,
)
if result["status"] != "Success":
return
sample_ratio = (len(result[action_key]) // n_max_step) + 1
result[action_key] = result[action_key][::sample_ratio]
n_step = len(result[action_key])
actions = []
for i in range(n_step):
qpos = result[action_key][i]
action = np.hstack([qpos, gripper_state])[None, ...]
actions.append(action)
return np.concatenate(actions, axis=0)
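
    # Hedged usage sketch: planning a small vertical retreat of the TCP
    # (`grasper` is a FrankaPandaGrasper instance; the offset is illustrative).
    #
    #   target = grasper.agent.tcp.pose[0].sp * sapien.Pose(p=[0, 0, -0.05])
    #   actions = grasper.move_to_pose(
    #       target, grasper.control_timestep, gripper_state=1
    #   )
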
    def compute_grasp_action(
        self,
        actor: sapien.pysapien.Entity,
        reach_target_only: bool = True,
        offset: tuple[float, float, float] = (0, 0, -0.05),
        env_idx: int = 0,
    ) -> np.ndarray | None:
        """Plan reach (and optionally grasp-and-retreat) actions for `actor`.

        The grasp pose is derived from the actor's oriented bounding box with
        a top-down approach direction.
        """
        # components[1] is assumed to be the actor's physx rigid body component.
        physx_rigid = actor.components[1]
        mesh = get_component_mesh(physx_rigid, to_world_frame=True)
obb = mesh.bounding_box_oriented
approaching = np.array([0, 0, -1])
tcp_pose = self.agent.tcp.pose[env_idx]
target_closing = (
tcp_pose.to_transformation_matrix()[0, :3, 1].cpu().numpy()
)
grasp_info = compute_grasp_info_by_obb(
obb,
approaching=approaching,
target_closing=target_closing,
depth=self.finger_length,
)
closing, center = grasp_info["closing"], grasp_info["center"]
raw_tcp_pose = tcp_pose.sp
grasp_pose = self.agent.build_grasp_pose(approaching, closing, center)
reach_pose = grasp_pose * sapien.Pose(p=offset)
grasp_pose = grasp_pose * sapien.Pose(p=[0, 0, 0.01])
        actions = []
        reach_actions = self.move_to_pose(
            reach_pose,
            self.control_timestep,
            gripper_state=1,
            env_idx=env_idx,
        )
        if reach_actions is None:
            logger.warning(
                f"Failed to reach the grasp pose for node `{actor.name}`, skipping grasping."
            )
            return None
        actions.append(reach_actions)
        if not reach_target_only:
            grasp_actions = self.move_to_pose(
                grasp_pose,
                self.control_timestep,
                gripper_state=1,
                env_idx=env_idx,
            )
            close_actions = self.control_gripper(
                gripper_state=-1,
                env_idx=env_idx,
            )
            back_actions = self.move_to_pose(
                raw_tcp_pose,
                self.control_timestep,
                gripper_state=-1,
                env_idx=env_idx,
            )
            # Planning for a stage can fail and return None; keep only the
            # stages that produced actions so np.concatenate cannot fail.
            for stage_actions in (grasp_actions, close_actions, back_actions):
                if stage_actions is not None:
                    actions.append(stage_actions)
        return np.concatenate(actions, axis=0)
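

# Hedged end-to-end sketch: plan a reach toward one loaded asset and execute
# it step by step. `outputs/layout.json` is a hypothetical path; frequencies
# are illustrative.
#
#   manager = SapienSceneManager(sim_freq=100, ray_tracing=False)
#   actors = load_assets_from_layout_file(manager.scene, "outputs/layout.json")
#   agent = load_mani_skill_robot(manager.scene, "outputs/layout.json")
#   grasper = FrankaPandaGrasper(agent, control_freq=20)
#   actions = grasper.compute_grasp_action(next(iter(actors.values())))
#   if actions is not None:
#       for action in actions:
#           manager.step_action(
#               agent, torch.from_numpy(action)[None], manager.cameras,
#               ["Color"], sim_steps_per_control=100 // 20,
#           )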