|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
"""Arrow ArrowReader.""" |
|
|
|
import copy |
|
import math |
|
import os |
|
import re |
|
from dataclasses import dataclass |
|
from functools import partial |
|
from typing import TYPE_CHECKING, Optional, Union |
|
|
|
import pyarrow as pa |
|
import pyarrow.parquet as pq |
|
from tqdm.contrib.concurrent import thread_map |
|
|
|
from .download.download_config import DownloadConfig |
|
from .naming import _split_re, filenames_for_dataset_split |
|
from .table import InMemoryTable, MemoryMappedTable, Table, concat_tables |
|
from .utils import logging |
|
from .utils import tqdm as hf_tqdm |
|
|
|
|
|
if TYPE_CHECKING: |
|
from .info import DatasetInfo |
|
from .splits import Split, SplitInfo |
|
|
|
|
|
logger = logging.get_logger(__name__)

# Base URL of the (legacy) Hugging Face GCS bucket hosting preprocessed datasets.
HF_GCP_BASE_URL = "https://storage.googleapis.com/huggingface-nlp/cache/datasets"

# Matches one (non-additive) split sub-spec such as "train", "test[10:20]",
# "train[:10%]" or "test[:33%](pct1_dropremainder)".
# `_split_re` is an anchored pattern; `[1:-1]` strips its `^`/`$` so it can be
# embedded inside this larger verbose (re.X) pattern.
# NOTE: the pattern text itself is behavior — do not edit it casually.
_SUB_SPEC_RE = re.compile(
    rf"""
^
 (?P<split>{_split_re[1:-1]})
 (\[
    ((?P<from>-?[\d_]+)
     (?P<from_pct>%)?)?
    :
    ((?P<to>-?[\d_]+)
     (?P<to_pct>%)?)?
 \])?(\((?P<rounding>[^\)]*)\))?
$
""",
    re.X,
)

# Splits "train+test[:50%]" into its "+"-separated sub-specs (whitespace tolerated).
_ADDITION_SEP_RE = re.compile(r"\s*\+\s*")
|
|
|
|
|
class DatasetNotOnHfGcsError(ConnectionError):
    """Raised when the dataset can't be fetched from the Hugging Face Google Cloud Storage."""

    pass
|
|
|
|
|
class MissingFilesOnHfGcsError(ConnectionError):
    """Raised when some dataset files are missing on the Hugging Face Google Cloud Storage."""

    pass
|
|
|
|
|
@dataclass(frozen=True)
class FileInstructions:
    """The file instructions associated with a split ReadInstruction.

    Attributes:
        num_examples: `int`, The total number of examples covered by the instructions.
        file_instructions: List[dict(filename, skip, take)], the files information.
            The filenames contain the relative path, not absolute.
            skip/take indicates which examples to read in the file: `ds.slice(skip, take)`
            (a take of -1 means "read until the end of the file").
    """

    # Total number of examples selected across all file instructions.
    num_examples: int
    # One dict per file: {"filename": str, "skip": int, "take": int}.
    file_instructions: list[dict]
|
|
|
|
|
def make_file_instructions(
    name: str,
    split_infos: list["SplitInfo"],
    instruction: Union[str, "ReadInstruction"],
    filetype_suffix: Optional[str] = None,
    prefix_path: Optional[str] = None,
) -> FileInstructions:
    """Returns instructions of the split dict.

    Resolves a (possibly relative / percent-based) reading instruction into the
    concrete list of files and per-file (skip, take) windows needed to read it.

    Args:
        name (`str`): Name of the dataset.
        split_infos (`list` of `[SplitInfo]`): Dataset splits information.
        instruction ([`ReadInstruction`] or `str`): Reading instruction for a dataset.
        filetype_suffix (`str`, *optional*): Suffix of dataset files, e.g. 'arrow' or 'parquet'.
        prefix_path (`str`, *optional*): Prefix of dataset files, e.g. directory name.

    Returns:
        [`FileInstructions`]

    Raises:
        TypeError: if `name` is not a string.
        ValueError: if `name` is empty.
    """
    if not isinstance(name, str):
        raise TypeError(f"Expected str 'name', but got: {type(name).__name__}")
    elif not name:
        raise ValueError("Expected non-empty str 'name'")
    # Per-split lookup tables: total example count, shard sizes, shard filenames.
    name2len = {info.name: info.num_examples for info in split_infos}
    name2shard_lengths = {info.name: info.shard_lengths for info in split_infos}
    name2filenames = {
        info.name: filenames_for_dataset_split(
            path=prefix_path,
            dataset_name=name,
            split=info.name,
            filetype_suffix=filetype_suffix,
            shard_lengths=name2shard_lengths[info.name],
        )
        for info in split_infos
    }
    if not isinstance(instruction, ReadInstruction):
        instruction = ReadInstruction.from_spec(instruction)
    # Resolve percents / negative indices into absolute [from_, to) boundaries.
    absolute_instructions = instruction.to_absolute(name2len)

    file_instructions = []
    num_examples = 0
    for abs_instr in absolute_instructions:
        split_length = name2len[abs_instr.splitname]
        filenames = name2filenames[abs_instr.splitname]
        shard_lengths = name2shard_lengths[abs_instr.splitname]
        from_ = 0 if abs_instr.from_ is None else abs_instr.from_
        to = split_length if abs_instr.to is None else abs_instr.to
        if shard_lengths is None:
            # Unsharded split: the same (skip, take) window applies per listed file.
            # NOTE(review): presumably there is exactly one file per split when
            # shard_lengths is None — confirm against filenames_for_dataset_split.
            for filename in filenames:
                take = to - from_
                if take == 0:
                    continue
                num_examples += take
                file_instructions.append({"filename": filename, "skip": from_, "take": take})
        else:
            # Sharded split: walk the shards with a moving [index_start, index_end)
            # window, translating the global [from_, to) range into per-shard
            # (skip, take) pairs. `take == -1` means "read to the end of this shard".
            index_start = 0  # beginning (included) of the current shard
            index_end = 0  # end (excluded) of the current shard
            for filename, shard_length in zip(filenames, shard_lengths):
                index_end += shard_length
                if from_ < index_end and to > index_start:  # this shard overlaps the requested range
                    skip = from_ - index_start if from_ > index_start else 0
                    take = to - index_start - skip if to < index_end else -1
                    # take == 0 only happens for an empty request (to == from_);
                    # nothing is emitted in that case.
                    if take == 0:
                        continue
                    file_instructions.append({"filename": filename, "skip": skip, "take": take})
                    num_examples += shard_length - skip if take == -1 else take
                index_start += shard_length
    return FileInstructions(
        num_examples=num_examples,
        file_instructions=file_instructions,
    )
|
|
|
|
|
class BaseReader:
    """
    Build a Dataset object out of Instruction instance(s).

    Abstract base: subclasses set `_filetype_suffix` and implement
    `_get_table_from_filename` for a concrete on-disk format.
    """

    def __init__(self, path: str, info: Optional["DatasetInfo"]):
        """Initializes BaseReader.

        Args:
            path (str): directory where the dataset files are stored.
            info (DatasetInfo): info about the dataset.
        """
        self._path: str = path
        self._info: Optional["DatasetInfo"] = info
        # Set by concrete subclasses (e.g. "arrow", "parquet"); selects the
        # data file extension used by `get_file_instructions`.
        self._filetype_suffix: Optional[str] = None

    def _get_table_from_filename(self, filename_skip_take, in_memory=False) -> Table:
        """Returns a Dataset instance from given (filename, skip, take)."""
        raise NotImplementedError

    def _read_files(self, files, in_memory=False) -> Table:
        """Returns Dataset for given file instructions.

        Args:
            files: List[dict(filename, skip, take)], the files information.
                The filenames contain the absolute path, not relative.
                skip/take indicates which example read in the file: `ds.slice(skip, take)`
            in_memory (bool, default False): Whether to copy the data in-memory.

        Raises:
            ValueError: if `files` is empty or contains non-dict entries, or if
                every file is empty and no `info.features` is available to build
                an empty table with the right schema.
        """
        if len(files) == 0 or not all(isinstance(f, dict) for f in files):
            raise ValueError("please provide valid file informations")
        # Deep-copy so the caller's instruction dicts are not mutated below.
        files = copy.deepcopy(files)
        for f in files:
            f["filename"] = os.path.join(self._path, f["filename"])

        # Load shards in parallel threads.
        pa_tables = thread_map(
            partial(self._get_table_from_filename, in_memory=in_memory),
            files,
            tqdm_class=hf_tqdm,
            desc="Loading dataset shards",
            # `disable=None` (rather than False) lets tqdm auto-hide the bar for
            # few shards; only show it when there are more than 16 shards.
            disable=len(files) <= 16 or None,
        )
        # Drop empty shards so the concatenation keeps a well-formed table.
        pa_tables = [t for t in pa_tables if len(t) > 0]
        if not pa_tables and (self._info is None or self._info.features is None):
            raise ValueError(
                "Tried to read an empty table. Please specify at least info.features to create an empty table with the right type."
            )
        # If every shard was empty, build an empty table with the declared schema.
        pa_tables = pa_tables or [InMemoryTable.from_batches([], schema=pa.schema(self._info.features.type))]
        pa_table = concat_tables(pa_tables) if len(pa_tables) != 1 else pa_tables[0]
        return pa_table

    def get_file_instructions(self, name, instruction, split_infos):
        """Return list of dict {'filename': str, 'skip': int, 'take': int}"""
        file_instructions = make_file_instructions(
            name, split_infos, instruction, filetype_suffix=self._filetype_suffix, prefix_path=self._path
        )
        files = file_instructions.file_instructions
        return files

    def read(
        self,
        name,
        instructions,
        split_infos,
        in_memory=False,
    ):
        """Returns Dataset instance(s).

        Args:
            name (str): name of the dataset.
            instructions (ReadInstruction): instructions to read.
                Instruction can be string and will then be passed to the Instruction
                constructor as it.
            split_infos (list of SplitInfo proto): the available splits for dataset.
            in_memory (bool, default False): Whether to copy the data in-memory.

        Returns:
            kwargs to build a single Dataset instance.

        Raises:
            ValueError: if the instructions select no data.
        """

        files = self.get_file_instructions(name, instructions, split_infos)
        if not files:
            msg = f'Instruction "{instructions}" corresponds to no data!'
            raise ValueError(msg)
        return self.read_files(files=files, original_instructions=instructions, in_memory=in_memory)

    def read_files(
        self,
        files: list[dict],
        original_instructions: Union[None, "ReadInstruction", "Split"] = None,
        in_memory=False,
    ):
        """Returns single Dataset instance for the set of file instructions.

        Args:
            files: List[dict(filename, skip, take)], the files information.
                The filenames contains the relative path, not absolute.
                skip/take indicates which example read in the file: `ds.skip().take()`
            original_instructions: store the original instructions used to build the dataset split in the dataset.
            in_memory (bool, default False): Whether to copy the data in-memory.

        Returns:
            kwargs to build a Dataset instance.
        """

        pa_table = self._read_files(files, in_memory=in_memory)

        # Record the originating instructions as a named split, if provided.
        if original_instructions is not None:
            # Imported lazily here to avoid a circular import at module load time.
            from .splits import Split

            split = Split(str(original_instructions))
        else:
            split = None
        dataset_kwargs = {"arrow_table": pa_table, "info": self._info, "split": split}
        return dataset_kwargs
|
|
|
|
|
class ArrowReader(BaseReader):
    """
    Build a Dataset object out of Instruction instance(s).
    This Reader uses either memory mapping or file descriptors (in-memory) on arrow files.
    """

    def __init__(self, path: str, info: Optional["DatasetInfo"]):
        """Initializes ArrowReader.

        Args:
            path (str): path where Arrow files are stored.
            info (DatasetInfo): info about the dataset.
        """
        super().__init__(path, info)
        self._filetype_suffix = "arrow"

    def _get_table_from_filename(self, filename_skip_take, in_memory=False) -> Table:
        """Returns a Dataset instance from given (filename, skip, take)."""
        filename = filename_skip_take["filename"]
        skip = filename_skip_take.get("skip")
        take = filename_skip_take.get("take")
        table = ArrowReader.read_table(filename, in_memory=in_memory)
        # A take of -1 is a sentinel meaning "read from `skip` to the end of the file".
        if take == -1:
            take = len(table) - skip

        # Only slice when the requested window is a strict sub-range of the table.
        needs_slicing = skip is not None and take is not None and not (skip == 0 and take == len(table))
        if needs_slicing:
            table = table.slice(skip, take)
        return table

    @staticmethod
    def read_table(filename, in_memory=False) -> Table:
        """
        Read table from file.

        Args:
            filename (str): File name of the table.
            in_memory (bool, default=False): Whether to copy the data in-memory.

        Returns:
            pyarrow.Table
        """
        if in_memory:
            return InMemoryTable.from_file(filename)
        return MemoryMappedTable.from_file(filename)
|
|
|
|
|
class ParquetReader(BaseReader):
    """
    Build a Dataset object out of Instruction instance(s).
    This Reader uses memory mapping on parquet files.
    """

    def __init__(self, path: str, info: Optional["DatasetInfo"]):
        """Initializes ParquetReader.

        Args:
            path (str): path where the parquet files are stored.
            info (DatasetInfo): info about the dataset.
        """
        super().__init__(path, info)
        self._filetype_suffix = "parquet"

    def _get_table_from_filename(self, filename_skip_take, **kwargs):
        """Returns a Dataset instance from given (filename, skip, take)."""
        filename = filename_skip_take["filename"]
        skip = filename_skip_take.get("skip")
        take = filename_skip_take.get("take")

        # Memory-map the parquet file so reading does not copy it all into RAM.
        pa_table = pq.read_table(filename, memory_map=True)

        if skip is not None and take is not None:
            # Skip the no-op slice that would cover the whole table.
            if skip != 0 or take != len(pa_table):
                pa_table = pa_table.slice(skip, take)
        return pa_table
|
|
|
|
|
@dataclass(frozen=True)
class _AbsoluteInstruction:
    """A machine friendly slice: defined absolute positive boundaries."""

    # Name of the split to read from.
    splitname: str
    # First example index to read (inclusive, absolute, >= 0).
    from_: int
    # End boundary (exclusive, absolute, >= 0) — `take` is `to - from_`.
    to: int
|
|
|
|
|
@dataclass(frozen=True) |
|
class _RelativeInstruction: |
|
"""Represents a single parsed slicing instruction, can use % and negatives.""" |
|
|
|
splitname: str |
|
from_: Optional[int] = None |
|
to: Optional[int] = None |
|
unit: Optional[str] = None |
|
rounding: Optional[str] = None |
|
|
|
def __post_init__(self): |
|
if self.unit is not None and self.unit not in ["%", "abs"]: |
|
raise ValueError("unit must be either % or abs") |
|
if self.rounding is not None and self.rounding not in ["closest", "pct1_dropremainder"]: |
|
raise ValueError("rounding must be either closest or pct1_dropremainder") |
|
if self.unit != "%" and self.rounding is not None: |
|
raise ValueError("It is forbidden to specify rounding if not using percent slicing.") |
|
if self.unit == "%" and self.from_ is not None and abs(self.from_) > 100: |
|
raise ValueError("Percent slice boundaries must be > -100 and < 100.") |
|
if self.unit == "%" and self.to is not None and abs(self.to) > 100: |
|
raise ValueError("Percent slice boundaries must be > -100 and < 100.") |
|
|
|
self.__dict__["rounding"] = "closest" if self.rounding is None and self.unit == "%" else self.rounding |
|
|
|
|
|
def _str_to_read_instruction(spec):
    """Parse one (non-additive) split spec string into a ReadInstruction."""
    match = _SUB_SPEC_RE.match(spec)
    if match is None:
        raise ValueError(f"Unrecognized instruction format: {spec}")
    groups = match.groupdict()
    # A "%" on either boundary switches the whole slice to percent units.
    unit = "%" if groups["from_pct"] or groups["to_pct"] else "abs"
    from_ = int(groups["from"]) if groups["from"] else None
    to = int(groups["to"]) if groups["to"] else None
    return ReadInstruction(
        split_name=groups["split"],
        rounding=groups["rounding"],
        from_=from_,
        to=to,
        unit=unit,
    )
|
|
|
|
|
def _pct_to_abs_pct1(boundary, num_examples): |
|
|
|
if num_examples < 100: |
|
msg = ( |
|
'Using "pct1_dropremainder" rounding on a split with less than 100 ' |
|
"elements is forbidden: it always results in an empty dataset." |
|
) |
|
raise ValueError(msg) |
|
return boundary * math.trunc(num_examples / 100.0) |
|
|
|
|
|
def _pct_to_abs_closest(boundary, num_examples): |
|
return int(round(boundary * num_examples / 100.0)) |
|
|
|
|
|
def _rel_to_abs_instr(rel_instr, name2len):
    """Returns _AbsoluteInstruction instance for given RelativeInstruction.

    Args:
        rel_instr: RelativeInstruction instance.
        name2len: dict {split_name: num_examples}.
    """
    split = rel_instr.splitname
    if split not in name2len:
        raise ValueError(f'Unknown split "{split}". Should be one of {list(name2len)}.')
    num_examples = name2len[split]

    if rel_instr.unit == "%":
        # Percent boundaries: convert with the configured rounding strategy.
        pct_to_abs = _pct_to_abs_closest if rel_instr.rounding == "closest" else _pct_to_abs_pct1
        from_ = 0 if rel_instr.from_ is None else pct_to_abs(rel_instr.from_, num_examples)
        to = num_examples if rel_instr.to is None else pct_to_abs(rel_instr.to, num_examples)
    else:
        from_ = 0 if rel_instr.from_ is None else rel_instr.from_
        to = num_examples if rel_instr.to is None else rel_instr.to

    # Negative boundaries count from the end of the split.
    if from_ < 0:
        from_ = max(num_examples + from_, 0)
    if to < 0:
        to = max(num_examples + to, 0)
    # Clamp both boundaries to the split size.
    from_ = min(from_, num_examples)
    to = min(to, num_examples)
    return _AbsoluteInstruction(split, from_, to)
|
|
|
|
|
class ReadInstruction:
    """Reading instruction for a dataset.

    Examples::

        # The following lines are equivalent:
        ds = datasets.load_dataset('mnist', split='test[:33%]')
        ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction.from_spec('test[:33%]'))
        ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction('test', to=33, unit='%'))
        ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction(
            'test', from_=0, to=33, unit='%'))

        # The following lines are equivalent:
        ds = datasets.load_dataset('mnist', split='test[:33%]+train[1:-1]')
        ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction.from_spec(
            'test[:33%]+train[1:-1]'))
        ds = datasets.load_dataset('mnist', split=(
            datasets.ReadInstruction('test', to=33, unit='%') +
            datasets.ReadInstruction('train', from_=1, to=-1, unit='abs')))

        # The following lines are equivalent:
        ds = datasets.load_dataset('mnist', split='test[:33%](pct1_dropremainder)')
        ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction.from_spec(
            'test[:33%](pct1_dropremainder)'))
        ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction(
            'test', from_=0, to=33, unit='%', rounding="pct1_dropremainder"))

        # 10-fold validation:
        tests = datasets.load_dataset(
            'mnist',
            [datasets.ReadInstruction('train', from_=k, to=k+10, unit='%')
             for k in range(0, 100, 10)])
        trains = datasets.load_dataset(
            'mnist',
            [datasets.ReadInstruction('train', to=k, unit='%') + datasets.ReadInstruction('train', from_=k+10, unit='%')
             for k in range(0, 100, 10)])

    """

    def _init(self, relative_instructions):
        # Private low-level initializer, shared by `__init__` and
        # `_read_instruction_from_relative_instructions`.
        self._relative_instructions = relative_instructions

    @classmethod
    def _read_instruction_from_relative_instructions(cls, relative_instructions):
        """Returns ReadInstruction obj initialized with relative_instructions."""
        # Bypass `__init__`: the relative instructions are already built/validated.
        result = cls.__new__(cls)
        result._init(relative_instructions)
        return result

    def __init__(self, split_name, rounding=None, from_=None, to=None, unit=None):
        """Initialize ReadInstruction.

        Args:
            split_name (str): name of the split to read. Eg: 'train'.
            rounding (str, optional): The rounding behaviour to use when percent slicing is
                used. Ignored when slicing with absolute indices.
                Possible values:
                - 'closest' (default): The specified percentages are rounded to the
                    closest value. Use this if you want specified percents to be as
                    much exact as possible.
                - 'pct1_dropremainder': the specified percentages are treated as
                    multiple of 1%. Use this option if you want consistency. Eg:
                    len(5%) == 5 * len(1%).
                    Using this option, one might not be able to use the full set of
                    examples, if the number of those is not a multiple of 100.
            from_ (int): starting slicing boundary (inclusive), optional.
            to (int): alternative way of specifying slicing boundaries. If any of
                {from_, to, unit} argument is used, slicing cannot be specified as
                string.
            unit (str): optional, one of:
                '%': to set the slicing unit as percents of the split size.
                'abs': to set the slicing unit as absolute numbers.
        """
        # Argument validation happens in `_RelativeInstruction.__post_init__`.
        self._init([_RelativeInstruction(split_name, from_, to, unit, rounding)])

    @classmethod
    def from_spec(cls, spec):
        """Creates a `ReadInstruction` instance out of a string spec.

        Args:
            spec (`str`):
                Split(s) + optional slice(s) to read + optional rounding
                if percents are used as the slicing unit. A slice can be specified,
                using absolute numbers (`int`) or percentages (`int`).

        Examples:

            ```
            test: test split.
            test + validation: test split + validation split.
            test[10:]: test split, minus its first 10 records.
            test[:10%]: first 10% records of test split.
            test[:20%](pct1_dropremainder): first 20% records, rounded with the pct1_dropremainder rounding.
            test[:-5%]+train[40%:60%]: first 95% of test + middle 20% of train.
            ```

        Returns:
            ReadInstruction instance.
        """
        # str() so non-str split objects (e.g. NamedSplit) are accepted too.
        spec = str(spec)
        subs = _ADDITION_SEP_RE.split(spec)
        if not subs:
            raise ValueError(f"No instructions could be built out of {spec}")
        instruction = _str_to_read_instruction(subs[0])
        # Fold the remaining "+"-separated sub-specs via `ReadInstruction.__add__`.
        return sum((_str_to_read_instruction(sub) for sub in subs[1:]), instruction)

    def to_spec(self):
        """Returns the string spec equivalent to this instruction (inverse of `from_spec`)."""
        rel_instr_specs = []
        for rel_instr in self._relative_instructions:
            rel_instr_spec = rel_instr.splitname
            if rel_instr.from_ is not None or rel_instr.to is not None:
                from_ = rel_instr.from_
                to = rel_instr.to
                unit = rel_instr.unit
                rounding = rel_instr.rounding
                unit = unit if unit == "%" else ""
                from_ = str(from_) + unit if from_ is not None else ""
                to = str(to) + unit if to is not None else ""
                slice_str = f"[{from_}:{to}]"
                # Rounding is only written out when it differs from the default.
                rounding_str = (
                    f"({rounding})" if unit == "%" and rounding is not None and rounding != "closest" else ""
                )
                rel_instr_spec += slice_str + rounding_str
            rel_instr_specs.append(rel_instr_spec)
        return "+".join(rel_instr_specs)

    def __add__(self, other):
        """Returns a new ReadInstruction obj, result of appending other to self."""
        if not isinstance(other, ReadInstruction):
            msg = "ReadInstruction can only be added to another ReadInstruction obj."
            raise TypeError(msg)
        self_ris = self._relative_instructions
        other_ris = other._relative_instructions
        # NOTE(review): only the first relative instruction of each operand is
        # checked for rounding compatibility — presumably all instructions within
        # one operand share a rounding mode by construction; confirm before
        # relying on this for mixed operands.
        if (
            self_ris[0].unit != "abs"
            and other_ris[0].unit != "abs"
            and self._relative_instructions[0].rounding != other_ris[0].rounding
        ):
            raise ValueError("It is forbidden to sum ReadInstruction instances with different rounding values.")
        return self._read_instruction_from_relative_instructions(self_ris + other_ris)

    def __str__(self):
        return self.to_spec()

    def __repr__(self):
        return f"ReadInstruction({self._relative_instructions})"

    def to_absolute(self, name2len):
        """Translate instruction into a list of absolute instructions.

        Those absolute instructions are then to be added together.

        Args:
            name2len (`dict`):
                Associating split names to number of examples.

        Returns:
            list of _AbsoluteInstruction instances (corresponds to the + in spec).
        """
        return [_rel_to_abs_instr(rel_instr, name2len) for rel_instr in self._relative_instructions]
|
|