#!/usr/bin/env python3
"""
Comprehensive Async Batch Logging System for GAIA Questions
Provides detailed per-question logs, batch summary, and classification analysis
"""

import os
import json
import asyncio
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
from collections import defaultdict
from dataclasses import dataclass, asdict


@dataclass
class QuestionResult:
    """Data class for storing question processing results"""
    task_id: str
    question_text: str
    classification: str
    complexity: int
    confidence: float
    expected_answer: str
    our_answer: str
    status: str  # CORRECT, INCORRECT, PARTIAL, ERROR
    accuracy_score: float
    total_duration: float
    classification_time: float
    solving_time: float
    validation_time: float
    error_type: Optional[str] = None
    error_details: Optional[str] = None
    tools_used: List[str] = None
    anti_hallucination_applied: bool = False
    override_reason: Optional[str] = None

    def __post_init__(self):
        if self.tools_used is None:
            self.tools_used = []


class AsyncBatchLogger:
    """Comprehensive logging system for async batch processing"""

    def __init__(self, base_log_dir: str = "logs"):
        self.base_log_dir = Path(base_log_dir)
        self.base_log_dir.mkdir(exist_ok=True)

        # Initialize timestamps
        self.batch_start_time = datetime.now()
        self.timestamp = self.batch_start_time.strftime("%Y%m%d_%H%M%S")

        # Create log files
        self.summary_log_path = self.base_log_dir / f"async_batch_summary_{self.timestamp}.log"
        self.batch_analysis_path = self.base_log_dir / f"async_batch_analysis_{self.timestamp}.json"

        # Initialize data structures
        self.question_results: Dict[str, QuestionResult] = {}
        self.classification_results = defaultdict(list)
        self.batch_metrics = {
            "total_questions": 0,
            "completed_questions": 0,
            "correct_answers": 0,
            "accuracy_rate": 0.0,
            "total_duration": 0.0,
            "start_time": self.batch_start_time.isoformat(),
            "end_time": None
        }

        # Initialize summary logger
        self.summary_logger = self._setup_summary_logger()

        # Active question loggers for concurrent access
        self.question_loggers: Dict[str, logging.Logger] = {}

    def _setup_summary_logger(self) -> logging.Logger:
        """Set up the batch summary logger"""
        logger = logging.getLogger(f"batch_summary_{self.timestamp}")
        logger.setLevel(logging.INFO)

        # Create file handler
        handler = logging.FileHandler(self.summary_log_path)
        formatter = logging.Formatter('[%(asctime)s] %(message)s', datefmt='%H:%M:%S')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        # Also log to console
        console_handler = logging.StreamHandler()
        console_handler.setFormatter(formatter)
        logger.addHandler(console_handler)

        return logger

    def _setup_question_logger(self, task_id: str) -> logging.Logger:
        """Set up detailed logger for a specific question"""
        question_log_path = self.base_log_dir / f"async_batch_question_{task_id}_{self.timestamp}.log"

        logger = logging.getLogger(f"question_{task_id}_{self.timestamp}")
        logger.setLevel(logging.INFO)

        # Create file handler
        handler = logging.FileHandler(question_log_path)
        formatter = logging.Formatter('%(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)

        return logger

    async def log_batch_start(self, total_questions: int, concurrency: int):
        """Log the start of batch processing"""
        self.batch_metrics["total_questions"] = total_questions

        self.summary_logger.info(f"BATCH_START | Total: {total_questions} questions | Concurrency: {concurrency}")
        self.summary_logger.info(f"Timestamp: {self.batch_start_time.isoformat()}")
        self.summary_logger.info(f"Log Directory: {self.base_log_dir}")
self.summary_logger.info("-" * 80) async def log_question_start(self, task_id: str, question_data: Dict): """Log the start of processing a specific question""" # Set up question-specific logger question_logger = self._setup_question_logger(task_id) self.question_loggers[task_id] = question_logger # Log detailed question start question_logger.info("=" * 80) question_logger.info("ASYNC BATCH QUESTION PROCESSING") question_logger.info("=" * 80) question_logger.info(f"Question ID: {task_id}") question_logger.info(f"Start Time: {datetime.now().isoformat()}") question_logger.info(f"Question Text: {question_data.get('question', 'N/A')}") question_logger.info(f"Level: {question_data.get('Level', 'Unknown')}") question_logger.info(f"Has File: {'Yes' if question_data.get('file_name') else 'No'}") if question_data.get('file_name'): question_logger.info(f"File: {question_data.get('file_name')}") question_logger.info("") async def log_classification(self, task_id: str, classification: Dict): """Log question classification details""" if task_id not in self.question_loggers: return logger = self.question_loggers[task_id] logger.info("--- CLASSIFICATION PHASE ---") logger.info(f"Primary Agent: {classification.get('primary_agent', 'unknown')}") logger.info(f"Secondary Agents: {', '.join(classification.get('secondary_agents', []))}") logger.info(f"Complexity: {classification.get('complexity', 0)}/5") logger.info(f"Confidence: {classification.get('confidence', 0.0):.3f}") logger.info(f"Tools Needed: {', '.join(classification.get('tools_needed', []))}") logger.info(f"Reasoning: {classification.get('reasoning', 'N/A')}") logger.info("") async def log_solving_start(self, task_id: str, routing_plan: Dict): """Log the start of the solving phase""" if task_id not in self.question_loggers: return logger = self.question_loggers[task_id] logger.info("--- SOLVING PHASE ---") logger.info(f"Route to: {routing_plan.get('primary_route', 'unknown')} agent") logger.info(f"Coordination: {'Yes' if routing_plan.get('requires_coordination') else 'No'}") logger.info(f"Estimated Duration: {routing_plan.get('estimated_duration', 'unknown')}") logger.info("") logger.info("Tool Executions:") async def log_tool_execution(self, task_id: str, tool_name: str, duration: float, result_summary: str): """Log individual tool execution""" if task_id not in self.question_loggers: return logger = self.question_loggers[task_id] logger.info(f" - {tool_name}: {duration:.1f}s → {result_summary[:100]}...") async def log_answer_processing(self, task_id: str, raw_response: str, processed_answer: str, anti_hallucination_applied: bool = False, override_reason: str = None): """Log answer processing and anti-hallucination details""" if task_id not in self.question_loggers: return logger = self.question_loggers[task_id] logger.info("") logger.info("Agent Response (first 500 chars):") logger.info(raw_response[:500] + ("..." 
if len(raw_response) > 500 else "")) logger.info("") logger.info(f"Processed Answer: {processed_answer}") if anti_hallucination_applied: logger.info(f"🚨 ANTI-HALLUCINATION OVERRIDE APPLIED") logger.info(f"Reason: {override_reason}") logger.info("") async def log_question_complete(self, task_id: str, result: QuestionResult): """Log the completion of a question with full results""" if task_id not in self.question_loggers: return logger = self.question_loggers[task_id] # Store result self.question_results[task_id] = result self.classification_results[result.classification].append(result) # Update batch metrics self.batch_metrics["completed_questions"] += 1 if result.status == "CORRECT": self.batch_metrics["correct_answers"] += 1 # Log validation phase logger.info("--- VALIDATION PHASE ---") logger.info(f"Expected Answer: {result.expected_answer}") logger.info(f"Our Answer: {result.our_answer}") logger.info(f"Status: {result.status}") logger.info(f"Accuracy Score: {result.accuracy_score:.1%}") logger.info("") # Log performance metrics logger.info("--- PERFORMANCE METRICS ---") logger.info(f"Total Duration: {result.total_duration:.1f}s") logger.info(f"Classification Time: {result.classification_time:.1f}s") logger.info(f"Solving Time: {result.solving_time:.1f}s") logger.info(f"Validation Time: {result.validation_time:.1f}s") if result.error_type: logger.info(f"Error Type: {result.error_type}") logger.info(f"Error Details: {result.error_details}") logger.info("") logger.info("=" * 80) logger.info("END QUESTION LOG") logger.info("=" * 80) # Log to summary status_emoji = "✅" if result.status == "CORRECT" else "🟡" if result.status == "PARTIAL" else "❌" override_info = f" | {result.override_reason}" if result.anti_hallucination_applied else "" self.summary_logger.info( f"{status_emoji} {task_id[:8]}... 
| {result.classification} | {result.status} | " f"{result.accuracy_score:.0%} | {result.total_duration:.1f}s{override_info}" ) async def log_batch_progress(self): """Log current batch progress with ETA""" completed = self.batch_metrics["completed_questions"] total = self.batch_metrics["total_questions"] if completed == 0: return # Calculate accuracy accuracy = (self.batch_metrics["correct_answers"] / completed) * 100 # Calculate ETA elapsed_time = (datetime.now() - self.batch_start_time).total_seconds() avg_time_per_question = elapsed_time / completed remaining_questions = total - completed eta_seconds = remaining_questions * avg_time_per_question eta_minutes = int(eta_seconds // 60) eta_seconds = int(eta_seconds % 60) self.summary_logger.info( f"📊 PROGRESS | {completed}/{total} completed | {accuracy:.1f}% accuracy | " f"ETA: {eta_minutes}m {eta_seconds}s" ) async def log_batch_complete(self): """Log batch completion with final summary""" end_time = datetime.now() total_duration = (end_time - self.batch_start_time).total_seconds() # Update batch metrics self.batch_metrics["end_time"] = end_time.isoformat() self.batch_metrics["total_duration"] = total_duration completed = self.batch_metrics["completed_questions"] total = self.batch_metrics["total_questions"] accuracy = (self.batch_metrics["correct_answers"] / completed * 100) if completed > 0 else 0 self.batch_metrics["accuracy_rate"] = accuracy / 100 self.summary_logger.info("-" * 80) self.summary_logger.info( f"🏁 BATCH_COMPLETE | {completed}/{total} | {accuracy:.1f}% accuracy | " f"Total: {int(total_duration//60)}m {int(total_duration%60)}s" ) # Generate classification analysis await self.generate_classification_analysis() # Export final results await self.export_results() self.summary_logger.info(f"📊 Analysis exported: {self.batch_analysis_path}") self.summary_logger.info(f"📋 Summary log: {self.summary_log_path}") async def generate_classification_analysis(self): """Generate detailed analysis by classification""" analysis = { "batch_metadata": self.batch_metrics, "classification_breakdown": {}, "overall_recommendations": [] } for classification, results in self.classification_results.items(): if not results: continue # Calculate metrics total = len(results) correct = len([r for r in results if r.status == "CORRECT"]) partial = len([r for r in results if r.status == "PARTIAL"]) errors = len([r for r in results if r.status == "ERROR"]) accuracy_rate = correct / total if total > 0 else 0 avg_duration = sum(r.total_duration for r in results) / total if total > 0 else 0 # Error analysis error_types = defaultdict(int) failed_questions = [] for result in results: if result.status in ["INCORRECT", "ERROR"]: error_types[result.error_type or "unknown"] += 1 failed_questions.append({ "task_id": result.task_id, "error_type": result.error_type, "error_details": result.error_details }) # Generate recommendations recommendations = self._generate_recommendations(classification, results, error_types) classification_analysis = { "classification": classification, "total_questions": total, "accuracy_rate": accuracy_rate, "successful": correct, "partial": partial, "failed": total - correct - partial, "errors": errors, "performance_metrics": { "avg_duration": avg_duration, "min_duration": min(r.total_duration for r in results) if results else 0, "max_duration": max(r.total_duration for r in results) if results else 0 }, "error_breakdown": dict(error_types), "failed_questions": failed_questions, "improvement_recommendations": recommendations } 
analysis["classification_breakdown"][classification] = classification_analysis # Generate overall recommendations analysis["overall_recommendations"] = self._generate_overall_recommendations() # Save classification analysis with open(self.batch_analysis_path, 'w') as f: json.dump(analysis, f, indent=2, ensure_ascii=False) def _generate_recommendations(self, classification: str, results: List[QuestionResult], error_types: Dict[str, int]) -> List[str]: """Generate specific recommendations for a classification""" recommendations = [] accuracy_rate = len([r for r in results if r.status == "CORRECT"]) / len(results) if accuracy_rate < 0.8: recommendations.append(f"🔧 Low accuracy ({accuracy_rate:.1%}) - needs immediate attention") # Classification-specific recommendations if classification == "multimedia": if "timeout" in error_types: recommendations.append("⏱️ Optimize video processing timeout limits") if "audio_processing" in error_types: recommendations.append("🎵 Enhance audio transcription accuracy") if accuracy_rate > 0.9: recommendations.append("✅ Excellent multimedia processing - ready for production") elif classification == "research": if "hallucination" in error_types: recommendations.append("🚨 Strengthen anti-hallucination safeguards") if "wikipedia" in error_types: recommendations.append("📚 Improve Wikipedia tool integration") if accuracy_rate > 0.9: recommendations.append("✅ Excellent research capabilities - ready for production") elif classification == "logic_math": if "chess" in error_types: recommendations.append("♟️ Enhance chess analysis algorithms") if "calculation" in error_types: recommendations.append("🧮 Improve mathematical calculation accuracy") if accuracy_rate > 0.9: recommendations.append("✅ Excellent logic/math processing - ready for production") elif classification == "file_processing": if "python_execution" in error_types: recommendations.append("🐍 Optimize Python code execution environment") if "excel_processing" in error_types: recommendations.append("📊 Enhance Excel file processing capabilities") if accuracy_rate > 0.9: recommendations.append("✅ Excellent file processing - ready for production") # Performance recommendations avg_duration = sum(r.total_duration for r in results) / len(results) if avg_duration > 60: recommendations.append(f"⚡ Optimize performance - avg duration {avg_duration:.1f}s") return recommendations def _generate_overall_recommendations(self) -> List[str]: """Generate overall system recommendations""" recommendations = [] total_accuracy = self.batch_metrics["accuracy_rate"] if total_accuracy >= 0.95: recommendations.append("🏆 EXCELLENT: 95%+ accuracy achieved - production ready!") elif total_accuracy >= 0.90: recommendations.append("✅ GREAT: 90%+ accuracy - minor optimizations needed") elif total_accuracy >= 0.80: recommendations.append("🔧 GOOD: 80%+ accuracy - moderate improvements needed") elif total_accuracy >= 0.70: recommendations.append("⚠️ ACCEPTABLE: 70%+ accuracy - significant improvements needed") else: recommendations.append("🚨 CRITICAL: <70% accuracy - major system overhaul required") # Add specific system recommendations recommendations.extend([ "📊 Monitor performance metrics for production deployment", "🔄 Implement continuous improvement based on classification analysis", "📈 Track accuracy trends over time", "🛠️ Focus improvement efforts on lowest-performing classifications" ]) return recommendations async def export_results(self): """Export comprehensive results for analysis""" # Export individual question results results_data = 
{ "batch_metadata": self.batch_metrics, "question_results": [asdict(result) for result in self.question_results.values()], "classification_summary": { classification: { "count": len(results), "accuracy": len([r for r in results if r.status == "CORRECT"]) / len(results) } for classification, results in self.classification_results.items() } } results_file = self.base_log_dir / f"async_batch_results_{self.timestamp}.json" with open(results_file, 'w') as f: json.dump(results_data, f, indent=2, ensure_ascii=False) self.summary_logger.info(f"📁 Detailed results: {results_file}")