|
from typing import List
|
from omegaconf import DictConfig, OmegaConf |
|
from enum import Enum |
|
import torch |
|
import numpy as np |
|
import multiprocess as mp

import multiprocess.managers  # makes mp.managers resolvable for the SharedMemoryManager annotation below
|
import time |
|
import threading |
|
import cv2 |
|
import pygame |
|
import os |
|
import pickle |
|
import subprocess

import traceback
|
from pathlib import Path |
|
from copy import deepcopy |
|
import open3d as o3d |
|
|
|
from pgnd.utils import get_root, mkdir |
|
root: Path = get_root(__file__) |
|
|
|
from camera.multi_realsense import MultiRealsense |
|
from camera.single_realsense import SingleRealsense |
|
|
|
from modules_planning.segment_perception import SegmentPerception |
|
from modules_planning.planning_module import PlanningModule |
|
from modules_planning.xarm_controller import XarmController |
|
from modules_planning.udp_util import udpReceiver, udpSender |
|
from modules_planning.common.communication import XARM_STATE_PORT, XARM_CONTROL_PORT, XARM_CONTROL_PORT_L, XARM_CONTROL_PORT_R |
|
from modules_planning.common.xarm import GRIPPER_OPEN_MIN, GRIPPER_OPEN_MAX, POSITION_UPDATE_INTERVAL, COMMAND_CHECK_INTERVAL |
|
|
|
|
|
class EnvEnum(Enum): |
|
NONE = 0 |
|
INFO = 1 |
|
DEBUG = 2 |
|
VERBOSE = 3 |
|
|
|
|
|
class RobotPlanningEnv(mp.Process): |
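
    """Closed-loop planning environment: RealSense capture, segmentation perception,
    planning, and xArm control, each running in its own process or thread."""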
|
|
|
def __init__( |
|
self, |
|
cfg: DictConfig, |
|
exp_root: str, |
|
ckpt_path: Path | str, |
|
|
|
realsense: MultiRealsense | SingleRealsense | None = None, |
|
shm_manager: mp.managers.SharedMemoryManager | None = None, |
|
        serial_numbers: list[str] | None = None,
|
resolution: tuple[int, int] = (1280, 720), |
|
capture_fps: int = 30, |
|
enable_depth: bool = True, |
|
enable_color: bool = True, |
|
|
|
record_fps: int | None = 0, |
|
record_time: float | None = 60 * 10, |
|
text_prompts: str = "white cotton rope.", |
|
show_annotation: bool = True, |
|
|
|
use_robot: bool = False, |
|
robot_ip: str = '192.168.1.196', |
|
gripper_enable: bool = False, |
|
calibrate_result_dir: Path = root / "log" / "latest_calibration", |
|
debug: bool | int | None = False, |
|
|
|
bimanual: bool = False, |
|
bimanual_robot_ip: List[str] = ['192.168.1.196', '192.168.1.224'], |
|
|
|
construct_target: bool = False, |
|
|
|
) -> None: |
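
        # debug: None -> 0 (silent), True -> 2 (DEBUG), otherwise the given int level.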
|
|
|
|
|
self.debug = 0 if debug is None else (2 if debug is True else debug) |
|
|
|
self.cfg = cfg |
|
self.exp_root = Path(exp_root) |
|
|
|
if os.path.exists(self.exp_root.parent / "target" / "target.pcd"): |
|
mkdir(self.exp_root / "target", overwrite=False, resume=True) |
|
os.system(f"cp {self.exp_root.parent / 'target' / 'target.pcd'} {self.exp_root / 'target'}") |
|
else: |
|
            assert construct_target, "target.pcd not found; run with construct_target=True to create it"
|
self.construct_target = construct_target |
|
if construct_target: |
|
mkdir(self.exp_root.parent / "target", overwrite=False, resume=False) |
|
|
|
self.torch_device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") |
|
|
|
self.bimanual = bimanual |
|
if self.bimanual: |
|
self.bbox = np.array([[0.0, 0.7], [-0.35, 0.45 + 0.75], [-0.8, 0.0]]) |
|
else: |
|
self.bbox = np.array([[0.0, 0.7], [-0.35, 0.45], [-0.8, 0.0]]) |
|
|
|
self.text_prompts = text_prompts |
|
self.show_annotation = show_annotation |
|
|
|
self.capture_fps = capture_fps |
|
self.record_fps = record_fps |
|
|
|
self.state = mp.Manager().dict() |
|
|
|
|
|
|
|
if realsense is not None: |
|
            assert isinstance(realsense, (MultiRealsense, SingleRealsense))
|
self.realsense = realsense |
|
self.serial_numbers = list(self.realsense.cameras.keys()) |
|
else: |
|
self.realsense = MultiRealsense( |
|
shm_manager=shm_manager, |
|
serial_numbers=serial_numbers, |
|
resolution=resolution, |
|
capture_fps=capture_fps, |
|
enable_depth=enable_depth, |
|
enable_color=enable_color, |
|
verbose=self.debug > EnvEnum.VERBOSE.value |
|
) |
|
self.serial_numbers = list(self.realsense.cameras.keys()) |
|
|
|
|
|
self.realsense.set_exposure(exposure=60, gain=60) |
|
self.realsense.set_white_balance(3800) |
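
        # Load hand-eye calibration: rigid base-to-world transform(s) estimated offline.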
|
|
|
|
|
self.calibrate_result_dir = calibrate_result_dir |
|
with open(f'{self.calibrate_result_dir}/base.pkl', 'rb') as f: |
|
base = pickle.load(f) |
|
if self.bimanual: |
|
R_leftbase2board = base['R_leftbase2world'] |
|
t_leftbase2board = base['t_leftbase2world'] |
|
R_rightbase2board = base['R_rightbase2world'] |
|
t_rightbase2board = base['t_rightbase2world'] |
|
leftbase2world_mat = np.eye(4) |
|
leftbase2world_mat[:3, :3] = R_leftbase2board |
|
leftbase2world_mat[:3, 3] = t_leftbase2board |
|
self.state["b2w_l"] = leftbase2world_mat |
|
rightbase2world_mat = np.eye(4) |
|
rightbase2world_mat[:3, :3] = R_rightbase2board |
|
rightbase2world_mat[:3, 3] = t_rightbase2board |
|
self.state["b2w_r"] = rightbase2world_mat |
|
else: |
|
R_base2board = base['R_base2world'] |
|
t_base2board = base['t_base2world'] |
|
base2world_mat = np.eye(4) |
|
base2world_mat[:3, :3] = R_base2board |
|
base2world_mat[:3, 3] = t_base2board |
|
self.state["b2w"] = base2world_mat |
|
|
|
|
|
extr_list = [] |
|
with open(f'{self.calibrate_result_dir}/rvecs.pkl', 'rb') as f: |
|
rvecs = pickle.load(f) |
|
with open(f'{self.calibrate_result_dir}/tvecs.pkl', 'rb') as f: |
|
tvecs = pickle.load(f) |
|
for i in range(len(self.serial_numbers)): |
|
device = self.serial_numbers[i] |
|
R_world2cam = cv2.Rodrigues(rvecs[device])[0] |
|
t_world2cam = tvecs[device][:, 0] |
|
extr_mat = np.eye(4) |
|
extr_mat[:3, :3] = R_world2cam |
|
extr_mat[:3, 3] = t_world2cam |
|
extr_list.append(extr_mat) |
|
self.state["extr"] = np.stack(extr_list) |
|
|
|
|
|
mkdir(self.exp_root / "calibration", overwrite=False, resume=False) |
|
subprocess.run(f'cp -r {self.calibrate_result_dir}/* {str(self.exp_root)}/calibration', shell=True) |
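
        # Perception worker: segments the text-prompted object from each RGB-D stream
        # and publishes object points restricted to the workspace bounding box.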
|
|
|
|
|
self.perception = SegmentPerception( |
|
realsense=self.realsense, |
|
capture_fps=self.realsense.capture_fps, |
|
record_fps=record_fps, |
|
record_time=record_time, |
|
text_prompts=self.text_prompts, |
|
show_annotation=self.show_annotation, |
|
bbox=self.bbox, |
|
device=self.torch_device, |
|
verbose=self.debug > EnvEnum.VERBOSE.value) |
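
        # xArm controllers: one process per arm when bimanual, else a single controller.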
|
|
|
|
|
self.use_robot = use_robot |
|
if use_robot: |
|
if bimanual: |
|
self.left_xarm_controller = XarmController( |
|
start_time=time.time(), |
|
ip=bimanual_robot_ip[0], |
|
gripper_enable=gripper_enable, |
|
mode='3D', |
|
command_mode='cartesian', |
|
robot_id=0, |
|
verbose=True, |
|
) |
|
self.right_xarm_controller = XarmController( |
|
start_time=time.time(), |
|
ip=bimanual_robot_ip[1], |
|
gripper_enable=gripper_enable, |
|
mode='3D', |
|
command_mode='cartesian', |
|
robot_id=1, |
|
verbose=True, |
|
) |
|
self.xarm_controller = None |
|
else: |
|
self.xarm_controller = XarmController( |
|
start_time=time.time(), |
|
ip=robot_ip, |
|
gripper_enable=gripper_enable, |
|
mode='3D', |
|
command_mode='cartesian', |
|
verbose=True, |
|
) |
|
self.left_xarm_controller = None |
|
self.right_xarm_controller = None |
|
else: |
|
self.left_xarm_controller = None |
|
self.right_xarm_controller = None |
|
self.xarm_controller = None |
|
|
|
|
|
self._real_alive = mp.Value('b', False) |
|
|
|
self.start_time = 0 |
|
mp.Process.__init__(self) |
|
self._alive = mp.Value('b', False) |
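
        # Visualization layout: one row per camera, RGB beside colorized depth; the
        # combined frame lives in shared memory so the display thread can read it.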
|
|
|
|
|
|
|
img_w, img_h = 848, 480 |
|
col_num = 2 |
|
self.screen_width, self.screen_height = img_w * col_num, img_h * len(self.realsense.serial_numbers) |
|
self.image_window = None |
|
|
|
|
|
self.image_data = mp.Array('B', np.zeros((self.screen_height, self.screen_width, 3), dtype=np.uint8).flatten()) |
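
        # Tool center point, expressed in the end-effector frame (meters).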
|
|
|
|
|
self.eef_point = np.array([[0.0, 0.0, 0.175]]) |
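
        # Planning module: consumes perception and robot state and produces commands
        # that move the object toward the target point cloud.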
|
|
|
|
|
self.planning_module = PlanningModule( |
|
cfg=cfg, |
|
exp_root=self.exp_root, |
|
ckpt_path=ckpt_path, |
|
device=self.torch_device, |
|
bimanual=bimanual, |
|
bbox=self.bbox, |
|
eef_point=self.eef_point, |
|
debug=self.debug |
|
) |
|
self.planning_module.set_target(str(self.exp_root / "target" / "target.pcd")) |
|
self.command = [] |
|
self.command_sender = None |
|
self.command_sender_left = None |
|
self.command_sender_right = None |
|
|
|
def real_start(self, start_time) -> None: |
|
self._real_alive.value = True |
|
print("starting real env") |
|
|
|
|
|
self.realsense.start() |
|
self.realsense.restart_put(start_time + 1) |
|
time.sleep(2) |
|
|
|
|
|
if self.perception is not None: |
|
self.perception.start() |
|
self.perception.update_extrinsics(self.state["extr"]) |
|
|
|
|
|
if self.use_robot: |
|
if self.bimanual: |
|
assert self.left_xarm_controller is not None |
|
assert self.right_xarm_controller is not None |
|
self.left_xarm_controller.start() |
|
self.right_xarm_controller.start() |
|
else: |
|
assert self.xarm_controller is not None |
|
self.xarm_controller.start() |
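
        # Block until every subsystem (cameras, perception, controllers) reports alive.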
|
|
|
while not self.real_alive: |
|
self._real_alive.value = True |
|
print(".", end="") |
|
time.sleep(0.5) |
|
|
|
|
|
intrs = self.get_intrinsics() |
|
intrs = np.array(intrs) |
|
np.save(self.exp_root / "calibration" / "intrinsics.npy", intrs) |
|
|
|
print("real env started") |
|
|
|
self.update_real_state_t = threading.Thread(name="update_real_state", target=self.update_real_state) |
|
self.update_real_state_t.start() |
|
|
|
def real_stop(self, wait=False) -> None: |
|
self._real_alive.value = False |
|
if self.use_robot: |
|
if self.bimanual and self.left_xarm_controller and self.left_xarm_controller.is_controller_alive: |
|
self.left_xarm_controller.stop() |
|
if self.bimanual and self.right_xarm_controller and self.right_xarm_controller.is_controller_alive: |
|
self.right_xarm_controller.stop() |
|
if not self.bimanual and self.xarm_controller and self.xarm_controller.is_controller_alive: |
|
self.xarm_controller.stop() |
|
if self.perception is not None and self.perception.alive.value: |
|
self.perception.stop() |
|
self.realsense.stop(wait=False) |
|
|
|
self.image_display_thread.join() |
|
self.update_real_state_t.join() |
|
print("real env stopped") |
|
|
|
@property |
|
def real_alive(self) -> bool: |
|
alive = self._real_alive.value |
|
if self.perception is not None: |
|
alive = alive and self.perception.alive.value |
|
if self.use_robot: |
|
            controller_alive = (

                self.bimanual and self.left_xarm_controller and self.left_xarm_controller.is_controller_alive

                and self.right_xarm_controller and self.right_xarm_controller.is_controller_alive

            ) or (

                not self.bimanual and self.xarm_controller and self.xarm_controller.is_controller_alive

            )
|
alive = alive and controller_alive |
|
self._real_alive.value = alive |
|
return self._real_alive.value |
|
|
|
def _update_perception(self) -> None: |
|
if self.perception.alive.value and self.perception.do_process.value: |
|
if not self.perception.perception_q.empty(): |
|
self.state["perception_out"] = { |
|
"time": time.time(), |
|
"value": self.perception.perception_q.get() |
|
} |
|
return |
|
|
|
def _update_robot(self) -> None: |
|
if self.bimanual: |
|
assert self.left_xarm_controller is not None |
|
assert self.right_xarm_controller is not None |
|
if self.left_xarm_controller.is_controller_alive and self.right_xarm_controller.is_controller_alive: |
|
if not self.left_xarm_controller.cur_trans_q.empty() and not self.right_xarm_controller.cur_trans_q.empty(): |
|
self.state["robot_out"] = { |
|
"time": time.time(), |
|
"left_value": self.left_xarm_controller.cur_trans_q.get(), |
|
"right_value": self.right_xarm_controller.cur_trans_q.get() |
|
} |
|
                if not self.left_xarm_controller.cur_gripper_q.empty() and not self.right_xarm_controller.cur_gripper_q.empty():
|
self.state["gripper_out"] = { |
|
"time": time.time(), |
|
"left_value": self.left_xarm_controller.cur_gripper_q.get(), |
|
"right_value": self.right_xarm_controller.cur_gripper_q.get() |
|
} |
|
else: |
|
assert self.xarm_controller is not None |
|
if self.xarm_controller.is_controller_alive: |
|
if not self.xarm_controller.cur_trans_q.empty(): |
|
self.state["robot_out"] = { |
|
"time": time.time(), |
|
"value": self.xarm_controller.cur_trans_q.get() |
|
} |
|
if not self.xarm_controller.cur_gripper_q.empty(): |
|
self.state["gripper_out"] = { |
|
"time": time.time(), |
|
"value": self.xarm_controller.cur_gripper_q.get() |
|
} |
|
return |
|
|
|
def update_real_state(self) -> None: |
|
while self.real_alive: |
|
try: |
|
if self.use_robot: |
|
self._update_robot() |
|
if self.perception is not None: |
|
self._update_perception() |
|
            except BaseException:

                print(f"Error in update_real_state:\n{traceback.format_exc()}")
|
break |
|
print("update_real_state stopped") |
|
|
|
def display_image(self): |
|
pygame.init() |
|
self.image_window = pygame.display.set_mode((self.screen_width, self.screen_height)) |
|
pygame.display.set_caption('Image Display Window') |
|
while self._alive.value: |
|
|
|
image = np.frombuffer(self.image_data.get_obj(), dtype=np.uint8).reshape((self.screen_height, self.screen_width, 3)) |
|
pygame_image = pygame.surfarray.make_surface(image.swapaxes(0, 1)) |
|
|
|
|
|
self.image_window.blit(pygame_image, (0, 0)) |
|
pygame.display.update() |
|
|
|
|
|
for event in pygame.event.get(): |
|
if event.type == pygame.QUIT: |
|
self.stop() |
|
pygame.quit() |
|
return |
|
|
|
time.sleep(1 / self.realsense.capture_fps) |
|
print("Image display stopped") |
|
|
|
def start_image_display(self): |
|
|
|
self.image_display_thread = threading.Thread(name="display_image", target=self.display_image) |
|
self.image_display_thread.start() |
|
|
|
def run(self) -> None: |
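
        # Child-process main loop: read the latest shared state, render the overlay
        # visualization, and step the planner.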
|
|
|
        robot_record_dir = None  # set to a Path to also log per-frame end-effector poses; disabled by default
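
        # UDP senders deliver planned commands to the xArm controller process(es).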
|
if self.bimanual: |
|
self.command_sender_left = udpSender(port=XARM_CONTROL_PORT_L) |
|
self.command_sender_right = udpSender(port=XARM_CONTROL_PORT_R) |
|
else: |
|
self.command_sender = udpSender(port=XARM_CONTROL_PORT) |
|
|
|
|
|
rgbs = [] |
|
depths = [] |
|
resolution = self.realsense.resolution |
|
for i in range(len(self.realsense.serial_numbers)): |
|
rgbs.append(np.zeros((resolution[1], resolution[0], 3), np.uint8)) |
|
depths.append(np.zeros((resolution[1], resolution[0]), np.uint16)) |
|
|
|
fps = self.record_fps if self.record_fps and self.record_fps > 0 else self.realsense.capture_fps |
|
idx = 0 |
|
|
|
while self.alive: |
|
try: |
|
tic = time.time() |
|
state = deepcopy(self.state) |
|
if self.bimanual: |
|
b2w_l = state["b2w_l"] |
|
b2w_r = state["b2w_r"] |
|
b2w = None |
|
else: |
|
b2w = state["b2w"] |
|
b2w_l = None |
|
b2w_r = None |
|
|
|
|
|
perception_out = state.get("perception_out", None) |
|
robot_out = state.get("robot_out", None) |
|
gripper_out = state.get("gripper_out", None) |
|
|
|
|
|
if perception_out is not None: |
|
self.state["perception_out"] = None |
|
|
|
intrinsics = self.get_intrinsics() |
|
state["intr"] = intrinsics |
|
self.state["intr"] = intrinsics |
|
self.perception.update_intrinsics(intrinsics) |
|
|
|
update_perception = True |
|
if update_perception and perception_out is not None: |
|
print("update image display") |
|
for k in range(len(perception_out['value']['color'])): |
|
rgbs[k] = perception_out['value']['color'][k].copy() |
|
depths[k] = perception_out['value']['depth'][k].copy() |
|
intr = intrinsics[k] |
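
                        # Overlay world origin/axes: transform homogeneous world points by the
                        # world-to-camera extrinsics, normalize by depth, then apply intrinsics.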
|
|
|
l = 0.1 |
|
origin = state["extr"][k] @ np.array([0, 0, 0, 1]) |
|
x_axis = state["extr"][k] @ np.array([l, 0, 0, 1]) |
|
y_axis = state["extr"][k] @ np.array([0, l, 0, 1]) |
|
z_axis = state["extr"][k] @ np.array([0, 0, l, 1]) |
|
origin = origin[:3] / origin[2] |
|
x_axis = x_axis[:3] / x_axis[2] |
|
y_axis = y_axis[:3] / y_axis[2] |
|
z_axis = z_axis[:3] / z_axis[2] |
|
origin = intr @ origin |
|
x_axis = intr @ x_axis |
|
y_axis = intr @ y_axis |
|
z_axis = intr @ z_axis |
|
cv2.line(rgbs[k], (int(origin[0]), int(origin[1])), (int(x_axis[0]), int(x_axis[1])), (255, 0, 0), 2) |
|
cv2.line(rgbs[k], (int(origin[0]), int(origin[1])), (int(y_axis[0]), int(y_axis[1])), (0, 255, 0), 2) |
|
cv2.line(rgbs[k], (int(origin[0]), int(origin[1])), (int(z_axis[0]), int(z_axis[1])), (0, 0, 255), 2) |
|
if self.use_robot: |
|
eef_points = np.concatenate([self.eef_point, np.ones((self.eef_point.shape[0], 1))], axis=1) |
|
eef_colors = [(0, 255, 255)] |
|
|
|
eef_axis = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0]]) |
|
eef_axis_colors = [(0, 0, 255), (0, 255, 0), (255, 0, 0)] |
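
                            # Transform the TCP from end-effector frame to world (b2w @ e2b),
                            # then project it and its orientation axes into each camera view.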
|
|
|
if robot_out is not None: |
|
assert gripper_out is not None |
|
eef_points_world_vis = [] |
|
eef_points_vis = [] |
|
if self.bimanual: |
|
left_eef_world_list = [] |
|
right_eef_world_list = [] |
|
assert b2w_l is not None |
|
assert b2w_r is not None |
|
for val, b2w, eef_world_list in zip(["left_value", "right_value"], [b2w_l, b2w_r], [left_eef_world_list, right_eef_world_list]): |
|
e2b = robot_out[val] |
|
eef_points_world = (b2w @ e2b @ eef_points.T).T[:, :3] |
|
eef_points_vis.append(eef_points) |
|
eef_points_world_vis.append(eef_points_world) |
|
eef_orientation_world = (b2w[:3, :3] @ e2b[:3, :3] @ eef_axis[:, :3].T).T |
|
eef_world = np.concatenate([eef_points_world, eef_orientation_world], axis=0) |
|
eef_world_list.append(eef_world) |
|
left_eef_world = np.concatenate(left_eef_world_list, axis=0) |
|
right_eef_world = np.concatenate(right_eef_world_list, axis=0) |
|
eef_world = np.concatenate([left_eef_world, right_eef_world], axis=0) |
|
else: |
|
assert b2w is not None |
|
e2b = robot_out["value"] |
|
eef_points_world = (b2w @ e2b @ eef_points.T).T[:, :3] |
|
eef_points_vis.append(eef_points) |
|
eef_points_world_vis.append(eef_points_world) |
|
eef_orientation_world = (b2w[:3, :3] @ e2b[:3, :3] @ eef_axis[:, :3].T).T |
|
eef_world = np.concatenate([eef_points_world, eef_orientation_world], axis=0) |
|
|
|
|
|
if self.bimanual: |
|
left_gripper = gripper_out["left_value"] |
|
right_gripper = gripper_out["right_value"] |
|
                                    gripper_world = np.array([left_gripper, right_gripper, 0.0])[None, :]  # pad widths to a 3-vector row so it stacks with pose rows
|
else: |
|
gripper = gripper_out["value"] |
|
                                    gripper_world = np.array([gripper, 0.0, 0.0])[None, :]  # pad width to a 3-vector row so it stacks with pose rows
|
|
|
eef_world = np.concatenate([eef_world, gripper_world], axis=0) |
|
if robot_record_dir is not None: |
|
np.savetxt(robot_record_dir / f"{robot_out['time']:.3f}.txt", eef_world, fmt="%.6f") |
|
|
|
eef_points_vis = np.concatenate(eef_points_vis, axis=0) |
|
eef_points_world_vis = np.concatenate(eef_points_world_vis, axis=0) |
|
eef_points_world_vis = np.concatenate([eef_points_world_vis, np.ones((eef_points_world_vis.shape[0], 1))], axis=1) |
|
eef_colors = eef_colors * eef_points_world_vis.shape[0] |
|
|
|
if self.bimanual: |
|
for point_orig, point, color, val, b2w in zip(eef_points_vis, eef_points_world_vis, eef_colors, ["left_value", "right_value"], [b2w_l, b2w_r]): |
|
e2b = robot_out[val] |
|
point = state["extr"][k] @ point |
|
point = point[:3] / point[2] |
|
point = intr @ point |
|
cv2.circle(rgbs[k], (int(point[0]), int(point[1])), 2, color, -1) |
|
|
|
|
|
for axis, color in zip(eef_axis, eef_axis_colors): |
|
eef_point_axis = point_orig + 0.1 * axis |
|
eef_point_axis_world = (b2w @ e2b @ eef_point_axis).T |
|
eef_point_axis_world = state["extr"][k] @ eef_point_axis_world |
|
eef_point_axis_world = eef_point_axis_world[:3] / eef_point_axis_world[2] |
|
eef_point_axis_world = intr @ eef_point_axis_world |
|
cv2.line(rgbs[k], |
|
(int(point[0]), int(point[1])), |
|
(int(eef_point_axis_world[0]), int(eef_point_axis_world[1])), |
|
color, 2) |
|
else: |
|
                                    point_orig, point, color, val = eef_points_vis[0], eef_points_world_vis[0], eef_colors[0], "value"
|
e2b = robot_out[val] |
|
point = state["extr"][k] @ point |
|
point = point[:3] / point[2] |
|
point = intr @ point |
|
cv2.circle(rgbs[k], (int(point[0]), int(point[1])), 2, color, -1) |
|
|
|
|
|
for axis, color in zip(eef_axis, eef_axis_colors): |
|
eef_point_axis = point_orig + 0.1 * axis |
|
eef_point_axis_world = (b2w @ e2b @ eef_point_axis).T |
|
eef_point_axis_world = state["extr"][k] @ eef_point_axis_world |
|
eef_point_axis_world = eef_point_axis_world[:3] / eef_point_axis_world[2] |
|
eef_point_axis_world = intr @ eef_point_axis_world |
|
cv2.line(rgbs[k], |
|
(int(point[0]), int(point[1])), |
|
(int(eef_point_axis_world[0]), int(eef_point_axis_world[1])), |
|
color, 2) |
|
|
|
row_imgs = [] |
|
for row in range(len(self.realsense.serial_numbers)): |
|
row_imgs.append( |
|
np.hstack( |
|
(cv2.cvtColor(rgbs[row], cv2.COLOR_BGR2RGB), |
|
cv2.applyColorMap(cv2.convertScaleAbs(depths[row], alpha=0.03), cv2.COLORMAP_JET)) |
|
) |
|
) |
|
combined_img = np.vstack(row_imgs) |
|
                    combined_img = cv2.resize(combined_img, (self.screen_width, self.screen_height))
|
np.copyto( |
|
np.frombuffer(self.image_data.get_obj(), dtype=np.uint8).reshape((self.screen_height, self.screen_width, 3)), |
|
combined_img |
|
) |
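
                # Target-construction mode: snapshot the segmented points as target.pcd,
                # save RGB-D frames plus calibration for later visualization, then idle.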
|
|
|
|
|
if self.construct_target and perception_out is not None and robot_out is not None: |
|
assert os.path.exists(self.exp_root.parent / "target") |
|
save_dir = str(self.exp_root.parent / "target" / "target.pcd") |
|
pts = perception_out['value']['pts'] |
|
print(f"target pts: {pts}") |
|
pts = np.concatenate(pts, axis=0) |
|
pcd = o3d.geometry.PointCloud() |
|
pcd.points = o3d.utility.Vector3dVector(pts) |
|
o3d.io.write_point_cloud(save_dir, pcd) |
|
|
|
mkdir(self.exp_root.parent / "target" / "vis", overwrite=False, resume=True) |
|
eef_points_world_vis = [] |
|
eef_points = np.concatenate([self.eef_point, np.ones((self.eef_point.shape[0], 1))], axis=1) |
|
eef_axis = np.array([[1, 0, 0, 0], [0, 1, 0, 0], [0, 0, 1, 0]]) |
|
if self.bimanual: |
|
left_eef_world_list = [] |
|
right_eef_world_list = [] |
|
assert b2w_l is not None |
|
assert b2w_r is not None |
|
for val, b2w, eef_world_list in zip(["left_value", "right_value"], [b2w_l, b2w_r], [left_eef_world_list, right_eef_world_list]): |
|
e2b = robot_out[val] |
|
eef_points_world = (b2w @ e2b @ eef_points.T).T[:, :3] |
|
eef_points_world_vis.append(eef_points_world) |
|
eef_orientation_world = (b2w[:3, :3] @ e2b[:3, :3] @ eef_axis[:, :3].T).T |
|
eef_world = np.concatenate([eef_points_world, eef_orientation_world], axis=0) |
|
eef_world_list.append(eef_world) |
|
left_eef_world = np.concatenate(left_eef_world_list, axis=0) |
|
right_eef_world = np.concatenate(right_eef_world_list, axis=0) |
|
eef_world = np.concatenate([left_eef_world, right_eef_world], axis=0) |
|
else: |
|
assert b2w is not None |
|
e2b = robot_out["value"] |
|
eef_points_world = (b2w @ e2b @ eef_points.T).T[:, :3] |
|
eef_points_world_vis.append(eef_points_world) |
|
eef_orientation_world = (b2w[:3, :3] @ e2b[:3, :3] @ eef_axis[:, :3].T).T |
|
eef_world = np.concatenate([eef_points_world, eef_orientation_world], axis=0) |
|
                    np.savetxt(str(self.exp_root.parent / "target" / "vis" / "robot.txt"), eef_world, fmt="%.6f")
|
|
|
for k in range(len(perception_out['value']['color'])): |
|
rgbs[k] = perception_out['value']['color'][k] |
|
depths[k] = perception_out['value']['depth'][k] |
|
rgb_save_dir = str(self.exp_root.parent / "target" / "vis" / f"rgb_{k}.png") |
|
depth_save_dir = str(self.exp_root.parent / "target" / "vis" / f"depth_{k}.png") |
|
cv2.imwrite(rgb_save_dir, rgbs[k]) |
|
cv2.imwrite(depth_save_dir, depths[k]) |
|
|
|
intr_list = [] |
|
extr_list = [] |
|
for k in range(len(perception_out['value']['color'])): |
|
intr = intrinsics[k] |
|
extr = state["extr"][k] |
|
intr_list.append(intr) |
|
extr_list.append(extr) |
|
np.save(str(self.exp_root.parent / "target" / "vis" / "intrinsics.npy"), np.stack(intr_list)) |
|
np.save(str(self.exp_root.parent / "target" / "vis" / "extrinsics.npy"), np.stack(extr_list)) |
|
|
|
print(f"target saved to {save_dir}") |
|
time.sleep(5) |
|
continue |
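
                # Planning step: pause perception, plan from the latest state, send the
                # command(s) over UDP, wait for execution, then resume perception.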
|
|
|
|
|
if perception_out is not None and robot_out is not None and gripper_out is not None: |
|
if len(perception_out['value']['pts']) > 0: |
|
self.perception.do_process.value = False |
|
command = self.planning_module.get_command(state, save_dir=f'{self.exp_root}/interaction_{idx:02d}', is_first_iter=(idx == 0)) |
|
if self.bimanual: |
|
self.command_sender_left.send(command[0]) |
|
self.command_sender_right.send(command[1]) |
|
else: |
|
self.command_sender.send(command[0]) |
|
idx += 1 |
|
time.sleep(10) |
|
self.perception.do_process.value = True |
|
else: |
|
print(f'no points detected in perception_out: {perception_out["value"]["pts"]}') |
|
else: |
|
print(f'perception_out is None: {perception_out is None}', end=' ') |
|
print(f'robot_out is None: {robot_out is None}', end=' ') |
|
print(f'gripper_out is None: {gripper_out is None}') |
|
|
|
time.sleep(max(0, 1 / fps - (time.time() - tic))) |
|
|
|
            except BaseException:

                print(f"Error in robot planning env:\n{traceback.format_exc()}")
|
break |
|
|
|
if self.bimanual: |
|
assert self.command_sender_left is not None |
|
assert self.command_sender_right is not None |
|
self.command_sender_left.close() |
|
self.command_sender_right.close() |
|
else: |
|
assert self.command_sender is not None |
|
self.command_sender.close() |
|
self.stop() |
|
print("RealEnv process stopped") |
|
|
|
def get_intrinsics(self): |
|
return self.realsense.get_intrinsics() |
|
|
|
def get_extrinsics(self): |
|
return self.state["extr"] |
|
|
|
@property |
|
def alive(self) -> bool: |
|
alive = self._alive.value and self.real_alive |
|
self._alive.value = alive |
|
return alive |
|
|
|
def start(self) -> None: |
|
self.start_time = time.time() |
|
self._alive.value = True |
|
self.real_start(time.time()) |
|
self.start_image_display() |
|
super().start() |
|
|
|
def stop(self) -> None: |
|
self._alive.value = False |
|
self.real_stop() |
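

# ------------------------------------------------------------------------------
# Minimal usage sketch (illustration only, not part of the original module): the
# config file, experiment directory, and checkpoint path are hypothetical
# placeholders; substitute whatever your setup actually uses.
# ------------------------------------------------------------------------------
if __name__ == "__main__":
    cfg = OmegaConf.load(root / "configs" / "planning.yaml")  # hypothetical config
    env = RobotPlanningEnv(
        cfg=cfg,
        exp_root=str(root / "log" / "planning_exp"),  # hypothetical experiment dir
        ckpt_path=root / "log" / "model.pth",  # hypothetical checkpoint
        use_robot=True,  # requires a reachable xArm; default IP comes from robot_ip
        construct_target=True,  # build target.pcd on the first run if none exists
    )
    env.start()
    try:
        while env.alive:
            time.sleep(1.0)
    finally:
        env.stop()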
|
|