|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import pytest |
|
|
|
import os |
|
import pyarrow as pa |
|
from pyarrow import Codec |
|
from pyarrow import fs |
|
from pyarrow.lib import is_threading_enabled |
|
from pyarrow.tests.util import windows_has_tzdata |
|
import sys |
|
|
|
|
|
# Names of the pytest mark groups recognized by this test suite.
# NOTE(review): presumably each entry gets --enable-<group>/--disable-<group>
# command-line options registered elsewhere in this conftest — the
# option-registration code is outside this view; confirm there.
groups = [
    'acero',
    'azure',
    'brotli',
    'bz2',
    'cython',
    'dataset',
    'hypothesis',
    'fastparquet',
    'flight',
    'gandiva',
    'gcs',
    'gdb',
    'gzip',
    'hdfs',
    'large_memory',
    'lz4',
    'memory_leak',
    'nopandas',
    'nonumpy',
    'numpy',
    'orc',
    'pandas',
    'parquet',
    'parquet_encryption',
    'processes',
    'requires_testing_data',
    's3',
    'slow',
    'snappy',
    'sockets',
    'substrait',
    'threading',
    'timezone_data',
    'zstd',
]
|
|
|
# Initial enabled/disabled state for each group above.  Compression-codec
# groups reflect whether the codec was compiled into this pyarrow build
# (Codec.is_available); most optional-component groups start False and are
# flipped to True further below when the corresponding import succeeds.
defaults = {
    'acero': False,
    'azure': False,
    'brotli': Codec.is_available('brotli'),
    'bz2': Codec.is_available('bz2'),
    'cython': False,
    'dataset': False,
    'fastparquet': False,
    'flight': False,
    'gandiva': False,
    'gcs': False,
    'gdb': True,
    'gzip': Codec.is_available('gzip'),
    'hdfs': False,
    'hypothesis': False,
    'large_memory': False,
    'lz4': Codec.is_available('lz4'),
    'memory_leak': False,
    'nopandas': False,
    'nonumpy': False,
    'numpy': False,
    'orc': False,
    'pandas': False,
    'parquet': False,
    'parquet_encryption': False,
    'processes': True,
    'requires_testing_data': True,
    's3': False,
    'slow': False,
    'snappy': Codec.is_available('snappy'),
    'sockets': True,
    'substrait': False,
    'threading': is_threading_enabled(),
    'timezone_data': True,
    'zstd': Codec.is_available('zstd'),
}
|
|
|
# Platform-specific overrides.
# NOTE(review): on Emscripten (browser/Pyodide sandbox) these features are
# disabled — presumably because the runtime lacks subprocesses, raw sockets
# and a debugger; confirm against the pyodide build notes.
if sys.platform == "emscripten":
    defaults['gdb'] = False
    defaults['processes'] = False
    defaults['sockets'] = False

# Timezone-database availability varies by platform: Windows needs an
# explicit probe, and Emscripten only has it if zoneinfo is mounted.
if sys.platform == "win32":
    defaults['timezone_data'] = windows_has_tzdata()
elif sys.platform == "emscripten":
    defaults['timezone_data'] = os.path.exists("/usr/share/zoneinfo")
|
|
|
# Feature detection: probe each optional dependency and flip its default
# to True when the import succeeds.  ImportError is deliberately swallowed
# so the suite degrades gracefully on minimal builds.
try:
    import cython  # noqa
    defaults['cython'] = True
except ImportError:
    pass

try:
    import fastparquet  # noqa
    defaults['fastparquet'] = True
except ImportError:
    pass

try:
    import pyarrow.gandiva  # noqa
    defaults['gandiva'] = True
except ImportError:
    pass

try:
    import pyarrow.acero  # noqa
    defaults['acero'] = True
except ImportError:
    pass

try:
    import pyarrow.dataset  # noqa
    defaults['dataset'] = True
except ImportError:
    pass

try:
    import pyarrow.orc  # noqa
    if sys.platform == "win32":
        # NOTE(review): ORC is enabled unconditionally on Windows, while
        # other platforms additionally require timezone data — presumably
        # Windows ships what ORC needs; confirm upstream.
        defaults['orc'] = True
    else:
        defaults['orc'] = defaults['timezone_data']
except ImportError:
    pass

# Unlike the probes above, pandas/numpy failures set a "no*" marker
# (used to select tests that must run without those packages).
try:
    import pandas  # noqa
    defaults['pandas'] = True
except ImportError:
    defaults['nopandas'] = True

try:
    import numpy  # noqa
    defaults['numpy'] = True
except ImportError:
    defaults['nonumpy'] = True

try:
    import pyarrow.parquet  # noqa
    defaults['parquet'] = True
except ImportError:
    pass

try:
    import pyarrow.parquet.encryption  # noqa
    defaults['parquet_encryption'] = True
except ImportError:
    pass

try:
    import pyarrow.flight  # noqa
    defaults['flight'] = True
except ImportError:
    pass

try:
    from pyarrow.fs import AzureFileSystem  # noqa
    defaults['azure'] = True
except ImportError:
    pass

try:
    from pyarrow.fs import GcsFileSystem  # noqa
    defaults['gcs'] = True
except ImportError:
    pass

try:
    from pyarrow.fs import S3FileSystem  # noqa
    defaults['s3'] = True
except ImportError:
    pass

try:
    from pyarrow.fs import HadoopFileSystem  # noqa
    defaults['hdfs'] = True
except ImportError:
    pass

try:
    import pyarrow.substrait  # noqa
    defaults['substrait'] = True
except ImportError:
    pass
|
|
|
|
|
|
|
def pytest_ignore_collect(collection_path, config):
    """Tell pytest which paths to skip during (doctest) collection.

    Returns True for paths whose optional dependencies are unavailable,
    and for the unit-test tree itself when running in doctest mode.
    """
    path_str = str(collection_path)

    if config.option.doctestmodules:
        # Don't collect the unit tests themselves as doctest modules.
        if "/pyarrow/tests/" in path_str:
            return True

        doctest_groups = [
            'dataset',
            'orc',
            'parquet',
            'flight',
            'substrait',
        ]

        # Skip doctests of submodules whose optional component is missing.
        for group in doctest_groups:
            if f'pyarrow/{group}' in path_str and not defaults[group]:
                return True

        if ('pyarrow/parquet/encryption' in path_str
                and not defaults['parquet_encryption']):
            return True

        if 'pyarrow/cuda' in path_str:
            try:
                import pyarrow.cuda  # noqa
                return False
            except ImportError:
                return True

        if 'pyarrow/fs' in path_str:
            try:
                from pyarrow.fs import S3FileSystem  # noqa
                return False
            except ImportError:
                return True

    if getattr(config.option, "doctest_cython", False):
        # Cython doctest mode: skip the test tree and the encryption
        # extension module.
        if "/pyarrow/tests/" in path_str:
            return True
        if "/pyarrow/_parquet_encryption" in path_str:
            return True

    return False
|
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def _docdir(request):
    """Run each doctest inside a fresh temporary working directory.

    This keeps doctest examples that create files from polluting the
    source tree.  Outside doctest mode it is a no-op.
    """
    opts = request.config.option
    running_doctests = (opts.doctestmodules or
                        getattr(opts, "doctest_cython", False))

    if not running_doctests:
        yield
        return

    tmpdir = request.getfixturevalue('tmpdir')
    with tmpdir.as_cwd():
        yield
|
|
|
|
|
|
|
@pytest.fixture(autouse=True)
def add_fs(doctest_namespace, request, tmp_path):
    """Expose filesystem helpers to doctests.

    Injects the ``fs`` module, a LocalFileSystem instance, and the path of
    a small example data file into the doctest namespace.
    """
    opts = request.config.option
    in_doctest_mode = (opts.doctestmodules or
                       getattr(opts, "doctest_cython", False))

    if in_doctest_mode:
        doctest_namespace["fs"] = fs

        # Write a tiny example file for the filesystem doctests to read.
        local_fs = fs.LocalFileSystem()
        example = tmp_path / 'pyarrow-fs-example.dat'
        with local_fs.open_output_stream(str(example)) as out:
            out.write(b'data')

        doctest_namespace["local"] = local_fs
        doctest_namespace["local_path"] = str(tmp_path)
        doctest_namespace["path"] = str(example)
    yield
|
|
|
|
|
|
|
@pytest.fixture(scope="session")
def unary_func_fixture():
    """
    Register a unary scalar function.
    """
    from pyarrow import compute as pc

    def unary_function(ctx, x):
        # x + 1, allocating from the UDF context's memory pool.
        return pc.call_function("add", [x, 1],
                                memory_pool=ctx.memory_pool)

    name = "y=x+1"
    doc = {
        "summary": "add function",
        "description": "test add function",
    }
    pc.register_scalar_function(
        unary_function, name, doc, {"array": pa.int64()}, pa.int64())
    return unary_function, name
|
|
|
|
|
@pytest.fixture(scope="session")
def unary_agg_func_fixture():
    """
    Register a unary aggregate function (mean)
    """
    from pyarrow import compute as pc
    import numpy as np

    def func(ctx, x):
        # NaN-aware mean, wrapped back into an Arrow scalar.
        return pa.scalar(np.nanmean(x))

    name = "mean_udf"
    doc = {
        "summary": "y=avg(x)",
        "description": "find mean of x",
    }
    pc.register_aggregate_function(
        func, name, doc, {"x": pa.float64()}, pa.float64())
    return func, name
|
|
|
|
|
@pytest.fixture(scope="session")
def varargs_agg_func_fixture():
    """
    Register a varargs aggregate function.

    The function sums the NaN-aware mean of each of its arguments.
    Returns the (callable, registered name) pair.
    """
    from pyarrow import compute as pc
    import numpy as np

    def func(ctx, *args):
        # Use `total` rather than shadowing the builtin `sum`.
        total = 0.0
        for arg in args:
            total += np.nanmean(arg)
        return pa.scalar(total)

    func_name = "sum_mean"
    func_doc = {"summary": "Varargs aggregate",
                "description": "Varargs aggregate"}

    pc.register_aggregate_function(func,
                                   func_name,
                                   func_doc,
                                   {
                                       "x": pa.int64(),
                                       "y": pa.float64()
                                   },
                                   pa.float64()
                                   )
    return func, func_name
|
|