Spaces:
Running
Running
from __future__ import annotations | |
import os | |
import threading | |
import uuid | |
from abc import ABC, abstractmethod | |
from datetime import datetime, timezone | |
from typing import Any | |
from ..logger import logger | |
from .processor_interface import TracingProcessor | |
from .scope import Scope | |
from .spans import NoOpSpan, Span, SpanImpl, TSpanData | |
from .traces import NoOpTrace, Trace, TraceImpl | |
class SynchronousMultiTracingProcessor(TracingProcessor):
    """
    Forwards all calls to a list of TracingProcessors, in order of registration.

    Every processor is invoked synchronously on the calling thread. An exception raised
    by one processor is logged and does not prevent the remaining processors from
    receiving the same call.
    """

    def __init__(self) -> None:
        # Using a tuple to avoid race conditions when iterating over processors:
        # writers rebind the whole tuple while holding the lock, so a loop that has
        # already started iterating keeps working on its own unchanged snapshot.
        self._processors: tuple[TracingProcessor, ...] = ()
        self._lock = threading.Lock()

    def add_tracing_processor(self, tracing_processor: TracingProcessor):
        """
        Add a processor to the list of processors. Each processor will receive all traces/spans.
        """
        with self._lock:
            self._processors += (tracing_processor,)

    def set_processors(self, processors: list[TracingProcessor]):
        """
        Set the list of processors. This will replace the current list of processors.
        """
        with self._lock:
            self._processors = tuple(processors)

    def on_trace_start(self, trace: Trace) -> None:
        """
        Called when a trace is started.
        """
        for processor in self._processors:
            try:
                processor.on_trace_start(trace)
            except Exception as e:
                # Keep going: a broken processor must not starve the others.
                logger.error(f"Error in trace processor {processor} during on_trace_start: {e}")

    def on_trace_end(self, trace: Trace) -> None:
        """
        Called when a trace is finished.
        """
        for processor in self._processors:
            try:
                processor.on_trace_end(trace)
            except Exception as e:
                logger.error(f"Error in trace processor {processor} during on_trace_end: {e}")

    def on_span_start(self, span: Span[Any]) -> None:
        """
        Called when a span is started.
        """
        for processor in self._processors:
            try:
                processor.on_span_start(span)
            except Exception as e:
                logger.error(f"Error in trace processor {processor} during on_span_start: {e}")

    def on_span_end(self, span: Span[Any]) -> None:
        """
        Called when a span is finished.
        """
        for processor in self._processors:
            try:
                processor.on_span_end(span)
            except Exception as e:
                logger.error(f"Error in trace processor {processor} during on_span_end: {e}")

    def shutdown(self) -> None:
        """
        Called when the application stops.
        """
        for processor in self._processors:
            logger.debug(f"Shutting down trace processor {processor}")
            try:
                processor.shutdown()
            except Exception as e:
                # Every processor gets its chance to shut down, even if an earlier one failed.
                logger.error(f"Error shutting down trace processor {processor}: {e}")

    def force_flush(self) -> None:
        """
        Force the processors to flush their buffers.
        """
        for processor in self._processors:
            try:
                processor.force_flush()
            except Exception as e:
                logger.error(f"Error flushing trace processor {processor}: {e}")
class TraceProvider(ABC):
    """Interface for creating traces and spans.

    Every method is abstract: a concrete provider must implement all of them, and
    ``TraceProvider`` itself cannot be instantiated.
    """

    @abstractmethod
    def register_processor(self, processor: TracingProcessor) -> None:
        """Add a processor that will receive all traces and spans."""

    @abstractmethod
    def set_processors(self, processors: list[TracingProcessor]) -> None:
        """Replace the list of processors with ``processors``."""

    @abstractmethod
    def get_current_trace(self) -> Trace | None:
        """Return the currently active trace, if any."""

    @abstractmethod
    def get_current_span(self) -> Span[Any] | None:
        """Return the currently active span, if any."""

    @abstractmethod
    def set_disabled(self, disabled: bool) -> None:
        """Enable or disable tracing globally."""

    @abstractmethod
    def time_iso(self) -> str:
        """Return the current time in ISO 8601 format."""

    @abstractmethod
    def gen_trace_id(self) -> str:
        """Generate a new trace identifier."""

    @abstractmethod
    def gen_span_id(self) -> str:
        """Generate a new span identifier."""

    @abstractmethod
    def gen_group_id(self) -> str:
        """Generate a new group identifier."""

    @abstractmethod
    def create_trace(
        self,
        name: str,
        trace_id: str | None = None,
        group_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        disabled: bool = False,
    ) -> Trace:
        """Create a new trace."""

    @abstractmethod
    def create_span(
        self,
        span_data: TSpanData,
        span_id: str | None = None,
        parent: Trace | Span[Any] | None = None,
        disabled: bool = False,
    ) -> Span[TSpanData]:
        """Create a new span."""

    @abstractmethod
    def shutdown(self) -> None:
        """Clean up any resources used by the provider."""
class DefaultTraceProvider(TraceProvider):
    """Trace provider that fans traces and spans out to the registered processors."""

    def __init__(self) -> None:
        self._multi_processor = SynchronousMultiTracingProcessor()
        # Tracing starts disabled when OPENAI_AGENTS_DISABLE_TRACING is "true" or "1"
        # (compared case-insensitively); any other value, or no value, leaves it enabled.
        self._disabled = os.environ.get("OPENAI_AGENTS_DISABLE_TRACING", "false").lower() in (
            "true",
            "1",
        )

    def register_processor(self, processor: TracingProcessor):
        """
        Add a processor to the list of processors. Each processor will receive all traces/spans.
        """
        self._multi_processor.add_tracing_processor(processor)

    def set_processors(self, processors: list[TracingProcessor]):
        """
        Set the list of processors. This will replace the current list of processors.
        """
        self._multi_processor.set_processors(processors)

    def get_current_trace(self) -> Trace | None:
        """
        Returns the currently active trace, if any.
        """
        return Scope.get_current_trace()

    def get_current_span(self) -> Span[Any] | None:
        """
        Returns the currently active span, if any.
        """
        return Scope.get_current_span()

    def set_disabled(self, disabled: bool) -> None:
        """
        Set whether tracing is disabled.
        """
        self._disabled = disabled

    def time_iso(self) -> str:
        """Return the current time in ISO 8601 format."""
        return datetime.now(timezone.utc).isoformat()

    def gen_trace_id(self) -> str:
        """Generate a new trace ID: ``trace_`` followed by the 32 hex digits of a UUID4."""
        return f"trace_{uuid.uuid4().hex}"

    def gen_span_id(self) -> str:
        """Generate a new span ID: ``span_`` followed by the first 24 hex digits of a UUID4."""
        return f"span_{uuid.uuid4().hex[:24]}"

    def gen_group_id(self) -> str:
        """Generate a new group ID: ``group_`` followed by the first 24 hex digits of a UUID4."""
        return f"group_{uuid.uuid4().hex[:24]}"

    def create_trace(
        self,
        name: str,
        trace_id: str | None = None,
        group_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        disabled: bool = False,
    ) -> Trace:
        """
        Create a new trace.

        Returns a ``NoOpTrace`` when tracing is disabled on the provider or ``disabled``
        is true. Otherwise returns a ``TraceImpl`` wired to the registered processors,
        generating ``trace_id`` when none is supplied.
        """
        if self._disabled or disabled:
            logger.debug(f"Tracing is disabled. Not creating trace {name}")
            return NoOpTrace()

        trace_id = trace_id or self.gen_trace_id()

        logger.debug(f"Creating trace {name} with id {trace_id}")

        return TraceImpl(
            name=name,
            trace_id=trace_id,
            group_id=group_id,
            metadata=metadata,
            processor=self._multi_processor,
        )

    def create_span(
        self,
        span_data: TSpanData,
        span_id: str | None = None,
        parent: Trace | Span[Any] | None = None,
        disabled: bool = False,
    ) -> Span[TSpanData]:
        """
        Create a new span.

        The span is attached to ``parent`` when one is given, otherwise to the current
        span/trace taken from ``Scope``. A ``NoOpSpan`` is returned when tracing is
        disabled, when there is no active trace, when the parent is itself a no-op,
        or when ``parent`` is neither a ``Trace`` nor a ``Span``.
        """
        if self._disabled or disabled:
            logger.debug(f"Tracing is disabled. Not creating span {span_data}")
            return NoOpSpan(span_data)

        if not parent:
            # No explicit parent: inherit from whatever is active in the current scope.
            current_span = Scope.get_current_span()
            current_trace = Scope.get_current_trace()
            if current_trace is None:
                logger.error(
                    "No active trace. Make sure to start a trace with `trace()` first. "
                    "Returning NoOpSpan."
                )
                return NoOpSpan(span_data)
            elif isinstance(current_trace, NoOpTrace) or isinstance(current_span, NoOpSpan):
                logger.debug(
                    f"Parent {current_span} or {current_trace} is no-op, returning NoOpSpan"
                )
                return NoOpSpan(span_data)

            parent_id = current_span.span_id if current_span else None
            trace_id = current_trace.trace_id

        elif isinstance(parent, Trace):
            if isinstance(parent, NoOpTrace):
                logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
                return NoOpSpan(span_data)
            trace_id = parent.trace_id
            parent_id = None

        elif isinstance(parent, Span):
            if isinstance(parent, NoOpSpan):
                logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
                return NoOpSpan(span_data)
            parent_id = parent.span_id
            trace_id = parent.trace_id

        else:
            # Without this branch trace_id/parent_id would be unbound below and the call
            # would die with UnboundLocalError; degrade to a no-op span like the other
            # failure paths instead.
            logger.error(
                f"Unsupported parent type {type(parent).__name__}; expected Trace or Span. "
                "Returning NoOpSpan."
            )
            return NoOpSpan(span_data)

        # Resolve the id before logging so the message shows the id actually used.
        span_id = span_id or self.gen_span_id()
        logger.debug(f"Creating span {span_data} with id {span_id}")

        return SpanImpl(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_id,
            processor=self._multi_processor,
            span_data=span_data,
        )

    def shutdown(self) -> None:
        """
        Shut down the registered processors.

        Does nothing when tracing is disabled. Errors raised while shutting down are
        logged and not propagated.
        """
        # NOTE(review): when tracing is disabled, processors that were registered
        # earlier are not shut down either; confirm that is intended.
        if self._disabled:
            return

        try:
            logger.debug("Shutting down trace provider")
            self._multi_processor.shutdown()
        except Exception as e:
            logger.error(f"Error shutting down trace provider: {e}")