from __future__ import annotations

import os
import threading
import uuid
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from typing import Any

from ..logger import logger
from .processor_interface import TracingProcessor
from .scope import Scope
from .spans import NoOpSpan, Span, SpanImpl, TSpanData
from .traces import NoOpTrace, Trace, TraceImpl


class SynchronousMultiTracingProcessor(TracingProcessor):
    """
    Forwards all calls to a list of TracingProcessors, in order of registration.

    Mutations of the processor collection happen under a lock and always rebind
    ``_processors`` to a new tuple; the forwarding methods iterate whatever tuple
    is bound at the time of the call without taking the lock.
    """

    def __init__(self) -> None:
        # Using a tuple to avoid race conditions when iterating over processors
        self._processors: tuple[TracingProcessor, ...] = ()
        self._lock = threading.Lock()

    def add_tracing_processor(self, tracing_processor: TracingProcessor) -> None:
        """
        Add a processor to the list of processors. Each processor will receive all
        traces/spans.

        Args:
            tracing_processor: The processor to append after the existing ones.
        """
        with self._lock:
            self._processors += (tracing_processor,)

    def set_processors(self, processors: list[TracingProcessor]) -> None:
        """
        Set the list of processors. This will replace the current list of processors.

        Args:
            processors: The processors to use from now on, in call order.
        """
        with self._lock:
            self._processors = tuple(processors)

    def on_trace_start(self, trace: Trace) -> None:
        """
        Called when a trace is started. Forwards the trace to every processor.
        """
        for processor in self._processors:
            processor.on_trace_start(trace)

    def on_trace_end(self, trace: Trace) -> None:
        """
        Called when a trace is finished. Forwards the trace to every processor.
        """
        for processor in self._processors:
            processor.on_trace_end(trace)

    def on_span_start(self, span: Span[Any]) -> None:
        """
        Called when a span is started. Forwards the span to every processor.
        """
        for processor in self._processors:
            processor.on_span_start(span)

    def on_span_end(self, span: Span[Any]) -> None:
        """
        Called when a span is finished. Forwards the span to every processor.
        """
        for processor in self._processors:
            processor.on_span_end(span)

    def shutdown(self) -> None:
        """
        Called when the application stops. Shuts down every processor in order.
        """
        for processor in self._processors:
            logger.debug(f"Shutting down trace processor {processor}")
            processor.shutdown()

    def force_flush(self) -> None:
        """
        Force the processors to flush their buffers.
        """
        for processor in self._processors:
            processor.force_flush()


class TraceProvider(ABC):
    """Interface for creating traces and spans."""

    @abstractmethod
    def register_processor(self, processor: TracingProcessor) -> None:
        """Add a processor that will receive all traces and spans."""

    @abstractmethod
    def set_processors(self, processors: list[TracingProcessor]) -> None:
        """Replace the list of processors with ``processors``."""

    @abstractmethod
    def get_current_trace(self) -> Trace | None:
        """Return the currently active trace, if any."""

    @abstractmethod
    def get_current_span(self) -> Span[Any] | None:
        """Return the currently active span, if any."""

    @abstractmethod
    def set_disabled(self, disabled: bool) -> None:
        """Enable or disable tracing globally."""

    @abstractmethod
    def time_iso(self) -> str:
        """Return the current time in ISO 8601 format."""

    @abstractmethod
    def gen_trace_id(self) -> str:
        """Generate a new trace identifier."""

    @abstractmethod
    def gen_span_id(self) -> str:
        """Generate a new span identifier."""

    @abstractmethod
    def gen_group_id(self) -> str:
        """Generate a new group identifier."""

    @abstractmethod
    def create_trace(
        self,
        name: str,
        trace_id: str | None = None,
        group_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        disabled: bool = False,
    ) -> Trace:
        """Create a new trace."""

    @abstractmethod
    def create_span(
        self,
        span_data: TSpanData,
        span_id: str | None = None,
        parent: Trace | Span[Any] | None = None,
        disabled: bool = False,
    ) -> Span[TSpanData]:
        """Create a new span."""

    @abstractmethod
    def shutdown(self) -> None:
        """Clean up any resources used by the provider."""


class DefaultTraceProvider(TraceProvider):
    """
    TraceProvider that fans traces and spans out through a
    SynchronousMultiTracingProcessor.

    Tracing starts disabled when the ``OPENAI_AGENTS_DISABLE_TRACING`` environment
    variable is ``"true"`` or ``"1"`` (compared case-insensitively).
    """

    def __init__(self) -> None:
        self._multi_processor = SynchronousMultiTracingProcessor()
        self._disabled = os.environ.get("OPENAI_AGENTS_DISABLE_TRACING", "false").lower() in (
            "true",
            "1",
        )

    def register_processor(self, processor: TracingProcessor) -> None:
        """
        Add a processor to the list of processors. Each processor will receive all
        traces/spans.
        """
        self._multi_processor.add_tracing_processor(processor)

    def set_processors(self, processors: list[TracingProcessor]) -> None:
        """
        Set the list of processors. This will replace the current list of processors.
        """
        self._multi_processor.set_processors(processors)

    def get_current_trace(self) -> Trace | None:
        """
        Returns the currently active trace, if any.
        """
        return Scope.get_current_trace()

    def get_current_span(self) -> Span[Any] | None:
        """
        Returns the currently active span, if any.
        """
        return Scope.get_current_span()

    def set_disabled(self, disabled: bool) -> None:
        """
        Set whether tracing is disabled.
        """
        self._disabled = disabled

    def time_iso(self) -> str:
        """Return the current UTC time in ISO 8601 format."""
        return datetime.now(timezone.utc).isoformat()

    def gen_trace_id(self) -> str:
        """Generate a new trace ID: ``trace_`` followed by a UUID4 hex string."""
        return f"trace_{uuid.uuid4().hex}"

    def gen_span_id(self) -> str:
        """Generate a new span ID: ``span_`` followed by 24 hex characters."""
        return f"span_{uuid.uuid4().hex[:24]}"

    def gen_group_id(self) -> str:
        """Generate a new group ID: ``group_`` followed by 24 hex characters."""
        return f"group_{uuid.uuid4().hex[:24]}"

    def create_trace(
        self,
        name: str,
        trace_id: str | None = None,
        group_id: str | None = None,
        metadata: dict[str, Any] | None = None,
        disabled: bool = False,
    ) -> Trace:
        """
        Create a new trace.

        Args:
            name: Name of the trace.
            trace_id: ID to use; a new one is generated when omitted or empty.
            group_id: Optional group ID passed through to the trace.
            metadata: Optional metadata passed through to the trace.
            disabled: When true, a NoOpTrace is returned for this call.

        Returns:
            A NoOpTrace when tracing is disabled (globally or via ``disabled``),
            otherwise a TraceImpl bound to this provider's processors.
        """
        if self._disabled or disabled:
            logger.debug(f"Tracing is disabled. Not creating trace {name}")
            return NoOpTrace()

        trace_id = trace_id or self.gen_trace_id()

        logger.debug(f"Creating trace {name} with id {trace_id}")

        return TraceImpl(
            name=name,
            trace_id=trace_id,
            group_id=group_id,
            metadata=metadata,
            processor=self._multi_processor,
        )

    def create_span(
        self,
        span_data: TSpanData,
        span_id: str | None = None,
        parent: Trace | Span[Any] | None = None,
        disabled: bool = False,
    ) -> Span[TSpanData]:
        """
        Create a new span.

        Args:
            span_data: The data carried by the span.
            span_id: ID to use; a new one is generated when omitted or empty.
            parent: Trace or span to attach to. When not given, the current
                trace/span from Scope are used.
            disabled: When true, a NoOpSpan is returned for this call.

        Returns:
            A NoOpSpan when tracing is disabled, when there is no active trace, or
            when the resolved parent is itself a no-op; otherwise a SpanImpl bound
            to this provider's processors.
        """
        if self._disabled or disabled:
            logger.debug(f"Tracing is disabled. Not creating span {span_data}")
            return NoOpSpan(span_data)

        if not parent:
            # No explicit parent: attach to whatever is currently active in Scope.
            current_span = Scope.get_current_span()
            current_trace = Scope.get_current_trace()
            if current_trace is None:
                logger.error(
                    "No active trace. Make sure to start a trace with `trace()` first. "
                    "Returning NoOpSpan."
                )
                return NoOpSpan(span_data)
            elif isinstance(current_trace, NoOpTrace) or isinstance(current_span, NoOpSpan):
                logger.debug(
                    f"Parent {current_span} or {current_trace} is no-op, returning NoOpSpan"
                )
                return NoOpSpan(span_data)

            parent_id = current_span.span_id if current_span else None
            trace_id = current_trace.trace_id

        elif isinstance(parent, Trace):
            if isinstance(parent, NoOpTrace):
                logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
                return NoOpSpan(span_data)
            # A span attached directly to a trace has no parent span.
            trace_id = parent.trace_id
            parent_id = None
        elif isinstance(parent, Span):
            if isinstance(parent, NoOpSpan):
                logger.debug(f"Parent {parent} is no-op, returning NoOpSpan")
                return NoOpSpan(span_data)
            parent_id = parent.span_id
            trace_id = parent.trace_id

        # Resolve the ID before logging so the log line shows the ID actually used
        # rather than None when the caller did not supply one.
        span_id = span_id or self.gen_span_id()

        logger.debug(f"Creating span {span_data} with id {span_id}")

        return SpanImpl(
            trace_id=trace_id,
            span_id=span_id,
            parent_id=parent_id,
            processor=self._multi_processor,
            span_data=span_data,
        )

    def shutdown(self) -> None:
        """
        Shut down all registered processors.

        Does nothing when tracing is disabled. Any exception raised while shutting
        down is logged as an error and not propagated.
        """
        if self._disabled:
            return

        try:
            logger.debug("Shutting down trace provider")
            self._multi_processor.shutdown()
        except Exception as e:
            logger.error(f"Error shutting down trace provider: {e}")