File size: 3,382 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
import sentry_sdk
from sentry_sdk.consts import OP
from sentry_sdk.integrations import DidNotEnable
from sentry_sdk.integrations.grpc.consts import SPAN_ORIGIN
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any, Callable, Iterator, Iterable, Union
try:
import grpc
from grpc import ClientCallDetails, Call
from grpc._interceptor import _UnaryOutcome
from grpc.aio._interceptor import UnaryStreamCall
from google.protobuf.message import Message
except ImportError:
raise DidNotEnable("grpcio is not installed")
class ClientInterceptor(
    grpc.UnaryUnaryClientInterceptor, grpc.UnaryStreamClientInterceptor  # type: ignore
):
    """Client-side gRPC interceptor that records outgoing calls as Sentry spans.

    Each intercepted call is wrapped in a span with op ``OP.GRPC_CLIENT`` and
    the trace propagation headers of the current Sentry scope are appended to
    the call metadata before the call is forwarded to ``continuation``.
    """

    # Class-level flag, False by default. It is not read anywhere inside this
    # class. NOTE(review): presumably toggled by the integration setup code to
    # mark already-intercepted channels -- confirm against the caller.
    _is_intercepted = False

    def intercept_unary_unary(self, continuation, client_call_details, request):
        # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], _UnaryOutcome], ClientCallDetails, Message) -> _UnaryOutcome
        """Run a unary-unary call inside a span and record its status code.

        The span carries the data keys ``type``, ``method`` and ``code``; the
        object returned by ``continuation`` is handed back unchanged.
        """
        rpc_method = client_call_details.method
        span_name = "unary unary call to %s" % rpc_method

        with sentry_sdk.start_span(
            op=OP.GRPC_CLIENT, name=span_name, origin=SPAN_ORIGIN
        ) as grpc_span:
            grpc_span.set_data("type", "unary unary")
            grpc_span.set_data("method", rpc_method)

            # Forward the call with the tracing headers attached.
            traced_details = self._update_client_call_details_metadata_from_scope(
                client_call_details
            )
            outcome = continuation(traced_details, request)

            grpc_span.set_data("code", outcome.code().name)
            return outcome

    def intercept_unary_stream(self, continuation, client_call_details, request):
        # type: (ClientInterceptor, Callable[[ClientCallDetails, Message], Union[Iterable[Any], UnaryStreamCall]], ClientCallDetails, Message) -> Union[Iterator[Message], Call]
        """Run a unary-stream call inside a span.

        The span carries the data keys ``type`` and ``method``; the object
        returned by ``continuation`` is handed back unchanged.
        """
        rpc_method = client_call_details.method
        span_name = "unary stream call to %s" % rpc_method

        with sentry_sdk.start_span(
            op=OP.GRPC_CLIENT, name=span_name, origin=SPAN_ORIGIN
        ) as grpc_span:
            grpc_span.set_data("type", "unary stream")
            grpc_span.set_data("method", rpc_method)

            # Forward the call with the tracing headers attached.
            traced_details = self._update_client_call_details_metadata_from_scope(
                client_call_details
            )
            stream_call = continuation(traced_details, request)  # type: UnaryStreamCall

            # The status code is intentionally NOT recorded here: setting
            # "code" from stream_call.code().name on a unary-stream call leads
            # to execution getting stuck.
            return stream_call

    @staticmethod
    def _update_client_call_details_metadata_from_scope(client_call_details):
        # type: (ClientCallDetails) -> ClientCallDetails
        """Return new call details with trace propagation headers appended.

        The existing metadata (if any) is copied into a list, every
        ``(key, value)`` pair yielded by the current scope's
        ``iter_trace_propagation_headers()`` is appended, and a fresh
        ``_ClientCallDetails`` is built with all other fields carried over.
        """
        existing_metadata = client_call_details.metadata
        merged_metadata = list(existing_metadata) if existing_metadata else []

        current_scope = sentry_sdk.get_current_scope()
        merged_metadata.extend(
            (header_name, header_value)
            for header_name, header_value in current_scope.iter_trace_propagation_headers()
        )

        return grpc._interceptor._ClientCallDetails(
            method=client_call_details.method,
            timeout=client_call_details.timeout,
            metadata=merged_metadata,
            credentials=client_call_details.credentials,
            wait_for_ready=client_call_details.wait_for_ready,
            compression=client_call_details.compression,
        )
|