#!/usr/bin/env python3
"""
Asynchronous Complete GAIA Test System

Main orchestrator for concurrent testing of all GAIA questions with honest accuracy measurement.
"""

import asyncio
import json
import logging
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
import sys
import os

# Add the project root to the Python path
sys.path.insert(0, str(Path(__file__).parent))

from async_question_processor import AsyncQuestionProcessor
from classification_analyzer import ClassificationAnalyzer
from summary_report_generator import SummaryReportGenerator


class AsyncGAIATestSystem:
    """Main orchestrator for asynchronous GAIA testing with honest accuracy measurement."""

    def __init__(self,
                 max_concurrent: int = 3,
                 timeout_seconds: int = 900,
                 output_dir: str = "async_test_results"):
        """
        Initialize the async test system.

        Args:
            max_concurrent: Maximum number of concurrent question processors
            timeout_seconds: Timeout per question (15 minutes default)
            output_dir: Directory for test results and logs
        """
        self.max_concurrent = max_concurrent
        self.timeout_seconds = timeout_seconds
        self.output_dir = Path(output_dir)
        # parents=True so a nested --output-dir path does not fail on creation
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Create timestamped session directory
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.session_dir = self.output_dir / f"session_{timestamp}"
        self.session_dir.mkdir(exist_ok=True)

        # Initialize components
        self.processor = AsyncQuestionProcessor(
            session_dir=self.session_dir,
            timeout_seconds=self.timeout_seconds
        )
        self.analyzer = ClassificationAnalyzer()
        self.reporter = SummaryReportGenerator()

        # Setup logging
        self.setup_logging()

        # Test results tracking
        self.results: Dict[str, Dict] = {}
        self.start_time: Optional[float] = None
        self.end_time: Optional[float] = None

    def setup_logging(self):
        """Setup comprehensive logging for the test session."""
        log_file = self.session_dir / "async_test_system.log"

        # Configure logger
        self.logger = logging.getLogger("AsyncGAIATest")
        self.logger.setLevel(logging.INFO)

        # File handler
        file_handler = logging.FileHandler(log_file)
        file_handler.setLevel(logging.INFO)

        # Console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)

        # Formatter
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)

        # Add handlers
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
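
        # Note: logging.getLogger("AsyncGAIATest") returns a process-wide logger,
        # so constructing a second AsyncGAIATestSystem in the same process would
        # attach duplicate handlers and double-log every message.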

    async def load_questions(self) -> List[Dict]:
        """Load GAIA questions from the standard source."""
        questions_file = Path("gaia_questions_list.txt")

        if not questions_file.exists():
            self.logger.error(f"Questions file not found: {questions_file}")
            return []
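
        # Each non-empty line that starts with '{' is parsed as a standalone JSON
        # object (JSON Lines style). The exact field set depends on the GAIA
        # export; downstream code only looks up 'task_id' via .get(), falling
        # back to 'unknown' when it is missing.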
        questions = []
        try:
            with open(questions_file, 'r') as f:
                for line in f:
                    line = line.strip()
                    if line and line.startswith('{'):
                        try:
                            question = json.loads(line)
                            questions.append(question)
                        except json.JSONDecodeError as e:
                            self.logger.warning(f"Failed to parse question line: {line[:50]}... - {e}")

            self.logger.info(f"Loaded {len(questions)} questions for testing")
            return questions
        except Exception as e:
            self.logger.error(f"Failed to load questions: {e}")
            return []

    async def process_question_batch(self, questions: List[Dict]) -> Dict[str, Dict]:
        """Process a batch of questions concurrently."""
        # Create semaphore to limit concurrent processing
        semaphore = asyncio.Semaphore(self.max_concurrent)

        async def process_single_question(question: Dict) -> Tuple[str, Dict]:
            """Process a single question with semaphore control."""
            async with semaphore:
                question_id = question.get('task_id', 'unknown')
                self.logger.info(f"Starting processing for question {question_id}")

                try:
                    result = await self.processor.process_question(question)
                    self.logger.info(f"Completed processing for question {question_id}")
                    return question_id, result
                except Exception as e:
                    self.logger.error(f"Failed to process question {question_id}: {e}")
                    return question_id, {
                        'status': 'error',
                        'error': str(e),
                        'timestamp': datetime.now().isoformat()
                    }

        # Create tasks for all questions
        tasks = [process_single_question(q) for q in questions]

        # Process all questions concurrently
        self.logger.info(f"Starting concurrent processing of {len(questions)} questions (max_concurrent={self.max_concurrent})")
        results = await asyncio.gather(*tasks, return_exceptions=True)
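        # With return_exceptions=True, any coroutine that raised outside the
        # handler above comes back as an Exception object instead of propagating,
        # so the loop below can log the failure and keep going.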

        # Organize results
        organized_results = {}
        for result in results:
            if isinstance(result, Exception):
                self.logger.error(f"Task failed with exception: {result}")
                continue
            question_id, question_result = result
            organized_results[question_id] = question_result

        return organized_results

    async def run_complete_test(self) -> Dict:
        """Run the complete asynchronous GAIA test system."""
        self.logger.info("=" * 80)
        self.logger.info("ASYNC GAIA TEST SYSTEM - STARTING COMPLETE TEST")
        self.logger.info("=" * 80)

        self.start_time = time.time()

        try:
            # Load questions
            self.logger.info("Loading GAIA questions...")
            questions = await self.load_questions()
            if not questions:
                self.logger.error("No questions loaded. Aborting test.")
                return {"status": "error", "message": "No questions loaded"}

            self.logger.info(f"Processing {len(questions)} questions with max_concurrent={self.max_concurrent}")

            # Process questions concurrently
            self.results = await self.process_question_batch(questions)

            self.end_time = time.time()
            total_duration = self.end_time - self.start_time
            self.logger.info(f"All questions processed in {total_duration:.2f} seconds")

            # Generate analysis and reports
            await self.generate_comprehensive_analysis()

            # Create session summary
            session_summary = {
                "session_id": self.session_dir.name,
                "start_time": datetime.fromtimestamp(self.start_time).isoformat(),
                "end_time": datetime.fromtimestamp(self.end_time).isoformat(),
                "total_duration_seconds": total_duration,
                "questions_processed": len(self.results),
                "max_concurrent": self.max_concurrent,
                "timeout_seconds": self.timeout_seconds,
                "session_dir": str(self.session_dir),
                "results": self.results
            }

            # Save session summary
            summary_file = self.session_dir / "session_summary.json"
            with open(summary_file, 'w') as f:
                json.dump(session_summary, f, indent=2)

            self.logger.info(f"Session summary saved to: {summary_file}")
            return session_summary

        except Exception as e:
            self.logger.error(f"Complete test failed: {e}")
            return {"status": "error", "message": str(e)}

    async def generate_comprehensive_analysis(self):
        """Generate comprehensive analysis and reports."""
        self.logger.info("Generating comprehensive analysis...")

        try:
            # Classification-based analysis
            classification_report = await self.analyzer.analyze_by_classification(
                self.results, self.session_dir
            )

            # Master summary report
            summary_report = await self.reporter.generate_master_report(
                self.results, self.session_dir, classification_report
            )

            self.logger.info("Analysis and reports generated successfully")
        except Exception as e:
            self.logger.error(f"Failed to generate analysis: {e}")


def main():
    """Main entry point for the async test system."""
    import argparse

    parser = argparse.ArgumentParser(description="Asynchronous GAIA Test System")
    parser.add_argument('--max-concurrent', type=int, default=3,
                        help='Maximum concurrent question processors (default: 3)')
    parser.add_argument('--timeout', type=int, default=900,
                        help='Timeout per question in seconds (default: 900)')
    parser.add_argument('--output-dir', type=str, default='async_test_results',
                        help='Output directory for results (default: async_test_results)')
    args = parser.parse_args()

    # Create and run the test system
    system = AsyncGAIATestSystem(
        max_concurrent=args.max_concurrent,
        timeout_seconds=args.timeout,
        output_dir=args.output_dir
    )

    # Run the async test
    try:
        result = asyncio.run(system.run_complete_test())
        if result.get("status") == "error":
            print(f"Test failed: {result.get('message')}")
            sys.exit(1)
        else:
            print("Test completed successfully!")
            print(f"Results saved to: {system.session_dir}")
    except KeyboardInterrupt:
        print("\nTest interrupted by user")
        sys.exit(1)
    except Exception as e:
        print(f"Test failed with exception: {e}")
        sys.exit(1)
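

# Example invocation (illustrative; the filename is an assumption, use whatever
# this script is saved as in your checkout):
#   python async_gaia_test_system.py --max-concurrent 5 --timeout 600 --output-dir my_results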
if __name__ == "__main__":
    main()