|
from typing import Optional, Callable, Dict |
|
import os |
|
import enum |
|
import time |
|
import json |
|
import numpy as np |
|
import pyrealsense2 as rs |
|
import multiprocess as mp |
|
import cv2 |
|
from threadpoolctl import threadpool_limits |
|
from multiprocess.managers import SharedMemoryManager |
|
|
|
from .utils import get_accumulate_timestamp_idxs |
|
from .shared_memory.shared_ndarray import SharedNDArray |
|
from .shared_memory.shared_memory_ring_buffer import SharedMemoryRingBuffer |
|
from .shared_memory.shared_memory_queue import SharedMemoryQueue, Full, Empty |
|
|
|
@enum.unique
class Command(enum.Enum):
    """Command codes sent to the camera worker through its command queue.

    The integer ``.value`` of a member is what is written into the
    shared-memory queue and compared against in the worker loop, so the
    values must stay unique and stable; ``enum.unique`` enforces the former.
    """

    SET_COLOR_OPTION = 0
    SET_DEPTH_OPTION = 1
    START_RECORDING = 2
    STOP_RECORDING = 3
    RESTART_PUT = 4
|
|
|
class SingleRealsense(mp.Process):
    """Worker process that streams one RealSense camera into shared memory.

    The constructor allocates three shared-memory structures in the parent
    process: a ring buffer that receives frames, a command queue used to
    send option changes to the worker, and a small array holding the color
    intrinsics and the depth scale. ``run()`` executes in the child process,
    grabs frames from the camera and pushes them into the ring buffer until
    ``stop()`` is called.

    The instance can be used as a context manager: entering starts the
    process (and waits until it signals ready), exiting stops and joins it.
    """

    MAX_PATH_LENGTH = 4096

    def __init__(
            self,
            shm_manager: SharedMemoryManager,
            serial_number,
            resolution=(1280,720),
            capture_fps=30,
            put_fps=None,
            put_downsample=True,
            enable_color=True,
            enable_depth=False,
            process_depth=False,
            enable_infrared=False,
            get_max_k=30,
            advanced_mode_config=None,
            transform: Optional[Callable[[Dict], Dict]] = None,
            vis_transform: Optional[Callable[[Dict], Dict]] = None,
            is_master=False,
            verbose=False
        ):
        """Allocate the shared-memory buffers and store the configuration.

        Args:
            shm_manager: manager that owns every shared-memory block
                created here.
            serial_number: serial of the device this worker opens.
            resolution: ``(width, height)`` requested for every enabled
                stream.
            capture_fps: frame rate requested from the camera.
            put_fps: rate used to index frames put into the ring buffer;
                defaults to ``capture_fps``.
            put_downsample: when true, frames are assigned step indices via
                ``get_accumulate_timestamp_idxs``; otherwise the step index
                is computed directly from the receive time.
            enable_color: stream BGR color and expose it under ``'color'``.
            enable_depth: stream 16-bit depth and expose it under
                ``'depth'``.
            process_depth: run ``depth_process`` on each depth frame before
                it is converted to an array.
            enable_infrared: stream 8-bit infrared and expose it under
                ``'infrared'``.
            get_max_k: forwarded to the ring buffer as ``get_max_k``.
            advanced_mode_config: optional JSON-serialisable object loaded
                into the device through ``rs400_advanced_mode``.
            transform: optional callable applied to each frame dict before
                it is put in the ring buffer. It is also applied once to
                the example dict so the buffer is sized for its output.
            vis_transform: stored on the instance; not used by the code in
                this class.
            is_master: selects ``inter_cam_sync_mode`` 1 (true) or 2
                (false) when a depth option command is executed.
            verbose: print timing and status diagnostics from the worker.
        """
        super().__init__()

        if put_fps is None:
            put_fps = capture_fps

        # Example payload: one entry per ring-buffer field, used only to
        # size the shared memory. Image arrays are (height, width[, 3]).
        resolution = tuple(resolution)
        shape = resolution[::-1]
        examples = dict()
        if enable_color:
            examples['color'] = np.empty(
                shape=shape+(3,), dtype=np.uint8)
        if enable_depth:
            examples['depth'] = np.empty(
                shape=shape, dtype=np.uint16)
        if enable_infrared:
            examples['infrared'] = np.empty(
                shape=shape, dtype=np.uint8)
        examples['camera_capture_timestamp'] = 0.0
        examples['camera_receive_timestamp'] = 0.0
        examples['timestamp'] = 0.0
        examples['step_idx'] = 0

        ring_buffer = SharedMemoryRingBuffer.create_from_examples(
            shm_manager=shm_manager,
            examples=examples if transform is None
                else transform(dict(examples)),
            get_max_k=get_max_k,
            get_time_budget=0.2,
            put_desired_frequency=put_fps
        )

        # Example command: fixes the field names and types of the queue.
        examples = {
            'cmd': Command.SET_COLOR_OPTION.value,
            'option_enum': rs.option.exposure.value,
            'option_value': 0.0,
            'put_start_time': 0.0
        }

        command_queue = SharedMemoryQueue.create_from_examples(
            shm_manager=shm_manager,
            examples=examples,
            buffer_size=128
        )

        # Layout written by run(): [fx, fy, ppx, ppy, height, width,
        # depth_scale]. The last slot stays 0 unless depth is enabled.
        intrinsics_array = SharedNDArray.create_from_shape(
                mem_mgr=shm_manager,
                shape=(7,),
                dtype=np.float64)
        intrinsics_array.get()[:] = 0

        # configuration
        self.serial_number = serial_number
        self.resolution = resolution
        self.capture_fps = capture_fps
        self.put_fps = put_fps
        self.put_downsample = put_downsample
        self.enable_color = enable_color
        self.enable_depth = enable_depth
        self.enable_infrared = enable_infrared
        self.advanced_mode_config = advanced_mode_config
        self.transform = transform
        self.vis_transform = vis_transform
        self.process_depth = process_depth
        self.is_master = is_master
        self.verbose = verbose
        self.put_start_time = None

        # shared state
        self.stop_event = mp.Event()
        self.ready_event = mp.Event()
        self.ring_buffer = ring_buffer
        self.command_queue = command_queue
        self.intrinsics_array = intrinsics_array

    @staticmethod
    def get_connected_devices_serial():
        """Return the sorted serial numbers of connected D400 devices.

        Devices whose name is ``'platform camera'`` (case-insensitive) and
        devices whose product line is not ``'D400'`` are skipped.
        """
        serials = list()
        for d in rs.context().devices:
            if d.get_info(rs.camera_info.name).lower() != 'platform camera':
                serial = d.get_info(rs.camera_info.serial_number)
                product_line = d.get_info(rs.camera_info.product_line)
                if product_line == 'D400':
                    serials.append(serial)
        serials = sorted(serials)
        return serials

    # ========= context manager ===========
    def __enter__(self):
        """Start the worker (waiting until ready) and return ``self``."""
        self.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the worker and wait for it to exit."""
        self.stop()

    # ========= user API ===========
    def start(self, wait=True, put_start_time=None):
        """Launch the worker process.

        Args:
            wait: block until the worker sets ``ready_event``.
            put_start_time: reference time used to compute step indices;
                when ``None`` the worker uses its own start time.
        """
        # Must be stored before the process is started so run() sees it.
        self.put_start_time = put_start_time
        super().start()
        if wait:
            self.start_wait()

    def stop(self, wait=True):
        """Ask the worker loop to exit; optionally join the process."""
        self.stop_event.set()
        if wait:
            self.end_wait()

    def start_wait(self):
        """Block until the worker has signalled ``ready_event``."""
        self.ready_event.wait()

    def end_wait(self):
        """Block until the worker process has terminated."""
        self.join()

    @property
    def is_ready(self):
        """Whether ``ready_event`` has been set by the worker."""
        return self.ready_event.is_set()

    def get(self, k=None, out=None):
        """Read from the ring buffer.

        With ``k`` left as ``None`` this returns ``ring_buffer.get``;
        otherwise it returns ``ring_buffer.get_last_k(k)``. ``out`` is
        forwarded to the ring buffer in both cases.
        """
        if k is None:
            return self.ring_buffer.get(out=out)
        else:
            return self.ring_buffer.get_last_k(k, out=out)

    # ========= user API: sensor options ===========
    def set_color_option(self, option: rs.option, value: float):
        """Queue a command that sets ``option`` on the color sensor."""
        self.command_queue.put({
            'cmd': Command.SET_COLOR_OPTION.value,
            'option_enum': option.value,
            'option_value': value
        })

    def set_exposure(self, exposure=None, gain=None):
        """
        exposure: (1, 10000) 100us unit. (0.1 ms, 1/10000s)
        gain: (0, 128)

        When both arguments are None, auto exposure is enabled. Otherwise
        auto exposure is disabled and each value that was given is applied.
        """

        if exposure is None and gain is None:
            # auto exposure
            self.set_color_option(rs.option.enable_auto_exposure, 1.0)
        else:
            # manual exposure
            self.set_color_option(rs.option.enable_auto_exposure, 0.0)
            if exposure is not None:
                self.set_color_option(rs.option.exposure, exposure)
            if gain is not None:
                self.set_color_option(rs.option.gain, gain)

    def set_white_balance(self, white_balance=None):
        """Enable auto white balance (``None``) or set a manual value."""
        if white_balance is None:
            self.set_color_option(rs.option.enable_auto_white_balance, 1.0)
        else:
            self.set_color_option(rs.option.enable_auto_white_balance, 0.0)
            self.set_color_option(rs.option.white_balance, white_balance)

    def get_intrinsics(self):
        """Return the 3x3 color camera matrix built from fx, fy, ppx, ppy.

        Asserts that the worker has signalled ready, since the values are
        written by the worker during device initialisation.
        """
        assert self.ready_event.is_set()
        fx, fy, ppx, ppy = self.intrinsics_array.get()[:4]
        mat = np.eye(3)
        mat[0,0] = fx
        mat[1,1] = fy
        mat[0,2] = ppx
        mat[1,2] = ppy
        return mat

    def get_depth_scale(self):
        """Return the depth scale stored in the last intrinsics slot.

        Asserts that the worker has signalled ready. The slot is only
        written when depth is enabled; otherwise it holds 0.
        """
        assert self.ready_event.is_set()
        scale = self.intrinsics_array.get()[-1]
        return scale

    def depth_process(self, depth_frame):
        """Filter a depth frame and return the filtered frame.

        The frame is converted to disparity, passed through a spatial and
        a temporal filter, and converted back to depth.
        """
        # NOTE(review): every filter is rebuilt on each call, so the
        # temporal filter presumably never sees more than one frame of
        # history - confirm whether the filters should be cached.
        depth_to_disparity = rs.disparity_transform(True)
        disparity_to_depth = rs.disparity_transform(False)

        spatial = rs.spatial_filter()
        spatial.set_option(rs.option.filter_magnitude, 5)
        spatial.set_option(rs.option.filter_smooth_alpha, 0.75)
        spatial.set_option(rs.option.filter_smooth_delta, 1)
        spatial.set_option(rs.option.holes_fill, 1)

        temporal = rs.temporal_filter()
        temporal.set_option(rs.option.filter_smooth_alpha, 0.75)
        temporal.set_option(rs.option.filter_smooth_delta, 1)

        filtered_depth = depth_to_disparity.process(depth_frame)
        filtered_depth = spatial.process(filtered_depth)
        filtered_depth = temporal.process(filtered_depth)
        filtered_depth = disparity_to_depth.process(filtered_depth)
        return filtered_depth

    def restart_put(self, start_time):
        """Queue a command that resets the put index and its start time."""
        self.command_queue.put({
            'cmd': Command.RESTART_PUT.value,
            'put_start_time': start_time
        })

    # ========= interval API ===========
    def run(self):
        """Worker entry point: stream frames until ``stop_event`` is set.

        Each iteration waits for a frameset, aligns it to the color stream,
        builds the frame dict, applies ``transform`` if given, puts the
        result into the ring buffer and then executes any queued commands.
        ``ready_event`` is set after the first frame has been put, and
        again on exit so a caller waiting on it is never left blocked.
        """
        # Limit thread pools to a single thread inside this worker.
        threadpool_limits(1)
        cv2.setNumThreads(1)
        w, h = self.resolution
        fps = self.capture_fps
        align = rs.align(rs.stream.color)

        rs_config = rs.config()
        if self.enable_color:
            rs_config.enable_stream(rs.stream.color,
                w, h, rs.format.bgr8, fps)
        if self.enable_depth:
            rs_config.enable_stream(rs.stream.depth,
                w, h, rs.format.z16, fps)
        if self.enable_infrared:
            rs_config.enable_stream(rs.stream.infrared,
                w, h, rs.format.y8, fps)

        def init_device():
            """Start the pipeline and publish intrinsics / depth scale."""
            rs_config.enable_device(self.serial_number)

            # start pipeline
            pipeline = rs.pipeline()
            pipeline_profile = pipeline.start(rs_config)
            self.pipeline = pipeline
            self.pipeline_profile = pipeline_profile

            d = self.pipeline_profile.get_device().first_color_sensor()
            d.set_option(rs.option.global_time_enabled, 1)

            # optional advanced-mode configuration
            if self.advanced_mode_config is not None:
                json_text = json.dumps(self.advanced_mode_config)
                device = self.pipeline_profile.get_device()
                advanced_mode = rs.rs400_advanced_mode(device)
                advanced_mode.load_json(json_text)

            # publish color intrinsics into slots 0..5
            color_stream = self.pipeline_profile.get_stream(rs.stream.color)
            intr = color_stream.as_video_stream_profile().get_intrinsics()
            order = ['fx', 'fy', 'ppx', 'ppy', 'height', 'width']
            for i, name in enumerate(order):
                self.intrinsics_array.get()[i] = getattr(intr, name)

            # publish depth scale into the last slot
            if self.enable_depth:
                depth_sensor = self.pipeline_profile.get_device().first_depth_sensor()
                depth_scale = depth_sensor.get_depth_scale()
                self.intrinsics_array.get()[-1] = depth_scale

            if self.verbose:
                print(f'[SingleRealsense {self.serial_number}] Main loop started.')

        try:
            init_device()

            # put_idx is None until the first indexed put, then carries
            # the next global index between iterations.
            put_idx = None
            put_start_time = self.put_start_time
            if put_start_time is None:
                put_start_time = time.time()

            iter_idx = 0
            t_start = time.time()
            while not self.stop_event.is_set():
                # Wait for a frameset; on a RuntimeError reset the device,
                # restart the pipeline and try again.
                frameset = None
                while frameset is None:
                    try:
                        frameset = self.pipeline.wait_for_frames()
                    except RuntimeError as e:
                        print(f'[SingleRealsense {self.serial_number}] Error: {e}. Ready state: {self.ready_event.is_set()}, Restarting device.')
                        device = self.pipeline.get_active_profile().get_device()
                        device.hardware_reset()
                        self.pipeline.stop()
                        init_device()
                        continue
                receive_time = time.time()

                # align frames to the color stream
                frameset = align.process(frameset)

                # readers are only allowed once the put start time passed
                self.ring_buffer.ready_for_get = (receive_time - put_start_time >= 0)

                # grab data
                if self.verbose:
                    grab_start_time = time.time()
                data = dict()
                data['camera_receive_timestamp'] = receive_time
                # frame timestamps are divided by 1000 before being stored
                data['camera_capture_timestamp'] = frameset.get_timestamp() / 1000
                if self.enable_color:
                    color_frame = frameset.get_color_frame()
                    data['color'] = np.asarray(color_frame.get_data())
                    # prefer the color frame's own timestamp when available
                    t = color_frame.get_timestamp() / 1000
                    data['camera_capture_timestamp'] = t

                if self.enable_depth:
                    depth_frame = frameset.get_depth_frame()
                    if self.process_depth:
                        # Filter first, then convert the *filtered* frame;
                        # previously the filtered result was overwritten
                        # by the raw frame and never reached the buffer.
                        depth_frame = self.depth_process(depth_frame)
                    data['depth'] = np.asarray(depth_frame.get_data())

                if self.enable_infrared:
                    data['infrared'] = np.asarray(
                        frameset.get_infrared_frame().get_data())
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] Grab data time {time.time() - grab_start_time}')

                # apply transform
                if self.verbose:
                    transform_start_time = time.time()
                put_data = data
                if self.transform is not None:
                    put_data = self.transform(dict(data))
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] Transform time {time.time() - transform_start_time}')

                if self.verbose:
                    put_data_start_time = time.time()
                if self.put_downsample:
                    # put frequency regulation: map this frame onto zero
                    # or more global step indices spaced 1/put_fps apart.
                    _, global_idxs, put_idx \
                        = get_accumulate_timestamp_idxs(
                            timestamps=[receive_time],
                            start_time=put_start_time,
                            dt=1/self.put_fps,
                            next_global_idx=put_idx,
                            allow_negative=True
                        )
                    for step_idx in global_idxs:
                        put_data['step_idx'] = step_idx
                        put_data['timestamp'] = receive_time
                        self.ring_buffer.put(put_data, wait=False, serial_number=self.serial_number)
                else:
                    step_idx = int((receive_time - put_start_time) * self.put_fps)
                    # diagnostic output only; keep it out of the hot loop
                    # unless verbose output was requested
                    if self.verbose:
                        print(step_idx, receive_time)
                    put_data['step_idx'] = step_idx
                    put_data['timestamp'] = receive_time
                    self.ring_buffer.put(put_data, wait=False, serial_number=self.serial_number)
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] Put data time {time.time() - put_data_start_time}', end=' ')
                    print(f'with downsample for {len(global_idxs)}x' if self.put_downsample and len(global_idxs) > 1 else '')

                # signal ready after the first frame has been put
                if iter_idx == 0:
                    self.ready_event.set()

                # perf
                t_end = time.time()
                duration = t_end - t_start
                frequency = np.round(1 / duration, 1)
                t_start = t_end
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] FPS {frequency}')

                # fetch commands from the queue
                try:
                    commands = self.command_queue.get_all()
                    n_cmd = len(commands['cmd'])
                except Empty:
                    n_cmd = 0

                # execute commands in the order they were queued
                for i in range(n_cmd):
                    command = {key: values[i] for key, values in commands.items()}
                    cmd = command['cmd']
                    if cmd == Command.SET_COLOR_OPTION.value:
                        sensor = self.pipeline_profile.get_device().first_color_sensor()
                        option = rs.option(command['option_enum'])
                        value = float(command['option_value'])
                        sensor.set_option(option, value)
                    elif cmd == Command.SET_DEPTH_OPTION.value:
                        # set_option() returns None, so the sensor must be
                        # bound before any option is applied; chaining the
                        # call into `sensor` made the next line raise.
                        sensor = self.pipeline_profile.get_device().first_depth_sensor()
                        sensor.set_option(rs.option.inter_cam_sync_mode, 1 if self.is_master else 2)
                        option = rs.option(command['option_enum'])
                        value = float(command['option_value'])
                        sensor.set_option(option, value)
                    elif cmd == Command.RESTART_PUT.value:
                        put_idx = None
                        put_start_time = command['put_start_time']

                iter_idx += 1
        finally:
            rs_config.disable_all_streams()
            # Also set on failure so start_wait() callers do not hang.
            self.ready_event.set()

        if self.verbose:
            print(f'[SingleRealsense {self.serial_number}] Exiting worker process.')
|
|