|
import json |
|
|
|
import sentry_sdk |
|
from sentry_sdk.integrations import Integration |
|
from sentry_sdk.integrations._wsgi_common import request_body_within_bounds |
|
from sentry_sdk.utils import ( |
|
AnnotatedValue, |
|
capture_internal_exceptions, |
|
event_from_exception, |
|
) |
|
|
|
from dramatiq.broker import Broker |
|
from dramatiq.message import Message |
|
from dramatiq.middleware import Middleware, default_middleware |
|
from dramatiq.errors import Retry |
|
|
|
from typing import TYPE_CHECKING |
|
|
|
if TYPE_CHECKING: |
|
from typing import Any, Callable, Dict, Optional, Union |
|
from sentry_sdk._types import Event, Hint |
|
|
|
|
|
class DramatiqIntegration(Integration):
    """
    Sentry integration for Dramatiq.

    Call `sentry_sdk.init` *before* you create your broker: the integration
    works by monkey patching `Broker.__init__`, so the patch has to be in
    place by the time the broker is constructed.

    This integration was originally developed and maintained
    by https://github.com/jacobsvante and later donated to the Sentry
    project.
    """

    identifier = "dramatiq"

    @staticmethod
    def setup_once():
        """Install the `Broker.__init__` patch."""
        _patch_dramatiq_broker()
|
|
|
|
|
def _patch_dramatiq_broker():
    """
    Replace `Broker.__init__` with a wrapper that places a
    `SentryMiddleware` at the front of each new broker's middleware list
    whenever the Dramatiq integration is enabled on the current client.
    """
    wrapped_init = Broker.__init__

    def sentry_patched_broker__init__(self, *args, **kw):
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)

        # The middleware may arrive as a keyword or as the sole positional
        # argument. Either way it is removed from where it was found and
        # forwarded as a keyword below.
        if "middleware" in kw:
            configured = kw.pop("middleware")
        elif len(args) == 1:
            configured = args[0]
            args = []
        else:
            configured = None

        # Nothing supplied: instantiate each entry of `default_middleware`.
        if configured is None:
            chain = [factory() for factory in default_middleware]
        else:
            chain = list(configured)

        if integration is not None:
            # Discard any SentryMiddleware already in the list so exactly
            # one instance remains, in first position.
            chain = [entry for entry in chain if not isinstance(entry, SentryMiddleware)]
            chain.insert(0, SentryMiddleware())

        kw["middleware"] = chain
        wrapped_init(self, *args, **kw)

    Broker.__init__ = sentry_patched_broker__init__
|
|
|
|
|
class SentryMiddleware(Middleware):
    """
    A Dramatiq middleware that automatically captures and sends
    exceptions to Sentry.

    This is automatically added to every instantiated broker via the
    DramatiqIntegration.
    """

    def before_process_message(self, broker, message):
        """
        Enter a new Sentry scope for ``message`` and annotate it with the
        actor name, the message id and a message event processor.

        Does nothing when the Dramatiq integration is not enabled.
        """
        integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
        if integration is None:
            return

        # The scope context manager is kept on the message so that the
        # matching after_* hook can close it again.
        message._scope_manager = sentry_sdk.new_scope()
        message._scope_manager.__enter__()

        scope = sentry_sdk.get_current_scope()
        scope.set_transaction_name(message.actor_name)
        scope.set_extra("dramatiq_message_id", message.message_id)
        scope.add_event_processor(_make_message_event_processor(message, integration))

    def after_process_message(self, broker, message, *, result=None, exception=None):
        """
        Report ``exception`` to Sentry (if it should be reported) and close
        the scope that `before_process_message` opened for ``message``.

        An exception is not reported when it is a `Retry`, or when it is an
        instance of the ``throws`` option configured on the message or on
        its actor.

        Everything runs inside ``try``/``finally`` so the scope is closed
        even if looking up the actor or building the event raises.
        """
        try:
            integration = sentry_sdk.get_client().get_integration(DramatiqIntegration)
            if integration is None:
                return

            if exception is None or isinstance(exception, Retry):
                return

            # The actor is only looked up once there is an exception whose
            # `throws` configuration has to be consulted.
            actor = broker.get_actor(message.actor_name)
            throws = message.options.get("throws") or actor.options.get("throws")
            if throws and isinstance(exception, throws):
                return

            event, hint = event_from_exception(
                exception,
                client_options=sentry_sdk.get_client().options,
                mechanism={
                    "type": DramatiqIntegration.identifier,
                    "handled": False,
                },
            )
            sentry_sdk.capture_event(event, hint=hint)
        finally:
            self._exit_scope(message)

    def after_skip_message(self, broker, message):
        """
        Close the scope opened for ``message`` when the message is skipped.

        Per the Dramatiq middleware documentation this hook is invoked in
        place of `after_process_message` for skipped messages; without it
        the scope entered in `before_process_message` would stay open.
        """
        self._exit_scope(message)

    @staticmethod
    def _exit_scope(message):
        """Exit the scope stored on ``message``, if there is one."""
        # `before_process_message` returns early (and stores nothing) when
        # the integration is disabled, so the attribute may be absent.
        scope_manager = getattr(message, "_scope_manager", None)
        if scope_manager is not None:
            scope_manager.__exit__(None, None, None)
|
|
|
|
|
def _make_message_event_processor(message, integration): |
|
|
|
|
|
def inner(event, hint): |
|
|
|
with capture_internal_exceptions(): |
|
DramatiqMessageExtractor(message).extract_into_event(event) |
|
|
|
return event |
|
|
|
return inner |
|
|
|
|
|
class DramatiqMessageExtractor:
    """Copies the contents of a Dramatiq message into a Sentry event."""

    def __init__(self, message):
        """Store a plain-dict copy of ``message.asdict()``."""
        self.message_data = dict(message.asdict())

    def content_length(self):
        """
        Return the length of the message data rendered as text.

        The data is rendered with ``json.dumps``. Values JSON cannot encode
        are rendered with ``repr`` instead of raising, and if the structure
        as a whole cannot be dumped (for example because of unsupported key
        types) the ``repr`` of the whole dict is measured. The result is
        only used as a size estimate, so an approximate rendering is
        sufficient and keeps the message data from being dropped.
        """
        try:
            rendered = json.dumps(self.message_data, default=repr)
        except (TypeError, ValueError):
            rendered = repr(self.message_data)
        return len(rendered)

    def extract_into_event(self, event):
        """
        Write the message data into ``event["contexts"]["dramatiq"]``.

        Does nothing when the current client is not active. When the data
        exceeds the client's request-body size bounds, an `AnnotatedValue`
        marker is stored in place of the data.
        """
        client = sentry_sdk.get_client()
        if not client.is_active():
            return

        contexts = event.setdefault("contexts", {})
        request_info = contexts.setdefault("dramatiq", {})
        request_info["type"] = "dramatiq"

        if request_body_within_bounds(client, self.content_length()):
            request_info["data"] = self.message_data
        else:
            request_info["data"] = AnnotatedValue.removed_because_over_size_limit()
|
|