|
import os |
|
import random |
|
import threading |
|
from datetime import datetime, timezone |
|
from typing import Optional, List, Callable, TYPE_CHECKING, Any |
|
|
|
from sentry_sdk.utils import format_timestamp, safe_repr |
|
from sentry_sdk.envelope import Envelope, Item, PayloadRef |
|
|
|
if TYPE_CHECKING: |
|
from sentry_sdk._types import Log |
|
|
|
|
|
class LogBatcher:
    """Buffer log records and hand them to a capture function in batches.

    Logs passed to :meth:`add` are appended to an in-memory buffer. A daemon
    thread, started lazily on the first ``add`` call, drains the buffer
    periodically (or early, once ``MAX_LOGS_BEFORE_FLUSH`` logs are pending),
    wraps the batch in a single envelope item and passes the envelope to
    ``capture_func``.
    """

    # Buffered-log count at which ``add`` wakes the flusher thread early.
    MAX_LOGS_BEFORE_FLUSH = 100
    # Base wait in seconds between periodic flushes; ``_flush_loop`` adds up
    # to one extra second of random jitter to every wait.
    FLUSH_WAIT_TIME = 5.0

    def __init__(
        self,
        capture_func,  # type: Callable[[Envelope], None]
    ):
        # type: (...) -> None
        """Create an idle batcher; no thread is started here.

        :param capture_func: Callable invoked with every non-empty envelope
            produced by a flush. Its return value is ignored.
        """
        self._log_buffer = []  # type: List[Log]
        self._capture_func = capture_func
        self._running = True
        # Guards ``_log_buffer`` and the creation of the flusher thread.
        self._lock = threading.Lock()

        # Set to wake the flusher thread before its wait times out.
        self._flush_event = threading.Event()

        self._flusher = None  # type: Optional[threading.Thread]
        # PID of the process in which ``_flusher`` was created.
        self._flusher_pid = None  # type: Optional[int]

    def _ensure_thread(self):
        # type: (...) -> bool
        """For forking processes we might need to restart this thread.

        This ensures that our process actually has that thread running.

        :return: ``True`` if a flusher thread has been started for the
            current process, ``False`` if the batcher is stopped or the
            thread could not be started.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Re-check under the lock: another thread may have started the
            # flusher between the unlocked check above and lock acquisition.
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # The thread could not be started; give up on batching
                # altogether so later ``add`` calls become no-ops.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        # type: (...) -> None
        """Body of the flusher thread: wait, then flush, until stopped."""
        while self._running:
            # Wake on timeout (base interval plus jitter) or as soon as
            # ``add``/``kill`` sets the event.
            self._flush_event.wait(self.FLUSH_WAIT_TIME + random.random())
            self._flush_event.clear()
            self._flush()

    def add(
        self,
        log,  # type: Log
    ):
        # type: (...) -> None
        """Queue ``log`` for the next flush.

        The log is silently dropped when no flusher thread is available
        (the batcher was killed or the thread could not be started).
        """
        if not self._ensure_thread() or self._flusher is None:
            return None

        with self._lock:
            self._log_buffer.append(log)
            if len(self._log_buffer) >= self.MAX_LOGS_BEFORE_FLUSH:
                # Enough logs are pending: wake the flusher right away.
                self._flush_event.set()

    def kill(self):
        # type: (...) -> None
        """Stop the flusher thread; a no-op if none was ever started."""
        if self._flusher is None:
            return

        self._running = False
        # Wake the thread so it notices ``_running`` is now False.
        self._flush_event.set()
        self._flusher = None

    def flush(self):
        # type: (...) -> None
        """Send all currently buffered logs from the calling thread."""
        self._flush()

    @staticmethod
    def _log_to_transport_format(log):
        # type: (Log) -> Any
        """Convert ``log`` into the dict that is sent inside the envelope.

        The severity number and text are added to the outgoing attributes
        under ``sentry.severity_number`` / ``sentry.severity_text`` unless
        attributes with those names are already present. ``log`` itself is
        not modified.

        :return: Dict with ``timestamp`` (seconds, derived from
            ``time_unix_nano``), ``trace_id``, ``level``, ``body`` and the
            typed ``attributes``.
        """

        def format_attribute(val):
            # type: (Any) -> Any
            # ``bool`` is a subclass of ``int``, so it must be tested first.
            if isinstance(val, bool):
                return {"value": val, "type": "boolean"}
            if isinstance(val, int):
                return {"value": val, "type": "integer"}
            if isinstance(val, float):
                return {"value": val, "type": "double"}
            if isinstance(val, str):
                return {"value": val, "type": "string"}
            # Anything else is sent as its repr.
            return {"value": safe_repr(val), "type": "string"}

        # Work on a shallow copy so the defaults added below do not leak
        # into the caller's log record.
        attributes = dict(log["attributes"])
        if "sentry.severity_number" not in attributes:
            attributes["sentry.severity_number"] = log["severity_number"]
        if "sentry.severity_text" not in attributes:
            attributes["sentry.severity_text"] = log["severity_text"]

        return {
            "timestamp": int(log["time_unix_nano"]) / 1.0e9,
            "trace_id": log.get("trace_id", "00000000-0000-0000-0000-000000000000"),
            "level": str(log["severity_text"]),
            "body": str(log["body"]),
            "attributes": {k: format_attribute(v) for (k, v) in attributes.items()},
        }

    def _flush(self):
        # type: (...) -> Optional[Envelope]
        """Drain the buffer into one envelope and pass it to the capture func.

        :return: The envelope that was captured, or ``None`` when the buffer
            was empty (in which case nothing is built or captured).
        """
        with self._lock:
            if not self._log_buffer:
                return None
            # Take ownership of the pending batch and release the lock
            # immediately: serialization below must not block ``add``, and a
            # log that fails to serialize must not stay in the buffer and
            # break every later flush.
            pending = self._log_buffer
            self._log_buffer = []

        items = [self._log_to_transport_format(log) for log in pending]

        envelope = Envelope(
            headers={"sent_at": format_timestamp(datetime.now(timezone.utc))}
        )
        envelope.add_item(
            Item(
                type="log",
                content_type="application/vnd.sentry.items.log+json",
                headers={
                    "item_count": len(pending),
                },
                payload=PayloadRef(json={"items": items}),
            )
        )

        self._capture_func(envelope)
        return envelope
|
|