|
import logging |
|
from pathlib import Path |
|
import numpy as np |
|
import torch |
|
import torchvision.transforms as tfm |
|
import os |
|
import contextlib |
|
from yacs.config import CfgNode as CN |
|
import sys |
|
from typing import Union, Optional, Dict, List, Tuple, Any |
|
|
|
# Module-wide handle on the root logger. Its threshold is set one step above
# logging.WARNING (30), i.e. 31, so WARNING-level records are filtered out
# while ERROR and CRITICAL still pass.
logger = logging.getLogger()
logger.setLevel(logging.WARNING + 1)
|
|
|
|
|
def get_image_pairs_paths(inputs: Union[List[Path], List[str]]) -> List[Tuple[Path, Path]]:
    """Get pairs of image paths from various input formats.

    Supported layouts:

    * two paths: both must be existing files and form a single pair;
    * one path to a text file: every non-blank line holds two
      whitespace-separated paths;
    * one path to a directory holding exactly two files: those two files
      (in sorted order) form a single pair;
    * one path to any other directory: every entry inside it is globbed and
      must yield exactly two entries, which form one pair each.

    Args:
        inputs: List of input paths (1 or 2 items)

    Returns:
        List of tuples containing image path pairs

    Raises:
        ValueError: If ``inputs`` does not hold one or two paths, if the two
            given paths are not both files, if the single path does not
            exist, or if a line / sub-directory does not yield exactly two
            paths.
    """
    # Reject the empty list here too, rather than through a bare ``assert``
    # that vanishes under ``python -O``.
    if not 1 <= len(inputs) <= 2:
        raise ValueError(f"--input should be one or two paths, not {len(inputs)} paths like {inputs}")

    if len(inputs) == 2:
        first, second = Path(inputs[0]), Path(inputs[1])
        if not (first.is_file() and second.is_file()):
            raise ValueError(f"If --input is two paths, it should be two images, not {inputs}")
        return [(first, second)]

    source = Path(inputs[0])
    if not source.exists():
        raise ValueError(f"{source} does not exist")

    if source.is_file():
        # Text file listing one pair per line.
        with open(source) as file:
            # split() without an argument tolerates tabs and repeated spaces;
            # blank lines (e.g. a trailing empty line) are skipped.
            pairs_of_paths = [line.split() for line in file if line.strip()]
    else:
        inner_files = sorted(source.glob("*"))
        if len(inner_files) == 2 and all(entry.is_file() for entry in inner_files):
            # The directory itself is the pair.
            return [(inner_files[0], inner_files[1])]
        # Otherwise each entry is expected to be a directory holding one pair.
        pairs_of_paths = [sorted(pair_dir.glob("*")) for pair_dir in inner_files]

    for pair in pairs_of_paths:
        if len(pair) != 2:
            raise ValueError(f"{pair} should be a pair of paths")
    return [(Path(pair[0]), Path(pair[1])) for pair in pairs_of_paths]
|
|
|
|
|
def to_numpy(x: Union[torch.Tensor, np.ndarray, Dict, List]) -> Union[np.ndarray, Dict, List]:
    """Convert item or container of items to numpy.

    Lists and dicts are walked recursively (dict keys are kept as-is); any
    other non-tensor value is returned unchanged.

    Args:
        x: Input to convert (Tensor, ndarray, dict or list)

    Returns:
        Numpy array or container with numpy arrays
    """
    if isinstance(x, list):
        return [to_numpy(item) for item in x]
    if isinstance(x, dict):
        return {key: to_numpy(value) for key, value in x.items()}
    if isinstance(x, torch.Tensor):
        # detach() first: Tensor.numpy() raises on tensors that require grad.
        return x.detach().cpu().numpy()
    return x
|
|
|
|
|
def to_tensor(x: Union[np.ndarray, torch.Tensor], device: Optional[str] = None) -> torch.Tensor:
    """Return ``x`` as a torch tensor, optionally moved to ``device``.

    Args:
        x: Array or tensor to convert; tensors are passed through as-is
        device: Target device, or None to leave the tensor where it is

    Returns:
        Tensor holding the data of ``x`` (on ``device`` when one is given)
    """
    tensor = x if isinstance(x, torch.Tensor) else torch.from_numpy(x)
    if device is None:
        return tensor
    return tensor.to(device)
|
|
|
|
|
def to_normalized_coords(pts: Union[np.ndarray, torch.Tensor], height: int, width: int) -> np.ndarray:
    """Normalize keypoint coordinates from pixel space to [0,1].

    The input is never modified; a new float array is returned.

    Args:
        pts: Array of keypoints in shape (N, 2) (x,y order); any leading
            shape is accepted as long as the last axis has size 2
        height: Image height
        width: Image width

    Returns:
        Keypoints in normalized [0,1] coordinates
    """
    assert pts.shape[-1] == 2, f"Input should be shape (N, 2), got {pts.shape}"
    # astype() returns a fresh array, so the caller's data stays untouched.
    normalized = to_numpy(pts).astype(float)
    # Ellipsis indexing scales the x / y channel along the last axis for any
    # leading shape, matching the last-axis check in the assertion above.
    normalized[..., 0] /= width
    normalized[..., 1] /= height
    return normalized
|
|
|
|
|
def to_px_coords(pts: Union[np.ndarray, torch.Tensor], height: int, width: int) -> np.ndarray:
    """Unnormalize keypoint coordinates from [0,1] to pixel space.

    The input is never modified; a new array with the input's dtype is
    returned.

    Args:
        pts: Array of keypoints in shape (N, 2) (x,y order); any leading
            shape is accepted as long as the last axis has size 2
        height: Image height
        width: Image width

    Returns:
        Keypoints in pixel coordinates
    """
    assert pts.shape[-1] == 2, f"Input should be shape (N, 2), got {pts.shape}"
    # Copy before scaling: to_numpy() hands back the very same ndarray (or a
    # view sharing memory with a CPU tensor), so scaling it in place would
    # silently rewrite the caller's data.
    pixels = to_numpy(pts).copy()
    # Ellipsis indexing scales the x / y channel along the last axis for any
    # leading shape, matching the last-axis check in the assertion above.
    pixels[..., 0] *= width
    pixels[..., 1] *= height
    return pixels
|
|
|
|
|
def resize_to_divisible(img: torch.Tensor, divisible_by: int = 14) -> torch.Tensor:
    """Resize to be divisible by a factor (useful for ViT models).

    Each spatial side is rounded to the nearest multiple of ``divisible_by``,
    but never below one multiple.

    Args:
        img: Image tensor in (*, H, W) order
        divisible_by: Factor to ensure divisibility

    Returns:
        Image tensor with divisible shape
    """
    h, w = img.shape[-2:]
    # Clamp to at least one multiple: a side shorter than divisible_by / 2
    # would otherwise round to 0 and produce an invalid target size.
    divisible_h = max(round(h / divisible_by), 1) * divisible_by
    divisible_w = max(round(w / divisible_by), 1) * divisible_by
    return tfm.functional.resize(img, [divisible_h, divisible_w], antialias=True)
|
|
|
|
|
def supress_stdout(func):
    """Decorator to suppress stdout from a function.

    While the wrapped function runs, ``sys.stdout`` is redirected to
    ``os.devnull``; the function's return value is passed through unchanged.

    Args:
        func: Callable whose stdout output should be discarded

    Returns:
        Wrapper with the same call signature and metadata as ``func``
    """
    import functools  # local import keeps this block self-contained

    # wraps() carries over __name__, __doc__, etc. so the decorated function
    # still identifies as itself in tracebacks and introspection.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        with open(os.devnull, "w") as devnull, contextlib.redirect_stdout(devnull):
            return func(*args, **kwargs)

    return wrapper
|
|
|
|
|
def lower_config(yacs_cfg: Union[CN, Dict]) -> Dict:
    """Recursively turn a YACS config node into a dict with lowercase keys.

    Anything that is not a ``CfgNode`` is returned unchanged.
    """
    if isinstance(yacs_cfg, CN):
        lowered = {}
        for key, value in yacs_cfg.items():
            lowered[key.lower()] = lower_config(value)
        return lowered
    return yacs_cfg
|
|
|
|
|
def load_module(module_name: str, module_path: Union[Path, str]) -> None:
    """Load module from path into interpreter with given namespace.

    The module is registered in ``sys.modules`` under ``module_name`` before
    its code runs; if execution fails, the registration is removed again.

    Args:
        module_name: Module name for importing
        module_path: Path to module (usually __init__.py)

    Raises:
        ImportError: If no import spec / loader can be built for
            ``module_path`` (e.g. unsupported file suffix).
    """
    import importlib.util

    module_path = str(module_path)
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    # spec_from_file_location() returns None when no loader matches the file.
    if spec is None or spec.loader is None:
        raise ImportError(
            f"Cannot load module {module_name!r} from {module_path}",
            name=module_name,
            path=module_path,
        )
    module = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = module
    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Do not leave a half-initialised module behind for later imports.
        sys.modules.pop(module_name, None)
        raise
|
|
|
|
|
def add_to_path(path: Union[str, Path], insert: Optional[int] = None) -> None:
    """Put ``path`` on ``sys.path``, dropping one earlier occurrence first.

    With ``insert=None`` the entry goes to the end; otherwise it is inserted
    at index ``insert``.
    """
    entry = str(path)
    try:
        sys.path.remove(entry)
    except ValueError:
        # Not on sys.path yet, so there is nothing to drop.
        pass
    position = len(sys.path) if insert is None else insert
    sys.path.insert(position, entry)
|
|
|
|
|
def get_default_device() -> str:
    """Pick the default torch device name: "mps", "cuda" or "cpu".

    MPS is only probed on macOS (``sys.platform == "darwin"``); otherwise
    CUDA is used when available, with CPU as the fallback.
    """
    on_mac = sys.platform == "darwin"
    if on_mac and torch.backends.mps.is_available():
        return "mps"
    if torch.cuda.is_available():
        return "cuda"
    return "cpu"