pgnd / src /experiments /real_world /camera /multi_realsense.py
kywind
update
f96995c
from typing import List, Optional, Union, Dict, Callable
import numbers
import time
from multiprocess.managers import SharedMemoryManager
import numpy as np
import pyrealsense2 as rs
from .single_realsense import SingleRealsense
class MultiRealsense:
def __init__(self,
serial_numbers: Optional[List[str]]=None,
shm_manager: Optional[SharedMemoryManager]=None,
resolution=(1280,720),
capture_fps=30,
put_fps=None,
put_downsample=True,
enable_color=True,
enable_depth=False,
process_depth=False,
enable_infrared=False,
get_max_k=30,
advanced_mode_config: Optional[Union[dict, List[dict]]]=None,
transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]]=None,
vis_transform: Optional[Union[Callable[[Dict], Dict], List[Callable]]]=None,
verbose=False
):
if shm_manager is None:
shm_manager = SharedMemoryManager()
shm_manager.start()
if serial_numbers is None:
serial_numbers = SingleRealsense.get_connected_devices_serial()
n_cameras = len(serial_numbers)
advanced_mode_config = repeat_to_list(
advanced_mode_config, n_cameras, dict)
transform = repeat_to_list(
transform, n_cameras, Callable)
vis_transform = repeat_to_list(
vis_transform, n_cameras, Callable)
cameras = dict()
for i, serial in enumerate(serial_numbers):
cameras[serial] = SingleRealsense(
shm_manager=shm_manager,
serial_number=serial,
resolution=resolution,
capture_fps=capture_fps,
put_fps=put_fps,
put_downsample=put_downsample,
enable_color=enable_color,
enable_depth=enable_depth,
process_depth=process_depth,
enable_infrared=enable_infrared,
get_max_k=get_max_k,
advanced_mode_config=advanced_mode_config[i],
transform=transform[i],
vis_transform=vis_transform[i],
is_master=(i == 0),
verbose=verbose
)
self.cameras = cameras
self.serial_numbers = serial_numbers
self.shm_manager = shm_manager
self.resolution = resolution
self.capture_fps = capture_fps
def __enter__(self):
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.stop()
@property
def n_cameras(self):
return len(self.cameras)
@property
def is_ready(self):
is_ready = True
for camera in self.cameras.values():
if not camera.is_ready:
is_ready = False
return is_ready
def start(self, wait=True, put_start_time=None):
if put_start_time is None:
put_start_time = time.time()
for camera in self.cameras.values():
camera.start(wait=False, put_start_time=put_start_time)
if wait:
self.start_wait()
def stop(self, wait=True):
for camera in self.cameras.values():
camera.stop(wait=False)
if wait:
self.stop_wait()
def start_wait(self):
for camera in self.cameras.values():
print('processing camera {}'.format(camera.serial_number))
camera.start_wait()
def stop_wait(self):
for camera in self.cameras.values():
camera.join()
def get(self, k=None, index=None, out=None) -> Dict[int, Dict[str, np.ndarray]]:
"""
Return order T,H,W,C
{
0: {
'rgb': (T,H,W,C),
'timestamp': (T,)
},
1: ...
}
"""
if index is not None:
this_out = None
this_out = self.cameras[self.serial_numbers[index]].get(k=k, out=this_out)
return this_out
if out is None:
out = dict()
for i, camera in enumerate(self.cameras.values()):
this_out = None
if i in out:
this_out = out[i]
this_out = camera.get(k=k, out=this_out)
out[i] = this_out
return out
def set_color_option(self, option, value):
n_camera = len(self.cameras)
value = repeat_to_list(value, n_camera, numbers.Number)
for i, camera in enumerate(self.cameras.values()):
camera.set_color_option(option, value[i])
def set_exposure(self, exposure=None, gain=None):
"""150nit. (0.1 ms, 1/10000s)
gain: (0, 128)
"""
if exposure is None and gain is None:
# auto exposure
self.set_color_option(rs.option.enable_auto_exposure, 1.0)
else:
# manual exposure
self.set_color_option(rs.option.enable_auto_exposure, 0.0)
if exposure is not None:
self.set_color_option(rs.option.exposure, exposure)
if gain is not None:
self.set_color_option(rs.option.gain, gain)
def set_white_balance(self, white_balance=None):
if white_balance is None:
self.set_color_option(rs.option.enable_auto_white_balance, 1.0)
else:
self.set_color_option(rs.option.enable_auto_white_balance, 0.0)
self.set_color_option(rs.option.white_balance, white_balance)
def get_intrinsics(self):
return np.array([c.get_intrinsics() for c in self.cameras.values()])
def get_depth_scale(self):
return np.array([c.get_depth_scale() for c in self.cameras.values()])
def restart_put(self, start_time):
for camera in self.cameras.values():
camera.restart_put(start_time)
def repeat_to_list(x, n: int, cls):
if x is None:
x = [None] * n
if isinstance(x, cls):
x = [x] * n
assert len(x) == n
return x