|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import datetime
import decimal
from collections import OrderedDict
import io

try:
    import numpy as np
except ImportError:
    np = None
import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import _check_roundtrip, make_sample_file
from pyarrow.fs import LocalFileSystem
from pyarrow.tests import util

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import alltypes_sample
except ImportError:
    pd = tm = None


pytestmark = pytest.mark.parquet
|
|
@pytest.mark.pandas
def test_parquet_metadata_api():
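    # Exercise the FileMetaData, ParquetSchema, RowGroupMetaData and
    # ColumnChunkMetaData accessors on a sample file with one column per
    # type (plus the serialized index column added by pandas).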
|
    df = alltypes_sample(size=10000)
    df = df.reindex(columns=sorted(df.columns))
    df.index = np.random.randint(0, 1000000, size=len(df))

    fileh = make_sample_file(df)
    ncols = len(df.columns)

    # file-level metadata
    meta = fileh.metadata
    repr(meta)
    assert meta.num_rows == len(df)
    assert meta.num_columns == ncols + 1
    assert meta.num_row_groups == 1
    assert meta.format_version == '2.6'
    assert 'parquet-cpp' in meta.created_by
    assert isinstance(meta.serialized_size, int)
    assert isinstance(meta.metadata, dict)

    # schema and column descriptors
    schema = fileh.schema
    assert meta.schema is schema
    assert len(schema) == ncols + 1
    repr(schema)

    col = schema[0]
    repr(col)
    assert col.name == df.columns[0]
    assert col.max_definition_level == 1
    assert col.max_repetition_level == 0
    assert col.physical_type == 'BOOLEAN'
    assert col.converted_type == 'NONE'

    col_float16 = schema[5]
    assert col_float16.logical_type.type == 'FLOAT16'

    with pytest.raises(IndexError):
        schema[ncols + 1]

    with pytest.raises(IndexError):
        schema[-1]

    # row group and column chunk metadata
    for rg in range(meta.num_row_groups):
        rg_meta = meta.row_group(rg)
        assert isinstance(rg_meta, pq.RowGroupMetaData)
        repr(rg_meta)

        for col in range(rg_meta.num_columns):
            col_meta = rg_meta.column(col)
            assert isinstance(col_meta, pq.ColumnChunkMetaData)
            repr(col_meta)

    with pytest.raises(IndexError):
        meta.row_group(-1)

    with pytest.raises(IndexError):
        meta.row_group(meta.num_row_groups + 1)

    rg_meta = meta.row_group(0)
    assert rg_meta.num_rows == len(df)
    assert rg_meta.num_columns == ncols + 1
    assert rg_meta.total_byte_size > 0

    with pytest.raises(IndexError):
        col_meta = rg_meta.column(-1)

    with pytest.raises(IndexError):
        col_meta = rg_meta.column(ncols + 2)

    col_meta = rg_meta.column(0)
    assert col_meta.file_offset == 0
    assert col_meta.file_path == ''
    assert col_meta.physical_type == 'BOOLEAN'
    assert col_meta.num_values == 10000
    assert col_meta.path_in_schema == 'bool'
    assert col_meta.is_stats_set is True
    assert isinstance(col_meta.statistics, pq.Statistics)
    assert col_meta.compression == 'SNAPPY'
    assert set(col_meta.encodings) == {'PLAIN', 'RLE'}
    assert col_meta.has_dictionary_page is False
    assert col_meta.dictionary_page_offset is None
    assert col_meta.data_page_offset > 0
    assert col_meta.total_compressed_size > 0
    assert col_meta.total_uncompressed_size > 0
    with pytest.raises(NotImplementedError):
        col_meta.has_index_page
    with pytest.raises(NotImplementedError):
        col_meta.index_page_offset
|
|
def test_parquet_metadata_lifetime(tempdir):
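    # The file name suggests this is a crash regression test: accessing
    # statistics through a chain of temporary metadata objects should keep
    # the underlying file metadata alive rather than segfaulting.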
|
|
|
    table = pa.table({'a': [1, 2, 3]})
    pq.write_table(table, tempdir / 'test_metadata_segfault.parquet')
    parquet_file = pq.ParquetFile(tempdir / 'test_metadata_segfault.parquet')
    parquet_file.metadata.row_group(0).column(0).statistics
|
|
@pytest.mark.pandas
@pytest.mark.parametrize(
    (
        'data',
        'type',
        'physical_type',
        'min_value',
        'max_value',
        'null_count',
        'num_values',
        'distinct_count'
    ),
    [
        ([1, 2, 2, None, 4], pa.uint8(), 'INT32', 1, 4, 1, 4, None),
        ([1, 2, 2, None, 4], pa.uint16(), 'INT32', 1, 4, 1, 4, None),
        ([1, 2, 2, None, 4], pa.uint32(), 'INT32', 1, 4, 1, 4, None),
        ([1, 2, 2, None, 4], pa.uint64(), 'INT64', 1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int8(), 'INT32', -1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int16(), 'INT32', -1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int32(), 'INT32', -1, 4, 1, 4, None),
        ([-1, 2, 2, None, 4], pa.int64(), 'INT64', -1, 4, 1, 4, None),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float32(),
            'FLOAT', -1.1, 4.4, 1, 4, None
        ),
        (
            [-1.1, 2.2, 2.3, None, 4.4], pa.float64(),
            'DOUBLE', -1.1, 4.4, 1, 4, None
        ),
        (
            ['', 'b', chr(1000), None, 'aaa'], pa.binary(),
            'BYTE_ARRAY', b'', chr(1000).encode('utf-8'), 1, 4, None
        ),
        (
            [True, False, False, True, True], pa.bool_(),
            'BOOLEAN', False, True, 0, 5, None
        ),
        (
            [b'\x00', b'b', b'12', None, b'aaa'], pa.binary(),
            'BYTE_ARRAY', b'\x00', b'b', 1, 4, None
        ),
    ]
)
def test_parquet_column_statistics_api(data, type, physical_type, min_value,
                                       max_value, null_count, num_values,
                                       distinct_count):
    df = pd.DataFrame({'data': data})
    schema = pa.schema([pa.field('data', type)])
    table = pa.Table.from_pandas(df, schema=schema, safe=False)
    fileh = make_sample_file(table)

    meta = fileh.metadata

    rg_meta = meta.row_group(0)
    col_meta = rg_meta.column(0)

    stat = col_meta.statistics
    assert stat.has_min_max
    assert _close(type, stat.min, min_value)
    assert _close(type, stat.max, max_value)
    assert stat.null_count == null_count
    assert stat.num_values == num_values

    assert stat.distinct_count == distinct_count
    assert stat.physical_type == physical_type
|
|
def _close(type, left, right):
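    # Helper: compare floating-point statistics approximately (to absorb
    # binary rounding of the Python literals), everything else exactly.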
|
    if type == pa.float32():
        return abs(left - right) < 1E-7
    elif type == pa.float64():
        return abs(left - right) < 1E-13
    else:
        return left == right
|
|
@pytest.mark.pandas
def test_parquet_raise_on_unset_statistics():
    df = pd.DataFrame({"t": pd.Series([pd.NaT], dtype="datetime64[ns]")})
    meta = make_sample_file(pa.Table.from_pandas(df)).metadata

    assert not meta.row_group(0).column(0).statistics.has_min_max
    assert meta.row_group(0).column(0).statistics.max is None
|
|
def test_statistics_convert_logical_types(tempdir):
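    # Statistics should be returned as Python values of the column's logical
    # type (large unsigned ints, strings, times, timestamps, dates, decimals)
    # rather than as raw physical values.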
|
|
|
|
|
|
|
    cases = [(10, 11164359321221007157, pa.uint64()),
             (10, 4294967295, pa.uint32()),
             ("ähnlich", "öffentlich", pa.utf8()),
             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
              pa.time32('ms')),
             (datetime.time(10, 30, 0, 1000), datetime.time(15, 30, 0, 1000),
              pa.time64('us')),
             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
              pa.timestamp('ms')),
             (datetime.datetime(2019, 6, 24, 0, 0, 0, 1000),
              datetime.datetime(2019, 6, 25, 0, 0, 0, 1000),
              pa.timestamp('us')),
             (datetime.date(2019, 6, 24),
              datetime.date(2019, 6, 25),
              pa.date32()),
             (decimal.Decimal("20.123"),
              decimal.Decimal("20.124"),
              pa.decimal128(12, 5))]

    for i, (min_val, max_val, typ) in enumerate(cases):
        t = pa.Table.from_arrays([pa.array([min_val, max_val], type=typ)],
                                 ['col'])
        path = str(tempdir / ('example{}.parquet'.format(i)))
        pq.write_table(t, path, version='2.6')
        pf = pq.ParquetFile(path)
        stats = pf.metadata.row_group(0).column(0).statistics
        assert stats.min == min_val
        assert stats.max == max_val
|
|
def test_parquet_write_disable_statistics(tempdir):
    table = pa.Table.from_pydict(
        OrderedDict([
            ('a', pa.array([1, 2, 3])),
            ('b', pa.array(['a', 'b', 'c']))
        ])
    )
    _write_table(table, tempdir / 'data.parquet')
    meta = pq.read_metadata(tempdir / 'data.parquet')
    for col in [0, 1]:
        cc = meta.row_group(0).column(col)
        assert cc.is_stats_set is True
        assert cc.statistics is not None

    _write_table(table, tempdir / 'data2.parquet', write_statistics=False)
    meta = pq.read_metadata(tempdir / 'data2.parquet')
    for col in [0, 1]:
        cc = meta.row_group(0).column(col)
        assert cc.is_stats_set is False
        assert cc.statistics is None

    _write_table(table, tempdir / 'data3.parquet', write_statistics=['a'])
    meta = pq.read_metadata(tempdir / 'data3.parquet')
    cc_a = meta.row_group(0).column(0)
    cc_b = meta.row_group(0).column(1)
    assert cc_a.is_stats_set is True
    assert cc_b.is_stats_set is False
    assert cc_a.statistics is not None
    assert cc_b.statistics is None
|
|
def test_parquet_sorting_column():
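    # Round-trip between SortingColumn objects and the (sort_keys,
    # null_placement) representation used by pyarrow orderings, including
    # error cases for invalid sort orders and inconsistent null placement.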
|
    sorting_col = pq.SortingColumn(10)
    assert sorting_col.to_dict() == {
        'column_index': 10,
        'descending': False,
        'nulls_first': False
    }

    sorting_col = pq.SortingColumn(0, descending=True, nulls_first=True)
    assert sorting_col.to_dict() == {
        'column_index': 0,
        'descending': True,
        'nulls_first': True
    }

    schema = pa.schema([('a', pa.int64()), ('b', pa.int64())])
    sorting_cols = (
        pq.SortingColumn(1, descending=True),
        pq.SortingColumn(0, descending=False),
    )
    sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_cols)
    assert sort_order == (('b', "descending"), ('a', "ascending"))
    assert null_placement == "at_end"

    sorting_cols_roundtripped = pq.SortingColumn.from_ordering(
        schema, sort_order, null_placement)
    assert sorting_cols_roundtripped == sorting_cols

    sorting_cols = pq.SortingColumn.from_ordering(
        schema, ('a', ('b', "descending")), null_placement="at_start")
    expected = (
        pq.SortingColumn(0, descending=False, nulls_first=True),
        pq.SortingColumn(1, descending=True, nulls_first=True),
    )
    assert sorting_cols == expected

    empty_sorting_cols = pq.SortingColumn.from_ordering(schema, ())
    assert empty_sorting_cols == ()

    assert pq.SortingColumn.to_ordering(schema, ()) == ((), "at_end")

    with pytest.raises(ValueError):
        pq.SortingColumn.from_ordering(schema, (("a", "not a valid sort order"),))

    with pytest.raises(ValueError, match="inconsistent null placement"):
        sorting_cols = (
            pq.SortingColumn(1, nulls_first=True),
            pq.SortingColumn(0, nulls_first=False),
        )
        pq.SortingColumn.to_ordering(schema, sorting_cols)
|
|
def test_parquet_sorting_column_nested():
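    # SortingColumn.column_index refers to leaf columns in the flattened
    # Parquet schema, so index 0 maps to 'a.x' and index 2 to the top-level
    # 'b'.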
|
    schema = pa.schema({
        'a': pa.struct([('x', pa.int64()), ('y', pa.int64())]),
        'b': pa.int64()
    })

    sorting_columns = [
        pq.SortingColumn(0, descending=True),
        pq.SortingColumn(2, descending=False)
    ]

    sort_order, null_placement = pq.SortingColumn.to_ordering(schema, sorting_columns)
    assert null_placement == "at_end"
    assert len(sort_order) == 2
    assert sort_order[0] == ("a.x", "descending")
    assert sort_order[1] == ("b", "ascending")
|
|
def test_parquet_file_sorting_columns():
    table = pa.table({'a': [1, 2, 3], 'b': ['a', 'b', 'c']})

    sorting_columns = (
        pq.SortingColumn(column_index=0, descending=True, nulls_first=True),
        pq.SortingColumn(column_index=1, descending=False),
    )
    writer = pa.BufferOutputStream()
    _write_table(table, writer, sorting_columns=sorting_columns)
    reader = pa.BufferReader(writer.getvalue())

    # The sorting columns should be recoverable from the row group metadata.
    metadata = pq.read_metadata(reader)
    assert sorting_columns == metadata.row_group(0).sorting_columns

    metadata_dict = metadata.to_dict()
    assert metadata_dict.get('num_columns') == 2
    assert metadata_dict.get('num_rows') == 3
    assert metadata_dict.get('num_row_groups') == 1
|
|
def test_field_id_metadata():
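    # Arrow field metadata under the b'PARQUET:field_id' key is written as the
    # Parquet field id and restored on read; values that are not positive
    # integers appear to be carried through as ordinary key/value metadata.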
|
|
|
    field_id = b'PARQUET:field_id'
    inner = pa.field('inner', pa.int32(), metadata={field_id: b'100'})
    middle = pa.field('middle', pa.struct(
        [inner]), metadata={field_id: b'101'})
    fields = [
        pa.field('basic', pa.int32(), metadata={
            b'other': b'abc', field_id: b'1'}),
        pa.field(
            'list',
            pa.list_(pa.field('list-inner', pa.int32(),
                              metadata={field_id: b'10'})),
            metadata={field_id: b'11'}),
        pa.field('struct', pa.struct([middle]), metadata={field_id: b'102'}),
        pa.field('no-metadata', pa.int32()),
        pa.field('non-integral-field-id', pa.int32(),
                 metadata={field_id: b'xyz'}),
        pa.field('negative-field-id', pa.int32(),
                 metadata={field_id: b'-1000'})
    ]
    arrs = [[] for _ in fields]
    table = pa.table(arrs, schema=pa.schema(fields))

    bio = pa.BufferOutputStream()
    pq.write_table(table, bio)
    contents = bio.getvalue()

    pf = pq.ParquetFile(pa.BufferReader(contents))
    schema = pf.schema_arrow

    assert schema[0].metadata[field_id] == b'1'
    assert schema[0].metadata[b'other'] == b'abc'

    list_field = schema[1]
    assert list_field.metadata[field_id] == b'11'

    list_item_field = list_field.type.value_field
    assert list_item_field.metadata[field_id] == b'10'

    struct_field = schema[2]
    assert struct_field.metadata[field_id] == b'102'

    struct_middle_field = struct_field.type[0]
    assert struct_middle_field.metadata[field_id] == b'101'

    struct_inner_field = struct_middle_field.type[0]
    assert struct_inner_field.metadata[field_id] == b'100'

    assert schema[3].metadata is None

    assert schema[4].metadata[field_id] == b'xyz'
    assert schema[5].metadata[field_id] == b'-1000'
|
|
def test_parquet_file_page_index():
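    # write_page_index controls whether the Parquet offset index and column
    # index are written for each column chunk.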
|
    for write_page_index in (False, True):
        table = pa.table({'a': [1, 2, 3]})

        writer = pa.BufferOutputStream()
        _write_table(table, writer, write_page_index=write_page_index)
        reader = pa.BufferReader(writer.getvalue())

        metadata = pq.read_metadata(reader)
        cc = metadata.row_group(0).column(0)
        assert cc.has_offset_index is write_page_index
        assert cc.has_column_index is write_page_index
|
|
@pytest.mark.pandas
def test_multi_dataset_metadata(tempdir):
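    # Write the same table to two files, collect per-file metadata via
    # metadata_collector, tag each with its file path, merge the row groups,
    # and write a combined "_metadata" file that reads back consistently.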
|
    filenames = ["ARROW-1983-dataset.0", "ARROW-1983-dataset.1"]
    metapath = str(tempdir / "_metadata")

    df = pd.DataFrame({
        'one': [1, 2, 3],
        'two': [-1, -2, -3],
        'three': [[1, 2], [2, 3], [3, 4]],
    })
    table = pa.Table.from_pandas(df)

    _meta = None
    for filename in filenames:
        meta = []
        pq.write_table(table, str(tempdir / filename),
                       metadata_collector=meta)
        meta[0].set_file_path(filename)
        if _meta is None:
            _meta = meta[0]
        else:
            _meta.append_row_groups(meta[0])

    with open(metapath, "wb") as f:
        _meta.write_metadata_file(f)

    meta = pq.read_metadata(metapath)
    md = meta.to_dict()
    _md = _meta.to_dict()
    for key in _md:
        if key != 'serialized_size':
            assert _md[key] == md[key]
    assert _md['num_columns'] == 3
    assert _md['num_rows'] == 6
    assert _md['num_row_groups'] == 2
    assert _md['serialized_size'] == 0
    assert md['serialized_size'] > 0
|
|
def test_metadata_hashing(tempdir):
    path1 = str(tempdir / "metadata1")
    schema1 = pa.schema([("a", "int64"), ("b", "float64")])
    pq.write_metadata(schema1, path1)
    parquet_meta1 = pq.read_metadata(path1)

    path2 = str(tempdir / "metadata2")
    schema2 = pa.schema([("a", "int64"), ("b", "float64")])
    pq.write_metadata(schema2, path2)
    parquet_meta2 = pq.read_metadata(path2)

    path3 = str(tempdir / "metadata3")
    schema3 = pa.schema([("a", "int64"), ("b", "float32")])
    pq.write_metadata(schema3, path3)
    parquet_meta3 = pq.read_metadata(path3)

    # hashing is stable and equal metadata hashes equal
    assert hash(parquet_meta1) == hash(parquet_meta1)
    assert hash(parquet_meta1) == hash(parquet_meta2)

    # differing schemas (float64 vs float32) should hash differently
    assert hash(parquet_meta1) != hash(parquet_meta3)
|
|
@pytest.mark.filterwarnings("ignore:Parquet format:FutureWarning")
def test_write_metadata(tempdir):
    path = str(tempdir / "metadata")
    schema = pa.schema([("a", "int64"), ("b", "float64")])

    pq.write_metadata(schema, path)
    parquet_meta = pq.read_metadata(path)
    schema_as_arrow = parquet_meta.schema.to_arrow_schema()
    assert schema_as_arrow.equals(schema)

    if schema_as_arrow.metadata:
        assert b'ARROW:schema' not in schema_as_arrow.metadata

    for version in ["1.0", "2.4", "2.6"]:
        pq.write_metadata(schema, path, version=version)
        parquet_meta = pq.read_metadata(path)

        expected_version = "1.0" if version == "1.0" else "2.6"
        assert parquet_meta.format_version == expected_version

    table = pa.table({'a': [1, 2], 'b': [.1, .2]}, schema=schema)
    pq.write_table(table, tempdir / "data.parquet")
    parquet_meta = pq.read_metadata(str(tempdir / "data.parquet"))
    pq.write_metadata(
        schema, path, metadata_collector=[parquet_meta, parquet_meta]
    )
    parquet_meta_mult = pq.read_metadata(path)
    assert parquet_meta_mult.num_row_groups == 2

    msg = ("AppendRowGroups requires equal schemas.\n"
           "The two columns with index 0 differ.")
    with pytest.raises(RuntimeError, match=msg):
        pq.write_metadata(
            pa.schema([("a", "int32"), ("b", "null")]),
            path, metadata_collector=[parquet_meta, parquet_meta]
        )
|
|
def test_table_large_metadata():
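    # Round-trip a table whose schema carries roughly 10 MB of key-value
    # metadata.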
|
|
|
    my_schema = pa.schema([pa.field('f0', 'double')],
                          metadata={'large': 'x' * 10000000})

    table = pa.table([range(10)], schema=my_schema)
    _check_roundtrip(table)
|
|
@pytest.mark.pandas
def test_compare_schemas():
    df = alltypes_sample(size=10000)

    fileh = make_sample_file(df)
    fileh2 = make_sample_file(df)
    fileh3 = make_sample_file(df[df.columns[::2]])

    # ParquetSchema comparison
    assert isinstance(fileh.schema, pq.ParquetSchema)
    assert fileh.schema.equals(fileh.schema)
    assert fileh.schema == fileh.schema
    assert fileh.schema.equals(fileh2.schema)
    assert fileh.schema == fileh2.schema
    assert fileh.schema != 'arbitrary object'
    assert not fileh.schema.equals(fileh3.schema)
    assert fileh.schema != fileh3.schema

    # ColumnSchema comparison
    assert isinstance(fileh.schema[0], pq.ColumnSchema)
    assert fileh.schema[0].equals(fileh.schema[0])
    assert fileh.schema[0] == fileh.schema[0]
    assert not fileh.schema[0].equals(fileh.schema[1])
    assert fileh.schema[0] != fileh.schema[1]
    assert fileh.schema[0] != 'arbitrary object'
|
|
@pytest.mark.pandas
def test_read_schema(tempdir):
    N = 100
    df = pd.DataFrame({
        'index': np.arange(N),
        'values': np.random.randn(N)
    }, columns=['index', 'values'])

    data_path = tempdir / 'test.parquet'

    table = pa.Table.from_pandas(df)
    _write_table(table, data_path)

    read1 = pq.read_schema(data_path)
    read2 = pq.read_schema(data_path, memory_map=True)
    assert table.schema.equals(read1)
    assert table.schema.equals(read2)

    assert table.schema.metadata[b'pandas'] == read1.metadata[b'pandas']
|
|
def test_parquet_metadata_empty_to_dict(tempdir):
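    # An empty table produces column chunks without statistics; to_dict()
    # should report them as None rather than failing.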
|
|
|
table = pa.table({"a": pa.array([], type="int64")}) |
|
pq.write_table(table, tempdir / "data.parquet") |
|
metadata = pq.read_metadata(tempdir / "data.parquet") |
|
|
|
metadata_dict = metadata.to_dict() |
|
assert len(metadata_dict["row_groups"]) == 1 |
|
assert len(metadata_dict["row_groups"][0]["columns"]) == 1 |
|
assert metadata_dict["row_groups"][0]["columns"][0]["statistics"] is None |
|
|
|
|
|
@pytest.mark.slow
@pytest.mark.large_memory
def test_metadata_exceeds_message_size():
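    # Grow the file metadata by repeatedly appending row groups so that it
    # becomes very large (presumably past the default Thrift message size
    # limit) and check that it can still be written and read back.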
|
|
|
|
|
    NCOLS = 1000
    NREPEATS = 4000

    table = pa.table({str(i): np.random.randn(10) for i in range(NCOLS)})

    with pa.BufferOutputStream() as out:
        pq.write_table(table, out)
        buf = out.getvalue()

    original_metadata = pq.read_metadata(pa.BufferReader(buf))
    metadata = pq.read_metadata(pa.BufferReader(buf))
    for i in range(NREPEATS):
        metadata.append_row_groups(original_metadata)

    with pa.BufferOutputStream() as out:
        metadata.write_metadata_file(out)
        buf = out.getvalue()

    metadata = pq.read_metadata(pa.BufferReader(buf))
|
|
def test_metadata_schema_filesystem(tempdir):
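    # read_metadata/read_schema should accept plain paths, file URIs,
    # filesystem objects, and relative paths resolved against a filesystem.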
|
    table = pa.table({"a": [1, 2, 3]})

    fname = "data.parquet"
    file_path = str(tempdir / fname)
    file_uri = 'file:///' + file_path

    pq.write_table(table, file_path)

    metadata = pq.read_metadata(tempdir / fname)
    schema = table.schema

    assert pq.read_metadata(file_uri).equals(metadata)
    assert pq.read_metadata(
        file_path, filesystem=LocalFileSystem()).equals(metadata)
    assert pq.read_metadata(
        fname, filesystem=f'file:///{tempdir}').equals(metadata)

    assert pq.read_schema(file_uri).equals(schema)
    assert pq.read_schema(
        file_path, filesystem=LocalFileSystem()).equals(schema)
    assert pq.read_schema(
        fname, filesystem=f'file:///{tempdir}').equals(schema)

    with util.change_cwd(tempdir):
        assert pq.read_metadata(
            fname, filesystem=LocalFileSystem()).equals(metadata)

        assert pq.read_schema(
            fname, filesystem=LocalFileSystem()).equals(schema)
|
|
def test_metadata_equals():
    table = pa.table({"a": [1, 2, 3]})
    with pa.BufferOutputStream() as out:
        pq.write_table(table, out)
        buf = out.getvalue()

    original_metadata = pq.read_metadata(pa.BufferReader(buf))
    match = "Argument 'other' has incorrect type"
    with pytest.raises(TypeError, match=match):
        original_metadata.equals(None)
|
|
@pytest.mark.parametrize("t1,t2,expected_error", (
    ({'col1': range(10)}, {'col1': range(10)}, None),
    ({'col1': range(10)}, {'col2': range(10)},
     "The two columns with index 0 differ."),
    ({'col1': range(10), 'col2': range(10)}, {'col3': range(10)},
     "This schema has 2 columns, other has 1")
))
def test_metadata_append_row_groups_diff(t1, t2, expected_error):
    table1 = pa.table(t1)
    table2 = pa.table(t2)

    buf1 = io.BytesIO()
    buf2 = io.BytesIO()
    pq.write_table(table1, buf1)
    pq.write_table(table2, buf2)
    buf1.seek(0)
    buf2.seek(0)

    meta1 = pq.ParquetFile(buf1).metadata
    meta2 = pq.ParquetFile(buf2).metadata

    if expected_error:
        # The error message always starts with the same first line.
        prefix = "AppendRowGroups requires equal schemas.\n"
        with pytest.raises(RuntimeError, match=prefix + expected_error):
            meta1.append_row_groups(meta2)
    else:
        meta1.append_row_groups(meta2)
|
|
@pytest.mark.s3
def test_write_metadata_fs_file_combinations(tempdir, s3_example_s3fs):
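    # write_metadata should accept a plain local path, an explicit filesystem,
    # a file URI, an open binary file object, and a remote (S3) filesystem,
    # all producing identical bytes.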
|
    s3_fs, s3_path = s3_example_s3fs

    meta1 = tempdir / "meta1"
    meta2 = tempdir / "meta2"
    meta3 = tempdir / "meta3"
    meta4 = tempdir / "meta4"
    meta5 = f"{s3_path}/meta5"

    table = pa.table({"col": range(5)})

    pq.write_metadata(table.schema, meta1, [])

    pq.write_metadata(table.schema, meta2, [], filesystem=LocalFileSystem())

    pq.write_metadata(table.schema, meta3.as_uri(), [])

    with meta4.open('wb+') as meta4_stream:
        pq.write_metadata(table.schema, meta4_stream, [])

    pq.write_metadata(table.schema, meta5, [], filesystem=s3_fs)

    assert meta1.read_bytes() == meta2.read_bytes() \
        == meta3.read_bytes() == meta4.read_bytes() \
        == s3_fs.open(meta5).read()
|
|
def test_column_chunk_key_value_metadata(parquet_test_datadir):
    metadata = pq.read_metadata(parquet_test_datadir /
                                'column_chunk_key_value_metadata.parquet')
    key_value_metadata1 = metadata.row_group(0).column(0).metadata
    assert key_value_metadata1 == {b'foo': b'bar', b'thisiskeywithoutvalue': b''}
    key_value_metadata2 = metadata.row_group(0).column(1).metadata
    assert key_value_metadata2 is None
|
|
def test_internal_class_instantiation():
    def msg(c):
        return f"Do not call {c}'s constructor directly"

    with pytest.raises(TypeError, match=msg("Statistics")):
        pq.Statistics()

    with pytest.raises(TypeError, match=msg("ParquetLogicalType")):
        pq.ParquetLogicalType()

    with pytest.raises(TypeError, match=msg("ColumnChunkMetaData")):
        pq.ColumnChunkMetaData()

    with pytest.raises(TypeError, match=msg("RowGroupMetaData")):
        pq.RowGroupMetaData()

    with pytest.raises(TypeError, match=msg("FileMetaData")):
        pq.FileMetaData()
|
|