MMaDA / venv /lib /python3.11 /site-packages /datasets /download /streaming_download_manager.py
jamtur01's picture
Upload folder using huggingface_hub
9c6594c verified
import io
import os
from collections.abc import Iterable
from typing import Optional, Union
from ..utils.file_utils import ( # noqa: F401 # backward compatibility
SINGLE_FILE_COMPRESSION_PROTOCOLS,
ArchiveIterable,
FilesIterable,
_get_extraction_protocol,
_get_path_extension,
_prepare_path_and_storage_options,
is_relative_path,
url_or_path_join,
xbasename,
xdirname,
xet_parse,
xexists,
xgetsize,
xglob,
xgzip_open,
xisdir,
xisfile,
xjoin,
xlistdir,
xnumpy_load,
xopen,
xpandas_read_csv,
xpandas_read_excel,
xPath,
xpyarrow_parquet_read_table,
xrelpath,
xsio_loadmat,
xsplit,
xsplitext,
xwalk,
xxml_dom_minidom_parse,
)
from ..utils.logging import get_logger
from ..utils.py_utils import map_nested
from .download_config import DownloadConfig
logger = get_logger(__name__)
class StreamingDownloadManager:
"""
Download manager that uses the "::" separator to navigate through (possibly remote) compressed archives.
Contrary to the regular `DownloadManager`, the `download` and `extract` methods don't actually download nor extract
data, but they rather return the path or url that could be opened using the `xopen` function which extends the
built-in `open` function to stream data from remote files.
"""
is_streaming = True
def __init__(
self,
dataset_name: Optional[str] = None,
data_dir: Optional[str] = None,
download_config: Optional[DownloadConfig] = None,
base_path: Optional[str] = None,
):
self._dataset_name = dataset_name
self._data_dir = data_dir
self._base_path = base_path or os.path.abspath(".")
self.download_config = download_config or DownloadConfig()
self.downloaded_size = None
self.record_checksums = False
@property
def manual_dir(self):
return self._data_dir
def download(self, url_or_urls):
"""Normalize URL(s) of files to stream data from.
This is the lazy version of `DownloadManager.download` for streaming.
Args:
url_or_urls (`str` or `list` or `dict`):
URL(s) of files to stream data from. Each url is a `str`.
Returns:
url(s): (`str` or `list` or `dict`), URL(s) to stream data from matching the given input url_or_urls.
Example:
```py
>>> downloaded_files = dl_manager.download('https://storage.googleapis.com/seldon-datasets/sentence_polarity_v1/rt-polaritydata.tar.gz')
```
"""
url_or_urls = map_nested(self._download_single, url_or_urls, map_tuple=True)
return url_or_urls
def _download_single(self, urlpath: str) -> str:
urlpath = str(urlpath)
if is_relative_path(urlpath):
# append the relative path to the base_path
urlpath = url_or_path_join(self._base_path, urlpath)
return urlpath
def extract(self, url_or_urls):
"""Add extraction protocol for given url(s) for streaming.
This is the lazy version of `DownloadManager.extract` for streaming.
Args:
url_or_urls (`str` or `list` or `dict`):
URL(s) of files to stream data from. Each url is a `str`.
Returns:
url(s): (`str` or `list` or `dict`), URL(s) to stream data from matching the given input `url_or_urls`.
Example:
```py
>>> downloaded_files = dl_manager.download('https://storage.googleapis.com/seldon-datasets/sentence_polarity_v1/rt-polaritydata.tar.gz')
>>> extracted_files = dl_manager.extract(downloaded_files)
```
"""
urlpaths = map_nested(self._extract, url_or_urls, map_tuple=True)
return urlpaths
def _extract(self, urlpath: str) -> str:
urlpath = str(urlpath)
protocol = _get_extraction_protocol(urlpath, download_config=self.download_config)
# get inner file: zip://train-00000.json.gz::https://foo.bar/data.zip -> zip://train-00000.json.gz
path = urlpath.split("::")[0]
extension = _get_path_extension(path)
if extension in ["tgz", "tar"] or path.endswith((".tar.gz", ".tar.bz2", ".tar.xz")):
raise NotImplementedError(
f"Extraction protocol for TAR archives like '{urlpath}' is not implemented in streaming mode. "
f"Please use `dl_manager.iter_archive` instead.\n\n"
f"Example usage:\n\n"
f"\turl = dl_manager.download(url)\n"
f"\ttar_archive_iterator = dl_manager.iter_archive(url)\n\n"
f"\tfor filename, file in tar_archive_iterator:\n"
f"\t\t..."
)
if protocol is None:
# no extraction
return urlpath
elif protocol in SINGLE_FILE_COMPRESSION_PROTOCOLS:
# there is one single file which is the uncompressed file
inner_file = os.path.basename(urlpath.split("::")[0])
inner_file = inner_file[: inner_file.rindex(".")] if "." in inner_file else inner_file
return f"{protocol}://{inner_file}::{urlpath}"
else:
return f"{protocol}://::{urlpath}"
def download_and_extract(self, url_or_urls):
"""Prepare given `url_or_urls` for streaming (add extraction protocol).
This is the lazy version of `DownloadManager.download_and_extract` for streaming.
Is equivalent to:
```
urls = dl_manager.extract(dl_manager.download(url_or_urls))
```
Args:
url_or_urls (`str` or `list` or `dict`):
URL(s) to stream from data from. Each url is a `str`.
Returns:
url(s): (`str` or `list` or `dict`), URL(s) to stream data from matching the given input `url_or_urls`.
"""
return self.extract(self.download(url_or_urls))
def iter_archive(self, urlpath_or_buf: Union[str, io.BufferedReader]) -> Iterable[tuple]:
"""Iterate over files within an archive.
Args:
urlpath_or_buf (`str` or `io.BufferedReader`):
Archive path or archive binary file object.
Yields:
`tuple[str, io.BufferedReader]`:
2-tuple (path_within_archive, file_object).
File object is opened in binary mode.
Example:
```py
>>> archive = dl_manager.download('https://storage.googleapis.com/seldon-datasets/sentence_polarity_v1/rt-polaritydata.tar.gz')
>>> files = dl_manager.iter_archive(archive)
```
"""
if hasattr(urlpath_or_buf, "read"):
return ArchiveIterable.from_buf(urlpath_or_buf)
else:
return ArchiveIterable.from_urlpath(urlpath_or_buf, download_config=self.download_config)
def iter_files(self, urlpaths: Union[str, list[str]]) -> Iterable[str]:
"""Iterate over files.
Args:
urlpaths (`str` or `list` of `str`):
Root paths.
Yields:
str: File URL path.
Example:
```py
>>> files = dl_manager.download_and_extract('https://huggingface.co/datasets/beans/resolve/main/data/train.zip')
>>> files = dl_manager.iter_files(files)
```
"""
return FilesIterable.from_urlpaths(urlpaths, download_config=self.download_config)
def manage_extracted_files(self):
pass
def get_recorded_sizes_checksums(self):
pass