Spaces:
Running
Running
from __future__ import annotations | |
import abc | |
import contextvars | |
from typing import Any | |
from ..logger import logger | |
from . import util | |
from .processor_interface import TracingProcessor | |
from .scope import Scope | |
class Trace:
    """
    Root-level object produced by tracing; one instance stands for one logical "workflow".

    This base class only declares the interface: every method body here is empty and
    therefore returns ``None``. Concrete traces override the methods below.
    """

    def __enter__(self) -> Trace:
        """Enter the trace as a context manager."""

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Leave the context manager."""

    def start(self, mark_as_current: bool = False):
        """
        Begin the trace.

        Args:
            mark_as_current: When true, the trace is also marked as the current trace.
        """

    def finish(self, reset_current: bool = False):
        """
        End the trace.

        Args:
            reset_current: When true, the current-trace marker is reset as well.
        """

    def trace_id(self) -> str:
        """
        Identifier of this trace.
        """

    def name(self) -> str:
        """
        Name of the workflow this trace covers.
        """

    def export(self) -> dict[str, Any] | None:
        """
        Dictionary form of the trace, or ``None`` when there is nothing to export.
        """
class NoOpTrace(Trace):
    """
    Trace stand-in that records nothing.

    It still takes part in current-trace bookkeeping through ``Scope``, but it has
    no processor, reports the fixed id/name ``"no-op"`` and exports ``None``.
    """

    def __init__(self):
        # Token returned by Scope.set_current_trace; kept so finish() can hand it back.
        self._prev_context_token: contextvars.Token[Trace | None] | None = None
        # Set on the first __enter__.
        # NOTE(review): nothing in this class clears the flag again, so a later
        # `with` on the same instance takes the "already started" branch below --
        # confirm that is intended for the shared NO_OP_TRACE instance.
        self._started = False

    def __enter__(self) -> Trace:
        """Start the trace as current on first entry; later entries only sanity-check."""
        if not self._started:
            self._started = True
            self.start(mark_as_current=True)
        elif not self._prev_context_token:
            # Started earlier but no token is held any more: report it, change nothing.
            logger.error("Trace already started but no context token set")
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the trace and give the stored context token back."""
        self.finish(reset_current=True)

    def start(self, mark_as_current: bool = False):
        """
        Optionally register this trace as the current one.

        Args:
            mark_as_current: When true, call ``Scope.set_current_trace`` and keep
                the returned token; when false, do nothing.
        """
        if not mark_as_current:
            return
        self._prev_context_token = Scope.set_current_trace(self)

    def finish(self, reset_current: bool = False):
        """
        Optionally undo the registration made by ``start``.

        Args:
            reset_current: When true and a token is held, pass it to
                ``Scope.reset_current_trace`` and drop it.
        """
        token = self._prev_context_token
        if token is None or not reset_current:
            return
        Scope.reset_current_trace(token)
        self._prev_context_token = None

    def trace_id(self) -> str:
        """Return the placeholder id ``"no-op"``."""
        return "no-op"

    def name(self) -> str:
        """Return the placeholder workflow name ``"no-op"``."""
        return "no-op"

    def export(self) -> dict[str, Any] | None:
        """Return ``None``: a no-op trace has nothing to export."""
        return None


# Module-level NoOpTrace instance.
NO_OP_TRACE = NoOpTrace()
class TraceImpl(Trace):
    """
    A trace that is reported to a ``TracingProcessor``.

    The processor's ``on_trace_start`` is invoked the first time ``start`` runs and
    ``on_trace_end`` is invoked by ``finish`` once the trace has been started.
    """

    __slots__ = (
        "_name",
        "_trace_id",
        "group_id",
        "metadata",
        "_prev_context_token",
        "_processor",
        "_started",
    )

    def __init__(
        self,
        name: str,
        trace_id: str | None,
        group_id: str | None,
        metadata: dict[str, Any] | None,
        processor: TracingProcessor,
    ):
        """
        Args:
            name: Workflow name returned by ``name()``.
            trace_id: Identifier to use; when falsy (``None`` or empty) one is
                obtained from ``util.gen_trace_id()``.
            group_id: Stored unchanged and emitted by ``export()``.
            metadata: Stored unchanged and emitted by ``export()``.
            processor: Receives the ``on_trace_start`` / ``on_trace_end`` callbacks.
        """
        self._name = name
        self._trace_id = trace_id or util.gen_trace_id()
        self.group_id = group_id
        self.metadata = metadata
        # Token returned by Scope.set_current_trace; kept so finish() can hand it back.
        self._prev_context_token: contextvars.Token[Trace | None] | None = None
        self._processor = processor
        self._started = False

    def trace_id(self) -> str:
        """Return the trace identifier chosen in ``__init__``."""
        return self._trace_id

    def name(self) -> str:
        """Return the workflow name given to ``__init__``."""
        return self._name

    def start(self, mark_as_current: bool = False):
        """
        Start the trace; calls after the first one are ignored.

        Args:
            mark_as_current: When true, register this trace through
                ``Scope.set_current_trace`` and keep the returned token.
        """
        if self._started:
            return

        self._started = True
        self._processor.on_trace_start(self)

        if mark_as_current:
            self._prev_context_token = Scope.set_current_trace(self)

    def finish(self, reset_current: bool = False):
        """
        Notify the processor that the trace ended; does nothing if never started.

        Args:
            reset_current: When true and a token is held, pass it to
                ``Scope.reset_current_trace`` and drop it.
        """
        if not self._started:
            return

        self._processor.on_trace_end(self)

        if reset_current and self._prev_context_token is not None:
            Scope.reset_current_trace(self._prev_context_token)
            self._prev_context_token = None

    def __enter__(self) -> Trace:
        """Start the trace as current on first entry; later entries only sanity-check."""
        if self._started:
            if not self._prev_context_token:
                # Started earlier but no token is held: report it, change nothing.
                logger.error("Trace already started but no context token set")
            return self

        self.start(mark_as_current=True)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Finish the trace, skipping the current-trace reset on ``GeneratorExit``."""
        # NOTE(review): presumably the reset is skipped because a generator being
        # closed may not run in the context that created the token -- confirm.
        self.finish(reset_current=exc_type is not GeneratorExit)

    def export(self) -> dict[str, Any] | None:
        """
        Return a dictionary describing this trace.

        The id and workflow name are read from the backing fields. ``trace_id``
        and ``name`` are defined as methods on this class, so referencing them
        without a call would place bound methods in the dictionary instead of
        the stored strings.
        """
        return {
            "object": "trace",
            "id": self._trace_id,
            "workflow_name": self._name,
            "group_id": self.group_id,
            "metadata": self.metadata,
        }