File size: 4,581 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 |
import ast
import sentry_sdk
from sentry_sdk import serializer
from sentry_sdk.integrations import Integration, DidNotEnable
from sentry_sdk.scope import add_global_event_processor
from sentry_sdk.utils import walk_exception_chain, iter_stacks
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from typing import Optional, Dict, Any, Tuple, List
from types import FrameType
from sentry_sdk._types import Event, Hint
try:
import executing
except ImportError:
raise DidNotEnable("executing is not installed")
try:
import pure_eval
except ImportError:
raise DidNotEnable("pure_eval is not installed")
try:
# Used implicitly, just testing it's available
import asttokens # noqa
except ImportError:
raise DidNotEnable("asttokens is not installed")
class PureEvalIntegration(Integration):
    """Integration that replaces frame variables with pure_eval results.

    Once set up, a global event processor looks at events that carry
    exception information and, for every stack frame it can match to a live
    traceback frame, stores the output of ``pure_eval_frame`` under the
    frame's ``"vars"`` key.
    """

    identifier = "pure_eval"

    @staticmethod
    def setup_once():
        # type: () -> None
        """Register the global event processor that rewrites frame vars."""

        @add_global_event_processor
        def add_executing_info(event, hint):
            # type: (Event, Optional[Hint]) -> Optional[Event]
            # The processor is global, so it must check on every event that
            # the current client actually has this integration enabled.
            if sentry_sdk.get_client().get_integration(PureEvalIntegration) is None:
                return event

            if hint is None:
                return event

            exc_info = hint.get("exc_info", None)
            if exc_info is None:
                return event

            exception = event.get("exception", None)
            if exception is None:
                return event

            values = exception.get("values", None)
            if values is None:
                return event

            # The event's exception values are paired, in reverse order, with
            # the entries yielded by walk_exception_chain(exc_info).
            # NOTE(review): presumably the event lists the chain in the
            # opposite order to walk_exception_chain - confirm.
            for exception_value, (_exc_type, _exc_value, exc_tb) in zip(
                reversed(values), walk_exception_chain(exc_info)
            ):
                # Tolerate a missing or None stacktrace / frames entry.
                stacktrace = exception_value.get("stacktrace") or {}
                sentry_frames = [
                    frame
                    for frame in stacktrace.get("frames") or []
                    if frame.get("function")
                ]
                tbs = list(iter_stacks(exc_tb))
                # Frames are matched purely by position, so a length mismatch
                # means they cannot be paired reliably; skip this exception.
                if len(sentry_frames) != len(tbs):
                    continue

                for sentry_frame, tb in zip(sentry_frames, tbs):
                    evaluated = pure_eval_frame(tb.tb_frame)
                    # Only overwrite when pure_eval produced something.
                    # Leaving the frame untouched otherwise keeps any existing
                    # "vars" and avoids a KeyError on frames without that key.
                    if evaluated:
                        sentry_frame["vars"] = evaluated
            return event
def pure_eval_frame(frame):
    # type: (FrameType) -> Dict[str, Any]
    """Evaluate the interesting expressions around the line *frame* is on.

    The source of the frame is located with ``executing``; the expressions
    reported by ``pure_eval`` for the enclosing scope are ordered by how close
    they are to the executing statement, cut to
    ``serializer.MAX_DATABAG_BREADTH`` entries and serialized.

    :param frame: the frame whose current line should be inspected.
    :returns: the serialized mapping of expression source text to value, or
        an empty dict when no syntax tree or no statement at the current
        line is available for the frame.
    """
    source = executing.Source.for_frame(frame)
    if not source.tree:
        return {}

    statements = source.statements_at_line(frame.f_lineno)
    if not statements:
        return {}

    scope = stmt = list(statements)[0]
    while True:
        # Get the parent first in case the original statement is already
        # a function definition, e.g. if we're calling a decorator
        # In that case we still want the surrounding scope, not that function
        scope = scope.parent
        # ast.AsyncFunctionDef is a scope too: without it, a statement inside
        # an ``async def`` would climb past the coroutine to an outer
        # function, class or the whole module.
        if isinstance(
            scope,
            (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef, ast.Module),
        ):
            break

    evaluator = pure_eval.Evaluator.from_frame(frame)
    expressions = evaluator.interesting_expressions_grouped(scope)

    def closeness(expression):
        # type: (Tuple[List[Any], Any]) -> Tuple[int, int]
        # Prioritise expressions with a node closer to the statement executed
        # without being after that statement
        # A higher return value is better - the expression will appear
        # earlier in the list of values and is less likely to be trimmed
        nodes, _value = expression

        def start(n):
            # type: (ast.expr) -> Tuple[int, int]
            return (n.lineno, n.col_offset)

        nodes_before_stmt = [
            node for node in nodes if start(node) < stmt.last_token.end  # type: ignore
        ]
        if nodes_before_stmt:
            # The position of the last node before or in the statement
            return max(start(node) for node in nodes_before_stmt)
        else:
            # The position of the first node after the statement
            # Negative means it's always lower priority than nodes that come before
            # Less negative means closer to the statement and higher priority
            lineno, col_offset = min(start(node) for node in nodes)
            return (-lineno, -col_offset)

    # This adds the first_token and last_token attributes to nodes
    # It must run before the sort below, because closeness() reads
    # stmt.last_token
    atok = source.asttokens()

    expressions.sort(key=closeness, reverse=True)
    # Key each value by the source text of the first node in its group; only
    # the highest-priority MAX_DATABAG_BREADTH groups are kept.
    evaluated_vars = {
        atok.get_text(nodes[0]): value
        for nodes, value in expressions[: serializer.MAX_DATABAG_BREADTH]
    }
    return serializer.serialize(evaluated_vars, is_vars=True)
|