|
import os |
|
from dataclasses import dataclass, field |
|
from typing import TYPE_CHECKING, Any, ClassVar, Optional, TypedDict, Union |
|
|
|
import numpy as np |
|
import pyarrow as pa |
|
|
|
from .. import config |
|
from ..download.download_config import DownloadConfig |
|
from ..table import array_cast |
|
from ..utils.file_utils import is_local_path, xopen |
|
from ..utils.py_utils import string_to_dict |
|
|
|
|
|
if TYPE_CHECKING: |
|
from torchvision.io import VideoReader |
|
|
|
from .features import FeatureType |
|
|
|
|
|
class Example(TypedDict, total=True):
    """Encoded form of a single video sample.

    Both keys are always present; each value may be `None`.
    """

    # Location of the video file, or `None` when only raw bytes are available.
    path: Optional[str]
    # Raw content of the video file, or `None` when only a path is available.
    bytes: Optional[bytes]
|
|
|
|
|
@dataclass
class Video:
    """
    **Experimental.** Video [`Feature`] to read video data from a video file.

    Input: The Video feature accepts as input:
    - A `str`: Absolute path to the video file (i.e. random access is allowed).
    - A `bytes` or `bytearray`: Raw bytes of the video file.
    - A `dict` with the keys:

        - `path`: String with relative path of the video file in a dataset repository.
        - `bytes`: Bytes of the video file.

      This is useful for archived files with sequential access.

    - A `torchvision.io.VideoReader`: torchvision video reader object.

    Args:
        decode (`bool`, defaults to `True`):
            Whether to decode the video data. If `False`,
            returns the underlying dictionary in the format `{"path": video_path, "bytes": video_bytes}`.
        id (`str`, *optional*):
            Optional identifier stored on the feature.

    Examples:

    ```py
    >>> from datasets import Dataset, Video
    >>> ds = Dataset.from_dict({"video":["path/to/Screen Recording.mov"]}).cast_column("video", Video())
    >>> ds.features["video"]
    Video(decode=True, id=None)
    >>> ds[0]["video"]
    <torchvision.io.video_reader.VideoReader object at 0x325b1aae0>
    >>> ds = ds.cast_column('video', Video(decode=False))
    >>> ds[0]["video"]
    {'bytes': None,
     'path': 'path/to/Screen Recording.mov'}
    ```
    """

    decode: bool = True
    id: Optional[str] = None

    # Class-level constants: the Python type produced by decoding and the
    # Arrow storage type used for encoded samples.
    dtype: ClassVar[str] = "torchvision.io.VideoReader"
    pa_type: ClassVar[Any] = pa.struct({"bytes": pa.binary(), "path": pa.string()})
    # Feature type tag; excluded from `__init__` and `repr`.
    _type: str = field(default="Video", init=False, repr=False)

    def __call__(self):
        """Return the Arrow storage type of this feature."""
        return self.pa_type

    def encode_example(self, value: Union[str, bytes, bytearray, Example, np.ndarray, "VideoReader"]) -> Example:
        """Encode example into a format for Arrow.

        Args:
            value (`str`, `bytes`, `bytearray`, `np.ndarray`, `list`, `VideoReader` or `dict`):
                Data passed as input to Video feature. A `list` is converted to
                an `np.ndarray` first.

        Returns:
            `dict` with "path" and "bytes" fields

        Raises:
            ValueError: If `value` is a `dict` whose "path" and "bytes" are both missing or `None`.
            TypeError: If `value` is of an unsupported type.
            NotImplementedError: Propagated from `encode_np_array` (array / list input) or
                from `encode_torchvision_video` (a reader not produced by `decode_example`).
        """
        # torchvision is optional: without it, the VideoReader branch below is skipped.
        if config.TORCHVISION_AVAILABLE:
            from torchvision.io import VideoReader

        else:
            VideoReader = None

        if isinstance(value, list):
            value = np.array(value)

        if isinstance(value, str):
            return {"path": value, "bytes": None}
        elif isinstance(value, (bytes, bytearray)):
            return {"path": None, "bytes": value}
        elif isinstance(value, np.ndarray):
            # Convert the video array to an encoded sample.
            return encode_np_array(value)
        elif VideoReader is not None and isinstance(value, VideoReader):
            # Convert the torchvision video reader to an encoded sample.
            return encode_torchvision_video(value)
        elif isinstance(value, dict):
            path, bytes_ = value.get("path"), value.get("bytes")
            if path is not None and os.path.isfile(path):
                # The file exists locally: keep only the path and drop any bytes.
                return {"bytes": None, "path": path}
            elif bytes_ is not None or path is not None:
                # Keep whatever was provided (bytes and/or a non-local path).
                return {"bytes": bytes_, "path": path}
            else:
                raise ValueError(
                    f"A video sample should have one of 'path' or 'bytes' but they are missing or None in {value}."
                )
        else:
            raise TypeError(f"Unsupported encode_example type: {type(value)}")

    def decode_example(
        self,
        value: Union[str, Example],
        token_per_repo_id: Optional[dict[str, Union[bool, str]]] = None,
    ) -> "VideoReader":
        """Decode example video file into video data.

        Args:
            value (`str` or `dict`):
                A string with the absolute video file path, a dictionary with
                keys:

                - `path`: String with absolute or relative video file path.
                - `bytes`: The bytes of the video file.

                A missing key is treated the same as a `None` value.
            token_per_repo_id (`dict`, *optional*):
                To access and decode
                video files from private repositories on the Hub, you can pass
                a dictionary repo_id (`str`) -> token (`bool` or `str`).

        Returns:
            `torchvision.io.VideoReader`

        Raises:
            RuntimeError: If this feature was created with `decode=False`.
            ImportError: If `torchvision` is not available.
            ValueError: If neither a path nor bytes is provided.
        """
        if not self.decode:
            raise RuntimeError("Decoding is disabled for this feature. Please use Video(decode=True) instead.")

        if config.TORCHVISION_AVAILABLE:
            from torchvision.io import VideoReader

        else:
            raise ImportError("To support decoding videos, please install 'torchvision'.")

        if token_per_repo_id is None:
            token_per_repo_id = {}

        if isinstance(value, str):
            path, bytes_ = value, None
        else:
            # Use `.get` (as `encode_example` does) so a dict carrying only one
            # of the two keys is decoded instead of failing with a KeyError.
            path, bytes_ = value.get("path"), value.get("bytes")

        if bytes_ is None:
            if path is None:
                raise ValueError(f"A video should have one of 'path' or 'bytes' but both are None in {value}.")
            elif is_local_path(path):
                video = VideoReader(path)
            else:
                # Non-local path: open it through the HF file helpers.
                video = hf_video_reader(path, token_per_repo_id=token_per_repo_id)
        else:
            video = VideoReader(bytes_)
        # Remember the encoded form so `encode_torchvision_video` can return it
        # when this reader is encoded again.
        video._hf_encoded = {"path": path, "bytes": bytes_}
        return video

    def flatten(self) -> Union["FeatureType", dict[str, "FeatureType"]]:
        """If in the decodable state, return the feature itself, otherwise flatten the feature into a dictionary."""
        from .features import Value

        return (
            self
            if self.decode
            else {
                "bytes": Value("binary"),
                "path": Value("string"),
            }
        )

    def cast_storage(self, storage: Union[pa.StringArray, pa.StructArray, pa.ListArray]) -> pa.StructArray:
        """Cast an Arrow array to the Video arrow storage type.
        The Arrow types that can be converted to the Video pyarrow storage type are:

        - `pa.string()` - it must contain the "path" data
        - `pa.binary()` - it must contain the video bytes
        - `pa.struct({"bytes": pa.binary()})`
        - `pa.struct({"path": pa.string()})`
        - `pa.struct({"bytes": pa.binary(), "path": pa.string()})`  - order doesn't matter
        - `pa.list(*)` - it must contain the video array data; each non-null item
          is passed to `encode_np_array`

        Any other storage type is handed to `array_cast` unchanged.

        Args:
            storage (`Union[pa.StringArray, pa.StructArray, pa.ListArray]`):
                PyArrow array to cast.

        Returns:
            `pa.StructArray`: Array in the Video arrow storage type, that is
                `pa.struct({"bytes": pa.binary(), "path": pa.string()})`.
        """
        if pa.types.is_string(storage.type):
            # Strings are paths: pair them with an all-null bytes column.
            bytes_array = pa.array([None] * len(storage), type=pa.binary())
            storage = pa.StructArray.from_arrays([bytes_array, storage], ["bytes", "path"], mask=storage.is_null())
        elif pa.types.is_binary(storage.type):
            # Binary values are video bytes: pair them with an all-null path column.
            path_array = pa.array([None] * len(storage), type=pa.string())
            storage = pa.StructArray.from_arrays([storage, path_array], ["bytes", "path"], mask=storage.is_null())
        elif pa.types.is_struct(storage.type):
            # Reuse the fields that exist and fill in the missing one with nulls.
            if storage.type.get_field_index("bytes") >= 0:
                bytes_array = storage.field("bytes")
            else:
                bytes_array = pa.array([None] * len(storage), type=pa.binary())
            if storage.type.get_field_index("path") >= 0:
                path_array = storage.field("path")
            else:
                path_array = pa.array([None] * len(storage), type=pa.string())
            storage = pa.StructArray.from_arrays([bytes_array, path_array], ["bytes", "path"], mask=storage.is_null())
        elif pa.types.is_list(storage.type):
            # Each list item is treated as raw video array data and encoded to bytes.
            bytes_array = pa.array(
                [encode_np_array(np.array(arr))["bytes"] if arr is not None else None for arr in storage.to_pylist()],
                type=pa.binary(),
            )
            path_array = pa.array([None] * len(storage), type=pa.string())
            storage = pa.StructArray.from_arrays(
                [bytes_array, path_array], ["bytes", "path"], mask=bytes_array.is_null()
            )
        return array_cast(storage, self.pa_type)
|
|
|
|
|
def video_to_bytes(video: "VideoReader") -> bytes:
    """Convert a torchvision Video object to bytes using native compression if possible.

    Not implemented yet: every call raises.

    Args:
        video (`torchvision.io.VideoReader`):
            The video reader to serialize.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
|
|
|
|
|
def encode_torchvision_video(video: "VideoReader") -> Example:
    """Return the encoded `{"path": ..., "bytes": ...}` form attached to a video reader.

    Only readers carrying an `_hf_encoded` attribute are supported; the stored
    object is returned as-is (no copy).

    Args:
        video (`torchvision.io.VideoReader`):
            The video reader to encode.

    Returns:
        The object stored in `video._hf_encoded`.

    Raises:
        NotImplementedError: If `video` has no `_hf_encoded` attribute.
    """
    # Guard clause: without the stored encoding there is nothing to return.
    if not hasattr(video, "_hf_encoded"):
        raise NotImplementedError(
            "Encoding a VideoReader that doesn't come from datasets.Video.decode() is not implemented"
        )
    return video._hf_encoded
|
|
|
|
|
def encode_np_array(array: np.ndarray) -> Example:
    """Encode a video given as a NumPy array into a `{"path": ..., "bytes": ...}` dict.

    Not implemented yet: every call raises.

    Args:
        array (`np.ndarray`):
            The video data to encode.

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def hf_video_reader(
    path: str, token_per_repo_id: Optional[dict[str, Union[bool, str]]] = None, stream: str = "video"
) -> "VideoReader":
    """Build a `torchvision.io.VideoReader` on top of a file opened with `xopen`.

    Args:
        path (`str`):
            Path or URL of the video file. For chained URLs, the part after the
            last `"::"` is used to look up the access token.
        token_per_repo_id (`dict`, *optional*):
            Mapping repo_id (`str`) -> token (`bool` or `str`). The token of the
            repository parsed from the URL is used to open the file, if any.
        stream (`str`, defaults to `"video"`):
            Stream to decode, as `"<type>"` or `"<type>:<index>"`. The index
            defaults to 0.

    Returns:
        `torchvision.io.VideoReader` reading from the opened file through PyAV.

    Raises:
        RuntimeError: If the torchvision video backend is not `"pyav"`.
        ValueError: If the index part of `stream` is not an integer.
    """
    import av
    from torchvision import get_video_backend
    from torchvision.io import VideoReader

    # Validate everything that needs no I/O first, so that an unsupported
    # backend or a malformed stream spec fails before a file handle is opened
    # (and therefore cannot leak it).
    backend = get_video_backend()
    if backend != "pyav":
        raise RuntimeError(f"Unsupported video backend for VideoReader from HF files: {backend}")
    stream_parts = stream.split(":")
    stream_type = stream_parts[0]
    stream_id = int(stream_parts[1]) if len(stream_parts) > 1 else 0

    # Resolve the token for the repository the URL points to, if it can be parsed.
    if token_per_repo_id is None:
        token_per_repo_id = {}
    source_url = path.split("::")[-1]
    pattern = config.HUB_DATASETS_URL if source_url.startswith(config.HF_ENDPOINT) else config.HUB_DATASETS_HFFS_URL
    source_url_fields = string_to_dict(source_url, pattern)
    token = token_per_repo_id.get(source_url_fields["repo_id"]) if source_url_fields is not None else None
    download_config = DownloadConfig(token=token)
    f = xopen(path, "rb", download_config=download_config)

    try:
        # Create the reader without running `VideoReader.__init__` and set the
        # attributes for the pyav backend by hand, so it reads from the
        # already-open file object.
        # NOTE(review): presumably needed because the constructor does not
        # accept file objects - confirm against the torchvision version in use.
        vr = object.__new__(VideoReader)
        vr.backend = backend
        vr.container = av.open(f, metadata_errors="ignore")
        vr.pyav_stream = {stream_type: stream_id}
        vr._c = vr.container.decode(**vr.pyav_stream)
    except Exception:
        # Do not leak the file handle if the container cannot be set up.
        f.close()
        raise
    return vr
|
|