Final_Assignment / gaia /core /question_processor.py
tonthatthienvu's picture
feat: major refactoring - transform monolithic architecture into modular system
ba68fc1
#!/usr/bin/env python3
"""
Question processing and agent coordination for GAIA solver.
Handles question classification, file management, and agent execution.
"""
import re
import time
from typing import Dict, Any, List, Optional
from ..config.settings import Config
from ..models.manager import ModelManager
from ..utils.exceptions import GAIAError, ClassificationError
class QuestionProcessor:
    """Processes questions and coordinates agent execution.

    Pipeline implemented by :meth:`process_question`:

    1. append a note about an associated file (downloading it when a
       question loader is available),
    2. classify the question (external classifier, or a keyword-based
       fallback when no classifier could be initialized),
    3. pick the prompt template matching the classification,
    4. run the prompt through a freshly created smolagents ``CodeAgent``.
    """

    # Loose "YouTube is mentioned" pattern used by the keyword fallback.
    _YOUTUBE_MENTION_RE = re.compile(
        r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)'
    )
    # Stricter pattern (host followed by a path) used to force the
    # multimedia agent regardless of what the classifier returned.
    _YOUTUBE_URL_RE = re.compile(
        r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)/'
        r'(?:watch\?v=|embed/|v/|shorts/|playlist\?list=|channel/|user/|[^/\s]+/?)?'
        r'([^\s&?/]+)'
    )
    # Chess keywords are matched as whole words (singular or plural) against
    # lower-cased text, so "remove", "movie" or "composition" no longer
    # trigger the chess agent the way plain substring tests did.
    _CHESS_RE = re.compile(
        r'\b(?:chess|positions?|moves?|algebraic notation'
        r'|(?:black|white) to move)\b'
    )

    def __init__(self, model_manager: "ModelManager", config: "Config"):
        """Store collaborators and eagerly set up loader, classifier, prompts.

        Args:
            model_manager: Provides the current model and fallback switching.
            config: Settings object; ``config.model.MAX_STEPS`` and
                ``config.model.VERBOSITY_LEVEL`` are read when an agent is
                created.
        """
        self.model_manager = model_manager
        self.config = config
        self.question_loader = None
        self.classifier = None
        # Components are initialized eagerly here; on import failure they
        # stay None and the methods below degrade gracefully.
        self._init_components()
        # Prompt templates (simplified version)
        self.prompt_templates = self._get_prompt_templates()

    @staticmethod
    def _ensure_legacy_import_path() -> str:
        """Put the directory three levels above this file on ``sys.path``.

        The legacy modules (``gaia_web_loader``, ``question_classifier``,
        ``gaia_tools``) are imported by bare name from that directory.

        Returns:
            The directory that was ensured to be on ``sys.path``.
        """
        import os
        import sys

        parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
        if parent_dir not in sys.path:
            sys.path.insert(0, parent_dir)
        return parent_dir

    def _init_components(self) -> None:
        """Initialize question loader and classifier.

        Tries the new modular components first and falls back to the legacy
        ones when they cannot be imported. Attributes are only assigned once
        both new components were created, so a partial failure never leaves
        a mix of new and legacy objects behind.
        """
        try:
            from ..utils.question_loader import GAIAQuestionLoader
            from ..utils.classifier import QuestionClassifier

            loader = GAIAQuestionLoader()
            classifier = QuestionClassifier(self.model_manager)
        except ImportError:
            # Fallback to legacy imports if new modules not ready
            print("⚠️ Using legacy question processing components")
            self._init_legacy_components()
            return

        self.question_loader = loader
        self.classifier = classifier

    def _init_legacy_components(self) -> None:
        """Initialize legacy components as fallback.

        On ``ImportError`` both ``question_loader`` and ``classifier`` are
        set to ``None``; callers treat ``None`` as "component unavailable".
        """
        try:
            self._ensure_legacy_import_path()
            from gaia_web_loader import GAIAQuestionLoaderWeb
            from question_classifier import QuestionClassifier as LegacyClassifier

            loader = GAIAQuestionLoaderWeb()
            classifier = LegacyClassifier()
        except ImportError as e:
            print(f"⚠️ Could not initialize question processing components: {e}")
            # Create minimal fallback
            loader = None
            classifier = None

        self.question_loader = loader
        self.classifier = classifier

    def _get_prompt_templates(self) -> Dict[str, str]:
        """Get simplified prompt templates.

        Returns:
            Mapping from agent type to a ``str.format`` template with a single
            ``{question_text}`` placeholder. The ``"general"`` entry is the
            fallback for unknown agent types.
        """
        return {
            "multimedia": (
                "You are solving a GAIA benchmark multimedia question.\n"
                "TASK: {question_text}\n"
                "APPROACH:\n"
                "1. Use appropriate multimedia analysis tools\n"
                "2. For YouTube videos, ALWAYS use analyze_youtube_video tool\n"
                "3. Extract exact information requested\n"
                "4. Provide precise final answer\n"
                "Focus on accuracy and use tool outputs directly."
            ),
            "research": (
                "You are solving a GAIA benchmark research question.\n"
                "TASK: {question_text}\n"
                "APPROACH:\n"
                "1. Use research_with_comprehensive_fallback for robust search\n"
                "2. Try multiple research methods if needed\n"
                "3. Use tool outputs directly - do not fabricate information\n"
                "4. Provide factual, verified answer\n"
                "Trust validated research data over internal knowledge."
            ),
            "logic_math": (
                "You are solving a GAIA benchmark logic/math question.\n"
                "TASK: {question_text}\n"
                "APPROACH:\n"
                "1. Break down the problem step-by-step\n"
                "2. Use advanced_calculator for calculations\n"
                "3. Show your work clearly\n"
                "4. Verify your final answer\n"
                "Focus on mathematical precision."
            ),
            "file_processing": (
                "You are solving a GAIA benchmark file processing question.\n"
                "TASK: {question_text}\n"
                "APPROACH:\n"
                "1. Use appropriate file analysis tools\n"
                "2. Extract the specific data requested\n"
                "3. Process and calculate as needed\n"
                "4. Use tool results directly\n"
                "Trust file processing tool outputs."
            ),
            "chess": (
                "You are solving a GAIA benchmark chess question.\n"
                "TASK: {question_text}\n"
                "APPROACH:\n"
                "1. Use analyze_chess_multi_tool for comprehensive analysis\n"
                "2. Take the EXACT move returned by the tool\n"
                "3. Do not modify or interpret the result\n"
                "4. Use tool result directly as final answer\n"
                "Trust the chess analysis tool completely."
            ),
            "general": (
                "You are solving a GAIA benchmark question.\n"
                "TASK: {question_text}\n"
                "APPROACH:\n"
                "1. Analyze the question carefully\n"
                "2. Choose appropriate tools\n"
                "3. Work systematically\n"
                "4. Provide clear, direct answer\n"
                "Focus on answering exactly what is asked."
            ),
        }

    def process_question(self, question_data: Dict[str, Any]) -> str:
        """Process a question and return the raw response.

        Args:
            question_data: Question record; the keys ``"question"``,
                ``"file_name"`` and ``"task_id"`` are read.

        Returns:
            The agent's answer converted to ``str``.

        Raises:
            GAIAError: If agent execution fails and no fallback model is
                available.
        """
        # Handle file downloads if needed
        enhanced_question = self._handle_file_processing(question_data)
        # Classify the question
        classification = self._classify_question(enhanced_question, question_data)
        # Get appropriate prompt
        prompt = self._get_enhanced_prompt(enhanced_question, classification)
        # Execute with agent
        return self._execute_with_agent(prompt)

    def _handle_file_processing(self, question_data: Dict[str, Any]) -> str:
        """Handle file downloads and enhance question text.

        When the question names a file and a loader is available, the file is
        downloaded by task id and a bracketed note with the local path (or a
        failure marker) is appended to the question text.

        Args:
            question_data: Question record (see :meth:`process_question`).

        Returns:
            The question text, possibly with a trailing file note.
        """
        # A missing or None question is treated as empty text so the note can
        # still be appended.
        question_text = question_data.get("question") or ""
        file_name = question_data.get("file_name", "")
        if not file_name or not self.question_loader:
            return question_text

        task_id = question_data.get("task_id", "unknown")
        print(f"📎 Note: This question has an associated file: {file_name}")
        try:
            # Download the file
            print(f"⬇️ Downloading file: {file_name}")
            downloaded_path = self.question_loader.download_file(task_id)
            if downloaded_path:
                print(f"✅ File downloaded to: {downloaded_path}")
                question_text += f"\n\n[Note: This question references a file: {downloaded_path}]"
            else:
                print(f"⚠️ Failed to download file: {file_name}")
                question_text += f"\n\n[Note: This question references a file: {file_name} - download failed]"
        except Exception as e:
            # Best effort: a failed download must not abort the question.
            print(f"⚠️ Error downloading file: {e}")
            question_text += f"\n\n[Note: This question references a file: {file_name} - download error]"
        return question_text

    def _classify_question(self, question_text: str, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify the question to determine agent type.

        Uses ``self.classifier`` when present, otherwise the keyword
        fallback, and then applies the forced YouTube/chess overrides.

        Returns:
            A classification dict; on any error a neutral ``'general'``
            classification is returned instead of raising.
        """
        try:
            if self.classifier:
                file_name = question_data.get('file_name', '')
                classification = self.classifier.classify_question(question_text, file_name)
            else:
                # Fallback classification
                classification = self._fallback_classification(question_text)
            # Special handling for known patterns
            return self._enhance_classification(question_text, classification)
        except Exception as e:
            print(f"⚠️ Classification error: {e}")
            # Return general classification as fallback
            return {
                'primary_agent': 'general',
                'complexity': 3,
                'tools_needed': [],
                'confidence': 0.5
            }

    def _fallback_classification(self, question_text: str) -> Dict[str, Any]:
        """Simple fallback classification logic.

        Rules are checked in priority order: YouTube mention, chess keywords
        (whole words), known file extensions, math keywords; anything else is
        treated as a research question.
        """
        question_lower = question_text.lower()

        # YouTube detection (case-sensitive on the original text)
        if self._YOUTUBE_MENTION_RE.search(question_text):
            return {
                'primary_agent': 'multimedia',
                'complexity': 3,
                'tools_needed': ['analyze_youtube_video'],
                'confidence': 0.9
            }

        # Chess detection
        if self._CHESS_RE.search(question_lower):
            return {
                'primary_agent': 'chess',
                'complexity': 4,
                'tools_needed': ['analyze_chess_multi_tool'],
                'confidence': 0.9
            }

        # File processing detection
        file_extensions = ('.xlsx', '.xls', '.py', '.txt', '.pdf')
        if any(ext in question_lower for ext in file_extensions):
            return {
                'primary_agent': 'file_processing',
                'complexity': 3,
                'tools_needed': ['analyze_excel_file', 'analyze_python_code'],
                'confidence': 0.8
            }

        # Math detection
        math_keywords = ('calculate', 'solve', 'equation', 'formula', 'math')
        if any(keyword in question_lower for keyword in math_keywords):
            return {
                'primary_agent': 'logic_math',
                'complexity': 3,
                'tools_needed': ['advanced_calculator'],
                'confidence': 0.7
            }

        # Research fallback
        return {
            'primary_agent': 'research',
            'complexity': 3,
            'tools_needed': ['research_with_comprehensive_fallback'],
            'confidence': 0.6
        }

    def _enhance_classification(self, question_text: str, classification: Dict[str, Any]) -> Dict[str, Any]:
        """Enhance classification with special handling.

        A YouTube URL forces the multimedia agent and takes precedence over
        the chess override, matching the priority order used by
        :meth:`_fallback_classification`. The input dict is not modified.

        Returns:
            A new classification dict with the overrides applied.
        """
        # Work on a copy so a dict owned (or cached) by the classifier is
        # never mutated behind its back.
        enhanced = dict(classification)

        # Force YouTube classification
        if self._YOUTUBE_URL_RE.search(question_text):
            enhanced['primary_agent'] = 'multimedia'
            tools = enhanced.get('tools_needed') or []
            if 'analyze_youtube_video' not in tools:
                enhanced['tools_needed'] = ['analyze_youtube_video', *tools]
            print("🎥 YouTube URL detected - forcing multimedia classification")
            return enhanced

        # Force chess classification
        if self._CHESS_RE.search(question_text.lower()):
            enhanced['primary_agent'] = 'chess'
            print("♟️ Chess question detected - using specialized chess analysis")
        return enhanced

    def _get_enhanced_prompt(self, question_text: str, classification: Dict[str, Any]) -> str:
        """Get enhanced prompt based on classification.

        Args:
            question_text: Text substituted for ``{question_text}``.
            classification: Dict whose ``'primary_agent'`` selects the
                template; unknown values use the ``"general"`` template.

        Returns:
            The formatted prompt.
        """
        question_type = classification.get('primary_agent', 'general')
        print(f"🎯 Question type: {question_type}")
        print(f"📊 Complexity: {classification.get('complexity', 'unknown')}/5")
        print(f"🔧 Tools needed: {classification.get('tools_needed', [])}")

        # Get appropriate template; report the template actually used so the
        # log is accurate when an unknown type falls back to "general".
        template_key = question_type if question_type in self.prompt_templates else "general"
        enhanced_prompt = self.prompt_templates[template_key].format(question_text=question_text)
        print(f"📋 Using {template_key} prompt template")
        return enhanced_prompt

    def _execute_with_agent(self, prompt: str) -> str:
        """Execute prompt with smolagents agent.

        A new ``CodeAgent`` is built for every call. If anything in the
        attempt raises, the model manager is asked to switch to a fallback
        model and the prompt is retried.

        Raises:
            GAIAError: When the attempt failed and no fallback switch
                happened; the original exception is chained as the cause.
        """
        try:
            # Get current model
            model = self.model_manager.get_current_model()
            # Create fresh agent for memory management
            from smolagents import CodeAgent

            # Import tools
            tools = self._get_tools()
            print("🧠 Creating fresh agent to avoid memory accumulation...")
            agent = CodeAgent(
                model=model,
                tools=tools,
                max_steps=self.config.model.MAX_STEPS,
                verbosity_level=self.config.model.VERBOSITY_LEVEL
            )
            # Execute the prompt
            raw_answer = str(agent.run(prompt))
            print(f"✅ Generated raw answer: {raw_answer[:100]}...")
            return raw_answer
        except Exception as e:
            # Try fallback model if available.
            # NOTE(review): termination of this retry relies on
            # _switch_to_fallback() eventually returning a falsy value -
            # confirm against ModelManager.
            if self.model_manager._switch_to_fallback():
                print("🔄 Retrying with fallback model...")
                return self._execute_with_agent(prompt)
            raise GAIAError(f"Agent execution failed: {e}") from e

    def _get_tools(self) -> List:
        """Get available tools for the agent.

        Returns:
            ``GAIA_TOOLS`` from the legacy ``gaia_tools`` module, or an empty
            list when that module cannot be imported.
        """
        try:
            # Import tools from the old system for now
            self._ensure_legacy_import_path()
            from gaia_tools import GAIA_TOOLS
            return GAIA_TOOLS
        except ImportError:
            print("⚠️ Could not import GAIA_TOOLS, using empty tool list")
            return []

    def get_random_question(self) -> Optional[Dict[str, Any]]:
        """Get a random question, or ``None`` when no loader is available."""
        if not self.question_loader:
            return None
        return self.question_loader.get_random_question()

    def get_questions(self, max_questions: int = 5) -> List[Dict[str, Any]]:
        """Get multiple questions.

        Args:
            max_questions: Upper bound on the number of questions returned;
                zero or negative values yield an empty list.

        Returns:
            The first ``max_questions`` entries of the loader's ``questions``
            attribute, or an empty list when there is no loader or it exposes
            no questions.
        """
        if not self.question_loader:
            return []
        questions = getattr(self.question_loader, 'questions', None)
        if not questions:
            return []
        # Clamp so a negative limit does not slice from the end of the list.
        return questions[:max(max_questions, 0)]