#!/usr/bin/env python3
"""
Level-Specific GAIA Testing with Real-Time Accuracy Tracking

Focuses on reaching 30% Level 1 accuracy through strategic testing and by
prioritizing breakthrough question categories.
"""
import json
import time
import argparse
import logging
import math
import sys
from datetime import datetime
from typing import Dict, List, Optional
from collections import defaultdict
from pathlib import Path

# Add parent directory to path for imports
sys.path.append(str(Path(__file__).parent.parent))

from gaia_web_loader import GAIAQuestionLoaderWeb
from main import GAIASolver
from question_classifier import QuestionClassifier

class LevelSpecificGAIATester:
    """Enhanced GAIA testing with level-specific focus and real-time accuracy tracking"""

    def __init__(self, target_level: str = "1", target_accuracy: float = 0.30):
        self.target_level = target_level
        self.target_accuracy = target_accuracy
        self.loader = GAIAQuestionLoaderWeb()
        self.classifier = QuestionClassifier()
        self.solver = GAIASolver(use_kluster=True, kluster_model="qwen3-235b")
        self.results = []
        self.breakthrough_categories = ['chess', 'wikipedia', 'video', 'excel', 'research']

        # Create logs directory if it doesn't exist
        Path("logs").mkdir(exist_ok=True)

        # Set up logging to a timestamped file and the console
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self.log_file = f"logs/level{target_level}_test_{timestamp}.log"
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(self.log_file),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)

        # Load validation metadata for accuracy tracking
        self.validation_data = self.load_validation_metadata()

    def load_validation_metadata(self):
        """Load GAIA validation metadata for answer checking"""
        try:
            validation_data = {}
            with open('gaia_validation_metadata.jsonl', 'r') as f:
                for line in f:
                    if line.strip():
                        entry = json.loads(line)
                        validation_data[entry['task_id']] = entry
            self.logger.info(f"Loaded {len(validation_data)} validation entries")
            return validation_data
        except Exception as e:
            self.logger.error(f"Failed to load validation metadata: {e}")
            return {}

    def get_questions_by_level(self, level: str) -> List[Dict]:
        """Get all questions for a specific level"""
        level_questions = []
        for question in self.loader.questions:
            # Check validation metadata for level information
            task_id = question.get('task_id')
            if task_id in self.validation_data:
                question_level = str(self.validation_data[task_id].get('Level', ''))
                if question_level == level:
                    level_questions.append(question)
        self.logger.info(f"Found {len(level_questions)} Level {level} questions")
        return level_questions

    def classify_question_type(self, question: Dict) -> str:
        """Classify a question to identify breakthrough opportunities"""
        question_text = question.get('question', '').lower()

        # Check for breakthrough categories via simple keyword matching
        if any(keyword in question_text for keyword in ['chess', 'move', 'position', 'algebraic']):
            return 'chess'
        elif any(keyword in question_text for keyword in ['wikipedia', 'featured article', 'nominated']):
            return 'wikipedia'
        elif any(keyword in question_text for keyword in ['video', 'youtube', 'audio', 'dialogue']):
            return 'video'
        elif any(keyword in question_text for keyword in ['excel', 'spreadsheet', 'sales', 'total']):
            return 'excel'
        elif any(keyword in question_text for keyword in ['research', 'find', 'search', 'who', 'what', 'when']):
            return 'research'
        else:
            return 'general'

    def calculate_real_time_accuracy(self) -> Dict:
        """Calculate real-time accuracy metrics for Level 1 progress"""
        # The GAIA validation set contains 53 Level 1 questions, so a 30% target
        # requires ceil(53 * 0.30) = 16 correct answers.
        target_needed = math.ceil(53 * self.target_accuracy)

        if not self.results:
            return {
                'total_tested': 0,
                'correct_answers': 0,
                'current_accuracy': 0.0,
                'target_needed': target_needed,
                'remaining_to_target': target_needed,
                'on_target': False
            }

        level_results = [r for r in self.results if r.get('level') == self.target_level]
        correct_count = len([r for r in level_results if r.get('validation_status') == 'CORRECT'])
        total_tested = len(level_results)
        current_accuracy = correct_count / total_tested if total_tested > 0 else 0.0
        remaining_to_target = max(0, target_needed - correct_count)
        on_target = current_accuracy >= self.target_accuracy

        return {
            'total_tested': total_tested,
            'correct_answers': correct_count,
            'current_accuracy': current_accuracy,
            'target_needed': target_needed,
            'remaining_to_target': remaining_to_target,
            'on_target': on_target
        }

    def validate_answer(self, task_id: str, our_answer: str) -> str:
        """Validate an answer against GAIA metadata"""
        if task_id not in self.validation_data:
            return 'UNKNOWN'

        expected_answer = self.validation_data[task_id].get('Final answer', '').strip()
        our_answer = str(our_answer).strip()

        # Normalize for comparison: lowercase, pad commas with a space, and
        # collapse repeated whitespace (e.g. "Paris,France" -> "paris, france").
        def normalize(text):
            return ' '.join(str(text).lower().strip().replace(',', ', ').split())

        expected_normalized = normalize(expected_answer)
        our_normalized = normalize(our_answer)

        if expected_normalized == our_normalized:
            return 'CORRECT'
        elif expected_normalized in our_normalized or our_normalized in expected_normalized:
            return 'PARTIAL'
        else:
            return 'INCORRECT'

    def test_question(self, question: Dict) -> Dict:
        """Test a single question with enhanced validation"""
        task_id = question.get('task_id', 'unknown')
        question_text = question.get('question', '')
        question_type = self.classify_question_type(question)

        # Get level from validation metadata
        level = str(self.validation_data.get(task_id, {}).get('Level', 'unknown'))

        self.logger.info(f"\nTesting {task_id} (Level {level}, Type: {question_type})")
        self.logger.info(f"Question: {question_text[:100]}...")

        start_time = time.time()
        try:
            # Extended time budget for breakthrough categories (30 min vs. 15 min).
            # NOTE: this value is not currently passed to the solver; kept for reference.
            timeout = 1800 if question_type in self.breakthrough_categories else 900

            answer = self.solver.solve_question(question)
            solve_time = time.time() - start_time

            # Validate answer against the expected value from metadata
            validation_status = self.validate_answer(task_id, answer)
            expected_answer = self.validation_data.get(task_id, {}).get('Final answer', 'Unknown')

            result = {
                'task_id': task_id,
                'level': level,
                'question_type': question_type,
                'question': question_text[:200] + "...",
                'our_answer': answer,
                'expected_answer': expected_answer,
                'validation_status': validation_status,
                'solve_time': solve_time,
                'breakthrough_category': question_type in self.breakthrough_categories,
                'timestamp': datetime.now().isoformat()
            }
            self.results.append(result)

            # Log the result with a status marker
            status_marker = "PASS" if validation_status == "CORRECT" else "FAIL" if validation_status == "INCORRECT" else "WARN"
            self.logger.info(f"[{status_marker}] Result: {validation_status}")
            self.logger.info(f"Our Answer: {answer}")
            self.logger.info(f"Expected: {expected_answer}")
            self.logger.info(f"Time: {solve_time:.1f}s")

            # Calculate and display real-time progress
            progress = self.calculate_real_time_accuracy()
            self.logger.info(f"Level {self.target_level} Progress: {progress['correct_answers']}/{progress['target_needed']} target ({progress['current_accuracy']:.1%})")
            if progress['on_target']:
                self.logger.info(f"TARGET ACHIEVED! {progress['current_accuracy']:.1%} >= {self.target_accuracy:.1%}")

            return result

        except Exception as e:
            error_result = {
                'task_id': task_id,
                'level': level,
                'question_type': question_type,
                'question': question_text[:200] + "...",
                'our_answer': f"ERROR: {str(e)}",
                'expected_answer': self.validation_data.get(task_id, {}).get('Final answer', 'Unknown'),
                'validation_status': 'ERROR',
                'solve_time': time.time() - start_time,
                'breakthrough_category': False,
                'timestamp': datetime.now().isoformat()
            }
            self.results.append(error_result)
            self.logger.error(f"Error testing {task_id}: {e}")
            return error_result

    def run_level_campaign(self, level: Optional[str] = None, max_questions: Optional[int] = None) -> Dict:
        """Run a strategic testing campaign for a specific level"""
        if level is None:
            level = self.target_level

        level_questions = self.get_questions_by_level(level)
        if max_questions:
            level_questions = level_questions[:max_questions]

        self.logger.info(f"\nStarting Level {level} Campaign")
        self.logger.info(f"Target: {self.target_accuracy:.1%} accuracy ({int(len(level_questions) * self.target_accuracy)} correct)")
        self.logger.info(f"Questions to test: {len(level_questions)}")

        # Prioritize breakthrough categories
        breakthrough_questions = [q for q in level_questions if self.classify_question_type(q) in self.breakthrough_categories]
        other_questions = [q for q in level_questions if self.classify_question_type(q) not in self.breakthrough_categories]

        self.logger.info(f"Breakthrough questions: {len(breakthrough_questions)}")
        self.logger.info(f"Other questions: {len(other_questions)}")

        # Test breakthrough questions first
        all_questions = breakthrough_questions + other_questions
        for i, question in enumerate(all_questions, 1):
            self.logger.info(f"\n--- Question {i}/{len(all_questions)} ---")
            self.test_question(question)

            # Stop early once the target is reached
            progress = self.calculate_real_time_accuracy()
            if progress['on_target'] and progress['total_tested'] >= 10:  # minimum 10 questions for statistical validity
                self.logger.info(f"EARLY TARGET ACHIEVEMENT! {progress['current_accuracy']:.1%} >= {self.target_accuracy:.1%}")
                break

        return self.generate_final_report()

    def generate_final_report(self) -> Dict:
        """Generate a comprehensive test report"""
        progress = self.calculate_real_time_accuracy()

        # Per-category breakdown
        category_stats = defaultdict(lambda: {'total': 0, 'correct': 0})
        for result in self.results:
            if result.get('level') == self.target_level:
                category = result.get('question_type', 'unknown')
                category_stats[category]['total'] += 1
                if result.get('validation_status') == 'CORRECT':
                    category_stats[category]['correct'] += 1

        # Calculate per-category accuracy rates
        for category in category_stats:
            total = category_stats[category]['total']
            category_stats[category]['accuracy'] = category_stats[category]['correct'] / total if total > 0 else 0

        report = {
            'campaign_summary': {
                'target_level': self.target_level,
                'target_accuracy': self.target_accuracy,
                'achievement_status': 'ACHIEVED' if progress['on_target'] else 'IN_PROGRESS',
                'final_accuracy': progress['current_accuracy'],
                'correct_answers': progress['correct_answers'],
                'total_tested': progress['total_tested'],
                'target_needed': progress['target_needed']
            },
            'category_breakdown': dict(category_stats),
            'breakthrough_performance': {
                category: stats for category, stats in category_stats.items()
                if category in self.breakthrough_categories
            },
            'detailed_results': self.results,
            'timestamp': datetime.now().isoformat(),
            'log_file': self.log_file
        }

        # Save the report to a timestamped JSON file
        report_file = f"level{self.target_level}_campaign_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w') as f:
            json.dump(report, f, indent=2)

        self.logger.info("\nFINAL CAMPAIGN REPORT")
        self.logger.info(f"Target: {self.target_accuracy:.1%} Level {self.target_level} accuracy")
        self.logger.info(f"Achievement: {progress['current_accuracy']:.1%} ({progress['correct_answers']}/{progress['total_tested']})")
        self.logger.info(f"Status: {'TARGET ACHIEVED' if progress['on_target'] else 'IN PROGRESS'}")
        self.logger.info(f"Report saved: {report_file}")

        return report

def main():
    """Command-line entry point for level-specific GAIA testing"""
    parser = argparse.ArgumentParser(description='Level-Specific GAIA Testing')
    parser.add_argument('--level', type=str, default='1', help='Target level to test (1, 2, 3)')
    # The percent sign is escaped as %% so argparse's help formatter does not raise on it.
    parser.add_argument('--target-accuracy', type=float, default=0.30, help='Target accuracy (0.30 = 30%%)')
    parser.add_argument('--max-questions', type=int, help='Maximum number of questions to test')
    args = parser.parse_args()

    print("Level-Specific GAIA Testing Campaign")
    print(f"Level: {args.level}")
    print(f"Target Accuracy: {args.target_accuracy:.1%}")
    print("=" * 60)

    tester = LevelSpecificGAIATester(
        target_level=args.level,
        target_accuracy=args.target_accuracy
    )

    try:
        report = tester.run_level_campaign(level=args.level, max_questions=args.max_questions)

        # Print summary
        summary = report['campaign_summary']
        print("\nCAMPAIGN COMPLETE!")
        print(f"Target: {summary['target_accuracy']:.1%}")
        print(f"Achieved: {summary['final_accuracy']:.1%}")
        print(f"Status: {summary['achievement_status']}")
        print(f"Score: {summary['correct_answers']}/{summary['total_tested']}")
    except Exception as e:
        print(f"Campaign failed: {e}")
        return 1

    return 0


if __name__ == "__main__":
    sys.exit(main())