import io
import os
import random
import re
import sys
import threading
import time
import warnings
import zlib
from abc import ABC, abstractmethod
from contextlib import contextmanager
from datetime import datetime, timezone
from functools import wraps, partial

import sentry_sdk
from sentry_sdk.utils import (
    ContextVar,
    now,
    nanosecond_time,
    to_timestamp,
    serialize_frame,
    json_dumps,
)
from sentry_sdk.envelope import Envelope, Item
from sentry_sdk.tracing import TransactionSource

from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from typing import Any
    from typing import Callable
    from typing import Dict
    from typing import Generator
    from typing import Iterable
    from typing import List
    from typing import Optional
    from typing import Set
    from typing import Tuple
    from typing import Union

    from sentry_sdk._types import BucketKey
    from sentry_sdk._types import DurationUnit
    from sentry_sdk._types import FlushedMetricValue
    from sentry_sdk._types import MeasurementUnit
    from sentry_sdk._types import MetricMetaKey
    from sentry_sdk._types import MetricTagValue
    from sentry_sdk._types import MetricTags
    from sentry_sdk._types import MetricTagsInternal
    from sentry_sdk._types import MetricType
    from sentry_sdk._types import MetricValue

warnings.warn(
    "The sentry_sdk.metrics module is deprecated and will be removed in the next major release. "
    "Sentry will reject all metrics sent after October 7, 2024. "
    "Learn more: https://sentry.zendesk.com/hc/en-us/articles/26369339769883-Upcoming-API-Changes-to-Metrics",
    DeprecationWarning,
    stacklevel=2,
)

_in_metrics = ContextVar("in_metrics", default=False)

# Keep a reference to the builtin set type; the module-level `set`
# function defined below shadows the builtin name.
_set = set

# Transaction sources with low-cardinality names that are safe to attach
# to metrics as a `transaction` tag.
GOOD_TRANSACTION_SOURCES = frozenset(
    [
        TransactionSource.ROUTE,
        TransactionSource.VIEW,
        TransactionSource.COMPONENT,
        TransactionSource.TASK,
    ]
)

_sanitize_unit = partial(re.compile(r"[^a-zA-Z0-9_]+").sub, "")
_sanitize_metric_key = partial(re.compile(r"[^a-zA-Z0-9_\-.]+").sub, "_")
_sanitize_tag_key = partial(re.compile(r"[^a-zA-Z0-9_\-.\/]+").sub, "")

def _sanitize_tag_value(value):
    # Escape characters that act as separators in the statsd wire format.
    table = str.maketrans(
        {
            "\n": "\\n",
            "\r": "\\r",
            "\t": "\\t",
            "\\": "\\\\",
            "|": "\\u{7c}",
            ",": "\\u{2c}",
        }
    )
    return value.translate(table)

def get_code_location(stacklevel):
    try:
        frm = sys._getframe(stacklevel)
    except Exception:
        return None

    return serialize_frame(
        frm, include_local_variables=False, include_source_context=True
    )

@contextmanager
def recursion_protection():
    """Enters recursion protection and yields the old flag."""
    old_in_metrics = _in_metrics.get()
    _in_metrics.set(True)
    try:
        yield old_in_metrics
    finally:
        _in_metrics.set(old_in_metrics)


def metrics_noop(func):
    """Convenience decorator that uses `recursion_protection` to
    make a function a no-op when called recursively from within
    the metrics machinery.
    """

    @wraps(func)
    def new_func(*args, **kwargs):
        with recursion_protection() as in_metrics:
            if not in_metrics:
                return func(*args, **kwargs)

    return new_func

class Metric(ABC):
    __slots__ = ()

    @abstractmethod
    def __init__(self, first):
        pass

    @property
    @abstractmethod
    def weight(self):
        pass

    @abstractmethod
    def add(self, value):
        pass

    @abstractmethod
    def serialize_value(self):
        pass

class CounterMetric(Metric):
    __slots__ = ("value",)

    def __init__(self, first):
        self.value = float(first)

    @property
    def weight(self):
        return 1

    def add(self, value):
        self.value += float(value)

    def serialize_value(self):
        return (self.value,)

class GaugeMetric(Metric):
    __slots__ = (
        "last",
        "min",
        "max",
        "sum",
        "count",
    )

    def __init__(self, first):
        first = float(first)
        self.last = first
        self.min = first
        self.max = first
        self.sum = first
        self.count = 1

    @property
    def weight(self):
        # A gauge always serializes five values: last, min, max, sum, count.
        return 5

    def add(self, value):
        value = float(value)
        self.last = value
        self.min = min(self.min, value)
        self.max = max(self.max, value)
        self.sum += value
        self.count += 1

    def serialize_value(self):
        return (
            self.last,
            self.min,
            self.max,
            self.sum,
            self.count,
        )

class DistributionMetric(Metric):
    __slots__ = ("value",)

    def __init__(self, first):
        self.value = [float(first)]

    @property
    def weight(self):
        return len(self.value)

    def add(self, value):
        self.value.append(float(value))

    def serialize_value(self):
        return self.value

class SetMetric(Metric):
    __slots__ = ("value",)

    def __init__(self, first):
        self.value = {first}

    @property
    def weight(self):
        return len(self.value)

    def add(self, value):
        self.value.add(value)

    def serialize_value(self):
        def _hash(x):
            # Strings are reduced to a CRC32 hash so only set membership,
            # not the raw value, leaves the process.
            if isinstance(x, str):
                return zlib.crc32(x.encode("utf-8")) & 0xFFFFFFFF
            return int(x)

        return (_hash(value) for value in self.value)

def _encode_metrics(flushable_buckets):
    out = io.BytesIO()
    _write = out.write

    # Sanitization happens on emission rather than on ingestion so that
    # each metric key, unit and tag is sanitized at most once per bucket.
    for timestamp, buckets in flushable_buckets:
        for bucket_key, metric in buckets.items():
            metric_type, metric_name, metric_unit, metric_tags = bucket_key
            metric_name = _sanitize_metric_key(metric_name)
            metric_unit = _sanitize_unit(metric_unit)
            _write(metric_name.encode("utf-8"))
            _write(b"@")
            _write(metric_unit.encode("utf-8"))

            for serialized_value in metric.serialize_value():
                _write(b":")
                _write(str(serialized_value).encode("utf-8"))

            _write(b"|")
            _write(metric_type.encode("ascii"))

            if metric_tags:
                _write(b"|#")
                first = True
                for tag_key, tag_value in metric_tags:
                    tag_key = _sanitize_tag_key(tag_key)
                    if not tag_key:
                        continue
                    if first:
                        first = False
                    else:
                        _write(b",")
                    _write(tag_key.encode("utf-8"))
                    _write(b":")
                    _write(_sanitize_tag_value(tag_value).encode("utf-8"))

            _write(b"|T")
            _write(str(timestamp).encode("ascii"))
            _write(b"\n")

    return out.getvalue()

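# For illustration, a single flushed counter bucket encodes to one
# statsd-style line (the key, tags and values below are made up):
#
#     b"requests@none:3.0|c|#environment:prod,release:1.0.0|T1700000000\n"
#
# i.e. `<key>@<unit>:<value...>|<type>[|#<tags>]|T<bucket timestamp>`.
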
def _encode_locations(timestamp, code_locations):
    mapping = {}

    for key, loc in code_locations:
        metric_type, name, unit = key
        mri = "{}:{}@{}".format(
            metric_type, _sanitize_metric_key(name), _sanitize_unit(unit)
        )

        loc["type"] = "location"
        mapping.setdefault(mri, []).append(loc)

    return json_dumps({"timestamp": timestamp, "mapping": mapping})

METRIC_TYPES = {
    "c": CounterMetric,
    "g": GaugeMetric,
    "d": DistributionMetric,
    "s": SetMetric,
}

# Maps a duration unit to a function that returns the current time in
# that unit.  Sub-second units are derived from `nanosecond_time`,
# second and above from `now`.
TIMING_FUNCTIONS = {
    "nanosecond": nanosecond_time,
    "microsecond": lambda: nanosecond_time() / 1000.0,
    "millisecond": lambda: nanosecond_time() / 1000000.0,
    "second": now,
    "minute": lambda: now() / 60.0,
    "hour": lambda: now() / 3600.0,
    "day": lambda: now() / 3600.0 / 24.0,
    "week": lambda: now() / 3600.0 / 24.0 / 7.0,
}

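# For illustration, taking a reading in a given unit amounts to calling
# the corresponding function, e.g.
#
#     start = TIMING_FUNCTIONS["millisecond"]()
#     ...  # timed work
#     elapsed_ms = TIMING_FUNCTIONS["millisecond"]() - start
#
# which is how `_Timing` below measures elapsed time.
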
class LocalAggregator:
    __slots__ = ("_measurements",)

    def __init__(self):
        # Maps (export_key, tags) -> (min, max, count, sum)
        self._measurements = {}

    def add(
        self,
        ty,
        key,
        value,
        unit,
        tags,
    ):
        export_key = "%s:%s@%s" % (ty, key, unit)
        bucket_key = (export_key, tags)

        old = self._measurements.get(bucket_key)
        if old is not None:
            v_min, v_max, v_count, v_sum = old
            v_min = min(v_min, value)
            v_max = max(v_max, value)
            v_count += 1
            v_sum += value
        else:
            v_min = v_max = v_sum = value
            v_count = 1
        self._measurements[bucket_key] = (v_min, v_max, v_count, v_sum)

    def to_json(self):
        rv = {}
        for (export_key, tags), (
            v_min,
            v_max,
            v_count,
            v_sum,
        ) in self._measurements.items():
            rv.setdefault(export_key, []).append(
                {
                    "tags": _tags_to_dict(tags),
                    "min": v_min,
                    "max": v_max,
                    "count": v_count,
                    "sum": v_sum,
                }
            )
        return rv

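# For illustration, `LocalAggregator.to_json()` returns a mapping of
# export keys to gauge-like summaries (values here are made up):
#
#     {"c:requests@none": [{"tags": {"environment": "prod"},
#                           "min": 1.0, "max": 1.0, "count": 3, "sum": 3.0}]}
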
class MetricsAggregator:
    ROLLUP_IN_SECONDS = 10.0
    MAX_WEIGHT = 100000
    FLUSHER_SLEEP_TIME = 5.0

    def __init__(
        self,
        capture_func,
        enable_code_locations=False,
    ):
        self.buckets = {}
        self._enable_code_locations = enable_code_locations
        self._seen_locations = _set()
        self._pending_locations = {}
        self._buckets_total_weight = 0
        self._capture_func = capture_func
        self._running = True
        self._lock = threading.Lock()

        self._flush_event = threading.Event()
        self._force_flush = False

        # Shift the flush cycle by a random fraction of the rollup window
        # so that a fleet of SDKs does not flush at the same bucket
        # boundaries all at once.
        self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS

        self._flusher = None
        self._flusher_pid = None

    def _ensure_thread(self):
        """For forking processes we might need to restart this thread.
        This ensures that our process actually has that thread running.
        """
        if not self._running:
            return False

        pid = os.getpid()
        if self._flusher_pid == pid:
            return True

        with self._lock:
            # Recheck under the lock in case another thread started the
            # flusher in the meantime.
            if self._flusher_pid == pid:
                return True

            self._flusher_pid = pid

            self._flusher = threading.Thread(target=self._flush_loop)
            self._flusher.daemon = True

            try:
                self._flusher.start()
            except RuntimeError:
                # The interpreter can no longer spawn threads (e.g. it is
                # shutting down), so disable metrics entirely.
                self._running = False
                return False

        return True

    def _flush_loop(self):
        _in_metrics.set(True)
        while self._running or self._force_flush:
            if self._running:
                self._flush_event.wait(self.FLUSHER_SLEEP_TIME)
            self._flush()

    def _flush(self):
        self._emit(self._flushable_buckets(), self._flushable_locations())

    def _flushable_buckets(self):
        with self._lock:
            force_flush = self._force_flush
            # Buckets older than one rollup window (plus the random shift)
            # are considered complete and may be flushed.
            cutoff = time.time() - self.ROLLUP_IN_SECONDS - self._flush_shift
            flushable_buckets = ()
            weight_to_remove = 0

            if force_flush:
                flushable_buckets = self.buckets.items()
                self.buckets = {}
                self._buckets_total_weight = 0
                self._force_flush = False
            else:
                flushable_buckets = []
                for buckets_timestamp, buckets in self.buckets.items():
                    if buckets_timestamp <= cutoff:
                        flushable_buckets.append((buckets_timestamp, buckets))

                # Remove flushed buckets and subtract their weight.
                for buckets_timestamp, buckets in flushable_buckets:
                    for metric in buckets.values():
                        weight_to_remove += metric.weight
                    del self.buckets[buckets_timestamp]

                self._buckets_total_weight -= weight_to_remove

        return flushable_buckets

    def _flushable_locations(self):
        with self._lock:
            locations = self._pending_locations
            self._pending_locations = {}
        return locations

    @metrics_noop
    def add(
        self,
        ty,
        key,
        value,
        unit,
        tags,
        timestamp=None,
        local_aggregator=None,
        stacklevel=0,
    ):
        if not self._ensure_thread() or self._flusher is None:
            return None

        if timestamp is None:
            timestamp = time.time()
        elif isinstance(timestamp, datetime):
            timestamp = to_timestamp(timestamp)

        bucket_timestamp = int(
            (timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS
        )
        serialized_tags = _serialize_tags(tags)
        bucket_key = (
            ty,
            key,
            unit,
            serialized_tags,
        )

        with self._lock:
            local_buckets = self.buckets.setdefault(bucket_timestamp, {})
            metric = local_buckets.get(bucket_key)
            if metric is not None:
                previous_weight = metric.weight
                metric.add(value)
            else:
                metric = local_buckets[bucket_key] = METRIC_TYPES[ty](value)
                previous_weight = 0

            added = metric.weight - previous_weight

            if stacklevel is not None:
                self.record_code_location(ty, key, unit, stacklevel + 2, timestamp)

        # Given the new weight, consider whether we want to force flush.
        self._buckets_total_weight += added
        self._consider_force_flush()

        # For sets, we only record that a value has been added to the set
        # but not the value itself.
        if local_aggregator is not None:
            local_value = float(added if ty == "s" else value)
            local_aggregator.add(ty, key, local_value, unit, serialized_tags)

    def record_code_location(
        self,
        ty,
        key,
        unit,
        stacklevel,
        timestamp=None,
    ):
        if not self._enable_code_locations:
            return
        if timestamp is None:
            timestamp = time.time()
        meta_key = (ty, key, unit)
        start_of_day = datetime.fromtimestamp(timestamp, timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
        )
        start_of_day = int(to_timestamp(start_of_day))

        if (start_of_day, meta_key) not in self._seen_locations:
            self._seen_locations.add((start_of_day, meta_key))
            loc = get_code_location(stacklevel + 3)
            if loc is not None:
                # Group metadata by day to make flushing more efficient.
                self._pending_locations.setdefault(start_of_day, []).append(
                    (meta_key, loc)
                )

    @metrics_noop
    def need_code_location(
        self,
        ty,
        key,
        unit,
        timestamp,
    ):
        # When code locations are recorded automatically on `add`, callers
        # do not need to record them themselves.
        if self._enable_code_locations:
            return False
        meta_key = (ty, key, unit)
        start_of_day = datetime.fromtimestamp(timestamp, timezone.utc).replace(
            hour=0, minute=0, second=0, microsecond=0, tzinfo=None
        )
        start_of_day = int(to_timestamp(start_of_day))
        return (start_of_day, meta_key) not in self._seen_locations

    def kill(self):
        if self._flusher is None:
            return

        self._running = False
        self._flush_event.set()
        self._flusher = None

    @metrics_noop
    def flush(self):
        self._force_flush = True
        self._flush()

    def _consider_force_flush(self):
        # Keep the aggregator bounded: once the total weight exceeds
        # MAX_WEIGHT, force a flush.
        total_weight = len(self.buckets) + self._buckets_total_weight
        if total_weight >= self.MAX_WEIGHT:
            self._force_flush = True
            self._flush_event.set()

    def _emit(
        self,
        flushable_buckets,
        code_locations,
    ):
        envelope = Envelope()

        if flushable_buckets:
            encoded_metrics = _encode_metrics(flushable_buckets)
            envelope.add_item(Item(payload=encoded_metrics, type="statsd"))

        for timestamp, locations in code_locations.items():
            encoded_locations = _encode_locations(timestamp, locations)
            envelope.add_item(Item(payload=encoded_locations, type="metric_meta"))

        if envelope.items:
            self._capture_func(envelope)
            return envelope
        return None

def _serialize_tags(
    tags,
):
    if not tags:
        return ()

    rv = []
    for key, value in tags.items():
        # If the value is a collection, expand it into multiple entries
        # under the same key.
        if isinstance(value, (list, tuple)):
            for inner_value in value:
                if inner_value is not None:
                    rv.append((key, str(inner_value)))
        elif value is not None:
            rv.append((key, str(value)))

    # Sort so that logically identical tag sets always produce the same
    # bucket key.
    return tuple(sorted(rv))

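# For illustration, tag serialization expands collections and sorts the
# result (example values made up):
#
#     _serialize_tags({"release": "1.0", "browser": ["firefox", "chrome"]})
#     == (("browser", "chrome"), ("browser", "firefox"), ("release", "1.0"))
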
def _tags_to_dict(tags):
    rv = {}
    for tag_name, tag_value in tags:
        old_value = rv.get(tag_name)
        if old_value is not None:
            if isinstance(old_value, list):
                old_value.append(tag_value)
            else:
                rv[tag_name] = [old_value, tag_value]
        else:
            rv[tag_name] = tag_value
    return rv

def _get_aggregator():
    client = sentry_sdk.get_client()
    return (
        client.metrics_aggregator
        if client.is_active() and client.metrics_aggregator is not None
        else None
    )

def _get_aggregator_and_update_tags(key, value, unit, tags):
    client = sentry_sdk.get_client()
    if not client.is_active() or client.metrics_aggregator is None:
        return None, None, tags

    updated_tags = dict(tags or ())
    updated_tags.setdefault("release", client.options["release"])
    updated_tags.setdefault("environment", client.options["environment"])

    scope = sentry_sdk.get_current_scope()
    local_aggregator = None

    # Only attach the transaction name as a tag when its source is
    # low-cardinality (e.g. a route or task name, not a raw URL).
    transaction_source = scope._transaction_info.get("source")
    if transaction_source in GOOD_TRANSACTION_SOURCES:
        transaction_name = scope._transaction
        if transaction_name:
            updated_tags.setdefault("transaction", transaction_name)
        if scope._span is not None:
            local_aggregator = scope._span._get_local_aggregator()

    experiments = client.options.get("_experiments", {})
    before_emit_callback = experiments.get("before_emit_metric")
    if before_emit_callback is not None:
        # The callback may itself emit metrics; guard against recursion.
        with recursion_protection() as in_metrics:
            if not in_metrics:
                if not before_emit_callback(key, value, unit, updated_tags):
                    return None, None, updated_tags

    return client.metrics_aggregator, local_aggregator, updated_tags

def increment(
    key,
    value=1.0,
    unit="none",
    tags=None,
    timestamp=None,
    stacklevel=0,
):
    """Increments a counter."""
    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
        key, value, unit, tags
    )
    if aggregator is not None:
        aggregator.add(
            "c", key, value, unit, tags, timestamp, local_aggregator, stacklevel
        )


# Alias for `increment`.
incr = increment

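# For illustration, a counter is typically bumped like this (the metric
# key and tags are made-up examples):
#
#     metrics.increment("button_click", tags={"browser": "firefox"})
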
class _Timing:
    def __init__(
        self,
        key,
        tags,
        timestamp,
        value,
        unit,
        stacklevel,
    ):
        self.key = key
        self.tags = tags
        self.timestamp = timestamp
        self.value = value
        self.unit = unit
        self.entered = None
        self._span = None
        self.stacklevel = stacklevel

    def _validate_invocation(self, context):
        if self.value is not None:
            raise TypeError(
                "cannot use timing as %s when a value is provided" % context
            )

    def __enter__(self):
        self.entered = TIMING_FUNCTIONS[self.unit]()
        self._validate_invocation("context-manager")
        self._span = sentry_sdk.start_span(op="metric.timing", name=self.key)
        if self.tags:
            for key, value in self.tags.items():
                if isinstance(value, (tuple, list)):
                    value = ",".join(sorted(map(str, value)))
                self._span.set_tag(key, value)
        self._span.__enter__()

        # Record the code location on enter so it points at the timed
        # block rather than at the exit path.
        aggregator = _get_aggregator()
        if aggregator is not None:
            aggregator.record_code_location("d", self.key, self.unit, self.stacklevel)

        return self

    def __exit__(self, exc_type, exc_value, tb):
        assert self._span, "did not enter"
        aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
            self.key,
            self.value,
            self.unit,
            self.tags,
        )
        if aggregator is not None:
            elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered
            # stacklevel is None: the code location was already recorded
            # in __enter__.
            aggregator.add(
                "d",
                self.key,
                elapsed,
                self.unit,
                tags,
                self.timestamp,
                local_aggregator,
                None,
            )

        self._span.__exit__(exc_type, exc_value, tb)
        self._span = None

    def __call__(self, f):
        self._validate_invocation("decorator")

        @wraps(f)
        def timed_func(*args, **kwargs):
            with timing(
                key=self.key,
                tags=self.tags,
                timestamp=self.timestamp,
                unit=self.unit,
                stacklevel=self.stacklevel + 1,
            ):
                return f(*args, **kwargs)

        return timed_func

def timing(
    key,
    value=None,
    unit="second",
    tags=None,
    timestamp=None,
    stacklevel=0,
):
    """Emits a distribution with the time it takes to run the given code block.

    This method supports three forms of invocation:

    - when a `value` is provided, it functions like `distribution` but
      with `second` as the default unit
    - it can be used as a context manager
    - it can be used as a decorator
    """
    if value is not None:
        aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
            key, value, unit, tags
        )
        if aggregator is not None:
            aggregator.add(
                "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
            )
    return _Timing(key, tags, timestamp, value, unit, stacklevel)

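# For illustration, the three invocation forms look like this (the key
# and the `process_order` function are made-up examples):
#
#     metrics.timing("task.duration", value=1.2)      # direct value
#
#     with metrics.timing("task.duration"):           # context manager
#         process_order()
#
#     @metrics.timing("task.duration")                # decorator
#     def process_order():
#         ...
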
def distribution(
    key,
    value,
    unit="none",
    tags=None,
    timestamp=None,
    stacklevel=0,
):
    """Emits a distribution."""
    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
        key, value, unit, tags
    )
    if aggregator is not None:
        aggregator.add(
            "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
        )

def set(
    key,
    value,
    unit="none",
    tags=None,
    timestamp=None,
    stacklevel=0,
):
    """Emits a set."""
    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
        key, value, unit, tags
    )
    if aggregator is not None:
        aggregator.add(
            "s", key, value, unit, tags, timestamp, local_aggregator, stacklevel
        )

def gauge(
    key,
    value,
    unit="none",
    tags=None,
    timestamp=None,
    stacklevel=0,
):
    """Emits a gauge."""
    aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
        key, value, unit, tags
    )
    if aggregator is not None:
        aggregator.add(
            "g", key, value, unit, tags, timestamp, local_aggregator, stacklevel
        )

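# For illustration, the remaining emitters follow the same pattern as
# `increment` (keys, values and tags are made-up examples):
#
#     metrics.distribution("page.load", 103.5, unit="millisecond")
#     metrics.set("user_view", "jane@example.com", tags={"page": "/home"})
#     metrics.gauge("queue.depth", 42)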