File size: 8,678 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 |
import io
import logging
import os
import urllib.parse
import urllib.request
import urllib.error
import urllib3
import sys
from itertools import chain, product
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Any
from typing import Callable
from typing import Dict
from typing import Optional
from typing import Self
from sentry_sdk.utils import (
logger as sentry_logger,
env_to_bool,
capture_internal_exceptions,
)
from sentry_sdk.envelope import Envelope
# Dedicated logger for Spotlight status messages; `setup_spotlight` attaches
# a stderr handler to it and sets its level.
logger = logging.getLogger("spotlight")
# Sidecar endpoint used when the `spotlight` option is exactly `True`.
DEFAULT_SPOTLIGHT_URL = "http://localhost:8969/stream"
# Dotted path that `setup_spotlight` appends to Django's `settings.MIDDLEWARE`.
DJANGO_SPOTLIGHT_MIDDLEWARE_PATH = "sentry_sdk.spotlight.SpotlightMiddleware"
class SpotlightClient:
    """Best-effort forwarder of serialized envelopes to a Spotlight sidecar.

    Delivery errors are never raised to the caller. ``fails`` counts
    consecutive failures and is used only to throttle warning output.
    """

    def __init__(self, url):
        # type: (str) -> None
        self.url = url
        self.fails = 0
        self.http = urllib3.PoolManager()

    def capture_envelope(self, envelope):
        # type: (Envelope) -> None
        """POST ``envelope`` to ``self.url``, swallowing any delivery error.

        A successful request resets the failure counter. The first two
        consecutive failures are logged verbatim, the third logs a single
        "not running" notice, and later failures stay silent.
        """
        buffer = io.BytesIO()
        envelope.serialize_into(buffer)

        try:
            payload = buffer.getvalue()
            response = self.http.request(
                method="POST",
                url=self.url,
                headers={"Content-Type": "application/x-sentry-envelope"},
                body=payload,
            )
            response.close()
            self.fails = 0
        except Exception as e:
            if self.fails > 2:
                # The notice was already emitted. The counter is left alone
                # on purpose so it cannot grow without bound while the
                # sidecar stays unreachable.
                return

            if self.fails == 2:
                self.fails += 1
                sentry_logger.warning(
                    "Looks like Spotlight is not running, will keep trying to send events but will not log errors."
                )
            else:
                sentry_logger.warning(str(e))
                self.fails += 1
try:
    from django.utils.deprecation import MiddlewareMixin
    from django.http import HttpResponseServerError, HttpResponse, HttpRequest
    from django.conf import settings

    # Path of the Spotlight UI bundle, resolved against the sidecar base URL.
    SPOTLIGHT_JS_ENTRY_PATH = "/assets/main.js"
    # Snippet injected into HTML responses. Doubled braces are literal braces
    # after `str.format`.
    SPOTLIGHT_JS_SNIPPET_PATTERN = (
        "<script>window.__spotlight = {{ initOptions: {{ sidecarUrl: '{spotlight_url}', fullPage: false }} }};</script>\n"
        '<script type="module" crossorigin src="{spotlight_js_url}"></script>\n'
    )
    # Replacement for the `<html>` tag of the sidecar page when it is served
    # as an error page.
    SPOTLIGHT_ERROR_PAGE_SNIPPET = (
        '<html><base href="{spotlight_url}">\n'
        '<script>window.__spotlight = {{ initOptions: {{ fullPage: true, startFrom: "/errors/{event_id}" }}}};</script>\n'
    )
    CHARSET_PREFIX = "charset="
    BODY_TAG_NAME = "body"
    # Every upper/lower-case spelling of the closing body tag
    # ("</BODY>", "</BODy>", ..., "</body>").
    BODY_CLOSE_TAG_POSSIBILITIES = tuple(
        "</{}>".format("".join(chars))
        for chars in product(*zip(BODY_TAG_NAME.upper(), BODY_TAG_NAME.lower()))
    )

    class SpotlightMiddleware(MiddlewareMixin):  # type: ignore[misc]
        """Django middleware that wires responses up to the Spotlight sidecar.

        It injects the Spotlight script into HTML responses and, when
        ``settings.DEBUG`` is on, serves the sidecar UI as the error page for
        unhandled exceptions.
        """

        # Cached snippet; stays `None` until the sidecar's JS bundle is reachable.
        _spotlight_script = None  # type: Optional[str]
        # Sidecar base URL; stays `None` when no Spotlight client is configured.
        _spotlight_url = None  # type: Optional[str]

        def __init__(self, get_response):
            # type: (Self, Callable[..., HttpResponse]) -> None
            super().__init__(get_response)

            # Imported here rather than at module level.
            import sentry_sdk.api

            self.sentry_sdk = sentry_sdk.api

            spotlight_client = self.sentry_sdk.get_client().spotlight
            if spotlight_client is None:
                sentry_logger.warning(
                    "Cannot find Spotlight client from SpotlightMiddleware, disabling the middleware."
                )
                return None
            # Spotlight URL has a trailing `/stream` part at the end so split it off
            self._spotlight_url = urllib.parse.urljoin(spotlight_client.url, "../")

        @property
        def spotlight_script(self):
            # type: (Self) -> Optional[str]
            """Return the script snippet to inject, or ``None`` if unavailable.

            The snippet is built and cached after a successful HEAD request
            for the sidecar's JS bundle. Until one succeeds, every access
            retries the probe.
            """
            if self._spotlight_url is not None and self._spotlight_script is None:
                try:
                    spotlight_js_url = urllib.parse.urljoin(
                        self._spotlight_url, SPOTLIGHT_JS_ENTRY_PATH
                    )
                    req = urllib.request.Request(
                        spotlight_js_url,
                        method="HEAD",
                    )
                    # Only reachability matters. Close the response right
                    # away so the probe does not leak its connection.
                    with urllib.request.urlopen(req):
                        pass
                    self._spotlight_script = SPOTLIGHT_JS_SNIPPET_PATTERN.format(
                        spotlight_url=self._spotlight_url,
                        spotlight_js_url=spotlight_js_url,
                    )
                except urllib.error.URLError as err:
                    sentry_logger.debug(
                        "Cannot get Spotlight JS to inject at %s. SpotlightMiddleware will not be very useful.",
                        spotlight_js_url,
                        exc_info=err,
                    )

            return self._spotlight_script

        def process_response(self, _request, response):
            # type: (Self, HttpRequest, HttpResponse) -> Optional[HttpResponse]
            """Inject the Spotlight script into non-streaming ``text/html`` responses.

            The snippet goes before a closing body tag when one is found,
            otherwise at the end of the content. An existing
            ``Content-Length`` header is adjusted to match. Other responses
            are returned unchanged.
            """
            content_type_header = tuple(
                p.strip()
                for p in response.headers.get("Content-Type", "").lower().split(";")
            )
            content_type = content_type_header[0]
            # Only the first parameter after the media type is checked for a
            # charset. Anything else falls back to UTF-8.
            if len(content_type_header) > 1 and content_type_header[1].startswith(
                CHARSET_PREFIX
            ):
                encoding = content_type_header[1][len(CHARSET_PREFIX) :]
            else:
                encoding = "utf-8"

            if (
                self.spotlight_script is not None
                and not response.streaming
                and content_type == "text/html"
            ):
                content_length = len(response.content)
                injection = self.spotlight_script.encode(encoding)
                # Use the last occurrence of the first case variant that is
                # present, or the end of the document if none is.
                injection_site = next(
                    (
                        idx
                        for idx in (
                            response.content.rfind(body_variant.encode(encoding))
                            for body_variant in BODY_CLOSE_TAG_POSSIBILITIES
                        )
                        if idx > -1
                    ),
                    content_length,
                )

                # This approach works even when we don't have a `</body>` tag
                response.content = (
                    response.content[:injection_site]
                    + injection
                    + response.content[injection_site:]
                )

                if response.has_header("Content-Length"):
                    response.headers["Content-Length"] = content_length + len(injection)

            return response

        def process_exception(self, _request, exception):
            # type: (Self, HttpRequest, Exception) -> Optional[HttpResponseServerError]
            """Serve the Spotlight UI as the error page for ``exception``.

            Returns ``None`` when ``settings.DEBUG`` is off, when no sidecar
            URL is known, or when the sidecar page cannot be fetched.
            Otherwise the exception is captured and a 500 response containing
            the sidecar page, pointed at the captured event, is returned.
            """
            if not settings.DEBUG or not self._spotlight_url:
                return None

            try:
                # The context manager closes the HTTP response once the page
                # has been read, so the connection is not leaked.
                with urllib.request.urlopen(self._spotlight_url) as sidecar_page:
                    spotlight = sidecar_page.read().decode("utf-8")
            except urllib.error.URLError:
                return None
            else:
                event_id = self.sentry_sdk.capture_exception(exception)
                return HttpResponseServerError(
                    spotlight.replace(
                        "<html>",
                        SPOTLIGHT_ERROR_PAGE_SNIPPET.format(
                            spotlight_url=self._spotlight_url, event_id=event_id
                        ),
                    )
                )

except ImportError:
    # Django is not installed. `settings = None` is the marker that
    # `setup_spotlight` checks before touching Django.
    settings = None
def setup_spotlight(options):
    # type: (Dict[str, Any]) -> Optional[SpotlightClient]
    """Create the Spotlight client described by ``options["spotlight"]``.

    ``True`` selects ``DEFAULT_SPOTLIGHT_URL`` and a string is used as the
    sidecar URL. Any other value disables Spotlight and returns ``None``.

    Side effects: installs a stderr handler on the module logger, sets its
    level to INFO and, when Django is importable with ``settings.DEBUG`` on
    and both ``SENTRY_SPOTLIGHT_ON_ERROR`` and ``SENTRY_SPOTLIGHT_MIDDLEWARE``
    truthy (default ``"1"``), appends the Spotlight middleware to
    ``settings.MIDDLEWARE``.
    """
    handler_name = "sentry-spotlight-stderr"
    # Drop the handler left by an earlier call. Otherwise each repeated
    # initialisation stacks one more handler and every message is printed
    # once per call. Replacing (not keeping) the old handler keeps output
    # bound to the current `sys.stderr`.
    for existing in list(logger.handlers):
        if existing.get_name() == handler_name:
            logger.removeHandler(existing)

    _handler = logging.StreamHandler(sys.stderr)
    _handler.set_name(handler_name)
    _handler.setFormatter(logging.Formatter(" [spotlight] %(levelname)s: %(message)s"))
    logger.addHandler(_handler)
    logger.setLevel(logging.INFO)

    url = options.get("spotlight")

    if url is True:
        url = DEFAULT_SPOTLIGHT_URL

    if not isinstance(url, str):
        return None

    # Errors while inspecting or patching Django settings must not prevent
    # the client from being created.
    with capture_internal_exceptions():
        if (
            settings is not None
            and settings.DEBUG
            and env_to_bool(os.environ.get("SENTRY_SPOTLIGHT_ON_ERROR", "1"))
            and env_to_bool(os.environ.get("SENTRY_SPOTLIGHT_MIDDLEWARE", "1"))
        ):
            middleware = settings.MIDDLEWARE
            if DJANGO_SPOTLIGHT_MIDDLEWARE_PATH not in middleware:
                # Rebuild with the same container type (list or tuple) as
                # the existing setting.
                settings.MIDDLEWARE = type(middleware)(
                    chain(middleware, (DJANGO_SPOTLIGHT_MIDDLEWARE_PATH,))
                )
                logger.info("Enabled Spotlight integration for Django")

    client = SpotlightClient(url)
    logger.info("Enabled Spotlight using sidecar at %s", url)

    return client
|