File size: 5,085 Bytes
9c6594c |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 |
import os
import random
import threading
from datetime import datetime, timezone
from typing import Optional, List, Callable, TYPE_CHECKING, Any
from sentry_sdk.utils import format_timestamp, safe_repr
from sentry_sdk.envelope import Envelope, Item, PayloadRef
if TYPE_CHECKING:
from sentry_sdk._types import Log
class LogBatcher:
MAX_LOGS_BEFORE_FLUSH = 100
FLUSH_WAIT_TIME = 5.0
def __init__(
self,
capture_func, # type: Callable[[Envelope], None]
):
# type: (...) -> None
self._log_buffer = [] # type: List[Log]
self._capture_func = capture_func
self._running = True
self._lock = threading.Lock()
self._flush_event = threading.Event() # type: threading.Event
self._flusher = None # type: Optional[threading.Thread]
self._flusher_pid = None # type: Optional[int]
def _ensure_thread(self):
# type: (...) -> bool
"""For forking processes we might need to restart this thread.
This ensures that our process actually has that thread running.
"""
if not self._running:
return False
pid = os.getpid()
if self._flusher_pid == pid:
return True
with self._lock:
# Recheck to make sure another thread didn't get here and start the
# the flusher in the meantime
if self._flusher_pid == pid:
return True
self._flusher_pid = pid
self._flusher = threading.Thread(target=self._flush_loop)
self._flusher.daemon = True
try:
self._flusher.start()
except RuntimeError:
# Unfortunately at this point the interpreter is in a state that no
# longer allows us to spawn a thread and we have to bail.
self._running = False
return False
return True
def _flush_loop(self):
# type: (...) -> None
while self._running:
self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
self._flush_event.clear()
self._flush()
def add(
self,
log, # type: Log
):
# type: (...) -> None
if not self._ensure_thread() or self._flusher is None:
return None
with self._lock:
self._log_buffer.append(log)
if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH:
self._flush_event.set()
def kill(self):
# type: (...) -> None
if self._flusher is None:
return
self._running = False
self._flush_event.set()
self._flusher = None
def flush(self):
# type: (...) -> None
self._flush()
@staticmethod
def _log_to_transport_format(log):
# type: (Log) -> Any
def format_attribute(val):
# type: (int | float | str | bool) -> Any
if isinstance(val, bool):
return {"value": val, "type": "boolean"}
if isinstance(val, int):
return {"value": val, "type": "integer"}
if isinstance(val, float):
return {"value": val, "type": "double"}
if isinstance(val, str):
return {"value": val, "type": "string"}
return {"value": safe_repr(val), "type": "string"}
if "sentry.severity_number" not in log["attributes"]:
log["attributes"]["sentry.severity_number"] = log["severity_number"]
if "sentry.severity_text" not in log["attributes"]:
log["attributes"]["sentry.severity_text"] = log["severity_text"]
res = {
"timestamp": int(log["time_unix_nano"]) / 1.0e9,
"trace_id": log.get("trace_id", "00000000-0000-0000-0000-000000000000"),
"level": str(log["severity_text"]),
"body": str(log["body"]),
"attributes": {
k: format_attribute(v) for (k, v) in log["attributes"].items()
},
}
return res
def _flush(self):
# type: (...) -> Optional[Envelope]
envelope = Envelope(
headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
)
with self._lock:
if len(self._log_buffer) == 0:
return None
envelope.add_item(
Item(
type="log",
content_type="application/vnd.sentry.items.log+json",
headers={
"item_count": len(self._log_buffer),
},
payload=PayloadRef(
json={
"items": [
self._log_to_transport_format(log)
for log in self._log_buffer
]
}
),
)
)
self._log_buffer.clear()
self._capture_func(envelope)
return envelope
|