File size: 25,131 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 467 468 469 470 471 472 473 474 475 476 477 478 479 480 481 482 483 484 485 486 487 488 489 490 491 492 493 494 495 496 497 498 499 500 501 502 503 504 505 506 507 508 509 510 511 512 513 514 515 516 517 518 519 520 521 522 523 524 525 526 527 528 529 530 531 532 533 534 535 536 537 538 539 540 541 542 543 544 545 546 547 548 549 550 551 552 553 554 555 556 557 558 559 560 561 562 563 564 565 566 567 568 569 570 571 572 573 574 575 576 577 578 579 580 581 582 583 584 585 586 587 588 589 590 591 592 593 594 595 596 597 598 599 600 601 602 603 604 605 606 607 608 609 610 611 612 613 614 615 616 617 618 619 620 621 |
# Copyright 2020 The HuggingFace Datasets Authors and the TensorFlow Datasets Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Lint as: python3
"""Arrow ArrowReader."""
import copy
import math
import os
import re
from dataclasses import dataclass
from functools import partial
from typing import TYPE_CHECKING, Optional, Union
import pyarrow as pa
import pyarrow.parquet as pq
from tqdm.contrib.concurrent import thread_map
from .download.download_config import DownloadConfig # noqa: F401
from .naming import _split_re, filenames_for_dataset_split
from .table import InMemoryTable, MemoryMappedTable, Table, concat_tables
from .utils import logging
from .utils import tqdm as hf_tqdm
if TYPE_CHECKING:
from .info import DatasetInfo # noqa: F401
from .splits import Split, SplitInfo # noqa: F401
logger = logging.get_logger(__name__)
HF_GCP_BASE_URL = "https://storage.googleapis.com/huggingface-nlp/cache/datasets"
_SUB_SPEC_RE = re.compile(
rf"""
^
(?P<split>{_split_re[1:-1]})
(\[
((?P<from>-?[\d_]+)
(?P<from_pct>%)?)?
:
((?P<to>-?[\d_]+)
(?P<to_pct>%)?)?
\])?(\((?P<rounding>[^\)]*)\))?
$
""", # remove ^ and $
re.X,
)
_ADDITION_SEP_RE = re.compile(r"\s*\+\s*")
class DatasetNotOnHfGcsError(ConnectionError):
"""When you can't get the dataset from the Hf google cloud storage"""
pass
class MissingFilesOnHfGcsError(ConnectionError):
"""When some files are missing on the Hf oogle cloud storage"""
pass
@dataclass(frozen=True)
class FileInstructions:
"""The file instructions associated with a split ReadInstruction.
Attributes:
num_examples: `int`, The total number of examples
file_instructions: List[dict(filename, skip, take)], the files information.
The filenames contains the relative path, not absolute.
skip/take indicates which example read in the file: `ds.slice(skip, take)`
"""
num_examples: int
file_instructions: list[dict]
def make_file_instructions(
name: str,
split_infos: list["SplitInfo"],
instruction: Union[str, "ReadInstruction"],
filetype_suffix: Optional[str] = None,
prefix_path: Optional[str] = None,
) -> FileInstructions:
"""Returns instructions of the split dict.
Args:
name (`str`): Name of the dataset.
split_infos (`list` of `[SplitInfo]`): Dataset splits information.
instruction ([`ReadInstruction`] or `str`): Reading instruction for a dataset.
filetype_suffix (`str`, *optional*): Suffix of dataset files, e.g. 'arrow' or 'parquet'.
prefix_path (`str`, *optional*): Prefix of dataset files, e.g. directory name.
Returns:
[`FileInstructions`]
"""
if not isinstance(name, str):
raise TypeError(f"Expected str 'name', but got: {type(name).__name__}")
elif not name:
raise ValueError("Expected non-empty str 'name'")
name2len = {info.name: info.num_examples for info in split_infos}
name2shard_lengths = {info.name: info.shard_lengths for info in split_infos}
name2filenames = {
info.name: filenames_for_dataset_split(
path=prefix_path,
dataset_name=name,
split=info.name,
filetype_suffix=filetype_suffix,
shard_lengths=name2shard_lengths[info.name],
)
for info in split_infos
}
if not isinstance(instruction, ReadInstruction):
instruction = ReadInstruction.from_spec(instruction)
# Create the absolute instruction (per split)
absolute_instructions = instruction.to_absolute(name2len)
# For each split, return the files instruction (skip/take)
file_instructions = []
num_examples = 0
for abs_instr in absolute_instructions:
split_length = name2len[abs_instr.splitname]
filenames = name2filenames[abs_instr.splitname]
shard_lengths = name2shard_lengths[abs_instr.splitname]
from_ = 0 if abs_instr.from_ is None else abs_instr.from_
to = split_length if abs_instr.to is None else abs_instr.to
if shard_lengths is None: # not sharded
for filename in filenames:
take = to - from_
if take == 0:
continue
num_examples += take
file_instructions.append({"filename": filename, "skip": from_, "take": take})
else: # sharded
index_start = 0 # Beginning (included) of moving window.
index_end = 0 # End (excluded) of moving window.
for filename, shard_length in zip(filenames, shard_lengths):
index_end += shard_length
if from_ < index_end and to > index_start: # There is something to take.
skip = from_ - index_start if from_ > index_start else 0
take = to - index_start - skip if to < index_end else -1
if take == 0:
continue
file_instructions.append({"filename": filename, "skip": skip, "take": take})
num_examples += shard_length - skip if take == -1 else take
index_start += shard_length
return FileInstructions(
num_examples=num_examples,
file_instructions=file_instructions,
)
class BaseReader:
"""
Build a Dataset object out of Instruction instance(s).
"""
def __init__(self, path: str, info: Optional["DatasetInfo"]):
"""Initializes ArrowReader.
Args:
path (str): path where tfrecords are stored.
info (DatasetInfo): info about the dataset.
"""
self._path: str = path
self._info: Optional["DatasetInfo"] = info
self._filetype_suffix: Optional[str] = None
def _get_table_from_filename(self, filename_skip_take, in_memory=False) -> Table:
"""Returns a Dataset instance from given (filename, skip, take)."""
raise NotImplementedError
def _read_files(self, files, in_memory=False) -> Table:
"""Returns Dataset for given file instructions.
Args:
files: List[dict(filename, skip, take)], the files information.
The filenames contain the absolute path, not relative.
skip/take indicates which example read in the file: `ds.slice(skip, take)`
in_memory (bool, default False): Whether to copy the data in-memory.
"""
if len(files) == 0 or not all(isinstance(f, dict) for f in files):
raise ValueError("please provide valid file informations")
files = copy.deepcopy(files)
for f in files:
f["filename"] = os.path.join(self._path, f["filename"])
pa_tables = thread_map(
partial(self._get_table_from_filename, in_memory=in_memory),
files,
tqdm_class=hf_tqdm,
desc="Loading dataset shards",
# set `disable=None` rather than `disable=False` by default to disable progress bar when no TTY attached
disable=len(files) <= 16 or None,
)
pa_tables = [t for t in pa_tables if len(t) > 0]
if not pa_tables and (self._info is None or self._info.features is None):
raise ValueError(
"Tried to read an empty table. Please specify at least info.features to create an empty table with the right type."
)
pa_tables = pa_tables or [InMemoryTable.from_batches([], schema=pa.schema(self._info.features.type))]
pa_table = concat_tables(pa_tables) if len(pa_tables) != 1 else pa_tables[0]
return pa_table
def get_file_instructions(self, name, instruction, split_infos):
"""Return list of dict {'filename': str, 'skip': int, 'take': int}"""
file_instructions = make_file_instructions(
name, split_infos, instruction, filetype_suffix=self._filetype_suffix, prefix_path=self._path
)
files = file_instructions.file_instructions
return files
def read(
self,
name,
instructions,
split_infos,
in_memory=False,
):
"""Returns Dataset instance(s).
Args:
name (str): name of the dataset.
instructions (ReadInstruction): instructions to read.
Instruction can be string and will then be passed to the Instruction
constructor as it.
split_infos (list of SplitInfo proto): the available splits for dataset.
in_memory (bool, default False): Whether to copy the data in-memory.
Returns:
kwargs to build a single Dataset instance.
"""
files = self.get_file_instructions(name, instructions, split_infos)
if not files:
msg = f'Instruction "{instructions}" corresponds to no data!'
raise ValueError(msg)
return self.read_files(files=files, original_instructions=instructions, in_memory=in_memory)
def read_files(
self,
files: list[dict],
original_instructions: Union[None, "ReadInstruction", "Split"] = None,
in_memory=False,
):
"""Returns single Dataset instance for the set of file instructions.
Args:
files: List[dict(filename, skip, take)], the files information.
The filenames contains the relative path, not absolute.
skip/take indicates which example read in the file: `ds.skip().take()`
original_instructions: store the original instructions used to build the dataset split in the dataset.
in_memory (bool, default False): Whether to copy the data in-memory.
Returns:
kwargs to build a Dataset instance.
"""
# Prepend path to filename
pa_table = self._read_files(files, in_memory=in_memory)
# If original_instructions is not None, convert it to a human-readable NamedSplit
if original_instructions is not None:
from .splits import Split # noqa
split = Split(str(original_instructions))
else:
split = None
dataset_kwargs = {"arrow_table": pa_table, "info": self._info, "split": split}
return dataset_kwargs
class ArrowReader(BaseReader):
"""
Build a Dataset object out of Instruction instance(s).
This Reader uses either memory mapping or file descriptors (in-memory) on arrow files.
"""
def __init__(self, path: str, info: Optional["DatasetInfo"]):
"""Initializes ArrowReader.
Args:
path (str): path where Arrow files are stored.
info (DatasetInfo): info about the dataset.
"""
super().__init__(path, info)
self._filetype_suffix = "arrow"
def _get_table_from_filename(self, filename_skip_take, in_memory=False) -> Table:
"""Returns a Dataset instance from given (filename, skip, take)."""
filename, skip, take = (
filename_skip_take["filename"],
filename_skip_take["skip"] if "skip" in filename_skip_take else None,
filename_skip_take["take"] if "take" in filename_skip_take else None,
)
table = ArrowReader.read_table(filename, in_memory=in_memory)
if take == -1:
take = len(table) - skip
# here we don't want to slice an empty table, or it may segfault
if skip is not None and take is not None and not (skip == 0 and take == len(table)):
table = table.slice(skip, take)
return table
@staticmethod
def read_table(filename, in_memory=False) -> Table:
"""
Read table from file.
Args:
filename (str): File name of the table.
in_memory (bool, default=False): Whether to copy the data in-memory.
Returns:
pyarrow.Table
"""
table_cls = InMemoryTable if in_memory else MemoryMappedTable
return table_cls.from_file(filename)
class ParquetReader(BaseReader):
"""
Build a Dataset object out of Instruction instance(s).
This Reader uses memory mapping on parquet files.
"""
def __init__(self, path: str, info: Optional["DatasetInfo"]):
"""Initializes ParquetReader.
Args:
path (str): path where tfrecords are stored.
info (DatasetInfo): info about the dataset.
"""
super().__init__(path, info)
self._filetype_suffix = "parquet"
def _get_table_from_filename(self, filename_skip_take, **kwargs):
"""Returns a Dataset instance from given (filename, skip, take)."""
filename, skip, take = (
filename_skip_take["filename"],
filename_skip_take["skip"] if "skip" in filename_skip_take else None,
filename_skip_take["take"] if "take" in filename_skip_take else None,
)
# Parquet read_table always loads data in memory, independently of memory_map
pa_table = pq.read_table(filename, memory_map=True)
# here we don't want to slice an empty table, or it may segfault
if skip is not None and take is not None and not (skip == 0 and take == len(pa_table)):
pa_table = pa_table.slice(skip, take)
return pa_table
@dataclass(frozen=True)
class _AbsoluteInstruction:
"""A machine friendly slice: defined absolute positive boundaries."""
splitname: str
from_: int # uint (starting index).
to: int # uint (ending index).
@dataclass(frozen=True)
class _RelativeInstruction:
"""Represents a single parsed slicing instruction, can use % and negatives."""
splitname: str
from_: Optional[int] = None # int (starting index) or None if no lower boundary.
to: Optional[int] = None # int (ending index) or None if no upper boundary.
unit: Optional[str] = None
rounding: Optional[str] = None
def __post_init__(self):
if self.unit is not None and self.unit not in ["%", "abs"]:
raise ValueError("unit must be either % or abs")
if self.rounding is not None and self.rounding not in ["closest", "pct1_dropremainder"]:
raise ValueError("rounding must be either closest or pct1_dropremainder")
if self.unit != "%" and self.rounding is not None:
raise ValueError("It is forbidden to specify rounding if not using percent slicing.")
if self.unit == "%" and self.from_ is not None and abs(self.from_) > 100:
raise ValueError("Percent slice boundaries must be > -100 and < 100.")
if self.unit == "%" and self.to is not None and abs(self.to) > 100:
raise ValueError("Percent slice boundaries must be > -100 and < 100.")
# Update via __dict__ due to instance being "frozen"
self.__dict__["rounding"] = "closest" if self.rounding is None and self.unit == "%" else self.rounding
def _str_to_read_instruction(spec):
"""Returns ReadInstruction for given string."""
res = _SUB_SPEC_RE.match(spec)
if not res:
raise ValueError(f"Unrecognized instruction format: {spec}")
unit = "%" if res.group("from_pct") or res.group("to_pct") else "abs"
return ReadInstruction(
split_name=res.group("split"),
rounding=res.group("rounding"),
from_=int(res.group("from")) if res.group("from") else None,
to=int(res.group("to")) if res.group("to") else None,
unit=unit,
)
def _pct_to_abs_pct1(boundary, num_examples):
# Using math.trunc here, since -99.5% should give -99%, not -100%.
if num_examples < 100:
msg = (
'Using "pct1_dropremainder" rounding on a split with less than 100 '
"elements is forbidden: it always results in an empty dataset."
)
raise ValueError(msg)
return boundary * math.trunc(num_examples / 100.0)
def _pct_to_abs_closest(boundary, num_examples):
return int(round(boundary * num_examples / 100.0))
def _rel_to_abs_instr(rel_instr, name2len):
"""Returns _AbsoluteInstruction instance for given RelativeInstruction.
Args:
rel_instr: RelativeInstruction instance.
name2len: dict {split_name: num_examples}.
"""
pct_to_abs = _pct_to_abs_closest if rel_instr.rounding == "closest" else _pct_to_abs_pct1
split = rel_instr.splitname
if split not in name2len:
raise ValueError(f'Unknown split "{split}". Should be one of {list(name2len)}.')
num_examples = name2len[split]
from_ = rel_instr.from_
to = rel_instr.to
if rel_instr.unit == "%":
from_ = 0 if from_ is None else pct_to_abs(from_, num_examples)
to = num_examples if to is None else pct_to_abs(to, num_examples)
else:
from_ = 0 if from_ is None else from_
to = num_examples if to is None else to
if from_ < 0:
from_ = max(num_examples + from_, 0)
if to < 0:
to = max(num_examples + to, 0)
from_ = min(from_, num_examples)
to = min(to, num_examples)
return _AbsoluteInstruction(split, from_, to)
class ReadInstruction:
"""Reading instruction for a dataset.
Examples::
# The following lines are equivalent:
ds = datasets.load_dataset('mnist', split='test[:33%]')
ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction.from_spec('test[:33%]'))
ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction('test', to=33, unit='%'))
ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction(
'test', from_=0, to=33, unit='%'))
# The following lines are equivalent:
ds = datasets.load_dataset('mnist', split='test[:33%]+train[1:-1]')
ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction.from_spec(
'test[:33%]+train[1:-1]'))
ds = datasets.load_dataset('mnist', split=(
datasets.ReadInstruction('test', to=33, unit='%') +
datasets.ReadInstruction('train', from_=1, to=-1, unit='abs')))
# The following lines are equivalent:
ds = datasets.load_dataset('mnist', split='test[:33%](pct1_dropremainder)')
ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction.from_spec(
'test[:33%](pct1_dropremainder)'))
ds = datasets.load_dataset('mnist', split=datasets.ReadInstruction(
'test', from_=0, to=33, unit='%', rounding="pct1_dropremainder"))
# 10-fold validation:
tests = datasets.load_dataset(
'mnist',
[datasets.ReadInstruction('train', from_=k, to=k+10, unit='%')
for k in range(0, 100, 10)])
trains = datasets.load_dataset(
'mnist',
[datasets.ReadInstruction('train', to=k, unit='%') + datasets.ReadInstruction('train', from_=k+10, unit='%')
for k in range(0, 100, 10)])
"""
def _init(self, relative_instructions):
# Private initializer.
self._relative_instructions = relative_instructions
@classmethod
def _read_instruction_from_relative_instructions(cls, relative_instructions):
"""Returns ReadInstruction obj initialized with relative_instructions."""
# Use __new__ to bypass __init__ used by public API and not conveniant here.
result = cls.__new__(cls)
result._init(relative_instructions) # pylint: disable=protected-access
return result
def __init__(self, split_name, rounding=None, from_=None, to=None, unit=None):
"""Initialize ReadInstruction.
Args:
split_name (str): name of the split to read. Eg: 'train'.
rounding (str, optional): The rounding behaviour to use when percent slicing is
used. Ignored when slicing with absolute indices.
Possible values:
- 'closest' (default): The specified percentages are rounded to the
closest value. Use this if you want specified percents to be as
much exact as possible.
- 'pct1_dropremainder': the specified percentages are treated as
multiple of 1%. Use this option if you want consistency. Eg:
len(5%) == 5 * len(1%).
Using this option, one might not be able to use the full set of
examples, if the number of those is not a multiple of 100.
from_ (int):
to (int): alternative way of specifying slicing boundaries. If any of
{from_, to, unit} argument is used, slicing cannot be specified as
string.
unit (str): optional, one of:
'%': to set the slicing unit as percents of the split size.
'abs': to set the slicing unit as absolute numbers.
"""
# This constructor is not always called. See factory method
# `_read_instruction_from_relative_instructions`. Common init instructions
# MUST be placed in the _init method.
self._init([_RelativeInstruction(split_name, from_, to, unit, rounding)])
@classmethod
def from_spec(cls, spec):
"""Creates a `ReadInstruction` instance out of a string spec.
Args:
spec (`str`):
Split(s) + optional slice(s) to read + optional rounding
if percents are used as the slicing unit. A slice can be specified,
using absolute numbers (`int`) or percentages (`int`).
Examples:
```
test: test split.
test + validation: test split + validation split.
test[10:]: test split, minus its first 10 records.
test[:10%]: first 10% records of test split.
test[:20%](pct1_dropremainder): first 10% records, rounded with the pct1_dropremainder rounding.
test[:-5%]+train[40%:60%]: first 95% of test + middle 20% of train.
```
Returns:
ReadInstruction instance.
"""
spec = str(spec) # Need to convert to str in case of NamedSplit instance.
subs = _ADDITION_SEP_RE.split(spec)
if not subs:
raise ValueError(f"No instructions could be built out of {spec}")
instruction = _str_to_read_instruction(subs[0])
return sum((_str_to_read_instruction(sub) for sub in subs[1:]), instruction)
def to_spec(self):
rel_instr_specs = []
for rel_instr in self._relative_instructions:
rel_instr_spec = rel_instr.splitname
if rel_instr.from_ is not None or rel_instr.to is not None:
from_ = rel_instr.from_
to = rel_instr.to
unit = rel_instr.unit
rounding = rel_instr.rounding
unit = unit if unit == "%" else ""
from_ = str(from_) + unit if from_ is not None else ""
to = str(to) + unit if to is not None else ""
slice_str = f"[{from_}:{to}]"
rounding_str = (
f"({rounding})" if unit == "%" and rounding is not None and rounding != "closest" else ""
)
rel_instr_spec += slice_str + rounding_str
rel_instr_specs.append(rel_instr_spec)
return "+".join(rel_instr_specs)
def __add__(self, other):
"""Returns a new ReadInstruction obj, result of appending other to self."""
if not isinstance(other, ReadInstruction):
msg = "ReadInstruction can only be added to another ReadInstruction obj."
raise TypeError(msg)
self_ris = self._relative_instructions
other_ris = other._relative_instructions # pylint: disable=protected-access
if (
self_ris[0].unit != "abs"
and other_ris[0].unit != "abs"
and self._relative_instructions[0].rounding != other_ris[0].rounding
):
raise ValueError("It is forbidden to sum ReadInstruction instances with different rounding values.")
return self._read_instruction_from_relative_instructions(self_ris + other_ris)
def __str__(self):
return self.to_spec()
def __repr__(self):
return f"ReadInstruction({self._relative_instructions})"
def to_absolute(self, name2len):
"""Translate instruction into a list of absolute instructions.
Those absolute instructions are then to be added together.
Args:
name2len (`dict`):
Associating split names to number of examples.
Returns:
list of _AbsoluteInstruction instances (corresponds to the + in spec).
"""
return [_rel_to_abs_instr(rel_instr, name2len) for rel_instr in self._relative_instructions]
|