|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from numbers import Integral |
|
import warnings |
|
|
|
from pyarrow.lib import Table |
|
import pyarrow._orc as _orc |
|
from pyarrow.fs import _resolve_filesystem_and_path |
|
|
|
|
|
class ORCFile:
    """
    Reader interface for a single ORC file

    Parameters
    ----------
    source : str or pyarrow.NativeFile
        Readable source. For passing Python file objects or byte buffers,
        see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
    """

    def __init__(self, source):
        # Open eagerly: the native reader validates the source up front.
        self.reader = _orc.ORCReader()
        self.reader.open(source)

    @property
    def metadata(self):
        """File metadata, as an arrow KeyValueMetadata."""
        return self.reader.metadata()

    @property
    def schema(self):
        """Arrow schema of the file."""
        return self.reader.schema()

    @property
    def nrows(self):
        """Total number of rows in the file."""
        return self.reader.nrows()

    @property
    def nstripes(self):
        """Total number of stripes in the file."""
        return self.reader.nstripes()

    @property
    def file_version(self):
        """ORC format version of the file; either 0.11 or 0.12."""
        return self.reader.file_version()

    @property
    def software_version(self):
        """Software instance and version that wrote the file."""
        return self.reader.software_version()

    @property
    def compression(self):
        """Compression codec used by the file."""
        return self.reader.compression()

    @property
    def compression_size(self):
        """Number of bytes buffered for the file's compression codec."""
        return self.reader.compression_size()

    @property
    def writer(self):
        """Name of the writer that produced this file.

        If the writer is unknown then its Writer ID
        (a number) is returned."""
        return self.reader.writer()

    @property
    def writer_version(self):
        """Version of the writer."""
        return self.reader.writer_version()

    @property
    def row_index_stride(self):
        """Rows per entry in the row index, or 0 when the file
        has no row index."""
        return self.reader.row_index_stride()

    @property
    def nstripe_statistics(self):
        """Number of stripe statistics."""
        return self.reader.nstripe_statistics()

    @property
    def content_length(self):
        """Length of the file's data stripes, in bytes."""
        return self.reader.content_length()

    @property
    def stripe_statistics_length(self):
        """Number of compressed bytes in the file's stripe statistics."""
        return self.reader.stripe_statistics_length()

    @property
    def file_footer_length(self):
        """Number of compressed bytes in the file footer."""
        return self.reader.file_footer_length()

    @property
    def file_postscript_length(self):
        """Number of bytes in the file postscript."""
        return self.reader.file_postscript_length()

    @property
    def file_length(self):
        """Total number of bytes in the file."""
        return self.reader.file_length()

    def _select_names(self, columns=None):
        # Translate integer column indices into field names.  A selection
        # containing any non-integer entry is assumed to already hold names
        # and is handed back unchanged.
        if columns is None:
            return None

        schema = self.schema
        resolved = []
        for entry in columns:
            if not isinstance(entry, Integral):
                # Names were supplied directly; pass the list through as-is.
                return columns
            index = int(entry)
            if not (0 <= index < len(schema)):
                raise ValueError("Column indices must be in 0 <= ind < %d,"
                                 " got %d" % (len(schema), index))
            resolved.append(schema[index].name)

        return resolved

    def read_stripe(self, n, columns=None):
        """Read a single stripe from the file.

        Parameters
        ----------
        n : int
            The stripe index
        columns : list
            If not None, only these columns will be read from the stripe. A
            column name may be a prefix of a nested field, e.g. 'a' will select
            'a.b', 'a.c', and 'a.d.e'

        Returns
        -------
        pyarrow.RecordBatch
            Content of the stripe as a RecordBatch.
        """
        return self.reader.read_stripe(n, columns=self._select_names(columns))

    def read(self, columns=None):
        """Read the whole file.

        Parameters
        ----------
        columns : list
            If not None, only these columns will be read from the file. A
            column name may be a prefix of a nested field, e.g. 'a' will select
            'a.b', 'a.c', and 'a.d.e'. Output always follows the
            ordering of the file and not the `columns` list.

        Returns
        -------
        pyarrow.Table
            Content of the file as a Table.
        """
        return self.reader.read(columns=self._select_names(columns))
|
|
|
|
|
_orc_writer_args_docs = """file_version : {"0.11", "0.12"}, default "0.12" |
|
Determine which ORC file version to use. |
|
`Hive 0.11 / ORC v0 <https://orc.apache.org/specification/ORCv0/>`_ |
|
is the older version |
|
while `Hive 0.12 / ORC v1 <https://orc.apache.org/specification/ORCv1/>`_ |
|
is the newer one. |
|
batch_size : int, default 1024 |
|
Number of rows the ORC writer writes at a time. |
|
stripe_size : int, default 64 * 1024 * 1024 |
|
Size of each ORC stripe in bytes. |
|
compression : string, default 'uncompressed' |
|
The compression codec. |
|
Valid values: {'UNCOMPRESSED', 'SNAPPY', 'ZLIB', 'LZ4', 'ZSTD'} |
|
Note that LZ0 is currently not supported. |
|
compression_block_size : int, default 64 * 1024 |
|
Size of each compression block in bytes. |
|
compression_strategy : string, default 'speed' |
|
The compression strategy i.e. speed vs size reduction. |
|
Valid values: {'SPEED', 'COMPRESSION'} |
|
row_index_stride : int, default 10000 |
|
The row index stride i.e. the number of rows per |
|
an entry in the row index. |
|
padding_tolerance : double, default 0.0 |
|
The padding tolerance. |
|
dictionary_key_size_threshold : double, default 0.0 |
|
The dictionary key size threshold. 0 to disable dictionary encoding. |
|
1 to always enable dictionary encoding. |
|
bloom_filter_columns : None, set-like or list-like, default None |
|
Columns that use the bloom filter. |
|
bloom_filter_fpp : double, default 0.05 |
|
Upper limit of the false-positive rate of the bloom filter. |
|
""" |
|
|
|
|
|
class ORCWriter:
    __doc__ = """
Writer interface for a single ORC file

Parameters
----------
where : str or pyarrow.io.NativeFile
    Writable target. For passing Python file objects or byte buffers,
    see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
    or pyarrow.io.FixedSizeBufferWriter.
{}
""".format(_orc_writer_args_docs)

    # Class-level default keeps __del__ safe even when __init__ fails
    # before the instance attribute is set.
    is_open = False

    def __init__(self, where, *,
                 file_version='0.12',
                 batch_size=1024,
                 stripe_size=64 * 1024 * 1024,
                 compression='uncompressed',
                 compression_block_size=65536,
                 compression_strategy='speed',
                 row_index_stride=10000,
                 padding_tolerance=0.0,
                 dictionary_key_size_threshold=0.0,
                 bloom_filter_columns=None,
                 bloom_filter_fpp=0.05,
                 ):
        # Collect the writer options once, then forward them unchanged
        # to the native ORC writer.
        options = dict(
            file_version=file_version,
            batch_size=batch_size,
            stripe_size=stripe_size,
            compression=compression,
            compression_block_size=compression_block_size,
            compression_strategy=compression_strategy,
            row_index_stride=row_index_stride,
            padding_tolerance=padding_tolerance,
            dictionary_key_size_threshold=dictionary_key_size_threshold,
            bloom_filter_columns=bloom_filter_columns,
            bloom_filter_fpp=bloom_filter_fpp,
        )
        self.writer = _orc.ORCWriter()
        self.writer.open(where, **options)
        self.is_open = True

    def __del__(self):
        # Best-effort finalization; close() is a no-op once closed.
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        # Finalize the file on context exit; exceptions propagate.
        self.close()

    def write(self, table):
        """
        Write the table into an ORC file. The schema of the table must
        be equal to the schema used when opening the ORC file.

        Parameters
        ----------
        table : pyarrow.Table
            The table to be written into the ORC file
        """
        assert self.is_open
        self.writer.write(table)

    def close(self):
        """
        Close the ORC file
        """
        if not self.is_open:
            return
        self.writer.close()
        self.is_open = False
|
|
|
|
|
def read_table(source, columns=None, filesystem=None):
    # Resolve string paths against the (possibly inferred) filesystem;
    # already-open file-like sources come back with filesystem=None.
    filesystem, path = _resolve_filesystem_and_path(source, filesystem)
    if filesystem is not None:
        source = filesystem.open_input_file(path)

    orc_file = ORCFile(source)
    # An explicitly empty selection still needs the row count: read the
    # whole file, then project down to zero columns.
    if columns is not None and len(columns) == 0:
        return orc_file.read().select(columns)
    return orc_file.read(columns=columns)
|
|
|
|
|
# Docstring is attached after the definition so the shared prose lives in one
# place alongside the other module-level doc assignments.
read_table.__doc__ = """
Read a Table from an ORC file.

Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
    If a string passed, can be a single file name. For file-like objects,
    only read a single file. Use pyarrow.BufferReader to read a file
    contained in a bytes or buffer-like object.
columns : list
    If not None, only these columns will be read from the file. A column
    name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
    'a.c', and 'a.d.e'. Output always follows the ordering of the file and
    not the `columns` list. If empty, no columns will be read. Note
    that the table will still have the correct num_rows set despite having
    no columns.
filesystem : FileSystem, default None
    If nothing passed, will be inferred based on path.
    Path will try to be found in the local on-disk filesystem otherwise
    it will be parsed as an URI to determine the filesystem.
"""
|
|
|
|
|
def write_table(table, where, *,
                file_version='0.12',
                batch_size=1024,
                stripe_size=64 * 1024 * 1024,
                compression='uncompressed',
                compression_block_size=65536,
                compression_strategy='speed',
                row_index_stride=10000,
                padding_tolerance=0.0,
                dictionary_key_size_threshold=0.0,
                bloom_filter_columns=None,
                bloom_filter_fpp=0.05):
    # Legacy call order was write_table(where, table); detect it by the
    # Table landing in the 'where' slot, warn, and swap.
    if isinstance(where, Table):
        warnings.warn(
            "The order of the arguments has changed. Pass as "
            "'write_table(table, where)' instead. The old order will raise "
            "an error in the future.", FutureWarning, stacklevel=2
        )
        table, where = where, table

    options = dict(
        file_version=file_version,
        batch_size=batch_size,
        stripe_size=stripe_size,
        compression=compression,
        compression_block_size=compression_block_size,
        compression_strategy=compression_strategy,
        row_index_stride=row_index_stride,
        padding_tolerance=padding_tolerance,
        dictionary_key_size_threshold=dictionary_key_size_threshold,
        bloom_filter_columns=bloom_filter_columns,
        bloom_filter_fpp=bloom_filter_fpp,
    )
    # Context manager guarantees the file is finalized even on error.
    with ORCWriter(where, **options) as sink:
        sink.write(table)
|
|
|
|
|
# Docstring is attached after the definition so the shared writer-option
# descriptions (_orc_writer_args_docs) can be interpolated once.
write_table.__doc__ = """
Write a table into an ORC file.

Parameters
----------
table : pyarrow.lib.Table
    The table to be written into the ORC file
where : str or pyarrow.io.NativeFile
    Writable target. For passing Python file objects or byte buffers,
    see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
    or pyarrow.io.FixedSizeBufferWriter.
{}
""".format(_orc_writer_args_docs)
|
|