File size: 8,909 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 |
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from cpython.exc cimport PyErr_CheckSignals, PyErr_SetInterrupt
from pyarrow.includes.libarrow cimport CStatus
from pyarrow.includes.libarrow_python cimport IsPyError, RestorePyError
from pyarrow.includes.common cimport c_string
from contextlib import contextmanager
import os
import signal
import threading
from pyarrow.lib import is_threading_enabled
from pyarrow.util import _break_traceback_cycle_from_frame
class ArrowException(Exception):
    """
    Base class of the Arrow-specific exceptions defined in this module.

    convert_status() also returns a plain ArrowException for status
    codes that have no dedicated subclass.
    """
class ArrowInvalid(ValueError, ArrowException):
    """
    Arrow error that is also a ValueError.

    Returned by convert_status() when the status reports IsInvalid().
    """
class ArrowMemoryError(MemoryError, ArrowException):
    """
    Arrow error that is also a MemoryError.

    Returned by convert_status() when the status reports IsOutOfMemory().
    """
class ArrowKeyError(KeyError, ArrowException):
    """
    Arrow error that is also a KeyError.

    Returned by convert_status() when the status reports IsKeyError().
    """

    def __str__(self):
        # KeyError.__str__ would show the repr() of the key; delegate to
        # ArrowException.__str__ so the message is rendered as plain text.
        return ArrowException.__str__(self)
class ArrowTypeError(TypeError, ArrowException):
    """
    Arrow error that is also a TypeError.

    Returned by convert_status() when the status reports IsTypeError().
    """
class ArrowNotImplementedError(NotImplementedError, ArrowException):
    """
    Arrow error that is also a NotImplementedError.

    Returned by convert_status() when the status reports
    IsNotImplemented().
    """
class ArrowCapacityError(ArrowException):
    """
    Arrow error returned by convert_status() when the status reports
    IsCapacityError().
    """
class ArrowIndexError(IndexError, ArrowException):
    """
    Arrow error that is also an IndexError.

    Returned by convert_status() when the status reports IsIndexError().
    """
class ArrowSerializationError(ArrowException):
    """
    Arrow error returned by convert_status() when the status reports
    IsSerializationError().
    """
class ArrowCancelled(ArrowException):
    """
    Arrow error returned by convert_status() when the status reports
    IsCancelled().

    Parameters
    ----------
    message : str
        The error message, forwarded to the base exception.
    signum : int, optional
        Signal number attached to the cancellation, stored on the
        ``signum`` attribute. Defaults to None when no signal number
        is given.
    """

    def __init__(self, message, signum=None):
        # Record the signal number first; the base initializer only
        # receives the message.
        self.signum = signum
        super().__init__(message)
# Compatibility alias: ArrowIOError is the builtin IOError itself rather
# than an ArrowException subclass. convert_status() returns plain IOError
# instances for statuses that report IsIOError().
ArrowIOError = IOError
# check_status() and convert_status() could be written directly in C++
# if we didn't define Arrow-specific subclasses (ArrowInvalid etc.)
#
# check_status() turns a non-OK C++ status into a raised Python exception.
# It returns 0 when the status is OK. Otherwise an exception is set and
# the failure is reported to the caller through the `except -1`
# declaration. The function is declared `nogil`, so the GIL is only
# acquired on the error path.
cdef int check_status(const CStatus& status) except -1 nogil:
    # Success path: nothing to convert, no GIL needed.
    if status.ok():
        return 0

    with gil:
        if IsPyError(status):
            # The status carries a Python error: hand it back to the
            # interpreter and signal failure with the -1 error value.
            # NOTE(review): assumes RestorePyError() leaves the Python
            # exception set as the pending error -- confirm.
            RestorePyError(status)
            return -1

        # Any other failure is mapped to an exception object by
        # convert_status() and raised here.
        raise convert_status(status)
# Build (without raising) the Python exception object that corresponds to
# a C++ status. Each status kind tested below maps to one exception
# class; anything unrecognised falls back to a plain ArrowException.
cdef object convert_status(const CStatus& status):
    # A status that wraps a Python error yields that very exception:
    # restoring it raises, and the raised object is what gets returned.
    if IsPyError(status):
        try:
            RestorePyError(status)
        except BaseException as restored:
            return restored

    # We don't use Status::ToString() as it would redundantly include
    # the C++ class name.
    text = frombytes(status.message(), safe=True)
    extra = status.detail()
    if extra != nullptr:
        text = text + ". Detail: " + frombytes(extra.get().ToString(),
                                               safe=True)

    if status.IsInvalid():
        return ArrowInvalid(text)

    if status.IsIOError():
        # Note: OSError constructor is
        #   OSError(message)
        # or
        #   OSError(errno, message, filename=None)
        # or (on Windows)
        #   OSError(errno, message, filename, winerror)
        err_no = ErrnoFromStatus(status)
        win_err = WinErrorFromStatus(status)
        if win_err != 0:
            return IOError(err_no, text, None, win_err)
        if err_no != 0:
            return IOError(err_no, text)
        return IOError(text)

    if status.IsOutOfMemory():
        return ArrowMemoryError(text)

    if status.IsKeyError():
        return ArrowKeyError(text)

    if status.IsNotImplemented():
        return ArrowNotImplementedError(text)

    if status.IsTypeError():
        return ArrowTypeError(text)

    if status.IsCapacityError():
        return ArrowCapacityError(text)

    if status.IsIndexError():
        return ArrowIndexError(text)

    if status.IsSerializationError():
        return ArrowSerializationError(text)

    if status.IsCancelled():
        # Only attach a signal number when the status carries a
        # positive one.
        sig = SignalFromStatus(status)
        if sig > 0:
            return ArrowCancelled(text, sig)
        return ArrowCancelled(text)

    # No dedicated exception class: use the full status string instead
    # of the bare message.
    return ArrowException(frombytes(status.ToString(), safe=True))
# These are API functions for C++ PyArrow
#
# Exported (`cdef api`) wrapper around check_status(): same arguments,
# same return value and the same `except -1` error convention.
cdef api int pyarrow_internal_check_status(const CStatus& status) \
        except -1 nogil:
    return check_status(status)
# Exported (`cdef api`) wrapper around convert_status(): returns the
# Python exception object built for `status` without raising it.
cdef api object pyarrow_internal_convert_status(const CStatus& status):
    return convert_status(status)
cdef class StopToken:
    # Extension type holding a C++ CStopToken in its `stop_token`
    # attribute.
    # NOTE(review): `stop_token` is not declared in this block; presumably
    # it is declared in the matching .pxd -- confirm.

    cdef void init(self, CStopToken stop_token):
        # Move the given token into this object instead of copying it.
        self.stop_token = move(stop_token)
# Module-level switch, on by default. It is written by
# enable_signal_handlers() and read by SignalStopHandler._init_signals().
cdef c_bool signal_handlers_enabled = True
def enable_signal_handlers(c_bool enable):
    """
    Enable or disable interruption of long-running operations.

    By default, certain long running operations will detect user
    interruptions, such as by pressing Ctrl-C.  This detection relies
    on setting a signal handler for the duration of the long-running
    operation, and may therefore interfere with other frameworks or
    libraries (such as an event loop).

    Parameters
    ----------
    enable : bool
        Whether to enable user interruption by setting a temporary
        signal handler.

    Notes
    -----
    This only updates the module-level ``signal_handlers_enabled`` flag,
    which SignalStopHandler reads when it is constructed.
    """
    global signal_handlers_enabled
    signal_handlers_enabled = enable
# For internal use

# Whether we need a workaround for https://bugs.python.org/issue42248
# True for Python versions before 3.8.10, for 3.9.0 up to (but not
# including) 3.9.5, and for every 3.10 release. When true,
# SignalStopHandler.__cinit__ calls _break_traceback_cycle_from_frame().
have_signal_refcycle = (sys.version_info < (3, 8, 10) or
                        (3, 9) <= sys.version_info < (3, 9, 5) or
                        sys.version_info[:2] == (3, 10))
cdef class SignalStopHandler:
    """
    Context manager that installs a cancelling signal handler.

    On construction it selects the signals to watch and tries to set up
    a signal stop source, whose token is exposed through ``stop_token``.
    Entering the context registers the cancelling handler for those
    signals and exiting unregisters it. If the exit sees an
    ArrowCancelled, the signal is handed back to Python before the
    exception propagates.
    """
    cdef:
        # Wrapper around the stop token obtained from the stop source.
        StopToken _stop_token
        # Signal numbers to intercept, filled in by _init_signals().
        vector[int] _signals
        # True only if the stop source was set up and threading is enabled.
        c_bool _enabled

    def __cinit__(self):
        self._enabled = False

        self._init_signals()
        if have_signal_refcycle:
            # Workaround for https://bugs.python.org/issue42248
            _break_traceback_cycle_from_frame(sys._getframe(0))

        self._stop_token = StopToken()

        # With no signals selected, the handler stays disabled and the
        # stop token is left as default-constructed.
        if not self._signals.empty():
            maybe_source = SetSignalStopSource()
            if not maybe_source.ok():
                # See ARROW-11841 / ARROW-17173: in complex interaction
                # scenarios (such as R calling into Python), SetSignalStopSource()
                # may have already activated a signal-receiving StopSource.
                # Just warn instead of erroring out.
                maybe_source.status().Warn()
            else:
                self._stop_token.init(deref(maybe_source).token())
                # signals don't work on Emscripten without threads.
                # and possibly other single-thread environments.
                self._enabled = is_threading_enabled()

    def _init_signals(self):
        # Select SIGINT/SIGTERM only when signal handling is enabled and
        # we are running on the main thread. A signal is kept only if its
        # current Python-level handler is something other than SIG_DFL,
        # SIG_IGN or None. Otherwise _signals stays empty.
        if (signal_handlers_enabled and
                threading.current_thread() is threading.main_thread()):
            self._signals = [
                sig for sig in (signal.SIGINT, signal.SIGTERM)
                if signal.getsignal(sig) not in (signal.SIG_DFL,
                                                 signal.SIG_IGN, None)]

    def __enter__(self):
        # Register the cancelling handler for the selected signals; a
        # failed status is raised by check_status().
        if self._enabled:
            check_status(RegisterCancellingSignalHandler(self._signals))
        return self

    def __exit__(self, exc_type, exc_value, exc_tb):
        if self._enabled:
            UnregisterCancellingSignalHandler()
        if exc_value is None:
            # Make sure we didn't lose a signal
            try:
                check_status(self._stop_token.stop_token.Poll())
            except ArrowCancelled as e:
                exc_value = e
        if isinstance(exc_value, ArrowCancelled):
            if exc_value.signum:
                # Re-emit the exact same signal. We restored the Python signal
                # handler above, so it should receive it.
                if os.name == 'nt':
                    SendSignal(exc_value.signum)
                else:
                    SendSignalToThread(exc_value.signum,
                                       threading.main_thread().ident)
            else:
                # Simulate Python receiving a SIGINT
                # (see https://bugs.python.org/issue43356 for why we can't
                # simulate the exact signal number)
                PyErr_SetInterrupt()
            # Maximize chances of the Python signal handler being executed now.
            # Otherwise a potential KeyboardInterrupt might be missed by an
            # immediately enclosing try/except block.
            PyErr_CheckSignals()
            # ArrowCancelled will be re-raised if PyErr_CheckSignals()
            # returned successfully.
        # Nothing is returned (None), so an in-flight exception is not
        # suppressed by this context manager.

    def __dealloc__(self):
        # NOTE(review): the stop source is reset only when _enabled is
        # true. If SetSignalStopSource() succeeded but
        # is_threading_enabled() returned false, nothing visible here
        # resets it -- confirm this is intended.
        if self._enabled:
            ResetSignalStopSource()

    @property
    def stop_token(self):
        # The StopToken wrapper created in __cinit__.
        return self._stop_token
|