# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# cython: language_level = 3

"""Dataset support for Parquet file format."""

from cython.operator cimport dereference as deref

import os
import warnings

import pyarrow as pa
from pyarrow.lib cimport *
from pyarrow.lib import frombytes, tobytes, is_threading_enabled
from pyarrow.includes.libarrow cimport *
from pyarrow.includes.libarrow_dataset cimport *
from pyarrow.includes.libarrow_dataset_parquet cimport *
from pyarrow._fs cimport FileSystem
from pyarrow._compute cimport Expression, _bind
from pyarrow._dataset cimport (
    _make_file_source,
    DatasetFactory,
    FileFormat,
    FileFragment,
    FileWriteOptions,
    Fragment,
    FragmentScanOptions,
    CacheOptions,
    Partitioning,
    PartitioningFactory,
    WrittenFile
)
from pyarrow._parquet cimport (
    _create_writer_properties, _create_arrow_writer_properties,
    FileMetaData,
)


try:
    from pyarrow._dataset_parquet_encryption import (
        set_encryption_config, set_decryption_config,
        set_decryption_properties
    )
    parquet_encryption_enabled = True
except ImportError:
    parquet_encryption_enabled = False


cdef Expression _true = Expression._scalar(True)

ctypedef CParquetFileWriter* _CParquetFileWriterPtr


cdef class ParquetFileFormat(FileFormat):
    """
    FileFormat for Parquet

    Parameters
    ----------
    read_options : ParquetReadOptions
        Read options for the file.
    default_fragment_scan_options : ParquetFragmentScanOptions
        Scan options for the file.
    **kwargs : dict
        Additional options routed to either the read options or the
        scan options, matching their keyword arguments.
    """

    cdef:
        CParquetFileFormat* parquet_format

    def __init__(self, read_options=None,
                 default_fragment_scan_options=None,
                 **kwargs):
        cdef:
            shared_ptr[CParquetFileFormat] wrapped
            CParquetFileFormatReaderOptions* options

        # Read/scan options
        read_options_args = {option: kwargs[option] for option in kwargs
                             if option in _PARQUET_READ_OPTIONS}
        scan_args = {option: kwargs[option] for option in kwargs
                     if option not in _PARQUET_READ_OPTIONS}
        if read_options and read_options_args:
            duplicates = ', '.join(sorted(read_options_args))
            raise ValueError(f'If `read_options` is given, '
                             f'cannot specify {duplicates}')
        if default_fragment_scan_options and scan_args:
            duplicates = ', '.join(sorted(scan_args))
            raise ValueError(f'If `default_fragment_scan_options` is given, '
                             f'cannot specify {duplicates}')

        if read_options is None:
            read_options = ParquetReadOptions(**read_options_args)
        elif isinstance(read_options, dict):
            # For backwards compatibility
            duplicates = []
            for option, value in read_options.items():
                if option in _PARQUET_READ_OPTIONS:
                    read_options_args[option] = value
                else:
                    duplicates.append(option)
                    scan_args[option] = value
            if duplicates:
                duplicates = ", ".join(duplicates)
                warnings.warn(f'The scan options {duplicates} should be '
                              'specified directly as keyword arguments')
            read_options = ParquetReadOptions(**read_options_args)
        elif not isinstance(read_options, ParquetReadOptions):
            raise TypeError('`read_options` must be either a dictionary or an '
                            'instance of ParquetReadOptions')

        if default_fragment_scan_options is None:
            default_fragment_scan_options = ParquetFragmentScanOptions(
                **scan_args)
        elif isinstance(default_fragment_scan_options, dict):
            default_fragment_scan_options = ParquetFragmentScanOptions(
                **default_fragment_scan_options)
        elif not isinstance(default_fragment_scan_options,
                            ParquetFragmentScanOptions):
            raise TypeError('`default_fragment_scan_options` must be either a '
                            'dictionary or an instance of '
                            'ParquetFragmentScanOptions')

        wrapped = make_shared[CParquetFileFormat]()

        options = &(wrapped.get().reader_options)
        if read_options.dictionary_columns is not None:
            for column in read_options.dictionary_columns:
                options.dict_columns.insert(tobytes(column))
        options.coerce_int96_timestamp_unit = \
            read_options._coerce_int96_timestamp_unit

        self.init(<shared_ptr[CFileFormat]> wrapped)
        self.default_fragment_scan_options = default_fragment_scan_options

    cdef void init(self, const shared_ptr[CFileFormat]& sp):
        FileFormat.init(self, sp)
        self.parquet_format = <CParquetFileFormat*> sp.get()

    cdef WrittenFile _finish_write(self, path, base_dir,
                                   CFileWriter* file_writer):
        cdef:
            FileMetaData parquet_metadata
            CParquetFileWriter* parquet_file_writer

        parquet_metadata = None
        parquet_file_writer = dynamic_cast[_CParquetFileWriterPtr](file_writer)
        with nogil:
            metadata = deref(
                deref(parquet_file_writer).parquet_writer()).metadata()
        if metadata:
            parquet_metadata = FileMetaData.__new__(FileMetaData)
            parquet_metadata.init(metadata)
            parquet_metadata.set_file_path(os.path.relpath(path, base_dir))

        size = GetResultValue(file_writer.GetBytesWritten())

        return WrittenFile(path, parquet_metadata, size)

    @property
    def read_options(self):
        cdef CParquetFileFormatReaderOptions* options
        options = &self.parquet_format.reader_options
        parquet_read_options = ParquetReadOptions(
            dictionary_columns={frombytes(col)
                                for col in options.dict_columns},
        )
        # Read options getter/setter works with strings so setting
        # the private property which uses the C Type
        parquet_read_options._coerce_int96_timestamp_unit = \
            options.coerce_int96_timestamp_unit
        return parquet_read_options

    def make_write_options(self, **kwargs):
        """
        Parameters
        ----------
        **kwargs : dict
            Parquet write options to set; see ParquetFileWriteOptions.

        Returns
        -------
        pyarrow.dataset.FileWriteOptions
        """
        # Safeguard from calling make_write_options as a static class method
        if not isinstance(self, ParquetFileFormat):
            raise TypeError("make_write_options() should be called on "
                            "an instance of ParquetFileFormat")
        opts = FileFormat.make_write_options(self)
        (<ParquetFileWriteOptions> opts).update(**kwargs)
        return opts

    cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
        if options.type_name == 'parquet':
            self.parquet_format.default_fragment_scan_options = options.wrapped
        else:
            super()._set_default_fragment_scan_options(options)

    def equals(self, ParquetFileFormat other):
        """
        Parameters
        ----------
        other : pyarrow.dataset.ParquetFileFormat

        Returns
        -------
        bool
        """
        return (
            self.read_options.equals(other.read_options) and
            self.default_fragment_scan_options ==
            other.default_fragment_scan_options
        )

    @property
    def default_extname(self):
        return "parquet"

    def __reduce__(self):
        return ParquetFileFormat, (self.read_options,
                                   self.default_fragment_scan_options)

    def __repr__(self):
        return f"<ParquetFileFormat read_options={self.read_options}>"

    def make_fragment(self, file, filesystem=None,
                      Expression partition_expression=None,
                      row_groups=None, *, file_size=None):
        """
        Make a FileFragment from a given file.

        Parameters
        ----------
        file : file-like object, path-like or str
            The file or file path to make a fragment from.
        filesystem : Filesystem, optional
            If `filesystem` is given, `file` must be a string and specifies
            the path of the file to read from the filesystem.
        partition_expression : Expression, optional
            An expression that is guaranteed true for all rows in the
            fragment.  Allows fragment to be potentially skipped while
            scanning with a filter.
        row_groups : Iterable, optional
            The indices of the row groups to include.
        file_size : int, optional
            The size of the file in bytes. Can improve performance with
            high-latency filesystems when file size needs to be known before
            reading.

        Returns
        -------
        fragment : Fragment
            The file fragment
        """
        cdef:
            vector[int] c_row_groups
        if partition_expression is None:
            partition_expression = _true
        if row_groups is None:
            return super().make_fragment(file, filesystem,
                                         partition_expression,
                                         file_size=file_size)

        c_source = _make_file_source(file, filesystem, file_size)
        c_row_groups = [<int> row_group for row_group in set(row_groups)]

        c_fragment = <shared_ptr[CFragment]> GetResultValue(
            self.parquet_format.MakeFragment(move(c_source),
                                             partition_expression.unwrap(),
                                             <shared_ptr[CSchema]>nullptr,
                                             move(c_row_groups)))
        return Fragment.wrap(move(c_fragment))
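

# Illustrative usage sketch, not part of the library API: a ParquetFileFormat
# carries read options and default scan options for every fragment discovered
# in a dataset. Read options may be given as a ParquetReadOptions object or as
# direct keyword arguments. The directory "data/" and column "col_a" are
# hypothetical placeholders.
def _example_parquet_file_format():
    import pyarrow.dataset as ds

    fmt = ds.ParquetFileFormat(
        read_options=ds.ParquetReadOptions(
            dictionary_columns={"col_a"},
            # Read legacy INT96 timestamps at millisecond resolution:
            coerce_int96_timestamp_unit="ms"))

    # The same options expressed as direct keyword arguments:
    fmt2 = ds.ParquetFileFormat(dictionary_columns={"col_a"},
                                coerce_int96_timestamp_unit="ms")
    assert fmt.equals(fmt2)

    # The format object is then handed to dataset discovery:
    return ds.dataset("data/", format=fmt)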


class RowGroupInfo:
    """
    A wrapper class for RowGroup information

    Parameters
    ----------
    id : integer
        The group ID.
    metadata : FileMetaData
        The row group metadata.
    schema : Schema
        Schema of the rows.
    """

    def __init__(self, id, metadata, schema):
        self.id = id
        self.metadata = metadata
        self.schema = schema

    @property
    def num_rows(self):
        return self.metadata.num_rows

    @property
    def total_byte_size(self):
        return self.metadata.total_byte_size

    @property
    def statistics(self):
        def name_stats(i):
            col = self.metadata.column(i)

            stats = col.statistics
            if stats is None or not stats.has_min_max:
                return None, None

            name = col.path_in_schema
            field_index = self.schema.get_field_index(name)
            if field_index < 0:
                return None, None

            typ = self.schema.field(field_index).type
            return col.path_in_schema, {
                'min': pa.scalar(stats.min, type=typ).as_py(),
                'max': pa.scalar(stats.max, type=typ).as_py()
            }

        return {
            name: stats for name, stats
            in map(name_stats, range(self.metadata.num_columns))
            if stats is not None
        }

    def __repr__(self):
        return "RowGroupInfo({})".format(self.id)

    def __eq__(self, other):
        if isinstance(other, int):
            return self.id == other
        if not isinstance(other, RowGroupInfo):
            return False
        return self.id == other.id


cdef class ParquetFileFragment(FileFragment):
    """A Fragment representing a parquet file."""

    cdef:
        CParquetFileFragment* parquet_file_fragment

    cdef void init(self, const shared_ptr[CFragment]& sp):
        FileFragment.init(self, sp)
        self.parquet_file_fragment = <CParquetFileFragment*> sp.get()

    def __reduce__(self):
        buffer = self.buffer
        # parquet_file_fragment.row_groups() is empty if the metadata
        # information of the file is not yet populated
        if not bool(self.parquet_file_fragment.row_groups()):
            row_groups = None
        else:
            row_groups = [row_group.id for row_group in self.row_groups]

        return self.format.make_fragment, (
            self.path if buffer is None else buffer,
            self.filesystem,
            self.partition_expression,
            row_groups
        )

    def ensure_complete_metadata(self):
        """
        Ensure that all metadata (statistics, physical schema, ...) have
        been read and cached in this fragment.
        """
        with nogil:
            check_status(self.parquet_file_fragment.EnsureCompleteMetadata())

    @property
    def row_groups(self):
        metadata = self.metadata
        cdef vector[int] row_groups = self.parquet_file_fragment.row_groups()
        return [RowGroupInfo(i, metadata.row_group(i), self.physical_schema)
                for i in row_groups]

    @property
    def metadata(self):
        self.ensure_complete_metadata()
        cdef FileMetaData metadata = FileMetaData.__new__(FileMetaData)
        metadata.init(self.parquet_file_fragment.metadata())
        return metadata

    @property
    def num_row_groups(self):
        """
        Return the number of row groups viewed by this fragment (not the
        number of row groups in the origin file).
        """
        self.ensure_complete_metadata()
        return self.parquet_file_fragment.row_groups().size()

    def split_by_row_group(self, Expression filter=None,
                           Schema schema=None):
        """
        Split the fragment into multiple fragments.

        Yield a Fragment wrapping each row group in this ParquetFileFragment.
        Row groups will be excluded whose metadata contradicts the optional
        filter.

        Parameters
        ----------
        filter : Expression, default None
            Only include the row groups which satisfy this predicate (using
            the Parquet RowGroup statistics).
        schema : Schema, default None
            Schema to use when filtering row groups. Defaults to the
            Fragment's physical schema.

        Returns
        -------
        A list of Fragments
        """
        cdef:
            vector[shared_ptr[CFragment]] c_fragments
            CExpression c_filter
            shared_ptr[CFragment] c_fragment

        schema = schema or self.physical_schema
        c_filter = _bind(filter, schema)
        with nogil:
            c_fragments = move(GetResultValue(
                self.parquet_file_fragment.SplitByRowGroup(move(c_filter))))

        return [Fragment.wrap(c_fragment) for c_fragment in c_fragments]

    def subset(self, Expression filter=None, Schema schema=None,
               object row_group_ids=None):
        """
        Create a subset of the fragment (viewing a subset of the row groups).

        Subset can be specified by either a filter predicate (with optional
        schema) or by a list of row group IDs. Note that when using a filter,
        the resulting fragment can be empty (viewing no row groups).

        Parameters
        ----------
        filter : Expression, default None
            Only include the row groups which satisfy this predicate (using
            the Parquet RowGroup statistics).
        schema : Schema, default None
            Schema to use when filtering row groups. Defaults to the
            Fragment's physical schema.
        row_group_ids : list of ints
            The row group IDs to include in the subset. Can only be specified
            if `filter` is None.

        Returns
        -------
        ParquetFileFragment
        """
        cdef:
            CExpression c_filter
            vector[int] c_row_group_ids
            shared_ptr[CFragment] c_fragment

        if filter is not None and row_group_ids is not None:
            raise ValueError(
                "Cannot specify both 'filter' and 'row_group_ids'."
            )

        if filter is not None:
            schema = schema or self.physical_schema
            c_filter = _bind(filter, schema)
            with nogil:
                c_fragment = move(GetResultValue(
                    self.parquet_file_fragment.SubsetWithFilter(
                        move(c_filter))))
        elif row_group_ids is not None:
            c_row_group_ids = [
                <int> row_group for row_group in sorted(set(row_group_ids))
            ]
            with nogil:
                c_fragment = move(GetResultValue(
                    self.parquet_file_fragment.SubsetWithIds(
                        move(c_row_group_ids))))
        else:
            raise ValueError(
                "Need to specify one of 'filter' or 'row_group_ids'"
            )

        return Fragment.wrap(c_fragment)


cdef class ParquetReadOptions(_Weakrefable):
    """
    Parquet format specific options for reading.

    Parameters
    ----------
    dictionary_columns : list of string, default None
        Names of columns which should be dictionary encoded as
        they are read.
    coerce_int96_timestamp_unit : str, default None
        Cast timestamps that are stored in INT96 format to a particular
        resolution (e.g. 'ms'). Setting to None is equivalent to 'ns'
        and therefore INT96 timestamps will be inferred as timestamps
        in nanoseconds.
    """

    cdef public:
        set dictionary_columns
        TimeUnit _coerce_int96_timestamp_unit

    # Also see _PARQUET_READ_OPTIONS
    def __init__(self, dictionary_columns=None,
                 coerce_int96_timestamp_unit=None):
        self.dictionary_columns = set(dictionary_columns or set())
        self.coerce_int96_timestamp_unit = coerce_int96_timestamp_unit

    @property
    def coerce_int96_timestamp_unit(self):
        return timeunit_to_string(self._coerce_int96_timestamp_unit)

    @coerce_int96_timestamp_unit.setter
    def coerce_int96_timestamp_unit(self, unit):
        if unit is not None:
            self._coerce_int96_timestamp_unit = string_to_timeunit(unit)
        else:
            self._coerce_int96_timestamp_unit = TimeUnit_NANO

    def equals(self, ParquetReadOptions other):
        """
        Parameters
        ----------
        other : pyarrow.dataset.ParquetReadOptions

        Returns
        -------
        bool
        """
        return (self.dictionary_columns == other.dictionary_columns and
                self.coerce_int96_timestamp_unit ==
                other.coerce_int96_timestamp_unit)

    def __eq__(self, other):
        try:
            return self.equals(other)
        except TypeError:
            return False

    def __repr__(self):
        return (
            f"<ParquetReadOptions"
            f" dictionary_columns={self.dictionary_columns}"
            f" coerce_int96_timestamp_unit={self.coerce_int96_timestamp_unit}>"
        )
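

# Illustrative sketch for ParquetFileFragment and RowGroupInfo above, not part
# of the library API: fragments expose per-row-group metadata and can be
# narrowed to a subset of row groups by a statistics-based filter or by
# explicit IDs. The path "example.parquet" and column "col_a" are hypothetical.
def _example_fragment_row_groups():
    import pyarrow.dataset as ds

    dataset = ds.dataset("example.parquet", format="parquet")
    fragment = next(iter(dataset.get_fragments()))

    # Per-row-group metadata, including min/max statistics per column:
    for info in fragment.row_groups:
        print(info.id, info.num_rows, info.statistics)

    # One fragment per row group, pruned using row group statistics:
    parts = fragment.split_by_row_group(ds.field("col_a") > 0)

    # A single fragment viewing only row group 0:
    subset = fragment.subset(row_group_ids=[0])
    return parts, subset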


cdef class ParquetFileWriteOptions(FileWriteOptions):

    def update(self, **kwargs):
        """
        Parameters
        ----------
        **kwargs : dict
        """
        arrow_fields = {
            "use_deprecated_int96_timestamps",
            "coerce_timestamps",
            "allow_truncated_timestamps",
            "use_compliant_nested_type",
        }

        setters = set()
        for name, value in kwargs.items():
            if name not in self._properties:
                raise TypeError("unexpected parquet write option: " + name)
            self._properties[name] = value
            if name in arrow_fields:
                setters.add(self._set_arrow_properties)
            elif name == "encryption_config" and value is not None:
                setters.add(self._set_encryption_config)
            else:
                setters.add(self._set_properties)

        for setter in setters:
            setter()

    def _set_properties(self):
        cdef CParquetFileWriteOptions* opts = self.parquet_options

        opts.writer_properties = _create_writer_properties(
            use_dictionary=self._properties["use_dictionary"],
            compression=self._properties["compression"],
            version=self._properties["version"],
            write_statistics=self._properties["write_statistics"],
            data_page_size=self._properties["data_page_size"],
            compression_level=self._properties["compression_level"],
            use_byte_stream_split=(
                self._properties["use_byte_stream_split"]
            ),
            column_encoding=self._properties["column_encoding"],
            data_page_version=self._properties["data_page_version"],
            encryption_properties=self._properties["encryption_properties"],
            write_batch_size=self._properties["write_batch_size"],
            dictionary_pagesize_limit=self._properties[
                "dictionary_pagesize_limit"],
            write_page_index=self._properties["write_page_index"],
            write_page_checksum=self._properties["write_page_checksum"],
            sorting_columns=self._properties["sorting_columns"],
            store_decimal_as_integer=self._properties[
                "store_decimal_as_integer"],
        )

    def _set_arrow_properties(self):
        cdef CParquetFileWriteOptions* opts = self.parquet_options

        opts.arrow_writer_properties = _create_arrow_writer_properties(
            use_deprecated_int96_timestamps=(
                self._properties["use_deprecated_int96_timestamps"]
            ),
            coerce_timestamps=self._properties["coerce_timestamps"],
            allow_truncated_timestamps=(
                self._properties["allow_truncated_timestamps"]
            ),
            writer_engine_version="V2",
            use_compliant_nested_type=(
                self._properties["use_compliant_nested_type"]
            )
        )

    def _set_encryption_config(self):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Encryption is not enabled in your installation of pyarrow, "
                "but an encryption_config was provided."
            )
        set_encryption_config(self, self._properties["encryption_config"])

    cdef void init(self, const shared_ptr[CFileWriteOptions]& sp):
        FileWriteOptions.init(self, sp)
        self.parquet_options = <CParquetFileWriteOptions*> sp.get()
        self._properties = dict(
            use_dictionary=True,
            compression="snappy",
            version="2.6",
            write_statistics=None,
            data_page_size=None,
            compression_level=None,
            use_byte_stream_split=False,
            column_encoding=None,
            data_page_version="1.0",
            use_deprecated_int96_timestamps=False,
            coerce_timestamps=None,
            allow_truncated_timestamps=False,
            use_compliant_nested_type=True,
            encryption_properties=None,
            write_batch_size=None,
            dictionary_pagesize_limit=None,
            write_page_index=False,
            encryption_config=None,
            write_page_checksum=False,
            sorting_columns=None,
            store_decimal_as_integer=False,
        )
        self._set_properties()
        self._set_arrow_properties()

    def __repr__(self):
        return "<pyarrow.dataset.ParquetFileWriteOptions {}>".format(
            " ".join([f"{key}={value}"
                      for key, value in self._properties.items()])
        )
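

# Illustrative sketch, not part of the library API: write options are created
# through ParquetFileFormat.make_write_options() rather than constructed
# directly, and then passed to write_dataset(). The table, codec choice and
# output directory "out/" are hypothetical; codec availability depends on how
# pyarrow was built ("snappy" is the default shown in _properties above).
def _example_write_options():
    import pyarrow as pa
    import pyarrow.dataset as ds

    table = pa.table({"col_a": [1, 2, 3]})
    fmt = ds.ParquetFileFormat()
    opts = fmt.make_write_options(compression="zstd", compression_level=5)
    ds.write_dataset(table, "out/", format=fmt, file_options=opts)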


cdef set _PARQUET_READ_OPTIONS = {
    'dictionary_columns', 'coerce_int96_timestamp_unit'
}


cdef class ParquetFragmentScanOptions(FragmentScanOptions):
    """
    Scan-specific options for Parquet fragments.

    Parameters
    ----------
    use_buffered_stream : bool, default False
        Read files through buffered input streams rather than loading entire
        row groups at once. This may be enabled to reduce memory overhead.
        Disabled by default.
    buffer_size : int, default 8192
        Size of buffered stream, if enabled. Default is 8KB.
    pre_buffer : bool, default True
        If enabled, pre-buffer the raw Parquet data instead of issuing one
        read per column chunk. This can improve performance on high-latency
        filesystems (e.g. S3, GCS) by coalescing and issuing file reads in
        parallel using a background I/O thread pool.
        Set to False if you want to prioritize minimal memory usage
        over maximum speed.
    cache_options : pyarrow.CacheOptions, default None
        Cache options used when pre_buffer is enabled. The default values
        should be good for most use cases. You may want to adjust these for
        example if you have exceptionally high latency to the file system.
    thrift_string_size_limit : int, default None
        If not None, override the maximum total string size allocated
        when decoding Thrift structures. The default limit should be
        sufficient for most Parquet files.
    thrift_container_size_limit : int, default None
        If not None, override the maximum total size of containers allocated
        when decoding Thrift structures. The default limit should be
        sufficient for most Parquet files.
    decryption_config : pyarrow.dataset.ParquetDecryptionConfig, default None
        If not None, use the provided ParquetDecryptionConfig to decrypt the
        Parquet file.
    decryption_properties : pyarrow.parquet.FileDecryptionProperties, \
default None
        If not None, use the provided FileDecryptionProperties to decrypt
        encrypted Parquet file.
    page_checksum_verification : bool, default False
        If True, verify the page checksum for each page read from the file.
    """

    # Avoid mistakenly creating attributes
    __slots__ = ()

    def __init__(self, *, bint use_buffered_stream=False,
                 buffer_size=8192,
                 bint pre_buffer=True,
                 cache_options=None,
                 thrift_string_size_limit=None,
                 thrift_container_size_limit=None,
                 decryption_config=None,
                 decryption_properties=None,
                 bint page_checksum_verification=False):
        self.init(shared_ptr[CFragmentScanOptions](
            new CParquetFragmentScanOptions()))
        self.use_buffered_stream = use_buffered_stream
        self.buffer_size = buffer_size
        if pre_buffer and not is_threading_enabled():
            pre_buffer = False
        self.pre_buffer = pre_buffer
        if cache_options is not None:
            self.cache_options = cache_options
        if thrift_string_size_limit is not None:
            self.thrift_string_size_limit = thrift_string_size_limit
        if thrift_container_size_limit is not None:
            self.thrift_container_size_limit = thrift_container_size_limit
        if decryption_config is not None:
            self.parquet_decryption_config = decryption_config
        if decryption_properties is not None:
            self.decryption_properties = decryption_properties
        self.page_checksum_verification = page_checksum_verification

    cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
        FragmentScanOptions.init(self, sp)
        self.parquet_options = <CParquetFragmentScanOptions*> sp.get()

    cdef CReaderProperties* reader_properties(self):
        return self.parquet_options.reader_properties.get()

    cdef ArrowReaderProperties* arrow_reader_properties(self):
        return self.parquet_options.arrow_reader_properties.get()

    @property
    def use_buffered_stream(self):
        return self.reader_properties().is_buffered_stream_enabled()

    @use_buffered_stream.setter
    def use_buffered_stream(self, bint use_buffered_stream):
        if use_buffered_stream:
            self.reader_properties().enable_buffered_stream()
        else:
            self.reader_properties().disable_buffered_stream()

    @property
    def buffer_size(self):
        return self.reader_properties().buffer_size()

    @buffer_size.setter
    def buffer_size(self, buffer_size):
        if buffer_size <= 0:
            raise ValueError("Buffer size must be larger than zero")
        self.reader_properties().set_buffer_size(buffer_size)

    @property
    def pre_buffer(self):
        return self.arrow_reader_properties().pre_buffer()

    @pre_buffer.setter
    def pre_buffer(self, bint pre_buffer):
        if pre_buffer and not is_threading_enabled():
            return
        self.arrow_reader_properties().set_pre_buffer(pre_buffer)

    @property
    def cache_options(self):
        return CacheOptions.wrap(
            self.arrow_reader_properties().cache_options())

    @cache_options.setter
    def cache_options(self, CacheOptions options):
        self.arrow_reader_properties().set_cache_options(options.unwrap())

    @property
    def thrift_string_size_limit(self):
        return self.reader_properties().thrift_string_size_limit()

    @thrift_string_size_limit.setter
    def thrift_string_size_limit(self, size):
        if size <= 0:
            raise ValueError("size must be larger than zero")
        self.reader_properties().set_thrift_string_size_limit(size)

    @property
    def thrift_container_size_limit(self):
        return self.reader_properties().thrift_container_size_limit()

    @thrift_container_size_limit.setter
    def thrift_container_size_limit(self, size):
        if size <= 0:
            raise ValueError("size must be larger than zero")
        self.reader_properties().set_thrift_container_size_limit(size)

    @property
    def decryption_properties(self):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Unable to access encryption features. "
                "Encryption is not enabled in your installation of pyarrow."
            )
        return self._decryption_properties

    @decryption_properties.setter
    def decryption_properties(self, config):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Encryption is not enabled in your installation of pyarrow, "
                "but decryption_properties were provided."
            )
        set_decryption_properties(self, config)
        self._decryption_properties = config

    @property
    def parquet_decryption_config(self):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Unable to access encryption features. "
                "Encryption is not enabled in your installation of pyarrow."
            )
        return self._parquet_decryption_config

    @parquet_decryption_config.setter
    def parquet_decryption_config(self, config):
        if not parquet_encryption_enabled:
            raise NotImplementedError(
                "Encryption is not enabled in your installation of pyarrow, "
                "but a decryption_config was provided."
            )
        set_decryption_config(self, config)
        self._parquet_decryption_config = config

    @property
    def page_checksum_verification(self):
        return self.reader_properties().page_checksum_verification()

    @page_checksum_verification.setter
    def page_checksum_verification(self, bint page_checksum_verification):
        self.reader_properties().set_page_checksum_verification(
            page_checksum_verification)

    def equals(self, ParquetFragmentScanOptions other):
        """
        Parameters
        ----------
        other : pyarrow.dataset.ParquetFragmentScanOptions

        Returns
        -------
        bool
        """
        attrs = (
            self.use_buffered_stream, self.buffer_size, self.pre_buffer,
            self.cache_options, self.thrift_string_size_limit,
            self.thrift_container_size_limit,
            self.page_checksum_verification)
        other_attrs = (
            other.use_buffered_stream, other.buffer_size, other.pre_buffer,
            other.cache_options, other.thrift_string_size_limit,
            other.thrift_container_size_limit,
            other.page_checksum_verification)
        return attrs == other_attrs

    @staticmethod
    def _reconstruct(kwargs):
        # __reduce__ doesn't allow passing named arguments directly to the
        # reconstructor, hence this wrapper.
        return ParquetFragmentScanOptions(**kwargs)

    def __reduce__(self):
        kwargs = dict(
            use_buffered_stream=self.use_buffered_stream,
            buffer_size=self.buffer_size,
            pre_buffer=self.pre_buffer,
            cache_options=self.cache_options,
            thrift_string_size_limit=self.thrift_string_size_limit,
            thrift_container_size_limit=self.thrift_container_size_limit,
            page_checksum_verification=self.page_checksum_verification
        )
        return ParquetFragmentScanOptions._reconstruct, (kwargs,)


cdef class ParquetFactoryOptions(_Weakrefable):
    """
    Influences the discovery of a parquet dataset.

    Parameters
    ----------
    partition_base_dir : str, optional
        For the purposes of applying the partitioning, paths will be
        stripped of the partition_base_dir. Files not matching the
        partition_base_dir prefix will be skipped for partitioning discovery.
        The ignored files will still be part of the Dataset, but will not
        have partition information.
    partitioning : Partitioning, PartitioningFactory, optional
        The partitioning scheme applied to fragments, see ``Partitioning``.
    validate_column_chunk_paths : bool, default False
        Assert that all ColumnChunk paths are consistent. The parquet spec
        allows for ColumnChunk data to be stored in multiple files, but
        ParquetDatasetFactory supports only a single file with all ColumnChunk
        data. If this flag is set construction of a ParquetDatasetFactory will
        raise an error if ColumnChunk data is not resident in a single file.
    """

    cdef:
        CParquetFactoryOptions options

    __slots__ = ()  # avoid mistakenly creating attributes

    def __init__(self, partition_base_dir=None, partitioning=None,
                 validate_column_chunk_paths=False):
        if isinstance(partitioning, PartitioningFactory):
            self.partitioning_factory = partitioning
        elif isinstance(partitioning, Partitioning):
            self.partitioning = partitioning

        if partition_base_dir is not None:
            self.partition_base_dir = partition_base_dir

        self.options.validate_column_chunk_paths = validate_column_chunk_paths

    cdef inline CParquetFactoryOptions unwrap(self):
        return self.options

    @property
    def partitioning(self):
        """Partitioning to apply to discovered files.

        NOTE: setting this property will overwrite partitioning_factory.
        """
        c_partitioning = self.options.partitioning.partitioning()
        if c_partitioning.get() == nullptr:
            return None
        return Partitioning.wrap(c_partitioning)

    @partitioning.setter
    def partitioning(self, Partitioning value):
        self.options.partitioning = (<Partitioning> value).unwrap()

    @property
    def partitioning_factory(self):
        """PartitioningFactory to apply to discovered files and
        discover a Partitioning.

        NOTE: setting this property will overwrite partitioning.
        """
        c_factory = self.options.partitioning.factory()
        if c_factory.get() == nullptr:
            return None
        return PartitioningFactory.wrap(c_factory, None, None)

    @partitioning_factory.setter
    def partitioning_factory(self, PartitioningFactory value):
        self.options.partitioning = (<PartitioningFactory> value).unwrap()

    @property
    def partition_base_dir(self):
        """
        Base directory to strip paths before applying the partitioning.
        """
        return frombytes(self.options.partition_base_dir)

    @partition_base_dir.setter
    def partition_base_dir(self, value):
        self.options.partition_base_dir = tobytes(value)

    @property
    def validate_column_chunk_paths(self):
        """
        Whether to assert that all ColumnChunk paths are consistent.
        """
        return self.options.validate_column_chunk_paths

    @validate_column_chunk_paths.setter
    def validate_column_chunk_paths(self, value):
        self.options.validate_column_chunk_paths = value


cdef class ParquetDatasetFactory(DatasetFactory):
    """
    Create a ParquetDatasetFactory from a Parquet `_metadata` file.

    Parameters
    ----------
    metadata_path : str
        Path to the `_metadata` parquet metadata-only file generated with
        `pyarrow.parquet.write_metadata`.
    filesystem : pyarrow.fs.FileSystem
        Filesystem to read the metadata_path from, and subsequent parquet
        files.
    format : ParquetFileFormat
        Parquet format options.
    options : ParquetFactoryOptions, optional
        Various flags influencing the discovery of filesystem paths.
    """

    cdef:
        CParquetDatasetFactory* parquet_factory

    def __init__(self, metadata_path, FileSystem filesystem not None,
                 FileFormat format not None,
                 ParquetFactoryOptions options=None):
        cdef:
            c_string c_path
            shared_ptr[CFileSystem] c_filesystem
            shared_ptr[CParquetFileFormat] c_format
            CResult[shared_ptr[CDatasetFactory]] result
            CParquetFactoryOptions c_options

        c_path = tobytes(metadata_path)
        c_filesystem = filesystem.unwrap()
        c_format = static_pointer_cast[CParquetFileFormat, CFileFormat](
            format.unwrap())
        options = options or ParquetFactoryOptions()
        c_options = options.unwrap()

        with nogil:
            result = CParquetDatasetFactory.MakeFromMetaDataPath(
                c_path, c_filesystem, c_format, c_options)
        self.init(GetResultValue(result))

    cdef init(self, shared_ptr[CDatasetFactory]& sp):
        DatasetFactory.init(self, sp)
        self.parquet_factory = <CParquetDatasetFactory*> sp.get()