File size: 5,702 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 |
import warnings
from typing import List, Union
from ..utils import (
add_end_docstrings,
is_torch_available,
is_vision_available,
logging,
requires_backends,
)
from .base import Pipeline, build_pipeline_init_args
if is_vision_available():
from PIL import Image
from ..image_utils import load_image
if is_torch_available():
from ..models.auto.modeling_auto import MODEL_FOR_DEPTH_ESTIMATION_MAPPING_NAMES
logger = logging.get_logger(__name__)
@add_end_docstrings(build_pipeline_init_args(has_image_processor=True))
class DepthEstimationPipeline(Pipeline):
"""
Depth estimation pipeline using any `AutoModelForDepthEstimation`. This pipeline predicts the depth of an image.
Example:
```python
>>> from transformers import pipeline
>>> depth_estimator = pipeline(task="depth-estimation", model="LiheYoung/depth-anything-base-hf")
>>> output = depth_estimator("http://images.cocodataset.org/val2017/000000039769.jpg")
>>> # This is a tensor with the values being the depth expressed in meters for each pixel
>>> output["predicted_depth"].shape
torch.Size([1, 384, 384])
```
Learn more about the basics of using a pipeline in the [pipeline tutorial](../pipeline_tutorial)
This depth estimation pipeline can currently be loaded from [`pipeline`] using the following task identifier:
`"depth-estimation"`.
See the list of available models on [huggingface.co/models](https://huggingface.co/models?filter=depth-estimation).
"""
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
requires_backends(self, "vision")
self.check_model_type(MODEL_FOR_DEPTH_ESTIMATION_MAPPING_NAMES)
def __call__(self, inputs: Union[str, List[str], "Image.Image", List["Image.Image"]] = None, **kwargs):
"""
Predict the depth(s) of the image(s) passed as inputs.
Args:
inputs (`str`, `List[str]`, `PIL.Image` or `List[PIL.Image]`):
The pipeline handles three types of images:
- A string containing a http link pointing to an image
- A string containing a local path to an image
- An image loaded in PIL directly
The pipeline accepts either a single image or a batch of images, which must then be passed as a string.
Images in a batch must all be in the same format: all as http links, all as local paths, or all as PIL
images.
parameters (`Dict`, *optional*):
A dictionary of argument names to parameter values, to control pipeline behaviour.
The only parameter available right now is `timeout`, which is the length of time, in seconds,
that the pipeline should wait before giving up on trying to download an image.
Return:
A dictionary or a list of dictionaries containing result. If the input is a single image, will return a
dictionary, if the input is a list of several images, will return a list of dictionaries corresponding to
the images.
The dictionaries contain the following keys:
- **predicted_depth** (`torch.Tensor`) -- The predicted depth by the model as a `torch.Tensor`.
- **depth** (`PIL.Image`) -- The predicted depth by the model as a `PIL.Image`.
"""
# After deprecation of this is completed, remove the default `None` value for `images`
if "images" in kwargs:
inputs = kwargs.pop("images")
if inputs is None:
raise ValueError("Cannot call the depth-estimation pipeline without an inputs argument!")
return super().__call__(inputs, **kwargs)
def _sanitize_parameters(self, timeout=None, parameters=None, **kwargs):
preprocess_params = {}
if timeout is not None:
warnings.warn(
"The `timeout` argument is deprecated and will be removed in version 5 of Transformers", FutureWarning
)
preprocess_params["timeout"] = timeout
if isinstance(parameters, dict) and "timeout" in parameters:
preprocess_params["timeout"] = parameters["timeout"]
return preprocess_params, {}, {}
def preprocess(self, image, timeout=None):
image = load_image(image, timeout)
model_inputs = self.image_processor(images=image, return_tensors=self.framework)
if self.framework == "pt":
model_inputs = model_inputs.to(self.torch_dtype)
model_inputs["target_size"] = image.size[::-1]
return model_inputs
def _forward(self, model_inputs):
target_size = model_inputs.pop("target_size")
model_outputs = self.model(**model_inputs)
model_outputs["target_size"] = target_size
return model_outputs
def postprocess(self, model_outputs):
outputs = self.image_processor.post_process_depth_estimation(
model_outputs,
# this acts as `source_sizes` for ZoeDepth and as `target_sizes` for the rest of the models so do *not*
# replace with `target_sizes = [model_outputs["target_size"]]`
[model_outputs["target_size"]],
)
formatted_outputs = []
for output in outputs:
depth = output["predicted_depth"].detach().cpu().numpy()
depth = (depth - depth.min()) / (depth.max() - depth.min())
depth = Image.fromarray((depth * 255).astype("uint8"))
formatted_outputs.append({"predicted_depth": output["predicted_depth"], "depth": depth})
return formatted_outputs[0] if len(outputs) == 1 else formatted_outputs
|