ConradLinus's picture
Upload folder using huggingface_hub
d631808 verified
from __future__ import annotations
import abc
import contextvars
from typing import Any
from ..logger import logger
from . import util
from .processor_interface import TracingProcessor
from .scope import Scope
class Trace:
"""
A trace is the root level object that tracing creates. It represents a logical "workflow".
"""
@abc.abstractmethod
def __enter__(self) -> Trace:
pass
@abc.abstractmethod
def __exit__(self, exc_type, exc_val, exc_tb):
pass
@abc.abstractmethod
def start(self, mark_as_current: bool = False):
"""
Start the trace.
Args:
mark_as_current: If true, the trace will be marked as the current trace.
"""
pass
@abc.abstractmethod
def finish(self, reset_current: bool = False):
"""
Finish the trace.
Args:
reset_current: If true, the trace will be reset as the current trace.
"""
pass
@property
@abc.abstractmethod
def trace_id(self) -> str:
"""
The trace ID.
"""
pass
@property
@abc.abstractmethod
def name(self) -> str:
"""
The name of the workflow being traced.
"""
pass
@abc.abstractmethod
def export(self) -> dict[str, Any] | None:
"""
Export the trace as a dictionary.
"""
pass
class NoOpTrace(Trace):
"""
A no-op trace that will not be recorded.
"""
def __init__(self):
self._started = False
self._prev_context_token: contextvars.Token[Trace | None] | None = None
def __enter__(self) -> Trace:
if self._started:
if not self._prev_context_token:
logger.error("Trace already started but no context token set")
return self
self._started = True
self.start(mark_as_current=True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.finish(reset_current=True)
def start(self, mark_as_current: bool = False):
if mark_as_current:
self._prev_context_token = Scope.set_current_trace(self)
def finish(self, reset_current: bool = False):
if reset_current and self._prev_context_token is not None:
Scope.reset_current_trace(self._prev_context_token)
self._prev_context_token = None
@property
def trace_id(self) -> str:
return "no-op"
@property
def name(self) -> str:
return "no-op"
def export(self) -> dict[str, Any] | None:
return None
NO_OP_TRACE = NoOpTrace()
class TraceImpl(Trace):
"""
A trace that will be recorded by the tracing library.
"""
__slots__ = (
"_name",
"_trace_id",
"group_id",
"metadata",
"_prev_context_token",
"_processor",
"_started",
)
def __init__(
self,
name: str,
trace_id: str | None,
group_id: str | None,
metadata: dict[str, Any] | None,
processor: TracingProcessor,
):
self._name = name
self._trace_id = trace_id or util.gen_trace_id()
self.group_id = group_id
self.metadata = metadata
self._prev_context_token: contextvars.Token[Trace | None] | None = None
self._processor = processor
self._started = False
@property
def trace_id(self) -> str:
return self._trace_id
@property
def name(self) -> str:
return self._name
def start(self, mark_as_current: bool = False):
if self._started:
return
self._started = True
self._processor.on_trace_start(self)
if mark_as_current:
self._prev_context_token = Scope.set_current_trace(self)
def finish(self, reset_current: bool = False):
if not self._started:
return
self._processor.on_trace_end(self)
if reset_current and self._prev_context_token is not None:
Scope.reset_current_trace(self._prev_context_token)
self._prev_context_token = None
def __enter__(self) -> Trace:
if self._started:
if not self._prev_context_token:
logger.error("Trace already started but no context token set")
return self
self.start(mark_as_current=True)
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.finish(reset_current=exc_type is not GeneratorExit)
def export(self) -> dict[str, Any] | None:
return {
"object": "trace",
"id": self.trace_id,
"workflow_name": self.name,
"group_id": self.group_id,
"metadata": self.metadata,
}