|
import inspect |
|
import sys |
|
|
|
import sentry_sdk |
|
from sentry_sdk.consts import OP, SPANSTATUS |
|
from sentry_sdk.integrations import _check_minimum_version, DidNotEnable, Integration |
|
from sentry_sdk.tracing import TransactionSource |
|
from sentry_sdk.utils import ( |
|
event_from_exception, |
|
logger, |
|
package_version, |
|
qualname_from_function, |
|
reraise, |
|
) |
|
|
|
try: |
|
import ray |
|
except ImportError: |
|
raise DidNotEnable("Ray not installed.") |
|
import functools |
|
|
|
from typing import TYPE_CHECKING |
|
|
|
if TYPE_CHECKING: |
|
from collections.abc import Callable |
|
from typing import Any, Optional |
|
from sentry_sdk.utils import ExcInfo |
|
|
|
|
|
def _check_sentry_initialized():
    # type: () -> None
    """
    Emit a debug log line when the current Sentry client is not active.

    Does nothing when ``sentry_sdk.get_client().is_active()`` is true.
    """
    client_is_active = sentry_sdk.get_client().is_active()

    if not client_is_active:
        logger.debug(
            "[Tracing] Sentry not initialized in ray cluster worker, performance data will be discarded."
        )
|
|
|
|
|
def _patch_ray_remote():
    # type: () -> None
    """
    Replace ``ray.remote`` with a wrapper that traces functions passed to it.

    For a function registered through the patched ``ray.remote``:

    * calling ``.remote(...)`` on the returned object runs inside an
      ``OP.QUEUE_SUBMIT_RAY`` span and forwards the current trace propagation
      headers to the task through an extra ``_tracing`` keyword argument;
    * the task body runs inside a transaction continued from those headers.

    Classes are handed to the original ``ray.remote`` without instrumentation.
    Both the bare form (``ray.remote(f)``) and the options form
    (``ray.remote(**options)`` returning a decorator) are supported.
    """
    old_remote = ray.remote

    def _instrument(f, make_remote):
        """
        Wrap ``f`` for tracing and register the wrapper with Ray.

        :param f: the user function to run as a task.
        :param make_remote: callable taking the wrapped function and returning
            whatever the original ``ray.remote`` produces for it.
        :returns: the object produced by ``make_remote``, with its ``remote``
            attribute replaced by a header-propagating version.
        """

        # NOTE: `_f` intentionally closes over `f` only; everything else it
        # uses is a module-level name.
        def _f(*f_args, _tracing=None, **f_kwargs):
            """
            Ray Worker

            Runs ``f`` inside a transaction continued from the ``_tracing``
            headers supplied by the submitting side (if any).
            """
            _check_sentry_initialized()

            transaction = sentry_sdk.continue_trace(
                _tracing or {},
                op=OP.QUEUE_TASK_RAY,
                name=qualname_from_function(f),
                origin=RayIntegration.origin,
                source=TransactionSource.TASK,
            )

            with sentry_sdk.start_transaction(transaction) as transaction:
                try:
                    result = f(*f_args, **f_kwargs)
                    transaction.set_status(SPANSTATUS.OK)
                except Exception:
                    transaction.set_status(SPANSTATUS.INTERNAL_ERROR)
                    exc_info = sys.exc_info()
                    _capture_exception(exc_info)
                    # Always raises, so `result` is bound whenever we return.
                    reraise(*exc_info)

                return result

        rv = make_remote(_f)
        old_remote_method = rv.remote

        def _remote_method_with_header_propagation(*args, **kwargs):
            """
            Ray Client

            Submits the task inside a span and passes the current trace
            propagation headers along as the ``_tracing`` keyword argument.
            """
            with sentry_sdk.start_span(
                op=OP.QUEUE_SUBMIT_RAY,
                name=qualname_from_function(f),
                origin=RayIntegration.origin,
            ) as span:
                tracing = dict(
                    sentry_sdk.get_current_scope().iter_trace_propagation_headers()
                )
                try:
                    result = old_remote_method(*args, **kwargs, _tracing=tracing)
                    span.set_status(SPANSTATUS.OK)
                except Exception:
                    span.set_status(SPANSTATUS.INTERNAL_ERROR)
                    exc_info = sys.exc_info()
                    _capture_exception(exc_info)
                    reraise(*exc_info)

                return result

        rv.remote = _remote_method_with_header_propagation

        return rv

    @functools.wraps(old_remote)
    def new_remote(f=None, *args, **kwargs):
        """
        Drop-in replacement for ``ray.remote``.

        :param f: the function or class to register, or ``None`` when called
            with options only (``@ray.remote(num_cpus=...)``).
        :returns: what the original ``ray.remote`` returns for the same call:
            the registered object, or a decorator in the options form.
        """
        if f is None:
            # Options form: ask Ray for its decorator right away so that any
            # argument error surfaces at the same point as without the patch.
            ray_decorator = old_remote(*args, **kwargs)

            def decorator(user_f):
                if inspect.isclass(user_f):
                    # Classes are passed through uninstrumented.
                    return ray_decorator(user_f)
                return _instrument(user_f, ray_decorator)

            return decorator

        if inspect.isclass(f):
            # Classes are passed through uninstrumented; only functions are
            # traced. Keyword options must be forwarded with `**`, not `*`
            # (which would pass the option *names* as positional arguments).
            return old_remote(f, *args, **kwargs)

        return _instrument(
            f, lambda wrapped: old_remote(wrapped, *args, **kwargs)
        )

    ray.remote = new_remote
|
|
|
|
|
def _capture_exception(exc_info, **kwargs):
    # type: (ExcInfo, **Any) -> None
    """
    Build a Sentry event from ``exc_info`` and send it.

    The event is marked as unhandled and its mechanism type is this
    integration's identifier. Extra keyword arguments are accepted but unused.
    """
    options = sentry_sdk.get_client().options
    mechanism = {
        "handled": False,
        "type": RayIntegration.identifier,
    }

    event, hint = event_from_exception(
        exc_info, client_options=options, mechanism=mechanism
    )
    sentry_sdk.capture_event(event, hint=hint)
|
|
|
|
|
class RayIntegration(Integration):
    """Sentry integration for Ray; installs its hooks by patching ``ray.remote``."""

    # Name used for this integration, and as the mechanism type on captured events.
    identifier = "ray"
    # Origin value attached to the spans and transactions created by this module.
    origin = f"auto.queue.{identifier}"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Run the minimum-version check for the ``ray`` package, then patch ``ray.remote``."""
        _check_minimum_version(RayIntegration, package_version("ray"))
        _patch_ray_remote()
|
|