#!/usr/bin/env python3 """ Asynchronous Question Processor Clean question handler that removes hardcoded overrides for honest accuracy measurement. """ import asyncio import json import logging import time import traceback from datetime import datetime from pathlib import Path from typing import Dict, List, Optional, Any import subprocess import sys import os # Add the project root to the Python path sys.path.insert(0, str(Path(__file__).parent)) from gaia_web_loader import GAIAQuestionLoaderWeb from question_classifier import QuestionClassifier class AsyncQuestionProcessor: """Asynchronous processor for individual GAIA questions with clean execution.""" def __init__(self, session_dir: Path, timeout_seconds: int = 900, model: str = "qwen3-235b"): """ Initialize the async question processor. Args: session_dir: Directory for this test session timeout_seconds: Timeout per question processing model: Model to use for question solving """ self.session_dir = session_dir self.timeout_seconds = timeout_seconds self.model = model # Create individual logs directory self.logs_dir = session_dir / "individual_logs" self.logs_dir.mkdir(exist_ok=True) # Setup logging self.setup_logging() # Initialize components self.loader = GAIAQuestionLoaderWeb() self.classifier = QuestionClassifier() # Load validation metadata for accuracy checking self.validation_metadata = self.load_validation_metadata() def setup_logging(self): """Setup logging for the question processor.""" log_file = self.session_dir / "question_processor.log" self.logger = logging.getLogger("AsyncQuestionProcessor") self.logger.setLevel(logging.INFO) # File handler file_handler = logging.FileHandler(log_file) file_handler.setLevel(logging.INFO) # Formatter formatter = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s' ) file_handler.setFormatter(formatter) self.logger.addHandler(file_handler) def load_validation_metadata(self) -> Dict[str, Any]: """Load validation metadata for answer checking.""" metadata_file = Path("gaia_validation_metadata.jsonl") metadata = {} if not metadata_file.exists(): self.logger.warning(f"Validation metadata file not found: {metadata_file}") return metadata try: with open(metadata_file, 'r') as f: for line in f: line = line.strip() if line: try: data = json.loads(line) task_id = data.get('task_id') if task_id: metadata[task_id] = data except json.JSONDecodeError: continue self.logger.info(f"Loaded validation metadata for {len(metadata)} questions") except Exception as e: self.logger.error(f"Failed to load validation metadata: {e}") return metadata async def classify_question(self, question: Dict) -> Dict: """Classify the question using the classification system.""" try: classification = await asyncio.to_thread( self.classifier.classify_question, question ) return classification except Exception as e: self.logger.error(f"Classification failed: {e}") return { "primary_agent": "general", "secondary_agent": None, "complexity": 3, "confidence": 0.0, "tools_needed": [], "error": str(e) } async def execute_question_solver(self, question_id: str) -> Dict: """ Execute the main question solver without hardcoded overrides. This is the clean version that provides honest accuracy measurement. """ start_time = time.time() # Create individual log file for this question individual_log = self.logs_dir / f"question_{question_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log" try: # Build command for question solver cmd = [ sys.executable, "tests/test_specific_question.py", question_id, self.model ] self.logger.info(f"Executing solver for {question_id}: {' '.join(cmd)}") # Execute with timeout process = await asyncio.create_subprocess_exec( *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT, cwd=Path.cwd() ) try: stdout, _ = await asyncio.wait_for( process.communicate(), timeout=self.timeout_seconds ) # Write output to individual log with open(individual_log, 'w') as f: f.write(f"Command: {' '.join(cmd)}\n") f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n") f.write(f"Question ID: {question_id}\n") f.write("=" * 80 + "\n") f.write(stdout.decode('utf-8', errors='replace')) execution_time = time.time() - start_time # Parse the output for answer extraction output_text = stdout.decode('utf-8', errors='replace') answer = self.extract_answer_from_output(output_text) return { "status": "completed", "execution_time": execution_time, "return_code": process.returncode, "answer": answer, "log_file": str(individual_log), "timestamp": datetime.now().isoformat() } except asyncio.TimeoutError: # Kill the process on timeout process.kill() await process.wait() execution_time = time.time() - start_time # Write timeout info to log with open(individual_log, 'w') as f: f.write(f"Command: {' '.join(cmd)}\n") f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n") f.write(f"Question ID: {question_id}\n") f.write(f"STATUS: TIMEOUT after {self.timeout_seconds} seconds\n") f.write("=" * 80 + "\n") return { "status": "timeout", "execution_time": execution_time, "timeout_seconds": self.timeout_seconds, "log_file": str(individual_log), "timestamp": datetime.now().isoformat() } except Exception as e: execution_time = time.time() - start_time # Write error info to log with open(individual_log, 'w') as f: f.write(f"Command: {' '.join(cmd)}\n") f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n") f.write(f"Question ID: {question_id}\n") f.write(f"STATUS: ERROR - {str(e)}\n") f.write("=" * 80 + "\n") f.write(traceback.format_exc()) return { "status": "error", "execution_time": execution_time, "error": str(e), "log_file": str(individual_log), "timestamp": datetime.now().isoformat() } def extract_answer_from_output(self, output_text: str) -> Optional[str]: """Extract the final answer from solver output.""" # Look for common answer patterns patterns = [ "Final Answer:", "FINAL ANSWER:", "Answer:", "ANSWER:", ] lines = output_text.split('\n') # Search for answer patterns for i, line in enumerate(lines): line_stripped = line.strip() for pattern in patterns: if pattern in line_stripped: # Try to extract answer from same line answer_part = line_stripped.split(pattern, 1) if len(answer_part) > 1: answer = answer_part[1].strip() if answer: return answer # Try next line if current line doesn't have answer if i + 1 < len(lines): next_line = lines[i + 1].strip() if next_line: return next_line # Fallback: look for the last non-empty line that might be an answer for line in reversed(lines): line_stripped = line.strip() if line_stripped and not line_stripped.startswith(('=', '-', 'Time:', 'Duration:')): # Avoid log formatting lines if len(line_stripped) < 200: # Reasonable answer length return line_stripped return None def validate_answer(self, question_id: str, generated_answer: Optional[str]) -> Dict: """Validate the generated answer against expected answer.""" if question_id not in self.validation_metadata: return { "validation_status": "no_metadata", "message": "No validation metadata available" } metadata = self.validation_metadata[question_id] expected_answer = metadata.get('Final answer') if not generated_answer: return { "validation_status": "no_answer", "expected_answer": expected_answer, "message": "No answer generated" } # Simple string comparison (case-insensitive) generated_clean = str(generated_answer).strip().lower() expected_clean = str(expected_answer).strip().lower() if generated_clean == expected_clean: status = "correct" elif generated_clean in expected_clean or expected_clean in generated_clean: status = "partial" else: status = "incorrect" return { "validation_status": status, "generated_answer": generated_answer, "expected_answer": expected_answer, "match_details": { "exact_match": (generated_clean == expected_clean), "partial_match": (generated_clean in expected_clean or expected_clean in generated_clean) } } async def process_question(self, question: Dict) -> Dict: """ Process a single question through the complete pipeline. This is the clean version without hardcoded overrides for honest accuracy. """ question_id = question.get('task_id', 'unknown') start_time = time.time() self.logger.info(f"Processing question {question_id}") try: # Step 1: Classify the question classification = await self.classify_question(question) # Step 2: Execute the solver (clean version) solver_result = await self.execute_question_solver(question_id) # Step 3: Validate the answer validation = self.validate_answer( question_id, solver_result.get('answer') ) total_time = time.time() - start_time # Compile complete result result = { "question_id": question_id, "question_text": question.get('Question', '')[:200] + "..." if len(question.get('Question', '')) > 200 else question.get('Question', ''), "classification": classification, "solver_result": solver_result, "validation": validation, "total_processing_time": total_time, "timestamp": datetime.now().isoformat() } self.logger.info(f"Completed question {question_id} in {total_time:.2f}s - Status: {validation.get('validation_status', 'unknown')}") return result except Exception as e: total_time = time.time() - start_time self.logger.error(f"Failed to process question {question_id}: {e}") return { "question_id": question_id, "status": "error", "error": str(e), "total_processing_time": total_time, "timestamp": datetime.now().isoformat(), "traceback": traceback.format_exc() }