from typing import Optional, Callable, Dict
import os
import enum
import time
import json
import numpy as np
import pyrealsense2 as rs
import multiprocess as mp
import cv2
from threadpoolctl import threadpool_limits
from multiprocess.managers import SharedMemoryManager
from .utils import get_accumulate_timestamp_idxs
from .shared_memory.shared_ndarray import SharedNDArray
from .shared_memory.shared_memory_ring_buffer import SharedMemoryRingBuffer
from .shared_memory.shared_memory_queue import SharedMemoryQueue, Full, Empty


class Command(enum.Enum):
    SET_COLOR_OPTION = 0
    SET_DEPTH_OPTION = 1
    START_RECORDING = 2
    STOP_RECORDING = 3
    RESTART_PUT = 4


class SingleRealsense(mp.Process):
    MAX_PATH_LENGTH = 4096  # linux path has a limit of 4096 bytes

    def __init__(
            self,
            shm_manager: SharedMemoryManager,
            serial_number,
            resolution=(1280, 720),
            capture_fps=30,
            put_fps=None,
            put_downsample=True,
            enable_color=True,
            enable_depth=False,
            process_depth=False,
            enable_infrared=False,
            get_max_k=30,
            advanced_mode_config=None,
            transform: Optional[Callable[[Dict], Dict]] = None,
            vis_transform: Optional[Callable[[Dict], Dict]] = None,
            is_master=False,
            verbose=False
        ):
        super().__init__()

        if put_fps is None:
            put_fps = capture_fps

        # create ring buffer
        resolution = tuple(resolution)
        shape = resolution[::-1]
        examples = dict()
        if enable_color:
            examples['color'] = np.empty(
                shape=shape + (3,), dtype=np.uint8)
        if enable_depth:
            examples['depth'] = np.empty(
                shape=shape, dtype=np.uint16)
        if enable_infrared:
            examples['infrared'] = np.empty(
                shape=shape, dtype=np.uint8)
        examples['camera_capture_timestamp'] = 0.0
        examples['camera_receive_timestamp'] = 0.0
        examples['timestamp'] = 0.0
        examples['step_idx'] = 0

        ring_buffer = SharedMemoryRingBuffer.create_from_examples(
            shm_manager=shm_manager,
            examples=examples if transform is None
                else transform(dict(examples)),
            get_max_k=get_max_k,
            get_time_budget=0.2,
            put_desired_frequency=put_fps
        )

        # create command queue
        examples = {
            'cmd': Command.SET_COLOR_OPTION.value,
            'option_enum': rs.option.exposure.value,
            'option_value': 0.0,
            'put_start_time': 0.0
        }
        command_queue = SharedMemoryQueue.create_from_examples(
            shm_manager=shm_manager,
            examples=examples,
            buffer_size=128
        )

        # create shared array for intrinsics
        intrinsics_array = SharedNDArray.create_from_shape(
            mem_mgr=shm_manager,
            shape=(7,),
            dtype=np.float64)
        intrinsics_array.get()[:] = 0

        # copied variables
        self.serial_number = serial_number
        self.resolution = resolution
        self.capture_fps = capture_fps
        self.put_fps = put_fps
        self.put_downsample = put_downsample
        self.enable_color = enable_color
        self.enable_depth = enable_depth
        self.enable_infrared = enable_infrared
        self.advanced_mode_config = advanced_mode_config
        self.transform = transform
        self.vis_transform = vis_transform
        self.process_depth = process_depth
        self.is_master = is_master
        self.verbose = verbose
        self.put_start_time = None

        # shared variables
        self.stop_event = mp.Event()
        self.ready_event = mp.Event()
        self.ring_buffer = ring_buffer
        self.command_queue = command_queue
        self.intrinsics_array = intrinsics_array

    @staticmethod
    def get_connected_devices_serial():
        serials = list()
        for d in rs.context().devices:
            if d.get_info(rs.camera_info.name).lower() != 'platform camera':
                serial = d.get_info(rs.camera_info.serial_number)
                product_line = d.get_info(rs.camera_info.product_line)
                if product_line == 'D400':
                    # only works with D400 series
                    serials.append(serial)
        serials = sorted(serials)
        return serials

    # ========= context manager ===========
    def __enter__(self):
        self.start()
        return self
    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop()

    # ========= user API ===========
    def start(self, wait=True, put_start_time=None):
        self.put_start_time = put_start_time
        super().start()
        if wait:
            self.start_wait()

    def stop(self, wait=True):
        self.stop_event.set()
        if wait:
            self.end_wait()

    def start_wait(self):
        self.ready_event.wait()

    def end_wait(self):
        self.join()

    @property
    def is_ready(self):
        return self.ready_event.is_set()

    def get(self, k=None, out=None):
        if k is None:
            return self.ring_buffer.get(out=out)
        else:
            return self.ring_buffer.get_last_k(k, out=out)

    # ========= user API ===========
    def set_color_option(self, option: rs.option, value: float):
        self.command_queue.put({
            'cmd': Command.SET_COLOR_OPTION.value,
            'option_enum': option.value,
            'option_value': value
        })

    def set_exposure(self, exposure=None, gain=None):
        """
        exposure: (1, 10000) 100us unit. (0.1 ms, 1/10000s)
        gain: (0, 128)
        """
        if exposure is None and gain is None:
            # auto exposure
            self.set_color_option(rs.option.enable_auto_exposure, 1.0)
        else:
            # manual exposure
            self.set_color_option(rs.option.enable_auto_exposure, 0.0)
            if exposure is not None:
                self.set_color_option(rs.option.exposure, exposure)
            if gain is not None:
                self.set_color_option(rs.option.gain, gain)

    def set_white_balance(self, white_balance=None):
        if white_balance is None:
            self.set_color_option(rs.option.enable_auto_white_balance, 1.0)
        else:
            self.set_color_option(rs.option.enable_auto_white_balance, 0.0)
            self.set_color_option(rs.option.white_balance, white_balance)

    def get_intrinsics(self):
        assert self.ready_event.is_set()
        fx, fy, ppx, ppy = self.intrinsics_array.get()[:4]
        mat = np.eye(3)
        mat[0, 0] = fx
        mat[1, 1] = fy
        mat[0, 2] = ppx
        mat[1, 2] = ppy
        return mat

    def get_depth_scale(self):
        assert self.ready_event.is_set()
        scale = self.intrinsics_array.get()[-1]
        return scale

    def depth_process(self, depth_frame):
        # disparity-domain spatial + temporal filtering of a depth frame
        depth_to_disparity = rs.disparity_transform(True)
        disparity_to_depth = rs.disparity_transform(False)
        spatial = rs.spatial_filter()
        spatial.set_option(rs.option.filter_magnitude, 5)
        spatial.set_option(rs.option.filter_smooth_alpha, 0.75)
        spatial.set_option(rs.option.filter_smooth_delta, 1)
        spatial.set_option(rs.option.holes_fill, 1)
        temporal = rs.temporal_filter()
        temporal.set_option(rs.option.filter_smooth_alpha, 0.75)
        temporal.set_option(rs.option.filter_smooth_delta, 1)
        filtered_depth = depth_to_disparity.process(depth_frame)
        filtered_depth = spatial.process(filtered_depth)
        filtered_depth = temporal.process(filtered_depth)
        filtered_depth = disparity_to_depth.process(filtered_depth)
        return filtered_depth

    def restart_put(self, start_time):
        self.command_queue.put({
            'cmd': Command.RESTART_PUT.value,
            'put_start_time': start_time
        })

    # ========= internal API ===========
    def run(self):
        # limit threads
        threadpool_limits(1)
        cv2.setNumThreads(1)

        w, h = self.resolution
        fps = self.capture_fps
        align = rs.align(rs.stream.color)
        # Enable the streams from all the intel realsense devices
        rs_config = rs.config()
        if self.enable_color:
            rs_config.enable_stream(rs.stream.color, w, h, rs.format.bgr8, fps)
        if self.enable_depth:
            rs_config.enable_stream(rs.stream.depth, w, h, rs.format.z16, fps)
        if self.enable_infrared:
            rs_config.enable_stream(rs.stream.infrared, w, h, rs.format.y8, fps)

        def init_device():
            rs_config.enable_device(self.serial_number)

            # start pipeline
            pipeline = rs.pipeline()
            pipeline_profile = pipeline.start(rs_config)
            self.pipeline = pipeline
            self.pipeline_profile = pipeline_profile

            # report global time
            # https://github.com/IntelRealSense/librealsense/pull/3909
            d = self.pipeline_profile.get_device().first_color_sensor()
            d.set_option(rs.option.global_time_enabled, 1)

            # setup advanced mode
            if self.advanced_mode_config is not None:
                json_text = json.dumps(self.advanced_mode_config)
                device = self.pipeline_profile.get_device()
                advanced_mode = rs.rs400_advanced_mode(device)
                advanced_mode.load_json(json_text)

            # get intrinsics
            color_stream = self.pipeline_profile.get_stream(rs.stream.color)
            intr = color_stream.as_video_stream_profile().get_intrinsics()
            order = ['fx', 'fy', 'ppx', 'ppy', 'height', 'width']
            for i, name in enumerate(order):
                self.intrinsics_array.get()[i] = getattr(intr, name)

            if self.enable_depth:
                depth_sensor = self.pipeline_profile.get_device().first_depth_sensor()
                depth_scale = depth_sensor.get_depth_scale()
                self.intrinsics_array.get()[-1] = depth_scale

        # one-time setup (intrinsics etc, ignore for now)
        if self.verbose:
            print(f'[SingleRealsense {self.serial_number}] Main loop started.')

        try:
            init_device()
            # put frequency regulation
            put_idx = None
            put_start_time = self.put_start_time
            if put_start_time is None:
                put_start_time = time.time()

            iter_idx = 0
            t_start = time.time()
            while not self.stop_event.is_set():
                # wait for frames to come in
                frameset = None
                while frameset is None:
                    try:
                        frameset = self.pipeline.wait_for_frames()
                    except RuntimeError as e:
                        print(f'[SingleRealsense {self.serial_number}] Error: {e}. Ready state: {self.ready_event.is_set()}. Restarting device.')
                        device = self.pipeline.get_active_profile().get_device()
                        device.hardware_reset()
                        self.pipeline.stop()
                        init_device()
                        continue
                receive_time = time.time()
                # align frames to color
                frameset = align.process(frameset)
                self.ring_buffer.ready_for_get = (receive_time - put_start_time >= 0)

                # grab data
                if self.verbose:
                    grab_start_time = time.time()
                data = dict()
                data['camera_receive_timestamp'] = receive_time
                # realsense reports timestamps in ms
                data['camera_capture_timestamp'] = frameset.get_timestamp() / 1000
                if self.enable_color:
                    color_frame = frameset.get_color_frame()
                    data['color'] = np.asarray(color_frame.get_data())
                    t = color_frame.get_timestamp() / 1000
                    data['camera_capture_timestamp'] = t
                    # print('device', time.time() - t)
                    # print(color_frame.get_frame_timestamp_domain())
                if self.enable_depth:
                    depth_frame = frameset.get_depth_frame()
                    if self.process_depth:
                        # filter the depth frame before converting it to an array
                        depth_frame = self.depth_process(depth_frame)
                    data['depth'] = np.asarray(depth_frame.get_data())
                if self.enable_infrared:
                    data['infrared'] = np.asarray(
                        frameset.get_infrared_frame().get_data())
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] Grab data time {time.time() - grab_start_time}')

                # apply transform
                if self.verbose:
                    transform_start_time = time.time()
                put_data = data
                if self.transform is not None:
                    put_data = self.transform(dict(data))
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] Transform time {time.time() - transform_start_time}')

                if self.verbose:
                    put_data_start_time = time.time()
                if self.put_downsample:
                    # put frequency regulation
                    # print(self.serial_number, put_start_time, put_idx, len(global_idxs))
                    local_idxs, global_idxs, put_idx \
                        = get_accumulate_timestamp_idxs(
                            timestamps=[receive_time],
                            start_time=put_start_time,
                            dt=1/self.put_fps,
                            # this is None in the first iteration
                            # and then replaced with a concrete number
                            next_global_idx=put_idx,
                            # continue to pump frames even if not started.
                            # start_time is simply used to align timestamps.
                            allow_negative=True
                        )
                    for step_idx in global_idxs:
                        put_data['step_idx'] = step_idx
                        # put_data['timestamp'] = put_start_time + step_idx / self.put_fps
                        put_data['timestamp'] = receive_time
                        # print(step_idx, data['timestamp'])
                        self.ring_buffer.put(put_data, wait=False, serial_number=self.serial_number)
                else:
                    step_idx = int((receive_time - put_start_time) * self.put_fps)
                    # print(step_idx, receive_time)
                    put_data['step_idx'] = step_idx
                    put_data['timestamp'] = receive_time
                    self.ring_buffer.put(put_data, wait=False, serial_number=self.serial_number)
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] Put data time {time.time() - put_data_start_time}', end=' ')
                    print(f'with downsample for {len(global_idxs)}x'
                          if self.put_downsample and len(global_idxs) > 1 else '')

                # signal ready
                if iter_idx == 0:
                    self.ready_event.set()

                # perf
                t_end = time.time()
                duration = t_end - t_start
                frequency = np.round(1 / duration, 1)
                t_start = t_end
                if self.verbose:
                    print(f'[SingleRealsense {self.serial_number}] FPS {frequency}')

                # fetch command from queue
                try:
                    commands = self.command_queue.get_all()
                    n_cmd = len(commands['cmd'])
                except Empty:
                    n_cmd = 0

                # execute commands
                for i in range(n_cmd):
                    command = dict()
                    for key, value in commands.items():
                        command[key] = value[i]
                    cmd = command['cmd']
                    if cmd == Command.SET_COLOR_OPTION.value:
                        sensor = self.pipeline_profile.get_device().first_color_sensor()
                        option = rs.option(command['option_enum'])
                        value = float(command['option_value'])
                        sensor.set_option(option, value)
                        # print('auto', sensor.get_option(rs.option.enable_auto_exposure))
                        # print('exposure', sensor.get_option(rs.option.exposure))
                        # print('gain', sensor.get_option(rs.option.gain))
                    elif cmd == Command.SET_DEPTH_OPTION.value:
                        sensor = self.pipeline_profile.get_device().first_depth_sensor()
                        sensor.set_option(rs.option.inter_cam_sync_mode,
                            1 if self.is_master else 2)
                        option = rs.option(command['option_enum'])
                        value = float(command['option_value'])
                        sensor.set_option(option, value)
                    elif cmd == Command.RESTART_PUT.value:
                        put_idx = None
                        put_start_time = command['put_start_time']

                iter_idx += 1
        finally:
            rs_config.disable_all_streams()
            self.ready_event.set()
            if self.verbose:
                print(f'[SingleRealsense {self.serial_number}] Exiting worker process.')
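

# ----------------------------------------------------------------------------
# Minimal usage sketch: assumes a D400-series camera is attached and that this
# file is run as part of its package (e.g. `python -m <pkg>.single_realsense`,
# package name hypothetical) so the relative imports above resolve.
# ----------------------------------------------------------------------------
if __name__ == '__main__':
    with SharedMemoryManager() as shm_manager:
        serials = SingleRealsense.get_connected_devices_serial()
        if len(serials) == 0:
            print('No D400-series RealSense camera found.')
        else:
            # the context manager starts the worker process and blocks until
            # the first frame has been written into the shared-memory ring buffer
            with SingleRealsense(
                    shm_manager=shm_manager,
                    serial_number=serials[0],
                    enable_color=True,
                    verbose=False) as camera:
                camera.set_exposure(exposure=200, gain=16)
                time.sleep(1.0)
                sample = camera.get()  # latest frame as a dict of arrays
                print('color shape:', sample['color'].shape)
                print('intrinsics:\n', camera.get_intrinsics())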