File size: 12,618 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 |
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from numbers import Integral
import warnings
from pyarrow.lib import Table
import pyarrow._orc as _orc
from pyarrow.fs import _resolve_filesystem_and_path
class ORCFile:
    """
    Reader interface for a single ORC file

    Parameters
    ----------
    source : str or pyarrow.NativeFile
        Readable source. For passing Python file objects or byte buffers,
        see pyarrow.io.PythonFileInterface or pyarrow.io.BufferReader.
    """

    def __init__(self, source):
        # The heavy lifting is delegated to the Cython/C++ ORCReader.
        self.reader = _orc.ORCReader()
        self.reader.open(source)

    @property
    def metadata(self):
        """The file metadata, as an arrow KeyValueMetadata"""
        return self.reader.metadata()

    @property
    def schema(self):
        """The file schema, as an arrow schema"""
        return self.reader.schema()

    @property
    def nrows(self):
        """The number of rows in the file"""
        return self.reader.nrows()

    @property
    def nstripes(self):
        """The number of stripes in the file"""
        return self.reader.nstripes()

    @property
    def file_version(self):
        """Format version of the ORC file, must be 0.11 or 0.12"""
        return self.reader.file_version()

    @property
    def software_version(self):
        """Software instance and version that wrote this file"""
        return self.reader.software_version()

    @property
    def compression(self):
        """Compression codec of the file"""
        return self.reader.compression()

    @property
    def compression_size(self):
        """Number of bytes to buffer for the compression codec in the file"""
        return self.reader.compression_size()

    @property
    def writer(self):
        """Name of the writer that wrote this file.
        If the writer is unknown then its Writer ID
        (a number) is returned"""
        return self.reader.writer()

    @property
    def writer_version(self):
        """Version of the writer"""
        return self.reader.writer_version()

    @property
    def row_index_stride(self):
        """Number of rows per an entry in the row index or 0
        if there is no row index"""
        return self.reader.row_index_stride()

    @property
    def nstripe_statistics(self):
        """Number of stripe statistics"""
        return self.reader.nstripe_statistics()

    @property
    def content_length(self):
        """Length of the data stripes in the file in bytes"""
        return self.reader.content_length()

    @property
    def stripe_statistics_length(self):
        """The number of compressed bytes in the file stripe statistics"""
        return self.reader.stripe_statistics_length()

    @property
    def file_footer_length(self):
        """The number of compressed bytes in the file footer"""
        return self.reader.file_footer_length()

    @property
    def file_postscript_length(self):
        """The number of bytes in the file postscript"""
        return self.reader.file_postscript_length()

    @property
    def file_length(self):
        """The number of bytes in the file"""
        return self.reader.file_length()

    def _select_names(self, columns=None):
        """
        Normalize a column selection into a list of column names.

        Integer entries are translated to the corresponding field name in
        the file schema; other entries are assumed to be column names and
        are passed through unchanged, so integer and string selectors may
        be mixed in one list. (Previously, the first non-integer entry made
        the whole list be returned as-is, so any integers in a mixed list
        reached the reader unresolved.)

        Parameters
        ----------
        columns : list or None
            Column indices and/or names; None selects all columns.

        Returns
        -------
        list of str or None

        Raises
        ------
        ValueError
            If an integer entry is outside ``0 <= ind < len(schema)``.
        """
        if columns is None:
            return None

        schema = self.schema
        names = []
        for col in columns:
            if isinstance(col, Integral):
                col = int(col)
                if 0 <= col < len(schema):
                    names.append(schema[col].name)
                else:
                    raise ValueError("Column indices must be in 0 <= ind < %d,"
                                     " got %d" % (len(schema), col))
            else:
                # Assumed to be a column name (possibly a nested-field
                # prefix); the underlying reader validates unknown names.
                names.append(col)
        return names

    def read_stripe(self, n, columns=None):
        """Read a single stripe from the file.

        Parameters
        ----------
        n : int
            The stripe index
        columns : list
            If not None, only these columns will be read from the stripe. A
            column name may be a prefix of a nested field, e.g. 'a' will select
            'a.b', 'a.c', and 'a.d.e'

        Returns
        -------
        pyarrow.RecordBatch
            Content of the stripe as a RecordBatch.
        """
        columns = self._select_names(columns)
        return self.reader.read_stripe(n, columns=columns)

    def read(self, columns=None):
        """Read the whole file.

        Parameters
        ----------
        columns : list
            If not None, only these columns will be read from the file. A
            column name may be a prefix of a nested field, e.g. 'a' will select
            'a.b', 'a.c', and 'a.d.e'. Output always follows the
            ordering of the file and not the `columns` list.

        Returns
        -------
        pyarrow.Table
            Content of the file as a Table.
        """
        columns = self._select_names(columns)
        return self.reader.read(columns=columns)
_orc_writer_args_docs = """file_version : {"0.11", "0.12"}, default "0.12"
Determine which ORC file version to use.
`Hive 0.11 / ORC v0 <https://orc.apache.org/specification/ORCv0/>`_
is the older version
while `Hive 0.12 / ORC v1 <https://orc.apache.org/specification/ORCv1/>`_
is the newer one.
batch_size : int, default 1024
Number of rows the ORC writer writes at a time.
stripe_size : int, default 64 * 1024 * 1024
Size of each ORC stripe in bytes.
compression : string, default 'uncompressed'
The compression codec.
Valid values: {'UNCOMPRESSED', 'SNAPPY', 'ZLIB', 'LZ4', 'ZSTD'}
Note that LZ0 is currently not supported.
compression_block_size : int, default 64 * 1024
Size of each compression block in bytes.
compression_strategy : string, default 'speed'
The compression strategy i.e. speed vs size reduction.
Valid values: {'SPEED', 'COMPRESSION'}
row_index_stride : int, default 10000
The row index stride i.e. the number of rows per
an entry in the row index.
padding_tolerance : double, default 0.0
The padding tolerance.
dictionary_key_size_threshold : double, default 0.0
The dictionary key size threshold. 0 to disable dictionary encoding.
1 to always enable dictionary encoding.
bloom_filter_columns : None, set-like or list-like, default None
Columns that use the bloom filter.
bloom_filter_fpp : double, default 0.05
Upper limit of the false-positive rate of the bloom filter.
"""
class ORCWriter:
    # Built via .format so the writer-option docs are shared with write_table.
    __doc__ = """
Writer interface for a single ORC file

Parameters
----------
where : str or pyarrow.io.NativeFile
Writable target. For passing Python file objects or byte buffers,
see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
or pyarrow.io.FixedSizeBufferWriter.
{}
""".format(_orc_writer_args_docs)

    # Class-level default keeps __del__/close() safe even when __init__
    # fails before the underlying writer is opened.
    is_open = False

    def __init__(self, where, *,
                 file_version='0.12',
                 batch_size=1024,
                 stripe_size=64 * 1024 * 1024,
                 compression='uncompressed',
                 compression_block_size=65536,
                 compression_strategy='speed',
                 row_index_stride=10000,
                 padding_tolerance=0.0,
                 dictionary_key_size_threshold=0.0,
                 bloom_filter_columns=None,
                 bloom_filter_fpp=0.05,
                 ):
        self.writer = _orc.ORCWriter()
        self.writer.open(
            where,
            file_version=file_version,
            batch_size=batch_size,
            stripe_size=stripe_size,
            compression=compression,
            compression_block_size=compression_block_size,
            compression_strategy=compression_strategy,
            row_index_stride=row_index_stride,
            padding_tolerance=padding_tolerance,
            dictionary_key_size_threshold=dictionary_key_size_threshold,
            bloom_filter_columns=bloom_filter_columns,
            bloom_filter_fpp=bloom_filter_fpp
        )
        self.is_open = True

    def __del__(self):
        # Best-effort cleanup; close() is a no-op when already closed.
        self.close()

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def write(self, table):
        """
        Write the table into an ORC file. The schema of the table must
        be equal to the schema used when opening the ORC file.

        Parameters
        ----------
        table : pyarrow.Table
            The table to be written into the ORC file

        Raises
        ------
        RuntimeError
            If the writer has already been closed.
        """
        # A bare ``assert`` would be stripped under ``python -O``; raise an
        # explicit error so misuse is always reported.
        if not self.is_open:
            raise RuntimeError("ORC writer is already closed")
        self.writer.write(table)

    def close(self):
        """
        Close the ORC file
        """
        if self.is_open:
            self.writer.close()
            self.is_open = False
def read_table(source, columns=None, filesystem=None):
    filesystem, path = _resolve_filesystem_and_path(source, filesystem)
    if filesystem is not None:
        source = filesystem.open_input_file(path)
    orc_file = ORCFile(source)
    # An explicitly empty selection still reads the file and then drops every
    # column, so the resulting table keeps the correct num_rows.
    if columns is not None and len(columns) == 0:
        return orc_file.read().select(columns)
    return orc_file.read(columns=columns)
# Docstring attached after the definition (kept as a plain assignment).
read_table.__doc__ = """
Read a Table from an ORC file.
Parameters
----------
source : str, pyarrow.NativeFile, or file-like object
If a string passed, can be a single file name. For file-like objects,
only read a single file. Use pyarrow.BufferReader to read a file
contained in a bytes or buffer-like object.
columns : list
If not None, only these columns will be read from the file. A column
name may be a prefix of a nested field, e.g. 'a' will select 'a.b',
'a.c', and 'a.d.e'. Output always follows the ordering of the file and
not the `columns` list. If empty, no columns will be read. Note
that the table will still have the correct num_rows set despite having
no columns.
filesystem : FileSystem, default None
If nothing passed, will be inferred based on path.
Path will try to be found in the local on-disk filesystem otherwise
it will be parsed as an URI to determine the filesystem.
"""
def write_table(table, where, *,
                file_version='0.12',
                batch_size=1024,
                stripe_size=64 * 1024 * 1024,
                compression='uncompressed',
                compression_block_size=65536,
                compression_strategy='speed',
                row_index_stride=10000,
                padding_tolerance=0.0,
                dictionary_key_size_threshold=0.0,
                bloom_filter_columns=None,
                bloom_filter_fpp=0.05):
    # Back-compat shim: the legacy call order was write_table(where, table).
    if isinstance(where, Table):
        warnings.warn(
            "The order of the arguments has changed. Pass as "
            "'write_table(table, where)' instead. The old order will raise "
            "an error in the future.", FutureWarning, stacklevel=2
        )
        table, where = where, table
    writer_options = dict(
        file_version=file_version,
        batch_size=batch_size,
        stripe_size=stripe_size,
        compression=compression,
        compression_block_size=compression_block_size,
        compression_strategy=compression_strategy,
        row_index_stride=row_index_stride,
        padding_tolerance=padding_tolerance,
        dictionary_key_size_threshold=dictionary_key_size_threshold,
        bloom_filter_columns=bloom_filter_columns,
        bloom_filter_fpp=bloom_filter_fpp,
    )
    # The context manager guarantees the writer is closed even on error.
    with ORCWriter(where, **writer_options) as sink:
        sink.write(table)
# Docstring attached after the definition; the writer-option section is the
# shared _orc_writer_args_docs fragment, interpolated via str.format().
write_table.__doc__ = """
Write a table into an ORC file.
Parameters
----------
table : pyarrow.lib.Table
The table to be written into the ORC file
where : str or pyarrow.io.NativeFile
Writable target. For passing Python file objects or byte buffers,
see pyarrow.io.PythonFileInterface, pyarrow.io.BufferOutputStream
or pyarrow.io.FixedSizeBufferWriter.
{}
""".format(_orc_writer_args_docs)
|