File size: 8,678 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
import io
import logging
import os
import urllib.parse
import urllib.request
import urllib.error
import urllib3
import sys

from itertools import chain, product

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any
    from typing import Callable
    from typing import Dict
    from typing import Optional
    from typing import Self

from sentry_sdk.utils import (
    logger as sentry_logger,
    env_to_bool,
    capture_internal_exceptions,
)
from sentry_sdk.envelope import Envelope


# Logger named "spotlight"; `setup_spotlight` attaches a stderr handler to it
# and sets its level to INFO.
logger = logging.getLogger("spotlight")


# Sidecar endpoint used by `setup_spotlight` when the `spotlight` option is `True`.
DEFAULT_SPOTLIGHT_URL = "http://localhost:8969/stream"
# Dotted path that `setup_spotlight` appends to Django's `settings.MIDDLEWARE`.
DJANGO_SPOTLIGHT_MIDDLEWARE_PATH = "sentry_sdk.spotlight.SpotlightMiddleware"


class SpotlightClient:
    """Sends serialized envelopes to a Spotlight sidecar via HTTP POST.

    ``fails`` counts consecutive delivery failures. The first two failures
    are logged verbatim, the third logs a single "not running" notice, and
    every later failure is silent. A successful delivery resets the counter.
    """

    def __init__(self, url):
        # type: (str) -> None
        self.http = urllib3.PoolManager()
        self.url = url
        self.fails = 0

    def capture_envelope(self, envelope):
        # type: (Envelope) -> None
        """Serialize ``envelope`` and POST it to ``self.url``.

        Any ``Exception`` raised while sending is swallowed (and possibly
        logged, see the class docstring) so that delivery problems never
        propagate to the caller.
        """
        buffer = io.BytesIO()
        envelope.serialize_into(buffer)
        try:
            response = self.http.request(
                method="POST",
                url=self.url,
                headers={"Content-Type": "application/x-sentry-envelope"},
                body=buffer.getvalue(),
            )
            response.close()
        except Exception as err:
            if self.fails > 2:
                # Already reported that Spotlight looks down. The counter is
                # intentionally left alone here so it cannot grow without
                # bound if Spotlight never becomes reachable.
                return
            if self.fails == 2:
                self.fails += 1
                sentry_logger.warning(
                    "Looks like Spotlight is not running, will keep trying to send events but will not log errors."
                )
            else:
                sentry_logger.warning(str(err))
                self.fails += 1
        else:
            self.fails = 0


try:
    from django.utils.deprecation import MiddlewareMixin
    from django.http import HttpResponseServerError, HttpResponse, HttpRequest
    from django.conf import settings

    # Path of the Spotlight JS bundle; joined onto the sidecar base URL by
    # `SpotlightMiddleware.spotlight_script`.
    SPOTLIGHT_JS_ENTRY_PATH = "/assets/main.js"
    # `str.format` template for the markup injected into HTML responses.
    # Doubled braces are literal braces in the rendered JavaScript.
    SPOTLIGHT_JS_SNIPPET_PATTERN = (
        "<script>window.__spotlight = {{ initOptions: {{ sidecarUrl: '{spotlight_url}', fullPage: false }} }};</script>\n"
        '<script type="module" crossorigin src="{spotlight_js_url}"></script>\n'
    )
    # `str.format` template substituted for `<html>` in the page fetched from
    # the sidecar by `SpotlightMiddleware.process_exception`.
    SPOTLIGHT_ERROR_PAGE_SNIPPET = (
        '<html><base href="{spotlight_url}">\n'
        '<script>window.__spotlight = {{ initOptions: {{ fullPage: true, startFrom: "/errors/{event_id}" }}}};</script>\n'
    )
    # Prefix of the charset parameter inside a (lower-cased) Content-Type header.
    CHARSET_PREFIX = "charset="
    BODY_TAG_NAME = "body"
    # Every upper/lower-case spelling of the closing body tag
    # ("</BODY>", "</BODy>", ..., "</body>"): 2**4 = 16 variants.
    BODY_CLOSE_TAG_POSSIBILITIES = tuple(
        "</{}>".format("".join(chars))
        for chars in product(*zip(BODY_TAG_NAME.upper(), BODY_TAG_NAME.lower()))
    )

    class SpotlightMiddleware(MiddlewareMixin):  # type: ignore[misc]
        """Django middleware that wires Spotlight into served pages.

        * ``process_response`` injects the Spotlight script into non-streaming
          ``text/html`` responses.
        * ``process_exception`` (only when ``settings.DEBUG`` is on) answers
          with the Spotlight page pointed at the captured event.

        The middleware disables itself (``_spotlight_url`` stays ``None``)
        when the current Sentry client has no Spotlight client.
        """

        # Rendered script snippet, cached after the first successful probe.
        _spotlight_script = None  # type: Optional[str]
        # Base URL of the sidecar (client URL without the `/stream` suffix).
        _spotlight_url = None  # type: Optional[str]

        def __init__(self, get_response):
            # type: (Self, Callable[..., HttpResponse]) -> None
            super().__init__(get_response)

            # Imported lazily inside the constructor rather than at module level.
            import sentry_sdk.api

            self.sentry_sdk = sentry_sdk.api

            spotlight_client = self.sentry_sdk.get_client().spotlight
            if spotlight_client is None:
                sentry_logger.warning(
                    "Cannot find Spotlight client from SpotlightMiddleware, disabling the middleware."
                )
                return None
            # Spotlight URL has a trailing `/stream` part at the end so split it off
            self._spotlight_url = urllib.parse.urljoin(spotlight_client.url, "../")

        @property
        def spotlight_script(self):
            # type: (Self) -> Optional[str]
            """Return the HTML snippet to inject, or ``None`` if unavailable.

            On first use the Spotlight JS bundle is probed with a HEAD
            request. The snippet is cached once the probe succeeds; while it
            keeps failing with ``URLError`` the probe is retried on every
            access and ``None`` is returned.
            """
            if self._spotlight_url is not None and self._spotlight_script is None:
                try:
                    spotlight_js_url = urllib.parse.urljoin(
                        self._spotlight_url, SPOTLIGHT_JS_ENTRY_PATH
                    )
                    req = urllib.request.Request(
                        spotlight_js_url,
                        method="HEAD",
                    )
                    # Only reachability matters; close the response right away
                    # instead of leaking the connection.
                    with urllib.request.urlopen(req):
                        pass
                    self._spotlight_script = SPOTLIGHT_JS_SNIPPET_PATTERN.format(
                        spotlight_url=self._spotlight_url,
                        spotlight_js_url=spotlight_js_url,
                    )
                except urllib.error.URLError as err:
                    sentry_logger.debug(
                        "Cannot get Spotlight JS to inject at %s. SpotlightMiddleware will not be very useful.",
                        spotlight_js_url,
                        exc_info=err,
                    )

            return self._spotlight_script

        def process_response(self, _request, response):
            # type: (Self, HttpRequest, HttpResponse) -> Optional[HttpResponse]
            """Inject the Spotlight script into non-streaming HTML responses.

            The script is placed right before the last closing body tag, or
            appended at the end when there is none. ``Content-Length`` is
            updated if the header is present. Responses that are streaming,
            not ``text/html``, or whose charset cannot encode the script are
            returned untouched.
            """
            content_type_header = tuple(
                p.strip()
                for p in response.headers.get("Content-Type", "").lower().split(";")
            )
            content_type = content_type_header[0]

            # Check the cheap conditions first: reading `spotlight_script` may
            # issue an HTTP probe, which is pointless for non-HTML responses.
            if response.streaming or content_type != "text/html":
                return response

            script = self.spotlight_script
            if script is None:
                return response

            # The charset parameter may sit at any position and may be quoted
            # (e.g. `charset="utf-8"`); fall back to UTF-8 when absent or empty.
            encoding = (
                next(
                    (
                        param[len(CHARSET_PREFIX) :].strip("\"' ")
                        for param in content_type_header[1:]
                        if param.startswith(CHARSET_PREFIX)
                    ),
                    "",
                )
                or "utf-8"
            )

            try:
                injection = script.encode(encoding)
                close_tags = [
                    body_variant.encode(encoding)
                    for body_variant in BODY_CLOSE_TAG_POSSIBILITIES
                ]
            except (LookupError, UnicodeError):
                # Unknown or incompatible charset: serving the page unmodified
                # is better than failing the whole response.
                sentry_logger.debug(
                    "Cannot encode Spotlight script as %s, skipping injection.",
                    encoding,
                )
                return response

            content = response.content
            content_length = len(content)
            # Take the right-most closing tag across all case variants so the
            # script lands before the real end of the body.
            last_close = max(content.rfind(tag) for tag in close_tags)
            # This approach works even when we don't have a `</body>` tag
            injection_site = last_close if last_close > -1 else content_length

            response.content = (
                content[:injection_site] + injection + content[injection_site:]
            )

            if response.has_header("Content-Length"):
                response.headers["Content-Length"] = content_length + len(injection)

            return response

        def process_exception(self, _request, exception):
            # type: (Self, HttpRequest, Exception) -> Optional[HttpResponseServerError]
            """Serve the Spotlight page for an unhandled exception in DEBUG mode.

            Returns ``None`` when ``settings.DEBUG`` is off, the middleware is
            disabled, or the sidecar page cannot be fetched (``URLError``).
            Otherwise the exception is captured and a 500 response is returned
            containing the sidecar page with every ``<html>`` occurrence
            replaced by the snippet that opens the captured event.
            """
            if not settings.DEBUG or not self._spotlight_url:
                return None

            try:
                # The context manager closes the HTTP response once it is read.
                with urllib.request.urlopen(self._spotlight_url) as page:
                    spotlight = page.read().decode("utf-8")
            except urllib.error.URLError:
                return None
            else:
                event_id = self.sentry_sdk.capture_exception(exception)
                return HttpResponseServerError(
                    spotlight.replace(
                        "<html>",
                        SPOTLIGHT_ERROR_PAGE_SNIPPET.format(
                            spotlight_url=self._spotlight_url, event_id=event_id
                        ),
                    )
                )

except ImportError:
    # The Django imports above failed: leave `settings` as None so that
    # `setup_spotlight` skips the Django middleware registration.
    settings = None


def setup_spotlight(options):
    # type: (Dict[str, Any]) -> Optional[SpotlightClient]
    """Create a Spotlight client from the SDK ``options``.

    ``options["spotlight"]`` may be ``True`` (use ``DEFAULT_SPOTLIGHT_URL``)
    or a URL string; any other value disables Spotlight and ``None`` is
    returned.

    When Django is importable, ``settings.DEBUG`` is on and neither the
    ``SENTRY_SPOTLIGHT_ON_ERROR`` nor the ``SENTRY_SPOTLIGHT_MIDDLEWARE``
    environment variable disables it, the Spotlight middleware is appended to
    ``settings.MIDDLEWARE`` (errors during this step are captured internally).
    """
    # Attach the stderr handler only if the logger has none yet; otherwise
    # every repeated call would add another handler and duplicate each line.
    if not logger.handlers:
        _handler = logging.StreamHandler(sys.stderr)
        _handler.setFormatter(
            logging.Formatter(" [spotlight] %(levelname)s: %(message)s")
        )
        logger.addHandler(_handler)
    logger.setLevel(logging.INFO)

    url = options.get("spotlight")

    if url is True:
        url = DEFAULT_SPOTLIGHT_URL

    if not isinstance(url, str):
        return None

    with capture_internal_exceptions():
        if (
            settings is not None
            and settings.DEBUG
            and env_to_bool(os.environ.get("SENTRY_SPOTLIGHT_ON_ERROR", "1"))
            and env_to_bool(os.environ.get("SENTRY_SPOTLIGHT_MIDDLEWARE", "1"))
        ):
            middleware = settings.MIDDLEWARE
            if DJANGO_SPOTLIGHT_MIDDLEWARE_PATH not in middleware:
                # Rebuild with the same container type (list or tuple) so the
                # setting keeps the shape the project configured.
                settings.MIDDLEWARE = type(middleware)(
                    chain(middleware, (DJANGO_SPOTLIGHT_MIDDLEWARE_PATH,))
                )
                logger.info("Enabled Spotlight integration for Django")

    client = SpotlightClient(url)
    logger.info("Enabled Spotlight using sidecar at %s", url)

    return client