import io
import os
import warnings
from collections import OrderedDict
from decimal import Decimal
from shutil import copytree

import pytest

import pyarrow as pa
from pyarrow import fs
from pyarrow.tests import util
from pyarrow.tests.parquet.common import (_check_roundtrip, _roundtrip_table,
                                          _test_dataframe)
|
try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None

try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.pandas_examples import dataframe_with_lists
    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None

try:
    import numpy as np
except ImportError:
    np = None
|
pytestmark = pytest.mark.parquet |
|
|
def test_parquet_invalid_version(tempdir): |
|
table = pa.table({'a': [1, 2, 3]}) |
|
with pytest.raises(ValueError, match="Unsupported Parquet format version"): |
|
_write_table(table, tempdir / 'test_version.parquet', version="2.2") |
|
with pytest.raises(ValueError, match="Unsupported Parquet data page " + |
|
"version"): |
|
_write_table(table, tempdir / 'test_version.parquet', |
|
data_page_version="2.2") |
|
|
|
|
|
def test_set_data_page_size(): |
|
arr = pa.array([1, 2, 3] * 100000) |
|
t = pa.Table.from_arrays([arr], names=['f0']) |
|
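    # 2 << 16 is 128 KiB; 2 << 18 is 512 KiB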
page_sizes = [2 << 16, 2 << 18] |
|
for target_page_size in page_sizes: |
|
_check_roundtrip(t, data_page_size=target_page_size) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_set_write_batch_size(): |
|
df = _test_dataframe(100) |
|
table = pa.Table.from_pandas(df, preserve_index=False) |
|
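    # tiny data pages and single-row write batches exercise page boundaries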
_check_roundtrip( |
|
table, data_page_size=10, write_batch_size=1, version='2.4' |
|
) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_set_dictionary_pagesize_limit(): |
|
df = _test_dataframe(100) |
|
table = pa.Table.from_pandas(df, preserve_index=False) |
|
|
|
_check_roundtrip(table, dictionary_pagesize_limit=1, |
|
data_page_size=10, version='2.4') |
|
|
|
with pytest.raises(TypeError): |
|
_check_roundtrip(table, dictionary_pagesize_limit="a", |
|
data_page_size=10, version='2.4') |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_chunked_table_write(): |
|
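    # write tables whose columns are chunked across several record batches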
tables = [] |
|
batch = pa.RecordBatch.from_pandas(alltypes_sample(size=10)) |
|
tables.append(pa.Table.from_batches([batch] * 3)) |
|
df, _ = dataframe_with_lists() |
|
batch = pa.RecordBatch.from_pandas(df) |
|
tables.append(pa.Table.from_batches([batch] * 3)) |
|
|
|
for data_page_version in ['1.0', '2.0']: |
|
for use_dictionary in [True, False]: |
|
for table in tables: |
|
_check_roundtrip( |
|
table, version='2.6', |
|
data_page_version=data_page_version, |
|
use_dictionary=use_dictionary) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_memory_map(tempdir): |
|
df = alltypes_sample(size=10) |
|
|
|
table = pa.Table.from_pandas(df) |
|
_check_roundtrip(table, read_table_kwargs={'memory_map': True}, |
|
version='2.6') |
|
|
|
filename = str(tempdir / 'tmp_file') |
|
with open(filename, 'wb') as f: |
|
_write_table(table, f, version='2.6') |
|
table_read = pq.read_pandas(filename, memory_map=True) |
|
assert table_read.equals(table) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_enable_buffered_stream(tempdir): |
|
df = alltypes_sample(size=10) |
|
|
|
table = pa.Table.from_pandas(df) |
|
_check_roundtrip(table, read_table_kwargs={'buffer_size': 1025}, |
|
version='2.6') |
|
|
|
filename = str(tempdir / 'tmp_file') |
|
with open(filename, 'wb') as f: |
|
_write_table(table, f, version='2.6') |
|
table_read = pq.read_pandas(filename, buffer_size=4096) |
|
assert table_read.equals(table) |
|
|
|
|
|
def test_special_chars_filename(tempdir): |
|
table = pa.Table.from_arrays([pa.array([42])], ["ints"]) |
|
filename = "foo # bar" |
|
path = tempdir / filename |
|
assert not path.exists() |
|
_write_table(table, str(path)) |
|
assert path.exists() |
|
table_read = _read_table(str(path)) |
|
assert table_read.equals(table) |
|
|
|
|
|
def test_invalid_source(): |
|
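    # a None source should raise a helpful TypeError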
with pytest.raises(TypeError, match="None"): |
|
pq.read_table(None) |
|
|
|
with pytest.raises(TypeError, match="None"): |
|
pq.ParquetFile(None) |
|
|
|
|
|
@pytest.mark.slow |
|
def test_file_with_over_int16_max_row_groups(): |
|
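    # 40000 row groups of size 1 exceeds INT16_MAX (32767); only files with
    # encrypted row group metadata are limited to an int16 row group
    # ordinal, so an unencrypted file this large should roundtrip fine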
t = pa.table([list(range(40000))], names=['f0']) |
|
_check_roundtrip(t, row_group_size=1) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_empty_table_roundtrip(): |
|
df = alltypes_sample(size=10) |
|
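    # build a non-empty table to infer the schema, then slice to zero rows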
table = pa.Table.from_pandas(df) |
|
table = pa.Table.from_arrays( |
|
[col.chunk(0)[:0] for col in table.itercolumns()], |
|
names=table.schema.names) |
|
|
|
assert table.schema.field('null').type == pa.null() |
|
assert table.schema.field('null_list').type == pa.list_(pa.null()) |
|
_check_roundtrip( |
|
table, version='2.6') |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_empty_table_no_columns(): |
|
df = pd.DataFrame() |
|
empty = pa.Table.from_pandas(df, preserve_index=False) |
|
_check_roundtrip(empty) |
|
|
|
|
|
def test_write_nested_zero_length_array_chunk_failure(): |
|
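    # regression test: writing a table whose first chunk is empty used to fail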
cols = OrderedDict( |
|
int32=pa.int32(), |
|
list_string=pa.list_(pa.string()) |
|
) |
|
data = [[], [OrderedDict(int32=1, list_string=('G',)), ]] |
|
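    # the first batch is empty, so each flattened column gets a
    # zero-length first chunk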
my_arrays = [pa.array(batch, type=pa.struct(cols)).flatten() |
|
for batch in data] |
|
my_batches = [pa.RecordBatch.from_arrays(batch, schema=pa.schema(cols)) |
|
for batch in my_arrays] |
|
tbl = pa.Table.from_batches(my_batches, pa.schema(cols)) |
|
_check_roundtrip(tbl) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_multiple_path_types(tempdir): |
|
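    # test compatibility with pathlib.Path objects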
path = tempdir / 'zzz.parquet' |
|
df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)}) |
|
_write_table(df, path) |
|
table_read = _read_table(path) |
|
df_read = table_read.to_pandas() |
|
tm.assert_frame_equal(df, df_read) |
|
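    # test compatibility with plain string paths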
path = str(tempdir) + 'zzz.parquet' |
|
df = pd.DataFrame({'x': np.arange(10, dtype=np.int64)}) |
|
_write_table(df, path) |
|
table_read = _read_table(path) |
|
df_read = table_read.to_pandas() |
|
tm.assert_frame_equal(df, df_read) |
|
|
|
|
|
def test_fspath(tempdir): |
|
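    # test reading from a source that implements __fspath__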
path = tempdir / "test.parquet" |
|
table = pa.table({"a": [1, 2, 3]}) |
|
_write_table(table, path) |
|
|
|
fs_protocol_obj = util.FSProtocolClass(path) |
|
|
|
result = _read_table(fs_protocol_obj) |
|
assert result.equals(table) |
|
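    # passing a filesystem object together with an __fspath__ source raises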
with pytest.raises(TypeError): |
|
_read_table(fs_protocol_obj, filesystem=fs.FileSystem()) |
|
|
|
|
|
@pytest.mark.parametrize("filesystem", [ |
|
None, fs.LocalFileSystem() |
|
]) |
|
@pytest.mark.parametrize("name", ("data.parquet", "例.parquet"))
|
def test_relative_paths(tempdir, filesystem, name): |
|
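    # reading and writing with relative paths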
table = pa.table({"a": [1, 2, 3]}) |
|
path = tempdir / name |
|
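    # read via a relative path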
pq.write_table(table, str(path)) |
|
with util.change_cwd(tempdir): |
|
result = pq.read_table(name, filesystem=filesystem) |
|
assert result.equals(table) |
|
|
|
path.unlink() |
|
assert not path.exists() |
|
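    # write via a relative path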
with util.change_cwd(tempdir): |
|
pq.write_table(table, name, filesystem=filesystem) |
|
result = pq.read_table(path) |
|
assert result.equals(table) |
|
|
|
|
|
def test_read_non_existing_file(): |
|
|
|
with pytest.raises(FileNotFoundError): |
|
pq.read_table('i-am-not-existing.parquet') |
|
|
|
|
|
def test_file_error_python_exception(): |
|
class BogusFile(io.BytesIO): |
|
def read(self, *args): |
|
raise ZeroDivisionError("zorglub") |
|
|
|
def seek(self, *args): |
|
raise ZeroDivisionError("zorglub") |
|
|
|
|
|
with pytest.raises(ZeroDivisionError, match="zorglub"): |
|
pq.read_table(BogusFile(b"")) |
|
|
|
|
|
def test_parquet_read_from_buffer(tempdir): |
|
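    # read from a builtin file object and from a pa.PythonFile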
table = pa.table({"a": [1, 2, 3]}) |
|
pq.write_table(table, str(tempdir / "data.parquet")) |
|
|
|
with open(str(tempdir / "data.parquet"), "rb") as f: |
|
result = pq.read_table(f) |
|
assert result.equals(table) |
|
|
|
with open(str(tempdir / "data.parquet"), "rb") as f: |
|
result = pq.read_table(pa.PythonFile(f)) |
|
assert result.equals(table) |
|
|
|
|
|
def test_byte_stream_split(): |
|
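    # smoke test for the BYTE_STREAM_SPLIT encoding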
arr_float = pa.array(list(map(float, range(100)))) |
|
arr_int = pa.array(list(map(int, range(100)))) |
|
arr_bool = pa.array([True, False] * 50) |
|
data_float = [arr_float, arr_float] |
|
table = pa.Table.from_arrays(data_float, names=['a', 'b']) |
|
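    # BYTE_STREAM_SPLIT for both columns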
_check_roundtrip(table, expected=table, compression="gzip", |
|
use_dictionary=False, use_byte_stream_split=True) |
|
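    # dictionary encoding for 'a', BYTE_STREAM_SPLIT for 'b'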
_check_roundtrip(table, expected=table, compression="gzip", |
|
use_dictionary=['a'], |
|
use_byte_stream_split=['b']) |
|
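    # requesting both encodings for both columns is allowed (a collision)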
_check_roundtrip(table, expected=table, compression="gzip", |
|
use_dictionary=['a', 'b'], |
|
use_byte_stream_split=['a', 'b']) |
|
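    # mixed float and integer columns, encodings assigned per column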
mixed_table = pa.Table.from_arrays([arr_float, arr_float, arr_int, arr_int], |
|
names=['a', 'b', 'c', 'd']) |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=['b', 'd'], |
|
use_byte_stream_split=['a', 'c']) |
|
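    # BYTE_STREAM_SPLIT does not support booleans and should raise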
table = pa.Table.from_arrays([arr_bool], names=['tmp']) |
|
with pytest.raises(IOError, match='BYTE_STREAM_SPLIT only supports'): |
|
_check_roundtrip(table, expected=table, use_byte_stream_split=True, |
|
use_dictionary=False) |
|
|
|
|
|
def test_store_decimal_as_integer(tempdir): |
|
arr_decimal_1_9 = pa.array(list(map(Decimal, range(100))), |
|
type=pa.decimal128(5, 2)) |
|
arr_decimal_10_18 = pa.array(list(map(Decimal, range(100))), |
|
type=pa.decimal128(16, 9)) |
|
arr_decimal_gt18 = pa.array(list(map(Decimal, range(100))), |
|
type=pa.decimal128(22, 2)) |
|
arr_bool = pa.array([True, False] * 50) |
|
data_decimal = [arr_decimal_1_9, arr_decimal_10_18, arr_decimal_gt18] |
|
table = pa.Table.from_arrays(data_decimal, names=['a', 'b', 'c']) |
|
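    # roundtrip with decimals stored as integers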
_check_roundtrip(table, |
|
expected=table, |
|
compression="gzip", |
|
use_dictionary=False, |
|
store_decimal_as_integer=True) |
|
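    # write a file and check the physical types chosen for the decimals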
pqtestfile_path = os.path.join(tempdir, 'test.parquet') |
|
pq.write_table(table, pqtestfile_path, |
|
compression="gzip", |
|
use_dictionary=False, |
|
store_decimal_as_integer=True) |
|
|
|
pqtestfile = pq.ParquetFile(pqtestfile_path) |
|
pqcol_decimal_1_9 = pqtestfile.schema.column(0) |
|
pqcol_decimal_10_18 = pqtestfile.schema.column(1) |
|
|
|
assert pqcol_decimal_1_9.physical_type == 'INT32' |
|
assert pqcol_decimal_10_18.physical_type == 'INT64' |
|
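    # integer-backed decimal columns can also use DELTA_BINARY_PACKED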
_check_roundtrip(table, |
|
expected=table, |
|
compression="gzip", |
|
use_dictionary=False, |
|
store_decimal_as_integer=True, |
|
column_encoding={ |
|
'a': 'DELTA_BINARY_PACKED', |
|
'b': 'DELTA_BINARY_PACKED' |
|
}) |
|
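    # mixed decimal and boolean columns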
mixed_table = pa.Table.from_arrays( |
|
[arr_decimal_1_9, arr_decimal_10_18, arr_decimal_gt18, arr_bool], |
|
names=['a', 'b', 'c', 'd']) |
|
_check_roundtrip(mixed_table, |
|
expected=mixed_table, |
|
use_dictionary=False, |
|
store_decimal_as_integer=True) |
|
|
|
|
|
def test_column_encoding(): |
|
arr_float = pa.array(list(map(float, range(100)))) |
|
arr_int = pa.array(list(map(int, range(100)))) |
|
arr_bin = pa.array([str(x) for x in range(100)], type=pa.binary()) |
|
arr_flba = pa.array( |
|
[str(x).zfill(10) for x in range(100)], type=pa.binary(10)) |
|
arr_bool = pa.array([False, True, False, False] * 25) |
|
mixed_table = pa.Table.from_arrays( |
|
[arr_float, arr_int, arr_bin, arr_flba, arr_bool], |
|
names=['a', 'b', 'c', 'd', 'e']) |
|
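    # BYTE_STREAM_SPLIT for the float, int and fixed-length binary columns,
    # PLAIN for the variable-length binary column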
_check_roundtrip(mixed_table, expected=mixed_table, use_dictionary=False, |
|
column_encoding={'a': "BYTE_STREAM_SPLIT", |
|
'b': "BYTE_STREAM_SPLIT", |
|
'c': "PLAIN", |
|
'd': "BYTE_STREAM_SPLIT"}) |
|
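    # a single encoding name applies to all columns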
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding="PLAIN") |
|
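    # DELTA_BINARY_PACKED for the integer column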
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'a': "PLAIN", |
|
'b': "DELTA_BINARY_PACKED", |
|
'c': "PLAIN"}) |
|
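    # DELTA_LENGTH_BYTE_ARRAY for the variable-length binary column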
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'a': "PLAIN", |
|
'b': "DELTA_BINARY_PACKED", |
|
'c': "DELTA_LENGTH_BYTE_ARRAY"}) |
|
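    # DELTA_BYTE_ARRAY for the binary and fixed-length binary columns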
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'a': "PLAIN", |
|
'b': "DELTA_BINARY_PACKED", |
|
'c': "DELTA_BYTE_ARRAY", |
|
'd': "DELTA_BYTE_ARRAY"}) |
|
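    # RLE for the boolean column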
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'e': "RLE"}) |
|
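    # BYTE_STREAM_SPLIT is not supported for boolean columns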
with pytest.raises(IOError, |
|
match="BYTE_STREAM_SPLIT only supports"): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'a': "PLAIN", |
|
'c': "PLAIN", |
|
'e': "BYTE_STREAM_SPLIT"}) |
|
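    # DELTA_BINARY_PACKED is only supported for integer columns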
with pytest.raises(OSError, |
|
match="DELTA_BINARY_PACKED encoder only supports"): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'a': "DELTA_BINARY_PACKED", |
|
'b': "PLAIN", |
|
'c': "PLAIN"}) |
|
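    # RLE_DICTIONARY cannot be requested explicitly since dictionary
    # encoding is already the default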
with pytest.raises(ValueError, |
|
match="'RLE_DICTIONARY' is already used by default"): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding="RLE_DICTIONARY") |
|
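    # unknown encoding names are rejected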
with pytest.raises(ValueError, |
|
match="Unsupported column encoding: 'MADE_UP_ENCODING'"): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding={'a': "MADE_UP_ENCODING"}) |
|
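    # column_encoding cannot be combined with use_dictionary for a column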
with pytest.raises(ValueError): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=['b'], |
|
column_encoding={'b': "PLAIN"}) |
|
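    # also invalid with the default use_dictionary=True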
with pytest.raises(ValueError): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
column_encoding={'b': "PLAIN"}) |
|
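    # column_encoding cannot be combined with use_byte_stream_split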
with pytest.raises(ValueError): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
use_byte_stream_split=['a'], |
|
column_encoding={'a': "RLE", |
|
'b': "BYTE_STREAM_SPLIT", |
|
'c': "PLAIN"}) |
|
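    # likewise with use_byte_stream_split=True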
with pytest.raises(ValueError): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
use_byte_stream_split=True, |
|
column_encoding={'a': "RLE", |
|
'b': "BYTE_STREAM_SPLIT", |
|
'c': "PLAIN"}) |
|
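    # column_encoding must be a str or a dict, not a bool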
with pytest.raises(TypeError): |
|
_check_roundtrip(mixed_table, expected=mixed_table, |
|
use_dictionary=False, |
|
column_encoding=True) |
|
|
|
|
|
def test_compression_level(): |
|
arr = pa.array(list(map(int, range(1000)))) |
|
data = [arr, arr] |
|
table = pa.Table.from_arrays(data, names=['a', 'b']) |
|
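    # check a specific compression level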
_check_roundtrip(table, expected=table, compression="gzip", |
|
compression_level=1) |
|
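    # check another level, to make sure the setting takes effect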
_check_roundtrip(table, expected=table, compression="gzip", |
|
compression_level=5) |
|
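    # the compression codec can be given per column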
_check_roundtrip(table, expected=table, |
|
compression={'a': "gzip", 'b': "snappy"}) |
|
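    # the compression level can also be given per column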
_check_roundtrip(table, expected=table, compression="gzip", |
|
compression_level={'a': 2, 'b': 3}) |
|
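    # LZ4 supports compression levels as well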
_check_roundtrip(table, expected=table, compression="lz4", |
|
compression_level=1) |
|
|
|
_check_roundtrip(table, expected=table, compression="lz4", |
|
compression_level=9) |
|
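    # codecs that do not support a compression level (snappy, lzo, "None")
    # and out-of-range levels for gzip should raise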
invalid_combinations = [("snappy", 4), ("gzip", -1337), |
|
("None", 444), ("lzo", 14)] |
|
buf = io.BytesIO() |
|
for (codec, level) in invalid_combinations: |
|
with pytest.raises((ValueError, OSError)): |
|
_write_table(table, buf, compression=codec, |
|
compression_level=level) |
|
|
|
|
|
def test_sanitized_spark_field_names(): |
|
a0 = pa.array([0, 1, 2, 3, 4]) |
|
name = 'prohib; ,\t{}' |
|
table = pa.Table.from_arrays([a0], [name]) |
|
|
|
result = _roundtrip_table(table, write_table_kwargs={'flavor': 'spark'}) |
|
|
|
expected_name = 'prohib______' |
|
assert result.schema[0].name == expected_name |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_multithreaded_read(): |
|
df = alltypes_sample(size=10000) |
|
|
|
table = pa.Table.from_pandas(df) |
|
|
|
buf = io.BytesIO() |
|
_write_table(table, buf, compression='SNAPPY', version='2.6') |
|
|
|
buf.seek(0) |
|
table1 = _read_table(buf, use_threads=True) |
|
|
|
buf.seek(0) |
|
table2 = _read_table(buf, use_threads=False) |
|
|
|
assert table1.equals(table2) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_min_chunksize(): |
|
data = pd.DataFrame([np.arange(4)], columns=['A', 'B', 'C', 'D']) |
|
table = pa.Table.from_pandas(data.reset_index()) |
|
|
|
buf = io.BytesIO() |
|
_write_table(table, buf, chunk_size=-1) |
|
|
|
buf.seek(0) |
|
result = _read_table(buf) |
|
|
|
assert result.equals(table) |
|
|
|
with pytest.raises(ValueError): |
|
_write_table(table, buf, chunk_size=0) |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_write_error_deletes_incomplete_file(tempdir): |
|
|
|
df = pd.DataFrame({'a': list('abc'), |
|
'b': list(range(1, 4)), |
|
'c': np.arange(3, 6).astype('u1'), |
|
'd': np.arange(4.0, 7.0, dtype='float64'), |
|
'e': [True, False, True], |
|
'f': pd.Categorical(list('abc')), |
|
'g': pd.date_range('20130101', periods=3), |
|
'h': pd.date_range('20130101', periods=3, |
|
tz='US/Eastern'), |
|
'i': pd.date_range('20130101', periods=3, freq='ns')}) |
|
|
|
pdf = pa.Table.from_pandas(df) |
|
|
|
filename = tempdir / 'tmp_file' |
|
try: |
|
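        # the nanosecond timestamps in column 'i' are not supported with
        # version='2.4', so this write is expected to fail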
_write_table(pdf, filename, version="2.4") |
|
except pa.ArrowException: |
|
pass |
|
|
|
assert not filename.exists() |
|
|
|
|
|
def test_read_non_existent_file(tempdir): |
|
path = 'nonexistent-file.parquet' |
|
try: |
|
pq.read_table(path) |
|
except Exception as e: |
|
assert path in e.args[0] |
|
|
|
|
|
def test_read_table_doesnt_warn(datadir): |
|
with warnings.catch_warnings(): |
|
warnings.simplefilter(action="error") |
|
pq.read_table(datadir / 'v0.7.1.parquet') |
|
|
|
|
|
@pytest.mark.pandas |
|
def test_zlib_compression_bug(): |
|
|
|
table = pa.Table.from_arrays([pa.array(['abc', 'def'])], ['some_col']) |
|
f = io.BytesIO() |
|
pq.write_table(table, f, compression='gzip') |
|
|
|
f.seek(0) |
|
roundtrip = pq.read_table(f) |
|
tm.assert_frame_equal(roundtrip.to_pandas(), table.to_pandas()) |
|
|
|
|
|
def test_parquet_file_too_small(tempdir): |
|
path = str(tempdir / "test.parquet") |
|
|
|
with pytest.raises((pa.ArrowInvalid, OSError), |
|
match='size is 0 bytes'): |
|
with open(path, 'wb') as f: |
|
pass |
|
pq.read_table(path) |
|
|
|
with pytest.raises((pa.ArrowInvalid, OSError), |
|
match='size is 4 bytes'): |
|
with open(path, 'wb') as f: |
|
f.write(b'ffff') |
|
pq.read_table(path) |
|
|
|
|
|
@pytest.mark.pandas |
|
@pytest.mark.fastparquet |
|
@pytest.mark.filterwarnings("ignore:RangeIndex:FutureWarning") |
|
@pytest.mark.filterwarnings("ignore:tostring:DeprecationWarning:fastparquet") |
|
def test_fastparquet_cross_compatibility(tempdir): |
|
fp = pytest.importorskip('fastparquet') |
|
|
|
df = pd.DataFrame( |
|
{ |
|
"a": list("abc"), |
|
"b": list(range(1, 4)), |
|
"c": np.arange(4.0, 7.0, dtype="float64"), |
|
"d": [True, False, True], |
|
"e": pd.date_range("20130101", periods=3), |
|
"f": pd.Categorical(["a", "b", "a"]), |
} |
|
) |
|
table = pa.table(df) |
|
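    # Arrow -> fastparquet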
file_arrow = str(tempdir / "cross_compat_arrow.parquet") |
|
pq.write_table(table, file_arrow, compression=None) |
|
|
|
fp_file = fp.ParquetFile(file_arrow) |
|
df_fp = fp_file.to_pandas() |
|
tm.assert_frame_equal(df, df_fp) |
|
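    # fastparquet -> Arrow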
file_fastparquet = str(tempdir / "cross_compat_fastparquet.parquet") |
|
fp.write(file_fastparquet, df) |
|
|
|
table_fp = pq.read_pandas(file_fastparquet) |
|
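    # fastparquet does not preserve categoricals; compare as object dtype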
df['f'] = df['f'].astype(object) |
|
tm.assert_frame_equal(table_fp.to_pandas(), df) |
|
|
|
|
|
@pytest.mark.parametrize('array_factory', [ |
|
lambda: pa.array([0, None] * 10), |
|
lambda: pa.array([0, None] * 10).dictionary_encode(), |
|
lambda: pa.array(["", None] * 10), |
|
lambda: pa.array(["", None] * 10).dictionary_encode(), |
|
]) |
|
@pytest.mark.parametrize('read_dictionary', [False, True]) |
|
def test_buffer_contents( |
|
array_factory, read_dictionary |
|
): |
|
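    # all values are zero, empty or null, so the data buffers read back
    # should be entirely zero-initialized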
orig_table = pa.Table.from_pydict({"col": array_factory()}) |
|
bio = io.BytesIO() |
|
pq.write_table(orig_table, bio, use_dictionary=True) |
|
bio.seek(0) |
|
read_dictionary = ['col'] if read_dictionary else None |
|
table = pq.read_table(bio, use_threads=False, |
|
read_dictionary=read_dictionary) |
|
|
|
for col in table.columns: |
|
[chunk] = col.chunks |
|
buf = chunk.buffers()[1] |
|
assert buf.to_pybytes() == buf.size * b"\0" |
|
|
|
|
|
def test_parquet_compression_roundtrip(tempdir): |
|
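    # ARROW-10480: a nonstandard '.gz' extension must not break the roundtrip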
table = pa.table([pa.array(range(4))], names=["ints"]) |
|
path = tempdir / "arrow-10480.pyarrow.gz" |
|
pq.write_table(table, path, compression="GZIP") |
|
result = pq.read_table(path) |
|
assert result.equals(table) |
|
|
|
|
|
def test_empty_row_groups(tempdir): |
|
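    # writing an empty table several times produces empty row groups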
table = pa.Table.from_arrays([pa.array([], type='int32')], ['f0']) |
|
|
|
path = tempdir / 'empty_row_groups.parquet' |
|
|
|
num_groups = 3 |
|
with pq.ParquetWriter(path, table.schema) as writer: |
|
for i in range(num_groups): |
|
writer.write_table(table) |
|
|
|
reader = pq.ParquetFile(path) |
|
assert reader.metadata.num_row_groups == num_groups |
|
|
|
for i in range(num_groups): |
|
assert reader.read_row_group(i).equals(table) |
|
|
|
|
|
def test_reads_over_batch(tempdir): |
|
data = [None] * (1 << 20) |
|
data.append([1]) |
|
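    # ARROW-11607: a mostly-null column large enough to span several
    # read batches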
table = pa.Table.from_arrays([data], ['column']) |
|
|
|
path = tempdir / 'arrow-11607.parquet' |
|
pq.write_table(table, path) |
|
table2 = pq.read_table(path) |
|
assert table == table2 |
|
|
|
|
|
def test_permutation_of_column_order(tempdir): |
|
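    # the files store the columns in different orders; reading matches by name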
case = tempdir / "dataset_column_order_permutation" |
|
case.mkdir(exist_ok=True) |
|
|
|
data1 = pa.table([[1, 2, 3], [.1, .2, .3]], names=['a', 'b']) |
|
pq.write_table(data1, case / "data1.parquet") |
|
|
|
data2 = pa.table([[.4, .5, .6], [4, 5, 6]], names=['b', 'a']) |
|
pq.write_table(data2, case / "data2.parquet") |
|
|
|
table = pq.read_table(str(case)) |
|
table2 = pa.table([[1, 2, 3, 4, 5, 6], |
|
[0.1, 0.2, 0.3, 0.4, 0.5, 0.6]], |
|
names=['a', 'b']) |
|
|
|
assert table == table2 |
|
|
|
|
|
def test_thrift_size_limits(tempdir): |
|
path = tempdir / 'largethrift.parquet' |
|
|
|
array = pa.array(list(range(10))) |
|
num_cols = 1000 |
|
table = pa.table( |
|
[array] * num_cols, |
|
names=[f'some_long_column_name_{i}' for i in range(num_cols)]) |
|
pq.write_table(table, path) |
|
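    # overly tight thrift limits make deserialization fail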
with pytest.raises( |
|
OSError, |
|
match="Couldn't deserialize thrift:.*Exceeded size limit"): |
|
pq.read_table(path, thrift_string_size_limit=50 * num_cols) |
|
with pytest.raises( |
|
OSError, |
|
match="Couldn't deserialize thrift:.*Exceeded size limit"): |
|
pq.read_table(path, thrift_container_size_limit=num_cols) |
|
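    # default or generous limits succeed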
got = pq.read_table(path, thrift_string_size_limit=100 * num_cols) |
|
assert got == table |
|
got = pq.read_table(path, thrift_container_size_limit=2 * num_cols) |
|
assert got == table |
|
got = pq.read_table(path) |
|
assert got == table |
|
|
|
|
|
def test_page_checksum_verification_write_table(tempdir): |
|
"""Check that checksum verification works for datasets created with |
    pq.write_table()"""
|
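    # write sample data with page checksums enabled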
original_path = tempdir / 'correct.parquet' |
|
table_orig = pa.table({'a': [1, 2, 3, 4]}) |
|
pq.write_table(table_orig, original_path, write_page_checksum=True) |
|
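    # reading back with verification enabled succeeds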
table_check = pq.read_table(original_path, page_checksum_verification=True) |
|
assert table_orig == table_check |
|
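    # Read the file as binary and swap bytes 31 and 36; for this data that
    # is equivalent to having written pa.table({'a': [1, 3, 2, 4]})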
bin_data = bytearray(original_path.read_bytes()) |
|
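    # the two bytes must differ, otherwise nothing is corrupted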
assert bin_data[31] != bin_data[36] |
|
bin_data[31], bin_data[36] = bin_data[36], bin_data[31] |
|
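    # write the corrupted data to another file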
corrupted_path = tempdir / 'corrupted.parquet' |
|
corrupted_path.write_bytes(bin_data) |
|
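    # Case 1: read_table() without verification succeeds but silently
    # returns corrupted data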
table_corrupt = pq.read_table(corrupted_path, |
|
page_checksum_verification=False) |
|
|
|
|
|
assert table_corrupt != table_orig |
|
assert table_corrupt == pa.table({'a': [1, 3, 2, 4]}) |
|
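    # Case 2: read_table() with verification enabled raises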
with pytest.raises(OSError, match="CRC checksum verification"): |
|
_ = pq.read_table(corrupted_path, page_checksum_verification=True) |
|
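    # Case 3: ParquetFile.read() without verification also returns the
    # corrupted data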
corrupted_pq_file = pq.ParquetFile(corrupted_path, |
|
page_checksum_verification=False) |
|
table_corrupt2 = corrupted_pq_file.read() |
|
assert table_corrupt2 != table_orig |
|
assert table_corrupt2 == pa.table({'a': [1, 3, 2, 4]}) |
|
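    # Case 4: ParquetFile.read() with verification enabled raises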
corrupted_pq_file = pq.ParquetFile(corrupted_path, |
|
page_checksum_verification=True) |
|
|
|
with pytest.raises(OSError, match="CRC checksum verification"): |
|
_ = corrupted_pq_file.read() |
|
|
|
|
|
@pytest.mark.dataset |
|
def test_checksum_write_to_dataset(tempdir): |
|
"""Check that checksum verification works for datasets created with |
    pq.write_to_dataset"""
|
|
|
table_orig = pa.table({'a': [1, 2, 3, 4]}) |
|
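    # write a dataset with page checksums enabled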
original_dir_path = tempdir / 'correct_dir' |
|
pq.write_to_dataset(table_orig, |
|
original_dir_path, |
|
write_page_checksum=True) |
|
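    # the dataset directory should contain a single file; verify it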
original_file_path_list = list(original_dir_path.iterdir()) |
|
assert len(original_file_path_list) == 1 |
|
original_path = original_file_path_list[0] |
|
table_check = pq.read_table(original_path, page_checksum_verification=True) |
|
assert table_orig == table_check |
|
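    # Read that file as binary and swap bytes 31 and 36; for this data that
    # is equivalent to having written pa.table({'a': [1, 3, 2, 4]})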
bin_data = bytearray(original_path.read_bytes()) |
|
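    # the two bytes must differ, otherwise nothing is corrupted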
assert bin_data[31] != bin_data[36] |
|
bin_data[31], bin_data[36] = bin_data[36], bin_data[31] |
|
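    # copy the dataset and replace its file with the corrupted bytes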
corrupted_dir_path = tempdir / 'corrupted_dir' |
|
copytree(original_dir_path, corrupted_dir_path) |
|
|
|
corrupted_file_path = corrupted_dir_path / original_path.name |
|
corrupted_file_path.write_bytes(bin_data) |
|
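    # reading without verification succeeds but returns corrupted data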
table_corrupt = pq.read_table(corrupted_file_path, |
|
page_checksum_verification=False) |
|
|
|
|
|
assert table_corrupt != table_orig |
|
assert table_corrupt == pa.table({'a': [1, 3, 2, 4]}) |
|
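    # reading with verification enabled raises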
with pytest.raises(OSError, match="CRC checksum verification"): |
|
_ = pq.read_table(corrupted_file_path, page_checksum_verification=True) |
|
|