#!/usr/bin/env python3
"""
Asynchronous Question Processor

Clean question handler that removes hardcoded overrides for honest accuracy measurement.
"""
import asyncio
import json
import logging
import time
import traceback
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Any
import subprocess
import sys
import os

# Add the project root to the Python path
sys.path.insert(0, str(Path(__file__).parent))

from gaia_web_loader import GAIAQuestionLoaderWeb
from question_classifier import QuestionClassifier
class AsyncQuestionProcessor:
    """Asynchronous processor for individual GAIA questions with clean execution.

    Pipeline per question: classify -> run the external solver script in a
    subprocess (with a timeout) -> validate the extracted answer against
    locally stored metadata.  No hardcoded answer overrides are applied, so
    the measured accuracy is honest.
    """

    def __init__(self,
                 session_dir: Path,
                 timeout_seconds: int = 900,
                 model: str = "qwen3-235b"):
        """
        Initialize the async question processor.

        Args:
            session_dir: Directory for this test session.
            timeout_seconds: Timeout per question processing.
            model: Model to use for question solving.
        """
        self.session_dir = session_dir
        self.timeout_seconds = timeout_seconds
        self.model = model

        # Directory holding one log file per processed question.
        # parents=True: also create session_dir when it does not exist yet
        # (plain mkdir(exist_ok=True) raised FileNotFoundError in that case).
        self.logs_dir = session_dir / "individual_logs"
        self.logs_dir.mkdir(parents=True, exist_ok=True)

        # Setup logging
        self.setup_logging()

        # Project-local components (imported at module level).
        self.loader = GAIAQuestionLoaderWeb()
        self.classifier = QuestionClassifier()

        # task_id -> metadata row, consumed by validate_answer().
        self.validation_metadata = self.load_validation_metadata()

    def setup_logging(self) -> None:
        """Set up a file logger for the question processor.

        Fix: ``logging.getLogger`` returns the same logger object for the
        same name, so creating several processors in one process used to
        attach an extra FileHandler each time and duplicate every log
        line.  A handler is now attached only once.
        """
        log_file = self.session_dir / "question_processor.log"

        self.logger = logging.getLogger("AsyncQuestionProcessor")
        self.logger.setLevel(logging.INFO)

        if not self.logger.handlers:
            file_handler = logging.FileHandler(log_file)
            file_handler.setLevel(logging.INFO)
            formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            file_handler.setFormatter(formatter)
            self.logger.addHandler(file_handler)

    def load_validation_metadata(self) -> Dict[str, Any]:
        """Load validation metadata (JSONL) for answer checking.

        Returns:
            Mapping of task_id -> metadata dict.  Empty when the file is
            missing or unreadable; malformed JSONL lines are skipped.
        """
        # NOTE(review): path is relative to the current working directory,
        # like the solver command below — assumes the process is started
        # from the project root.  TODO confirm.
        metadata_file = Path("gaia_validation_metadata.jsonl")
        metadata: Dict[str, Any] = {}

        if not metadata_file.exists():
            self.logger.warning(f"Validation metadata file not found: {metadata_file}")
            return metadata

        try:
            with open(metadata_file, 'r', encoding='utf-8') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        # Tolerate partially corrupt files; keep good rows.
                        continue
                    task_id = data.get('task_id')
                    if task_id:
                        metadata[task_id] = data
            self.logger.info(f"Loaded validation metadata for {len(metadata)} questions")
        except Exception as e:
            self.logger.error(f"Failed to load validation metadata: {e}")

        return metadata

    async def classify_question(self, question: Dict) -> Dict:
        """Classify the question using the classification system.

        The (synchronous) classifier runs in a worker thread so the event
        loop stays responsive.  On failure a conservative fallback
        classification is returned instead of raising.
        """
        try:
            classification = await asyncio.to_thread(
                self.classifier.classify_question, question
            )
            return classification
        except Exception as e:
            self.logger.error(f"Classification failed: {e}")
            return {
                "primary_agent": "general",
                "secondary_agent": None,
                "complexity": 3,
                "confidence": 0.0,
                "tools_needed": [],
                "error": str(e)
            }

    def _write_solver_log(self,
                          log_path: Path,
                          cmd: List[str],
                          start_time: float,
                          question_id: str,
                          status_line: Optional[str] = None,
                          body: str = "") -> None:
        """Write one solver run's log file: header, optional status, body."""
        with open(log_path, 'w', encoding='utf-8') as f:
            f.write(f"Command: {' '.join(cmd)}\n")
            f.write(f"Start time: {datetime.fromtimestamp(start_time).isoformat()}\n")
            f.write(f"Question ID: {question_id}\n")
            if status_line:
                f.write(status_line + "\n")
            f.write("=" * 80 + "\n")
            if body:
                f.write(body)

    async def execute_question_solver(self, question_id: str) -> Dict:
        """
        Execute the main question solver without hardcoded overrides.

        This is the clean version that provides honest accuracy measurement.
        Runs ``tests/test_specific_question.py`` as a subprocess, captures
        its merged stdout/stderr into an individual log file, and extracts
        the answer from the output.

        Returns:
            Dict with "status" of "completed" | "timeout" | "error", plus
            timing info, the log-file path and (when completed) the answer.
        """
        start_time = time.time()

        # One dedicated log file per question run, timestamped to avoid
        # collisions across retries.
        individual_log = self.logs_dir / f"question_{question_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"

        # NOTE(review): the relative script path assumes the cwd is the
        # project root — consistent with cwd=Path.cwd() below.
        cmd = [
            sys.executable,
            "tests/test_specific_question.py",
            question_id,
            self.model
        ]

        try:
            self.logger.info(f"Executing solver for {question_id}: {' '.join(cmd)}")

            process = await asyncio.create_subprocess_exec(
                *cmd,
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.STDOUT,  # merge streams into one log
                cwd=Path.cwd()
            )

            try:
                stdout, _ = await asyncio.wait_for(
                    process.communicate(),
                    timeout=self.timeout_seconds
                )
            except asyncio.TimeoutError:
                # Kill and reap the child so it does not linger as a zombie.
                process.kill()
                await process.wait()
                execution_time = time.time() - start_time

                self._write_solver_log(
                    individual_log, cmd, start_time, question_id,
                    status_line=f"STATUS: TIMEOUT after {self.timeout_seconds} seconds"
                )

                return {
                    "status": "timeout",
                    "execution_time": execution_time,
                    "timeout_seconds": self.timeout_seconds,
                    "log_file": str(individual_log),
                    "timestamp": datetime.now().isoformat()
                }

            # Decode once; reuse for both the log file and answer extraction
            # (the original decoded the same bytes twice).
            output_text = stdout.decode('utf-8', errors='replace')
            self._write_solver_log(
                individual_log, cmd, start_time, question_id, body=output_text
            )

            execution_time = time.time() - start_time
            answer = self.extract_answer_from_output(output_text)

            return {
                "status": "completed",
                "execution_time": execution_time,
                "return_code": process.returncode,
                "answer": answer,
                "log_file": str(individual_log),
                "timestamp": datetime.now().isoformat()
            }

        except Exception as e:
            execution_time = time.time() - start_time

            self._write_solver_log(
                individual_log, cmd, start_time, question_id,
                status_line=f"STATUS: ERROR - {str(e)}",
                body=traceback.format_exc()
            )

            return {
                "status": "error",
                "execution_time": execution_time,
                "error": str(e),
                "log_file": str(individual_log),
                "timestamp": datetime.now().isoformat()
            }

    def extract_answer_from_output(self, output_text: str) -> Optional[str]:
        """Extract the final answer from solver output.

        Scans for the first "Final Answer:"-style marker (case variants
        below); the answer is taken from the remainder of that line, or
        from the next non-empty line.  Falls back to the last short,
        non-decorative line of output, and returns None when nothing
        answer-like is found.
        """
        # Markers checked in order; matching is case-sensitive.
        patterns = [
            "Final Answer:",
            "FINAL ANSWER:",
            "Answer:",
            "ANSWER:",
        ]

        lines = output_text.split('\n')

        # Search for answer patterns
        for i, line in enumerate(lines):
            line_stripped = line.strip()
            for pattern in patterns:
                if pattern in line_stripped:
                    # Try to extract answer from the same line first.
                    answer_part = line_stripped.split(pattern, 1)
                    if len(answer_part) > 1:
                        answer = answer_part[1].strip()
                        if answer:
                            return answer

                    # Marker line had no text after it: take the next line.
                    if i + 1 < len(lines):
                        next_line = lines[i + 1].strip()
                        if next_line:
                            return next_line

        # Fallback: last non-empty line that looks like an answer rather
        # than log decoration, and is of plausible answer length.
        for line in reversed(lines):
            line_stripped = line.strip()
            if line_stripped and not line_stripped.startswith(('=', '-', 'Time:', 'Duration:')):
                if len(line_stripped) < 200:  # Reasonable answer length
                    return line_stripped

        return None

    def validate_answer(self, question_id: str, generated_answer: Optional[str]) -> Dict:
        """Validate the generated answer against the expected answer.

        Fixes over the original:
        - a metadata row without a 'Final answer' no longer compares
          against str(None) == "none" (which could mark "none" correct);
        - a whitespace-only generated answer no longer yields a false
          "partial" via '"" in expected'.
        """
        if question_id not in self.validation_metadata:
            return {
                "validation_status": "no_metadata",
                "message": "No validation metadata available"
            }

        metadata = self.validation_metadata[question_id]
        expected_answer = metadata.get('Final answer')

        if not generated_answer:
            return {
                "validation_status": "no_answer",
                "expected_answer": expected_answer,
                "message": "No answer generated"
            }

        if expected_answer is None:
            return {
                "validation_status": "no_metadata",
                "message": "No expected answer in metadata"
            }

        # Simple string comparison (case-insensitive).
        generated_clean = str(generated_answer).strip().lower()
        expected_clean = str(expected_answer).strip().lower()

        exact_match = (generated_clean == expected_clean)
        # Substring containment counts as partial only when both sides are
        # non-empty ("" is a substring of everything).
        partial_match = bool(generated_clean) and bool(expected_clean) and (
            generated_clean in expected_clean or expected_clean in generated_clean
        )

        if exact_match:
            status = "correct"
        elif partial_match:
            status = "partial"
        else:
            status = "incorrect"

        return {
            "validation_status": status,
            "generated_answer": generated_answer,
            "expected_answer": expected_answer,
            "match_details": {
                "exact_match": exact_match,
                "partial_match": partial_match
            }
        }

    async def process_question(self, question: Dict) -> Dict:
        """
        Process a single question through the complete pipeline.

        This is the clean version without hardcoded overrides for honest
        accuracy.  Steps: classify -> execute solver -> validate answer.
        On any pipeline failure an error record is returned instead of
        raising, so batch runs continue.
        """
        question_id = question.get('task_id', 'unknown')
        start_time = time.time()

        self.logger.info(f"Processing question {question_id}")

        try:
            # Step 1: Classify the question
            classification = await self.classify_question(question)

            # Step 2: Execute the solver (clean version)
            solver_result = await self.execute_question_solver(question_id)

            # Step 3: Validate the answer
            validation = self.validate_answer(
                question_id,
                solver_result.get('answer')
            )

            total_time = time.time() - start_time

            # Truncate long question text for the summary record.
            question_text = question.get('Question', '')
            if len(question_text) > 200:
                question_text = question_text[:200] + "..."

            result = {
                "question_id": question_id,
                "question_text": question_text,
                "classification": classification,
                "solver_result": solver_result,
                "validation": validation,
                "total_processing_time": total_time,
                "timestamp": datetime.now().isoformat()
            }

            self.logger.info(f"Completed question {question_id} in {total_time:.2f}s - Status: {validation.get('validation_status', 'unknown')}")
            return result

        except Exception as e:
            total_time = time.time() - start_time
            self.logger.error(f"Failed to process question {question_id}: {e}")
            return {
                "question_id": question_id,
                "status": "error",
                "error": str(e),
                "total_processing_time": total_time,
                "timestamp": datetime.now().isoformat(),
                "traceback": traceback.format_exc()
            }