#!/usr/bin/env python3
"""
Classification Analyzer

Performance analysis by question classification to identify improvement areas.
"""

import json
import logging
import statistics
from collections import defaultdict, Counter
from datetime import datetime
from pathlib import Path
from typing import Dict, List


class ClassificationAnalyzer:
    """Analyzer for performance metrics by question classification."""

    def __init__(self):
        """Initialize the classification analyzer."""
        self.logger = logging.getLogger("ClassificationAnalyzer")

    async def analyze_by_classification(self, results: Dict[str, Dict], session_dir: Path) -> Dict:
        """
        Analyze test results by question classification.

        Args:
            results: Test results keyed by question_id
            session_dir: Directory to save analysis results

        Returns:
            Classification analysis report
        """
        self.logger.info("Starting classification-based analysis...")

        # Organize results by classification
        classification_data = self.organize_by_classification(results)

        # Calculate performance metrics
        performance_metrics = self.calculate_performance_metrics(classification_data)

        # Analyze tool effectiveness
        tool_effectiveness = self.analyze_tool_effectiveness(classification_data)

        # Identify improvement areas
        improvement_areas = self.identify_improvement_areas(performance_metrics, tool_effectiveness)

        # Create comprehensive report
        analysis_report = {
            "analysis_timestamp": datetime.now().isoformat(),
            "total_questions": len(results),
            "classification_breakdown": self.get_classification_breakdown(classification_data),
            "performance_metrics": performance_metrics,
            "tool_effectiveness": tool_effectiveness,
            "improvement_areas": improvement_areas,
            "detailed_data": classification_data
        }

        # Save analysis report
        report_file = session_dir / "classification_analysis.json"
        with open(report_file, 'w') as f:
            json.dump(analysis_report, f, indent=2)

        self.logger.info(f"Classification analysis saved to: {report_file}")
        return analysis_report

    def organize_by_classification(self, results: Dict[str, Dict]) -> Dict[str, List[Dict]]:
        """Organize results by question classification."""
        classification_data = defaultdict(list)

        for question_id, result in results.items():
            # Get classification info
            classification = result.get('classification', {})
            primary_agent = classification.get('primary_agent', 'unknown')

            # Add to classification group
            classification_data[primary_agent].append({
                'question_id': question_id,
                'result': result,
                'classification': classification
            })

        return dict(classification_data)
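
    # NOTE: The per-question result structure assumed throughout this analyzer
    # is inferred from the field accesses below; it is not a guaranteed schema:
    #   {
    #       "classification": {"primary_agent": str, "complexity": int,
    #                          "confidence": float, "tools_needed": [str, ...]},
    #       "validation": {"validation_status": "correct" | "partial" | "incorrect"},
    #       "solver_result": {"status": "timeout" | "error" | ...},
    #       "total_processing_time": float
    #   }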

    def calculate_performance_metrics(self, classification_data: Dict[str, List[Dict]]) -> Dict[str, Dict]:
        """Calculate performance metrics for each classification."""
        metrics = {}

        for classification, questions in classification_data.items():
            # Accuracy metrics
            validation_statuses = []
            execution_times = []
            complexity_scores = []
            confidence_scores = []

            correct_count = 0
            partial_count = 0
            incorrect_count = 0
            timeout_count = 0
            error_count = 0

            for question_data in questions:
                result = question_data['result']
                classification_info = question_data['classification']

                # Validation status
                validation = result.get('validation', {})
                status = validation.get('validation_status', 'unknown')
                validation_statuses.append(status)

                if status == 'correct':
                    correct_count += 1
                elif status == 'partial':
                    partial_count += 1
                elif status == 'incorrect':
                    incorrect_count += 1

                # Execution metrics
                solver_result = result.get('solver_result', {})
                if solver_result.get('status') == 'timeout':
                    timeout_count += 1
                elif solver_result.get('status') == 'error':
                    error_count += 1

                # Timing
                exec_time = result.get('total_processing_time', 0)
                if exec_time > 0:
                    execution_times.append(exec_time)

                # Classification metrics
                complexity = classification_info.get('complexity', 0)
                if complexity > 0:
                    complexity_scores.append(complexity)

                confidence = classification_info.get('confidence', 0)
                if confidence > 0:
                    confidence_scores.append(confidence)

            total_questions = len(questions)

            # Calculate metrics
            accuracy = correct_count / total_questions if total_questions > 0 else 0
            partial_rate = partial_count / total_questions if total_questions > 0 else 0
            error_rate = (error_count + timeout_count) / total_questions if total_questions > 0 else 0

            metrics[classification] = {
                "total_questions": total_questions,
                "accuracy": accuracy,
                "partial_accuracy": partial_rate,
                "error_rate": error_rate,
                "counts": {
                    "correct": correct_count,
                    "partial": partial_count,
                    "incorrect": incorrect_count,
                    "timeout": timeout_count,
                    "error": error_count
                },
                "execution_time": {
                    "mean": statistics.mean(execution_times) if execution_times else 0,
                    "median": statistics.median(execution_times) if execution_times else 0,
                    "max": max(execution_times) if execution_times else 0,
                    "min": min(execution_times) if execution_times else 0
                },
                "complexity": {
                    "mean": statistics.mean(complexity_scores) if complexity_scores else 0,
                    "distribution": Counter(complexity_scores)
                },
                "classification_confidence": {
                    "mean": statistics.mean(confidence_scores) if confidence_scores else 0,
                    "min": min(confidence_scores) if confidence_scores else 0
                }
            }

        return metrics

    def analyze_tool_effectiveness(self, classification_data: Dict[str, List[Dict]]) -> Dict[str, Dict]:
        """Analyze tool effectiveness across classifications."""
        tool_usage = defaultdict(lambda: {
            'total_uses': 0,
            'successes': 0,
            'by_classification': defaultdict(lambda: {'uses': 0, 'successes': 0})
        })

        for classification, questions in classification_data.items():
            for question_data in questions:
                result = question_data['result']
                classification_info = question_data['classification']

                # Get tools needed
                tools_needed = classification_info.get('tools_needed', [])
                success = result.get('validation', {}).get('validation_status') == 'correct'

                for tool in tools_needed:
                    tool_usage[tool]['total_uses'] += 1
                    tool_usage[tool]['by_classification'][classification]['uses'] += 1

                    if success:
                        tool_usage[tool]['successes'] += 1
                        tool_usage[tool]['by_classification'][classification]['successes'] += 1

        # Calculate effectiveness rates
        tool_effectiveness = {}
        for tool, usage_data in tool_usage.items():
            total_uses = usage_data['total_uses']
            successes = usage_data['successes']
            effectiveness_rate = successes / total_uses if total_uses > 0 else 0

            # Per-classification effectiveness
            classification_effectiveness = {}
            for classification, class_data in usage_data['by_classification'].items():
                class_uses = class_data['uses']
                class_successes = class_data['successes']
                class_rate = class_successes / class_uses if class_uses > 0 else 0

                classification_effectiveness[classification] = {
                    'uses': class_uses,
                    'successes': class_successes,
                    'effectiveness_rate': class_rate
                }

            tool_effectiveness[tool] = {
                'total_uses': total_uses,
                'total_successes': successes,
                'overall_effectiveness': effectiveness_rate,
                'by_classification': classification_effectiveness
            }

        return tool_effectiveness

    def identify_improvement_areas(self, performance_metrics: Dict, tool_effectiveness: Dict) -> Dict[str, List]:
        """Identify specific improvement areas based on analysis."""
        improvements = {
            "low_accuracy_classifications": [],
            "high_error_rate_classifications": [],
            "slow_processing_classifications": [],
            "ineffective_tools": [],
            "misclassified_questions": [],
            "recommendations": []
        }

        # Identify low accuracy classifications
        for classification, metrics in performance_metrics.items():
            accuracy = metrics['accuracy']
            error_rate = metrics['error_rate']
            avg_time = metrics['execution_time']['mean']

            if accuracy < 0.5:  # Less than 50% accuracy
                improvements["low_accuracy_classifications"].append({
                    "classification": classification,
                    "accuracy": accuracy,
                    "details": f"Only {accuracy:.1%} accuracy with {metrics['total_questions']} questions"
                })

            if error_rate > 0.3:  # More than 30% errors/timeouts
                improvements["high_error_rate_classifications"].append({
                    "classification": classification,
                    "error_rate": error_rate,
                    "details": f"{error_rate:.1%} error/timeout rate"
                })

            if avg_time > 600:  # More than 10 minutes average
                improvements["slow_processing_classifications"].append({
                    "classification": classification,
                    "avg_time": avg_time,
                    "details": f"Average {avg_time:.0f} seconds processing time"
                })

        # Identify ineffective tools
        for tool, effectiveness in tool_effectiveness.items():
            overall_rate = effectiveness['overall_effectiveness']
            total_uses = effectiveness['total_uses']

            if overall_rate < 0.4 and total_uses >= 3:  # Less than 40% effectiveness with meaningful usage
                improvements["ineffective_tools"].append({
                    "tool": tool,
                    "effectiveness": overall_rate,
                    "uses": total_uses,
                    "details": f"Only {overall_rate:.1%} success rate across {total_uses} uses"
                })

        # Generate recommendations
        recommendations = []

        if improvements["low_accuracy_classifications"]:
            worst_classification = min(improvements["low_accuracy_classifications"],
                                       key=lambda x: x['accuracy'])
            recommendations.append(
                f"PRIORITY: Improve {worst_classification['classification']} agent "
                f"(currently {worst_classification['accuracy']:.1%} accuracy)"
            )

        if improvements["ineffective_tools"]:
            worst_tool = min(improvements["ineffective_tools"],
                             key=lambda x: x['effectiveness'])
            recommendations.append(
                f"TOOL FIX: Revise {worst_tool['tool']} tool "
                f"(currently {worst_tool['effectiveness']:.1%} effectiveness)"
            )

        if improvements["high_error_rate_classifications"]:
            recommendations.append(
                "STABILITY: Address timeout and error handling for classifications with high error rates"
            )

        overall_accuracy = self.calculate_overall_accuracy(performance_metrics)
        if overall_accuracy < 0.7:
            recommendations.append(
                f"SYSTEM: Overall accuracy is {overall_accuracy:.1%} - target 70% for production readiness"
            )

        improvements["recommendations"] = recommendations
        return improvements

    def calculate_overall_accuracy(self, performance_metrics: Dict) -> float:
        """Calculate overall system accuracy across all classifications."""
        total_correct = 0
        total_questions = 0

        for metrics in performance_metrics.values():
            total_correct += metrics['counts']['correct']
            total_questions += metrics['total_questions']

        return total_correct / total_questions if total_questions > 0 else 0

    def get_classification_breakdown(self, classification_data: Dict[str, List[Dict]]) -> Dict[str, int]:
        """Get simple breakdown of question counts by classification."""
        return {
            classification: len(questions)
            for classification, questions in classification_data.items()
        }
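

# ---------------------------------------------------------------------------
# Minimal usage sketch (not part of the analyzer itself). The sample result
# below is hypothetical and only illustrates the field names this module
# reads; real results and the session directory would come from the
# surrounding test harness.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    import asyncio
    import tempfile

    logging.basicConfig(level=logging.INFO)

    # Hypothetical single-question result, shaped like the structure the
    # analyzer expects (see the schema note inside the class above).
    sample_results = {
        "q_001": {
            "classification": {
                "primary_agent": "research",
                "complexity": 3,
                "confidence": 0.9,
                "tools_needed": ["web_search"],
            },
            "validation": {"validation_status": "correct"},
            "solver_result": {"status": "completed"},
            "total_processing_time": 42.0,
        }
    }

    async def _demo() -> None:
        analyzer = ClassificationAnalyzer()
        # Write the JSON report into a throwaway directory for the demo.
        with tempfile.TemporaryDirectory() as tmp:
            report = await analyzer.analyze_by_classification(sample_results, Path(tmp))
            print(json.dumps(report["performance_metrics"], indent=2, default=str))

    asyncio.run(_demo())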