import datetime
import io
import warnings

try:
    import numpy as np
except ImportError:
    np = None

import pytest

import pyarrow as pa
from pyarrow.tests.parquet.common import _check_roundtrip

try:
    import pyarrow.parquet as pq
    from pyarrow.tests.parquet.common import _read_table, _write_table
except ImportError:
    pq = None


try:
    import pandas as pd
    import pandas.testing as tm

    from pyarrow.tests.parquet.common import _roundtrip_pandas_dataframe
except ImportError:
    pd = tm = None
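

# Marks all of the tests in this module; deselect with "pytest -m 'not parquet'".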
pytestmark = pytest.mark.parquet


@pytest.mark.pandas
def test_pandas_parquet_datetime_tz():
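    # Round-trip a DataFrame with tz-aware columns and a tz-aware index
    # through an in-memory parquet file.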
    s = pd.Series([datetime.datetime(2017, 9, 6)], dtype='datetime64[us]')
    s = s.dt.tz_localize('utc')
    s.index = s

    df = pd.DataFrame({'tz_aware': s,
                       'tz_eastern': s.dt.tz_convert('US/Eastern')},
                      index=s)

    f = io.BytesIO()

    arrow_table = pa.Table.from_pandas(df)

    _write_table(arrow_table, f)
    f.seek(0)

    table_read = pq.read_pandas(f)

    df_read = table_read.to_pandas()
    tm.assert_frame_equal(df, df_read)


@pytest.mark.pandas
def test_datetime_timezone_tzinfo():
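    # A Python datetime carrying a tzinfo should survive a pandas round trip.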
    value = datetime.datetime(2018, 1, 1, 1, 23, 45,
                              tzinfo=datetime.timezone.utc)
    df = pd.DataFrame({'foo': [value]})

    _roundtrip_pandas_dataframe(df, write_kwargs={})


@pytest.mark.pandas
def test_coerce_timestamps(tempdir):
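    # Writing with coerce_timestamps='us' upcasts the millisecond data
    # inside a list<timestamp[ms]> column to microseconds.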
    from collections import OrderedDict

    arrays = OrderedDict()
    fields = [pa.field('datetime64',
                       pa.list_(pa.timestamp('ms')))]
    arrays['datetime64'] = [
        np.array(['2007-07-13T01:23:34.123456789',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
        None,
        None,
        np.array(['2007-07-13T02',
                  None,
                  '2010-08-13T05:46:57.437699912'],
                 dtype='datetime64[ms]'),
    ]

    df = pd.DataFrame(arrays)
    schema = pa.schema(fields)

    filename = tempdir / 'pandas_roundtrip.parquet'
    arrow_table = pa.Table.from_pandas(df, schema=schema)

    _write_table(arrow_table, filename, version='2.6', coerce_timestamps='us')
    table_read = _read_table(filename)
    df_read = table_read.to_pandas()

    df_expected = df.copy()
    for i, x in enumerate(df_expected['datetime64']):
        if isinstance(x, np.ndarray):
            df_expected.loc[i, 'datetime64'] = x.astype('M8[us]')

    tm.assert_frame_equal(df_expected, df_read)
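    # An unrecognized coercion unit must be rejected.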
    with pytest.raises(ValueError):
        _write_table(arrow_table, filename, version='2.6',
                     coerce_timestamps='unknown')


@pytest.mark.pandas
def test_coerce_timestamps_truncated(tempdir):
    """
    ARROW-2555: Test that we can truncate timestamps when coercing if
    explicitly allowed.
    """
    dt_us = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1, microsecond=1)
    dt_ms = datetime.datetime(year=2017, month=1, day=1, hour=1, minute=1,
                              second=1)

    fields_us = [pa.field('datetime64', pa.timestamp('us'))]
    arrays_us = {'datetime64': [dt_us, dt_ms]}

    df_us = pd.DataFrame(arrays_us)
    schema_us = pa.schema(fields_us)

    filename = tempdir / 'pandas_truncated.parquet'
    table_us = pa.Table.from_pandas(df_us, schema=schema_us)
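    # Truncation from microseconds to milliseconds is only permitted when
    # allow_truncated_timestamps=True.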
    _write_table(table_us, filename, version='2.6', coerce_timestamps='ms',
                 allow_truncated_timestamps=True)
    table_ms = _read_table(filename)
    df_ms = table_ms.to_pandas()

    arrays_expected = {'datetime64': [dt_ms, dt_ms]}
    df_expected = pd.DataFrame(arrays_expected, dtype='datetime64[ms]')
    tm.assert_frame_equal(df_expected, df_ms)


@pytest.mark.pandas
def test_date_time_types(tempdir):
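    # Round-trip the date/time/timestamp types supported by Parquet and
    # check the expected storage conversions.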
    t1 = pa.date32()
    data1 = np.array([17259, 17260, 17261], dtype='int32')
    a1 = pa.array(data1, type=t1)

    t2 = pa.date64()
    data2 = data1.astype('int64') * 86400000
    a2 = pa.array(data2, type=t2)

    t3 = pa.timestamp('us')
    start = pd.Timestamp('2001-01-01').value / 1000
    data3 = np.array([start, start + 1, start + 2], dtype='int64')
    a3 = pa.array(data3, type=t3)

    t4 = pa.time32('ms')
    data4 = np.arange(3, dtype='i4')
    a4 = pa.array(data4, type=t4)

    t5 = pa.time64('us')
    a5 = pa.array(data4.astype('int64'), type=t5)

    t6 = pa.time32('s')
    a6 = pa.array(data4, type=t6)

    ex_t6 = pa.time32('ms')
    ex_a6 = pa.array(data4 * 1000, type=ex_t6)

    t7 = pa.timestamp('ns')
    start = pd.Timestamp('2001-01-01').value
    data7 = np.array([start, start + 1000, start + 2000],
                     dtype='int64')
    a7 = pa.array(data7, type=t7)

    table = pa.Table.from_arrays([a1, a2, a3, a4, a5, a6, a7],
                                 ['date32', 'date64', 'timestamp[us]',
                                  'time32[s]', 'time64[us]',
                                  'time32_from64[s]',
                                  'timestamp[ns]'])
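    # date64 is read back as date32 and time32[s] is promoted to
    # time32[ms], hence the substitutions in the expected table.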
    expected = pa.Table.from_arrays([a1, a1, a3, a4, a5, ex_a6, a7],
                                    ['date32', 'date64', 'timestamp[us]',
                                     'time32[s]', 'time64[us]',
                                     'time32_from64[s]',
                                     'timestamp[ns]'])

    _check_roundtrip(table, expected=expected, version='2.6')

    t0 = pa.timestamp('ms')
    data0 = np.arange(4, dtype='int64')
    a0 = pa.array(data0, type=t0)

    t1 = pa.timestamp('us')
    data1 = np.arange(4, dtype='int64')
    a1 = pa.array(data1, type=t1)

    t2 = pa.timestamp('ns')
    data2 = np.arange(4, dtype='int64')
    a2 = pa.array(data2, type=t2)

    table = pa.Table.from_arrays([a0, a1, a2],
                                 ['ts[ms]', 'ts[us]', 'ts[ns]'])
    expected = pa.Table.from_arrays([a0, a1, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])
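    # All timestamp resolutions are stored as INT64 by default.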
    filename = tempdir / 'int64_timestamps.parquet'
    _write_table(table, filename, version='2.6')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT64'
    read_table = _read_table(filename)
    assert read_table.equals(expected)
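    # Scale the ms and us columns up to nanoseconds for the INT96 comparison.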
    t0_ns = pa.timestamp('ns')
    data0_ns = np.array(data0 * 1000000, dtype='int64')
    a0_ns = pa.array(data0_ns, type=t0_ns)

    t1_ns = pa.timestamp('ns')
    data1_ns = np.array(data1 * 1000, dtype='int64')
    a1_ns = pa.array(data1_ns, type=t1_ns)

    expected = pa.Table.from_arrays([a0_ns, a1_ns, a2],
                                    ['ts[ms]', 'ts[us]', 'ts[ns]'])
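    # With use_deprecated_int96_timestamps=True, every column is stored as
    # INT96 and read back with nanosecond resolution.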
    filename = tempdir / 'explicit_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 use_deprecated_int96_timestamps=True)
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)
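    # flavor='spark' implies INT96 storage as well.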
    filename = tempdir / 'spark_int96_timestamps.parquet'
    _write_table(table, filename, version='2.6',
                 flavor='spark')
    parquet_schema = pq.ParquetFile(filename).schema
    for i in range(3):
        assert parquet_schema.column(i).physical_type == 'INT96'
    read_table = _read_table(filename)
    assert read_table.equals(expected)


@pytest.mark.pandas
@pytest.mark.parametrize('unit', ['s', 'ms', 'us', 'ns'])
def test_coerce_int96_timestamp_unit(unit):
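    # INT96 timestamps can be read back at an explicitly requested unit
    # via coerce_int96_timestamp_unit.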
    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # seconds since epoch

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    arrays = {"s": a_s, "ms": a_ms, "us": a_us, "ns": a_ns}
    names = ['ts_s', 'ts_ms', 'ts_us', 'ts_ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
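    # Regardless of the unit the data was written with, reading back with
    # coerce_int96_timestamp_unit should yield the requested unit.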
    expected = pa.Table.from_arrays([arrays.get(unit)] * 4, names)
    read_table_kwargs = {"coerce_int96_timestamp_unit": unit}
    _check_roundtrip(table, expected,
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)
    _check_roundtrip(table, expected, version='2.6',
                     read_table_kwargs=read_table_kwargs,
                     use_deprecated_int96_timestamps=True)


@pytest.mark.pandas
@pytest.mark.parametrize('pq_reader_method', ['ParquetFile', 'read_table'])
def test_coerce_int96_timestamp_overflow(pq_reader_method, tempdir):
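    # Timestamps outside the nanosecond range overflow when INT96 data is
    # read at the default ns resolution; coercing to seconds avoids that.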
    def get_table(pq_reader_method, filename, **kwargs):
        if pq_reader_method == "ParquetFile":
            return pq.ParquetFile(filename, **kwargs).read()
        elif pq_reader_method == "read_table":
            return pq.read_table(filename, **kwargs)

    # 1000-01-01 and 3000-01-01 fall outside the datetime64[ns] range
    oob_dts = [
        datetime.datetime(1000, 1, 1),
        datetime.datetime(2000, 1, 1),
        datetime.datetime(3000, 1, 1)
    ]
    df = pd.DataFrame({"a": oob_dts})
    table = pa.table(df)

    filename = tempdir / "test_round_trip_overflow.parquet"
    pq.write_table(table, filename, use_deprecated_int96_timestamps=True,
                   version="1.0")
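    # Reading back at the default nanosecond resolution gives wrong values.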
    tab_error = get_table(pq_reader_method, filename)
    with warnings.catch_warnings():
        warnings.filterwarnings(
            "ignore", "Discarding nonzero nanoseconds in conversion",
            UserWarning)
        assert tab_error["a"].to_pylist() != oob_dts
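    # Coercing the INT96 data to seconds restores the original values.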
    tab_correct = get_table(
        pq_reader_method, filename, coerce_int96_timestamp_unit="s"
    )
    df_correct = tab_correct.to_pandas(timestamp_as_object=True)
    df["a"] = df["a"].astype(object)
    tm.assert_frame_equal(df, df_correct)


@pytest.mark.parametrize('unit', ['ms', 'us', 'ns'])
def test_timestamp_restore_timezone(unit):
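    # Timezone metadata should be restored when reading back.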
    ty = pa.timestamp(unit, tz='America/New_York')
    arr = pa.array([1, 2, 3], type=ty)
    t = pa.table([arr], names=['f0'])
    _check_roundtrip(t)


def test_timestamp_restore_timezone_nanosecond():
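    # With version 2.4, nanosecond data is stored as microseconds, but the
    # timezone should still be restored on read.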
    ty = pa.timestamp('ns', tz='America/New_York')
    arr = pa.array([1000, 2000, 3000], type=ty)
    table = pa.table([arr], names=['f0'])
    ty_us = pa.timestamp('us', tz='America/New_York')
    expected = pa.table([arr.cast(ty_us)], names=['f0'])
    _check_roundtrip(table, expected=expected, version='2.4')


@pytest.mark.pandas
def test_list_of_datetime_time_roundtrip():
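    # A column holding a list of datetime.time values should round-trip.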
    times = pd.to_datetime(['09:00', '09:30', '10:00', '10:30', '11:00',
                            '11:30', '12:00'], format="%H:%M")
    df = pd.DataFrame({'time': [times.time]})
    _roundtrip_pandas_dataframe(df, write_kwargs={})


@pytest.mark.pandas
def test_parquet_version_timestamp_differences():
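    # The format version determines which timestamp resolutions survive a
    # round trip by default.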
    i_s = pd.Timestamp('2010-01-01').value / 1000000000  # seconds since epoch

    d_s = np.arange(i_s, i_s + 10, 1, dtype='int64')
    d_ms = d_s * 1000
    d_us = d_ms * 1000
    d_ns = d_us * 1000

    a_s = pa.array(d_s, type=pa.timestamp('s'))
    a_ms = pa.array(d_ms, type=pa.timestamp('ms'))
    a_us = pa.array(d_us, type=pa.timestamp('us'))
    a_ns = pa.array(d_ns, type=pa.timestamp('ns'))

    all_versions = ['1.0', '2.4', '2.6']

    names = ['ts:s', 'ts:ms', 'ts:us', 'ts:ns']
    table = pa.Table.from_arrays([a_s, a_ms, a_us, a_ns], names)
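    # With versions 1.0 and 2.4, seconds are coerced to milliseconds and
    # nanoseconds to microseconds by default.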
    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_us], names)
    _check_roundtrip(table, expected, version='1.0')
    _check_roundtrip(table, expected, version='2.4')
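    # With version 2.6, nanoseconds are preserved.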
    expected = pa.Table.from_arrays([a_ms, a_ms, a_us, a_ns], names)
    _check_roundtrip(table, expected, version='2.6')
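    # Explicit coercion to milliseconds or microseconds is supported for
    # every version.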
    expected = pa.Table.from_arrays([a_ms, a_ms, a_ms, a_ms], names)
    for ver in all_versions:
        _check_roundtrip(table, expected, coerce_timestamps='ms', version=ver)

    expected = pa.Table.from_arrays([a_us, a_us, a_us, a_us], names)
    for ver in all_versions:
        _check_roundtrip(table, expected, version=ver, coerce_timestamps='us')
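    # With INT96 storage, nanoseconds are preserved for every version.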
    expected = pa.Table.from_arrays([a_ns, a_ns, a_ns, a_ns], names)
    for ver in all_versions:
        _check_roundtrip(table, expected, version=ver,
                         use_deprecated_int96_timestamps=True)


@pytest.mark.pandas
def test_noncoerced_nanoseconds_written_without_exception(tempdir):
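    # With version 2.6, nanosecond timestamps are written as-is, without
    # coercion and without raising.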
    n = 9
    df = pd.DataFrame({'x': range(n)},
                      index=pd.date_range('2017-01-01', freq='ns', periods=n))
    tb = pa.Table.from_pandas(df)

    filename = tempdir / 'written.parquet'
    try:
        pq.write_table(tb, filename, version='2.6')
    except Exception:
        pass
    assert filename.exists()

    recovered_table = pq.read_table(filename)
    assert tb.equals(recovered_table)
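    # Coercing to milliseconds without allowing truncation must raise.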
    filename = tempdir / 'not_written.parquet'
    with pytest.raises(ValueError):
        pq.write_table(tb, filename, coerce_timestamps='ms', version='2.6')


def test_duration_type():
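    # Duration arrays of every unit should survive a parquet round trip.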
    arrays = [pa.array([0, 1, 2, 3], type=pa.duration(unit))
              for unit in ["s", "ms", "us", "ns"]]
    table = pa.Table.from_arrays(arrays, ["d[s]", "d[ms]", "d[us]", "d[ns]"])

    _check_roundtrip(table)