|
from pathlib import Path |
|
from tqdm import tqdm |
|
import time |
|
import math |
|
import psutil |
|
import random |
|
import fcntl |
|
import os |
|
import subprocess |
|
import tempfile |
|
from random import randint |
|
from errno import EACCES |
|
|
|
import hydra |
|
from omegaconf import DictConfig |
|
import numpy as np |
|
import torch |
|
import pyvista as pv |
|
|
|
from pgnd.utils import get_root, mkdir |
|
from pgnd.ffmpeg import make_video |
|
|
|
root: Path = get_root(__file__) |
|
|
|
|
|
class Xvfb(object): |
|
|
|
|
|
|
|
MAX_DISPLAY = 2147483647 |
|
SLEEP_TIME_BEFORE_START = 0.1 |
|
|
|
def __init__( |
|
self, width=800, height=680, colordepth=24, |
|
tempdir=None, display=None, **kwargs): |
|
self.width = width |
|
self.height = height |
|
self.colordepth = colordepth |
|
self._tempdir = tempdir or tempfile.gettempdir() |
|
self.new_display = display |
|
|
|
if not self.xvfb_exists(): |
|
msg = ( |
|
'Can not find Xvfb. Please install it with:\n' |
|
' sudo apt install libgl1-mesa-glx xvfb') |
|
raise EnvironmentError(msg) |
|
|
|
self.extra_xvfb_args = ['-screen', '0', '{}x{}x{}'.format( |
|
self.width, self.height, self.colordepth)] |
|
|
|
for key, value in kwargs.items(): |
|
self.extra_xvfb_args += ['-{}'.format(key), value] |
|
|
|
if 'DISPLAY' in os.environ: |
|
self.orig_display = os.environ['DISPLAY'].split(':')[1] |
|
else: |
|
self.orig_display = None |
|
|
|
self.proc = None |
|
|
|
def __enter__(self): |
|
self.start() |
|
return self |
|
|
|
def __exit__(self, exc_type, exc_val, exc_tb): |
|
self.stop() |
|
|
|
def start(self): |
|
if self.new_display is not None: |
|
if not self._get_lock_for_display(self.new_display): |
|
raise ValueError(f'Could not lock display :{self.new_display}') |
|
else: |
|
self.new_display = self._get_next_unused_display() |
|
display_var = ':{}'.format(self.new_display) |
|
self.xvfb_cmd = ['Xvfb', display_var] + self.extra_xvfb_args |
|
with open(os.devnull, 'w') as fnull: |
|
self.proc = subprocess.Popen( |
|
self.xvfb_cmd, stdout=fnull, stderr=fnull, close_fds=True) |
|
|
|
time.sleep(self.__class__.SLEEP_TIME_BEFORE_START) |
|
ret_code = self.proc.poll() |
|
if ret_code is None: |
|
self._set_display_var(self.new_display) |
|
else: |
|
self._cleanup_lock_file() |
|
raise RuntimeError( |
|
f'Xvfb did not start ({ret_code}): {self.xvfb_cmd}') |
|
|
|
def stop(self): |
|
try: |
|
if self.orig_display is None: |
|
del os.environ['DISPLAY'] |
|
else: |
|
self._set_display_var(self.orig_display) |
|
if self.proc is not None: |
|
try: |
|
self.proc.terminate() |
|
self.proc.wait() |
|
except OSError: |
|
pass |
|
self.proc = None |
|
finally: |
|
self._cleanup_lock_file() |
|
|
|
def xvfb_exists(self): |
|
"""Check that Xvfb is available on PATH and is executable.""" |
|
paths = os.environ['PATH'].split(os.pathsep) |
|
return any(os.access(os.path.join(path, 'Xvfb'), os.X_OK) |
|
for path in paths) |
|
|
|
def _cleanup_lock_file(self): |
|
''' |
|
This should always get called if the process exits safely |
|
with Xvfb.stop() (whether called explicitly, or by __exit__). |
|
If you are ending up with /tmp/X123-lock files when Xvfb is not |
|
running, then Xvfb is not exiting cleanly. Always either call |
|
Xvfb.stop() in a finally block, or use Xvfb as a context manager |
|
to ensure lock files are purged. |
|
''' |
|
self._lock_display_file.close() |
|
try: |
|
os.remove(self._lock_display_file.name) |
|
except OSError: |
|
pass |
|
|
|
def _get_lock_for_display(self, display): |
|
''' |
|
In order to ensure multi-process safety, this method attempts |
|
to acquire an exclusive lock on a temporary file whose name |
|
contains the display number for Xvfb. |
|
''' |
|
tempfile_path = os.path.join(self._tempdir, '.X{0}-lock'.format(display)) |
|
try: |
|
self._lock_display_file = open(tempfile_path, 'w') |
|
except PermissionError as e: |
|
return False |
|
else: |
|
try: |
|
fcntl.flock(self._lock_display_file, |
|
fcntl.LOCK_EX | fcntl.LOCK_NB) |
|
except BlockingIOError: |
|
return False |
|
else: |
|
return True |
|
|
|
def _get_next_unused_display(self): |
|
''' |
|
Randomly chooses a display number and tries to acquire a lock for this number. |
|
If the lock could be acquired, returns this number, otherwise choses a new one. |
|
:return: free display number |
|
''' |
|
while True: |
|
rand = randint(1, self.__class__.MAX_DISPLAY) |
|
if self._get_lock_for_display(rand): |
|
return rand |
|
else: |
|
continue |
|
|
|
def _set_display_var(self, display): |
|
os.environ['DISPLAY'] = ':{}'.format(display) |
|
|
|
|
|
|
|
def get_camera_custom(center=(0.5, 0.3, 0.5), distance=3.4, azimuth=-125, elevation=30): |
|
target = np.array(center) |
|
theta = 90 + azimuth |
|
y = distance * math.sin(math.radians(elevation)) |
|
x = math.cos(math.radians(theta)) * distance * math.cos(math.radians(elevation)) |
|
z = math.sin(math.radians(theta)) * distance * math.cos(math.radians(elevation)) |
|
origin = target + np.array([x, y, z]) |
|
|
|
return [ |
|
origin.tolist(), |
|
target.tolist(), |
|
(0.0, 1.0, 0.0), |
|
] |
|
|