|
import asyncio |
|
import logging |
|
import sys |
|
from typing import Literal |
|
from uuid import uuid4 |
|
|
|
from rich.logging import RichHandler |
|
|
|
|
|
class StructuredLogger(logging.Logger): |
|
async def stream_message(self, message: str) -> None: |
|
"""Streams the message character by character asynchronously.""" |
|
try: |
|
sys.stdout.write("\r") |
|
for char in message: |
|
sys.stdout.write(char) |
|
sys.stdout.flush() |
|
await asyncio.sleep(0.002) |
|
sys.stdout.write("\n") |
|
except Exception: |
|
pass |
|
|
|
def _log( |
|
self, |
|
level, |
|
msg, |
|
args, |
|
exc_info=None, |
|
extra=None, |
|
stack_info=False, |
|
stacklevel=1, |
|
): |
|
if extra is None: |
|
extra = {} |
|
|
|
json_fields = { |
|
"logger_name": self.name, |
|
"message": msg % args if args else msg, |
|
} |
|
|
|
exc_type, exc_value, exc_traceback = sys.exc_info() |
|
if exc_type is not None: |
|
json_fields["exception_class"] = exc_type.__name__ |
|
json_fields["exception_message"] = str(exc_value) |
|
|
|
json_fields.update(extra) |
|
|
|
super()._log( |
|
level, |
|
msg, |
|
args, |
|
exc_info, |
|
{"json_fields": json_fields}, |
|
stack_info, |
|
stacklevel + 1, |
|
) |
|
|
|
|
|
def create_logger( |
|
name: str, |
|
level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO", |
|
detailed_name: bool = False, |
|
) -> logging.Logger: |
|
unique_name = f"{name}-{str(uuid4())[:8]}" |
|
logger = logging.getLogger(unique_name) |
|
logger.setLevel(level) |
|
|
|
|
|
rich_handler = RichHandler( |
|
rich_tracebacks=True, |
|
markup=True, |
|
show_path=False, |
|
show_time=False, |
|
log_time_format="[%s]", |
|
) |
|
|
|
if detailed_name: |
|
rich_handler.setFormatter(logging.Formatter("%(name)s:\n%(message)s")) |
|
else: |
|
rich_handler.setFormatter(logging.Formatter("-----\n%(message)s")) |
|
|
|
logger.addHandler(rich_handler) |
|
logger.propagate = False |
|
|
|
return logger |
|
|
|
|
|
|
|
logging.setLoggerClass(StructuredLogger) |
|
|
|
|
|
logger = create_logger(__name__, level="INFO") |
|
|