File size: 4,581 Bytes
9c6594c
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
import ast

import sentry_sdk
from sentry_sdk import serializer
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.scope import add_global_event_processor
from sentry_sdk.utils import walk_exception_chain, iter_stacks

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Optional, Dict, Any, Tuple, List
    from types import FrameType

    from sentry_sdk._types import Event, Hint

try:
    import executing
except ImportError:
    raise DidNotEnable("executing is not installed")

try:
    import pure_eval
except ImportError:
    raise DidNotEnable("pure_eval is not installed")

try:
    # Used implicitly, just testing it's available
    import asttokens  # noqa
except ImportError:
    raise DidNotEnable("asttokens is not installed")


class PureEvalIntegration(Integration):
    """Integration that replaces frame ``vars`` with pure_eval results.

    When enabled, a global event processor walks the exception chain found in
    the event hint and, for every stack frame it can match up with a
    traceback entry, stores the output of ``pure_eval_frame`` as that frame's
    ``vars``.
    """

    identifier = "pure_eval"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Register the global event processor that enriches frame vars."""

        @add_global_event_processor
        def add_executing_info(event, hint):
            # type: (Event, Optional[Hint]) -> Optional[Event]
            # The processor is global, so bail out unless the current client
            # actually has this integration enabled.
            if sentry_sdk.get_client().get_integration(PureEvalIntegration) is None:
                return event

            if hint is None:
                return event

            exc_info = hint.get("exc_info", None)

            if exc_info is None:
                return event

            exception = event.get("exception", None)

            if exception is None:
                return event

            values = exception.get("values", None)

            if values is None:
                return event

            # The event's exception values are paired, in reverse order, with
            # the entries produced by walk_exception_chain(exc_info).
            for exception_entry, (_exc_type, _exc_value, exc_tb) in zip(
                reversed(values), walk_exception_chain(exc_info)
            ):
                # "stacktrace" / "frames" may be missing or explicitly None.
                stacktrace = exception_entry.get("stacktrace") or {}
                sentry_frames = [
                    frame
                    for frame in stacktrace.get("frames") or []
                    if frame.get("function")
                ]
                tbs = list(iter_stacks(exc_tb))
                # Frames are matched purely by position, so skip this
                # exception if the two lists cannot be aligned one-to-one.
                if len(sentry_frames) != len(tbs):
                    continue

                for sentry_frame, tb in zip(sentry_frames, tbs):
                    evaluated = pure_eval_frame(tb.tb_frame)
                    # Only overwrite when something was evaluated. Otherwise
                    # keep whatever the frame already has; it may have no
                    # "vars" key at all, which previously raised KeyError and
                    # aborted processing of the remaining frames.
                    if evaluated:
                        sentry_frame["vars"] = evaluated
            return event


def pure_eval_frame(frame):
    # type: (FrameType) -> Dict[str, Any]
    """Evaluate the interesting expressions around a frame's current line.

    Looks up the source for ``frame``, finds the statement at
    ``frame.f_lineno`` and its enclosing scope (function, coroutine, class or
    module), and asks pure_eval for the expressions in that scope together
    with their values. Expressions are ordered by how close they are to the
    executing statement, truncated to ``serializer.MAX_DATABAG_BREADTH``
    entries and serialized.

    :param frame: The frame object to inspect.
    :returns: The serialized mapping of expression source text to value, or
        an empty dict when no source tree or no statement is found for the
        frame's current line.
    """
    source = executing.Source.for_frame(frame)
    if not source.tree:
        return {}

    statements = source.statements_at_line(frame.f_lineno)
    if not statements:
        return {}

    scope = stmt = list(statements)[0]
    while True:
        # Get the parent first in case the original statement is already
        # a function definition, e.g. if we're calling a decorator
        # In that case we still want the surrounding scope, not that function
        scope = scope.parent
        # ast.AsyncFunctionDef is a separate node type from ast.FunctionDef;
        # without it, a frame inside a coroutine would climb to the enclosing
        # class/module and evaluate expressions from unrelated code.
        if isinstance(
            scope,
            (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module),
        ):
            break

    evaluator = pure_eval.Evaluator.from_frame(frame)
    expressions = evaluator.interesting_expressions_grouped(scope)

    def closeness(expression):
        # type: (Tuple[List[Any], Any]) -> Tuple[int, int]
        # Prioritise expressions with a node closer to the statement executed
        # without being after that statement
        # A higher return value is better - the expression will appear
        # earlier in the list of values and is less likely to be trimmed
        nodes, _value = expression

        def start(n):
            # type: (ast.expr) -> Tuple[int, int]
            return (n.lineno, n.col_offset)

        nodes_before_stmt = [
            node for node in nodes if start(node) < stmt.last_token.end  # type: ignore
        ]
        if nodes_before_stmt:
            # The position of the last node before or in the statement
            return max(start(node) for node in nodes_before_stmt)
        else:
            # The position of the first node after the statement
            # Negative means it's always lower priority than nodes that come before
            # Less negative means closer to the statement and higher priority
            lineno, col_offset = min(start(node) for node in nodes)
            return (-lineno, -col_offset)

    # This adds the first_token and last_token attributes to nodes
    # (closeness reads stmt.last_token, so this must run before sorting)
    atok = source.asttokens()

    expressions.sort(key=closeness, reverse=True)
    # Keyed by the source text of the first node of each group; the slice
    # keeps only the highest-priority expressions.
    evaluated_vars = {
        atok.get_text(nodes[0]): value
        for nodes, value in expressions[: serializer.MAX_DATABAG_BREADTH]
    }
    return serializer.serialize(evaluated_vars, is_vars=True)