# NOTE: the three lines below are residue from the hosting page, not part of the module.
# jamtur01's picture
# Upload folder using huggingface_hub
# 9c6594c verified
import warnings
from typing import List, Union
from ..utils import (
add_end_docstrings,
is_torch_available,
is_vision_available,
logging,
requires_backends,
)
from .base import Pipeline, build_pipeline_init_args
if is_vision_available():
from PIL import Image
from ..image_utils import load_image
if is_torch_available():
from ..models.auto.modeling_auto import MODEL_FOR_DEPTH_ESTIMATION_MAPPING_NAMES
# Module-level logger, named after this module via the package's logging utility.
logger = logging.get_logger(__name__)
@add_end_docstrings(build_pipeline_init_args(has_image_processor=True))
class DepthEstimationPipeline(Pipeline):
    """
    Depth estimation pipeline using any `AutoModelForDepthEstimation`. This pipeline predicts the depth of an image.

    Example:

    ```python
    >>> from transformers import pipeline

    >>> depth_estimator = pipeline(task="depth-estimation", model="LiheYoung/depth-anything-base-hf")
    >>> output = depth_estimator("http://images.cocodataset.org/val2017/000000039769.jpg")
    >>> # This is a tensor with the values being the depth expressed in meters for each pixel
    >>> output["predicted_depth"].shape
    torch.Size([1, 384, 384])
    ```

    Learn more about the basics of using a pipeline in the [pipeline tutorial](../pipeline_tutorial)

    This depth estimation pipeline can currently be loaded from [`pipeline`] using the following task identifier:
    `"depth-estimation"`.

    See the list of available models on [huggingface.co/models](https://huggingface.co/models?filter=depth-estimation).
    """

    def __init__(self, *args, **kwargs):
        """Build the pipeline, require the vision backend and validate the model type."""
        super().__init__(*args, **kwargs)
        requires_backends(self, "vision")
        self.check_model_type(MODEL_FOR_DEPTH_ESTIMATION_MAPPING_NAMES)

    def __call__(self, inputs: Union[str, List[str], "Image.Image", List["Image.Image"]] = None, **kwargs):
        """
        Predict the depth(s) of the image(s) passed as inputs.

        Args:
            inputs (`str`, `List[str]`, `PIL.Image` or `List[PIL.Image]`):
                The pipeline handles three types of images:

                - A string containing a http link pointing to an image
                - A string containing a local path to an image
                - An image loaded in PIL directly

                The pipeline accepts either a single image or a batch of images, which must then be passed as a
                list. Images in a batch must all be in the same format: all as http links, all as local paths, or
                all as PIL images.

                For backward compatibility the legacy keyword `images` is still accepted; when present it takes
                precedence over `inputs`.
            parameters (`Dict`, *optional*):
                A dictionary of argument names to parameter values, to control pipeline behaviour.
                The only parameter available right now is `timeout`, which is the length of time, in seconds,
                that the pipeline should wait before giving up on trying to download an image.

        Return:
            A dictionary or a list of dictionaries containing result. If the input is a single image, will return a
            dictionary, if the input is a list of several images, will return a list of dictionaries corresponding to
            the images.

            The dictionaries contain the following keys:

            - **predicted_depth** (`torch.Tensor`) -- The predicted depth by the model as a `torch.Tensor`.
            - **depth** (`PIL.Image`) -- The predicted depth by the model as a `PIL.Image`.

        Raises:
            ValueError: If neither `inputs` nor the legacy `images` keyword provides a value.
        """
        # After deprecation of the `images` keyword is completed, remove the default `None` value for `inputs`
        if "images" in kwargs:
            inputs = kwargs.pop("images")
        if inputs is None:
            raise ValueError("Cannot call the depth-estimation pipeline without an inputs argument!")
        return super().__call__(inputs, **kwargs)

    def _sanitize_parameters(self, timeout=None, parameters=None, **kwargs):
        """
        Split call-time keyword arguments into per-stage parameter dictionaries.

        Only `timeout` is recognised. It can be given directly (deprecated, emits a `FutureWarning`) or inside the
        `parameters` dict; when both are given, the value from `parameters` wins. Any other keyword is ignored.

        Returns:
            A 3-tuple of dicts. The first holds the `preprocess` parameters; the other two are always empty,
            since `_forward` and `postprocess` take no extra parameters in this pipeline.
        """
        preprocess_params = {}
        if timeout is not None:
            warnings.warn(
                "The `timeout` argument is deprecated and will be removed in version 5 of Transformers", FutureWarning
            )
            preprocess_params["timeout"] = timeout
        if isinstance(parameters, dict) and "timeout" in parameters:
            preprocess_params["timeout"] = parameters["timeout"]
        return preprocess_params, {}, {}

    def preprocess(self, image, timeout=None):
        """
        Load one image and turn it into model inputs.

        Args:
            image: Anything accepted by `load_image` (see `__call__` for the supported forms).
            timeout: Forwarded to `load_image`.

        Returns:
            The image processor output, with an extra `"target_size"` entry holding the reversed `image.size`
            of the loaded image (PIL reports `(width, height)`, so this is `(height, width)`).
        """
        image = load_image(image, timeout)
        model_inputs = self.image_processor(images=image, return_tensors=self.framework)
        if self.framework == "pt":
            model_inputs = model_inputs.to(self.torch_dtype)
        # Remember the original image size so `postprocess` can pass it to the image processor.
        model_inputs["target_size"] = image.size[::-1]
        return model_inputs

    def _forward(self, model_inputs):
        """Run the model, carrying `"target_size"` around it (it is not a model argument)."""
        target_size = model_inputs.pop("target_size")
        model_outputs = self.model(**model_inputs)
        model_outputs["target_size"] = target_size
        return model_outputs

    def postprocess(self, model_outputs):
        """
        Convert raw model outputs into the dictionaries returned to the caller.

        Each result holds the `"predicted_depth"` tensor produced by the image processor and a `"depth"` PIL image
        obtained by min-max scaling that tensor to the 0-255 range and casting to `uint8`. A constant depth map
        (max equals min) has no range to scale by and is rendered as a uniform black image instead of dividing
        zero by zero.

        Returns:
            A single dictionary when the image processor yields exactly one output, otherwise a list of them.
        """
        outputs = self.image_processor.post_process_depth_estimation(
            model_outputs,
            # this acts as `source_sizes` for ZoeDepth and as `target_sizes` for the rest of the models so do *not*
            # replace with `target_sizes = [model_outputs["target_size"]]`
            [model_outputs["target_size"]],
        )

        formatted_outputs = []
        for output in outputs:
            depth = output["predicted_depth"].detach().cpu().numpy()
            depth_min = depth.min()
            depth_range = depth.max() - depth_min
            # Shift so the minimum is 0; only rescale when there is a non-zero range, otherwise 0/0 would
            # yield NaN and an undefined uint8 cast.
            depth = depth - depth_min
            if depth_range > 0:
                depth = depth / depth_range
            depth = Image.fromarray((depth * 255).astype("uint8"))

            formatted_outputs.append({"predicted_depth": output["predicted_depth"], "depth": depth})

        return formatted_outputs[0] if len(outputs) == 1 else formatted_outputs