#!/usr/bin/env python3
"""
Comprehensive Async Batch Logging System for GAIA Questions
Provides detailed per-question logs, batch summary, and classification analysis
"""
import os
import json
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from collections import defaultdict
from dataclasses import dataclass, asdict

@dataclass
class QuestionResult:
    """Data class for storing question processing results"""
    task_id: str
    question_text: str
    classification: str
    complexity: int
    confidence: float
    expected_answer: str
    our_answer: str
    status: str  # CORRECT, INCORRECT, PARTIAL, ERROR
    accuracy_score: float
    total_duration: float
    classification_time: float
    solving_time: float
    validation_time: float
    error_type: Optional[str] = None
    error_details: Optional[str] = None
    tools_used: Optional[List[str]] = None
    anti_hallucination_applied: bool = False
    override_reason: Optional[str] = None

    def __post_init__(self):
        # A mutable default is not allowed on a dataclass field, so the None
        # sentinel is normalized to an empty list here.
        if self.tools_used is None:
            self.tools_used = []

class AsyncBatchLogger:
    """Comprehensive logging system for async batch processing"""

    def __init__(self, base_log_dir: str = "logs"):
        self.base_log_dir = Path(base_log_dir)
        self.base_log_dir.mkdir(exist_ok=True)

        # Initialize timestamps
        self.batch_start_time = datetime.now()
        self.timestamp = self.batch_start_time.strftime("%Y%m%d_%H%M%S")

        # Create log files
        self.summary_log_path = self.base_log_dir / f"async_batch_summary_{self.timestamp}.log"
        self.batch_analysis_path = self.base_log_dir / f"async_batch_analysis_{self.timestamp}.json"

        # Initialize data structures
        self.question_results: Dict[str, QuestionResult] = {}
        self.classification_results = defaultdict(list)
        self.batch_metrics = {
            "total_questions": 0,
            "completed_questions": 0,
            "correct_answers": 0,
            "accuracy_rate": 0.0,
            "total_duration": 0.0,
            "start_time": self.batch_start_time.isoformat(),
            "end_time": None
        }

        # Initialize summary logger
        self.summary_logger = self._setup_summary_logger()

        # Active question loggers for concurrent access
        self.question_loggers: Dict[str, logging.Logger] = {}

    def _setup_summary_logger(self) -> logging.Logger:
        """Set up the batch summary logger"""
        logger = logging.getLogger(f"batch_summary_{self.timestamp}")
        logger.setLevel(logging.INFO)

        # Create file handler
        handler = logging.FileHandler(self.summary_log_path)
        formatter = logging.Formatter('[%(asctime)s] %(message)s', datefmt='%H:%M:%S')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # Also log to console
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        return logger

    def _setup_question_logger(self, task_id: str) -> logging.Logger:
        """Set up detailed logger for a specific question"""
        question_log_path = self.base_log_dir / f"async_batch_question_{task_id}_{self.timestamp}.log"

        logger = logging.getLogger(f"question_{task_id}_{self.timestamp}")
        logger.setLevel(logging.INFO)

        # Create file handler
        handler = logging.FileHandler(question_log_path)
        formatter = logging.Formatter('%(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        return logger

    async def log_batch_start(self, total_questions: int, concurrency: int):
        """Log the start of batch processing"""
        self.batch_metrics["total_questions"] = total_questions

        self.summary_logger.info(f"BATCH_START | Total: {total_questions} questions | Concurrency: {concurrency}")
        self.summary_logger.info(f"Timestamp: {self.batch_start_time.isoformat()}")
        self.summary_logger.info(f"Log Directory: {self.base_log_dir}")
        self.summary_logger.info("-" * 80)

    async def log_question_start(self, task_id: str, question_data: Dict):
        """Log the start of processing a specific question"""
        # Set up question-specific logger
        question_logger = self._setup_question_logger(task_id)
        self.question_loggers[task_id] = question_logger

        # Log detailed question start
        question_logger.info("=" * 80)
        question_logger.info("ASYNC BATCH QUESTION PROCESSING")
        question_logger.info("=" * 80)
        question_logger.info(f"Question ID: {task_id}")
        question_logger.info(f"Start Time: {datetime.now().isoformat()}")
        question_logger.info(f"Question Text: {question_data.get('question', 'N/A')}")
        question_logger.info(f"Level: {question_data.get('Level', 'Unknown')}")
        question_logger.info(f"Has File: {'Yes' if question_data.get('file_name') else 'No'}")
        if question_data.get('file_name'):
            question_logger.info(f"File: {question_data.get('file_name')}")
        question_logger.info("")

    async def log_classification(self, task_id: str, classification: Dict):
        """Log question classification details"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info("--- CLASSIFICATION PHASE ---")
        logger.info(f"Primary Agent: {classification.get('primary_agent', 'unknown')}")
        logger.info(f"Secondary Agents: {', '.join(classification.get('secondary_agents', []))}")
        logger.info(f"Complexity: {classification.get('complexity', 0)}/5")
        logger.info(f"Confidence: {classification.get('confidence', 0.0):.3f}")
        logger.info(f"Tools Needed: {', '.join(classification.get('tools_needed', []))}")
        logger.info(f"Reasoning: {classification.get('reasoning', 'N/A')}")
        logger.info("")

    async def log_solving_start(self, task_id: str, routing_plan: Dict):
        """Log the start of the solving phase"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info("--- SOLVING PHASE ---")
        logger.info(f"Route to: {routing_plan.get('primary_route', 'unknown')} agent")
        logger.info(f"Coordination: {'Yes' if routing_plan.get('requires_coordination') else 'No'}")
        logger.info(f"Estimated Duration: {routing_plan.get('estimated_duration', 'unknown')}")
        logger.info("")
        logger.info("Tool Executions:")

    async def log_tool_execution(self, task_id: str, tool_name: str, duration: float, result_summary: str):
        """Log individual tool execution"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info(f"  - {tool_name}: {duration:.1f}s → {result_summary[:100]}...")

    async def log_answer_processing(self, task_id: str, raw_response: str, processed_answer: str,
                                    anti_hallucination_applied: bool = False, override_reason: Optional[str] = None):
        """Log answer processing and anti-hallucination details"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]
        logger.info("")
        logger.info("Agent Response (first 500 chars):")
        logger.info(raw_response[:500] + ("..." if len(raw_response) > 500 else ""))
        logger.info("")
        logger.info(f"Processed Answer: {processed_answer}")

        if anti_hallucination_applied:
            logger.info("🚨 ANTI-HALLUCINATION OVERRIDE APPLIED")
            logger.info(f"Reason: {override_reason}")

        logger.info("")

    async def log_question_complete(self, task_id: str, result: QuestionResult):
        """Log the completion of a question with full results"""
        if task_id not in self.question_loggers:
            return

        logger = self.question_loggers[task_id]

        # Store result
        self.question_results[task_id] = result
        self.classification_results[result.classification].append(result)

        # Update batch metrics
        self.batch_metrics["completed_questions"] += 1
        if result.status == "CORRECT":
            self.batch_metrics["correct_answers"] += 1

        # Log validation phase
        logger.info("--- VALIDATION PHASE ---")
        logger.info(f"Expected Answer: {result.expected_answer}")
        logger.info(f"Our Answer: {result.our_answer}")
        logger.info(f"Status: {result.status}")
        logger.info(f"Accuracy Score: {result.accuracy_score:.1%}")
        logger.info("")

        # Log performance metrics
        logger.info("--- PERFORMANCE METRICS ---")
        logger.info(f"Total Duration: {result.total_duration:.1f}s")
        logger.info(f"Classification Time: {result.classification_time:.1f}s")
        logger.info(f"Solving Time: {result.solving_time:.1f}s")
        logger.info(f"Validation Time: {result.validation_time:.1f}s")

        if result.error_type:
            logger.info(f"Error Type: {result.error_type}")
            logger.info(f"Error Details: {result.error_details}")

        logger.info("")
        logger.info("=" * 80)
        logger.info("END QUESTION LOG")
        logger.info("=" * 80)

        # Log to summary
        status_emoji = "✅" if result.status == "CORRECT" else "🟡" if result.status == "PARTIAL" else "❌"
        override_info = f" | {result.override_reason}" if result.anti_hallucination_applied else ""
        self.summary_logger.info(
            f"{status_emoji} {task_id[:8]}... | {result.classification} | {result.status} | "
            f"{result.accuracy_score:.0%} | {result.total_duration:.1f}s{override_info}"
        )

    async def log_batch_progress(self):
        """Log current batch progress with ETA"""
        completed = self.batch_metrics["completed_questions"]
        total = self.batch_metrics["total_questions"]

        if completed == 0:
            return

        # Calculate accuracy
        accuracy = (self.batch_metrics["correct_answers"] / completed) * 100

        # Calculate ETA
        elapsed_time = (datetime.now() - self.batch_start_time).total_seconds()
        avg_time_per_question = elapsed_time / completed
        remaining_questions = total - completed
        eta_seconds = remaining_questions * avg_time_per_question
        eta_minutes = int(eta_seconds // 60)
        eta_seconds = int(eta_seconds % 60)

        self.summary_logger.info(
            f"📊 PROGRESS | {completed}/{total} completed | {accuracy:.1f}% accuracy | "
            f"ETA: {eta_minutes}m {eta_seconds}s"
        )

    async def log_batch_complete(self):
        """Log batch completion with final summary"""
        end_time = datetime.now()
        total_duration = (end_time - self.batch_start_time).total_seconds()

        # Update batch metrics
        self.batch_metrics["end_time"] = end_time.isoformat()
        self.batch_metrics["total_duration"] = total_duration

        completed = self.batch_metrics["completed_questions"]
        total = self.batch_metrics["total_questions"]
        accuracy = (self.batch_metrics["correct_answers"] / completed * 100) if completed > 0 else 0
        self.batch_metrics["accuracy_rate"] = accuracy / 100

        self.summary_logger.info("-" * 80)
        self.summary_logger.info(
            f"🏁 BATCH_COMPLETE | {completed}/{total} | {accuracy:.1f}% accuracy | "
            f"Total: {int(total_duration // 60)}m {int(total_duration % 60)}s"
        )

        # Generate classification analysis
        await self.generate_classification_analysis()

        # Export final results
        await self.export_results()

        self.summary_logger.info(f"📊 Analysis exported: {self.batch_analysis_path}")
        self.summary_logger.info(f"📄 Summary log: {self.summary_log_path}")

    async def generate_classification_analysis(self):
        """Generate detailed analysis by classification"""
        analysis = {
            "batch_metadata": self.batch_metrics,
            "classification_breakdown": {},
            "overall_recommendations": []
        }

        for classification, results in self.classification_results.items():
            if not results:
                continue

            # Calculate metrics
            total = len(results)
            correct = len([r for r in results if r.status == "CORRECT"])
            partial = len([r for r in results if r.status == "PARTIAL"])
            errors = len([r for r in results if r.status == "ERROR"])

            accuracy_rate = correct / total if total > 0 else 0
            avg_duration = sum(r.total_duration for r in results) / total if total > 0 else 0

            # Error analysis
            error_types = defaultdict(int)
            failed_questions = []

            for result in results:
                if result.status in ["INCORRECT", "ERROR"]:
                    error_types[result.error_type or "unknown"] += 1
                    failed_questions.append({
                        "task_id": result.task_id,
                        "error_type": result.error_type,
                        "error_details": result.error_details
                    })

            # Generate recommendations
            recommendations = self._generate_recommendations(classification, results, error_types)

            classification_analysis = {
                "classification": classification,
                "total_questions": total,
                "accuracy_rate": accuracy_rate,
                "successful": correct,
                "partial": partial,
                "failed": total - correct - partial,
                "errors": errors,
                "performance_metrics": {
                    "avg_duration": avg_duration,
                    "min_duration": min(r.total_duration for r in results) if results else 0,
                    "max_duration": max(r.total_duration for r in results) if results else 0
                },
                "error_breakdown": dict(error_types),
                "failed_questions": failed_questions,
                "improvement_recommendations": recommendations
            }

            analysis["classification_breakdown"][classification] = classification_analysis

        # Generate overall recommendations
        analysis["overall_recommendations"] = self._generate_overall_recommendations()

        # Save classification analysis
        with open(self.batch_analysis_path, 'w') as f:
            json.dump(analysis, f, indent=2, ensure_ascii=False)

    def _generate_recommendations(self, classification: str, results: List[QuestionResult],
                                  error_types: Dict[str, int]) -> List[str]:
        """Generate specific recommendations for a classification"""
        recommendations = []
        accuracy_rate = len([r for r in results if r.status == "CORRECT"]) / len(results)

        if accuracy_rate < 0.8:
            recommendations.append(f"🔧 Low accuracy ({accuracy_rate:.1%}) - needs immediate attention")

        # Classification-specific recommendations
        if classification == "multimedia":
            if "timeout" in error_types:
                recommendations.append("⏱️ Optimize video processing timeout limits")
            if "audio_processing" in error_types:
                recommendations.append("🎵 Enhance audio transcription accuracy")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent multimedia processing - ready for production")
        elif classification == "research":
            if "hallucination" in error_types:
                recommendations.append("🚨 Strengthen anti-hallucination safeguards")
            if "wikipedia" in error_types:
                recommendations.append("📚 Improve Wikipedia tool integration")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent research capabilities - ready for production")
        elif classification == "logic_math":
            if "chess" in error_types:
                recommendations.append("♟️ Enhance chess analysis algorithms")
            if "calculation" in error_types:
                recommendations.append("🧮 Improve mathematical calculation accuracy")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent logic/math processing - ready for production")
        elif classification == "file_processing":
            if "python_execution" in error_types:
                recommendations.append("🐍 Optimize Python code execution environment")
            if "excel_processing" in error_types:
                recommendations.append("📊 Enhance Excel file processing capabilities")
            if accuracy_rate > 0.9:
                recommendations.append("✅ Excellent file processing - ready for production")

        # Performance recommendations
        avg_duration = sum(r.total_duration for r in results) / len(results)
        if avg_duration > 60:
            recommendations.append(f"⚡ Optimize performance - avg duration {avg_duration:.1f}s")

        return recommendations

    def _generate_overall_recommendations(self) -> List[str]:
        """Generate overall system recommendations"""
        recommendations = []
        total_accuracy = self.batch_metrics["accuracy_rate"]

        if total_accuracy >= 0.95:
            recommendations.append("🎉 EXCELLENT: 95%+ accuracy achieved - production ready!")
        elif total_accuracy >= 0.90:
            recommendations.append("✅ GREAT: 90%+ accuracy - minor optimizations needed")
        elif total_accuracy >= 0.80:
            recommendations.append("🔧 GOOD: 80%+ accuracy - moderate improvements needed")
        elif total_accuracy >= 0.70:
            recommendations.append("⚠️ ACCEPTABLE: 70%+ accuracy - significant improvements needed")
        else:
            recommendations.append("🚨 CRITICAL: <70% accuracy - major system overhaul required")

        # Add specific system recommendations
        recommendations.extend([
            "📊 Monitor performance metrics for production deployment",
            "🔄 Implement continuous improvement based on classification analysis",
            "📈 Track accuracy trends over time",
            "🛠️ Focus improvement efforts on lowest-performing classifications"
        ])

        return recommendations

    async def export_results(self):
        """Export comprehensive results for analysis"""
        # Export individual question results
        results_data = {
            "batch_metadata": self.batch_metrics,
            "question_results": [asdict(result) for result in self.question_results.values()],
            "classification_summary": {
                classification: {
                    "count": len(results),
                    "accuracy": len([r for r in results if r.status == "CORRECT"]) / len(results)
                }
                for classification, results in self.classification_results.items()
            }
        }

        results_file = self.base_log_dir / f"async_batch_results_{self.timestamp}.json"
        with open(results_file, 'w') as f:
            json.dump(results_data, f, indent=2, ensure_ascii=False)

        self.summary_logger.info(f"📊 Detailed results: {results_file}")
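

# ---------------------------------------------------------------------------
# Usage sketch (illustration only). The task id, question data, classification
# dict, and answers below are hypothetical placeholders rather than real GAIA
# data; an actual batch runner would feed its own questions and solver results
# through the same calls, typically with many questions processed concurrently.
# ---------------------------------------------------------------------------
async def _demo():
    batch_logger = AsyncBatchLogger(base_log_dir="logs")
    await batch_logger.log_batch_start(total_questions=1, concurrency=1)

    task_id = "demo-task-0001"  # hypothetical task id
    await batch_logger.log_question_start(task_id, {
        "question": "What is 2 + 2?",
        "Level": 1,
    })
    await batch_logger.log_classification(task_id, {
        "primary_agent": "logic_math",
        "secondary_agents": [],
        "complexity": 1,
        "confidence": 0.99,
        "tools_needed": ["calculator"],
        "reasoning": "Simple arithmetic question",
    })
    await batch_logger.log_solving_start(task_id, {"primary_route": "logic_math"})
    await batch_logger.log_tool_execution(task_id, "calculator", 0.1, "2 + 2 = 4")
    await batch_logger.log_answer_processing(task_id, raw_response="The answer is 4.", processed_answer="4")

    result = QuestionResult(
        task_id=task_id,
        question_text="What is 2 + 2?",
        classification="logic_math",
        complexity=1,
        confidence=0.99,
        expected_answer="4",
        our_answer="4",
        status="CORRECT",
        accuracy_score=1.0,
        total_duration=0.5,
        classification_time=0.1,
        solving_time=0.3,
        validation_time=0.1,
        tools_used=["calculator"],
    )
    await batch_logger.log_question_complete(task_id, result)
    await batch_logger.log_batch_progress()
    await batch_logger.log_batch_complete()


if __name__ == "__main__":
    asyncio.run(_demo())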