|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
import sys |
|
|
|
from cpython.object cimport Py_LT, Py_EQ, Py_GT, Py_LE, Py_NE, Py_GE |
|
from cython.operator cimport dereference as deref |
|
|
|
from collections import namedtuple |
|
|
|
from pyarrow.lib import frombytes, tobytes, ArrowInvalid |
|
from pyarrow.lib cimport * |
|
from pyarrow.includes.common cimport * |
|
from pyarrow.includes.libarrow cimport * |
|
import pyarrow.lib as lib |
|
from pyarrow.util import _DEPR_MSG |
|
from libcpp cimport bool as c_bool |
|
|
|
import inspect |
|
try: |
|
import numpy as np |
|
except ImportError: |
|
np = None |
|
import warnings |
|
|
|
|
|
__pas = None |
|
_substrait_msg = ( |
|
"The pyarrow installation is not built with support for Substrait." |
|
) |
|
|
|
|
|
SUPPORTED_INPUT_ARR_TYPES = (list, tuple) |
|
if np is not None: |
|
SUPPORTED_INPUT_ARR_TYPES += (np.ndarray, ) |
|
|
|
|
|
def _pas(): |
|
global __pas |
|
if __pas is None: |
|
try: |
|
import pyarrow.substrait as pas |
|
__pas = pas |
|
except ImportError: |
|
raise ImportError(_substrait_msg) |
|
return __pas |
|
|
|
|
|
def _forbid_instantiation(klass, subclasses_instead=True): |
|
    msg = '{} is an abstract class and thus cannot be instantiated.'.format(
|
klass.__name__ |
|
) |
|
if subclasses_instead: |
|
        subclasses = [cls.__name__ for cls in klass.__subclasses__()]
|
msg += ' Use one of the subclasses instead: {}'.format( |
|
', '.join(subclasses) |
|
) |
|
raise TypeError(msg) |
|
|
|
|
|
cdef vector[CSortKey] unwrap_sort_keys(sort_keys, allow_str=True): |
|
cdef vector[CSortKey] c_sort_keys |
|
if allow_str and isinstance(sort_keys, str): |
|
c_sort_keys.push_back( |
|
CSortKey(_ensure_field_ref(""), unwrap_sort_order(sort_keys)) |
|
) |
|
else: |
|
for name, order in sort_keys: |
|
c_sort_keys.push_back( |
|
CSortKey(_ensure_field_ref(name), unwrap_sort_order(order)) |
|
) |
|
return c_sort_keys |
|
|
|
|
|
cdef wrap_scalar_function(const shared_ptr[CFunction]& sp_func): |
|
""" |
|
Wrap a C++ scalar Function in a ScalarFunction object. |
|
""" |
|
cdef ScalarFunction func = ScalarFunction.__new__(ScalarFunction) |
|
func.init(sp_func) |
|
return func |
|
|
|
|
|
cdef wrap_vector_function(const shared_ptr[CFunction]& sp_func): |
|
""" |
|
Wrap a C++ vector Function in a VectorFunction object. |
|
""" |
|
cdef VectorFunction func = VectorFunction.__new__(VectorFunction) |
|
func.init(sp_func) |
|
return func |
|
|
|
|
|
cdef wrap_scalar_aggregate_function(const shared_ptr[CFunction]& sp_func): |
|
""" |
|
Wrap a C++ aggregate Function in a ScalarAggregateFunction object. |
|
""" |
|
cdef ScalarAggregateFunction func = \ |
|
ScalarAggregateFunction.__new__(ScalarAggregateFunction) |
|
func.init(sp_func) |
|
return func |
|
|
|
|
|
cdef wrap_hash_aggregate_function(const shared_ptr[CFunction]& sp_func): |
|
""" |
|
Wrap a C++ aggregate Function in a HashAggregateFunction object. |
|
""" |
|
cdef HashAggregateFunction func = \ |
|
HashAggregateFunction.__new__(HashAggregateFunction) |
|
func.init(sp_func) |
|
return func |
|
|
|
|
|
cdef wrap_meta_function(const shared_ptr[CFunction]& sp_func): |
|
""" |
|
Wrap a C++ meta Function in a MetaFunction object. |
|
""" |
|
cdef MetaFunction func = MetaFunction.__new__(MetaFunction) |
|
func.init(sp_func) |
|
return func |
|
|
|
|
|
cdef wrap_function(const shared_ptr[CFunction]& sp_func): |
|
""" |
|
Wrap a C++ Function in a Function object. |
|
|
|
This dispatches to specialized wrappers depending on the function kind. |
|
""" |
|
if sp_func.get() == NULL: |
|
raise ValueError("Function was NULL") |
|
|
|
cdef FunctionKind c_kind = sp_func.get().kind() |
|
if c_kind == FunctionKind_SCALAR: |
|
return wrap_scalar_function(sp_func) |
|
elif c_kind == FunctionKind_VECTOR: |
|
return wrap_vector_function(sp_func) |
|
elif c_kind == FunctionKind_SCALAR_AGGREGATE: |
|
return wrap_scalar_aggregate_function(sp_func) |
|
elif c_kind == FunctionKind_HASH_AGGREGATE: |
|
return wrap_hash_aggregate_function(sp_func) |
|
elif c_kind == FunctionKind_META: |
|
return wrap_meta_function(sp_func) |
|
else: |
|
raise NotImplementedError("Unknown Function::Kind") |
|
|
|
|
|
cdef wrap_scalar_kernel(const CScalarKernel* c_kernel): |
|
if c_kernel == NULL: |
|
raise ValueError("Kernel was NULL") |
|
cdef ScalarKernel kernel = ScalarKernel.__new__(ScalarKernel) |
|
kernel.init(c_kernel) |
|
return kernel |
|
|
|
|
|
cdef wrap_vector_kernel(const CVectorKernel* c_kernel): |
|
if c_kernel == NULL: |
|
raise ValueError("Kernel was NULL") |
|
cdef VectorKernel kernel = VectorKernel.__new__(VectorKernel) |
|
kernel.init(c_kernel) |
|
return kernel |
|
|
|
|
|
cdef wrap_scalar_aggregate_kernel(const CScalarAggregateKernel* c_kernel): |
|
if c_kernel == NULL: |
|
raise ValueError("Kernel was NULL") |
|
cdef ScalarAggregateKernel kernel = \ |
|
ScalarAggregateKernel.__new__(ScalarAggregateKernel) |
|
kernel.init(c_kernel) |
|
return kernel |
|
|
|
|
|
cdef wrap_hash_aggregate_kernel(const CHashAggregateKernel* c_kernel): |
|
if c_kernel == NULL: |
|
raise ValueError("Kernel was NULL") |
|
cdef HashAggregateKernel kernel = \ |
|
HashAggregateKernel.__new__(HashAggregateKernel) |
|
kernel.init(c_kernel) |
|
return kernel |
|
|
|
|
|
cdef class Kernel(_Weakrefable): |
|
""" |
|
A kernel object. |
|
|
|
Kernels handle the execution of a Function for a certain signature. |
|
""" |
|
|
|
def __init__(self): |
|
raise TypeError("Do not call {}'s constructor directly" |
|
.format(self.__class__.__name__)) |
|
|
|
|
|
cdef class ScalarKernel(Kernel): |
|
cdef const CScalarKernel* kernel |
|
|
|
cdef void init(self, const CScalarKernel* kernel) except *: |
|
self.kernel = kernel |
|
|
|
def __repr__(self): |
|
return ("ScalarKernel<{}>" |
|
.format(frombytes(self.kernel.signature.get().ToString()))) |
|
|
|
|
|
cdef class VectorKernel(Kernel): |
|
cdef const CVectorKernel* kernel |
|
|
|
cdef void init(self, const CVectorKernel* kernel) except *: |
|
self.kernel = kernel |
|
|
|
def __repr__(self): |
|
return ("VectorKernel<{}>" |
|
.format(frombytes(self.kernel.signature.get().ToString()))) |
|
|
|
|
|
cdef class ScalarAggregateKernel(Kernel): |
|
cdef const CScalarAggregateKernel* kernel |
|
|
|
cdef void init(self, const CScalarAggregateKernel* kernel) except *: |
|
self.kernel = kernel |
|
|
|
def __repr__(self): |
|
return ("ScalarAggregateKernel<{}>" |
|
.format(frombytes(self.kernel.signature.get().ToString()))) |
|
|
|
|
|
cdef class HashAggregateKernel(Kernel): |
|
cdef const CHashAggregateKernel* kernel |
|
|
|
cdef void init(self, const CHashAggregateKernel* kernel) except *: |
|
self.kernel = kernel |
|
|
|
def __repr__(self): |
|
return ("HashAggregateKernel<{}>" |
|
.format(frombytes(self.kernel.signature.get().ToString()))) |
|
|
|
|
|
FunctionDoc = namedtuple( |
|
"FunctionDoc", |
|
("summary", "description", "arg_names", "options_class", |
|
"options_required")) |
|
|
|
|
|
cdef class Function(_Weakrefable): |
|
""" |
|
A compute function. |
|
|
|
A function implements a certain logical computation over a range of |
|
possible input signatures. Each signature accepts a range of input |
|
types and is implemented by a given Kernel. |
|
|
|
Functions can be of different kinds: |
|
|
|
* "scalar" functions apply an item-wise computation over all items |
|
of their inputs. Each item in the output only depends on the values |
|
of the inputs at the same position. Examples: addition, comparisons, |
|
string predicates... |
|
|
|
* "vector" functions apply a collection-wise computation, such that |
|
each item in the output may depend on the values of several items |
|
in each input. Examples: dictionary encoding, sorting, extracting |
|
unique values... |
|
|
|
* "scalar_aggregate" functions reduce the dimensionality of the inputs by |
|
applying a reduction function. Examples: sum, min_max, mode... |
|
|
|
* "hash_aggregate" functions apply a reduction function to an input |
|
      subdivided by grouping criteria. They cannot be called directly.
|
Examples: hash_sum, hash_min_max... |
|
|
|
* "meta" functions dispatch to other functions. |
|
""" |
|
|
|
cdef: |
|
shared_ptr[CFunction] sp_func |
|
CFunction* base_func |
|
|
|
_kind_map = { |
|
FunctionKind_SCALAR: "scalar", |
|
FunctionKind_VECTOR: "vector", |
|
FunctionKind_SCALAR_AGGREGATE: "scalar_aggregate", |
|
FunctionKind_HASH_AGGREGATE: "hash_aggregate", |
|
FunctionKind_META: "meta", |
|
} |
|
|
|
def __init__(self): |
|
raise TypeError("Do not call {}'s constructor directly" |
|
.format(self.__class__.__name__)) |
|
|
|
cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: |
|
self.sp_func = sp_func |
|
self.base_func = sp_func.get() |
|
|
|
def __repr__(self): |
|
return ("arrow.compute.Function<name={}, kind={}, " |
|
"arity={}, num_kernels={}>" |
|
.format(self.name, self.kind, self.arity, self.num_kernels)) |
|
|
|
    def __reduce__(self):
        # Pickling round-trips through a lookup in the global registry
        return get_function, (self.name,)
|
|
|
@property |
|
def name(self): |
|
""" |
|
The function name. |
|
""" |
|
return frombytes(self.base_func.name()) |
|
|
|
@property |
|
def arity(self): |
|
""" |
|
The function arity. |
|
|
|
If Ellipsis (i.e. `...`) is returned, the function takes a variable |
|
number of arguments. |
|
""" |
|
cdef CArity arity = self.base_func.arity() |
|
if arity.is_varargs: |
|
return ... |
|
else: |
|
return arity.num_args |
|
|
|
@property |
|
def kind(self): |
|
""" |
|
The function kind. |
|
""" |
|
cdef FunctionKind c_kind = self.base_func.kind() |
|
try: |
|
return self._kind_map[c_kind] |
|
except KeyError: |
|
raise NotImplementedError("Unknown Function::Kind") |
|
|
|
@property |
|
def _doc(self): |
|
""" |
|
The C++-like function documentation (for internal use). |
|
""" |
|
cdef CFunctionDoc c_doc = self.base_func.doc() |
|
return FunctionDoc(frombytes(c_doc.summary), |
|
frombytes(c_doc.description), |
|
[frombytes(s) for s in c_doc.arg_names], |
|
frombytes(c_doc.options_class), |
|
c_doc.options_required) |
|
|
|
@property |
|
def num_kernels(self): |
|
""" |
|
The number of kernels implementing this function. |
|
""" |
|
return self.base_func.num_kernels() |
|
|
|
def call(self, args, FunctionOptions options=None, |
|
MemoryPool memory_pool=None, length=None): |
|
""" |
|
Call the function on the given arguments. |
|
|
|
Parameters |
|
---------- |
|
args : iterable |
|
The arguments to pass to the function. Accepted types depend |
|
on the specific function. |
|
options : FunctionOptions, optional |
|
Options instance for executing this function. This should have |
|
the right concrete options type. |
|
memory_pool : pyarrow.MemoryPool, optional |
|
If not passed, will allocate memory from the default memory pool. |
|
length : int, optional |
|
Batch size for execution, for nullary (no argument) functions. If |
|
not passed, will be inferred from passed data. |
|
""" |
|
cdef: |
|
const CFunctionOptions* c_options = NULL |
|
CMemoryPool* pool = maybe_unbox_memory_pool(memory_pool) |
|
CExecContext c_exec_ctx = CExecContext(pool) |
|
CExecBatch c_batch |
|
CDatum result |
|
|
|
_pack_compute_args(args, &c_batch.values) |
|
|
|
if options is not None: |
|
c_options = options.get_options() |
|
|
|
if length is not None: |
|
c_batch.length = length |
|
with nogil: |
|
result = GetResultValue( |
|
self.base_func.Execute(c_batch, c_options, &c_exec_ctx) |
|
) |
|
else: |
|
with nogil: |
|
result = GetResultValue( |
|
self.base_func.Execute(c_batch.values, c_options, |
|
&c_exec_ctx) |
|
) |
|
|
|
return wrap_datum(result) |
|
|
|
|
|
cdef class ScalarFunction(Function): |
|
cdef const CScalarFunction* func |
|
|
|
cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: |
|
Function.init(self, sp_func) |
|
self.func = <const CScalarFunction*> sp_func.get() |
|
|
|
@property |
|
def kernels(self): |
|
""" |
|
The kernels implementing this function. |
|
""" |
|
cdef vector[const CScalarKernel*] kernels = self.func.kernels() |
|
return [wrap_scalar_kernel(k) for k in kernels] |
|
|
|
|
|
cdef class VectorFunction(Function): |
|
cdef const CVectorFunction* func |
|
|
|
cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: |
|
Function.init(self, sp_func) |
|
self.func = <const CVectorFunction*> sp_func.get() |
|
|
|
@property |
|
def kernels(self): |
|
""" |
|
The kernels implementing this function. |
|
""" |
|
cdef vector[const CVectorKernel*] kernels = self.func.kernels() |
|
return [wrap_vector_kernel(k) for k in kernels] |
|
|
|
|
|
cdef class ScalarAggregateFunction(Function): |
|
cdef const CScalarAggregateFunction* func |
|
|
|
cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: |
|
Function.init(self, sp_func) |
|
self.func = <const CScalarAggregateFunction*> sp_func.get() |
|
|
|
@property |
|
def kernels(self): |
|
""" |
|
The kernels implementing this function. |
|
""" |
|
cdef vector[const CScalarAggregateKernel*] kernels = \ |
|
self.func.kernels() |
|
return [wrap_scalar_aggregate_kernel(k) for k in kernels] |
|
|
|
|
|
cdef class HashAggregateFunction(Function): |
|
cdef const CHashAggregateFunction* func |
|
|
|
cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: |
|
Function.init(self, sp_func) |
|
self.func = <const CHashAggregateFunction*> sp_func.get() |
|
|
|
@property |
|
def kernels(self): |
|
""" |
|
The kernels implementing this function. |
|
""" |
|
cdef vector[const CHashAggregateKernel*] kernels = self.func.kernels() |
|
return [wrap_hash_aggregate_kernel(k) for k in kernels] |
|
|
|
|
|
cdef class MetaFunction(Function): |
|
cdef const CMetaFunction* func |
|
|
|
cdef void init(self, const shared_ptr[CFunction]& sp_func) except *: |
|
Function.init(self, sp_func) |
|
self.func = <const CMetaFunction*> sp_func.get() |
|
|
|
|
|
@property |
|
def kernels(self): |
|
""" |
|
The kernels implementing this function. |
|
""" |
|
return [] |
|
|
|
|
|
cdef _pack_compute_args(object values, vector[CDatum]* out): |
|
for val in values: |
|
if isinstance(val, SUPPORTED_INPUT_ARR_TYPES): |
|
val = lib.asarray(val) |
|
|
|
if isinstance(val, Array): |
|
out.push_back(CDatum((<Array> val).sp_array)) |
|
continue |
|
elif isinstance(val, ChunkedArray): |
|
out.push_back(CDatum((<ChunkedArray> val).sp_chunked_array)) |
|
continue |
|
elif isinstance(val, Scalar): |
|
out.push_back(CDatum((<Scalar> val).unwrap())) |
|
continue |
|
elif isinstance(val, RecordBatch): |
|
out.push_back(CDatum((<RecordBatch> val).sp_batch)) |
|
continue |
|
elif isinstance(val, Table): |
|
out.push_back(CDatum((<Table> val).sp_table)) |
|
continue |
|
        else:
            # Is it a Python object convertible to a scalar?
            try:
                scal = lib.scalar(val)
            except Exception:
                # Fall through to the TypeError below
                pass
|
else: |
|
out.push_back(CDatum((<Scalar> scal).unwrap())) |
|
continue |
|
|
|
raise TypeError(f"Got unexpected argument type {type(val)} " |
|
"for compute function") |
|
|
|
|
|
cdef class FunctionRegistry(_Weakrefable): |
|
cdef CFunctionRegistry* registry |
|
|
|
def __init__(self): |
|
self.registry = GetFunctionRegistry() |
|
|
|
def list_functions(self): |
|
""" |
|
Return all function names in the registry. |
|
""" |
|
cdef vector[c_string] names = self.registry.GetFunctionNames() |
|
return [frombytes(name) for name in names] |
|
|
|
def get_function(self, name): |
|
""" |
|
Look up a function by name in the registry. |
|
|
|
Parameters |
|
---------- |
|
name : str |
|
The name of the function to lookup |
|
""" |
|
cdef: |
|
c_string c_name = tobytes(name) |
|
shared_ptr[CFunction] func |
|
with nogil: |
|
func = GetResultValue(self.registry.GetFunction(c_name)) |
|
return wrap_function(func) |
|
|
|
|
|
cdef FunctionRegistry _global_func_registry = FunctionRegistry() |
|
|
|
|
|
def function_registry(): |
|
return _global_func_registry |
|
|
|
|
|
def get_function(name): |
|
""" |
|
Get a function by name. |
|
|
|
The function is looked up in the global registry |
|
(as returned by `function_registry()`). |
|
|
|
Parameters |
|
---------- |
|
name : str |
|
The name of the function to lookup |
|
""" |
|
return _global_func_registry.get_function(name) |
|
|
|
|
|
def list_functions(): |
|
""" |
|
Return all function names in the global registry. |
|
""" |
|
return _global_func_registry.list_functions() |
|
|
|
|
|
def call_function(name, args, options=None, memory_pool=None, length=None): |
|
""" |
|
Call a named function. |
|
|
|
The function is looked up in the global registry |
|
(as returned by `function_registry()`). |
|
|
|
Parameters |
|
---------- |
|
name : str |
|
The name of the function to call. |
|
args : list |
|
The arguments to the function. |
|
options : optional |
|
options provided to the function. |
|
memory_pool : MemoryPool, optional |
|
memory pool to use for allocations during function execution. |
|
length : int, optional |
|
Batch size for execution, for nullary (no argument) functions. If not |
|
passed, inferred from data. |
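
    Examples
    --------
    An illustrative sketch (scalar repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.call_function("max", [pa.array([1, 5, 3])])
    <pyarrow.Int64Scalar: 5>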
|
""" |
|
func = _global_func_registry.get_function(name) |
|
return func.call(args, options=options, memory_pool=memory_pool, |
|
length=length) |
|
|
|
|
|
cdef class FunctionOptions(_Weakrefable): |
|
__slots__ = () |
|
|
|
cdef const CFunctionOptions* get_options(self) except NULL: |
|
return self.wrapped.get() |
|
|
|
cdef void init(self, const shared_ptr[CFunctionOptions]& sp): |
|
self.wrapped = sp |
|
|
|
cdef inline shared_ptr[CFunctionOptions] unwrap(self): |
|
return self.wrapped |
|
|
|
def serialize(self): |
|
cdef: |
|
CResult[shared_ptr[CBuffer]] res = self.get_options().Serialize() |
|
shared_ptr[CBuffer] c_buf = GetResultValue(res) |
|
return pyarrow_wrap_buffer(c_buf) |
|
|
|
@staticmethod |
|
def deserialize(buf): |
|
""" |
|
Deserialize options for a function. |
|
|
|
Parameters |
|
---------- |
|
buf : Buffer |
|
The buffer containing the data to deserialize. |
|
""" |
|
cdef: |
|
shared_ptr[CBuffer] c_buf = pyarrow_unwrap_buffer(buf) |
|
CResult[unique_ptr[CFunctionOptions]] maybe_options = \ |
|
DeserializeFunctionOptions(deref(c_buf)) |
|
shared_ptr[CFunctionOptions] c_options |
|
c_options = to_shared(GetResultValue(move(maybe_options))) |
|
type_name = frombytes(c_options.get().options_type().type_name()) |
|
module = globals() |
|
if type_name not in module: |
|
raise ValueError(f'Cannot deserialize "{type_name}"') |
|
klass = module[type_name] |
|
options = klass.__new__(klass) |
|
(<FunctionOptions> options).init(c_options) |
|
return options |
|
|
|
    def __repr__(self):
        type_name = self.__class__.__name__
        # [1:-1] drops the delimiters enclosing the C++ representation
        string_repr = frombytes(self.get_options().ToString())[1:-1]
|
return f"{type_name}({string_repr})" |
|
|
|
def __eq__(self, FunctionOptions other): |
|
return self.get_options().Equals(deref(other.get_options())) |
|
|
|
|
|
def _raise_invalid_function_option(value, description, *, |
|
exception_class=ValueError): |
|
raise exception_class(f"\"{value}\" is not a valid {description}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
cdef class _CastOptions(FunctionOptions): |
|
cdef CCastOptions* options |
|
|
|
cdef void init(self, const shared_ptr[CFunctionOptions]& sp): |
|
FunctionOptions.init(self, sp) |
|
self.options = <CCastOptions*> self.wrapped.get() |
|
|
|
def _set_options(self, DataType target_type, allow_int_overflow, |
|
allow_time_truncate, allow_time_overflow, |
|
allow_decimal_truncate, allow_float_truncate, |
|
allow_invalid_utf8): |
|
cdef: |
|
shared_ptr[CCastOptions] wrapped = make_shared[CCastOptions]() |
|
self.init(<shared_ptr[CFunctionOptions]> wrapped) |
|
self._set_type(target_type) |
|
if allow_int_overflow is not None: |
|
self.allow_int_overflow = allow_int_overflow |
|
if allow_time_truncate is not None: |
|
self.allow_time_truncate = allow_time_truncate |
|
if allow_time_overflow is not None: |
|
self.allow_time_overflow = allow_time_overflow |
|
if allow_decimal_truncate is not None: |
|
self.allow_decimal_truncate = allow_decimal_truncate |
|
if allow_float_truncate is not None: |
|
self.allow_float_truncate = allow_float_truncate |
|
if allow_invalid_utf8 is not None: |
|
self.allow_invalid_utf8 = allow_invalid_utf8 |
|
|
|
def _set_type(self, target_type=None): |
|
if target_type is not None: |
|
deref(self.options).to_type = \ |
|
(<DataType> ensure_type(target_type)).sp_type |
|
|
|
def _set_safe(self): |
|
self.init(shared_ptr[CFunctionOptions]( |
|
new CCastOptions(CCastOptions.Safe()))) |
|
|
|
def _set_unsafe(self): |
|
self.init(shared_ptr[CFunctionOptions]( |
|
new CCastOptions(CCastOptions.Unsafe()))) |
|
|
|
def is_safe(self): |
|
return not (deref(self.options).allow_int_overflow or |
|
deref(self.options).allow_time_truncate or |
|
deref(self.options).allow_time_overflow or |
|
deref(self.options).allow_decimal_truncate or |
|
deref(self.options).allow_float_truncate or |
|
deref(self.options).allow_invalid_utf8) |
|
|
|
@property |
|
def allow_int_overflow(self): |
|
return deref(self.options).allow_int_overflow |
|
|
|
@allow_int_overflow.setter |
|
def allow_int_overflow(self, c_bool flag): |
|
deref(self.options).allow_int_overflow = flag |
|
|
|
@property |
|
def allow_time_truncate(self): |
|
return deref(self.options).allow_time_truncate |
|
|
|
@allow_time_truncate.setter |
|
def allow_time_truncate(self, c_bool flag): |
|
deref(self.options).allow_time_truncate = flag |
|
|
|
@property |
|
def allow_time_overflow(self): |
|
return deref(self.options).allow_time_overflow |
|
|
|
@allow_time_overflow.setter |
|
def allow_time_overflow(self, c_bool flag): |
|
deref(self.options).allow_time_overflow = flag |
|
|
|
@property |
|
def allow_decimal_truncate(self): |
|
return deref(self.options).allow_decimal_truncate |
|
|
|
@allow_decimal_truncate.setter |
|
def allow_decimal_truncate(self, c_bool flag): |
|
deref(self.options).allow_decimal_truncate = flag |
|
|
|
@property |
|
def allow_float_truncate(self): |
|
return deref(self.options).allow_float_truncate |
|
|
|
@allow_float_truncate.setter |
|
def allow_float_truncate(self, c_bool flag): |
|
deref(self.options).allow_float_truncate = flag |
|
|
|
@property |
|
def allow_invalid_utf8(self): |
|
return deref(self.options).allow_invalid_utf8 |
|
|
|
@allow_invalid_utf8.setter |
|
def allow_invalid_utf8(self, c_bool flag): |
|
deref(self.options).allow_invalid_utf8 = flag |
|
|
|
|
|
class CastOptions(_CastOptions): |
|
""" |
|
Options for the `cast` function. |
|
|
|
Parameters |
|
---------- |
|
target_type : DataType, optional |
|
The PyArrow type to cast to. |
|
allow_int_overflow : bool, default False |
|
Whether integer overflow is allowed when casting. |
|
allow_time_truncate : bool, default False |
|
Whether time precision truncation is allowed when casting. |
|
allow_time_overflow : bool, default False |
|
Whether date/time range overflow is allowed when casting. |
|
allow_decimal_truncate : bool, default False |
|
Whether decimal precision truncation is allowed when casting. |
|
allow_float_truncate : bool, default False |
|
Whether floating-point precision truncation is allowed when casting. |
|
allow_invalid_utf8 : bool, default False |
|
Whether producing invalid utf8 data is allowed when casting. |
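
    Examples
    --------
    An illustrative sketch of an unsafe cast that truncates floats
    (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.cast(pa.array([1.5, 2.7]),
    ...         options=pc.CastOptions.unsafe(pa.int64()))
    <pyarrow.lib.Int64Array object at ...>
    [
      1,
      2
    ]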
|
""" |
|
|
|
def __init__(self, target_type=None, *, allow_int_overflow=None, |
|
allow_time_truncate=None, allow_time_overflow=None, |
|
allow_decimal_truncate=None, allow_float_truncate=None, |
|
allow_invalid_utf8=None): |
|
self._set_options(target_type, allow_int_overflow, allow_time_truncate, |
|
allow_time_overflow, allow_decimal_truncate, |
|
allow_float_truncate, allow_invalid_utf8) |
|
|
|
@staticmethod |
|
def safe(target_type=None): |
|
"""" |
|
Create a CastOptions for a safe cast. |
|
|
|
Parameters |
|
---------- |
|
target_type : optional |
|
Target cast type for the safe cast. |
|
""" |
|
self = CastOptions() |
|
self._set_safe() |
|
self._set_type(target_type) |
|
return self |
|
|
|
@staticmethod |
|
def unsafe(target_type=None): |
|
"""" |
|
Create a CastOptions for an unsafe cast. |
|
|
|
Parameters |
|
---------- |
|
target_type : optional |
|
Target cast type for the unsafe cast. |
|
""" |
|
self = CastOptions() |
|
self._set_unsafe() |
|
self._set_type(target_type) |
|
return self |
|
|
|
|
|
def _skip_nulls_doc():
    # The returned text is spliced into the option classes' docstrings
    # below (hence the indentation of the continuation lines).
    return """skip_nulls : bool, default True
|
Whether to skip (ignore) nulls in the input. |
|
If False, any null in the input forces the output to null. |
|
""" |
|
|
|
|
|
def _min_count_doc(*, default): |
|
return f"""min_count : int, default {default} |
|
Minimum number of non-null values in the input. If the number |
|
of non-null values is below `min_count`, the output is null. |
|
""" |
|
|
|
|
|
cdef class _ElementWiseAggregateOptions(FunctionOptions): |
|
def _set_options(self, skip_nulls): |
|
self.wrapped.reset(new CElementWiseAggregateOptions(skip_nulls)) |
|
|
|
|
|
class ElementWiseAggregateOptions(_ElementWiseAggregateOptions): |
|
__doc__ = f""" |
|
Options for element-wise aggregate functions. |
|
|
|
Parameters |
|
---------- |
|
{_skip_nulls_doc()} |
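
    Examples
    --------
    An illustrative sketch with max_element_wise (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.max_element_wise(
    ...     pa.array([1, None]), pa.array([2, 3]),
    ...     options=pc.ElementWiseAggregateOptions(skip_nulls=False))
    <pyarrow.lib.Int64Array object at ...>
    [
      2,
      null
    ]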
|
""" |
|
|
|
def __init__(self, *, skip_nulls=True): |
|
self._set_options(skip_nulls) |
|
|
|
|
|
cdef CRoundMode unwrap_round_mode(round_mode) except *: |
|
if round_mode == "down": |
|
return CRoundMode_DOWN |
|
elif round_mode == "up": |
|
return CRoundMode_UP |
|
elif round_mode == "towards_zero": |
|
return CRoundMode_TOWARDS_ZERO |
|
elif round_mode == "towards_infinity": |
|
return CRoundMode_TOWARDS_INFINITY |
|
elif round_mode == "half_down": |
|
return CRoundMode_HALF_DOWN |
|
elif round_mode == "half_up": |
|
return CRoundMode_HALF_UP |
|
elif round_mode == "half_towards_zero": |
|
return CRoundMode_HALF_TOWARDS_ZERO |
|
elif round_mode == "half_towards_infinity": |
|
return CRoundMode_HALF_TOWARDS_INFINITY |
|
elif round_mode == "half_to_even": |
|
return CRoundMode_HALF_TO_EVEN |
|
elif round_mode == "half_to_odd": |
|
return CRoundMode_HALF_TO_ODD |
|
_raise_invalid_function_option(round_mode, "round mode") |
|
|
|
|
|
cdef class _RoundOptions(FunctionOptions): |
|
def _set_options(self, ndigits, round_mode): |
|
self.wrapped.reset( |
|
new CRoundOptions(ndigits, unwrap_round_mode(round_mode)) |
|
) |
|
|
|
|
|
class RoundOptions(_RoundOptions): |
|
""" |
|
Options for rounding numbers. |
|
|
|
Parameters |
|
---------- |
|
ndigits : int, default 0 |
|
Number of fractional digits to round to. |
|
round_mode : str, default "half_to_even" |
|
Rounding and tie-breaking mode. |
|
Accepted values are "down", "up", "towards_zero", "towards_infinity", |
|
"half_down", "half_up", "half_towards_zero", "half_towards_infinity", |
|
"half_to_even", "half_to_odd". |
|
""" |
|
|
|
def __init__(self, ndigits=0, round_mode="half_to_even"): |
|
self._set_options(ndigits, round_mode) |
|
|
|
|
|
cdef class _RoundBinaryOptions(FunctionOptions): |
|
def _set_options(self, round_mode): |
|
self.wrapped.reset( |
|
new CRoundBinaryOptions(unwrap_round_mode(round_mode)) |
|
) |
|
|
|
|
|
class RoundBinaryOptions(_RoundBinaryOptions): |
|
""" |
|
    Options for rounding numbers when ndigits is provided by a second array.
|
|
|
Parameters |
|
---------- |
|
round_mode : str, default "half_to_even" |
|
Rounding and tie-breaking mode. |
|
Accepted values are "down", "up", "towards_zero", "towards_infinity", |
|
"half_down", "half_up", "half_towards_zero", "half_towards_infinity", |
|
"half_to_even", "half_to_odd". |
|
""" |
|
|
|
def __init__(self, round_mode="half_to_even"): |
|
self._set_options(round_mode) |
|
|
|
|
|
cdef CCalendarUnit unwrap_round_temporal_unit(unit) except *: |
|
if unit == "nanosecond": |
|
return CCalendarUnit_NANOSECOND |
|
elif unit == "microsecond": |
|
return CCalendarUnit_MICROSECOND |
|
elif unit == "millisecond": |
|
return CCalendarUnit_MILLISECOND |
|
elif unit == "second": |
|
return CCalendarUnit_SECOND |
|
elif unit == "minute": |
|
return CCalendarUnit_MINUTE |
|
elif unit == "hour": |
|
return CCalendarUnit_HOUR |
|
elif unit == "day": |
|
return CCalendarUnit_DAY |
|
elif unit == "week": |
|
return CCalendarUnit_WEEK |
|
elif unit == "month": |
|
return CCalendarUnit_MONTH |
|
elif unit == "quarter": |
|
return CCalendarUnit_QUARTER |
|
elif unit == "year": |
|
return CCalendarUnit_YEAR |
|
_raise_invalid_function_option(unit, "Calendar unit") |
|
|
|
|
|
cdef class _RoundTemporalOptions(FunctionOptions): |
|
def _set_options(self, multiple, unit, week_starts_monday, |
|
ceil_is_strictly_greater, calendar_based_origin): |
|
self.wrapped.reset( |
|
new CRoundTemporalOptions( |
|
multiple, unwrap_round_temporal_unit(unit), |
|
week_starts_monday, ceil_is_strictly_greater, |
|
calendar_based_origin) |
|
) |
|
|
|
|
|
class RoundTemporalOptions(_RoundTemporalOptions): |
|
""" |
|
Options for rounding temporal values. |
|
|
|
Parameters |
|
---------- |
|
multiple : int, default 1 |
|
Number of units to round to. |
|
unit : str, default "day" |
|
The unit in which `multiple` is expressed. |
|
Accepted values are "year", "quarter", "month", "week", "day", |
|
"hour", "minute", "second", "millisecond", "microsecond", |
|
"nanosecond". |
|
week_starts_monday : bool, default True |
|
If True, weeks start on Monday; if False, on Sunday. |
|
ceil_is_strictly_greater : bool, default False |
|
If True, ceil returns a rounded value that is strictly greater than the |
|
input. For example: ceiling 1970-01-01T00:00:00 to 3 hours would |
|
yield 1970-01-01T03:00:00 if set to True and 1970-01-01T00:00:00 |
|
if set to False. |
|
This applies to the ceil_temporal function only. |
|
    calendar_based_origin : bool, default False
        By default, the origin is 1970-01-01T00:00:00. By setting this to
        True, the rounding origin will be the beginning of the next less
        precise calendar unit, e.g. rounding to hours will use the
        beginning of the day as the origin.

        In other words, time is by default rounded to a multiple of units
        since 1970-01-01T00:00:00. With calendar_based_origin set to True,
        time is instead rounded to the number of units since the last
        greater calendar unit, for example to a multiple of days since the
        beginning of the month, or of hours since the beginning of the day.
        Exceptions: week and quarter are not used as greater units,
        therefore days will be rounded to the beginning of the month, not
        of the week. The greater unit of week is a year.
        Note that ceiling and rounding might change the sorting order of an
        array near a greater unit change. For example, rounding
        YYYY-mm-dd 23:00:00 to 5 hours will ceil and round to
        YYYY-mm-dd+1 01:00:00 and floor to YYYY-mm-dd 20:00:00. On the
        other hand, YYYY-mm-dd+1 00:00:00 will ceil, round and floor to
        YYYY-mm-dd+1 00:00:00. This can break the order of an
        already-ordered array.
|
|
|
""" |
|
|
|
def __init__(self, multiple=1, unit="day", *, week_starts_monday=True, |
|
ceil_is_strictly_greater=False, |
|
calendar_based_origin=False): |
|
self._set_options(multiple, unit, week_starts_monday, |
|
ceil_is_strictly_greater, |
|
calendar_based_origin) |
|
|
|
|
|
cdef class _RoundToMultipleOptions(FunctionOptions): |
|
def _set_options(self, multiple, round_mode): |
|
if not isinstance(multiple, Scalar): |
|
try: |
|
multiple = lib.scalar(multiple) |
|
except Exception: |
|
_raise_invalid_function_option( |
|
multiple, "multiple type for RoundToMultipleOptions", |
|
exception_class=TypeError) |
|
|
|
self.wrapped.reset( |
|
new CRoundToMultipleOptions( |
|
pyarrow_unwrap_scalar(multiple), unwrap_round_mode(round_mode)) |
|
) |
|
|
|
|
|
class RoundToMultipleOptions(_RoundToMultipleOptions): |
|
""" |
|
Options for rounding numbers to a multiple. |
|
|
|
Parameters |
|
---------- |
|
multiple : numeric scalar, default 1.0 |
|
Multiple to round to. Should be a scalar of a type compatible |
|
with the argument to be rounded. |
|
round_mode : str, default "half_to_even" |
|
Rounding and tie-breaking mode. |
|
Accepted values are "down", "up", "towards_zero", "towards_infinity", |
|
"half_down", "half_up", "half_towards_zero", "half_towards_infinity", |
|
"half_to_even", "half_to_odd". |
|
""" |
|
|
|
def __init__(self, multiple=1.0, round_mode="half_to_even"): |
|
self._set_options(multiple, round_mode) |
|
|
|
|
|
cdef class _JoinOptions(FunctionOptions): |
|
_null_handling_map = { |
|
"emit_null": CJoinNullHandlingBehavior_EMIT_NULL, |
|
"skip": CJoinNullHandlingBehavior_SKIP, |
|
"replace": CJoinNullHandlingBehavior_REPLACE, |
|
} |
|
|
|
def _set_options(self, null_handling, null_replacement): |
|
try: |
|
self.wrapped.reset( |
|
new CJoinOptions(self._null_handling_map[null_handling], |
|
tobytes(null_replacement)) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(null_handling, "null handling") |
|
|
|
|
|
class JoinOptions(_JoinOptions): |
|
""" |
|
Options for the `binary_join_element_wise` function. |
|
|
|
Parameters |
|
---------- |
|
null_handling : str, default "emit_null" |
|
How to handle null values in the inputs. |
|
Accepted values are "emit_null", "skip", "replace". |
|
null_replacement : str, default "" |
|
Replacement string to emit for null inputs if `null_handling` |
|
is "replace". |
|
""" |
|
|
|
def __init__(self, null_handling="emit_null", null_replacement=""): |
|
self._set_options(null_handling, null_replacement) |
|
|
|
|
|
cdef class _MatchSubstringOptions(FunctionOptions): |
|
def _set_options(self, pattern, ignore_case): |
|
self.wrapped.reset( |
|
new CMatchSubstringOptions(tobytes(pattern), ignore_case) |
|
) |
|
|
|
|
|
class MatchSubstringOptions(_MatchSubstringOptions): |
|
""" |
|
Options for looking for a substring. |
|
|
|
Parameters |
|
---------- |
|
pattern : str |
|
Substring pattern to look for inside input values. |
|
ignore_case : bool, default False |
|
Whether to perform a case-insensitive match. |
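
    Examples
    --------
    An illustrative sketch (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.match_substring(pa.array(["Hello", "world"]),
    ...                    options=pc.MatchSubstringOptions("hello",
    ...                                                     ignore_case=True))
    <pyarrow.lib.BooleanArray object at ...>
    [
      true,
      false
    ]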
|
""" |
|
|
|
def __init__(self, pattern, *, ignore_case=False): |
|
self._set_options(pattern, ignore_case) |
|
|
|
|
|
cdef class _PadOptions(FunctionOptions): |
|
def _set_options(self, width, padding, lean_left_on_odd_padding): |
|
        self.wrapped.reset(
            new CPadOptions(width, tobytes(padding),
                            lean_left_on_odd_padding)
        )
|
|
|
|
|
class PadOptions(_PadOptions): |
|
""" |
|
Options for padding strings. |
|
|
|
Parameters |
|
---------- |
|
width : int |
|
Desired string length. |
|
padding : str, default " " |
|
What to pad the string with. Should be one byte or codepoint. |
|
lean_left_on_odd_padding : bool, default True |
|
What to do if there is an odd number of padding characters (in case |
|
of centered padding). Defaults to aligning on the left (i.e. adding |
|
the extra padding character on the right). |
|
""" |
|
|
|
def __init__(self, width, padding=' ', lean_left_on_odd_padding=True): |
|
self._set_options(width, padding, lean_left_on_odd_padding) |
|
|
|
|
|
cdef class _TrimOptions(FunctionOptions): |
|
def _set_options(self, characters): |
|
self.wrapped.reset(new CTrimOptions(tobytes(characters))) |
|
|
|
|
|
class TrimOptions(_TrimOptions): |
|
""" |
|
Options for trimming characters from strings. |
|
|
|
Parameters |
|
---------- |
|
characters : str |
|
Individual characters to be trimmed from the string. |
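
    Examples
    --------
    An illustrative sketch with utf8_trim (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.utf8_trim(pa.array(["xxhellox"]), options=pc.TrimOptions("x"))
    <pyarrow.lib.StringArray object at ...>
    [
      "hello"
    ]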
|
""" |
|
|
|
def __init__(self, characters): |
|
        # _set_options already converts `characters` with tobytes()
        self._set_options(characters)
|
|
|
|
|
cdef class _ReplaceSubstringOptions(FunctionOptions): |
|
def _set_options(self, pattern, replacement, max_replacements): |
|
self.wrapped.reset( |
|
new CReplaceSubstringOptions(tobytes(pattern), |
|
tobytes(replacement), |
|
max_replacements) |
|
) |
|
|
|
|
|
class ReplaceSubstringOptions(_ReplaceSubstringOptions): |
|
""" |
|
Options for replacing matched substrings. |
|
|
|
Parameters |
|
---------- |
|
pattern : str |
|
Substring pattern to look for inside input values. |
|
replacement : str |
|
What to replace the pattern with. |
|
max_replacements : int or None, default None |
|
The maximum number of strings to replace in each |
|
input value (unlimited if None). |
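
    Examples
    --------
    An illustrative sketch (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.replace_substring(
    ...     pa.array(["aaa"]),
    ...     options=pc.ReplaceSubstringOptions("a", "b", max_replacements=2))
    <pyarrow.lib.StringArray object at ...>
    [
      "bba"
    ]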
|
""" |
|
|
|
def __init__(self, pattern, replacement, *, max_replacements=None): |
|
if max_replacements is None: |
|
max_replacements = -1 |
|
self._set_options(pattern, replacement, max_replacements) |
|
|
|
|
|
cdef class _ExtractRegexOptions(FunctionOptions): |
|
def _set_options(self, pattern): |
|
self.wrapped.reset(new CExtractRegexOptions(tobytes(pattern))) |
|
|
|
|
|
class ExtractRegexOptions(_ExtractRegexOptions): |
|
""" |
|
Options for the `extract_regex` function. |
|
|
|
Parameters |
|
---------- |
|
pattern : str |
|
Regular expression with named capture fields. |
|
""" |
|
|
|
def __init__(self, pattern): |
|
self._set_options(pattern) |
|
|
|
|
|
cdef class _ExtractRegexSpanOptions(FunctionOptions): |
|
def _set_options(self, pattern): |
|
self.wrapped.reset(new CExtractRegexSpanOptions(tobytes(pattern))) |
|
|
|
|
|
class ExtractRegexSpanOptions(_ExtractRegexSpanOptions): |
|
""" |
|
Options for the `extract_regex_span` function. |
|
|
|
Parameters |
|
---------- |
|
pattern : str |
|
Regular expression with named capture fields. |
|
""" |
|
|
|
def __init__(self, pattern): |
|
self._set_options(pattern) |
|
|
|
|
|
cdef class _SliceOptions(FunctionOptions): |
|
def _set_options(self, start, stop, step): |
|
self.wrapped.reset(new CSliceOptions(start, stop, step)) |
|
|
|
|
|
class SliceOptions(_SliceOptions): |
|
""" |
|
Options for slicing. |
|
|
|
Parameters |
|
---------- |
|
start : int |
|
Index to start slicing at (inclusive). |
|
stop : int or None, default None |
|
If given, index to stop slicing at (exclusive). |
|
If not given, slicing will stop at the end. |
|
step : int, default 1 |
|
Slice step. |
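
    Examples
    --------
    An illustrative sketch with utf8_slice_codeunits (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.utf8_slice_codeunits(pa.array(["hello"]),
    ...                         options=pc.SliceOptions(start=1, stop=3))
    <pyarrow.lib.StringArray object at ...>
    [
      "el"
    ]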
|
""" |
|
|
|
def __init__(self, start, stop=None, step=1): |
|
if stop is None: |
|
stop = sys.maxsize |
|
if step < 0: |
|
stop = -stop |
|
self._set_options(start, stop, step) |
|
|
|
|
|
cdef class _ListSliceOptions(FunctionOptions): |
|
cpdef _set_options(self, start, stop=None, step=1, return_fixed_size_list=None): |
|
cdef: |
|
CListSliceOptions* opts |
|
opts = new CListSliceOptions( |
|
start, |
|
<optional[int64_t]>nullopt if stop is None |
|
else <optional[int64_t]>(<int64_t>stop), |
|
step, |
|
<optional[c_bool]>nullopt if return_fixed_size_list is None |
|
else <optional[c_bool]>(<c_bool>return_fixed_size_list) |
|
) |
|
self.wrapped.reset(opts) |
|
|
|
|
|
class ListSliceOptions(_ListSliceOptions): |
|
""" |
|
Options for list array slicing. |
|
|
|
Parameters |
|
---------- |
|
start : int |
|
Index to start slicing inner list elements (inclusive). |
|
stop : Optional[int], default None |
|
If given, index to stop slicing at (exclusive). |
|
        If not given, slicing will stop at the end (not yet implemented).
|
step : int, default 1 |
|
Slice step. |
|
return_fixed_size_list : Optional[bool], default None |
|
Whether to return a FixedSizeListArray. If true _and_ stop is after |
|
a list element's length, nulls will be appended to create the |
|
requested slice size. The default of `None` will return the same |
|
type which was passed in. |
|
""" |
|
|
|
def __init__(self, start, stop=None, step=1, return_fixed_size_list=None): |
|
self._set_options(start, stop, step, return_fixed_size_list) |
|
|
|
|
|
cdef class _ReplaceSliceOptions(FunctionOptions): |
|
def _set_options(self, start, stop, replacement): |
|
self.wrapped.reset( |
|
new CReplaceSliceOptions(start, stop, tobytes(replacement)) |
|
) |
|
|
|
|
|
class ReplaceSliceOptions(_ReplaceSliceOptions): |
|
""" |
|
Options for replacing slices. |
|
|
|
Parameters |
|
---------- |
|
start : int |
|
Index to start slicing at (inclusive). |
|
stop : int |
|
Index to stop slicing at (exclusive). |
|
replacement : str |
|
What to replace the slice with. |
|
""" |
|
|
|
def __init__(self, start, stop, replacement): |
|
self._set_options(start, stop, replacement) |
|
|
|
|
|
cdef class _FilterOptions(FunctionOptions): |
|
_null_selection_map = { |
|
"drop": CFilterNullSelectionBehavior_DROP, |
|
"emit_null": CFilterNullSelectionBehavior_EMIT_NULL, |
|
} |
|
|
|
def _set_options(self, null_selection_behavior): |
|
try: |
|
self.wrapped.reset( |
|
new CFilterOptions( |
|
self._null_selection_map[null_selection_behavior] |
|
) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(null_selection_behavior, |
|
"null selection behavior") |
|
|
|
|
|
class FilterOptions(_FilterOptions): |
|
""" |
|
Options for selecting with a boolean filter. |
|
|
|
Parameters |
|
---------- |
|
null_selection_behavior : str, default "drop" |
|
How to handle nulls in the selection filter. |
|
Accepted values are "drop", "emit_null". |
|
""" |
|
|
|
def __init__(self, null_selection_behavior="drop"): |
|
self._set_options(null_selection_behavior) |
|
|
|
|
|
cdef class _DictionaryEncodeOptions(FunctionOptions): |
|
_null_encoding_map = { |
|
"encode": CDictionaryEncodeNullEncodingBehavior_ENCODE, |
|
"mask": CDictionaryEncodeNullEncodingBehavior_MASK, |
|
} |
|
|
|
def _set_options(self, null_encoding): |
|
try: |
|
self.wrapped.reset( |
|
new CDictionaryEncodeOptions( |
|
self._null_encoding_map[null_encoding] |
|
) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(null_encoding, "null encoding") |
|
|
|
|
|
class DictionaryEncodeOptions(_DictionaryEncodeOptions): |
|
""" |
|
Options for dictionary encoding. |
|
|
|
Parameters |
|
---------- |
|
null_encoding : str, default "mask" |
|
How to encode nulls in the input. |
|
Accepted values are "mask" (null inputs emit a null in the indices |
|
array), "encode" (null inputs emit a non-null index pointing to |
|
a null value in the dictionary array). |
|
""" |
|
|
|
def __init__(self, null_encoding="mask"): |
|
self._set_options(null_encoding) |
|
|
|
|
|
cdef class _RunEndEncodeOptions(FunctionOptions): |
|
def _set_options(self, run_end_type): |
|
run_end_ty = ensure_type(run_end_type) |
|
        self.wrapped.reset(
            new CRunEndEncodeOptions(pyarrow_unwrap_data_type(run_end_ty))
        )
|
|
|
|
|
class RunEndEncodeOptions(_RunEndEncodeOptions): |
|
""" |
|
Options for run-end encoding. |
|
|
|
Parameters |
|
---------- |
|
run_end_type : DataType, default pyarrow.int32() |
|
The data type of the run_ends array. |
|
|
|
Accepted values are pyarrow.{int16(), int32(), int64()}. |
|
""" |
|
|
|
def __init__(self, run_end_type=lib.int32()): |
|
self._set_options(run_end_type) |
|
|
|
|
|
cdef class _TakeOptions(FunctionOptions): |
|
def _set_options(self, boundscheck): |
|
self.wrapped.reset(new CTakeOptions(boundscheck)) |
|
|
|
|
|
class TakeOptions(_TakeOptions): |
|
""" |
|
Options for the `take` and `array_take` functions. |
|
|
|
Parameters |
|
---------- |
|
boundscheck : boolean, default True |
|
Whether to check indices are within bounds. If False and an |
|
index is out of bounds, behavior is undefined (the process |
|
may crash). |
|
""" |
|
|
|
def __init__(self, *, boundscheck=True): |
|
self._set_options(boundscheck) |
|
|
|
|
|
cdef class _MakeStructOptions(FunctionOptions): |
|
def _set_options(self, field_names, field_nullability, field_metadata): |
|
cdef: |
|
vector[c_string] c_field_names |
|
vector[shared_ptr[const CKeyValueMetadata]] c_field_metadata |
|
for name in field_names: |
|
c_field_names.push_back(tobytes(name)) |
|
for metadata in field_metadata: |
|
c_field_metadata.push_back(pyarrow_unwrap_metadata(metadata)) |
|
self.wrapped.reset( |
|
new CMakeStructOptions(c_field_names, field_nullability, |
|
c_field_metadata) |
|
) |
|
|
|
|
|
class MakeStructOptions(_MakeStructOptions): |
|
""" |
|
Options for the `make_struct` function. |
|
|
|
Parameters |
|
---------- |
|
field_names : sequence of str |
|
Names of the struct fields to create. |
|
field_nullability : sequence of bool, optional |
|
Nullability information for each struct field. |
|
If omitted, all fields are nullable. |
|
field_metadata : sequence of KeyValueMetadata, optional |
|
Metadata for each struct field. |
|
""" |
|
|
|
def __init__(self, field_names=(), *, field_nullability=None, |
|
field_metadata=None): |
|
if field_nullability is None: |
|
field_nullability = [True] * len(field_names) |
|
if field_metadata is None: |
|
field_metadata = [None] * len(field_names) |
|
self._set_options(field_names, field_nullability, field_metadata) |
|
|
|
|
|
cdef CFieldRef _ensure_field_ref(value) except *: |
|
cdef: |
|
CFieldRef field_ref |
|
const CFieldRef* field_ref_ptr |
|
|
|
if isinstance(value, (list, tuple)): |
|
value = Expression._nested_field(tuple(value)) |
|
|
|
if isinstance(value, Expression): |
|
field_ref_ptr = (<Expression>value).unwrap().field_ref() |
|
if field_ref_ptr is NULL: |
|
raise ValueError("Unable to get FieldRef from Expression") |
|
field_ref = <CFieldRef>deref(field_ref_ptr) |
|
elif isinstance(value, (bytes, str)): |
|
if value.startswith(b'.' if isinstance(value, bytes) else '.'): |
|
field_ref = GetResultValue( |
|
CFieldRef.FromDotPath(<c_string>tobytes(value))) |
|
else: |
|
field_ref = CFieldRef(<c_string>tobytes(value)) |
|
elif isinstance(value, int): |
|
field_ref = CFieldRef(<int> value) |
|
else: |
|
raise TypeError("Expected a field reference as a str or int, list of " |
|
f"str or int, or Expression. Got {type(value)} instead.") |
|
return field_ref |
|
|
|
|
|
cdef class _StructFieldOptions(FunctionOptions): |
|
    def _set_options(self, indices):
        if isinstance(indices, (list, tuple)) and not len(indices):
            # An empty list of indices is allowed and effectively
            # returns the input unchanged
            self.wrapped.reset(
                new CStructFieldOptions(<vector[int]>indices))
            return
|
|
|
cdef CFieldRef field_ref = _ensure_field_ref(indices) |
|
self.wrapped.reset(new CStructFieldOptions(field_ref)) |
|
|
|
|
|
class StructFieldOptions(_StructFieldOptions): |
|
""" |
|
Options for the `struct_field` function. |
|
|
|
Parameters |
|
---------- |
|
indices : List[str], List[bytes], List[int], Expression, bytes, str, or int |
|
List of indices for chained field lookup, for example `[4, 1]` |
|
will look up the second nested field in the fifth outer field. |
|
""" |
|
|
|
def __init__(self, indices): |
|
self._set_options(indices) |
|
|
|
|
|
cdef class _ScalarAggregateOptions(FunctionOptions): |
|
def _set_options(self, skip_nulls, min_count): |
|
self.wrapped.reset(new CScalarAggregateOptions(skip_nulls, min_count)) |
|
|
|
|
|
class ScalarAggregateOptions(_ScalarAggregateOptions): |
|
__doc__ = f""" |
|
Options for scalar aggregations. |
|
|
|
Parameters |
|
---------- |
|
{_skip_nulls_doc()} |
|
{_min_count_doc(default=1)} |
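
    Examples
    --------
    An illustrative sketch: with fewer valid values than min_count,
    the result is null.

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.sum(pa.array([1, 2, None]), min_count=3)
    <pyarrow.Int64Scalar: None>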
|
""" |
|
|
|
def __init__(self, *, skip_nulls=True, min_count=1): |
|
self._set_options(skip_nulls, min_count) |
|
|
|
|
|
cdef class _CountOptions(FunctionOptions): |
|
_mode_map = { |
|
"only_valid": CCountMode_ONLY_VALID, |
|
"only_null": CCountMode_ONLY_NULL, |
|
"all": CCountMode_ALL, |
|
} |
|
|
|
def _set_options(self, mode): |
|
try: |
|
self.wrapped.reset(new CCountOptions(self._mode_map[mode])) |
|
except KeyError: |
|
_raise_invalid_function_option(mode, "count mode") |
|
|
|
|
|
class CountOptions(_CountOptions): |
|
""" |
|
Options for the `count` function. |
|
|
|
Parameters |
|
---------- |
|
mode : str, default "only_valid" |
|
Which values to count in the input. |
|
Accepted values are "only_valid", "only_null", "all". |
|
""" |
|
|
|
def __init__(self, mode="only_valid"): |
|
self._set_options(mode) |
|
|
|
|
|
cdef class _IndexOptions(FunctionOptions): |
|
def _set_options(self, scalar): |
|
self.wrapped.reset(new CIndexOptions(pyarrow_unwrap_scalar(scalar))) |
|
|
|
|
|
class IndexOptions(_IndexOptions): |
|
""" |
|
Options for the `index` function. |
|
|
|
Parameters |
|
---------- |
|
value : Scalar |
|
The value to search for. |
|
""" |
|
|
|
def __init__(self, value): |
|
self._set_options(value) |
|
|
|
|
|
cdef class _MapLookupOptions(FunctionOptions): |
|
_occurrence_map = { |
|
"all": CMapLookupOccurrence_ALL, |
|
"first": CMapLookupOccurrence_FIRST, |
|
"last": CMapLookupOccurrence_LAST, |
|
} |
|
|
|
def _set_options(self, query_key, occurrence): |
|
try: |
|
self.wrapped.reset( |
|
new CMapLookupOptions( |
|
pyarrow_unwrap_scalar(query_key), |
|
self._occurrence_map[occurrence] |
|
) |
|
) |
|
        except KeyError:
            _raise_invalid_function_option(
                occurrence,
                "occurrence (accepted values are 'first', 'last', 'all')")
|
|
|
|
|
class MapLookupOptions(_MapLookupOptions): |
|
""" |
|
Options for the `map_lookup` function. |
|
|
|
Parameters |
|
---------- |
|
    query_key : Scalar or object convertible to a Scalar
        The key to search for.
    occurrence : str
        The occurrence(s) to return from the Map.
        Accepted values are "first", "last", or "all".
|
""" |
|
|
|
def __init__(self, query_key, occurrence): |
|
if not isinstance(query_key, lib.Scalar): |
|
query_key = lib.scalar(query_key) |
|
|
|
self._set_options(query_key, occurrence) |
|
|
|
|
|
cdef class _ModeOptions(FunctionOptions): |
|
def _set_options(self, n, skip_nulls, min_count): |
|
self.wrapped.reset(new CModeOptions(n, skip_nulls, min_count)) |
|
|
|
|
|
class ModeOptions(_ModeOptions): |
|
__doc__ = f""" |
|
Options for the `mode` function. |
|
|
|
Parameters |
|
---------- |
|
n : int, default 1 |
|
Number of distinct most-common values to return. |
|
{_skip_nulls_doc()} |
|
{_min_count_doc(default=0)} |
|
""" |
|
|
|
def __init__(self, n=1, *, skip_nulls=True, min_count=0): |
|
self._set_options(n, skip_nulls, min_count) |
|
|
|
|
|
cdef class _SetLookupOptions(FunctionOptions): |
|
def _set_options(self, value_set, c_bool skip_nulls): |
|
cdef unique_ptr[CDatum] valset |
|
if isinstance(value_set, Array): |
|
valset.reset(new CDatum((<Array> value_set).sp_array)) |
|
elif isinstance(value_set, ChunkedArray): |
|
valset.reset( |
|
new CDatum((<ChunkedArray> value_set).sp_chunked_array) |
|
) |
|
elif isinstance(value_set, Scalar): |
|
valset.reset(new CDatum((<Scalar> value_set).unwrap())) |
|
else: |
|
_raise_invalid_function_option(value_set, "value set", |
|
exception_class=TypeError) |
|
|
|
self.wrapped.reset(new CSetLookupOptions(deref(valset), skip_nulls)) |
|
|
|
|
|
class SetLookupOptions(_SetLookupOptions): |
|
""" |
|
Options for the `is_in` and `index_in` functions. |
|
|
|
Parameters |
|
---------- |
|
value_set : Array |
|
Set of values to look for in the input. |
|
skip_nulls : bool, default False |
|
If False, nulls in the input are matched in the value_set just |
|
like regular values. |
|
If True, nulls in the input always fail matching. |
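
    Examples
    --------
    An illustrative sketch with is_in (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.is_in(pa.array([1, 2, 3]),
    ...          options=pc.SetLookupOptions(value_set=pa.array([1, 3])))
    <pyarrow.lib.BooleanArray object at ...>
    [
      true,
      false,
      true
    ]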
|
""" |
|
|
|
def __init__(self, value_set, *, skip_nulls=False): |
|
self._set_options(value_set, skip_nulls) |
|
|
|
|
|
cdef class _StrptimeOptions(FunctionOptions): |
|
_unit_map = { |
|
"s": TimeUnit_SECOND, |
|
"ms": TimeUnit_MILLI, |
|
"us": TimeUnit_MICRO, |
|
"ns": TimeUnit_NANO, |
|
} |
|
|
|
def _set_options(self, format, unit, error_is_null): |
|
try: |
|
self.wrapped.reset( |
|
new CStrptimeOptions(tobytes(format), self._unit_map[unit], |
|
error_is_null) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(unit, "time unit") |
|
|
|
|
|
class StrptimeOptions(_StrptimeOptions): |
|
""" |
|
Options for the `strptime` function. |
|
|
|
Parameters |
|
---------- |
|
format : str |
|
Pattern for parsing input strings as timestamps, such as "%Y/%m/%d". |
|
Note that the semantics of the format follow the C/C++ strptime, not the Python one. |
|
There are differences in behavior, for example how the "%y" placeholder |
|
handles years with less than four digits. |
|
unit : str |
|
Timestamp unit of the output. |
|
Accepted values are "s", "ms", "us", "ns". |
|
    error_is_null : boolean, default False
        If True, return null on parsing errors; if False, raise an error.
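
    Examples
    --------
    An illustrative sketch (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.strptime(pa.array(["2023-05-01"]),
    ...             options=pc.StrptimeOptions("%Y-%m-%d", unit="s"))
    <pyarrow.lib.TimestampArray object at ...>
    [
      2023-05-01 00:00:00
    ]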
|
""" |
|
|
|
def __init__(self, format, unit, error_is_null=False): |
|
self._set_options(format, unit, error_is_null) |
|
|
|
|
|
cdef class _StrftimeOptions(FunctionOptions): |
|
def _set_options(self, format, locale): |
|
self.wrapped.reset( |
|
new CStrftimeOptions(tobytes(format), tobytes(locale)) |
|
) |
|
|
|
|
|
class StrftimeOptions(_StrftimeOptions): |
|
""" |
|
Options for the `strftime` function. |
|
|
|
Parameters |
|
---------- |
|
format : str, default "%Y-%m-%dT%H:%M:%S" |
|
Pattern for formatting input values. |
|
locale : str, default "C" |
|
Locale to use for locale-specific format specifiers. |
|
""" |
|
|
|
def __init__(self, format="%Y-%m-%dT%H:%M:%S", locale="C"): |
|
self._set_options(format, locale) |
|
|
|
|
|
cdef class _DayOfWeekOptions(FunctionOptions): |
|
def _set_options(self, count_from_zero, week_start): |
|
self.wrapped.reset( |
|
new CDayOfWeekOptions(count_from_zero, week_start) |
|
) |
|
|
|
|
|
class DayOfWeekOptions(_DayOfWeekOptions): |
|
""" |
|
Options for the `day_of_week` function. |
|
|
|
Parameters |
|
---------- |
|
count_from_zero : bool, default True |
|
If True, number days from 0, otherwise from 1. |
|
week_start : int, default 1 |
|
Which day does the week start with (Monday=1, Sunday=7). |
|
How this value is numbered is unaffected by `count_from_zero`. |
|
""" |
|
|
|
def __init__(self, *, count_from_zero=True, week_start=1): |
|
self._set_options(count_from_zero, week_start) |
|
|
|
|
|
cdef class _WeekOptions(FunctionOptions): |
|
def _set_options(self, week_starts_monday, count_from_zero, |
|
first_week_is_fully_in_year): |
|
self.wrapped.reset( |
|
new CWeekOptions(week_starts_monday, count_from_zero, |
|
first_week_is_fully_in_year) |
|
) |
|
|
|
|
|
class WeekOptions(_WeekOptions): |
|
""" |
|
Options for the `week` function. |
|
|
|
Parameters |
|
---------- |
|
week_starts_monday : bool, default True |
|
If True, weeks start on Monday; if False, on Sunday. |
|
count_from_zero : bool, default False |
|
If True, dates at the start of a year that fall into the last week |
|
of the previous year emit 0. |
|
If False, they emit 52 or 53 (the week number of the last week |
|
of the previous year). |
|
first_week_is_fully_in_year : bool, default False |
|
If True, week number 0 is fully in January. |
|
If False, a week that begins on December 29, 30 or 31 is considered |
|
to be week number 0 of the following year. |
|
""" |
|
|
|
def __init__(self, *, week_starts_monday=True, count_from_zero=False, |
|
first_week_is_fully_in_year=False): |
|
self._set_options(week_starts_monday, |
|
count_from_zero, first_week_is_fully_in_year) |
|
|
|
|
|
cdef class _AssumeTimezoneOptions(FunctionOptions): |
|
_ambiguous_map = { |
|
"raise": CAssumeTimezoneAmbiguous_AMBIGUOUS_RAISE, |
|
"earliest": CAssumeTimezoneAmbiguous_AMBIGUOUS_EARLIEST, |
|
"latest": CAssumeTimezoneAmbiguous_AMBIGUOUS_LATEST, |
|
} |
|
_nonexistent_map = { |
|
"raise": CAssumeTimezoneNonexistent_NONEXISTENT_RAISE, |
|
"earliest": CAssumeTimezoneNonexistent_NONEXISTENT_EARLIEST, |
|
"latest": CAssumeTimezoneNonexistent_NONEXISTENT_LATEST, |
|
} |
|
|
|
def _set_options(self, timezone, ambiguous, nonexistent): |
|
if ambiguous not in self._ambiguous_map: |
|
_raise_invalid_function_option(ambiguous, |
|
"'ambiguous' timestamp handling") |
|
if nonexistent not in self._nonexistent_map: |
|
_raise_invalid_function_option(nonexistent, |
|
"'nonexistent' timestamp handling") |
|
self.wrapped.reset( |
|
new CAssumeTimezoneOptions(tobytes(timezone), |
|
self._ambiguous_map[ambiguous], |
|
self._nonexistent_map[nonexistent]) |
|
) |
|
|
|
|
|
class AssumeTimezoneOptions(_AssumeTimezoneOptions): |
|
""" |
|
Options for the `assume_timezone` function. |
|
|
|
Parameters |
|
---------- |
|
timezone : str |
|
Timezone to assume for the input. |
|
ambiguous : str, default "raise" |
|
How to handle timestamps that are ambiguous in the assumed timezone. |
|
Accepted values are "raise", "earliest", "latest". |
|
nonexistent : str, default "raise" |
|
How to handle timestamps that don't exist in the assumed timezone. |
|
Accepted values are "raise", "earliest", "latest". |
|
""" |
|
|
|
def __init__(self, timezone, *, ambiguous="raise", nonexistent="raise"): |
|
self._set_options(timezone, ambiguous, nonexistent) |
|
|
|
|
|
cdef class _NullOptions(FunctionOptions): |
|
def _set_options(self, nan_is_null): |
|
self.wrapped.reset(new CNullOptions(nan_is_null)) |
|
|
|
|
|
class NullOptions(_NullOptions): |
|
""" |
|
Options for the `is_null` function. |
|
|
|
Parameters |
|
---------- |
|
nan_is_null : bool, default False |
|
Whether floating-point NaN values are considered null. |
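
    Examples
    --------
    An illustrative sketch (array repr abridged):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.is_null(pa.array([1.0, None, float("nan")]),
    ...            options=pc.NullOptions(nan_is_null=True))
    <pyarrow.lib.BooleanArray object at ...>
    [
      false,
      true,
      true
    ]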
|
""" |
|
|
|
def __init__(self, *, nan_is_null=False): |
|
self._set_options(nan_is_null) |
|
|
|
|
|
cdef class _VarianceOptions(FunctionOptions): |
|
def _set_options(self, ddof, skip_nulls, min_count): |
|
self.wrapped.reset(new CVarianceOptions(ddof, skip_nulls, min_count)) |
|
|
|
|
|
class VarianceOptions(_VarianceOptions): |
|
__doc__ = f""" |
|
Options for the `variance` and `stddev` functions. |
|
|
|
Parameters |
|
---------- |
|
ddof : int, default 0 |
|
Number of degrees of freedom. |
|
{_skip_nulls_doc()} |
|
{_min_count_doc(default=0)} |
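
    Examples
    --------
    An illustrative sketch (sample variance with ddof=1):

    >>> import pyarrow as pa
    >>> import pyarrow.compute as pc
    >>> pc.variance(pa.array([1, 2, 3, 4]), ddof=1)
    <pyarrow.DoubleScalar: 1.6666666666666667>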
|
""" |
|
|
|
def __init__(self, *, ddof=0, skip_nulls=True, min_count=0): |
|
self._set_options(ddof, skip_nulls, min_count) |
|
|
|
|
|
cdef class _SkewOptions(FunctionOptions): |
|
def _set_options(self, skip_nulls, biased, min_count): |
|
self.wrapped.reset(new CSkewOptions(skip_nulls, biased, min_count)) |
|
|
|
|
|
class SkewOptions(_SkewOptions): |
|
__doc__ = f""" |
|
Options for the `skew` and `kurtosis` functions. |
|
|
|
Parameters |
|
---------- |
|
{_skip_nulls_doc()} |
|
biased : bool, default True |
|
Whether the calculated value is biased. |
|
If False, the value computed includes a correction factor to reduce bias. |
|
{_min_count_doc(default=0)} |
|
""" |
|
|
|
def __init__(self, *, skip_nulls=True, biased=True, min_count=0): |
|
self._set_options(skip_nulls, biased, min_count) |
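
# Illustrative usage of SkewOptions (a sketch; assumes this build ships the
# `skew` and `kurtosis` kernels):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.skew(pa.array([1.0, 2.0, 2.0, 5.0]),
#           options=SkewOptions(biased=False))  # bias-corrected estimate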
|
|
|
|
|
cdef class _SplitOptions(FunctionOptions): |
|
def _set_options(self, max_splits, reverse): |
|
self.wrapped.reset(new CSplitOptions(max_splits, reverse)) |
|
|
|
|
|
class SplitOptions(_SplitOptions): |
|
""" |
|
Options for splitting on whitespace. |
|
|
|
Parameters |
|
---------- |
|
max_splits : int or None, default None |
|
Maximum number of splits for each input value (unlimited if None). |
|
reverse : bool, default False |
|
Whether to start splitting from the end of each input value. |
|
This only has an effect if `max_splits` is not None. |
|
""" |
|
|
|
def __init__(self, *, max_splits=None, reverse=False): |
|
if max_splits is None: |
|
max_splits = -1 |
|
self._set_options(max_splits, reverse) |
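
# Illustrative usage of SplitOptions (a sketch; `pc.utf8_split_whitespace`
# accepts these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.utf8_split_whitespace(pa.array(["a b c"]),
#                            options=SplitOptions(max_splits=1))
#   # -> [["a", "b c"]]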
|
|
|
|
|
cdef class _SplitPatternOptions(FunctionOptions): |
|
def _set_options(self, pattern, max_splits, reverse): |
|
self.wrapped.reset( |
|
new CSplitPatternOptions(tobytes(pattern), max_splits, reverse) |
|
) |
|
|
|
|
|
class SplitPatternOptions(_SplitPatternOptions): |
|
""" |
|
Options for splitting on a string pattern. |
|
|
|
Parameters |
|
---------- |
|
pattern : str |
|
String pattern to split on. |
|
max_splits : int or None, default None |
|
Maximum number of splits for each input value (unlimited if None). |
|
reverse : bool, default False |
|
Whether to start splitting from the end of each input value. |
|
This only has an effect if `max_splits` is not None. |
|
""" |
|
|
|
def __init__(self, pattern, *, max_splits=None, reverse=False): |
|
if max_splits is None: |
|
max_splits = -1 |
|
self._set_options(pattern, max_splits, reverse) |
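
# Illustrative usage of SplitPatternOptions (a sketch; `pc.split_pattern`
# accepts these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.split_pattern(pa.array(["a,b,c"]), options=SplitPatternOptions(","))
#   # -> [["a", "b", "c"]]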
|
|
|
|
|
cdef CSortOrder unwrap_sort_order(order) except *: |
|
if order == "ascending": |
|
return CSortOrder_Ascending |
|
elif order == "descending": |
|
return CSortOrder_Descending |
|
_raise_invalid_function_option(order, "sort order") |
|
|
|
|
|
cdef CNullPlacement unwrap_null_placement(null_placement) except *: |
|
if null_placement == "at_start": |
|
return CNullPlacement_AtStart |
|
elif null_placement == "at_end": |
|
return CNullPlacement_AtEnd |
|
_raise_invalid_function_option(null_placement, "null placement") |
|
|
|
|
|
cdef class _PartitionNthOptions(FunctionOptions): |
|
def _set_options(self, pivot, null_placement): |
|
self.wrapped.reset(new CPartitionNthOptions( |
|
pivot, unwrap_null_placement(null_placement))) |
|
|
|
|
|
class PartitionNthOptions(_PartitionNthOptions): |
|
""" |
|
Options for the `partition_nth_indices` function. |
|
|
|
Parameters |
|
---------- |
|
pivot : int |
|
Index into the equivalent sorted array of the pivot element. |
|
null_placement : str, default "at_end" |
|
Where nulls in the input should be partitioned. |
|
Accepted values are "at_start", "at_end". |
|
""" |
|
|
|
def __init__(self, pivot, *, null_placement="at_end"): |
|
self._set_options(pivot, null_placement) |
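
# Illustrative usage of PartitionNthOptions (a sketch): after taking the
# returned indices, the element at position `pivot` is the one it would
# have in fully sorted order.
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   arr = pa.array([5, 1, 4, 2, 3])
#   indices = pc.partition_nth_indices(arr, options=PartitionNthOptions(2))
#   arr.take(indices)[2]  # -> 3, the value that index 2 holds once sorted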
|
|
|
|
|
cdef class _WinsorizeOptions(FunctionOptions): |
|
def _set_options(self, lower_limit, upper_limit): |
|
self.wrapped.reset(new CWinsorizeOptions(lower_limit, upper_limit)) |
|
|
|
|
|
class WinsorizeOptions(_WinsorizeOptions): |
|
""" |
|
Options for the `winsorize` function. |
|
|
|
Parameters |
|
---------- |
|
lower_limit : float, between 0 and 1 |
|
The quantile below which all values are replaced with the quantile's value. |
|
upper_limit : float, between 0 and 1 |
|
The quantile above which all values are replaced with the quantile's value. |
|
""" |
|
|
|
def __init__(self, lower_limit, upper_limit): |
|
self._set_options(lower_limit, upper_limit) |
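
# Illustrative usage of WinsorizeOptions (a sketch; assumes this build ships
# the `winsorize` kernel): clamp values outside the 5th-95th quantile range.
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.winsorize(pa.array(range(100)), options=WinsorizeOptions(0.05, 0.95))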
|
|
|
|
|
cdef class _CumulativeOptions(FunctionOptions): |
|
def _set_options(self, start, skip_nulls): |
|
if start is None: |
|
self.wrapped.reset(new CCumulativeOptions(skip_nulls)) |
|
elif isinstance(start, Scalar): |
|
self.wrapped.reset(new CCumulativeOptions( |
|
pyarrow_unwrap_scalar(start), skip_nulls)) |
|
else: |
|
try: |
|
start = lib.scalar(start) |
|
self.wrapped.reset(new CCumulativeOptions( |
|
pyarrow_unwrap_scalar(start), skip_nulls)) |
|
except Exception: |
|
_raise_invalid_function_option( |
|
start, "`start` type for CumulativeOptions", TypeError) |
|
|
|
|
|
class CumulativeOptions(_CumulativeOptions): |
|
""" |
|
Options for `cumulative_*` functions. |
|
|
|
- cumulative_sum |
|
- cumulative_sum_checked |
|
- cumulative_prod |
|
- cumulative_prod_checked |
|
- cumulative_max |
|
- cumulative_min |
|
|
|
Parameters |
|
---------- |
|
start : Scalar, default None |
|
Starting value for the cumulative operation. If none is given, |
|
a default value depending on the operation and input type is used. |
|
skip_nulls : bool, default False |
|
When false, the first encountered null is propagated. |
|
""" |
|
|
|
def __init__(self, start=None, *, skip_nulls=False): |
|
self._set_options(start, skip_nulls) |
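
# Illustrative usage of CumulativeOptions (a sketch; `pc.cumulative_sum`
# accepts these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.cumulative_sum(pa.array([1, 2, 3]), options=CumulativeOptions(start=10))
#   # -> [11, 13, 16]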
|
|
|
|
|
class CumulativeSumOptions(_CumulativeOptions): |
|
""" |
|
Options for `cumulative_sum` function. |
|
|
|
Parameters |
|
---------- |
|
start : Scalar, default None |
|
        Starting value for the sum computation.
|
skip_nulls : bool, default False |
|
When false, the first encountered null is propagated. |
|
""" |
|
|
|
def __init__(self, start=None, *, skip_nulls=False): |
|
warnings.warn( |
|
_DEPR_MSG.format("CumulativeSumOptions", "14.0", "CumulativeOptions"), |
|
FutureWarning, |
|
stacklevel=2 |
|
) |
|
self._set_options(start, skip_nulls) |
|
|
|
|
|
cdef class _PairwiseOptions(FunctionOptions): |
|
def _set_options(self, period): |
|
self.wrapped.reset(new CPairwiseOptions(period)) |
|
|
|
|
|
class PairwiseOptions(_PairwiseOptions): |
|
""" |
|
Options for `pairwise` functions. |
|
|
|
Parameters |
|
---------- |
|
period : int, default 1 |
|
        Offset between the two values combined in each pair; e.g.
        "pairwise_diff" computes values[i] - values[i - period].
|
""" |
|
|
|
def __init__(self, period=1): |
|
self._set_options(period) |
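
# Illustrative usage of PairwiseOptions (a sketch; `pc.pairwise_diff`
# accepts these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.pairwise_diff(pa.array([1, 3, 6, 10]), options=PairwiseOptions(period=1))
#   # -> [null, 2, 3, 4]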
|
|
|
|
|
cdef class _ListFlattenOptions(FunctionOptions): |
|
def _set_options(self, recursive): |
|
self.wrapped.reset(new CListFlattenOptions(recursive)) |
|
|
|
|
|
class ListFlattenOptions(_ListFlattenOptions): |
|
""" |
|
Options for `list_flatten` function |
|
|
|
Parameters |
|
---------- |
|
recursive : bool, default False |
|
When True, the list array is flattened recursively until an array |
|
of non-list values is formed. |
|
""" |
|
|
|
def __init__(self, recursive=False): |
|
self._set_options(recursive) |
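
# Illustrative usage of ListFlattenOptions (a sketch; `pc.list_flatten`
# accepts these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.list_flatten(pa.array([[[1, 2]], [[3]]]),
#                   options=ListFlattenOptions(recursive=True))
#   # -> [1, 2, 3]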
|
|
|
|
|
cdef class _ArraySortOptions(FunctionOptions): |
|
def _set_options(self, order, null_placement): |
|
self.wrapped.reset(new CArraySortOptions( |
|
unwrap_sort_order(order), unwrap_null_placement(null_placement))) |
|
|
|
|
|
class ArraySortOptions(_ArraySortOptions): |
|
""" |
|
Options for the `array_sort_indices` function. |
|
|
|
Parameters |
|
---------- |
|
order : str, default "ascending" |
|
Which order to sort values in. |
|
Accepted values are "ascending", "descending". |
|
null_placement : str, default "at_end" |
|
Where nulls in the input should be sorted. |
|
Accepted values are "at_start", "at_end". |
|
""" |
|
|
|
def __init__(self, order="ascending", *, null_placement="at_end"): |
|
self._set_options(order, null_placement) |
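
# Illustrative usage of ArraySortOptions (a sketch; `pc.array_sort_indices`
# accepts these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   arr = pa.array([3, 1, None, 2])
#   pc.array_sort_indices(
#       arr, options=ArraySortOptions("descending", null_placement="at_start"))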
|
|
|
|
|
cdef class _SortOptions(FunctionOptions): |
|
def _set_options(self, sort_keys, null_placement): |
|
self.wrapped.reset(new CSortOptions( |
|
unwrap_sort_keys(sort_keys, allow_str=False), |
|
unwrap_null_placement(null_placement))) |
|
|
|
|
|
class SortOptions(_SortOptions): |
|
""" |
|
Options for the `sort_indices` function. |
|
|
|
Parameters |
|
---------- |
|
sort_keys : sequence of (name, order) tuples |
|
Names of field/column keys to sort the input on, |
|
along with the order each field/column is sorted in. |
|
Accepted values for `order` are "ascending", "descending". |
|
The field name can be a string column name or expression. |
|
null_placement : str, default "at_end" |
|
        Where nulls in the input should be sorted; this only applies to

        columns/fields mentioned in `sort_keys`.
|
Accepted values are "at_start", "at_end". |
|
""" |
|
|
|
def __init__(self, sort_keys=(), *, null_placement="at_end"): |
|
self._set_options(sort_keys, null_placement) |
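
# Illustrative usage of SortOptions (a sketch; `pc.sort_indices` accepts
# these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   table = pa.table({"a": [2, 1, 2], "b": [3, 2, 1]})
#   pc.sort_indices(
#       table, options=SortOptions([("a", "ascending"), ("b", "descending")]))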
|
|
|
|
|
cdef class _SelectKOptions(FunctionOptions): |
|
def _set_options(self, k, sort_keys): |
|
self.wrapped.reset(new CSelectKOptions(k, unwrap_sort_keys(sort_keys, allow_str=False))) |
|
|
|
|
|
class SelectKOptions(_SelectKOptions): |
|
""" |
|
Options for top/bottom k-selection. |
|
|
|
Parameters |
|
---------- |
|
k : int |
|
Number of leading values to select in sorted order |
|
(i.e. the largest values if sort order is "descending", |
|
the smallest otherwise). |
|
sort_keys : sequence of (name, order) tuples |
|
Names of field/column keys to sort the input on, |
|
along with the order each field/column is sorted in. |
|
Accepted values for `order` are "ascending", "descending". |
|
The field name can be a string column name or expression. |
|
""" |
|
|
|
def __init__(self, k, sort_keys): |
|
self._set_options(k, sort_keys) |
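
# Illustrative usage of SelectKOptions (a sketch; `pc.select_k_unstable`
# accepts these options): indices of the 2 largest values of column "a".
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   table = pa.table({"a": [4, 1, 3, 2]})
#   pc.select_k_unstable(table, options=SelectKOptions(2, [("a", "descending")]))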
|
|
|
|
|
cdef class _QuantileOptions(FunctionOptions): |
|
_interp_map = { |
|
"linear": CQuantileInterp_LINEAR, |
|
"lower": CQuantileInterp_LOWER, |
|
"higher": CQuantileInterp_HIGHER, |
|
"nearest": CQuantileInterp_NEAREST, |
|
"midpoint": CQuantileInterp_MIDPOINT, |
|
} |
|
|
|
def _set_options(self, quantiles, interp, skip_nulls, min_count): |
|
try: |
|
self.wrapped.reset( |
|
new CQuantileOptions(quantiles, self._interp_map[interp], |
|
skip_nulls, min_count) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(interp, "quantile interpolation") |
|
|
|
|
|
class QuantileOptions(_QuantileOptions): |
|
__doc__ = f""" |
|
Options for the `quantile` function. |
|
|
|
Parameters |
|
---------- |
|
q : double or sequence of double, default 0.5 |
|
Probability levels of the quantiles to compute. All values must be in |
|
[0, 1]. |
|
interpolation : str, default "linear" |
|
How to break ties between competing data points for a given quantile. |
|
Accepted values are: |
|
|
|
- "linear": compute an interpolation |
|
- "lower": always use the smallest of the two data points |
|
- "higher": always use the largest of the two data points |
|
- "nearest": select the data point that is closest to the quantile |
|
- "midpoint": compute the (unweighted) mean of the two data points |
|
{_skip_nulls_doc()} |
|
{_min_count_doc(default=0)} |
|
""" |
|
|
|
def __init__(self, q=0.5, *, interpolation="linear", skip_nulls=True, |
|
min_count=0): |
|
if not isinstance(q, SUPPORTED_INPUT_ARR_TYPES): |
|
q = [q] |
|
self._set_options(q, interpolation, skip_nulls, min_count) |
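
# Illustrative usage of QuantileOptions (a sketch; `pc.quantile` accepts
# these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.quantile(pa.array([1, 2, 3, 4]),
#               options=QuantileOptions(q=[0.25, 0.75], interpolation="midpoint"))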
|
|
|
|
|
cdef class _TDigestOptions(FunctionOptions): |
|
def _set_options(self, quantiles, delta, buffer_size, skip_nulls, |
|
min_count): |
|
self.wrapped.reset( |
|
new CTDigestOptions(quantiles, delta, buffer_size, skip_nulls, |
|
min_count) |
|
) |
|
|
|
|
|
class TDigestOptions(_TDigestOptions): |
|
__doc__ = f""" |
|
Options for the `tdigest` function. |
|
|
|
Parameters |
|
---------- |
|
q : double or sequence of double, default 0.5 |
|
Probability levels of the quantiles to approximate. All values must be |
|
in [0, 1]. |
|
delta : int, default 100 |
|
Compression parameter for the T-digest algorithm. |
|
buffer_size : int, default 500 |
|
Buffer size for the T-digest algorithm. |
|
{_skip_nulls_doc()} |
|
{_min_count_doc(default=0)} |
|
""" |
|
|
|
def __init__(self, q=0.5, *, delta=100, buffer_size=500, skip_nulls=True, |
|
min_count=0): |
|
if not isinstance(q, SUPPORTED_INPUT_ARR_TYPES): |
|
q = [q] |
|
self._set_options(q, delta, buffer_size, skip_nulls, min_count) |
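
# Illustrative usage of TDigestOptions (a sketch; `pc.tdigest` computes
# approximate quantiles with these options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.tdigest(pa.array(range(1000)), options=TDigestOptions(q=[0.5, 0.99]))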
|
|
|
|
|
cdef class _Utf8NormalizeOptions(FunctionOptions): |
|
_form_map = { |
|
"NFC": CUtf8NormalizeForm_NFC, |
|
"NFKC": CUtf8NormalizeForm_NFKC, |
|
"NFD": CUtf8NormalizeForm_NFD, |
|
"NFKD": CUtf8NormalizeForm_NFKD, |
|
} |
|
|
|
def _set_options(self, form): |
|
try: |
|
self.wrapped.reset( |
|
new CUtf8NormalizeOptions(self._form_map[form]) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(form, |
|
"Unicode normalization form") |
|
|
|
|
|
class Utf8NormalizeOptions(_Utf8NormalizeOptions): |
|
""" |
|
Options for the `utf8_normalize` function. |
|
|
|
Parameters |
|
---------- |
|
form : str |
|
Unicode normalization form. |
|
Accepted values are "NFC", "NFKC", "NFD", NFKD". |
|
""" |
|
|
|
def __init__(self, form): |
|
self._set_options(form) |
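
# Illustrative usage of Utf8NormalizeOptions (a sketch; `pc.utf8_normalize`
# accepts these options). NFKC decomposes compatibility characters such as
# the "fi" ligature:
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.utf8_normalize(pa.array(["\ufb01"]),
#                     options=Utf8NormalizeOptions("NFKC"))
#   # -> ["fi"]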
|
|
|
|
|
cdef class _RandomOptions(FunctionOptions): |
|
def _set_options(self, initializer): |
|
if initializer == 'system': |
|
self.wrapped.reset(new CRandomOptions( |
|
CRandomOptions.FromSystemRandom())) |
|
return |
|
|
|
if not isinstance(initializer, int): |
|
try: |
|
initializer = hash(initializer) |
|
except TypeError: |
|
raise TypeError( |
|
f"initializer should be 'system', an integer, " |
|
f"or a hashable object; got {initializer!r}") |
|
|
|
if initializer < 0: |
|
initializer += 2**64 |
|
self.wrapped.reset(new CRandomOptions( |
|
CRandomOptions.FromSeed(initializer))) |
|
|
|
|
|
class RandomOptions(_RandomOptions): |
|
""" |
|
Options for random generation. |
|
|
|
Parameters |
|
---------- |
|
initializer : int or str |
|
How to initialize the underlying random generator. |
|
If an integer is given, it is used as a seed. |
|
If "system" is given, the random generator is initialized with |
|
a system-specific source of (hopefully true) randomness. |
|
Other values are invalid. |
|
""" |
|
|
|
def __init__(self, *, initializer='system'): |
|
self._set_options(initializer) |
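
# Illustrative usage of RandomOptions (a sketch; `pc.random` accepts these
# options). Note that negative integer seeds are mapped into the uint64
# domain expected by the underlying generator (see _set_options above).
#
#   import pyarrow.compute as pc
#   pc.random(3, options=RandomOptions(initializer=42))  # reproducible
#   pc.random(3)                                         # system randomness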
|
|
|
|
|
cdef class _RankOptions(FunctionOptions): |
|
|
|
_tiebreaker_map = { |
|
"min": CRankOptionsTiebreaker_Min, |
|
"max": CRankOptionsTiebreaker_Max, |
|
"first": CRankOptionsTiebreaker_First, |
|
"dense": CRankOptionsTiebreaker_Dense, |
|
} |
|
|
|
def _set_options(self, sort_keys, null_placement, tiebreaker): |
|
try: |
|
self.wrapped.reset( |
|
new CRankOptions(unwrap_sort_keys(sort_keys), |
|
unwrap_null_placement(null_placement), |
|
self._tiebreaker_map[tiebreaker]) |
|
) |
|
except KeyError: |
|
_raise_invalid_function_option(tiebreaker, "tiebreaker") |
|
|
|
|
|
class RankOptions(_RankOptions): |
|
""" |
|
Options for the `rank` function. |
|
|
|
Parameters |
|
---------- |
|
sort_keys : sequence of (name, order) tuples or str, default "ascending" |
|
Names of field/column keys to sort the input on, |
|
along with the order each field/column is sorted in. |
|
Accepted values for `order` are "ascending", "descending". |
|
The field name can be a string column name or expression. |
|
Alternatively, one can simply pass "ascending" or "descending" as a string |
|
if the input is array-like. |
|
null_placement : str, default "at_end" |
|
        Where nulls in the input should be sorted.
|
Accepted values are "at_start", "at_end". |
|
tiebreaker : str, default "first" |
|
Configure how ties between equal values are handled. |
|
Accepted values are: |
|
|
|
- "min": Ties get the smallest possible rank in sorted order. |
|
- "max": Ties get the largest possible rank in sorted order. |
|
- "first": Ranks are assigned in order of when ties appear in the |
|
input. This ensures the ranks are a stable permutation |
|
of the input. |
|
- "dense": The ranks span a dense [1, M] interval where M is the |
|
number of distinct values in the input. |
|
""" |
|
|
|
def __init__(self, sort_keys="ascending", *, null_placement="at_end", tiebreaker="first"): |
|
self._set_options(sort_keys, null_placement, tiebreaker) |
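
# Illustrative usage of RankOptions (a sketch; `pc.rank` accepts these
# options):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.rank(pa.array([3, 1, 3, 2]), options=RankOptions(tiebreaker="dense"))
#   # -> [3, 1, 3, 2] (dense ranks over the distinct values 1, 2, 3)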
|
|
|
|
|
cdef class _RankQuantileOptions(FunctionOptions): |
|
|
|
def _set_options(self, sort_keys, null_placement): |
|
self.wrapped.reset( |
|
new CRankQuantileOptions(unwrap_sort_keys(sort_keys), |
|
unwrap_null_placement(null_placement)) |
|
) |
|
|
|
|
|
class RankQuantileOptions(_RankQuantileOptions): |
|
""" |
|
Options for the `rank_quantile` function. |
|
|
|
Parameters |
|
---------- |
|
sort_keys : sequence of (name, order) tuples or str, default "ascending" |
|
Names of field/column keys to sort the input on, |
|
along with the order each field/column is sorted in. |
|
Accepted values for `order` are "ascending", "descending". |
|
The field name can be a string column name or expression. |
|
Alternatively, one can simply pass "ascending" or "descending" as a string |
|
if the input is array-like. |
|
null_placement : str, default "at_end" |
|
        Where nulls in the input should be sorted.
|
Accepted values are "at_start", "at_end". |
|
""" |
|
|
|
def __init__(self, sort_keys="ascending", *, null_placement="at_end"): |
|
self._set_options(sort_keys, null_placement) |
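
# Illustrative usage of RankQuantileOptions (a sketch; assumes this build
# ships the `rank_quantile` kernel, which returns quantile ranks in (0, 1)):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   pc.rank_quantile(pa.array([3, 1, 2]),
#                    options=RankQuantileOptions("ascending"))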
|
|
|
|
|
cdef class _PivotWiderOptions(FunctionOptions): |
|
|
|
def _set_options(self, key_names, unexpected_key_behavior): |
|
cdef: |
|
vector[c_string] c_key_names |
|
PivotWiderUnexpectedKeyBehavior c_unexpected_key_behavior |
|
if unexpected_key_behavior == "ignore": |
|
c_unexpected_key_behavior = PivotWiderUnexpectedKeyBehavior_Ignore |
|
elif unexpected_key_behavior == "raise": |
|
c_unexpected_key_behavior = PivotWiderUnexpectedKeyBehavior_Raise |
|
else: |
|
raise ValueError( |
|
f"Unsupported value for unexpected_key_behavior: " |
|
f"expected 'ignore' or 'raise', got {unexpected_key_behavior!r}") |
|
|
|
for k in key_names: |
|
c_key_names.push_back(tobytes(k)) |
|
|
|
self.wrapped.reset( |
|
new CPivotWiderOptions(move(c_key_names), c_unexpected_key_behavior) |
|
) |
|
|
|
|
|
class PivotWiderOptions(_PivotWiderOptions): |
|
""" |
|
Options for the `pivot_wider` function. |
|
|
|
Parameters |
|
---------- |
|
key_names : sequence of str |
|
The pivot key names expected in the pivot key column. |
|
For each entry in `key_names`, a column with the same name is emitted |
|
in the struct output. |
|
unexpected_key_behavior : str, default "ignore" |
|
The behavior when pivot keys not in `key_names` are encountered. |
|
Accepted values are "ignore", "raise". |
|
If "ignore", unexpected keys are silently ignored. |
|
If "raise", unexpected keys raise a KeyError. |
|
""" |
|
|
|
def __init__(self, key_names, *, unexpected_key_behavior="ignore"): |
|
self._set_options(key_names, unexpected_key_behavior) |
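
# Illustrative usage of PivotWiderOptions (a sketch; assumes this build
# ships the `pivot_wider` aggregate kernel): aggregate (key, value) pairs
# into a single struct with one field per expected key.
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#   keys = pa.array(["width", "height"])
#   values = pa.array([10, 20])
#   pc.pivot_wider(keys, values,
#                  options=PivotWiderOptions(["height", "width"]))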
|
|
|
|
|
cdef class Expression(_Weakrefable): |
|
""" |
|
A logical expression to be evaluated against some input. |
|
|
|
To create an expression: |
|
|
|
- Use the factory function ``pyarrow.compute.scalar()`` to create a |
|
scalar (not necessary when combined, see example below). |
|
- Use the factory function ``pyarrow.compute.field()`` to reference |
|
a field (column in table). |
|
- Compare fields and scalars with ``<``, ``<=``, ``==``, ``>=``, ``>``. |
|
- Combine expressions using python operators ``&`` (logical and), |
|
``|`` (logical or) and ``~`` (logical not). |
|
Note: python keywords ``and``, ``or`` and ``not`` cannot be used |
|
to combine expressions. |
|
- Create expression predicates using Expression methods such as |
|
``pyarrow.compute.Expression.isin()``. |
|
|
|
Examples |
|
-------- |
|
|
|
>>> import pyarrow.compute as pc |
|
>>> (pc.field("a") < pc.scalar(3)) | (pc.field("b") > 7) |
|
<pyarrow.compute.Expression ((a < 3) or (b > 7))> |
|
>>> pc.field('a') != 3 |
|
<pyarrow.compute.Expression (a != 3)> |
|
>>> pc.field('a').isin([1, 2, 3]) |
|
<pyarrow.compute.Expression is_in(a, {value_set=int64:[ |
|
1, |
|
2, |
|
3 |
|
], null_matching_behavior=MATCH})> |
|
""" |
|
|
|
def __init__(self): |
|
msg = 'Expression is an abstract class thus cannot be initialized.' |
|
raise TypeError(msg) |
|
|
|
cdef void init(self, const CExpression& sp): |
|
self.expr = sp |
|
|
|
@staticmethod |
|
cdef wrap(const CExpression& sp): |
|
cdef Expression self = Expression.__new__(Expression) |
|
self.init(sp) |
|
return self |
|
|
|
cdef inline CExpression unwrap(self): |
|
return self.expr |
|
|
|
def equals(self, Expression other): |
|
""" |
|
Parameters |
|
---------- |
|
other : pyarrow.dataset.Expression |
|
|
|
Returns |
|
------- |
|
bool |
|
""" |
|
return self.expr.Equals(other.unwrap()) |
|
|
|
def __str__(self): |
|
return frombytes(self.expr.ToString()) |
|
|
|
def __repr__(self): |
|
return "<pyarrow.compute.{0} {1}>".format( |
|
self.__class__.__name__, str(self) |
|
) |
|
|
|
@staticmethod |
|
def from_substrait(object message not None): |
|
""" |
|
Deserialize an expression from Substrait |
|
|
|
The serialized message must be an ExtendedExpression message that has |
|
only a single expression. The name of the expression and the schema |
|
the expression was bound to will be ignored. Use |
|
pyarrow.substrait.deserialize_expressions if this information is needed |
|
or if the message might contain multiple expressions. |
|
|
|
Parameters |
|
---------- |
|
message : bytes or Buffer or a protobuf Message |
|
The Substrait message to deserialize |
|
|
|
Returns |
|
------- |
|
Expression |
|
The deserialized expression |
|
""" |
|
expressions = _pas().BoundExpressions.from_substrait(message).expressions |
|
if len(expressions) == 0: |
|
raise ValueError("Substrait message did not contain any expressions") |
|
if len(expressions) > 1: |
|
raise ValueError( |
|
"Substrait message contained multiple expressions. Use pyarrow.substrait.deserialize_expressions instead") |
|
return next(iter(expressions.values())) |
|
|
|
def to_substrait(self, Schema schema not None, c_bool allow_arrow_extensions=False): |
|
""" |
|
Serialize the expression using Substrait |
|
|
|
The expression will be serialized as an ExtendedExpression message that has a |
|
single expression named "expression" |
|
|
|
Parameters |
|
---------- |
|
schema : Schema |
|
The input schema the expression will be bound to |
|
allow_arrow_extensions : bool, default False |
|
If False then only functions that are part of the core Substrait function |
|
definitions will be allowed. Set this to True to allow pyarrow-specific functions |
|
but the result may not be accepted by other compute libraries. |
|
|
|
Returns |
|
------- |
|
Buffer |
|
A buffer containing the serialized Protobuf plan. |
|
""" |
|
return _pas().serialize_expressions([self], ["expression"], schema, allow_arrow_extensions=allow_arrow_extensions) |
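
    # Illustrative round trip (a sketch; requires a Substrait-enabled
    # pyarrow build):
    #
    #   import pyarrow as pa
    #   import pyarrow.compute as pc
    #   schema = pa.schema([("a", pa.int64())])
    #   buf = (pc.field("a") > 1).to_substrait(schema)
    #   expr = pc.Expression.from_substrait(buf)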
|
|
|
@staticmethod |
|
def _deserialize(Buffer buffer not None): |
|
return Expression.wrap(GetResultValue(CDeserializeExpression( |
|
pyarrow_unwrap_buffer(buffer)))) |
|
|
|
def __reduce__(self): |
|
buffer = pyarrow_wrap_buffer(GetResultValue( |
|
CSerializeExpression(self.expr))) |
|
return Expression._deserialize, (buffer,) |
|
|
|
@staticmethod |
|
cdef Expression _expr_or_scalar(object expr): |
|
if isinstance(expr, Expression): |
|
return (<Expression> expr) |
|
return (<Expression> Expression._scalar(expr)) |
|
|
|
@staticmethod |
|
def _call(str function_name, list arguments, FunctionOptions options=None): |
|
cdef: |
|
vector[CExpression] c_arguments |
|
shared_ptr[CFunctionOptions] c_options |
|
|
|
for argument in arguments: |
|
if not isinstance(argument, Expression): |
|
|
|
try: |
|
argument = Expression._scalar(argument) |
|
except ArrowInvalid: |
|
raise TypeError( |
|
"only other expressions allowed as arguments") |
|
c_arguments.push_back((<Expression> argument).expr) |
|
|
|
if options is not None: |
|
c_options = options.unwrap() |
|
|
|
return Expression.wrap(CMakeCallExpression( |
|
tobytes(function_name), move(c_arguments), c_options)) |
|
|
|
def __richcmp__(self, other, int op): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call({ |
|
Py_EQ: "equal", |
|
Py_NE: "not_equal", |
|
Py_GT: "greater", |
|
Py_GE: "greater_equal", |
|
Py_LT: "less", |
|
Py_LE: "less_equal", |
|
}[op], [self, other]) |
|
|
|
def __bool__(self): |
|
raise ValueError( |
|
"An Expression cannot be evaluated to python True or False. " |
|
"If you are using the 'and', 'or' or 'not' operators, use '&', " |
|
"'|' or '~' instead." |
|
) |
|
|
|
def __invert__(self): |
|
return Expression._call("invert", [self]) |
|
|
|
def __and__(Expression self, other): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call("and_kleene", [self, other]) |
|
|
|
def __or__(Expression self, other): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call("or_kleene", [self, other]) |
|
|
|
def __add__(Expression self, other): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call("add_checked", [self, other]) |
|
|
|
def __mul__(Expression self, other): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call("multiply_checked", [self, other]) |
|
|
|
def __sub__(Expression self, other): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call("subtract_checked", [self, other]) |
|
|
|
def __truediv__(Expression self, other): |
|
other = Expression._expr_or_scalar(other) |
|
return Expression._call("divide_checked", [self, other]) |
|
|
|
def is_valid(self): |
|
""" |
|
Check whether the expression is not-null (valid). |
|
|
|
This creates a new expression equivalent to calling the |
|
`is_valid` compute function on this expression. |
|
|
|
Returns |
|
------- |
|
is_valid : Expression |
|
""" |
|
return Expression._call("is_valid", [self]) |
|
|
|
def is_null(self, bint nan_is_null=False): |
|
""" |
|
Check whether the expression is null. |
|
|
|
This creates a new expression equivalent to calling the |
|
`is_null` compute function on this expression. |
|
|
|
Parameters |
|
---------- |
|
nan_is_null : boolean, default False |
|
Whether floating-point NaNs are considered null. |
|
|
|
Returns |
|
------- |
|
is_null : Expression |
|
""" |
|
options = NullOptions(nan_is_null=nan_is_null) |
|
return Expression._call("is_null", [self], options) |
|
|
|
def is_nan(self): |
|
""" |
|
Check whether the expression is NaN. |
|
|
|
This creates a new expression equivalent to calling the |
|
`is_nan` compute function on this expression. |
|
|
|
Returns |
|
------- |
|
is_nan : Expression |
|
""" |
|
return Expression._call("is_nan", [self]) |
|
|
|
def cast(self, type=None, safe=None, options=None): |
|
""" |
|
Explicitly set or change the expression's data type. |
|
|
|
This creates a new expression equivalent to calling the |
|
`cast` compute function on this expression. |
|
|
|
Parameters |
|
---------- |
|
type : DataType, default None |
|
Type to cast array to. |
|
safe : boolean, default True |
|
Whether to check for conversion errors such as overflow. |
|
options : CastOptions, default None |
|
            Additional checks and options passed via CastOptions.
|
|
|
Returns |
|
------- |
|
cast : Expression |
|
""" |
|
safe_vars_passed = (safe is not None) or (type is not None) |
|
|
|
if safe_vars_passed and (options is not None): |
|
raise ValueError("Must either pass values for 'type' and 'safe' or pass a " |
|
"value for 'options'") |
|
|
|
if options is None: |
|
type = ensure_type(type, allow_none=False) |
|
if safe is False: |
|
options = CastOptions.unsafe(type) |
|
else: |
|
options = CastOptions.safe(type) |
|
return Expression._call("cast", [self], options) |
|
|
|
def isin(self, values): |
|
""" |
|
Check whether the expression is contained in values. |
|
|
|
This creates a new expression equivalent to calling the |
|
`is_in` compute function on this expression. |
|
|
|
Parameters |
|
---------- |
|
values : Array or iterable |
|
The values to check for. |
|
|
|
Returns |
|
------- |
|
isin : Expression |
|
A new expression that, when evaluated, checks whether |
|
this expression's value is contained in `values`. |
|
""" |
|
if not isinstance(values, Array): |
|
values = lib.array(values) |
|
|
|
options = SetLookupOptions(values) |
|
return Expression._call("is_in", [self], options) |
|
|
|
@staticmethod |
|
def _field(name_or_idx not None): |
|
cdef: |
|
CFieldRef c_field |
|
|
|
if isinstance(name_or_idx, int): |
|
return Expression.wrap(CMakeFieldExpressionByIndex(name_or_idx)) |
|
else: |
|
c_field = CFieldRef(<c_string> tobytes(name_or_idx)) |
|
return Expression.wrap(CMakeFieldExpression(c_field)) |
|
|
|
@staticmethod |
|
def _nested_field(tuple names not None): |
|
cdef: |
|
vector[CFieldRef] nested |
|
|
|
if len(names) == 0: |
|
raise ValueError("nested field reference should be non-empty") |
|
nested.reserve(len(names)) |
|
for name in names: |
|
if isinstance(name, int): |
|
nested.push_back(CFieldRef(<int>name)) |
|
else: |
|
nested.push_back(CFieldRef(<c_string> tobytes(name))) |
|
return Expression.wrap(CMakeFieldExpression(CFieldRef(move(nested)))) |
|
|
|
@staticmethod |
|
def _scalar(value): |
|
cdef: |
|
Scalar scalar |
|
|
|
if isinstance(value, Scalar): |
|
scalar = value |
|
else: |
|
scalar = lib.scalar(value) |
|
|
|
return Expression.wrap(CMakeScalarExpression(scalar.unwrap())) |
|
|
|
|
|
_deserialize = Expression._deserialize |
|
cdef CExpression _true = CMakeScalarExpression( |
|
<shared_ptr[CScalar]> make_shared[CBooleanScalar](True) |
|
) |
|
|
|
|
|
cdef CExpression _bind(Expression filter, Schema schema) except *: |
|
assert schema is not None |
|
|
|
if filter is None: |
|
return _true |
|
|
|
return GetResultValue(filter.unwrap().Bind( |
|
deref(pyarrow_unwrap_schema(schema).get()))) |
|
|
|
|
|
cdef class UdfContext: |
|
""" |
|
Per-invocation function context/state. |
|
|
|
This object will always be the first argument to a user-defined |
|
function. It should not be used outside of a call to the function. |
|
""" |
|
|
|
def __init__(self): |
|
raise TypeError("Do not call {}'s constructor directly" |
|
.format(self.__class__.__name__)) |
|
|
|
cdef void init(self, const CUdfContext &c_context): |
|
self.c_context = c_context |
|
|
|
@property |
|
def batch_length(self): |
|
""" |
|
The common length of all input arguments (int). |
|
|
|
In the case that all arguments are scalars, this value |
|
is used to pass the "actual length" of the arguments, |
|
e.g. because the scalar values are encoding a column |
|
with a constant value. |
|
""" |
|
return self.c_context.batch_length |
|
|
|
@property |
|
def memory_pool(self): |
|
""" |
|
A memory pool for allocations (:class:`MemoryPool`). |
|
|
|
This is the memory pool supplied by the user when they invoked |
|
the function and it should be used in any calls to arrow that the |
|
UDF makes if that call accepts a memory_pool. |
|
""" |
|
return box_memory_pool(self.c_context.pool) |
|
|
|
|
|
cdef inline CFunctionDoc _make_function_doc(dict func_doc) except *: |
|
""" |
|
Helper function to generate the FunctionDoc |
|
    This function accepts a dictionary and expects the

    "summary" (str), "description" (str) and "arg_names" (List[str]) keys.
|
""" |
|
cdef: |
|
CFunctionDoc f_doc |
|
vector[c_string] c_arg_names |
|
|
|
f_doc.summary = tobytes(func_doc["summary"]) |
|
f_doc.description = tobytes(func_doc["description"]) |
|
for arg_name in func_doc["arg_names"]: |
|
c_arg_names.push_back(tobytes(arg_name)) |
|
f_doc.arg_names = c_arg_names |
|
|
|
|
|
f_doc.options_class = b"" |
|
f_doc.options_required = False |
|
return f_doc |
|
|
|
|
|
cdef object box_udf_context(const CUdfContext& c_context): |
|
cdef UdfContext context = UdfContext.__new__(UdfContext) |
|
context.init(c_context) |
|
return context |
|
|
|
|
|
cdef _udf_callback(user_function, const CUdfContext& c_context, inputs): |
|
""" |
|
Helper callback function used to wrap the UdfContext from Python to C++ |
|
execution. |
|
""" |
|
context = box_udf_context(c_context) |
|
return user_function(context, *inputs) |
|
|
|
|
|
def _get_udf_context(memory_pool, batch_length): |
|
cdef CUdfContext c_context |
|
c_context.pool = maybe_unbox_memory_pool(memory_pool) |
|
c_context.batch_length = batch_length |
|
context = box_udf_context(c_context) |
|
return context |
|
|
|
|
|
ctypedef CStatus (*CRegisterUdf)(PyObject* function, function[CallbackUdf] wrapper, |
|
const CUdfOptions& options, CFunctionRegistry* registry) |
|
|
|
cdef class RegisterUdf(_Weakrefable): |
|
cdef CRegisterUdf register_func |
|
|
|
cdef void init(self, const CRegisterUdf register_func): |
|
self.register_func = register_func |
|
|
|
|
|
cdef get_register_scalar_function(): |
|
cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf) |
|
reg.register_func = RegisterScalarFunction |
|
return reg |
|
|
|
|
|
cdef get_register_tabular_function(): |
|
cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf) |
|
reg.register_func = RegisterTabularFunction |
|
return reg |
|
|
|
|
|
cdef get_register_aggregate_function(): |
|
cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf) |
|
reg.register_func = RegisterAggregateFunction |
|
return reg |
|
|
|
cdef get_register_vector_function(): |
|
cdef RegisterUdf reg = RegisterUdf.__new__(RegisterUdf) |
|
reg.register_func = RegisterVectorFunction |
|
return reg |
|
|
|
|
|
def register_scalar_function(func, function_name, function_doc, in_types, out_type, |
|
func_registry=None): |
|
""" |
|
Register a user-defined scalar function. |
|
|
|
This API is EXPERIMENTAL. |
|
|
|
A scalar function is a function that executes elementwise |
|
operations on arrays or scalars, i.e. a scalar function must |
|
be computed row-by-row with no state where each output row |
|
is computed only from its corresponding input row. |
|
In other words, all argument arrays have the same length, |
|
and the output array is of the same length as the arguments. |
|
Scalar functions are the only functions allowed in query engine |
|
expressions. |
|
|
|
Parameters |
|
---------- |
|
func : callable |
|
A callable implementing the user-defined function. |
|
The first argument is the context argument of type |
|
UdfContext. |
|
Then, it must take arguments equal to the number of |
|
in_types defined. It must return an Array or Scalar |
|
matching the out_type. It must return a Scalar if |
|
all arguments are scalar, else it must return an Array. |
|
|
|
To define a varargs function, pass a callable that takes |
|
*args. The last in_type will be the type of all varargs |
|
arguments. |
|
function_name : str |
|
Name of the function. There should only be one function |
|
registered with this name in the function registry. |
|
function_doc : dict |
|
A dictionary object with keys "summary" (str), |
|
and "description" (str). |
|
in_types : Dict[str, DataType] |
|
A dictionary mapping function argument names to |
|
their respective DataType. |
|
The argument names will be used to generate |
|
documentation for the function. The number of |
|
arguments specified here determines the function |
|
arity. |
|
out_type : DataType |
|
Output type of the function. |
|
func_registry : FunctionRegistry |
|
Optional function registry to use instead of the default global one. |
|
|
|
Examples |
|
-------- |
|
>>> import pyarrow as pa |
|
>>> import pyarrow.compute as pc |
|
>>> |
|
>>> func_doc = {} |
|
>>> func_doc["summary"] = "simple udf" |
|
>>> func_doc["description"] = "add a constant to a scalar" |
|
>>> |
|
>>> def add_constant(ctx, array): |
|
... return pc.add(array, 1, memory_pool=ctx.memory_pool) |
|
>>> |
|
>>> func_name = "py_add_func" |
|
>>> in_types = {"array": pa.int64()} |
|
>>> out_type = pa.int64() |
|
>>> pc.register_scalar_function(add_constant, func_name, func_doc, |
|
... in_types, out_type) |
|
>>> |
|
>>> func = pc.get_function(func_name) |
|
>>> func.name |
|
'py_add_func' |
|
>>> answer = pc.call_function(func_name, [pa.array([20])]) |
|
>>> answer |
|
<pyarrow.lib.Int64Array object at ...> |
|
[ |
|
21 |
|
] |
|
""" |
|
return _register_user_defined_function(get_register_scalar_function(), |
|
func, function_name, function_doc, in_types, |
|
out_type, func_registry) |
|
|
|
|
|
def register_vector_function(func, function_name, function_doc, in_types, out_type, |
|
func_registry=None): |
|
""" |
|
Register a user-defined vector function. |
|
|
|
This API is EXPERIMENTAL. |
|
|
|
    A vector function is a function that executes vector

    operations on whole arrays. Vector functions are typically used

    when the computation does not fit other, more specific kinds of

    functions (e.g., scalar or aggregate).
|
|
|
Parameters |
|
---------- |
|
func : callable |
|
A callable implementing the user-defined function. |
|
The first argument is the context argument of type |
|
UdfContext. |
|
Then, it must take arguments equal to the number of |
|
in_types defined. It must return an Array or Scalar |
|
matching the out_type. It must return a Scalar if |
|
all arguments are scalar, else it must return an Array. |
|
|
|
To define a varargs function, pass a callable that takes |
|
*args. The last in_type will be the type of all varargs |
|
arguments. |
|
function_name : str |
|
Name of the function. There should only be one function |
|
registered with this name in the function registry. |
|
function_doc : dict |
|
A dictionary object with keys "summary" (str), |
|
and "description" (str). |
|
in_types : Dict[str, DataType] |
|
A dictionary mapping function argument names to |
|
their respective DataType. |
|
The argument names will be used to generate |
|
documentation for the function. The number of |
|
arguments specified here determines the function |
|
arity. |
|
out_type : DataType |
|
Output type of the function. |
|
func_registry : FunctionRegistry |
|
Optional function registry to use instead of the default global one. |
|
|
|
Examples |
|
-------- |
|
>>> import pyarrow as pa |
|
>>> import pyarrow.compute as pc |
|
>>> |
|
>>> func_doc = {} |
|
>>> func_doc["summary"] = "percent rank" |
|
>>> func_doc["description"] = "compute percent rank" |
|
>>> |
|
>>> def list_flatten_udf(ctx, x): |
|
... return pc.list_flatten(x) |
|
>>> |
|
>>> func_name = "list_flatten_udf" |
|
>>> in_types = {"array": pa.list_(pa.int64())} |
|
>>> out_type = pa.int64() |
|
>>> pc.register_vector_function(list_flatten_udf, func_name, func_doc, |
|
... in_types, out_type) |
|
>>> |
|
>>> answer = pc.call_function(func_name, [pa.array([[1, 2], [3, 4]])]) |
|
>>> answer |
|
<pyarrow.lib.Int64Array object at ...> |
|
[ |
|
1, |
|
2, |
|
3, |
|
4 |
|
] |
|
""" |
|
return _register_user_defined_function(get_register_vector_function(), |
|
func, function_name, function_doc, in_types, |
|
out_type, func_registry) |
|
|
|
|
|
def register_aggregate_function(func, function_name, function_doc, in_types, out_type, |
|
func_registry=None): |
|
""" |
|
Register a user-defined non-decomposable aggregate function. |
|
|
|
This API is EXPERIMENTAL. |
|
|
|
A non-decomposable aggregation function is a function that executes |
|
aggregate operations on the whole data that it is aggregating. |
|
    In other words, a non-decomposable aggregate function cannot be

    split into consume/merge/finalize steps.



    This is often used with ordered or segmented aggregation, where groups

    can be emitted before all of the input data has been accumulated.
|
|
|
Note that currently the size of any input column cannot exceed 2 GB |
|
for a single segment (all groups combined). |
|
|
|
Parameters |
|
---------- |
|
func : callable |
|
A callable implementing the user-defined function. |
|
The first argument is the context argument of type |
|
UdfContext. |
|
Then, it must take arguments equal to the number of |
|
in_types defined. It must return a Scalar matching the |
|
out_type. |
|
To define a varargs function, pass a callable that takes |
|
        *args. The in_types need to match the types of the inputs when

        the function gets called.
|
function_name : str |
|
Name of the function. This name must be unique, i.e., |
|
there should only be one function registered with |
|
this name in the function registry. |
|
function_doc : dict |
|
A dictionary object with keys "summary" (str), |
|
and "description" (str). |
|
in_types : Dict[str, DataType] |
|
A dictionary mapping function argument names to |
|
their respective DataType. |
|
The argument names will be used to generate |
|
documentation for the function. The number of |
|
arguments specified here determines the function |
|
arity. |
|
out_type : DataType |
|
Output type of the function. |
|
func_registry : FunctionRegistry |
|
Optional function registry to use instead of the default global one. |
|
|
|
Examples |
|
-------- |
|
>>> import numpy as np |
|
>>> import pyarrow as pa |
|
>>> import pyarrow.compute as pc |
|
>>> |
|
>>> func_doc = {} |
|
>>> func_doc["summary"] = "simple median udf" |
|
>>> func_doc["description"] = "compute median" |
|
>>> |
|
>>> def compute_median(ctx, array): |
|
... return pa.scalar(np.median(array)) |
|
>>> |
|
>>> func_name = "py_compute_median" |
|
>>> in_types = {"array": pa.int64()} |
|
>>> out_type = pa.float64() |
|
>>> pc.register_aggregate_function(compute_median, func_name, func_doc, |
|
... in_types, out_type) |
|
>>> |
|
>>> func = pc.get_function(func_name) |
|
>>> func.name |
|
'py_compute_median' |
|
>>> answer = pc.call_function(func_name, [pa.array([20, 40])]) |
|
>>> answer |
|
<pyarrow.DoubleScalar: 30.0> |
|
>>> table = pa.table([pa.array([1, 1, 2, 2]), pa.array([10, 20, 30, 40])], names=['k', 'v']) |
|
>>> result = table.group_by('k').aggregate([('v', 'py_compute_median')]) |
|
>>> result |
|
pyarrow.Table |
|
k: int64 |
|
v_py_compute_median: double |
|
---- |
|
k: [[1,2]] |
|
v_py_compute_median: [[15,35]] |
|
""" |
|
return _register_user_defined_function(get_register_aggregate_function(), |
|
func, function_name, function_doc, in_types, |
|
out_type, func_registry) |
|
|
|
|
|
def register_tabular_function(func, function_name, function_doc, in_types, out_type, |
|
func_registry=None): |
|
""" |
|
Register a user-defined tabular function. |
|
|
|
This API is EXPERIMENTAL. |
|
|
|
A tabular function is one accepting a context argument of type |
|
UdfContext and returning a generator of struct arrays. |
|
The in_types argument must be empty and the out_type argument |
|
specifies a schema. Each struct array must have field types |
|
corresponding to the schema. |
|
|
|
Parameters |
|
---------- |
|
func : callable |
|
A callable implementing the user-defined function. |
|
The only argument is the context argument of type |
|
UdfContext. It must return a callable that |
|
returns on each invocation a StructArray matching |
|
        the out_type, where an empty array indicates the end of the stream.
|
function_name : str |
|
Name of the function. There should only be one function |
|
registered with this name in the function registry. |
|
function_doc : dict |
|
A dictionary object with keys "summary" (str), |
|
and "description" (str). |
|
in_types : Dict[str, DataType] |
|
Must be an empty dictionary (reserved for future use). |
|
out_type : Union[Schema, DataType] |
|
Schema of the function's output, or a corresponding flat struct type. |
|
func_registry : FunctionRegistry |
|
Optional function registry to use instead of the default global one. |
|
""" |
|
cdef: |
|
shared_ptr[CSchema] c_schema |
|
shared_ptr[CDataType] c_type |
|
|
|
if isinstance(out_type, Schema): |
|
c_schema = pyarrow_unwrap_schema(out_type) |
|
with nogil: |
|
c_type = <shared_ptr[CDataType]>make_shared[CStructType](deref(c_schema).fields()) |
|
out_type = pyarrow_wrap_data_type(c_type) |
|
return _register_user_defined_function(get_register_tabular_function(), |
|
func, function_name, function_doc, in_types, |
|
out_type, func_registry) |
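
# Illustrative registration (a sketch mirroring the docstring above; the
# generator name and schema are hypothetical):
#
#   import pyarrow as pa
#   import pyarrow.compute as pc
#
#   def table_gen(ctx):
#       batches = iter([
#           pa.record_batch([pa.array([1, 2])], names=["x"]),
#           pa.record_batch([pa.array([], pa.int64())], names=["x"]),  # end
#       ])
#       return lambda: next(batches).to_struct_array()
#
#   pc.register_tabular_function(
#       table_gen, "demo_table",
#       {"summary": "demo", "description": "demo tabular UDF"},
#       {}, pa.schema([("x", pa.int64())]))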
|
|
|
|
|
def _register_user_defined_function(register_func, func, function_name, function_doc, in_types, |
|
out_type, func_registry=None): |
|
""" |
|
Register a user-defined function. |
|
|
|
This method itself doesn't care about the type of the UDF |
|
    (i.e., scalar vs tabular vs aggregate).
|
|
|
Parameters |
|
---------- |
|
register_func: object |
|
An object holding a CRegisterUdf in a "register_func" attribute. |
|
func : callable |
|
A callable implementing the user-defined function. |
|
function_name : str |
|
Name of the function. There should only be one function |
|
registered with this name in the function registry. |
|
function_doc : dict |
|
A dictionary object with keys "summary" (str), |
|
and "description" (str). |
|
in_types : Dict[str, DataType] |
|
A dictionary mapping function argument names to |
|
their respective DataType. |
|
out_type : DataType |
|
Output type of the function. |
|
func_registry : FunctionRegistry |
|
Optional function registry to use instead of the default global one. |
|
""" |
|
cdef: |
|
CRegisterUdf c_register_func |
|
c_string c_func_name |
|
CArity c_arity |
|
CFunctionDoc c_func_doc |
|
vector[shared_ptr[CDataType]] c_in_types |
|
PyObject* c_function |
|
shared_ptr[CDataType] c_out_type |
|
CUdfOptions c_options |
|
CFunctionRegistry* c_func_registry |
|
|
|
if callable(func): |
|
c_function = <PyObject*>func |
|
else: |
|
raise TypeError("func must be a callable") |
|
|
|
c_func_name = tobytes(function_name) |
|
|
|
func_spec = inspect.getfullargspec(func) |
|
num_args = -1 |
|
if isinstance(in_types, dict): |
|
for in_type in in_types.values(): |
|
c_in_types.push_back( |
|
pyarrow_unwrap_data_type(ensure_type(in_type))) |
|
function_doc["arg_names"] = in_types.keys() |
|
num_args = len(in_types) |
|
else: |
|
raise TypeError( |
|
"in_types must be a dictionary of DataType") |
|
|
|
c_arity = CArity(<int> num_args, func_spec.varargs) |
|
|
|
if "summary" not in function_doc: |
|
raise ValueError("Function doc must contain a summary") |
|
|
|
if "description" not in function_doc: |
|
raise ValueError("Function doc must contain a description") |
|
|
|
if "arg_names" not in function_doc: |
|
raise ValueError("Function doc must contain arg_names") |
|
|
|
c_func_doc = _make_function_doc(function_doc) |
|
|
|
c_out_type = pyarrow_unwrap_data_type(ensure_type(out_type)) |
|
|
|
c_options.func_name = c_func_name |
|
c_options.arity = c_arity |
|
c_options.func_doc = c_func_doc |
|
c_options.input_types = c_in_types |
|
c_options.output_type = c_out_type |
|
|
|
if func_registry is None: |
|
c_func_registry = NULL |
|
else: |
|
c_func_registry = (<FunctionRegistry>func_registry).registry |
|
|
|
c_register_func = (<RegisterUdf>register_func).register_func |
|
|
|
check_status(c_register_func(c_function, |
|
<function[CallbackUdf]> &_udf_callback, |
|
c_options, c_func_registry)) |
|
|
|
|
|
def call_tabular_function(function_name, args=None, func_registry=None): |
|
""" |
|
Get a record batch iterator from a tabular function. |
|
|
|
Parameters |
|
---------- |
|
function_name : str |
|
Name of the function. |
|
args : iterable |
|
The arguments to pass to the function. Accepted types depend |
|
        on the specific function. Currently, only an empty args list is supported.
|
func_registry : FunctionRegistry |
|
Optional function registry to use instead of the default global one. |
|
""" |
|
cdef: |
|
c_string c_func_name |
|
vector[CDatum] c_args |
|
CFunctionRegistry* c_func_registry |
|
shared_ptr[CRecordBatchReader] c_reader |
|
RecordBatchReader reader |
|
|
|
c_func_name = tobytes(function_name) |
|
if func_registry is None: |
|
c_func_registry = NULL |
|
else: |
|
c_func_registry = (<FunctionRegistry>func_registry).registry |
|
if args is None: |
|
args = [] |
|
_pack_compute_args(args, &c_args) |
|
|
|
with nogil: |
|
c_reader = GetResultValue(CallTabularFunction( |
|
c_func_name, c_args, c_func_registry)) |
|
reader = RecordBatchReader.__new__(RecordBatchReader) |
|
reader.reader = c_reader |
|
return RecordBatchReader.from_batches(pyarrow_wrap_schema(deref(c_reader).schema()), reader) |
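
# Illustrative call (a sketch; assumes a tabular function "demo_table" was
# registered as shown in the example above):
#
#   reader = call_tabular_function("demo_table")
#   for batch in reader:
#       ...  # each item is a RecordBatch matching the registered schema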
|
|