Spaces:
Running
Running
#!/usr/bin/env python3
"""
Question processing and agent coordination for GAIA solver.
Handles question classification, file management, and agent execution.
"""
import re | |
import time | |
from typing import Dict, Any, List, Optional | |
from ..config.settings import Config | |
from ..models.manager import ModelManager | |
from ..utils.exceptions import GAIAError, ClassificationError | |
class QuestionProcessor:
    """Process GAIA questions and coordinate agent execution.

    The processor downloads any file attached to a question, classifies the
    question (with the configured classifier, or a keyword heuristic when no
    classifier could be loaded), picks a prompt template for the resulting
    agent type and runs the prompt through a freshly created agent.

    Attributes:
        model_manager: Provides the current model and the fallback switch.
        config: Settings object; ``config.model.MAX_STEPS`` and
            ``config.model.VERBOSITY_LEVEL`` are read when creating an agent.
        question_loader: Loader used for file downloads and question listing,
            or ``None`` when no loader could be imported.
        classifier: Question classifier, or ``None`` when none could be
            imported (the keyword heuristic is used instead).
        prompt_templates: Mapping of agent type to prompt template.
    """

    # Loose YouTube mention used by the heuristic fallback classifier.
    _YOUTUBE_MENTION_PATTERN = re.compile(
        r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)'
    )
    # Stricter pattern (host + path) used to force multimedia classification.
    _YOUTUBE_URL_PATTERN = re.compile(
        r'(https?://)?(www\.)?(youtube\.com|youtu\.?be)/(?:watch\?v=|embed/|v/|shorts/|playlist\?list=|channel/|user/|[^/\s]+/?)?([^\s&?/]+)'
    )
    # Chess keywords matched on word boundaries so that words such as
    # "movie", "remove" or "composition" no longer count as chess terms.
    # "black to move" / "white to move" are covered by the "move" alternative.
    # NOTE(review): a lone "position" or "move" still forces chess; tighten
    # further if that proves too eager.
    _CHESS_PATTERN = re.compile(
        r'\b(?:chess|positions?|moves?|algebraic notation)\b'
    )

    def __init__(self, model_manager: "ModelManager", config: "Config"):
        """Store collaborators and set up loader, classifier and templates.

        Args:
            model_manager: Object providing ``get_current_model()`` and
                ``_switch_to_fallback()``.
            config: Settings object read when an agent is created.
        """
        self.model_manager = model_manager
        self.config = config
        self.question_loader = None
        self.classifier = None
        # Components are resolved eagerly here; on import failure they
        # stay (or are reset to) None and callers degrade gracefully.
        self._init_components()
        # Prompt templates (simplified version)
        self.prompt_templates = self._get_prompt_templates()

    @staticmethod
    def _ensure_legacy_import_path() -> None:
        """Put the directory three levels above this file on ``sys.path``."""
        import os
        import sys

        parent_dir = os.path.dirname(os.path.dirname(os.path.dirname(__file__)))
        if parent_dir not in sys.path:
            sys.path.insert(0, parent_dir)

    @staticmethod
    def _prepend_tool(classification: Dict[str, Any], tool: str) -> None:
        """Insert *tool* at the front of ``tools_needed`` unless already listed."""
        tools = list(classification.get('tools_needed') or [])
        if tool not in tools:
            classification['tools_needed'] = [tool] + tools

    def _init_components(self) -> None:
        """Initialize question loader and classifier.

        Falls back to the legacy modules when the package-relative modules
        cannot be imported.
        """
        try:
            # Import and initialize question loader
            from ..utils.question_loader import GAIAQuestionLoader
            self.question_loader = GAIAQuestionLoader()
            # Import and initialize classifier
            from ..utils.classifier import QuestionClassifier
            self.classifier = QuestionClassifier(self.model_manager)
        except ImportError:
            # Fallback to legacy imports if new modules not ready
            print("β οΈ Using legacy question processing components")
            self._init_legacy_components()

    def _init_legacy_components(self) -> None:
        """Initialize legacy components as fallback.

        When the legacy modules cannot be imported either, both the loader
        and the classifier are set to ``None``.
        """
        try:
            # Add parent directory to path for legacy imports
            self._ensure_legacy_import_path()
            from gaia_web_loader import GAIAQuestionLoaderWeb
            from question_classifier import QuestionClassifier as LegacyClassifier
            self.question_loader = GAIAQuestionLoaderWeb()
            self.classifier = LegacyClassifier()
        except ImportError as e:
            print(f"β οΈ Could not initialize question processing components: {e}")
            # Create minimal fallback
            self.question_loader = None
            self.classifier = None

    def _get_prompt_templates(self) -> Dict[str, str]:
        """Return the prompt templates keyed by agent type.

        Every template contains a single ``{question_text}`` placeholder.
        """
        return {
            "multimedia": """You are solving a GAIA benchmark multimedia question.
TASK: {question_text}
APPROACH:
1. Use appropriate multimedia analysis tools
2. For YouTube videos, ALWAYS use analyze_youtube_video tool
3. Extract exact information requested
4. Provide precise final answer
Focus on accuracy and use tool outputs directly.""",
            "research": """You are solving a GAIA benchmark research question.
TASK: {question_text}
APPROACH:
1. Use research_with_comprehensive_fallback for robust search
2. Try multiple research methods if needed
3. Use tool outputs directly - do not fabricate information
4. Provide factual, verified answer
Trust validated research data over internal knowledge.""",
            "logic_math": """You are solving a GAIA benchmark logic/math question.
TASK: {question_text}
APPROACH:
1. Break down the problem step-by-step
2. Use advanced_calculator for calculations
3. Show your work clearly
4. Verify your final answer
Focus on mathematical precision.""",
            "file_processing": """You are solving a GAIA benchmark file processing question.
TASK: {question_text}
APPROACH:
1. Use appropriate file analysis tools
2. Extract the specific data requested
3. Process and calculate as needed
4. Use tool results directly
Trust file processing tool outputs.""",
            "chess": """You are solving a GAIA benchmark chess question.
TASK: {question_text}
APPROACH:
1. Use analyze_chess_multi_tool for comprehensive analysis
2. Take the EXACT move returned by the tool
3. Do not modify or interpret the result
4. Use tool result directly as final answer
Trust the chess analysis tool completely.""",
            "general": """You are solving a GAIA benchmark question.
TASK: {question_text}
APPROACH:
1. Analyze the question carefully
2. Choose appropriate tools
3. Work systematically
4. Provide clear, direct answer
Focus on answering exactly what is asked.""",
        }

    def process_question(self, question_data: Dict[str, Any]) -> str:
        """Process a question and return the raw agent response.

        Args:
            question_data: Question record; the keys ``question``,
                ``file_name`` and ``task_id`` are read if present.

        Returns:
            The agent's answer converted to ``str``.

        Raises:
            GAIAError: If agent execution fails and no fallback model is left.
        """
        # Handle file downloads if needed
        enhanced_question = self._handle_file_processing(question_data)
        # Classify the question
        classification = self._classify_question(enhanced_question, question_data)
        # Get appropriate prompt
        prompt = self._get_enhanced_prompt(enhanced_question, classification)
        # Execute with agent
        return self._execute_with_agent(prompt)

    def _handle_file_processing(self, question_data: Dict[str, Any]) -> str:
        """Download the attached file (if any) and annotate the question text.

        Args:
            question_data: Question record; ``question``, ``file_name`` and
                ``task_id`` are read.

        Returns:
            The question text, with a bracketed note naming the downloaded
            path (or the failed file name) appended when the question has a
            file and a loader is available. Without a file or loader the
            text is returned unchanged.
        """
        # A missing or None question is treated as empty text so that the
        # note concatenation below cannot fail.
        question_text = question_data.get("question", "") or ""
        file_name = question_data.get("file_name", "")
        if not file_name or not self.question_loader:
            return question_text

        task_id = question_data.get('task_id', 'unknown')
        print(f"π Note: This question has an associated file: {file_name}")
        try:
            # Download the file
            print(f"β¬οΈ Downloading file: {file_name}")
            downloaded_path = self.question_loader.download_file(task_id)
            if downloaded_path:
                print(f"β File downloaded to: {downloaded_path}")
                question_text += f"\n\n[Note: This question references a file: {downloaded_path}]"
            else:
                print(f"β οΈ Failed to download file: {file_name}")
                question_text += f"\n\n[Note: This question references a file: {file_name} - download failed]"
        except Exception as e:
            # Best effort: a failed download must not abort the question.
            print(f"β οΈ Error downloading file: {e}")
            question_text += f"\n\n[Note: This question references a file: {file_name} - download error]"
        return question_text

    def _classify_question(self, question_text: str, question_data: Dict[str, Any]) -> Dict[str, Any]:
        """Classify the question to determine the agent type.

        Uses the configured classifier when available, otherwise the keyword
        heuristic, and then applies the YouTube/chess overrides.

        Returns:
            A classification dict. If classification raises, a generic
            ``'general'`` classification is returned instead.
        """
        try:
            if self.classifier:
                file_name = question_data.get('file_name', '')
                classification = self.classifier.classify_question(question_text, file_name)
            else:
                # Fallback classification
                classification = self._fallback_classification(question_text)
            # Special handling for known patterns
            return self._enhance_classification(question_text, classification)
        except Exception as e:
            print(f"β οΈ Classification error: {e}")
            # Return general classification as fallback
            return {
                'primary_agent': 'general',
                'complexity': 3,
                'tools_needed': [],
                'confidence': 0.5,
            }

    def _fallback_classification(self, question_text: str) -> Dict[str, Any]:
        """Classify a question with simple keyword heuristics.

        Checks are applied in order: YouTube mention, chess keyword, file
        extension, math keyword; anything else is classified as research.
        """
        question_lower = question_text.lower()
        # YouTube detection
        if self._YOUTUBE_MENTION_PATTERN.search(question_text):
            return {
                'primary_agent': 'multimedia',
                'complexity': 3,
                'tools_needed': ['analyze_youtube_video'],
                'confidence': 0.9,
            }
        # Chess detection (whole words only)
        if self._CHESS_PATTERN.search(question_lower):
            return {
                'primary_agent': 'chess',
                'complexity': 4,
                'tools_needed': ['analyze_chess_multi_tool'],
                'confidence': 0.9,
            }
        # File processing detection
        file_extensions = ('.xlsx', '.xls', '.py', '.txt', '.pdf')
        if any(ext in question_lower for ext in file_extensions):
            return {
                'primary_agent': 'file_processing',
                'complexity': 3,
                'tools_needed': ['analyze_excel_file', 'analyze_python_code'],
                'confidence': 0.8,
            }
        # Math detection
        math_keywords = ('calculate', 'solve', 'equation', 'formula', 'math')
        if any(keyword in question_lower for keyword in math_keywords):
            return {
                'primary_agent': 'logic_math',
                'complexity': 3,
                'tools_needed': ['advanced_calculator'],
                'confidence': 0.7,
            }
        # Research fallback
        return {
            'primary_agent': 'research',
            'complexity': 3,
            'tools_needed': ['research_with_comprehensive_fallback'],
            'confidence': 0.6,
        }

    def _enhance_classification(self, question_text: str, classification: Dict[str, Any]) -> Dict[str, Any]:
        """Apply forced overrides for YouTube URLs and chess questions.

        The given dict is modified in place and returned. The chess check
        runs after the YouTube check, so it wins when both match.
        """
        question_lower = question_text.lower()
        # Force YouTube classification
        if self._YOUTUBE_URL_PATTERN.search(question_text):
            classification['primary_agent'] = 'multimedia'
            self._prepend_tool(classification, 'analyze_youtube_video')
            print("π₯ YouTube URL detected - forcing multimedia classification")
        # Force chess classification
        if self._CHESS_PATTERN.search(question_lower):
            classification['primary_agent'] = 'chess'
            # Keep the tool list consistent with the forced agent type,
            # mirroring what the YouTube branch does.
            self._prepend_tool(classification, 'analyze_chess_multi_tool')
            print("βοΈ Chess question detected - using specialized chess analysis")
        return classification

    def _get_enhanced_prompt(self, question_text: str, classification: Dict[str, Any]) -> str:
        """Build the prompt for the classified agent type.

        Unknown agent types use the ``"general"`` template.
        """
        question_type = classification.get('primary_agent', 'general')
        print(f"π― Question type: {question_type}")
        print(f"π Complexity: {classification.get('complexity', 'unknown')}/5")
        print(f"π§ Tools needed: {classification.get('tools_needed', [])}")
        # Get appropriate template
        template = self.prompt_templates.get(question_type, self.prompt_templates["general"])
        enhanced_prompt = template.format(question_text=question_text)
        print(f"π Using {question_type} prompt template")
        return enhanced_prompt

    def _execute_with_agent(self, prompt: str) -> str:
        """Run the prompt through a fresh smolagents ``CodeAgent``.

        On any failure the model manager is asked to switch to a fallback
        model and the prompt is retried with a new agent.

        Returns:
            The agent response converted to ``str``.

        Raises:
            GAIAError: When execution fails and the model manager reports
                that no fallback is available; the original exception is
                attached as the cause.
        """
        # Loop instead of recursing so repeated fallbacks do not grow the stack.
        while True:
            try:
                # Get current model
                model = self.model_manager.get_current_model()
                # Create fresh agent for memory management
                from smolagents import CodeAgent
                # Import tools
                tools = self._get_tools()
                print("π§ Creating fresh agent to avoid memory accumulation...")
                agent = CodeAgent(
                    model=model,
                    tools=tools,
                    max_steps=self.config.model.MAX_STEPS,
                    verbosity_level=self.config.model.VERBOSITY_LEVEL,
                )
                # Execute the prompt
                raw_answer = str(agent.run(prompt))
                print(f"β Generated raw answer: {raw_answer[:100]}...")
                return raw_answer
            except Exception as e:
                # Try fallback model if available
                if not self.model_manager._switch_to_fallback():
                    raise GAIAError(f"Agent execution failed: {e}") from e
                print("π Retrying with fallback model...")

    def _get_tools(self) -> List:
        """Return ``GAIA_TOOLS`` from the legacy module, or ``[]`` if unavailable."""
        try:
            # Import tools from the old system for now
            self._ensure_legacy_import_path()
            from gaia_tools import GAIA_TOOLS
            return GAIA_TOOLS
        except ImportError:
            print("β οΈ Could not import GAIA_TOOLS, using empty tool list")
            return []

    def get_random_question(self) -> Optional[Dict[str, Any]]:
        """Return a random question from the loader, or ``None`` without a loader."""
        if self.question_loader:
            return self.question_loader.get_random_question()
        return None

    def get_questions(self, max_questions: int = 5) -> List[Dict[str, Any]]:
        """Return up to *max_questions* questions from the loader.

        Returns an empty list when there is no loader or the loader has no
        ``questions`` attribute. Negative limits are treated as zero rather
        than being interpreted as a from-the-end slice.
        """
        if self.question_loader and hasattr(self.question_loader, 'questions'):
            return self.question_loader.questions[:max(0, max_questions)]
        return []