File size: 4,162 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 |
import inspect
import sys
import sentry_sdk
from sentry_sdk.consts import OP, SPANSTATUS
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration
from sentry_sdk.tracing import TransactionSource
from sentry_sdk.utils import (
event_from_exception,
logger,
package_version,
qualname_from_function,
reraise,
)
try:
import ray # type: ignore[import-not-found]
except ImportError:
raise DidNotEnable("Ray not installed.")
import functools
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from collections.abc import Callable
from typing import Any, Optional
from sentry_sdk.utils import ExcInfo
def _check_sentry_initialized():
# type: () -> None
if sentry_sdk.get_client().is_active():
return
logger.debug(
"[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
)
def _patch_ray_remote():
# type: () -> None
old_remote = ray.remote
@functools.wraps(old_remote)
def new_remote(f, *args, **kwargs):
# type: (Callable[..., Any], *Any, **Any) -> Callable[..., Any]
if inspect.isclass(f):
# Ray Actors
# (https://docs.ray.io/en/latest/ray-core/actors.html)
# are not supported
# (Only Ray Tasks are supported)
return old_remote(f, *args, *kwargs)
def _f(*f_args, _tracing=None, **f_kwargs):
# type: (Any, Optional[dict[str, Any]], Any) -> Any
"""
Ray Worker
"""
_check_sentry_initialized()
transaction = sentry_sdk.continue_trace(
_tracing or {},
op=OP.QUEUE_TASK_RAY,
name=qualname_from_function(f),
origin=RayIntegration.origin,
source=TransactionSource.TASK,
)
with sentry_sdk.start_transaction(transaction) as transaction:
try:
result = f(*f_args, **f_kwargs)
transaction.set_status(SPANSTATUS.OK)
except Exception:
transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)
return result
rv = old_remote(_f, *args, *kwargs)
old_remote_method = rv.remote
def _remote_method_with_header_propagation(*args, **kwargs):
# type: (*Any, **Any) -> Any
"""
Ray Client
"""
with sentry_sdk.start_span(
op=OP.QUEUE_SUBMIT_RAY,
name=qualname_from_function(f),
origin=RayIntegration.origin,
) as span:
tracing = {
k: v
for k, v in sentry_sdk.get_current_scope().iter_trace_propagation_headers()
}
try:
result = old_remote_method(*args, **kwargs, _tracing=tracing)
span.set_status(SPANSTATUS.OK)
except Exception:
span.set_status(SPANSTATUS.INTERNAL_ERROR)
exc_info = sys.exc_info()
_capture_exception(exc_info)
reraise(*exc_info)
return result
rv.remote = _remote_method_with_header_propagation
return rv
ray.remote = new_remote
def _capture_exception(exc_info, **kwargs):
# type: (ExcInfo, **Any) -> None
client = sentry_sdk.get_client()
event, hint = event_from_exception(
exc_info,
client_options=client.options,
mechanism={
"handled": False,
"type": RayIntegration.identifier,
},
)
sentry_sdk.capture_event(event, hint=hint)
class RayIntegration(Integration):
identifier = "ray"
origin = f"auto.queue.{identifier}"
@staticmethod
def setup_once():
# type: () -> None
version = package_version("ray")
_check_minimum_version(RayIntegration, version)
_patch_ray_remote()
|