|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
from cpython.exc cimport PyErr_CheckSignals, PyErr_SetInterrupt |
|
|
|
from pyarrow.includes.libarrow cimport CStatus |
|
from pyarrow.includes.libarrow_python cimport IsPyError, RestorePyError |
|
from pyarrow.includes.common cimport c_string |
|
|
|
from contextlib import contextmanager |
|
import os |
|
import signal |
|
import threading |
|
|
|
from pyarrow.lib import is_threading_enabled |
|
from pyarrow.util import _break_traceback_cycle_from_frame |
|
|
|
|
|
class ArrowException(Exception):
    """Base class of the Arrow-specific exception types in this module."""
|
|
|
|
|
class ArrowInvalid(ValueError, ArrowException):
    """
    Arrow exception that is also a ``ValueError``.

    ``convert_status()`` returns it for a status whose ``IsInvalid()``
    is true.
    """
|
|
|
|
|
class ArrowMemoryError(MemoryError, ArrowException):
    """
    Arrow exception that is also a ``MemoryError``.

    ``convert_status()`` returns it for a status whose ``IsOutOfMemory()``
    is true.
    """
|
|
|
|
|
class ArrowKeyError(KeyError, ArrowException):
    """
    Arrow exception that is also a ``KeyError``.

    ``convert_status()`` returns it for a status whose ``IsKeyError()``
    is true.
    """

    def __str__(self):
        # KeyError.__str__ renders the repr() of a single argument; delegate
        # to the ArrowException implementation so the message is shown as-is.
        text = ArrowException.__str__(self)
        return text
|
|
|
|
|
class ArrowTypeError(TypeError, ArrowException):
    """
    Arrow exception that is also a ``TypeError``.

    ``convert_status()`` returns it for a status whose ``IsTypeError()``
    is true.
    """
|
|
|
|
|
class ArrowNotImplementedError(NotImplementedError, ArrowException):
    """
    Arrow exception that is also a ``NotImplementedError``.

    ``convert_status()`` returns it for a status whose
    ``IsNotImplemented()`` is true.
    """
|
|
|
|
|
class ArrowCapacityError(ArrowException):
    """
    Arrow exception returned by ``convert_status()`` for a status whose
    ``IsCapacityError()`` is true.
    """
|
|
|
|
|
class ArrowIndexError(IndexError, ArrowException):
    """
    Arrow exception that is also an ``IndexError``.

    ``convert_status()`` returns it for a status whose ``IsIndexError()``
    is true.
    """
|
|
|
|
|
class ArrowSerializationError(ArrowException):
    """
    Arrow exception returned by ``convert_status()`` for a status whose
    ``IsSerializationError()`` is true.
    """
|
|
|
|
|
class ArrowCancelled(ArrowException):
    """
    Arrow exception returned by ``convert_status()`` for a cancelled status.

    Parameters
    ----------
    message : str
        The error message, forwarded to the base exception.
    signum : int, optional
        Number of the signal associated with the cancellation, stored as
        the ``signum`` attribute; ``None`` when not given.
    """

    def __init__(self, message, signum=None):
        # The two statements are independent: one records the signal number,
        # the other lets the base class store the message in ``args``.
        self.signum = signum
        super().__init__(message)
|
|
|
|
|
|
|
# ArrowIOError is not a distinct class, just another name for the builtin
# IOError (which convert_status() returns for I/O error statuses).
ArrowIOError = IOError
|
|
|
|
|
|
|
|
|
# Turn a failed Arrow status into a raised Python exception.
#
# Returns 0 when `status.ok()`.  Otherwise the GIL is acquired and either
# the Python error carried by the status is restored (when IsPyError() is
# true) or the exception object built by convert_status() is raised.
# -1 is the error return value declared in the signature.
cdef int check_status(const CStatus& status) except -1 nogil:
    if not status.ok():
        # Everything below touches Python objects, hence the GIL.
        with gil:
            if not IsPyError(status):
                raise convert_status(status)
            RestorePyError(status)
            return -1

    return 0
|
|
|
|
|
# Build (but do not raise) the Python exception object for `status`.
#
# A status carrying a Python error yields that very exception.  Otherwise
# the status predicates are tested in a fixed order and the first match
# selects the exception class; a status matching none of them becomes a
# plain ArrowException.
cdef object convert_status(const CStatus& status):
    if IsPyError(status):
        # Restoring the error raises it; catch it to hand back the object.
        try:
            RestorePyError(status)
        except BaseException as restored:
            return restored

    # The text comes from message() (plus the detail, when there is one);
    # ToString() is only used for the unrecognised-status fallback below.
    text = frombytes(status.message(), safe=True)
    status_detail = status.detail()
    if status_detail != nullptr:
        detail_text = frombytes(status_detail.get().ToString(), safe=True)
        text = text + ". Detail: " + detail_text

    if status.IsInvalid():
        return ArrowInvalid(text)

    if status.IsIOError():
        # OSError accepts (message), (errno, message) or, with a Windows
        # error code, (errno, message, filename, winerror).
        err_code = ErrnoFromStatus(status)
        win_code = WinErrorFromStatus(status)
        if win_code != 0:
            return IOError(err_code, text, None, win_code)
        if err_code != 0:
            return IOError(err_code, text)
        return IOError(text)

    if status.IsOutOfMemory():
        return ArrowMemoryError(text)

    if status.IsKeyError():
        return ArrowKeyError(text)

    if status.IsNotImplemented():
        return ArrowNotImplementedError(text)

    if status.IsTypeError():
        return ArrowTypeError(text)

    if status.IsCapacityError():
        return ArrowCapacityError(text)

    if status.IsIndexError():
        return ArrowIndexError(text)

    if status.IsSerializationError():
        return ArrowSerializationError(text)

    if status.IsCancelled():
        # Only a positive signal number is attached to the exception.
        sig = SignalFromStatus(status)
        return ArrowCancelled(text, sig) if sig > 0 else ArrowCancelled(text)

    # Unrecognised status: fall back to its full string form.
    return ArrowException(frombytes(status.ToString(), safe=True))
|
|
|
|
|
|
|
# `cdef api` wrapper that forwards to check_status() unchanged, with the
# same error return value (-1) and nogil signature.
cdef api int pyarrow_internal_check_status(
        const CStatus& status) except -1 nogil:
    return check_status(status)
|
|
|
# `cdef api` wrapper that forwards to convert_status() unchanged and
# returns the exception object it builds.
cdef api object pyarrow_internal_convert_status(
        const CStatus& status):
    return convert_status(status)
|
|
|
|
|
cdef class StopToken:
    # Holder for a C++ CStopToken.
    # NOTE(review): the `stop_token` attribute is not declared in this block;
    # presumably it is declared in the matching .pxd -- confirm.

    cdef void init(self, CStopToken stop_token):
        # Take over the given token by moving it into the attribute.
        self.stop_token = move(stop_token)
|
|
|
|
|
# Module-level switch written by enable_signal_handlers() and read by
# SignalStopHandler._init_signals(); enabled by default.
cdef c_bool signal_handlers_enabled = True
|
|
|
|
|
def enable_signal_handlers(c_bool enable):
    """
    Turn interruption of long-running operations on or off.

    Certain long-running operations detect user interruptions (for
    example Ctrl-C) by default. The detection works by setting a signal
    handler for as long as the operation runs, so it may interfere with
    other frameworks or libraries (such as an event loop).

    Parameters
    ----------
    enable : bool
        Whether to enable user interruption by setting a temporary
        signal handler.
    """
    # Rebind the module-level flag rather than creating a local.
    global signal_handlers_enabled
    signal_handlers_enabled = enable
|
|
|
|
|
|
|
|
|
|
|
# True on the interpreter versions for which SignalStopHandler.__cinit__
# calls _break_traceback_cycle_from_frame(): anything before 3.8.10,
# 3.9.0 up to (not including) 3.9.5, and every 3.10.x.
have_signal_refcycle = (
    sys.version_info < (3, 8, 10)
    or (3, 9) <= sys.version_info < (3, 9, 5)
    or sys.version_info[:2] == (3, 10)
)
|
|
|
cdef class SignalStopHandler:
    """
    Context manager tying signal delivery to an Arrow stop token.

    While the ``with`` block is active (and the handler is enabled), the
    cancelling signal handler is registered for the signals collected at
    construction time. On exit it is unregistered, and a cancellation is
    handed back to Python either by re-sending the recorded signal or by
    simulating an interrupt.
    """
    cdef:
        # Token exposed through the `stop_token` property.
        StopToken _stop_token
        # Signal numbers to hand to RegisterCancellingSignalHandler().
        vector[int] _signals
        # Whether register / unregister / reset calls are made at all.
        c_bool _enabled

    def __cinit__(self):
        self._enabled = False

        self._init_signals()
        if have_signal_refcycle:
            _break_traceback_cycle_from_frame(sys._getframe(0))

        self._stop_token = StopToken()

        if not self._signals.empty():
            maybe_source = SetSignalStopSource()
            if maybe_source.ok():
                self._stop_token.init(deref(maybe_source).token())
                # Enabled only when threading is available.
                # NOTE(review): if the source was set but threading is
                # disabled, _enabled stays False and __dealloc__ never
                # calls ResetSignalStopSource() -- confirm this is intended.
                self._enabled = is_threading_enabled()
            else:
                # Setting the stop source failed: warn instead of raising.
                maybe_source.status().Warn()

    def _init_signals(self):
        # Signals are only collected when handlers are switched on and we
        # are running in the main thread.
        if not signal_handlers_enabled:
            return
        if threading.current_thread() is not threading.main_thread():
            return

        # Keep SIGINT / SIGTERM only if a custom handler is installed.
        not_handled = (signal.SIG_DFL, signal.SIG_IGN, None)
        handled = []
        for sig in (signal.SIGINT, signal.SIGTERM):
            if signal.getsignal(sig) not in not_handled:
                handled.append(sig)
        self._signals = handled

    def __enter__(self):
        if self._enabled:
            check_status(RegisterCancellingSignalHandler(self._signals))
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if self._enabled:
            UnregisterCancellingSignalHandler()

        pending = exc_value
        if pending is None:
            # No exception in flight: poll the token in case a stop was
            # requested without surfacing as an exception.
            try:
                check_status(self._stop_token.stop_token.Poll())
            except ArrowCancelled as cancelled:
                pending = cancelled

        if not isinstance(pending, ArrowCancelled):
            return

        signum = pending.signum
        if not signum:
            # No signal number recorded: simulate an interrupt instead.
            PyErr_SetInterrupt()
        elif os.name == 'nt':
            # Re-send the very signal that caused the cancellation.
            SendSignal(signum)
        else:
            SendSignalToThread(signum, threading.main_thread().ident)

        # Run pending Python signal handlers right away.
        PyErr_CheckSignals()

    def __dealloc__(self):
        if self._enabled:
            ResetSignalStopSource()

    @property
    def stop_token(self):
        return self._stop_token
|
|