File size: 5,247 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 |
import sentry_sdk
from sentry_sdk.consts import OP, SPANDATA
from sentry_sdk.integrations import _check_minimum_version, Integration, DidNotEnable
from sentry_sdk.tracing import Span
from sentry_sdk.scope import should_send_default_pii
from sentry_sdk.utils import capture_internal_exceptions, ensure_integration_enabled
from typing import TYPE_CHECKING, TypeVar
# Compatibility shim: lets the annotations in this module use `ParamSpec` and
# `Callable[...]` on older Python versions without introducing a hard
# dependency on `typing_extensions`.
# Adapted from: https://stackoverflow.com/a/71944042/300572
if TYPE_CHECKING:
    from typing import ParamSpec, Callable
else:

    # Runtime placeholder: only provides the `args` / `kwargs` attributes that
    # the annotations look up, both set to None.
    class ParamSpec:
        def __init__(self, _):
            self.args = self.kwargs = None

    # Subscriptable placeholder: `Callable[anything]` evaluates to None.
    class _Callable:
        def __getitem__(self, _):
            return None

    # Single shared instance, so `Callable[P, T]` can be evaluated at runtime
    Callable = _Callable()
# clickhouse-driver is an optional dependency: when it cannot be imported, the
# ImportError is replaced by DidNotEnable.
# NOTE(review): presumably the SDK treats DidNotEnable as "skip this
# integration" -- confirm against sentry_sdk.integrations.
try:
    import clickhouse_driver  # type: ignore[import-not-found]
except ImportError:
    raise DidNotEnable("clickhouse-driver not installed.")
class ClickhouseDriverIntegration(Integration):
    """Sentry integration that instruments queries made through `clickhouse_driver`.

    `setup_once` replaces methods on the driver's `Connection` and `Client`
    classes with wrappers that start a span when a query is sent, record the
    data sent with it, and finish the span once the query has ended.
    """

    identifier = "clickhouse_driver"
    origin = f"auto.db.{identifier}"

    @staticmethod
    def setup_once() -> None:
        """Check the driver version, then patch the driver's query methods in place."""
        _check_minimum_version(ClickhouseDriverIntegration, clickhouse_driver.VERSION)

        connection_cls = clickhouse_driver.connection.Connection
        client_cls = clickhouse_driver.client.Client

        # Every query is done using the Connection's `send_query` function
        connection_cls.send_query = _wrap_start(connection_cls.send_query)

        # If the query contains parameters, `send_data` is what sends those
        # parameters to clickhouse
        client_cls.send_data = _wrap_send_data(client_cls.send_data)

        # Every query ends either with the Client's `receive_end_of_query`
        # (no result expected) or its `receive_result` (result expected).
        # In 0.2.7, insert queries are handled separately via
        # `receive_end_of_insert_query`, so that hook is patched only when the
        # installed driver defines it.
        end_hooks = ["receive_end_of_query"]
        if hasattr(client_cls, "receive_end_of_insert_query"):
            end_hooks.append("receive_end_of_insert_query")
        end_hooks.append("receive_result")

        for hook_name in end_hooks:
            setattr(client_cls, hook_name, _wrap_end(getattr(client_cls, hook_name)))
# Type variables for the wrapper factories in this module, which are annotated
# as `Callable[P, T] -> Callable[P, T]`: `P` is used for the wrapped callable's
# parameters and `T` for its return type.
P = ParamSpec("P")
T = TypeVar("T")
def _wrap_start(f: Callable[P, T]) -> Callable[P, T]:
    """Wrap `Connection.send_query` so that every query starts a DB span.

    The span is stored on the connection as `_sentry_span`, which is where the
    `send_data` and end-of-query wrappers look it up later. The span is NOT
    finished here.

    :param f: The original `send_query` callable.
    :returns: A wrapper that starts the span, records the query metadata on it
        and then delegates to `f` with the unchanged arguments.
    """

    # NOTE(review): presumably `ensure_integration_enabled` calls `f` directly
    # when the integration is not active -- confirm in sentry_sdk.utils.
    @ensure_integration_enabled(ClickhouseDriverIntegration, f)
    def _inner(*args: P.args, **kwargs: P.kwargs) -> T:
        connection = args[0]
        # Everything after `self` may be passed positionally or by keyword.
        # `query` is looked up the same way as `query_id` / `params`, so a
        # keyword-only call does not raise IndexError inside the wrapper.
        query = args[1] if len(args) > 1 else kwargs.get("query")
        query_id = args[2] if len(args) > 2 else kwargs.get("query_id")
        params = args[3] if len(args) > 3 else kwargs.get("params")

        span = sentry_sdk.start_span(
            op=OP.DB,
            name=query,
            origin=ClickhouseDriverIntegration.origin,
        )

        # Hand the span over to the other wrappers via the connection object
        connection._sentry_span = span  # type: ignore[attr-defined]

        _set_db_data(span, connection)

        # Stored under "query" so the end wrapper can pop it for the breadcrumb
        span.set_data("query", query)

        if query_id:
            span.set_data("db.query_id", query_id)

        # Query parameters may contain user data: only record them with PII on
        if params and should_send_default_pii():
            span.set_data("db.params", params)

        # run the original code
        return f(*args, **kwargs)

    return _inner
def _wrap_end(f: Callable[P, T]) -> Callable[P, T]:
    """Wrap a `Client` method that marks the end of a query.

    The wrapped method runs first. If a span is stored on the client's
    connection as `_sentry_span`, the result is optionally attached to it, a
    "query" breadcrumb is added from the span's data, and the span is finished.

    :param f: The original end-of-query callable.
    :returns: A wrapper that returns whatever `f` returns.
    """

    def _inner_end(*args: P.args, **kwargs: P.kwargs) -> T:
        result = f(*args, **kwargs)
        client = args[0]
        active_span = getattr(client.connection, "_sentry_span", None)  # type: ignore[attr-defined]

        # No span stored on this connection: nothing to record
        if active_span is None:
            return result

        # Query results may contain user data: only record them with PII on
        if result is not None and should_send_default_pii():
            active_span.set_data("db.result", result)

        # The query text is removed from the span data and becomes the
        # breadcrumb message; the remaining data is attached to the breadcrumb.
        with capture_internal_exceptions():
            record_breadcrumb = active_span.scope.add_breadcrumb
            record_breadcrumb(
                message=active_span._data.pop("query"),
                category="query",
                data=active_span._data,
            )

        active_span.finish()
        return result

    return _inner_end
def _wrap_send_data(f: Callable[P, T]) -> Callable[P, T]:
    """Wrap `Client.send_data` to record the data being sent on the query span.

    When a span is stored on the client's connection and sending PII is
    allowed, the data is added to the span's "db.params" entry. Recording never
    consumes or mutates what the caller passed in:

    * lists, tuples and other re-iterable containers are copied into a fresh
      list;
    * one-shot iterators (e.g. generators) are replaced by a pass-through
      generator that records each item as the driver consumes it.

    :param f: The original `send_data` callable.
    :returns: A wrapper that returns whatever `f` returns.
    """

    def _inner_send_data(*args: P.args, **kwargs: P.kwargs) -> T:
        instance = args[0]  # type: clickhouse_driver.client.Client
        span = getattr(instance.connection, "_sentry_span", None)

        if span is not None:
            _set_db_data(span, instance.connection)

            if should_send_default_pii():
                # Same guard as the breadcrumb code in `_wrap_end`, so that a
                # failure while recording stays inside the instrumentation
                with capture_internal_exceptions():
                    data_is_positional = len(args) > 2
                    data = args[2] if data_is_positional else kwargs.get("data")

                    if data is not None:
                        # Work on a copy: "db.params" may hold the caller's
                        # own `params` object, stored by `_wrap_start`.
                        previous = span._data.get("db.params")
                        if previous is None:
                            recorded = []
                        elif isinstance(previous, (list, tuple)):
                            recorded = list(previous)
                        else:
                            recorded = [previous]

                        if hasattr(data, "__next__"):
                            # One-shot iterator: extending from it here would
                            # drain it before the driver could send anything.
                            # Record lazily while the driver iterates instead.
                            def _recording_rows():
                                for row in data:
                                    recorded.append(row)
                                    yield row

                            if data_is_positional:
                                args = args[:2] + (_recording_rows(),) + args[3:]
                            else:
                                kwargs = dict(kwargs, data=_recording_rows())
                        else:
                            recorded.extend(data)

                        span.set_data("db.params", recorded)

        return f(*args, **kwargs)

    return _inner_send_data
def _set_db_data(
    span: Span, connection: clickhouse_driver.connection.Connection
) -> None:
    """Record the database system and the connection's details on `span`.

    :param span: The span to annotate.
    :param connection: The connection whose `host`, `port`, `database` and
        `user` attributes are copied onto the span.
    """
    span.set_data(SPANDATA.DB_SYSTEM, "clickhouse")

    # Span data key -> name of the connection attribute that supplies its value
    connection_fields = (
        (SPANDATA.SERVER_ADDRESS, "host"),
        (SPANDATA.SERVER_PORT, "port"),
        (SPANDATA.DB_NAME, "database"),
        (SPANDATA.DB_USER, "user"),
    )
    for data_key, attribute in connection_fields:
        span.set_data(data_key, getattr(connection, attribute))
|