# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from collections import OrderedDict from collections.abc import Iterable import sys import weakref try: import numpy as np except ImportError: np = None import pytest import pyarrow as pa import pyarrow.compute as pc from pyarrow.interchange import from_dataframe from pyarrow.vendored.version import Version def test_chunked_array_basics(): data = pa.chunked_array([], type=pa.string()) assert data.type == pa.string() assert data.to_pylist() == [] data.validate() data2 = pa.chunked_array([], type='binary') assert data2.type == pa.binary() with pytest.raises(ValueError): pa.chunked_array([]) data = pa.chunked_array([ [1, 2, 3], [4, 5, 6], [7, 8, 9] ]) assert isinstance(data.chunks, list) assert all(isinstance(c, pa.lib.Int64Array) for c in data.chunks) assert all(isinstance(c, pa.lib.Int64Array) for c in data.iterchunks()) assert len(data.chunks) == 3 assert data.get_total_buffer_size() == sum(c.get_total_buffer_size() for c in data.iterchunks()) assert sys.getsizeof(data) >= object.__sizeof__( data) + data.get_total_buffer_size() assert data.nbytes == 3 * 3 * 8 # 3 items per 3 lists with int64 size(8) data.validate() wr = weakref.ref(data) assert wr() is not None del data assert wr() is None def test_chunked_array_construction(): arr = pa.chunked_array([ [1, 2, 3], [4, 5, 6], [7, 8, 9], ]) assert arr.type == pa.int64() assert len(arr) == 9 assert len(arr.chunks) == 3 arr = pa.chunked_array([ [1, 2, 3], [4., 5., 6.], [7, 8, 9], ]) assert arr.type == pa.int64() assert len(arr) == 9 assert len(arr.chunks) == 3 arr = pa.chunked_array([ [1, 2, 3], [4., 5., 6.], [7, 8, 9], ], type=pa.int8()) assert arr.type == pa.int8() assert len(arr) == 9 assert len(arr.chunks) == 3 arr = pa.chunked_array([ [1, 2, 3], [] ]) assert arr.type == pa.int64() assert len(arr) == 3 assert len(arr.chunks) == 2 msg = "cannot construct ChunkedArray from empty vector and omitted type" with pytest.raises(ValueError, match=msg): assert pa.chunked_array([]) assert pa.chunked_array([], type=pa.string()).type == pa.string() assert pa.chunked_array([[]]).type == pa.null() assert pa.chunked_array([[]], type=pa.string()).type == pa.string() def test_combine_chunks(): # ARROW-77363 arr = pa.array([1, 2]) chunked_arr = pa.chunked_array([arr, arr]) res = chunked_arr.combine_chunks() expected = pa.array([1, 2, 1, 2]) assert res.equals(expected) def test_chunked_array_can_combine_chunks_with_no_chunks(): # https://issues.apache.org/jira/browse/ARROW-17256 assert pa.chunked_array([], type=pa.bool_()).combine_chunks() == pa.array( [], type=pa.bool_() ) assert pa.chunked_array( [pa.array([], type=pa.bool_())], type=pa.bool_() ).combine_chunks() == pa.array([], type=pa.bool_()) @pytest.mark.numpy def test_chunked_array_to_numpy(): data = pa.chunked_array([ [1, 2, 3], [4, 5, 6], [] ]) arr1 = np.asarray(data) arr2 = data.to_numpy() assert isinstance(arr2, np.ndarray) assert arr2.shape == (6,) assert np.array_equal(arr1, arr2) def test_chunked_array_mismatch_types(): msg = "chunks must all be same type" with pytest.raises(TypeError, match=msg): # Given array types are different pa.chunked_array([ pa.array([1, 2, 3]), pa.array([1., 2., 3.]) ]) with pytest.raises(TypeError, match=msg): # Given array type is different from explicit type argument pa.chunked_array([pa.array([1, 2, 3])], type=pa.float64()) def test_chunked_array_str(): data = [ pa.array([1, 2, 3]), pa.array([4, 5, 6]) ] data = pa.chunked_array(data) assert str(data) == """[ [ 1, 2, 3 ], [ 4, 5, 6 ] ]""" @pytest.mark.numpy def test_chunked_array_getitem(): data = [ pa.array([1, 2, 3]), pa.array([4, 5, 6]) ] data = pa.chunked_array(data) assert data[1].as_py() == 2 assert data[-1].as_py() == 6 assert data[-6].as_py() == 1 with pytest.raises(IndexError): data[6] with pytest.raises(IndexError): data[-7] # Ensure this works with numpy scalars assert data[np.int32(1)].as_py() == 2 data_slice = data[2:4] assert data_slice.to_pylist() == [3, 4] data_slice = data[4:-1] assert data_slice.to_pylist() == [5] data_slice = data[99:99] assert data_slice.type == data.type assert data_slice.to_pylist() == [] def test_chunked_array_slice(): data = [ pa.array([1, 2, 3]), pa.array([4, 5, 6]) ] data = pa.chunked_array(data) data_slice = data.slice(len(data)) assert data_slice.type == data.type assert data_slice.to_pylist() == [] data_slice = data.slice(len(data) + 10) assert data_slice.type == data.type assert data_slice.to_pylist() == [] table = pa.Table.from_arrays([data], names=["a"]) table_slice = table.slice(len(table)) assert len(table_slice) == 0 table = pa.Table.from_arrays([data], names=["a"]) table_slice = table.slice(len(table) + 10) assert len(table_slice) == 0 def test_chunked_array_iter(): data = [ pa.array([0]), pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([7, 8, 9]) ] arr = pa.chunked_array(data) for i, j in zip(range(10), arr): assert i == j.as_py() assert isinstance(arr, Iterable) def test_chunked_array_equals(): def eq(xarrs, yarrs): if isinstance(xarrs, pa.ChunkedArray): x = xarrs else: x = pa.chunked_array(xarrs) if isinstance(yarrs, pa.ChunkedArray): y = yarrs else: y = pa.chunked_array(yarrs) assert x.equals(y) assert y.equals(x) assert x == y assert x != str(y) def ne(xarrs, yarrs): if isinstance(xarrs, pa.ChunkedArray): x = xarrs else: x = pa.chunked_array(xarrs) if isinstance(yarrs, pa.ChunkedArray): y = yarrs else: y = pa.chunked_array(yarrs) assert not x.equals(y) assert not y.equals(x) assert x != y eq(pa.chunked_array([], type=pa.int32()), pa.chunked_array([], type=pa.int32())) ne(pa.chunked_array([], type=pa.int32()), pa.chunked_array([], type=pa.int64())) a = pa.array([0, 2], type=pa.int32()) b = pa.array([0, 2], type=pa.int64()) c = pa.array([0, 3], type=pa.int32()) d = pa.array([0, 2, 0, 3], type=pa.int32()) eq([a], [a]) ne([a], [b]) eq([a, c], [a, c]) eq([a, c], [d]) ne([c, a], [a, c]) # ARROW-4822 assert not pa.chunked_array([], type=pa.int32()).equals(None) @pytest.mark.parametrize( ('data', 'typ'), [ ([True, False, True, True], pa.bool_()), ([1, 2, 4, 6], pa.int64()), ([1.0, 2.5, None], pa.float64()), (['a', None, 'b'], pa.string()), ([], pa.list_(pa.uint8())), ([[1, 2], [3]], pa.list_(pa.int64())), ([['a'], None, ['b', 'c']], pa.list_(pa.string())), ([(1, 'a'), (2, 'c'), None], pa.struct([pa.field('a', pa.int64()), pa.field('b', pa.string())])) ] ) def test_chunked_array_pickle(data, typ, pickle_module): arrays = [] while data: arrays.append(pa.array(data[:2], type=typ)) data = data[2:] array = pa.chunked_array(arrays, type=typ) array.validate() result = pickle_module.loads(pickle_module.dumps(array)) result.validate() assert result.equals(array) @pytest.mark.pandas def test_chunked_array_to_pandas(): import pandas as pd data = [ pa.array([-10, -5, 0, 5, 10]) ] table = pa.table(data, names=['a']) col = table.column(0) assert isinstance(col, pa.ChunkedArray) series = col.to_pandas() assert isinstance(series, pd.Series) assert series.shape == (5,) assert series[0] == -10 assert series.name == 'a' @pytest.mark.pandas def test_chunked_array_to_pandas_preserve_name(): # https://issues.apache.org/jira/browse/ARROW-7709 import pandas as pd import pandas.testing as tm for data in [ pa.array([1, 2, 3]), pa.array(pd.Categorical(["a", "b", "a"])), pa.array(pd.date_range("2012", periods=3)), pa.array(pd.date_range("2012", periods=3, tz="Europe/Brussels")), pa.array([1, 2, 3], pa.timestamp("ms")), pa.array([1, 2, 3], pa.timestamp("ms", "Europe/Brussels"))]: table = pa.table({"name": data}) result = table.column("name").to_pandas() assert result.name == "name" expected = pd.Series(data.to_pandas(), name="name") tm.assert_series_equal(result, expected) @pytest.mark.pandas def test_table_roundtrip_to_pandas_empty_dataframe(): # https://issues.apache.org/jira/browse/ARROW-10643 # The conversion should not results in a table with 0 rows if the original # DataFrame has a RangeIndex but is empty. import pandas as pd data = pd.DataFrame(index=pd.RangeIndex(0, 10, 1)) table = pa.table(data) result = table.to_pandas() assert table.num_rows == 10 assert data.shape == (10, 0) assert result.shape == (10, 0) assert result.index.equals(data.index) data = pd.DataFrame(index=pd.RangeIndex(0, 10, 3)) table = pa.table(data) result = table.to_pandas() assert table.num_rows == 4 assert data.shape == (4, 0) assert result.shape == (4, 0) assert result.index.equals(data.index) @pytest.mark.pandas def test_recordbatch_roundtrip_to_pandas_empty_dataframe(): # https://issues.apache.org/jira/browse/ARROW-10643 # The conversion should not results in a RecordBatch with 0 rows if # the original DataFrame has a RangeIndex but is empty. import pandas as pd data = pd.DataFrame(index=pd.RangeIndex(0, 10, 1)) batch = pa.RecordBatch.from_pandas(data) result = batch.to_pandas() assert batch.num_rows == 10 assert data.shape == (10, 0) assert result.shape == (10, 0) assert result.index.equals(data.index) data = pd.DataFrame(index=pd.RangeIndex(0, 10, 3)) batch = pa.RecordBatch.from_pandas(data) result = batch.to_pandas() assert batch.num_rows == 4 assert data.shape == (4, 0) assert result.shape == (4, 0) assert result.index.equals(data.index) @pytest.mark.pandas def test_to_pandas_empty_table(): # https://issues.apache.org/jira/browse/ARROW-15370 import pandas as pd import pandas.testing as tm df = pd.DataFrame({'a': [1, 2], 'b': [0.1, 0.2]}) table = pa.table(df) result = table.schema.empty_table().to_pandas() assert result.shape == (0, 2) tm.assert_frame_equal(result, df.iloc[:0]) @pytest.mark.pandas @pytest.mark.nopandas def test_chunked_array_asarray(): # ensure this is tested both when pandas is present or not (ARROW-6564) data = [ pa.array([0]), pa.array([1, 2, 3]) ] chunked_arr = pa.chunked_array(data) np_arr = np.asarray(chunked_arr) assert np_arr.tolist() == [0, 1, 2, 3] assert np_arr.dtype == np.dtype('int64') # An optional type can be specified when calling np.asarray np_arr = np.asarray(chunked_arr, dtype='str') assert np_arr.tolist() == ['0', '1', '2', '3'] # Types are modified when there are nulls data = [ pa.array([1, None]), pa.array([1, 2, 3]) ] chunked_arr = pa.chunked_array(data) np_arr = np.asarray(chunked_arr) elements = np_arr.tolist() assert elements[0] == 1. assert np.isnan(elements[1]) assert elements[2:] == [1., 2., 3.] assert np_arr.dtype == np.dtype('float64') # DictionaryType data will be converted to dense numpy array arr = pa.DictionaryArray.from_arrays( pa.array([0, 1, 2, 0, 1]), pa.array(['a', 'b', 'c'])) chunked_arr = pa.chunked_array([arr, arr]) np_arr = np.asarray(chunked_arr) assert np_arr.dtype == np.dtype('object') assert np_arr.tolist() == ['a', 'b', 'c', 'a', 'b'] * 2 def test_chunked_array_flatten(): ty = pa.struct([pa.field('x', pa.int16()), pa.field('y', pa.float32())]) a = pa.array([(1, 2.5), (3, 4.5), (5, 6.5)], type=ty) carr = pa.chunked_array(a) x, y = carr.flatten() assert x.equals(pa.chunked_array(pa.array([1, 3, 5], type=pa.int16()))) assert y.equals(pa.chunked_array(pa.array([2.5, 4.5, 6.5], type=pa.float32()))) # Empty column a = pa.array([], type=ty) carr = pa.chunked_array(a) x, y = carr.flatten() assert x.equals(pa.chunked_array(pa.array([], type=pa.int16()))) assert y.equals(pa.chunked_array(pa.array([], type=pa.float32()))) def test_chunked_array_unify_dictionaries(): arr = pa.chunked_array([ pa.array(["foo", "bar", None, "foo"]).dictionary_encode(), pa.array(["quux", None, "foo"]).dictionary_encode(), ]) assert arr.chunk(0).dictionary.equals(pa.array(["foo", "bar"])) assert arr.chunk(1).dictionary.equals(pa.array(["quux", "foo"])) arr = arr.unify_dictionaries() expected_dict = pa.array(["foo", "bar", "quux"]) assert arr.chunk(0).dictionary.equals(expected_dict) assert arr.chunk(1).dictionary.equals(expected_dict) assert arr.to_pylist() == ["foo", "bar", None, "foo", "quux", None, "foo"] def test_recordbatch_dunder_init(): with pytest.raises(TypeError, match='RecordBatch'): pa.RecordBatch() def test_chunked_array_c_array_interface(): class ArrayWrapper: def __init__(self, array): self.array = array def __arrow_c_array__(self, requested_schema=None): return self.array.__arrow_c_array__(requested_schema) data = pa.array([1, 2, 3], pa.int64()) chunked = pa.chunked_array([data]) wrapper = ArrayWrapper(data) # Can roundtrip through the wrapper. result = pa.chunked_array(wrapper) assert result == chunked # Can also import with a type that implementer can cast to. result = pa.chunked_array(wrapper, type=pa.int16()) assert result == chunked.cast(pa.int16()) def test_chunked_array_c_stream_interface(): class ChunkedArrayWrapper: def __init__(self, chunked): self.chunked = chunked def __arrow_c_stream__(self, requested_schema=None): return self.chunked.__arrow_c_stream__(requested_schema) data = pa.chunked_array([[1, 2, 3], [4, None, 6]]) wrapper = ChunkedArrayWrapper(data) # Can roundtrip through the wrapper. result = pa.chunked_array(wrapper) assert result == data # Can also import with a type that implementer can cast to. result = pa.chunked_array(wrapper, type=pa.int16()) assert result == data.cast(pa.int16()) class BatchWrapper: def __init__(self, batch): self.batch = batch def __arrow_c_array__(self, requested_schema=None): return self.batch.__arrow_c_array__(requested_schema) class BatchDeviceWrapper: def __init__(self, batch): self.batch = batch def __arrow_c_device_array__(self, requested_schema=None, **kwargs): return self.batch.__arrow_c_device_array__(requested_schema, **kwargs) @pytest.mark.parametrize("wrapper_class", [BatchWrapper, BatchDeviceWrapper]) def test_recordbatch_c_array_interface(wrapper_class): data = pa.record_batch([ pa.array([1, 2, 3], type=pa.int64()) ], names=['a']) wrapper = wrapper_class(data) # Can roundtrip through the wrapper. result = pa.record_batch(wrapper) assert result == data # Can also import with a schema that implementer can cast to. castable_schema = pa.schema([ pa.field('a', pa.int32()) ]) result = pa.record_batch(wrapper, schema=castable_schema) expected = pa.record_batch([ pa.array([1, 2, 3], type=pa.int32()) ], names=['a']) assert result == expected def test_recordbatch_c_array_interface_device_unsupported_keyword(): # For the device-aware version, we raise a specific error for unsupported keywords data = pa.record_batch( [pa.array([1, 2, 3], type=pa.int64())], names=['a'] ) with pytest.raises( NotImplementedError, match=r"Received unsupported keyword argument\(s\): \['other'\]" ): data.__arrow_c_device_array__(other="not-none") # but with None value it is ignored _ = data.__arrow_c_device_array__(other=None) @pytest.mark.parametrize("wrapper_class", [BatchWrapper, BatchDeviceWrapper]) def test_table_c_array_interface(wrapper_class): data = pa.record_batch([ pa.array([1, 2, 3], type=pa.int64()) ], names=['a']) wrapper = wrapper_class(data) # Can roundtrip through the wrapper. result = pa.table(wrapper) expected = pa.Table.from_batches([data]) assert result == expected # Can also import with a schema that implementer can cast to. castable_schema = pa.schema([ pa.field('a', pa.int32()) ]) result = pa.table(wrapper, schema=castable_schema) expected = pa.table({ 'a': pa.array([1, 2, 3], type=pa.int32()) }) assert result == expected def test_table_c_stream_interface(): class StreamWrapper: def __init__(self, batches): self.batches = batches def __arrow_c_stream__(self, requested_schema=None): reader = pa.RecordBatchReader.from_batches( self.batches[0].schema, self.batches) return reader.__arrow_c_stream__(requested_schema) data = [ pa.record_batch([pa.array([1, 2, 3], type=pa.int64())], names=['a']), pa.record_batch([pa.array([4, 5, 6], type=pa.int64())], names=['a']) ] wrapper = StreamWrapper(data) # Can roundtrip through the wrapper. result = pa.table(wrapper) expected = pa.Table.from_batches(data) assert result == expected # Passing schema works if already that schema result = pa.table(wrapper, schema=data[0].schema) assert result == expected # Passing a different schema will cast good_schema = pa.schema([pa.field('a', pa.int32())]) result = pa.table(wrapper, schema=good_schema) assert result == expected.cast(good_schema) # If schema doesn't match, raises NotImplementedError with pytest.raises( pa.lib.ArrowTypeError, match="Field 0 cannot be cast" ): pa.table( wrapper, schema=pa.schema([pa.field('a', pa.list_(pa.int32()))]) ) def test_recordbatch_itercolumns(): data = [ pa.array(range(5), type='int16'), pa.array([-10, -5, 0, None, 10], type='int32') ] batch = pa.record_batch(data, ['c0', 'c1']) columns = [] for col in batch.itercolumns(): columns.append(col) assert batch.columns == columns assert batch == pa.record_batch(columns, names=batch.column_names) assert batch != pa.record_batch(columns[1:], names=batch.column_names[1:]) assert batch != columns def test_recordbatch_equals(): data1 = [ pa.array(range(5), type='int16'), pa.array([-10, -5, 0, None, 10], type='int32') ] data2 = [ pa.array(['a', 'b', 'c']), pa.array([['d'], ['e'], ['f']]), ] column_names = ['c0', 'c1'] batch = pa.record_batch(data1, column_names) assert batch == pa.record_batch(data1, column_names) assert batch.equals(pa.record_batch(data1, column_names)) assert batch != pa.record_batch(data2, column_names) assert not batch.equals(pa.record_batch(data2, column_names)) batch_meta = pa.record_batch(data1, names=column_names, metadata={'key': 'value'}) assert batch_meta.equals(batch) assert not batch_meta.equals(batch, check_metadata=True) # ARROW-8889 assert not batch.equals(None) assert batch != "foo" def test_recordbatch_take(): batch = pa.record_batch( [pa.array([1, 2, 3, None, 5]), pa.array(['a', 'b', 'c', 'd', 'e'])], ['f1', 'f2']) assert batch.take(pa.array([2, 3])).equals(batch.slice(2, 2)) assert batch.take(pa.array([2, None])).equals( pa.record_batch([pa.array([3, None]), pa.array(['c', None])], ['f1', 'f2'])) def test_recordbatch_column_sets_private_name(): # ARROW-6429 rb = pa.record_batch([pa.array([1, 2, 3, 4])], names=['a0']) assert rb[0]._name == 'a0' def test_recordbatch_from_arrays_validate_schema(): # ARROW-6263 arr = pa.array([1, 2]) schema = pa.schema([pa.field('f0', pa.list_(pa.utf8()))]) with pytest.raises(NotImplementedError): pa.record_batch([arr], schema=schema) def test_recordbatch_from_arrays_validate_lengths(): # ARROW-2820 data = [pa.array([1]), pa.array(["tokyo", "like", "happy"]), pa.array(["derek"])] with pytest.raises(ValueError): pa.record_batch(data, ['id', 'tags', 'name']) def test_recordbatch_no_fields(): batch = pa.record_batch([], []) assert len(batch) == 0 assert batch.num_rows == 0 assert batch.num_columns == 0 def test_recordbatch_from_arrays_invalid_names(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]) ] with pytest.raises(ValueError): pa.record_batch(data, names=['a', 'b', 'c']) with pytest.raises(ValueError): pa.record_batch(data, names=['a']) def test_recordbatch_empty_metadata(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]) ] batch = pa.record_batch(data, ['c0', 'c1']) assert batch.schema.metadata is None def test_recordbatch_pickle(pickle_module): data = [ pa.array(range(5), type='int8'), pa.array([-10, -5, 0, 5, 10], type='float32') ] fields = [ pa.field('ints', pa.int8()), pa.field('floats', pa.float32()), ] schema = pa.schema(fields, metadata={b'foo': b'bar'}) batch = pa.record_batch(data, schema=schema) result = pickle_module.loads(pickle_module.dumps(batch)) assert result.equals(batch) assert result.schema == schema def test_recordbatch_get_field(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] batch = pa.RecordBatch.from_arrays(data, names=('a', 'b', 'c')) assert batch.field('a').equals(batch.schema.field('a')) assert batch.field(0).equals(batch.schema.field('a')) with pytest.raises(KeyError): batch.field('d') with pytest.raises(TypeError): batch.field(None) with pytest.raises(IndexError): batch.field(4) def test_recordbatch_select_column(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] batch = pa.RecordBatch.from_arrays(data, names=('a', 'b', 'c')) assert batch.column('a').equals(batch.column(0)) with pytest.raises( KeyError, match='Field "d" does not exist in schema'): batch.column('d') with pytest.raises(TypeError): batch.column(None) with pytest.raises(IndexError): batch.column(4) def test_recordbatch_select(): a1 = pa.array([1, 2, 3, None, 5]) a2 = pa.array(['a', 'b', 'c', 'd', 'e']) a3 = pa.array([[1, 2], [3, 4], [5, 6], None, [9, 10]]) batch = pa.record_batch([a1, a2, a3], ['f1', 'f2', 'f3']) # selecting with string names result = batch.select(['f1']) expected = pa.record_batch([a1], ['f1']) assert result.equals(expected) result = batch.select(['f3', 'f2']) expected = pa.record_batch([a3, a2], ['f3', 'f2']) assert result.equals(expected) # selecting with integer indices result = batch.select([0]) expected = pa.record_batch([a1], ['f1']) assert result.equals(expected) result = batch.select([2, 1]) expected = pa.record_batch([a3, a2], ['f3', 'f2']) assert result.equals(expected) # preserve metadata batch2 = batch.replace_schema_metadata({"a": "test"}) result = batch2.select(["f1", "f2"]) assert b"a" in result.schema.metadata # selecting non-existing column raises with pytest.raises(KeyError, match='Field "f5" does not exist'): batch.select(['f5']) with pytest.raises(IndexError, match="index out of bounds"): batch.select([5]) # duplicate selection gives duplicated names in resulting recordbatch result = batch.select(['f2', 'f2']) expected = pa.record_batch([a2, a2], ['f2', 'f2']) assert result.equals(expected) # selection duplicated column raises batch = pa.record_batch([a1, a2, a3], ['f1', 'f2', 'f1']) with pytest.raises(KeyError, match='Field "f1" exists 2 times'): batch.select(['f1']) result = batch.select(['f2']) expected = pa.record_batch([a2], ['f2']) assert result.equals(expected) def test_recordbatch_from_struct_array_invalid(): with pytest.raises(TypeError): pa.RecordBatch.from_struct_array(pa.array(range(5))) def test_recordbatch_from_struct_array(): struct_array = pa.array( [{"ints": 1}, {"floats": 1.0}], type=pa.struct([("ints", pa.int32()), ("floats", pa.float32())]), ) result = pa.RecordBatch.from_struct_array(struct_array) assert result.equals(pa.RecordBatch.from_arrays( [ pa.array([1, None], type=pa.int32()), pa.array([None, 1.0], type=pa.float32()), ], ["ints", "floats"] )) def test_recordbatch_to_struct_array(): batch = pa.RecordBatch.from_arrays( [ pa.array([1, None], type=pa.int32()), pa.array([None, 1.0], type=pa.float32()), ], ["ints", "floats"] ) result = batch.to_struct_array() assert result.equals(pa.array( [{"ints": 1}, {"floats": 1.0}], type=pa.struct([("ints", pa.int32()), ("floats", pa.float32())]), )) def test_table_from_struct_array_invalid(): with pytest.raises(TypeError, match="Argument 'struct_array' has incorrect type"): pa.Table.from_struct_array(pa.array(range(5))) def test_table_from_struct_array(): struct_array = pa.array( [{"ints": 1}, {"floats": 1.0}], type=pa.struct([("ints", pa.int32()), ("floats", pa.float32())]), ) result = pa.Table.from_struct_array(struct_array) assert result.equals(pa.Table.from_arrays( [ pa.array([1, None], type=pa.int32()), pa.array([None, 1.0], type=pa.float32()), ], ["ints", "floats"] )) def test_table_from_struct_array_chunked_array(): chunked_struct_array = pa.chunked_array( [[{"ints": 1}, {"floats": 1.0}]], type=pa.struct([("ints", pa.int32()), ("floats", pa.float32())]), ) result = pa.Table.from_struct_array(chunked_struct_array) assert result.equals(pa.Table.from_arrays( [ pa.array([1, None], type=pa.int32()), pa.array([None, 1.0], type=pa.float32()), ], ["ints", "floats"] )) def test_table_to_struct_array(): table = pa.Table.from_arrays( [ pa.array([1, None], type=pa.int32()), pa.array([None, 1.0], type=pa.float32()), ], ["ints", "floats"] ) result = table.to_struct_array() assert result.equals(pa.chunked_array( [[{"ints": 1}, {"floats": 1.0}]], type=pa.struct([("ints", pa.int32()), ("floats", pa.float32())]), )) def test_table_to_struct_array_with_max_chunksize(): table = pa.Table.from_arrays( [ pa.array([1, None], type=pa.int32()), pa.array([None, 1.0], type=pa.float32()), ], ["ints", "floats"] ) result = table.to_struct_array(max_chunksize=1) assert result.equals(pa.chunked_array( [[{"ints": 1}], [{"floats": 1.0}]], type=pa.struct([("ints", pa.int32()), ("floats", pa.float32())]), )) def check_tensors(tensor, expected_tensor, type, size): assert tensor.equals(expected_tensor) assert tensor.size == size assert tensor.type == type assert tensor.shape == expected_tensor.shape assert tensor.strides == expected_tensor.strides @pytest.mark.numpy @pytest.mark.parametrize('typ_str', [ "uint8", "uint16", "uint32", "uint64", "int8", "int16", "int32", "int64", "float32", "float64", ]) def test_recordbatch_to_tensor_uniform_type(typ_str): typ = np.dtype(typ_str) arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90] arr3 = [100, 100, 100, 100, 100, 100, 100, 100, 100] batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.from_numpy_dtype(typ)), pa.array(arr2, type=pa.from_numpy_dtype(typ)), pa.array(arr3, type=pa.from_numpy_dtype(typ)), ], ["a", "b", "c"] ) result = batch.to_tensor(row_major=False) x = np.column_stack([arr1, arr2, arr3]).astype(typ, order="F") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.from_numpy_dtype(typ), 27) result = batch.to_tensor() x = np.column_stack([arr1, arr2, arr3]).astype(typ, order="C") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.from_numpy_dtype(typ), 27) # Test offset batch1 = batch.slice(1) arr1 = [2, 3, 4, 5, 6, 7, 8, 9] arr2 = [20, 30, 40, 50, 60, 70, 80, 90] arr3 = [100, 100, 100, 100, 100, 100, 100, 100] result = batch1.to_tensor(row_major=False) x = np.column_stack([arr1, arr2, arr3]).astype(typ, order="F") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.from_numpy_dtype(typ), 24) result = batch1.to_tensor() x = np.column_stack([arr1, arr2, arr3]).astype(typ, order="C") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.from_numpy_dtype(typ), 24) batch2 = batch.slice(1, 5) arr1 = [2, 3, 4, 5, 6] arr2 = [20, 30, 40, 50, 60] arr3 = [100, 100, 100, 100, 100] result = batch2.to_tensor(row_major=False) x = np.column_stack([arr1, arr2, arr3]).astype(typ, order="F") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.from_numpy_dtype(typ), 15) result = batch2.to_tensor() x = np.column_stack([arr1, arr2, arr3]).astype(typ, order="C") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.from_numpy_dtype(typ), 15) @pytest.mark.numpy def test_recordbatch_to_tensor_uniform_float_16(): arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90] arr3 = [100, 100, 100, 100, 100, 100, 100, 100, 100] batch = pa.RecordBatch.from_arrays( [ pa.array(np.array(arr1, dtype=np.float16), type=pa.float16()), pa.array(np.array(arr2, dtype=np.float16), type=pa.float16()), pa.array(np.array(arr3, dtype=np.float16), type=pa.float16()), ], ["a", "b", "c"] ) result = batch.to_tensor(row_major=False) x = np.column_stack([arr1, arr2, arr3]).astype(np.float16, order="F") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.float16(), 27) result = batch.to_tensor() x = np.column_stack([arr1, arr2, arr3]).astype(np.float16, order="C") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.float16(), 27) @pytest.mark.numpy def test_recordbatch_to_tensor_mixed_type(): # uint16 + int16 = int32 arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90] arr3 = [100, 200, 300, np.nan, 500, 600, 700, 800, 900] batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.uint16()), pa.array(arr2, type=pa.int16()), ], ["a", "b"] ) result = batch.to_tensor(row_major=False) x = np.column_stack([arr1, arr2]).astype(np.int32, order="F") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.int32(), 18) result = batch.to_tensor() x = np.column_stack([arr1, arr2]).astype(np.int32, order="C") expected = pa.Tensor.from_numpy(x) check_tensors(result, expected, pa.int32(), 18) # uint16 + int16 + float32 = float64 batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.uint16()), pa.array(arr2, type=pa.int16()), pa.array(arr3, type=pa.float32()), ], ["a", "b", "c"] ) result = batch.to_tensor(row_major=False) x = np.column_stack([arr1, arr2, arr3]).astype(np.float64, order="F") expected = pa.Tensor.from_numpy(x) np.testing.assert_equal(result.to_numpy(), x) assert result.size == 27 assert result.type == pa.float64() assert result.shape == expected.shape assert result.strides == expected.strides result = batch.to_tensor() x = np.column_stack([arr1, arr2, arr3]).astype(np.float64, order="C") expected = pa.Tensor.from_numpy(x) np.testing.assert_equal(result.to_numpy(), x) assert result.size == 27 assert result.type == pa.float64() assert result.shape == expected.shape assert result.strides == expected.strides @pytest.mark.numpy def test_recordbatch_to_tensor_unsupported_mixed_type_with_float16(): arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] arr2 = [10, 20, 30, 40, 50, 60, 70, 80, 90] arr3 = [100, 200, 300, 400, 500, 600, 700, 800, 900] batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.uint16()), pa.array(np.array(arr2, dtype=np.float16), type=pa.float16()), pa.array(arr3, type=pa.float32()), ], ["a", "b", "c"] ) with pytest.raises( NotImplementedError, match="Casting from or to halffloat is not supported." ): batch.to_tensor() @pytest.mark.numpy def test_recordbatch_to_tensor_nan(): arr1 = [1, 2, 3, 4, np.nan, 6, 7, 8, 9] arr2 = [10, 20, 30, 40, 50, 60, 70, np.nan, 90] batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.float32()), pa.array(arr2, type=pa.float32()), ], ["a", "b"] ) result = batch.to_tensor(row_major=False) x = np.column_stack([arr1, arr2]).astype(np.float32, order="F") expected = pa.Tensor.from_numpy(x) np.testing.assert_equal(result.to_numpy(), x) assert result.size == 18 assert result.type == pa.float32() assert result.shape == expected.shape assert result.strides == expected.strides @pytest.mark.numpy def test_recordbatch_to_tensor_null(): arr1 = [1, 2, 3, 4, None, 6, 7, 8, 9] arr2 = [10, 20, 30, 40, 50, 60, 70, None, 90] batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.int32()), pa.array(arr2, type=pa.float32()), ], ["a", "b"] ) with pytest.raises( pa.ArrowTypeError, match="Can only convert a RecordBatch with no nulls." ): batch.to_tensor() result = batch.to_tensor(null_to_nan=True, row_major=False) x = np.column_stack([arr1, arr2]).astype(np.float64, order="F") expected = pa.Tensor.from_numpy(x) np.testing.assert_equal(result.to_numpy(), x) assert result.size == 18 assert result.type == pa.float64() assert result.shape == expected.shape assert result.strides == expected.strides # int32 -> float64 batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.int32()), pa.array(arr2, type=pa.int32()), ], ["a", "b"] ) result = batch.to_tensor(null_to_nan=True, row_major=False) np.testing.assert_equal(result.to_numpy(), x) assert result.size == 18 assert result.type == pa.float64() assert result.shape == expected.shape assert result.strides == expected.strides # int8 -> float32 batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.int8()), pa.array(arr2, type=pa.int8()), ], ["a", "b"] ) result = batch.to_tensor(null_to_nan=True, row_major=False) x = np.column_stack([arr1, arr2]).astype(np.float32, order="F") expected = pa.Tensor.from_numpy(x) np.testing.assert_equal(result.to_numpy(), x) assert result.size == 18 assert result.type == pa.float32() assert result.shape == expected.shape assert result.strides == expected.strides @pytest.mark.numpy def test_recordbatch_to_tensor_empty(): batch = pa.RecordBatch.from_arrays( [ pa.array([], type=pa.float32()), pa.array([], type=pa.float32()), ], ["a", "b"] ) result = batch.to_tensor() x = np.column_stack([[], []]).astype(np.float32, order="F") expected = pa.Tensor.from_numpy(x) assert result.size == expected.size assert result.type == pa.float32() assert result.shape == expected.shape assert result.strides == (4, 4) def test_recordbatch_to_tensor_unsupported(): arr1 = [1, 2, 3, 4, 5, 6, 7, 8, 9] # Unsupported data type arr2 = ["a", "b", "c", "a", "b", "c", "a", "b", "c"] batch = pa.RecordBatch.from_arrays( [ pa.array(arr1, type=pa.int32()), pa.array(arr2, type=pa.utf8()), ], ["a", "b"] ) with pytest.raises( pa.ArrowTypeError, match="DataType is not supported" ): batch.to_tensor() def _table_like_slice_tests(factory): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]) ] names = ['c0', 'c1'] obj = factory(data, names=names) sliced = obj.slice(2) assert sliced.num_rows == 3 expected = factory([x.slice(2) for x in data], names=names) assert sliced.equals(expected) sliced2 = obj.slice(2, 2) expected2 = factory([x.slice(2, 2) for x in data], names=names) assert sliced2.equals(expected2) # 0 offset assert obj.slice(0).equals(obj) # Slice past end of array assert len(obj.slice(len(obj))) == 0 with pytest.raises(IndexError): obj.slice(-1) # Check __getitem__-based slicing assert obj.slice(0, 0).equals(obj[:0]) assert obj.slice(0, 2).equals(obj[:2]) assert obj.slice(2, 2).equals(obj[2:4]) assert obj.slice(2, len(obj) - 2).equals(obj[2:]) assert obj.slice(len(obj) - 2, 2).equals(obj[-2:]) assert obj.slice(len(obj) - 4, 2).equals(obj[-4:-2]) def test_recordbatch_slice_getitem(): return _table_like_slice_tests(pa.RecordBatch.from_arrays) def test_table_slice_getitem(): return _table_like_slice_tests(pa.table) @pytest.mark.pandas def test_slice_zero_length_table(): # ARROW-7907: a segfault on this code was fixed after 0.16.0 table = pa.table({'a': pa.array([], type=pa.timestamp('us'))}) table_slice = table.slice(0, 0) table_slice.to_pandas() table = pa.table({'a': pa.chunked_array([], type=pa.string())}) table.to_pandas() @pytest.mark.numpy def test_recordbatchlist_schema_equals(): a1 = np.array([1], dtype='uint32') a2 = np.array([4.0, 5.0], dtype='float64') batch1 = pa.record_batch([pa.array(a1)], ['c1']) batch2 = pa.record_batch([pa.array(a2)], ['c1']) with pytest.raises(pa.ArrowInvalid): pa.Table.from_batches([batch1, batch2]) def test_table_column_sets_private_name(): # ARROW-6429 t = pa.table([pa.array([1, 2, 3, 4])], names=['a0']) assert t[0]._name == 'a0' def test_table_equals(): table = pa.Table.from_arrays([], names=[]) assert table.equals(table) # ARROW-4822 assert not table.equals(None) other = pa.Table.from_arrays([], names=[], metadata={'key': 'value'}) assert not table.equals(other, check_metadata=True) assert table.equals(other) def test_table_from_batches_and_schema(): schema = pa.schema([ pa.field('a', pa.int64()), pa.field('b', pa.float64()), ]) batch = pa.record_batch([pa.array([1]), pa.array([3.14])], names=['a', 'b']) table = pa.Table.from_batches([batch], schema) assert table.schema.equals(schema) assert table.column(0) == pa.chunked_array([[1]]) assert table.column(1) == pa.chunked_array([[3.14]]) incompatible_schema = pa.schema([pa.field('a', pa.int64())]) with pytest.raises(pa.ArrowInvalid): pa.Table.from_batches([batch], incompatible_schema) incompatible_batch = pa.record_batch([pa.array([1])], ['a']) with pytest.raises(pa.ArrowInvalid): pa.Table.from_batches([incompatible_batch], schema) @pytest.mark.pandas def test_table_to_batches(): from pandas.testing import assert_frame_equal import pandas as pd df1 = pd.DataFrame({'a': list(range(10))}) df2 = pd.DataFrame({'a': list(range(10, 30))}) batch1 = pa.RecordBatch.from_pandas(df1, preserve_index=False) batch2 = pa.RecordBatch.from_pandas(df2, preserve_index=False) table = pa.Table.from_batches([batch1, batch2, batch1]) expected_df = pd.concat([df1, df2, df1], ignore_index=True) batches = table.to_batches() assert len(batches) == 3 assert_frame_equal(pa.Table.from_batches(batches).to_pandas(), expected_df) batches = table.to_batches(max_chunksize=15) assert list(map(len, batches)) == [10, 15, 5, 10] assert_frame_equal(table.to_pandas(), expected_df) assert_frame_equal(pa.Table.from_batches(batches).to_pandas(), expected_df) table_from_iter = pa.Table.from_batches(iter([batch1, batch2, batch1])) assert table.equals(table_from_iter) with pytest.raises(ValueError): table.to_batches(max_chunksize=0) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_basics(cls): data = [ pa.array(range(5), type='int16'), pa.array([-10, -5, 0, None, 10], type='int32') ] table = cls.from_arrays(data, names=('a', 'b')) table.validate() assert not table.schema.metadata assert len(table) == 5 assert table.num_rows == 5 assert table.num_columns == len(data) assert table.shape == (5, 2) # (only the second array has a null bitmap) assert table.get_total_buffer_size() == (5 * 2) + (5 * 4 + 1) assert table.nbytes == (5 * 2) + (5 * 4 + 1) assert sys.getsizeof(table) >= object.__sizeof__( table) + table.get_total_buffer_size() pydict = table.to_pydict() assert pydict == OrderedDict([ ('a', [0, 1, 2, 3, 4]), ('b', [-10, -5, 0, None, 10]) ]) assert isinstance(pydict, dict) assert table == cls.from_pydict(pydict, schema=table.schema) with pytest.raises(IndexError): # bounds checking table[2] columns = [] for col in table.itercolumns(): if cls is pa.Table: assert type(col) is pa.ChunkedArray for chunk in col.iterchunks(): assert chunk is not None with pytest.raises(IndexError): col.chunk(-1) with pytest.raises(IndexError): col.chunk(col.num_chunks) else: assert issubclass(type(col), pa.Array) columns.append(col) assert table.columns == columns assert table == cls.from_arrays(columns, names=table.column_names) assert table != cls.from_arrays(columns[1:], names=table.column_names[1:]) assert table != columns # Schema passed explicitly schema = pa.schema([pa.field('c0', pa.int16(), metadata={'key': 'value'}), pa.field('c1', pa.int32())], metadata={b'foo': b'bar'}) table = cls.from_arrays(data, schema=schema) assert table.schema == schema wr = weakref.ref(table) assert wr() is not None del table assert wr() is None def test_table_dunder_init(): with pytest.raises(TypeError, match='Table'): pa.Table() def test_table_from_arrays_preserves_column_metadata(): # Added to test https://issues.apache.org/jira/browse/ARROW-3866 arr0 = pa.array([1, 2]) arr1 = pa.array([3, 4]) field0 = pa.field('field1', pa.int64(), metadata=dict(a="A", b="B")) field1 = pa.field('field2', pa.int64(), nullable=False) table = pa.Table.from_arrays([arr0, arr1], schema=pa.schema([field0, field1])) assert b"a" in table.field(0).metadata assert table.field(1).nullable is False def test_table_from_arrays_invalid_names(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]) ] with pytest.raises(ValueError): pa.Table.from_arrays(data, names=['a', 'b', 'c']) with pytest.raises(ValueError): pa.Table.from_arrays(data, names=['a']) def test_table_from_lists(): data = [ list(range(5)), [-10, -5, 0, 5, 10] ] result = pa.table(data, names=['a', 'b']) expected = pa.Table.from_arrays(data, names=['a', 'b']) assert result.equals(expected) schema = pa.schema([ pa.field('a', pa.uint16()), pa.field('b', pa.int64()) ]) result = pa.table(data, schema=schema) expected = pa.Table.from_arrays(data, schema=schema) assert result.equals(expected) def test_table_pickle(pickle_module): data = [ pa.chunked_array([[1, 2], [3, 4]], type=pa.uint32()), pa.chunked_array([["some", "strings", None, ""]], type=pa.string()), ] schema = pa.schema([pa.field('ints', pa.uint32()), pa.field('strs', pa.string())], metadata={b'foo': b'bar'}) table = pa.Table.from_arrays(data, schema=schema) result = pickle_module.loads(pickle_module.dumps(table)) result.validate() assert result.equals(table) def test_table_get_field(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = pa.Table.from_arrays(data, names=('a', 'b', 'c')) assert table.field('a').equals(table.schema.field('a')) assert table.field(0).equals(table.schema.field('a')) with pytest.raises(KeyError): table.field('d') with pytest.raises(TypeError): table.field(None) with pytest.raises(IndexError): table.field(4) def test_table_select_column(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = pa.Table.from_arrays(data, names=('a', 'b', 'c')) assert table.column('a').equals(table.column(0)) with pytest.raises(KeyError, match='Field "d" does not exist in schema'): table.column('d') with pytest.raises(TypeError): table.column(None) with pytest.raises(IndexError): table.column(4) def test_table_column_with_duplicates(): # ARROW-8209 table = pa.table([pa.array([1, 2, 3]), pa.array([4, 5, 6]), pa.array([7, 8, 9])], names=['a', 'b', 'a']) with pytest.raises(KeyError, match='Field "a" exists 2 times in schema'): table.column('a') @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_add_column(cls): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = cls.from_arrays(data, names=('a', 'b', 'c')) new_field = pa.field('d', data[1].type) t2 = table.add_column(3, new_field, data[1]) t3 = table.append_column(new_field, data[1]) expected = cls.from_arrays(data + [data[1]], names=('a', 'b', 'c', 'd')) assert t2.equals(expected) assert t3.equals(expected) t4 = table.add_column(0, new_field, data[1]) expected = cls.from_arrays([data[1]] + data, names=('d', 'a', 'b', 'c')) assert t4.equals(expected) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_set_column(cls): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = cls.from_arrays(data, names=('a', 'b', 'c')) new_field = pa.field('d', data[1].type) t2 = table.set_column(0, new_field, data[1]) expected_data = list(data) expected_data[0] = data[1] expected = cls.from_arrays(expected_data, names=('d', 'b', 'c')) assert t2.equals(expected) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_drop_columns(cls): """ drop one or more columns given labels""" a = pa.array(range(5)) b = pa.array([-10, -5, 0, 5, 10]) c = pa.array(range(5, 10)) table = cls.from_arrays([a, b, c], names=('a', 'b', 'c')) t2 = table.drop_columns(['a', 'b']) t3 = table.drop_columns('a') exp_t2 = cls.from_arrays([c], names=('c',)) assert exp_t2.equals(t2) exp_t3 = cls.from_arrays([b, c], names=('b', 'c',)) assert exp_t3.equals(t3) # -- raise KeyError if column not in Table with pytest.raises(KeyError, match="Column 'd' not found"): table.drop_columns(['d']) def test_table_drop(): """ verify the alias of drop_columns is working""" a = pa.array(range(5)) b = pa.array([-10, -5, 0, 5, 10]) c = pa.array(range(5, 10)) table = pa.Table.from_arrays([a, b, c], names=('a', 'b', 'c')) t2 = table.drop(['a', 'b']) t3 = table.drop('a') exp_t2 = pa.Table.from_arrays([c], names=('c',)) assert exp_t2.equals(t2) exp_t3 = pa.Table.from_arrays([b, c], names=('b', 'c',)) assert exp_t3.equals(t3) # -- raise KeyError if column not in Table with pytest.raises(KeyError, match="Column 'd' not found"): table.drop(['d']) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_remove_column(cls): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = cls.from_arrays(data, names=('a', 'b', 'c')) t2 = table.remove_column(0) t2.validate() expected = cls.from_arrays(data[1:], names=('b', 'c')) assert t2.equals(expected) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_remove_column_empty(cls): # ARROW-1865 data = [ pa.array(range(5)), ] table = cls.from_arrays(data, names=['a']) t2 = table.remove_column(0) t2.validate() assert len(t2) == len(table) t3 = t2.add_column(0, table.field(0), table[0]) t3.validate() assert t3.equals(table) def test_empty_table_with_names(): # ARROW-13784 data = [] names = ["a", "b"] message = ( 'Length of names [(]2[)] does not match length of arrays [(]0[)]') with pytest.raises(ValueError, match=message): pa.Table.from_arrays(data, names=names) def test_empty_table(): table = pa.table([]) assert table.column_names == [] assert table.equals(pa.Table.from_arrays([], [])) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_rename_columns(cls): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = cls.from_arrays(data, names=['a', 'b', 'c']) assert table.column_names == ['a', 'b', 'c'] expected = cls.from_arrays(data, names=['eh', 'bee', 'sea']) # Testing with list t2 = table.rename_columns(['eh', 'bee', 'sea']) t2.validate() assert t2.column_names == ['eh', 'bee', 'sea'] assert t2.equals(expected) # Testing with tuple t3 = table.rename_columns(('eh', 'bee', 'sea')) t3.validate() assert t3.column_names == ['eh', 'bee', 'sea'] assert t3.equals(expected) message = "names must be a list or dict not " with pytest.raises(TypeError, match=message): table.rename_columns('not a list') @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_rename_columns_mapping(cls): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array(range(5, 10)) ] table = cls.from_arrays(data, names=['a', 'b', 'c']) assert table.column_names == ['a', 'b', 'c'] expected = cls.from_arrays(data, names=['eh', 'b', 'sea']) t1 = table.rename_columns({'a': 'eh', 'c': 'sea'}) t1.validate() assert t1 == expected # Test renaming duplicate column names table = cls.from_arrays(data, names=['a', 'a', 'c']) expected = cls.from_arrays(data, names=['eh', 'eh', 'sea']) t2 = table.rename_columns({'a': 'eh', 'c': 'sea'}) t2.validate() assert t2 == expected # Test column not found with pytest.raises(KeyError, match=r"Column 'd' not found"): table.rename_columns({'a': 'eh', 'd': 'sea'}) def test_table_flatten(): ty1 = pa.struct([pa.field('x', pa.int16()), pa.field('y', pa.float32())]) ty2 = pa.struct([pa.field('nest', ty1)]) a = pa.array([(1, 2.5), (3, 4.5)], type=ty1) b = pa.array([((11, 12.5),), ((13, 14.5),)], type=ty2) c = pa.array([False, True], type=pa.bool_()) table = pa.Table.from_arrays([a, b, c], names=['a', 'b', 'c']) t2 = table.flatten() t2.validate() expected = pa.Table.from_arrays([ pa.array([1, 3], type=pa.int16()), pa.array([2.5, 4.5], type=pa.float32()), pa.array([(11, 12.5), (13, 14.5)], type=ty1), c], names=['a.x', 'a.y', 'b.nest', 'c']) assert t2.equals(expected) def test_table_combine_chunks(): batch1 = pa.record_batch([pa.array([1]), pa.array(["a"])], names=['f1', 'f2']) batch2 = pa.record_batch([pa.array([2]), pa.array(["b"])], names=['f1', 'f2']) table = pa.Table.from_batches([batch1, batch2]) combined = table.combine_chunks() combined.validate() assert combined.equals(table) for c in combined.columns: assert c.num_chunks == 1 def test_table_unify_dictionaries(): batch1 = pa.record_batch([ pa.array(["foo", "bar", None, "foo"]).dictionary_encode(), pa.array([123, 456, 456, 789]).dictionary_encode(), pa.array([True, False, None, None])], names=['a', 'b', 'c']) batch2 = pa.record_batch([ pa.array(["quux", "foo", None, "quux"]).dictionary_encode(), pa.array([456, 789, 789, None]).dictionary_encode(), pa.array([False, None, None, True])], names=['a', 'b', 'c']) table = pa.Table.from_batches([batch1, batch2]) table = table.replace_schema_metadata({b"key1": b"value1"}) assert table.column(0).chunk(0).dictionary.equals( pa.array(["foo", "bar"])) assert table.column(0).chunk(1).dictionary.equals( pa.array(["quux", "foo"])) assert table.column(1).chunk(0).dictionary.equals( pa.array([123, 456, 789])) assert table.column(1).chunk(1).dictionary.equals( pa.array([456, 789])) table = table.unify_dictionaries(pa.default_memory_pool()) expected_dict_0 = pa.array(["foo", "bar", "quux"]) expected_dict_1 = pa.array([123, 456, 789]) assert table.column(0).chunk(0).dictionary.equals(expected_dict_0) assert table.column(0).chunk(1).dictionary.equals(expected_dict_0) assert table.column(1).chunk(0).dictionary.equals(expected_dict_1) assert table.column(1).chunk(1).dictionary.equals(expected_dict_1) assert table.to_pydict() == { 'a': ["foo", "bar", None, "foo", "quux", "foo", None, "quux"], 'b': [123, 456, 456, 789, 456, 789, 789, None], 'c': [True, False, None, None, False, None, None, True], } assert table.schema.metadata == {b"key1": b"value1"} def test_table_maps_as_pydicts(): arrays = [ pa.array( [{'x': 1, 'y': 2}, {'z': 3}], type=pa.map_(pa.string(), pa.int32()) ) ] table = pa.Table.from_arrays(arrays, names=['a']) table_dict = table.to_pydict(maps_as_pydicts="strict") assert 'a' in table_dict column_list = table_dict['a'] assert len(column_list) == 2 assert column_list == [{'x': 1, 'y': 2}, {'z': 3}] table_list = table.to_pylist(maps_as_pydicts="strict") assert len(table_list) == 2 assert table_list == [{'a': {'x': 1, 'y': 2}}, {'a': {'z': 3}}] def test_concat_tables(): data = [ list(range(5)), [-10., -5., 0., 5., 10.] ] data2 = [ list(range(5, 10)), [1., 2., 3., 4., 5.] ] t1 = pa.Table.from_arrays([pa.array(x) for x in data], names=('a', 'b')) t2 = pa.Table.from_arrays([pa.array(x) for x in data2], names=('a', 'b')) result = pa.concat_tables([t1, t2]) result.validate() assert len(result) == 10 expected = pa.Table.from_arrays([pa.array(x + y) for x, y in zip(data, data2)], names=('a', 'b')) assert result.equals(expected) def test_concat_tables_permissive(): t1 = pa.Table.from_arrays([list(range(10))], names=('a',)) t2 = pa.Table.from_arrays([list(('a', 'b', 'c'))], names=('a',)) with pytest.raises( pa.ArrowTypeError, match="Unable to merge: Field a has incompatible types: int64 vs string"): _ = pa.concat_tables([t1, t2], promote_options="permissive") def test_concat_tables_invalid_option(): t = pa.Table.from_arrays([list(range(10))], names=('a',)) with pytest.raises(ValueError, match="Invalid promote options: invalid"): pa.concat_tables([t, t], promote_options="invalid") def test_concat_tables_none_table(): # ARROW-11997 with pytest.raises(AttributeError): pa.concat_tables([None]) @pytest.mark.pandas def test_concat_tables_with_different_schema_metadata(): import pandas as pd schema = pa.schema([ pa.field('a', pa.string()), pa.field('b', pa.string()), ]) values = list('abcdefgh') df1 = pd.DataFrame({'a': values, 'b': values}) df2 = pd.DataFrame({'a': [np.nan] * 8, 'b': values}) table1 = pa.Table.from_pandas(df1, schema=schema, preserve_index=False) table2 = pa.Table.from_pandas(df2, schema=schema, preserve_index=False) assert table1.schema.equals(table2.schema) assert not table1.schema.equals(table2.schema, check_metadata=True) table3 = pa.concat_tables([table1, table2]) assert table1.schema.equals(table3.schema, check_metadata=True) assert table2.schema.equals(table3.schema) def test_concat_tables_with_promote_option(): t1 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.int64())], ["int64_field"]) t2 = pa.Table.from_arrays( [pa.array([1.0, 2.0], type=pa.float32())], ["float_field"]) with pytest.warns(FutureWarning): result = pa.concat_tables([t1, t2], promote=True) assert result.equals(pa.Table.from_arrays([ pa.array([1, 2, None, None], type=pa.int64()), pa.array([None, None, 1.0, 2.0], type=pa.float32()), ], ["int64_field", "float_field"])) t1 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.int64())], ["f"]) t2 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.float32())], ["f"]) with pytest.raises(pa.ArrowInvalid, match="Schema at index 1 was different:"): with pytest.warns(FutureWarning): pa.concat_tables([t1, t2], promote=False) def test_concat_tables_with_promotion(): t1 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.int64())], ["int64_field"]) t2 = pa.Table.from_arrays( [pa.array([1.0, 2.0], type=pa.float32())], ["float_field"]) result = pa.concat_tables([t1, t2], promote_options="default") assert result.equals(pa.Table.from_arrays([ pa.array([1, 2, None, None], type=pa.int64()), pa.array([None, None, 1.0, 2.0], type=pa.float32()), ], ["int64_field", "float_field"])) t3 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.int32())], ["int64_field"]) result = pa.concat_tables( [t1, t3], promote_options="permissive") assert result.equals(pa.Table.from_arrays([ pa.array([1, 2, 1, 2], type=pa.int64()), ], ["int64_field"])) def test_concat_tables_with_promotion_error(): t1 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.int64())], ["f"]) t2 = pa.Table.from_arrays( [pa.array([1, 2], type=pa.float32())], ["f"]) with pytest.raises(pa.ArrowTypeError, match="Unable to merge:"): pa.concat_tables([t1, t2], promote_options="default") def test_table_negative_indexing(): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), pa.array([1.0, 2.0, 3.0, 4.0, 5.0]), pa.array(['ab', 'bc', 'cd', 'de', 'ef']), ] table = pa.Table.from_arrays(data, names=tuple('abcd')) assert table[-1].equals(table[3]) assert table[-2].equals(table[2]) assert table[-3].equals(table[1]) assert table[-4].equals(table[0]) with pytest.raises(IndexError): table[-5] with pytest.raises(IndexError): table[4] def test_concat_batches(): data = [ list(range(5)), [-10., -5., 0., 5., 10.] ] data2 = [ list(range(5, 10)), [1., 2., 3., 4., 5.] ] t1 = pa.RecordBatch.from_arrays([pa.array(x) for x in data], names=('a', 'b')) t2 = pa.RecordBatch.from_arrays([pa.array(x) for x in data2], names=('a', 'b')) result = pa.concat_batches([t1, t2]) result.validate() assert len(result) == 10 expected = pa.RecordBatch.from_arrays([pa.array(x + y) for x, y in zip(data, data2)], names=('a', 'b')) assert result.equals(expected) def test_concat_batches_different_schema(): t1 = pa.RecordBatch.from_arrays( [pa.array([1, 2], type=pa.int64())], ["f"]) t2 = pa.RecordBatch.from_arrays( [pa.array([1, 2], type=pa.float32())], ["f"]) with pytest.raises(pa.ArrowInvalid, match="not match index 0 recordbatch schema"): pa.concat_batches([t1, t2]) def test_concat_batches_none_batches(): # ARROW-11997 with pytest.raises(AttributeError): pa.concat_batches([None]) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_cast_to_incompatible_schema(cls): data = [ pa.array(range(5)), pa.array([-10, -5, 0, 5, 10]), ] table = cls.from_arrays(data, names=tuple('ab')) target_schema1 = pa.schema([ pa.field('A', pa.int32()), pa.field('b', pa.int16()), ]) target_schema2 = pa.schema([ pa.field('a', pa.int32()), ]) if cls is pa.Table: cls_name = 'table' else: cls_name = 'record batch' message = ("Target schema's field names are not matching the " f"{cls_name}'s field names:.*") with pytest.raises(ValueError, match=message): table.cast(target_schema1) with pytest.raises(ValueError, match=message): table.cast(target_schema2) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_safe_casting(cls): data = [ pa.array(range(5), type=pa.int64()), pa.array([-10, -5, 0, 5, 10], type=pa.int32()), pa.array([1.0, 2.0, 3.0, 4.0, 5.0], type=pa.float64()), pa.array(['ab', 'bc', 'cd', 'de', 'ef'], type=pa.string()) ] table = cls.from_arrays(data, names=tuple('abcd')) expected_data = [ pa.array(range(5), type=pa.int32()), pa.array([-10, -5, 0, 5, 10], type=pa.int16()), pa.array([1, 2, 3, 4, 5], type=pa.int64()), pa.array(['ab', 'bc', 'cd', 'de', 'ef'], type=pa.string()) ] expected_table = cls.from_arrays(expected_data, names=tuple('abcd')) target_schema = pa.schema([ pa.field('a', pa.int32()), pa.field('b', pa.int16()), pa.field('c', pa.int64()), pa.field('d', pa.string()) ]) casted_table = table.cast(target_schema) assert casted_table.equals(expected_table) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_unsafe_casting(cls): data = [ pa.array(range(5), type=pa.int64()), pa.array([-10, -5, 0, 5, 10], type=pa.int32()), pa.array([1.1, 2.2, 3.3, 4.4, 5.5], type=pa.float64()), pa.array(['ab', 'bc', 'cd', 'de', 'ef'], type=pa.string()) ] table = cls.from_arrays(data, names=tuple('abcd')) expected_data = [ pa.array(range(5), type=pa.int32()), pa.array([-10, -5, 0, 5, 10], type=pa.int16()), pa.array([1, 2, 3, 4, 5], type=pa.int64()), pa.array(['ab', 'bc', 'cd', 'de', 'ef'], type=pa.string()) ] expected_table = cls.from_arrays(expected_data, names=tuple('abcd')) target_schema = pa.schema([ pa.field('a', pa.int32()), pa.field('b', pa.int16()), pa.field('c', pa.int64()), pa.field('d', pa.string()) ]) with pytest.raises(pa.ArrowInvalid, match='truncated'): table.cast(target_schema) casted_table = table.cast(target_schema, safe=False) assert casted_table.equals(expected_table) @pytest.mark.numpy def test_invalid_table_construct(): array = np.array([0, 1], dtype=np.uint8) u8 = pa.uint8() arrays = [pa.array(array, type=u8), pa.array(array[1:], type=u8)] with pytest.raises(pa.lib.ArrowInvalid): pa.Table.from_arrays(arrays, names=["a1", "a2"]) @pytest.mark.parametrize('data, klass', [ ((['', 'foo', 'bar'], [4.5, 5, None]), list), ((['', 'foo', 'bar'], [4.5, 5, None]), pa.array), (([[''], ['foo', 'bar']], [[4.5], [5., None]]), pa.chunked_array), ]) def test_from_arrays_schema(data, klass): data = [klass(data[0]), klass(data[1])] schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float32())]) table = pa.Table.from_arrays(data, schema=schema) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema # length of data and schema not matching schema = pa.schema([('strs', pa.utf8())]) with pytest.raises(ValueError): pa.Table.from_arrays(data, schema=schema) # with different but compatible schema schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float32())]) table = pa.Table.from_arrays(data, schema=schema) assert pa.types.is_float32(table.column('floats').type) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema # with different and incompatible schema schema = pa.schema([('strs', pa.utf8()), ('floats', pa.timestamp('s'))]) with pytest.raises((NotImplementedError, TypeError)): pa.Table.from_pydict(data, schema=schema) # Cannot pass both schema and metadata / names with pytest.raises(ValueError): pa.Table.from_arrays(data, schema=schema, names=['strs', 'floats']) with pytest.raises(ValueError): pa.Table.from_arrays(data, schema=schema, metadata={b'foo': b'bar'}) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_from_pydict(cls): table = cls.from_pydict({}) assert table.num_columns == 0 assert table.num_rows == 0 assert table.schema == pa.schema([]) assert table.to_pydict() == {} schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float64())]) # With lists as values data = OrderedDict([('strs', ['', 'foo', 'bar']), ('floats', [4.5, 5, None])]) table = cls.from_pydict(data) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema assert table.to_pydict() == data # With metadata and inferred schema metadata = {b'foo': b'bar'} schema = schema.with_metadata(metadata) table = cls.from_pydict(data, metadata=metadata) assert table.schema == schema assert table.schema.metadata == metadata assert table.to_pydict() == data # With explicit schema table = cls.from_pydict(data, schema=schema) assert table.schema == schema assert table.schema.metadata == metadata assert table.to_pydict() == data # Cannot pass both schema and metadata with pytest.raises(ValueError): cls.from_pydict(data, schema=schema, metadata=metadata) # Non-convertible values given schema with pytest.raises(TypeError): cls.from_pydict({'c0': [0, 1, 2]}, schema=pa.schema([("c0", pa.string())])) # Missing schema fields from the passed mapping with pytest.raises(KeyError, match="doesn\'t contain.* c, d"): cls.from_pydict( {'a': [1, 2, 3], 'b': [3, 4, 5]}, schema=pa.schema([ ('a', pa.int64()), ('c', pa.int32()), ('d', pa.int16()) ]) ) # Passed wrong schema type with pytest.raises(TypeError): cls.from_pydict({'a': [1, 2, 3]}, schema={}) @pytest.mark.parametrize('data, klass', [ ((['', 'foo', 'bar'], [4.5, 5, None]), pa.array), (([[''], ['foo', 'bar']], [[4.5], [5., None]]), pa.chunked_array), ]) def test_table_from_pydict_arrow_arrays(data, klass): data = OrderedDict([('strs', klass(data[0])), ('floats', klass(data[1]))]) schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float64())]) # With arrays as values table = pa.Table.from_pydict(data) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema # With explicit (matching) schema table = pa.Table.from_pydict(data, schema=schema) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema # with different but compatible schema schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float32())]) table = pa.Table.from_pydict(data, schema=schema) assert pa.types.is_float32(table.column('floats').type) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema # with different and incompatible schema schema = pa.schema([('strs', pa.utf8()), ('floats', pa.timestamp('s'))]) with pytest.raises((NotImplementedError, TypeError)): pa.Table.from_pydict(data, schema=schema) @pytest.mark.parametrize('data, klass', [ ((['', 'foo', 'bar'], [4.5, 5, None]), list), ((['', 'foo', 'bar'], [4.5, 5, None]), pa.array), (([[''], ['foo', 'bar']], [[4.5], [5., None]]), pa.chunked_array), ]) def test_table_from_pydict_schema(data, klass): # passed schema is source of truth for the columns data = OrderedDict([('strs', klass(data[0])), ('floats', klass(data[1]))]) # schema has columns not present in data -> error schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float64()), ('ints', pa.int64())]) with pytest.raises(KeyError, match='ints'): pa.Table.from_pydict(data, schema=schema) # data has columns not present in schema -> ignored schema = pa.schema([('strs', pa.utf8())]) table = pa.Table.from_pydict(data, schema=schema) assert table.num_columns == 1 assert table.schema == schema assert table.column_names == ['strs'] @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_from_pylist(cls): table = cls.from_pylist([]) assert table.num_columns == 0 assert table.num_rows == 0 assert table.schema == pa.schema([]) assert table.to_pylist() == [] schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float64())]) # With lists as values data = [{'strs': '', 'floats': 4.5}, {'strs': 'foo', 'floats': 5}, {'strs': 'bar', 'floats': None}] table = cls.from_pylist(data) assert table.num_columns == 2 assert table.num_rows == 3 assert table.schema == schema assert table.to_pylist() == data # With metadata and inferred schema metadata = {b'foo': b'bar'} schema = schema.with_metadata(metadata) table = cls.from_pylist(data, metadata=metadata) assert table.schema == schema assert table.schema.metadata == metadata assert table.to_pylist() == data # With explicit schema table = cls.from_pylist(data, schema=schema) assert table.schema == schema assert table.schema.metadata == metadata assert table.to_pylist() == data # Cannot pass both schema and metadata with pytest.raises(ValueError): cls.from_pylist(data, schema=schema, metadata=metadata) # Non-convertible values given schema with pytest.raises(TypeError): cls.from_pylist([{'c0': 0}, {'c0': 1}, {'c0': 2}], schema=pa.schema([("c0", pa.string())])) # Missing schema fields in the passed mapping translate to None schema = pa.schema([('a', pa.int64()), ('c', pa.int32()), ('d', pa.int16()) ]) table = cls.from_pylist( [{'a': 1, 'b': 3}, {'a': 2, 'b': 4}, {'a': 3, 'b': 5}], schema=schema ) data = [{'a': 1, 'c': None, 'd': None}, {'a': 2, 'c': None, 'd': None}, {'a': 3, 'c': None, 'd': None}] assert table.schema == schema assert table.to_pylist() == data # Passed wrong schema type with pytest.raises(TypeError): cls.from_pylist([{'a': 1}, {'a': 2}, {'a': 3}], schema={}) # If the dictionaries of rows are not same length data = [{'strs': '', 'floats': 4.5}, {'floats': 5}, {'strs': 'bar'}] data2 = [{'strs': '', 'floats': 4.5}, {'strs': None, 'floats': 5}, {'strs': 'bar', 'floats': None}] table = cls.from_pylist(data) assert table.num_columns == 2 assert table.num_rows == 3 assert table.to_pylist() == data2 data = [{'strs': ''}, {'strs': 'foo', 'floats': 5}, {'floats': None}] data2 = [{'strs': ''}, {'strs': 'foo'}, {'strs': None}] table = cls.from_pylist(data) assert table.num_columns == 1 assert table.num_rows == 3 assert table.to_pylist() == data2 @pytest.mark.pandas def test_table_from_pandas_schema(): # passed schema is source of truth for the columns import pandas as pd df = pd.DataFrame(OrderedDict([('strs', ['', 'foo', 'bar']), ('floats', [4.5, 5, None])])) # with different but compatible schema schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float32())]) table = pa.Table.from_pandas(df, schema=schema) assert pa.types.is_float32(table.column('floats').type) assert table.schema.remove_metadata() == schema # with different and incompatible schema schema = pa.schema([('strs', pa.utf8()), ('floats', pa.timestamp('s'))]) with pytest.raises((NotImplementedError, TypeError)): pa.Table.from_pandas(df, schema=schema) # schema has columns not present in data -> error schema = pa.schema([('strs', pa.utf8()), ('floats', pa.float64()), ('ints', pa.int64())]) with pytest.raises(KeyError, match='ints'): pa.Table.from_pandas(df, schema=schema) # data has columns not present in schema -> ignored schema = pa.schema([('strs', pa.utf8())]) table = pa.Table.from_pandas(df, schema=schema) assert table.num_columns == 1 assert table.schema.remove_metadata() == schema assert table.column_names == ['strs'] @pytest.mark.pandas def test_table_factory_function(): import pandas as pd # Put in wrong order to make sure that lines up with schema d = OrderedDict([('b', ['a', 'b', 'c']), ('a', [1, 2, 3])]) d_explicit = {'b': pa.array(['a', 'b', 'c'], type='string'), 'a': pa.array([1, 2, 3], type='int32')} schema = pa.schema([('a', pa.int32()), ('b', pa.string())]) df = pd.DataFrame(d) table1 = pa.table(df) table2 = pa.Table.from_pandas(df) assert table1.equals(table2) table1 = pa.table(df, schema=schema) table2 = pa.Table.from_pandas(df, schema=schema) assert table1.equals(table2) table1 = pa.table(d_explicit) table2 = pa.Table.from_pydict(d_explicit) assert table1.equals(table2) # schema coerces type table1 = pa.table(d, schema=schema) table2 = pa.Table.from_pydict(d, schema=schema) assert table1.equals(table2) def test_table_factory_function_args(): # from_pydict not accepting names: with pytest.raises(ValueError): pa.table({'a': [1, 2, 3]}, names=['a']) # backwards compatibility for schema as first positional argument schema = pa.schema([('a', pa.int32())]) table = pa.table({'a': pa.array([1, 2, 3], type=pa.int64())}, schema) assert table.column('a').type == pa.int32() # from_arrays: accept both names and schema as positional first argument data = [pa.array([1, 2, 3], type='int64')] names = ['a'] table = pa.table(data, names) assert table.column_names == names schema = pa.schema([('a', pa.int64())]) table = pa.table(data, schema) assert table.column_names == names @pytest.mark.pandas def test_table_factory_function_args_pandas(): import pandas as pd # from_pandas not accepting names or metadata: with pytest.raises(ValueError): pa.table(pd.DataFrame({'a': [1, 2, 3]}), names=['a']) with pytest.raises(ValueError): pa.table(pd.DataFrame({'a': [1, 2, 3]}), metadata={b'foo': b'bar'}) # backwards compatibility for schema as first positional argument schema = pa.schema([('a', pa.int32())]) table = pa.table(pd.DataFrame({'a': [1, 2, 3]}), schema) assert table.column('a').type == pa.int32() def test_factory_functions_invalid_input(): with pytest.raises(TypeError, match="Expected pandas DataFrame, python"): pa.table("invalid input") with pytest.raises(TypeError, match="Expected pandas DataFrame"): pa.record_batch("invalid input") def test_table_repr_to_string(): # Schema passed explicitly schema = pa.schema([pa.field('c0', pa.int16(), metadata={'key': 'value'}), pa.field('c1', pa.int32())], metadata={b'foo': b'bar'}) tab = pa.table([pa.array([1, 2, 3, 4], type='int16'), pa.array([10, 20, 30, 40], type='int32')], schema=schema) assert str(tab) == """pyarrow.Table c0: int16 c1: int32 ---- c0: [[1,2,3,4]] c1: [[10,20,30,40]]""" assert tab.to_string(show_metadata=True) == """\ pyarrow.Table c0: int16 -- field metadata -- key: 'value' c1: int32 -- schema metadata -- foo: 'bar'""" assert tab.to_string(preview_cols=5) == """\ pyarrow.Table c0: int16 c1: int32 ---- c0: [[1,2,3,4]] c1: [[10,20,30,40]]""" assert tab.to_string(preview_cols=1) == """\ pyarrow.Table c0: int16 c1: int32 ---- c0: [[1,2,3,4]] ...""" def test_table_repr_to_string_ellipsis(): # Schema passed explicitly schema = pa.schema([pa.field('c0', pa.int16(), metadata={'key': 'value'}), pa.field('c1', pa.int32())], metadata={b'foo': b'bar'}) tab = pa.table([pa.array([1, 2, 3, 4]*10, type='int16'), pa.array([10, 20, 30, 40]*10, type='int32')], schema=schema) assert str(tab) == """pyarrow.Table c0: int16 c1: int32 ---- c0: [[1,2,3,4,1,...,4,1,2,3,4]] c1: [[10,20,30,40,10,...,40,10,20,30,40]]""" def test_record_batch_repr_to_string(): # Schema passed explicitly schema = pa.schema([pa.field('c0', pa.int16(), metadata={'key': 'value'}), pa.field('c1', pa.int32())], metadata={b'foo': b'bar'}) batch = pa.record_batch([pa.array([1, 2, 3, 4], type='int16'), pa.array([10, 20, 30, 40], type='int32')], schema=schema) assert str(batch) == """pyarrow.RecordBatch c0: int16 c1: int32 ---- c0: [1,2,3,4] c1: [10,20,30,40]""" assert batch.to_string(show_metadata=True) == """\ pyarrow.RecordBatch c0: int16 -- field metadata -- key: 'value' c1: int32 -- schema metadata -- foo: 'bar'""" assert batch.to_string(preview_cols=5) == """\ pyarrow.RecordBatch c0: int16 c1: int32 ---- c0: [1,2,3,4] c1: [10,20,30,40]""" assert batch.to_string(preview_cols=1) == """\ pyarrow.RecordBatch c0: int16 c1: int32 ---- c0: [1,2,3,4] ...""" def test_record_batch_repr_to_string_ellipsis(): # Schema passed explicitly schema = pa.schema([pa.field('c0', pa.int16(), metadata={'key': 'value'}), pa.field('c1', pa.int32())], metadata={b'foo': b'bar'}) batch = pa.record_batch([pa.array([1, 2, 3, 4]*10, type='int16'), pa.array([10, 20, 30, 40]*10, type='int32')], schema=schema) assert str(batch) == """pyarrow.RecordBatch c0: int16 c1: int32 ---- c0: [1,2,3,4,1,2,3,4,1,2,...,3,4,1,2,3,4,1,2,3,4] c1: [10,20,30,40,10,20,30,40,10,20,...,30,40,10,20,30,40,10,20,30,40]""" def test_table_function_unicode_schema(): col_a = "äääh" col_b = "öööf" # Put in wrong order to make sure that lines up with schema d = OrderedDict([(col_b, ['a', 'b', 'c']), (col_a, [1, 2, 3])]) schema = pa.schema([(col_a, pa.int32()), (col_b, pa.string())]) result = pa.table(d, schema=schema) assert result[0].chunk(0).equals(pa.array([1, 2, 3], type='int32')) assert result[1].chunk(0).equals(pa.array(['a', 'b', 'c'], type='string')) def test_table_take_vanilla_functionality(): table = pa.table( [pa.array([1, 2, 3, None, 5]), pa.array(['a', 'b', 'c', 'd', 'e'])], ['f1', 'f2']) assert table.take(pa.array([2, 3])).equals(table.slice(2, 2)) def test_table_take_null_index(): table = pa.table( [pa.array([1, 2, 3, None, 5]), pa.array(['a', 'b', 'c', 'd', 'e'])], ['f1', 'f2']) result_with_null_index = pa.table( [pa.array([1, None]), pa.array(['a', None])], ['f1', 'f2']) assert table.take(pa.array([0, None])).equals(result_with_null_index) def test_table_take_non_consecutive(): table = pa.table( [pa.array([1, 2, 3, None, 5]), pa.array(['a', 'b', 'c', 'd', 'e'])], ['f1', 'f2']) result_non_consecutive = pa.table( [pa.array([2, None]), pa.array(['b', 'd'])], ['f1', 'f2']) assert table.take(pa.array([1, 3])).equals(result_non_consecutive) def test_table_select(): a1 = pa.array([1, 2, 3, None, 5]) a2 = pa.array(['a', 'b', 'c', 'd', 'e']) a3 = pa.array([[1, 2], [3, 4], [5, 6], None, [9, 10]]) table = pa.table([a1, a2, a3], ['f1', 'f2', 'f3']) # selecting with string names result = table.select(['f1']) expected = pa.table([a1], ['f1']) assert result.equals(expected) result = table.select(['f3', 'f2']) expected = pa.table([a3, a2], ['f3', 'f2']) assert result.equals(expected) # selecting with integer indices result = table.select([0]) expected = pa.table([a1], ['f1']) assert result.equals(expected) result = table.select([2, 1]) expected = pa.table([a3, a2], ['f3', 'f2']) assert result.equals(expected) # preserve metadata table2 = table.replace_schema_metadata({"a": "test"}) result = table2.select(["f1", "f2"]) assert b"a" in result.schema.metadata # selecting non-existing column raises with pytest.raises(KeyError, match='Field "f5" does not exist'): table.select(['f5']) with pytest.raises(IndexError, match="index out of bounds"): table.select([5]) # duplicate selection gives duplicated names in resulting table result = table.select(['f2', 'f2']) expected = pa.table([a2, a2], ['f2', 'f2']) assert result.equals(expected) # selection duplicated column raises table = pa.table([a1, a2, a3], ['f1', 'f2', 'f1']) with pytest.raises(KeyError, match='Field "f1" exists 2 times'): table.select(['f1']) result = table.select(['f2']) expected = pa.table([a2], ['f2']) assert result.equals(expected) @pytest.mark.acero def test_table_group_by(): def sorted_by_keys(d): # Ensure a guaranteed order of keys for aggregation results. if "keys2" in d: keys = tuple(zip(d["keys"], d["keys2"])) else: keys = d["keys"] sorted_keys = sorted(keys) sorted_d = {"keys": sorted(d["keys"])} for entry in d: if entry == "keys": continue values = dict(zip(keys, d[entry])) for k in sorted_keys: sorted_d.setdefault(entry, []).append(values[k]) return sorted_d table = pa.table([ pa.array(["a", "a", "b", "b", "c"]), pa.array(["X", "X", "Y", "Z", "Z"]), pa.array([1, 2, 3, 4, 5]), pa.array([10, 20, 30, 40, 50]) ], names=["keys", "keys2", "values", "bigvalues"]) r = table.group_by("keys").aggregate([ ("values", "hash_sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5] } r = table.group_by("keys").aggregate([ ("values", "hash_sum"), ("values", "hash_count") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5], "values_count": [2, 2, 1] } # Test without hash_ prefix r = table.group_by("keys").aggregate([ ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5] } r = table.group_by("keys").aggregate([ ("values", "max"), ("bigvalues", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_max": [2, 4, 5], "bigvalues_sum": [30, 70, 50] } r = table.group_by("keys").aggregate([ ("bigvalues", "max"), ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_sum": [3, 7, 5], "bigvalues_max": [20, 40, 50] } r = table.group_by(["keys", "keys2"]).aggregate([ ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "b", "c"], "keys2": ["X", "Y", "Z", "Z"], "values_sum": [3, 3, 4, 5] } # Test many arguments r = table.group_by("keys").aggregate([ ("values", "max"), ("bigvalues", "sum"), ("bigvalues", "max"), ([], "count_all"), ("values", "sum") ]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b", "c"], "values_max": [2, 4, 5], "bigvalues_sum": [30, 70, 50], "bigvalues_max": [20, 40, 50], "count_all": [2, 2, 1], "values_sum": [3, 7, 5] } table_with_nulls = pa.table([ pa.array(["a", "a", "a"]), pa.array([1, None, None]) ], names=["keys", "values"]) r = table_with_nulls.group_by(["keys"]).aggregate([ ("values", "count", pc.CountOptions(mode="all")) ]) assert r.to_pydict() == { "keys": ["a"], "values_count": [3] } r = table_with_nulls.group_by(["keys"]).aggregate([ ("values", "count", pc.CountOptions(mode="only_null")) ]) assert r.to_pydict() == { "keys": ["a"], "values_count": [2] } r = table_with_nulls.group_by(["keys"]).aggregate([ ("values", "count", pc.CountOptions(mode="only_valid")) ]) assert r.to_pydict() == { "keys": ["a"], "values_count": [1] } r = table_with_nulls.group_by(["keys"]).aggregate([ ([], "count_all"), # nullary count that takes no parameters ("values", "count", pc.CountOptions(mode="only_valid")) ]) assert r.to_pydict() == { "keys": ["a"], "count_all": [3], "values_count": [1] } r = table_with_nulls.group_by(["keys"]).aggregate([ ([], "count_all") ]) assert r.to_pydict() == { "keys": ["a"], "count_all": [3] } table = pa.table({ 'keys': ['a', 'b', 'a', 'b', 'a', 'b'], 'values': range(6)}) table_with_chunks = pa.Table.from_batches( table.to_batches(max_chunksize=3)) r = table_with_chunks.group_by('keys').aggregate([('values', 'sum')]) assert sorted_by_keys(r.to_pydict()) == { "keys": ["a", "b"], "values_sum": [6, 9] } @pytest.mark.acero def test_table_group_by_first(): # "first" is an ordered aggregation -> requires to specify use_threads=False table1 = pa.table({'a': [1, 2, 3, 4], 'b': ['a', 'b'] * 2}) table2 = pa.table({'a': [1, 2, 3, 4], 'b': ['b', 'a'] * 2}) table = pa.concat_tables([table1, table2]) with pytest.raises(NotImplementedError): table.group_by("b").aggregate([("a", "first")]) result = table.group_by("b", use_threads=False).aggregate([("a", "first")]) expected = pa.table({"b": ["a", "b"], "a_first": [1, 2]}) assert result.equals(expected) @pytest.mark.acero def test_table_group_by_pivot_wider(): table = pa.table({'group': [1, 2, 3, 1, 2, 3], 'key': ['h', 'h', 'h', 'w', 'w', 'w'], 'value': [10, 20, 30, 40, 50, 60]}) with pytest.raises(ValueError, match='accepts 3 arguments but 2 passed'): table.group_by("group").aggregate([("key", "pivot_wider")]) # GH-45739: calling hash_pivot_wider without options shouldn't crash # (even though it's not very useful as key_names=[]) result = table.group_by("group").aggregate([(("key", "value"), "pivot_wider")]) expected = pa.table({'group': [1, 2, 3], 'key_value_pivot_wider': [{}, {}, {}]}) assert result.equals(expected) options = pc.PivotWiderOptions(key_names=('h', 'w')) result = table.group_by("group").aggregate( [(("key", "value"), "pivot_wider", options)]) expected = pa.table( {'group': [1, 2, 3], 'key_value_pivot_wider': [ {'h': 10, 'w': 40}, {'h': 20, 'w': 50}, {'h': 30, 'w': 60}]}) assert result.equals(expected) def test_table_to_recordbatchreader(): table = pa.Table.from_pydict({'x': [1, 2, 3]}) reader = table.to_reader() assert table.schema == reader.schema assert table == reader.read_all() reader = table.to_reader(max_chunksize=2) assert reader.read_next_batch().num_rows == 2 assert reader.read_next_batch().num_rows == 1 @pytest.mark.acero def test_table_join(): t1 = pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"] }) t2 = pa.table({ "colB": [99, 2, 1], "col3": ["Z", "B", "A"] }) result = t1.join(t2, "colA", "colB") assert result.combine_chunks() == pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"], "col3": ["A", "B", None] }) result = t1.join(t2, "colA", "colB", join_type="full outer") assert result.combine_chunks().sort_by("colA") == pa.table({ "colA": [1, 2, 6, 99], "col2": ["a", "b", "f", None], "col3": ["A", "B", None, "Z"] }) @pytest.mark.acero def test_table_join_unique_key(): t1 = pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"] }) t2 = pa.table({ "colA": [99, 2, 1], "col3": ["Z", "B", "A"] }) result = t1.join(t2, "colA") assert result.combine_chunks() == pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"], "col3": ["A", "B", None] }) result = t1.join(t2, "colA", join_type="full outer", right_suffix="_r") assert result.combine_chunks().sort_by("colA") == pa.table({ "colA": [1, 2, 6, 99], "col2": ["a", "b", "f", None], "col3": ["A", "B", None, "Z"] }) @pytest.mark.acero def test_table_join_collisions(): t1 = pa.table({ "colA": [1, 2, 6], "colB": [10, 20, 60], "colVals": ["a", "b", "f"] }) t2 = pa.table({ "colA": [99, 2, 1], "colB": [99, 20, 10], "colVals": ["Z", "B", "A"] }) result = t1.join(t2, "colA", join_type="full outer") assert result.combine_chunks().sort_by("colA") == pa.table([ [1, 2, 6, 99], [10, 20, 60, None], ["a", "b", "f", None], [10, 20, None, 99], ["A", "B", None, "Z"], ], names=["colA", "colB", "colVals", "colB", "colVals"]) @pytest.mark.acero @pytest.mark.parametrize('cls', [(pa.Table), (pa.RecordBatch)]) def test_table_filter_expression(cls): t1 = cls.from_pydict({ "colA": [1, 2, 3, 6], "colB": [10, 20, None, 60], "colVals": ["a", "b", "c", "f"] }) result = t1.filter(pc.field("colB") < 50) assert result == cls.from_pydict({ "colA": [1, 2], "colB": [10, 20], "colVals": ["a", "b"] }) @pytest.mark.acero def test_table_filter_expression_chunks(): t1 = pa.table({ "colA": [1, 2, 6], "colB": [10, 20, 60], "colVals": ["a", "b", "f"] }) t2 = pa.table({ "colA": [99, 2, 1], "colB": [99, 20, 10], "colVals": ["Z", "B", "A"] }) t3 = pa.concat_tables([t1, t2]) result = t3.filter(pc.field("colA") < 10) assert result.combine_chunks() == pa.table({ "colA": [1, 2, 6, 2, 1], "colB": [10, 20, 60, 20, 10], "colVals": ["a", "b", "f", "B", "A"] }) @pytest.mark.acero def test_table_join_many_columns(): t1 = pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"] }) t2 = pa.table({ "colB": [99, 2, 1], "col3": ["Z", "B", "A"], "col4": ["Z", "B", "A"], "col5": ["Z", "B", "A"], "col6": ["Z", "B", "A"], "col7": ["Z", "B", "A"] }) result = t1.join(t2, "colA", "colB") assert result.combine_chunks() == pa.table({ "colA": [1, 2, 6], "col2": ["a", "b", "f"], "col3": ["A", "B", None], "col4": ["A", "B", None], "col5": ["A", "B", None], "col6": ["A", "B", None], "col7": ["A", "B", None] }) result = t1.join(t2, "colA", "colB", join_type="full outer") assert result.combine_chunks().sort_by("colA") == pa.table({ "colA": [1, 2, 6, 99], "col2": ["a", "b", "f", None], "col3": ["A", "B", None, "Z"], "col4": ["A", "B", None, "Z"], "col5": ["A", "B", None, "Z"], "col6": ["A", "B", None, "Z"], "col7": ["A", "B", None, "Z"], }) @pytest.mark.dataset def test_table_join_asof(): t1 = pa.Table.from_pydict({ "colA": [1, 1, 5, 6, 7], "col2": ["a", "b", "a", "b", "f"] }) t2 = pa.Table.from_pydict({ "colB": [2, 9, 15], "col3": ["a", "b", "g"], "colC": [1., 3., 5.] }) r = t1.join_asof( t2, on="colA", by="col2", tolerance=1, right_on="colB", right_by="col3", ) assert r.combine_chunks() == pa.table({ "colA": [1, 1, 5, 6, 7], "col2": ["a", "b", "a", "b", "f"], "colC": [1., None, None, None, None], }) @pytest.mark.dataset def test_table_join_asof_multiple_by(): t1 = pa.table({ "colA": [1, 2, 6], "colB": [10, 20, 60], "on": [1, 2, 3], }) t2 = pa.table({ "colB": [99, 20, 10], "colVals": ["Z", "B", "A"], "colA": [99, 2, 1], "on": [2, 3, 4], }) result = t1.join_asof( t2, on="on", by=["colA", "colB"], tolerance=1 ) assert result.sort_by("colA") == pa.table({ "colA": [1, 2, 6], "colB": [10, 20, 60], "on": [1, 2, 3], "colVals": [None, "B", None], }) @pytest.mark.dataset def test_table_join_asof_empty_by(): t1 = pa.table({ "on": [1, 2, 3], }) t2 = pa.table({ "colVals": ["Z", "B", "A"], "on": [2, 3, 4], }) result = t1.join_asof( t2, on="on", by=[], tolerance=1 ) assert result == pa.table({ "on": [1, 2, 3], "colVals": ["Z", "Z", "B"], }) @pytest.mark.dataset def test_table_join_asof_collisions(): t1 = pa.table({ "colA": [1, 2, 6], "colB": [10, 20, 60], "on": [1, 2, 3], "colVals": ["a", "b", "f"] }) t2 = pa.table({ "colB": [99, 20, 10], "colVals": ["Z", "B", "A"], "colUniq": [100, 200, 300], "colA": [99, 2, 1], "on": [2, 3, 4], }) msg = ( "Columns {'colVals'} present in both tables. " "AsofJoin does not support column collisions." ) with pytest.raises(ValueError, match=msg): t1.join_asof( t2, on="on", by=["colA", "colB"], tolerance=1, right_on="on", right_by=["colA", "colB"], ) @pytest.mark.dataset def test_table_join_asof_by_length_mismatch(): t1 = pa.table({ "colA": [1, 2, 6], "colB": [10, 20, 60], "on": [1, 2, 3], }) t2 = pa.table({ "colVals": ["Z", "B", "A"], "colUniq": [100, 200, 300], "colA": [99, 2, 1], "on": [2, 3, 4], }) msg = "inconsistent size of by-key across inputs" with pytest.raises(pa.lib.ArrowInvalid, match=msg): t1.join_asof( t2, on="on", by=["colA", "colB"], tolerance=1, right_on="on", right_by=["colA"], ) @pytest.mark.dataset def test_table_join_asof_by_type_mismatch(): t1 = pa.table({ "colA": [1, 2, 6], "on": [1, 2, 3], }) t2 = pa.table({ "colVals": ["Z", "B", "A"], "colUniq": [100, 200, 300], "colA": [99., 2., 1.], "on": [2, 3, 4], }) msg = "Expected by-key type int64 but got double for field colA in input 1" with pytest.raises(pa.lib.ArrowInvalid, match=msg): t1.join_asof( t2, on="on", by=["colA"], tolerance=1, right_on="on", right_by=["colA"], ) @pytest.mark.dataset def test_table_join_asof_on_type_mismatch(): t1 = pa.table({ "colA": [1, 2, 6], "on": [1, 2, 3], }) t2 = pa.table({ "colVals": ["Z", "B", "A"], "colUniq": [100, 200, 300], "colA": [99, 2, 1], "on": [2., 3., 4.], }) msg = "Expected on-key type int64 but got double for field on in input 1" with pytest.raises(pa.lib.ArrowInvalid, match=msg): t1.join_asof( t2, on="on", by=["colA"], tolerance=1, right_on="on", right_by=["colA"], ) @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_cast_invalid(cls): # Casting a nullable field to non-nullable should be invalid! table = cls.from_pydict({'a': [None, 1], 'b': [None, True]}) new_schema = pa.schema([pa.field("a", "int64", nullable=True), pa.field("b", "bool", nullable=False)]) with pytest.raises(ValueError): table.cast(new_schema) table = cls.from_pydict({'a': [None, 1], 'b': [False, True]}) assert table.cast(new_schema).schema == new_schema @pytest.mark.parametrize( ('cls'), [ (pa.Table), (pa.RecordBatch) ] ) def test_table_sort_by(cls): table = cls.from_arrays([ pa.array([3, 1, 4, 2, 5]), pa.array(["b", "a", "b", "a", "c"]), ], names=["values", "keys"]) assert table.sort_by("values").to_pydict() == { "keys": ["a", "a", "b", "b", "c"], "values": [1, 2, 3, 4, 5] } assert table.sort_by([("values", "descending")]).to_pydict() == { "keys": ["c", "b", "b", "a", "a"], "values": [5, 4, 3, 2, 1] } tab = cls.from_arrays([ pa.array([5, 7, 7, 35], type=pa.int64()), pa.array(["foo", "car", "bar", "foobar"]) ], names=["a", "b"]) sorted_tab = tab.sort_by([("a", "descending")]) sorted_tab_dict = sorted_tab.to_pydict() assert sorted_tab_dict["a"] == [35, 7, 7, 5] assert sorted_tab_dict["b"] == ["foobar", "car", "bar", "foo"] sorted_tab = tab.sort_by([("a", "ascending")]) sorted_tab_dict = sorted_tab.to_pydict() assert sorted_tab_dict["a"] == [5, 7, 7, 35] assert sorted_tab_dict["b"] == ["foo", "car", "bar", "foobar"] @pytest.mark.numpy @pytest.mark.parametrize("constructor", [pa.table, pa.record_batch]) def test_numpy_asarray(constructor): table = constructor([[1, 2, 3], [4.0, 5.0, 6.0]], names=["a", "b"]) result = np.asarray(table) expected = np.array([[1, 4], [2, 5], [3, 6]], dtype="float64") np.testing.assert_allclose(result, expected) result = np.asarray(table, dtype="int32") np.testing.assert_allclose(result, expected) assert result.dtype == "int32" # no columns table2 = table.select([]) result = np.asarray(table2) expected = np.empty((3, 0)) np.testing.assert_allclose(result, expected) assert result.dtype == "float64" result = np.asarray(table2, dtype="int32") np.testing.assert_allclose(result, expected) assert result.dtype == "int32" # no rows table3 = table.slice(0, 0) result = np.asarray(table3) expected = np.empty((0, 2)) np.testing.assert_allclose(result, expected) assert result.dtype == "float64" result = np.asarray(table3, dtype="int32") np.testing.assert_allclose(result, expected) assert result.dtype == "int32" @pytest.mark.numpy @pytest.mark.parametrize("constructor", [pa.table, pa.record_batch]) def test_numpy_array_protocol(constructor): table = constructor([[1, 2, 3], [4.0, 5.0, 6.0]], names=["a", "b"]) expected = np.array([[1, 4], [2, 5], [3, 6]], dtype="float64") if Version(np.__version__) < Version("2.0.0.dev0"): # copy keyword is not strict and not passed down to __array__ result = np.array(table, copy=False) np.testing.assert_array_equal(result, expected) else: # starting with numpy 2.0, the copy=False keyword is assumed to be strict with pytest.raises(ValueError, match="Unable to avoid a copy"): np.array(table, copy=False) @pytest.mark.acero def test_invalid_non_join_column(): NUM_ITEMS = 30 t1 = pa.Table.from_pydict({ 'id': range(NUM_ITEMS), 'array_column': [[z for z in range(3)] for x in range(NUM_ITEMS)], }) t2 = pa.Table.from_pydict({ 'id': range(NUM_ITEMS), 'value': [x for x in range(NUM_ITEMS)] }) # check as left table with pytest.raises(pa.lib.ArrowInvalid) as excinfo: t1.join(t2, 'id', join_type='inner') exp_error_msg = "Data type list is not supported " \ + "in join non-key field array_column" assert exp_error_msg in str(excinfo.value) # check as right table with pytest.raises(pa.lib.ArrowInvalid) as excinfo: t2.join(t1, 'id', join_type='inner') assert exp_error_msg in str(excinfo.value) @pytest.fixture def cuda_context(): cuda = pytest.importorskip("pyarrow.cuda") return cuda.Context(0) @pytest.fixture def schema(): return pa.schema([pa.field('c0', pa.int32()), pa.field('c1', pa.int32())]) @pytest.fixture def cpu_arrays(schema): return [pa.array([1, 2, 3, 4, 5], schema.field(0).type), pa.array([-10, -5, 0, None, 10], schema.field(1).type)] @pytest.fixture def cuda_arrays(cuda_context, cpu_arrays): return [arr.copy_to(cuda_context.memory_manager) for arr in cpu_arrays] @pytest.fixture def cpu_chunked_array(cpu_arrays): chunked_array = pa.chunked_array(cpu_arrays) assert chunked_array.is_cpu is True return chunked_array @pytest.fixture def cuda_chunked_array(cuda_arrays): chunked_array = pa.chunked_array(cuda_arrays) assert chunked_array.is_cpu is False return chunked_array @pytest.fixture def cpu_and_cuda_chunked_array(cpu_arrays, cuda_arrays): chunked_array = pa.chunked_array(cpu_arrays + cuda_arrays) assert chunked_array.is_cpu is False return chunked_array @pytest.fixture def cpu_recordbatch(cpu_arrays, schema): return pa.record_batch(cpu_arrays, schema=schema) @pytest.fixture def cuda_recordbatch(cuda_context, cpu_recordbatch): return cpu_recordbatch.copy_to(cuda_context.memory_manager) @pytest.fixture def cpu_table(schema, cpu_chunked_array): return pa.table([cpu_chunked_array, cpu_chunked_array], schema=schema) @pytest.fixture def cuda_table(schema, cuda_chunked_array): return pa.table([cuda_chunked_array, cuda_chunked_array], schema=schema) @pytest.fixture def cpu_and_cuda_table(schema, cpu_chunked_array, cuda_chunked_array): return pa.table([cpu_chunked_array, cuda_chunked_array], schema=schema) def test_chunked_array_non_cpu(cuda_context, cpu_chunked_array, cuda_chunked_array, cpu_and_cuda_chunked_array): # type test assert cuda_chunked_array.type == cpu_chunked_array.type # length() test assert cuda_chunked_array.length() == cpu_chunked_array.length() # str() test assert str(cuda_chunked_array) == str(cpu_chunked_array) # repr() test assert str(cuda_chunked_array) in repr(cuda_chunked_array) # validate() test cuda_chunked_array.validate() with pytest.raises(NotImplementedError): cuda_chunked_array.validate(full=True) # null_count test with pytest.raises(NotImplementedError): cuda_chunked_array.null_count # nbytes() test with pytest.raises(NotImplementedError): cuda_chunked_array.nbytes # get_total_buffer_size() test with pytest.raises(NotImplementedError): cuda_chunked_array.get_total_buffer_size() # getitem() test with pytest.raises(NotImplementedError): cuda_chunked_array[0] # is_null() test with pytest.raises(NotImplementedError): cuda_chunked_array.is_null() # is_nan() test with pytest.raises(NotImplementedError): cuda_chunked_array.is_nan() # is_valid() test with pytest.raises(NotImplementedError): cuda_chunked_array.is_valid() # fill_null() test with pytest.raises(NotImplementedError): cuda_chunked_array.fill_null(0) # equals() test with pytest.raises(NotImplementedError): cuda_chunked_array == cuda_chunked_array # to_pandas() test with pytest.raises(NotImplementedError): cuda_chunked_array.to_pandas() # to_numpy() test with pytest.raises(NotImplementedError): cuda_chunked_array.to_numpy() # __array__() test with pytest.raises(NotImplementedError): cuda_chunked_array.__array__() # cast() test with pytest.raises(NotImplementedError): cuda_chunked_array.cast() # dictionary_encode() test with pytest.raises(NotImplementedError): cuda_chunked_array.dictionary_encode() # flatten() test with pytest.raises(NotImplementedError): cuda_chunked_array.flatten() # combine_chunks() test with pytest.raises(NotImplementedError): cuda_chunked_array.combine_chunks() # unique() test with pytest.raises(NotImplementedError): cuda_chunked_array.unique() # value_counts() test with pytest.raises(NotImplementedError): cuda_chunked_array.value_counts() # filter() test with pytest.raises(NotImplementedError): cuda_chunked_array.filter([True, False, True, False, True]) # index() test with pytest.raises(NotImplementedError): cuda_chunked_array.index(5) # slice() test cuda_chunked_array.slice(2, 2) # take() test with pytest.raises(NotImplementedError): cuda_chunked_array.take([1]) # drop_null() test with pytest.raises(NotImplementedError): cuda_chunked_array.drop_null() # sort() test with pytest.raises(NotImplementedError): cuda_chunked_array.sort() # unify_dictionaries() test with pytest.raises(NotImplementedError): cuda_chunked_array.unify_dictionaries() # num_chunks test assert cuda_chunked_array.num_chunks == cpu_chunked_array.num_chunks # chunks test assert len(cuda_chunked_array.chunks) == len(cpu_chunked_array.chunks) # chunk() test chunk = cuda_chunked_array.chunk(0) assert chunk.device_type == pa.DeviceAllocationType.CUDA # to_pylist() test with pytest.raises(NotImplementedError): cuda_chunked_array.to_pylist() # __arrow_c_stream__() test with pytest.raises(NotImplementedError): cuda_chunked_array.__arrow_c_stream__() # __reduce__() test with pytest.raises(NotImplementedError): cuda_chunked_array.__reduce__() def verify_cuda_recordbatch(batch, expected_schema): batch.validate() assert batch.device_type == pa.DeviceAllocationType.CUDA assert batch.is_cpu is False assert batch.num_columns == len(expected_schema.names) assert batch.column_names == expected_schema.names assert str(batch) in repr(batch) for c in batch.columns: assert c.device_type == pa.DeviceAllocationType.CUDA assert batch.schema == expected_schema def test_recordbatch_non_cpu(cuda_context, cpu_recordbatch, cuda_recordbatch, cuda_arrays, schema): verify_cuda_recordbatch(cuda_recordbatch, expected_schema=schema) N = cuda_recordbatch.num_rows # shape test assert cuda_recordbatch.shape == (5, 2) # columns() test assert len(cuda_recordbatch.columns) == 2 # add_column(), set_column() test for fn in [cuda_recordbatch.add_column, cuda_recordbatch.set_column]: col = pa.array([-2, -1, 0, 1, 2], pa.int8() ).copy_to(cuda_context.memory_manager) new_batch = fn(2, 'c2', col) verify_cuda_recordbatch( new_batch, expected_schema=schema.append(pa.field('c2', pa.int8()))) err_msg = ("Got column on device , " "but expected .") with pytest.raises(TypeError, match=err_msg): fn(2, 'c2', [1] * N) # remove_column() test new_batch = cuda_recordbatch.remove_column(1) verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) # drop_columns() test new_batch = cuda_recordbatch.drop_columns(['c1']) verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) empty_batch = cuda_recordbatch.drop_columns(['c0', 'c1']) assert len(empty_batch.columns) == 0 assert empty_batch.device_type == pa.DeviceAllocationType.CUDA # select() test new_batch = cuda_recordbatch.select(['c0']) verify_cuda_recordbatch(new_batch, expected_schema=schema.remove(1)) # cast() test new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())]) with pytest.raises(NotImplementedError): cuda_recordbatch.cast(new_schema) # drop_null() test null_col = pa.array([1] * N, mask=[True, False, True, False, True]).copy_to( cuda_context.memory_manager) cuda_recordbatch_with_nulls = cuda_recordbatch.add_column(2, 'c2', null_col) with pytest.raises(NotImplementedError): cuda_recordbatch_with_nulls.drop_null() # filter() test with pytest.raises(NotImplementedError): cuda_recordbatch.filter([True] * N) # take() test with pytest.raises(NotImplementedError): cuda_recordbatch.take([0]) # sort_by() test with pytest.raises(NotImplementedError): cuda_recordbatch.sort_by('c0') # field() test assert cuda_recordbatch.field(0) == schema.field(0) assert cuda_recordbatch.field(1) == schema.field(1) # equals() test new_batch = cpu_recordbatch.copy_to(cuda_context.memory_manager) with pytest.raises(NotImplementedError): assert cuda_recordbatch.equals(new_batch) is True # from_arrays() test new_batch = pa.RecordBatch.from_arrays(cuda_arrays, ['c0', 'c1']) verify_cuda_recordbatch(new_batch, expected_schema=schema) assert new_batch.copy_to(pa.default_cpu_memory_manager()).equals(cpu_recordbatch) # from_pydict() test new_batch = pa.RecordBatch.from_pydict({'c0': cuda_arrays[0], 'c1': cuda_arrays[1]}) verify_cuda_recordbatch(new_batch, expected_schema=schema) assert new_batch.copy_to(pa.default_cpu_memory_manager()).equals(cpu_recordbatch) # from_struct_array() test fields = [schema.field(i) for i in range(len(schema.names))] struct_array = pa.StructArray.from_arrays(cuda_arrays, fields=fields) with pytest.raises(NotImplementedError): pa.RecordBatch.from_struct_array(struct_array) # nbytes test with pytest.raises(NotImplementedError): assert cuda_recordbatch.nbytes # get_total_buffer_size() test with pytest.raises(NotImplementedError): assert cuda_recordbatch.get_total_buffer_size() # to_pydict() test with pytest.raises(NotImplementedError): cuda_recordbatch.to_pydict() # to_pylist() test with pytest.raises(NotImplementedError): cuda_recordbatch.to_pylist() # to_pandas() test with pytest.raises(NotImplementedError): cuda_recordbatch.to_pandas() # to_tensor() test with pytest.raises(NotImplementedError): cuda_recordbatch.to_tensor() # to_struct_array() test with pytest.raises(NotImplementedError): cuda_recordbatch.to_struct_array() # serialize() test with pytest.raises(NotImplementedError): cuda_recordbatch.serialize() # slice() test new_batch = cuda_recordbatch.slice(1, 3) verify_cuda_recordbatch(new_batch, expected_schema=schema) assert new_batch.num_rows == 3 cpu_batch = new_batch.copy_to(pa.default_cpu_memory_manager()) assert cpu_batch == cpu_recordbatch.slice(1, 3) # replace_schema_metadata() test new_batch = cuda_recordbatch.replace_schema_metadata({b'key': b'value'}) verify_cuda_recordbatch(new_batch, expected_schema=schema) assert new_batch.schema.metadata == {b'key': b'value'} # rename_columns() test new_batch = cuda_recordbatch.rename_columns(['col0', 'col1']) expected_schema = pa.schema( [pa.field('col0', schema.field(0).type), pa.field('col1', schema.field(1).type)]) verify_cuda_recordbatch(new_batch, expected_schema=expected_schema) # validate() test cuda_recordbatch.validate() with pytest.raises(NotImplementedError): cuda_recordbatch.validate(full=True) # __array__() test with pytest.raises(NotImplementedError): cuda_recordbatch.__array__() # __arrow_c_array__() test with pytest.raises(NotImplementedError): cuda_recordbatch.__arrow_c_array__() # __arrow_c_stream__() test with pytest.raises(NotImplementedError): cuda_recordbatch.__arrow_c_stream__() # __dataframe__() test with pytest.raises(NotImplementedError): from_dataframe(cuda_recordbatch.__dataframe__()) def verify_cuda_table(table, expected_schema): table.validate() assert table.is_cpu is False assert table.num_columns == len(expected_schema.names) assert table.column_names == expected_schema.names assert str(table) in repr(table) for c in table.columns: assert c.is_cpu is False for chunk in c.iterchunks(): assert chunk.is_cpu is False assert chunk.device_type == pa.DeviceAllocationType.CUDA assert table.schema == expected_schema def test_table_non_cpu(cuda_context, cpu_table, cuda_table, cuda_arrays, cuda_recordbatch, schema): verify_cuda_table(cuda_table, expected_schema=schema) N = cuda_table.num_rows # shape test assert cuda_table.shape == (10, 2) # columns() test assert len(cuda_table.columns) == 2 # add_column(), set_column() test for fn in [cuda_table.add_column, cuda_table.set_column]: cpu_col = pa.array([1] * N, pa.int8()) cuda_col = cpu_col.copy_to(cuda_context.memory_manager) new_table = fn(2, 'c2', cuda_col) verify_cuda_table(new_table, expected_schema=schema.append( pa.field('c2', pa.int8()))) new_table = fn(2, 'c2', cpu_col) assert new_table.is_cpu is False assert new_table.column(0).is_cpu is False assert new_table.column(1).is_cpu is False assert new_table.column(2).is_cpu is True # remove_column() test new_table = cuda_table.remove_column(1) verify_cuda_table(new_table, expected_schema=schema.remove(1)) # drop_columns() test new_table = cuda_table.drop_columns(['c1']) verify_cuda_table(new_table, expected_schema=schema.remove(1)) new_table = cuda_table.drop_columns(['c0', 'c1']) assert len(new_table.columns) == 0 assert new_table.is_cpu # select() test new_table = cuda_table.select(['c0']) verify_cuda_table(new_table, expected_schema=schema.remove(1)) # cast() test new_schema = pa.schema([pa.field('c0', pa.int64()), pa.field('c1', pa.int64())]) with pytest.raises(NotImplementedError): cuda_table.cast(new_schema) # drop_null() test null_col = pa.array([1] * N, mask=[True] * N).copy_to(cuda_context.memory_manager) cuda_table_with_nulls = cuda_table.add_column(2, 'c2', null_col) with pytest.raises(NotImplementedError): cuda_table_with_nulls.drop_null() # filter() test with pytest.raises(NotImplementedError): cuda_table.filter([True] * N) # take() test with pytest.raises(NotImplementedError): cuda_table.take([0]) # sort_by() test with pytest.raises(NotImplementedError): cuda_table.sort_by('c0') # field() test assert cuda_table.field(0) == schema.field(0) assert cuda_table.field(1) == schema.field(1) # equals() test with pytest.raises(NotImplementedError): assert cuda_table.equals(cpu_table) # from_arrays() test new_table = pa.Table.from_arrays(cuda_arrays, ['c0', 'c1']) verify_cuda_table(new_table, expected_schema=schema) # from_pydict() test new_table = pa.Table.from_pydict({'c0': cuda_arrays[0], 'c1': cuda_arrays[1]}) verify_cuda_table(new_table, expected_schema=schema) # from_struct_array() test fields = [schema.field(i) for i in range(len(schema.names))] struct_array = pa.StructArray.from_arrays(cuda_arrays, fields=fields) with pytest.raises(NotImplementedError): pa.Table.from_struct_array(struct_array) # from_batches() test new_table = pa.Table.from_batches([cuda_recordbatch, cuda_recordbatch], schema) verify_cuda_table(new_table, expected_schema=schema) # nbytes test with pytest.raises(NotImplementedError): assert cuda_table.nbytes # get_total_buffer_size() test with pytest.raises(NotImplementedError): assert cuda_table.get_total_buffer_size() # to_pydict() test with pytest.raises(NotImplementedError): cuda_table.to_pydict() # to_pylist() test with pytest.raises(NotImplementedError): cuda_table.to_pylist() # to_pandas() test with pytest.raises(NotImplementedError): cuda_table.to_pandas() # to_struct_array() test with pytest.raises(NotImplementedError): cuda_table.to_struct_array() # to_batches() test batches = cuda_table.to_batches(max_chunksize=5) for batch in batches: # GH-44049 with pytest.raises(AssertionError): verify_cuda_recordbatch(batch, expected_schema=schema) # to_reader() test reader = cuda_table.to_reader(max_chunksize=5) for batch in reader: # GH-44049 with pytest.raises(AssertionError): verify_cuda_recordbatch(batch, expected_schema=schema) # slice() test new_table = cuda_table.slice(1, 3) verify_cuda_table(new_table, expected_schema=schema) assert new_table.num_rows == 3 # replace_schema_metadata() test new_table = cuda_table.replace_schema_metadata({b'key': b'value'}) verify_cuda_table(new_table, expected_schema=schema) assert new_table.schema.metadata == {b'key': b'value'} # rename_columns() test new_table = cuda_table.rename_columns(['col0', 'col1']) expected_schema = pa.schema( [pa.field('col0', schema.field(0).type), pa.field('col1', schema.field(1).type)]) verify_cuda_table(new_table, expected_schema=expected_schema) # validate() test cuda_table.validate() with pytest.raises(NotImplementedError): cuda_table.validate(full=True) # flatten() test with pytest.raises(NotImplementedError): cuda_table.flatten() # combine_chunks() test with pytest.raises(NotImplementedError): cuda_table.flatten() # unify_dictionaries() test with pytest.raises(NotImplementedError): cuda_table.unify_dictionaries() # group_by() test with pytest.raises(NotImplementedError): cuda_table.group_by('c0') # join() test with pytest.raises(NotImplementedError): cuda_table.join(cuda_table, 'c0') # join_asof() test with pytest.raises(NotImplementedError): cuda_table.join_asof(cuda_table, 'c0', 'c0', 0) # __array__() test with pytest.raises(NotImplementedError): cuda_table.__array__() # __arrow_c_stream__() test with pytest.raises(NotImplementedError): cuda_table.__arrow_c_stream__() # __dataframe__() test with pytest.raises(NotImplementedError): from_dataframe(cuda_table.__dataframe__()) # __reduce__() test with pytest.raises(NotImplementedError): cuda_table.__reduce__()