"""Dataset support for Parquet file format.""" |
|
|
|
from cython.operator cimport dereference as deref |
|
|
|
import os |
|
import warnings |
|
|
|
import pyarrow as pa |
|
from pyarrow.lib cimport * |
|
from pyarrow.lib import frombytes, tobytes, is_threading_enabled |
|
from pyarrow.includes.libarrow cimport * |
|
from pyarrow.includes.libarrow_dataset cimport * |
|
from pyarrow.includes.libarrow_dataset_parquet cimport * |
|
from pyarrow._fs cimport FileSystem |
|
|
|
from pyarrow._compute cimport Expression, _bind |
|
from pyarrow._dataset cimport ( |
|
_make_file_source, |
|
DatasetFactory, |
|
FileFormat, |
|
FileFragment, |
|
FileWriteOptions, |
|
Fragment, |
|
FragmentScanOptions, |
|
CacheOptions, |
|
Partitioning, |
|
PartitioningFactory, |
|
WrittenFile |
|
) |
|
|
|
from pyarrow._parquet cimport ( |
|
_create_writer_properties, _create_arrow_writer_properties, |
|
FileMetaData, |
|
) |
|
|
|
|
|
try: |
|
from pyarrow._dataset_parquet_encryption import ( |
|
set_encryption_config, set_decryption_config, set_decryption_properties |
|
) |
|
parquet_encryption_enabled = True |
|
except ImportError: |
|
parquet_encryption_enabled = False |
|
|
|
|
|
cdef Expression _true = Expression._scalar(True) |
|
|
|
ctypedef CParquetFileWriter* _CParquetFileWriterPtr |
|
|
|
|
|
cdef class ParquetFileFormat(FileFormat): |
|
""" |
|
FileFormat for Parquet |
|
|
|
Parameters |
|
---------- |
|
read_options : ParquetReadOptions |
|
Read options for the file. |
|
    default_fragment_scan_options : ParquetFragmentScanOptions

        Scan options for the file.

    **kwargs : dict

        Additional options, dispatched by name to either the read

        options or the scan options.
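
    Examples
    --------
    A minimal sketch; the file path and column name are hypothetical:

    >>> import pyarrow.dataset as ds
    >>> parquet_format = ds.ParquetFileFormat(dictionary_columns={"color"})
    >>> dataset = ds.dataset(
    ...     "/tmp/table.parquet", format=parquet_format)  # doctest: +SKIP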
|
""" |
|
|
|
cdef: |
|
CParquetFileFormat* parquet_format |
|
|
|
def __init__(self, read_options=None, |
|
default_fragment_scan_options=None, |
|
**kwargs): |
|
cdef: |
|
shared_ptr[CParquetFileFormat] wrapped |
|
CParquetFileFormatReaderOptions* options |
|
|
|
|
|
read_options_args = {option: kwargs[option] for option in kwargs |
|
if option in _PARQUET_READ_OPTIONS} |
|
scan_args = {option: kwargs[option] for option in kwargs |
|
if option not in _PARQUET_READ_OPTIONS} |
|
if read_options and read_options_args: |
|
duplicates = ', '.join(sorted(read_options_args)) |
|
raise ValueError(f'If `read_options` is given, ' |
|
f'cannot specify {duplicates}') |
|
if default_fragment_scan_options and scan_args: |
|
duplicates = ', '.join(sorted(scan_args)) |
|
raise ValueError(f'If `default_fragment_scan_options` is given, ' |
|
f'cannot specify {duplicates}') |
|
|
|
if read_options is None: |
|
read_options = ParquetReadOptions(**read_options_args) |
|
elif isinstance(read_options, dict): |
|
|
|
            # Backwards compatibility: scan options passed inside the
            # read_options dict are forwarded to the scan options below.
            duplicates = []
|
for option, value in read_options.items(): |
|
if option in _PARQUET_READ_OPTIONS: |
|
read_options_args[option] = value |
|
else: |
|
duplicates.append(option) |
|
scan_args[option] = value |
|
if duplicates: |
|
duplicates = ", ".join(duplicates) |
|
warnings.warn(f'The scan options {duplicates} should be ' |
|
'specified directly as keyword arguments') |
|
read_options = ParquetReadOptions(**read_options_args) |
|
elif not isinstance(read_options, ParquetReadOptions): |
|
raise TypeError('`read_options` must be either a dictionary or an ' |
|
'instance of ParquetReadOptions') |
|
|
|
if default_fragment_scan_options is None: |
|
default_fragment_scan_options = ParquetFragmentScanOptions(**scan_args) |
|
elif isinstance(default_fragment_scan_options, dict): |
|
default_fragment_scan_options = ParquetFragmentScanOptions( |
|
**default_fragment_scan_options) |
|
elif not isinstance(default_fragment_scan_options, |
|
ParquetFragmentScanOptions): |
|
raise TypeError('`default_fragment_scan_options` must be either a ' |
|
'dictionary or an instance of ' |
|
'ParquetFragmentScanOptions') |
|
|
|
wrapped = make_shared[CParquetFileFormat]() |
|
|
|
options = &(wrapped.get().reader_options) |
|
if read_options.dictionary_columns is not None: |
|
for column in read_options.dictionary_columns: |
|
options.dict_columns.insert(tobytes(column)) |
|
options.coerce_int96_timestamp_unit = \ |
|
read_options._coerce_int96_timestamp_unit |
|
|
|
self.init(<shared_ptr[CFileFormat]> wrapped) |
|
self.default_fragment_scan_options = default_fragment_scan_options |
|
|
|
cdef void init(self, const shared_ptr[CFileFormat]& sp): |
|
FileFormat.init(self, sp) |
|
self.parquet_format = <CParquetFileFormat*> sp.get() |
|
|
|
cdef WrittenFile _finish_write(self, path, base_dir, |
|
CFileWriter* file_writer): |
|
cdef: |
|
FileMetaData parquet_metadata |
|
CParquetFileWriter* parquet_file_writer |
|
|
|
parquet_metadata = None |
|
parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer) |
|
with nogil: |
|
metadata = deref( |
|
deref(parquet_file_writer).parquet_writer()).metadata() |
|
if metadata: |
|
parquet_metadata = FileMetaData.__new__(FileMetaData) |
|
parquet_metadata.init(metadata) |
|
parquet_metadata.set_file_path(os.path.relpath(path, base_dir)) |
|
|
|
size = GetResultValue(file_writer.GetBytesWritten()) |
|
|
|
return WrittenFile(path, parquet_metadata, size) |
|
|
|
@property |
|
def read_options(self): |
|
cdef CParquetFileFormatReaderOptions* options |
|
options = &self.parquet_format.reader_options |
|
parquet_read_options = ParquetReadOptions( |
|
dictionary_columns={frombytes(col) |
|
for col in options.dict_columns}, |
|
) |
|
|
|
|
|
        # Assign through the private attribute: the public setter expects
        # a string unit rather than the C enum value stored here.
        parquet_read_options._coerce_int96_timestamp_unit = \
            options.coerce_int96_timestamp_unit
|
return parquet_read_options |
|
|
|
def make_write_options(self, **kwargs): |
|
""" |
|
Parameters |
|
---------- |
|
        **kwargs : dict
            Parquet writer options, passed on to
            ParquetFileWriteOptions.update().
|
|
|
Returns |
|
------- |
|
pyarrow.dataset.FileWriteOptions |
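
        Examples
        --------
        A minimal sketch; the option values are illustrative:

        >>> import pyarrow.dataset as ds
        >>> write_options = ds.ParquetFileFormat().make_write_options(
        ...     compression="zstd", compression_level=5)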
|
""" |
|
|
|
        # Guard against make_write_options() being called on the class
        # itself instead of an instance.
        if not isinstance(self, ParquetFileFormat):
            raise TypeError("make_write_options() should be called on "
                            "an instance of ParquetFileFormat")
|
opts = FileFormat.make_write_options(self) |
|
(<ParquetFileWriteOptions> opts).update(**kwargs) |
|
return opts |
|
|
|
cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): |
|
if options.type_name == 'parquet': |
|
self.parquet_format.default_fragment_scan_options = options.wrapped |
|
else: |
|
super()._set_default_fragment_scan_options(options) |
|
|
|
def equals(self, ParquetFileFormat other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.dataset.ParquetFileFormat |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
return ( |
|
self.read_options.equals(other.read_options) and |
|
self.default_fragment_scan_options == |
|
other.default_fragment_scan_options |
|
) |
|
|
|
@property |
|
def default_extname(self): |
|
return "parquet" |
|
|
|
def __reduce__(self): |
|
return ParquetFileFormat, (self.read_options, |
|
self.default_fragment_scan_options) |
|
|
|
def __repr__(self): |
|
return f"<ParquetFileFormat read_options={self.read_options}>" |
|
|
|
def make_fragment(self, file, filesystem=None, |
|
Expression partition_expression=None, row_groups=None, *, file_size=None): |
|
""" |
|
Make a FileFragment from a given file. |
|
|
|
Parameters |
|
---------- |
|
file : file-like object, path-like or str |
|
The file or file path to make a fragment from. |
|
filesystem : Filesystem, optional |
|
If `filesystem` is given, `file` must be a string and specifies |
|
the path of the file to read from the filesystem. |
|
partition_expression : Expression, optional |
|
            An expression that is guaranteed true for all rows in the fragment.

            Allows the fragment to be potentially skipped while scanning with

            a filter.
|
row_groups : Iterable, optional |
|
The indices of the row groups to include |
|
file_size : int, optional |
|
The size of the file in bytes. Can improve performance with high-latency filesystems |
|
when file size needs to be known before reading. |
|
|
|
Returns |
|
------- |
|
fragment : Fragment |
|
The file fragment |
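
        Examples
        --------
        A minimal sketch, restricting the fragment to the first two row
        groups of a hypothetical local file:

        >>> import pyarrow.dataset as ds
        >>> parquet_format = ds.ParquetFileFormat()
        >>> fragment = parquet_format.make_fragment(
        ...     "/tmp/table.parquet", row_groups=[0, 1])  # doctest: +SKIP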
|
""" |
|
cdef: |
|
vector[int] c_row_groups |
|
if partition_expression is None: |
|
partition_expression = _true |
|
if row_groups is None: |
|
return super().make_fragment(file, filesystem, |
|
partition_expression, file_size=file_size) |
|
|
|
c_source = _make_file_source(file, filesystem, file_size) |
|
c_row_groups = [<int> row_group for row_group in set(row_groups)] |
|
|
|
c_fragment = <shared_ptr[CFragment]> GetResultValue( |
|
self.parquet_format.MakeFragment(move(c_source), |
|
partition_expression.unwrap(), |
|
<shared_ptr[CSchema]>nullptr, |
|
move(c_row_groups))) |
|
return Fragment.wrap(move(c_fragment)) |
|
|
|
|
|
class RowGroupInfo: |
|
""" |
|
    A wrapper class for Parquet row group information.
|
|
|
Parameters |
|
---------- |
|
    id : int

        The row group ID.

    metadata : FileMetaData

        The row group metadata.
|
schema : Schema |
|
Schema of the rows. |
|
""" |
|
|
|
def __init__(self, id, metadata, schema): |
|
self.id = id |
|
self.metadata = metadata |
|
self.schema = schema |
|
|
|
@property |
|
def num_rows(self): |
|
return self.metadata.num_rows |
|
|
|
@property |
|
def total_byte_size(self): |
|
return self.metadata.total_byte_size |
|
|
|
@property |
|
def statistics(self): |
|
def name_stats(i): |
|
col = self.metadata.column(i) |
|
|
|
stats = col.statistics |
|
if stats is None or not stats.has_min_max: |
|
return None, None |
|
|
|
name = col.path_in_schema |
|
field_index = self.schema.get_field_index(name) |
|
if field_index < 0: |
|
return None, None |
|
|
|
typ = self.schema.field(field_index).type |
|
return col.path_in_schema, { |
|
'min': pa.scalar(stats.min, type=typ).as_py(), |
|
'max': pa.scalar(stats.max, type=typ).as_py() |
|
} |
|
|
|
return { |
|
name: stats for name, stats |
|
in map(name_stats, range(self.metadata.num_columns)) |
|
if stats is not None |
|
} |
|
|
|
def __repr__(self): |
|
return "RowGroupInfo({})".format(self.id) |
|
|
|
def __eq__(self, other): |
|
if isinstance(other, int): |
|
return self.id == other |
|
if not isinstance(other, RowGroupInfo): |
|
return False |
|
return self.id == other.id |
|
|
|
|
|
cdef class ParquetFileFragment(FileFragment): |
|
"""A Fragment representing a parquet file.""" |
|
|
|
cdef: |
|
CParquetFileFragment* parquet_file_fragment |
|
|
|
cdef void init(self, const shared_ptr[CFragment]& sp): |
|
FileFragment.init(self, sp) |
|
self.parquet_file_fragment = <CParquetFileFragment*> sp.get() |
|
|
|
def __reduce__(self): |
|
buffer = self.buffer |
|
|
|
|
|
        # An empty row group selection means the fragment views the whole
        # file, so pickle it as None rather than an explicit list of ids.
        if not bool(self.parquet_file_fragment.row_groups()):
|
row_groups = None |
|
else: |
|
row_groups = [row_group.id for row_group in self.row_groups] |
|
|
|
return self.format.make_fragment, ( |
|
self.path if buffer is None else buffer, |
|
self.filesystem, |
|
self.partition_expression, |
|
row_groups |
|
) |
|
|
|
def ensure_complete_metadata(self): |
|
""" |
|
Ensure that all metadata (statistics, physical schema, ...) have |
|
been read and cached in this fragment. |
|
""" |
|
with nogil: |
|
check_status(self.parquet_file_fragment.EnsureCompleteMetadata()) |
|
|
|
@property |
|
def row_groups(self): |
|
metadata = self.metadata |
|
cdef vector[int] row_groups = self.parquet_file_fragment.row_groups() |
|
return [RowGroupInfo(i, metadata.row_group(i), self.physical_schema) |
|
for i in row_groups] |
|
|
|
@property |
|
def metadata(self): |
|
self.ensure_complete_metadata() |
|
cdef FileMetaData metadata = FileMetaData.__new__(FileMetaData) |
|
metadata.init(self.parquet_file_fragment.metadata()) |
|
return metadata |
|
|
|
@property |
|
def num_row_groups(self): |
|
""" |
|
        Return the number of row groups viewed by this fragment (not the

        number of row groups in the original file).
|
""" |
|
self.ensure_complete_metadata() |
|
return self.parquet_file_fragment.row_groups().size() |
|
|
|
def split_by_row_group(self, Expression filter=None, |
|
Schema schema=None): |
|
""" |
|
Split the fragment into multiple fragments. |
|
|
|
Yield a Fragment wrapping each row group in this ParquetFileFragment. |
|
        Row groups whose metadata contradicts the optional filter will be

        excluded.
|
|
|
Parameters |
|
---------- |
|
filter : Expression, default None |
|
Only include the row groups which satisfy this predicate (using |
|
the Parquet RowGroup statistics). |
|
schema : Schema, default None |
|
Schema to use when filtering row groups. Defaults to the |
|
            Fragment's physical schema.
|
|
|
Returns |
|
------- |
|
A list of Fragments |
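
        Examples
        --------
        A minimal sketch, assuming ``fragment`` is an existing
        ParquetFileFragment with a column named ``x``:

        >>> import pyarrow.dataset as ds
        >>> row_group_fragments = fragment.split_by_row_group(
        ...     ds.field("x") > 0)  # doctest: +SKIP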
|
""" |
|
cdef: |
|
vector[shared_ptr[CFragment]] c_fragments |
|
CExpression c_filter |
|
shared_ptr[CFragment] c_fragment |
|
|
|
schema = schema or self.physical_schema |
|
c_filter = _bind(filter, schema) |
|
with nogil: |
|
c_fragments = move(GetResultValue( |
|
self.parquet_file_fragment.SplitByRowGroup(move(c_filter)))) |
|
|
|
return [Fragment.wrap(c_fragment) for c_fragment in c_fragments] |
|
|
|
def subset(self, Expression filter=None, Schema schema=None, |
|
object row_group_ids=None): |
|
""" |
|
Create a subset of the fragment (viewing a subset of the row groups). |
|
|
|
Subset can be specified by either a filter predicate (with optional |
|
schema) or by a list of row group IDs. Note that when using a filter, |
|
the resulting fragment can be empty (viewing no row groups). |
|
|
|
Parameters |
|
---------- |
|
filter : Expression, default None |
|
Only include the row groups which satisfy this predicate (using |
|
the Parquet RowGroup statistics). |
|
schema : Schema, default None |
|
Schema to use when filtering row groups. Defaults to the |
|
            Fragment's physical schema.
|
row_group_ids : list of ints |
|
The row group IDs to include in the subset. Can only be specified |
|
if `filter` is None. |
|
|
|
Returns |
|
------- |
|
ParquetFileFragment |
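
        Examples
        --------
        A minimal sketch, assuming ``fragment`` is an existing
        ParquetFileFragment viewing at least three row groups:

        >>> subset_fragment = fragment.subset(
        ...     row_group_ids=[0, 2])  # doctest: +SKIP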
|
""" |
|
cdef: |
|
CExpression c_filter |
|
vector[int] c_row_group_ids |
|
shared_ptr[CFragment] c_fragment |
|
|
|
if filter is not None and row_group_ids is not None: |
|
raise ValueError( |
|
"Cannot specify both 'filter' and 'row_group_ids'." |
|
) |
|
|
|
if filter is not None: |
|
schema = schema or self.physical_schema |
|
c_filter = _bind(filter, schema) |
|
with nogil: |
|
c_fragment = move(GetResultValue( |
|
self.parquet_file_fragment.SubsetWithFilter( |
|
move(c_filter)))) |
|
elif row_group_ids is not None: |
|
c_row_group_ids = [ |
|
<int> row_group for row_group in sorted(set(row_group_ids)) |
|
] |
|
with nogil: |
|
c_fragment = move(GetResultValue( |
|
self.parquet_file_fragment.SubsetWithIds( |
|
move(c_row_group_ids)))) |
|
else: |
|
raise ValueError( |
|
"Need to specify one of 'filter' or 'row_group_ids'" |
|
) |
|
|
|
return Fragment.wrap(c_fragment) |
|
|
|
|
|
cdef class ParquetReadOptions(_Weakrefable): |
|
""" |
|
Parquet format specific options for reading. |
|
|
|
Parameters |
|
---------- |
|
dictionary_columns : list of string, default None |
|
Names of columns which should be dictionary encoded as |
|
        they are read.
|
coerce_int96_timestamp_unit : str, default None |
|
Cast timestamps that are stored in INT96 format to a particular |
|
resolution (e.g. 'ms'). Setting to None is equivalent to 'ns' |
|
and therefore INT96 timestamps will be inferred as timestamps |
|
        in nanoseconds.
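
    Examples
    --------
    A minimal sketch; the column name is hypothetical:

    >>> import pyarrow.dataset as ds
    >>> read_options = ds.ParquetReadOptions(
    ...     dictionary_columns={"color"},
    ...     coerce_int96_timestamp_unit="ms")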
|
""" |
|
|
|
cdef public: |
|
set dictionary_columns |
|
TimeUnit _coerce_int96_timestamp_unit |
|
|
|
|
|
def __init__(self, dictionary_columns=None, |
|
coerce_int96_timestamp_unit=None): |
|
self.dictionary_columns = set(dictionary_columns or set()) |
|
self.coerce_int96_timestamp_unit = coerce_int96_timestamp_unit |
|
|
|
@property |
|
def coerce_int96_timestamp_unit(self): |
|
return timeunit_to_string(self._coerce_int96_timestamp_unit) |
|
|
|
@coerce_int96_timestamp_unit.setter |
|
def coerce_int96_timestamp_unit(self, unit): |
|
if unit is not None: |
|
self._coerce_int96_timestamp_unit = string_to_timeunit(unit) |
|
else: |
|
self._coerce_int96_timestamp_unit = TimeUnit_NANO |
|
|
|
def equals(self, ParquetReadOptions other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.dataset.ParquetReadOptions |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
return (self.dictionary_columns == other.dictionary_columns and |
|
self.coerce_int96_timestamp_unit == |
|
other.coerce_int96_timestamp_unit) |
|
|
|
def __eq__(self, other): |
|
try: |
|
return self.equals(other) |
|
except TypeError: |
|
return False |
|
|
|
def __repr__(self): |
|
return ( |
|
f"<ParquetReadOptions" |
|
f" dictionary_columns={self.dictionary_columns}" |
|
f" coerce_int96_timestamp_unit={self.coerce_int96_timestamp_unit}>" |
|
) |
|
|
|
|
|
cdef class ParquetFileWriteOptions(FileWriteOptions): |
|
|
|
def update(self, **kwargs): |
|
""" |
|
Parameters |
|
---------- |
|
        **kwargs : dict
            The Parquet writer options to update. Unknown option names
            raise TypeError.
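
        Examples
        --------
        A minimal sketch; the option value is illustrative:

        >>> import pyarrow.dataset as ds
        >>> write_options = ds.ParquetFileFormat().make_write_options()
        >>> write_options.update(compression="zstd")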
|
""" |
|
arrow_fields = { |
|
"use_deprecated_int96_timestamps", |
|
"coerce_timestamps", |
|
"allow_truncated_timestamps", |
|
"use_compliant_nested_type", |
|
} |
|
|
|
setters = set() |
|
for name, value in kwargs.items(): |
|
if name not in self._properties: |
|
raise TypeError("unexpected parquet write option: " + name) |
|
self._properties[name] = value |
|
if name in arrow_fields: |
|
setters.add(self._set_arrow_properties) |
|
elif name == "encryption_config" and value is not None: |
|
setters.add(self._set_encryption_config) |
|
else: |
|
setters.add(self._set_properties) |
|
|
|
for setter in setters: |
|
setter() |
|
|
|
def _set_properties(self): |
|
cdef CParquetFileWriteOptions* opts = self.parquet_options |
|
|
|
opts.writer_properties = _create_writer_properties( |
|
use_dictionary=self._properties["use_dictionary"], |
|
compression=self._properties["compression"], |
|
version=self._properties["version"], |
|
write_statistics=self._properties["write_statistics"], |
|
data_page_size=self._properties["data_page_size"], |
|
compression_level=self._properties["compression_level"], |
|
use_byte_stream_split=( |
|
self._properties["use_byte_stream_split"] |
|
), |
|
column_encoding=self._properties["column_encoding"], |
|
data_page_version=self._properties["data_page_version"], |
|
encryption_properties=self._properties["encryption_properties"], |
|
write_batch_size=self._properties["write_batch_size"], |
|
dictionary_pagesize_limit=self._properties["dictionary_pagesize_limit"], |
|
write_page_index=self._properties["write_page_index"], |
|
write_page_checksum=self._properties["write_page_checksum"], |
|
sorting_columns=self._properties["sorting_columns"], |
|
store_decimal_as_integer=self._properties["store_decimal_as_integer"], |
|
) |
|
|
|
def _set_arrow_properties(self): |
|
cdef CParquetFileWriteOptions* opts = self.parquet_options |
|
|
|
opts.arrow_writer_properties = _create_arrow_writer_properties( |
|
use_deprecated_int96_timestamps=( |
|
self._properties["use_deprecated_int96_timestamps"] |
|
), |
|
coerce_timestamps=self._properties["coerce_timestamps"], |
|
allow_truncated_timestamps=( |
|
self._properties["allow_truncated_timestamps"] |
|
), |
|
writer_engine_version="V2", |
|
use_compliant_nested_type=( |
|
self._properties["use_compliant_nested_type"] |
|
) |
|
) |
|
|
|
def _set_encryption_config(self): |
|
if not parquet_encryption_enabled: |
|
raise NotImplementedError( |
|
"Encryption is not enabled in your installation of pyarrow, but an " |
|
"encryption_config was provided." |
|
) |
|
set_encryption_config(self, self._properties["encryption_config"]) |
|
|
|
cdef void init(self, const shared_ptr[CFileWriteOptions]& sp): |
|
FileWriteOptions.init(self, sp) |
|
self.parquet_options = <CParquetFileWriteOptions*> sp.get() |
|
self._properties = dict( |
|
use_dictionary=True, |
|
compression="snappy", |
|
version="2.6", |
|
write_statistics=None, |
|
data_page_size=None, |
|
compression_level=None, |
|
use_byte_stream_split=False, |
|
column_encoding=None, |
|
data_page_version="1.0", |
|
use_deprecated_int96_timestamps=False, |
|
coerce_timestamps=None, |
|
allow_truncated_timestamps=False, |
|
use_compliant_nested_type=True, |
|
encryption_properties=None, |
|
write_batch_size=None, |
|
dictionary_pagesize_limit=None, |
|
write_page_index=False, |
|
encryption_config=None, |
|
write_page_checksum=False, |
|
sorting_columns=None, |
|
store_decimal_as_integer=False, |
|
) |
|
|
|
self._set_properties() |
|
self._set_arrow_properties() |
|
|
|
def __repr__(self): |
|
return "<pyarrow.dataset.ParquetFileWriteOptions {0}>".format( |
|
" ".join([f"{key}={value}" for key, value in self._properties.items()]) |
|
) |
|
|
|
|
|
cdef set _PARQUET_READ_OPTIONS = { |
|
'dictionary_columns', 'coerce_int96_timestamp_unit' |
|
} |
|
|
|
|
|
cdef class ParquetFragmentScanOptions(FragmentScanOptions): |
|
""" |
|
Scan-specific options for Parquet fragments. |
|
|
|
Parameters |
|
---------- |
|
use_buffered_stream : bool, default False |
|
Read files through buffered input streams rather than loading entire |
|
row groups at once. This may be enabled to reduce memory overhead. |
|
Disabled by default. |
|
buffer_size : int, default 8192 |
|
        Size of the buffered stream, if enabled. Default is 8 KiB.
|
pre_buffer : bool, default True |
|
If enabled, pre-buffer the raw Parquet data instead of issuing one |
|
read per column chunk. This can improve performance on high-latency |
|
filesystems (e.g. S3, GCS) by coalescing and issuing file reads in |
|
parallel using a background I/O thread pool. |
|
Set to False if you want to prioritize minimal memory usage |
|
over maximum speed. |
|
cache_options : pyarrow.CacheOptions, default None |
|
Cache options used when pre_buffer is enabled. The default values should |
|
be good for most use cases. You may want to adjust these for example if |
|
you have exceptionally high latency to the file system. |
|
thrift_string_size_limit : int, default None |
|
If not None, override the maximum total string size allocated |
|
when decoding Thrift structures. The default limit should be |
|
sufficient for most Parquet files. |
|
thrift_container_size_limit : int, default None |
|
If not None, override the maximum total size of containers allocated |
|
when decoding Thrift structures. The default limit should be |
|
sufficient for most Parquet files. |
|
decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None |
|
If not None, use the provided ParquetDecryptionConfig to decrypt the |
|
Parquet file. |
|
decryption_properties : pyarrow.parquet.FileDecryptionProperties, default None |
|
If not None, use the provided FileDecryptionProperties to decrypt encrypted |
|
Parquet file. |
|
page_checksum_verification : bool, default False |
|
If True, verify the page checksum for each page read from the file. |
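
    Examples
    --------
    A minimal sketch tuned for a high-latency filesystem; the values are
    illustrative:

    >>> import pyarrow.dataset as ds
    >>> scan_options = ds.ParquetFragmentScanOptions(
    ...     pre_buffer=True, buffer_size=64 * 1024)
    >>> parquet_format = ds.ParquetFileFormat(
    ...     default_fragment_scan_options=scan_options)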
|
""" |
|
|
|
|
|
__slots__ = () |
|
|
|
def __init__(self, *, bint use_buffered_stream=False, |
|
buffer_size=8192, |
|
bint pre_buffer=True, |
|
cache_options=None, |
|
thrift_string_size_limit=None, |
|
thrift_container_size_limit=None, |
|
decryption_config=None, |
|
decryption_properties=None, |
|
bint page_checksum_verification=False): |
|
self.init(shared_ptr[CFragmentScanOptions]( |
|
new CParquetFragmentScanOptions())) |
|
self.use_buffered_stream = use_buffered_stream |
|
self.buffer_size = buffer_size |
|
if pre_buffer and not is_threading_enabled(): |
|
pre_buffer = False |
|
self.pre_buffer = pre_buffer |
|
if cache_options is not None: |
|
self.cache_options = cache_options |
|
if thrift_string_size_limit is not None: |
|
self.thrift_string_size_limit = thrift_string_size_limit |
|
if thrift_container_size_limit is not None: |
|
self.thrift_container_size_limit = thrift_container_size_limit |
|
if decryption_config is not None: |
|
self.parquet_decryption_config = decryption_config |
|
if decryption_properties is not None: |
|
self.decryption_properties = decryption_properties |
|
self.page_checksum_verification = page_checksum_verification |
|
|
|
cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): |
|
FragmentScanOptions.init(self, sp) |
|
self.parquet_options = <CParquetFragmentScanOptions*> sp.get() |
|
|
|
cdef CReaderProperties* reader_properties(self): |
|
return self.parquet_options.reader_properties.get() |
|
|
|
cdef ArrowReaderProperties* arrow_reader_properties(self): |
|
return self.parquet_options.arrow_reader_properties.get() |
|
|
|
@property |
|
def use_buffered_stream(self): |
|
return self.reader_properties().is_buffered_stream_enabled() |
|
|
|
@use_buffered_stream.setter |
|
def use_buffered_stream(self, bint use_buffered_stream): |
|
if use_buffered_stream: |
|
self.reader_properties().enable_buffered_stream() |
|
else: |
|
self.reader_properties().disable_buffered_stream() |
|
|
|
@property |
|
def buffer_size(self): |
|
return self.reader_properties().buffer_size() |
|
|
|
@buffer_size.setter |
|
def buffer_size(self, buffer_size): |
|
if buffer_size <= 0: |
|
raise ValueError("Buffer size must be larger than zero") |
|
self.reader_properties().set_buffer_size(buffer_size) |
|
|
|
@property |
|
def pre_buffer(self): |
|
return self.arrow_reader_properties().pre_buffer() |
|
|
|
@pre_buffer.setter |
|
def pre_buffer(self, bint pre_buffer): |
|
if pre_buffer and not is_threading_enabled(): |
|
return |
|
self.arrow_reader_properties().set_pre_buffer(pre_buffer) |
|
|
|
@property |
|
def cache_options(self): |
|
return CacheOptions.wrap(self.arrow_reader_properties().cache_options()) |
|
|
|
@cache_options.setter |
|
def cache_options(self, CacheOptions options): |
|
self.arrow_reader_properties().set_cache_options(options.unwrap()) |
|
|
|
@property |
|
def thrift_string_size_limit(self): |
|
return self.reader_properties().thrift_string_size_limit() |
|
|
|
@thrift_string_size_limit.setter |
|
def thrift_string_size_limit(self, size): |
|
if size <= 0: |
|
raise ValueError("size must be larger than zero") |
|
self.reader_properties().set_thrift_string_size_limit(size) |
|
|
|
@property |
|
def thrift_container_size_limit(self): |
|
return self.reader_properties().thrift_container_size_limit() |
|
|
|
@thrift_container_size_limit.setter |
|
def thrift_container_size_limit(self, size): |
|
if size <= 0: |
|
raise ValueError("size must be larger than zero") |
|
self.reader_properties().set_thrift_container_size_limit(size) |
|
|
|
@property |
|
def decryption_properties(self): |
|
if not parquet_encryption_enabled: |
|
raise NotImplementedError( |
|
"Unable to access encryption features. " |
|
"Encryption is not enabled in your installation of pyarrow." |
|
) |
|
return self._decryption_properties |
|
|
|
@decryption_properties.setter |
|
def decryption_properties(self, config): |
|
if not parquet_encryption_enabled: |
|
raise NotImplementedError( |
|
"Encryption is not enabled in your installation of pyarrow, but " |
|
"decryption_properties were provided." |
|
) |
|
set_decryption_properties(self, config) |
|
self._decryption_properties = config |
|
|
|
@property |
|
def parquet_decryption_config(self): |
|
if not parquet_encryption_enabled: |
|
raise NotImplementedError( |
|
"Unable to access encryption features. " |
|
"Encryption is not enabled in your installation of pyarrow." |
|
) |
|
return self._parquet_decryption_config |
|
|
|
@parquet_decryption_config.setter |
|
def parquet_decryption_config(self, config): |
|
if not parquet_encryption_enabled: |
|
raise NotImplementedError( |
|
"Encryption is not enabled in your installation of pyarrow, but a " |
|
"decryption_config was provided." |
|
) |
|
set_decryption_config(self, config) |
|
self._parquet_decryption_config = config |
|
|
|
@property |
|
def page_checksum_verification(self): |
|
return self.reader_properties().page_checksum_verification() |
|
|
|
@page_checksum_verification.setter |
|
def page_checksum_verification(self, bint page_checksum_verification): |
|
self.reader_properties().set_page_checksum_verification(page_checksum_verification) |
|
|
|
def equals(self, ParquetFragmentScanOptions other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.dataset.ParquetFragmentScanOptions |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
attrs = ( |
|
self.use_buffered_stream, self.buffer_size, self.pre_buffer, self.cache_options, |
|
self.thrift_string_size_limit, self.thrift_container_size_limit, |
|
self.page_checksum_verification) |
|
other_attrs = ( |
|
other.use_buffered_stream, other.buffer_size, other.pre_buffer, other.cache_options, |
|
other.thrift_string_size_limit, |
|
other.thrift_container_size_limit, other.page_checksum_verification) |
|
return attrs == other_attrs |
|
|
|
@staticmethod |
|
def _reconstruct(kwargs): |
|
|
|
|
|
        # __reduce__ cannot pass keyword arguments to the constructor
        # directly, so rebuild from a kwargs dict via this helper.
        return ParquetFragmentScanOptions(**kwargs)
|
|
|
def __reduce__(self): |
|
kwargs = dict( |
|
use_buffered_stream=self.use_buffered_stream, |
|
buffer_size=self.buffer_size, |
|
pre_buffer=self.pre_buffer, |
|
cache_options=self.cache_options, |
|
thrift_string_size_limit=self.thrift_string_size_limit, |
|
thrift_container_size_limit=self.thrift_container_size_limit, |
|
page_checksum_verification=self.page_checksum_verification |
|
) |
|
return ParquetFragmentScanOptions._reconstruct, (kwargs,) |
|
|
|
|
|
cdef class ParquetFactoryOptions(_Weakrefable): |
|
""" |
|
    Influences the discovery of a Parquet dataset.
|
|
|
Parameters |
|
---------- |
|
partition_base_dir : str, optional |
|
For the purposes of applying the partitioning, paths will be |
|
stripped of the partition_base_dir. Files not matching the |
|
partition_base_dir prefix will be skipped for partitioning discovery. |
|
The ignored files will still be part of the Dataset, but will not |
|
have partition information. |
|
partitioning : Partitioning, PartitioningFactory, optional |
|
The partitioning scheme applied to fragments, see ``Partitioning``. |
|
validate_column_chunk_paths : bool, default False |
|
Assert that all ColumnChunk paths are consistent. The parquet spec |
|
allows for ColumnChunk data to be stored in multiple files, but |
|
ParquetDatasetFactory supports only a single file with all ColumnChunk |
|
data. If this flag is set construction of a ParquetDatasetFactory will |
|
raise an error if ColumnChunk data is not resident in a single file. |
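
    Examples
    --------
    A minimal sketch; the base directory is hypothetical:

    >>> import pyarrow.dataset as ds
    >>> factory_options = ds.ParquetFactoryOptions(
    ...     partition_base_dir="/tmp/dataset",
    ...     validate_column_chunk_paths=True)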
|
""" |
|
|
|
cdef: |
|
CParquetFactoryOptions options |
|
|
|
__slots__ = () |
|
|
|
def __init__(self, partition_base_dir=None, partitioning=None, |
|
validate_column_chunk_paths=False): |
|
if isinstance(partitioning, PartitioningFactory): |
|
self.partitioning_factory = partitioning |
|
elif isinstance(partitioning, Partitioning): |
|
self.partitioning = partitioning |
|
|
|
if partition_base_dir is not None: |
|
self.partition_base_dir = partition_base_dir |
|
|
|
self.options.validate_column_chunk_paths = validate_column_chunk_paths |
|
|
|
cdef inline CParquetFactoryOptions unwrap(self): |
|
return self.options |
|
|
|
@property |
|
def partitioning(self): |
|
"""Partitioning to apply to discovered files. |
|
|
|
NOTE: setting this property will overwrite partitioning_factory. |
|
""" |
|
c_partitioning = self.options.partitioning.partitioning() |
|
if c_partitioning.get() == nullptr: |
|
return None |
|
return Partitioning.wrap(c_partitioning) |
|
|
|
@partitioning.setter |
|
def partitioning(self, Partitioning value): |
|
self.options.partitioning = (<Partitioning> value).unwrap() |
|
|
|
@property |
|
def partitioning_factory(self): |
|
"""PartitioningFactory to apply to discovered files and |
|
discover a Partitioning. |
|
|
|
NOTE: setting this property will overwrite partitioning. |
|
""" |
|
c_factory = self.options.partitioning.factory() |
|
if c_factory.get() == nullptr: |
|
return None |
|
return PartitioningFactory.wrap(c_factory, None, None) |
|
|
|
@partitioning_factory.setter |
|
def partitioning_factory(self, PartitioningFactory value): |
|
self.options.partitioning = (<PartitioningFactory> value).unwrap() |
|
|
|
@property |
|
def partition_base_dir(self): |
|
""" |
|
        Base directory that is stripped from file paths before applying

        the partitioning.
|
""" |
|
return frombytes(self.options.partition_base_dir) |
|
|
|
@partition_base_dir.setter |
|
def partition_base_dir(self, value): |
|
self.options.partition_base_dir = tobytes(value) |
|
|
|
@property |
|
def validate_column_chunk_paths(self): |
|
""" |
|
        Whether to assert that all ColumnChunk paths are consistent.
|
""" |
|
return self.options.validate_column_chunk_paths |
|
|
|
@validate_column_chunk_paths.setter |
|
def validate_column_chunk_paths(self, value): |
|
self.options.validate_column_chunk_paths = value |
|
|
|
|
|
cdef class ParquetDatasetFactory(DatasetFactory): |
|
""" |
|
Create a ParquetDatasetFactory from a Parquet `_metadata` file. |
|
|
|
Parameters |
|
---------- |
|
metadata_path : str |
|
Path to the `_metadata` parquet metadata-only file generated with |
|
`pyarrow.parquet.write_metadata`. |
|
filesystem : pyarrow.fs.FileSystem |
|
        Filesystem to read the metadata_path from, as well as the

        subsequent Parquet files.
|
format : ParquetFileFormat |
|
Parquet format options. |
|
options : ParquetFactoryOptions, optional |
|
Various flags influencing the discovery of filesystem paths. |
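
    Examples
    --------
    A minimal sketch, assuming a ``_metadata`` file previously written
    with ``pyarrow.parquet.write_metadata``:

    >>> import pyarrow.dataset as ds
    >>> from pyarrow import fs
    >>> factory = ds.ParquetDatasetFactory(
    ...     "/tmp/dataset/_metadata", fs.LocalFileSystem(),
    ...     ds.ParquetFileFormat())  # doctest: +SKIP
    >>> dataset = factory.finish()  # doctest: +SKIP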
|
""" |
|
|
|
cdef: |
|
CParquetDatasetFactory* parquet_factory |
|
|
|
def __init__(self, metadata_path, FileSystem filesystem not None, |
|
FileFormat format not None, |
|
ParquetFactoryOptions options=None): |
|
cdef: |
|
c_string c_path |
|
shared_ptr[CFileSystem] c_filesystem |
|
shared_ptr[CParquetFileFormat] c_format |
|
CResult[shared_ptr[CDatasetFactory]] result |
|
CParquetFactoryOptions c_options |
|
|
|
c_path = tobytes(metadata_path) |
|
c_filesystem = filesystem.unwrap() |
|
c_format = static_pointer_cast[CParquetFileFormat, CFileFormat]( |
|
format.unwrap()) |
|
options = options or ParquetFactoryOptions() |
|
c_options = options.unwrap() |
|
|
|
with nogil: |
|
result = CParquetDatasetFactory.MakeFromMetaDataPath( |
|
c_path, c_filesystem, c_format, c_options) |
|
self.init(GetResultValue(result)) |
|
|
|
cdef init(self, shared_ptr[CDatasetFactory]& sp): |
|
DatasetFactory.init(self, sp) |
|
self.parquet_factory = <CParquetDatasetFactory*> sp.get() |
|
|