#!/usr/bin/env python3
"""
Summary Report Generator
Master reporting with improvement recommendations and actionable insights.
"""

import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Any
import statistics


class SummaryReportGenerator:
    """Generator for comprehensive summary reports with actionable insights."""

    def __init__(self):
        """Initialize the summary report generator."""
        self.logger = logging.getLogger("SummaryReportGenerator")

    async def generate_master_report(self,
                                     results: Dict[str, Dict],
                                     session_dir: Path,
                                     classification_report: Dict) -> Dict:
        """
        Generate comprehensive master report with actionable insights.

        Args:
            results: Raw test results
            session_dir: Session directory for output
            classification_report: Classification analysis results

        Returns:
            Master report dictionary
        """
        self.logger.info("Generating master summary report...")

        # Generate all report sections
        executive_summary = self.generate_executive_summary(results, classification_report)
        detailed_metrics = self.generate_detailed_metrics(results, classification_report)
        improvement_roadmap = self.generate_improvement_roadmap(classification_report)
        technical_insights = self.generate_technical_insights(results, classification_report)

        # Compile master report
        master_report = {
            "report_metadata": {
                "generated_at": datetime.now().isoformat(),
                "total_questions": len(results),
                "session_directory": str(session_dir),
                "report_version": "1.0"
            },
            "executive_summary": executive_summary,
            "detailed_metrics": detailed_metrics,
            "improvement_roadmap": improvement_roadmap,
            "technical_insights": technical_insights
        }

        # Save master report
        report_file = session_dir / "master_summary_report.json"
        with open(report_file, 'w') as f:
            json.dump(master_report, f, indent=2)

        # Generate human-readable markdown report
        markdown_report = self.generate_markdown_report(master_report)
        markdown_file = session_dir / "SUMMARY_REPORT.md"
        with open(markdown_file, 'w') as f:
            f.write(markdown_report)

        self.logger.info(f"Master report saved to: {report_file}")
        self.logger.info(f"Markdown report saved to: {markdown_file}")

        return master_report

    def generate_executive_summary(self, results: Dict, classification_report: Dict) -> Dict:
        """Generate executive summary with key metrics and status."""
        performance_metrics = classification_report.get('performance_metrics', {})

        # Calculate overall metrics
        total_questions = len(results)
        total_correct = sum(metrics.get('counts', {}).get('correct', 0)
                            for metrics in performance_metrics.values())
        total_partial = sum(metrics.get('counts', {}).get('partial', 0)
                            for metrics in performance_metrics.values())
        total_errors = sum(metrics.get('counts', {}).get('error', 0) +
                           metrics.get('counts', {}).get('timeout', 0)
                           for metrics in performance_metrics.values())

        overall_accuracy = total_correct / total_questions if total_questions > 0 else 0
        partial_rate = total_partial / total_questions if total_questions > 0 else 0
        error_rate = total_errors / total_questions if total_questions > 0 else 0

        # Best and worst performing classifications
        classification_accuracies = {
            classification: metrics.get('accuracy', 0)
            for classification, metrics in performance_metrics.items()
        }
        best_classification = max(classification_accuracies.items(),
                                  key=lambda x: x[1], default=('none', 0))
        worst_classification = min(classification_accuracies.items(),
                                   key=lambda x: x[1], default=('none', 0))

        # Production readiness assessment
        production_ready = overall_accuracy >= 0.7 and error_rate <= 0.1

        return {
            "overall_performance": {
                "accuracy": overall_accuracy,
                "partial_accuracy": partial_rate,
                "error_rate": error_rate,
                "total_questions": total_questions
            },
            "classification_performance": {
                "best": {
                    "classification": best_classification[0],
                    "accuracy": best_classification[1]
                },
                "worst": {
                    "classification": worst_classification[0],
                    "accuracy": worst_classification[1]
                }
            },
            "production_readiness": {
                "ready": production_ready,
                "accuracy_target": 0.7,
                "current_accuracy": overall_accuracy,
                "gap_to_target": max(0, 0.7 - overall_accuracy)
            },
            "key_findings": self.extract_key_findings(results, classification_report)
        }

    def generate_detailed_metrics(self, results: Dict, classification_report: Dict) -> Dict:
        """Generate detailed performance metrics breakdown."""
        performance_metrics = classification_report.get('performance_metrics', {})
        tool_effectiveness = classification_report.get('tool_effectiveness', {})

        # Processing time analysis
        all_times = []
        for result in results.values():
            time_taken = result.get('total_processing_time', 0)
            if time_taken > 0:
                all_times.append(time_taken)

        time_analysis = {
            "mean": statistics.mean(all_times) if all_times else 0,
            "median": statistics.median(all_times) if all_times else 0,
            "max": max(all_times) if all_times else 0,
            "min": min(all_times) if all_times else 0,
            "total_processing_time": sum(all_times)
        }

        # Tool usage ranking
        tool_ranking = sorted(
            tool_effectiveness.items(),
            key=lambda x: x[1].get('overall_effectiveness', 0),
            reverse=True
        )

        return {
            "by_classification": performance_metrics,
            "processing_time_analysis": time_analysis,
            "tool_effectiveness_ranking": [
                {
                    "tool": tool,
                    "effectiveness": data.get('overall_effectiveness', 0),
                    "total_uses": data.get('total_uses', 0)
                }
                for tool, data in tool_ranking
            ],
            "error_analysis": self.analyze_errors(results)
        }

    def analyze_errors(self, results: Dict) -> Dict:
        """Analyze error patterns and types."""
        error_types = {}
        timeout_questions = []
        error_questions = []

        for question_id, result in results.items():
            solver_result = result.get('solver_result', {})
            status = solver_result.get('status', 'unknown')

            if status == 'timeout':
                timeout_questions.append(question_id)
            elif status == 'error':
                error_questions.append(question_id)
                error_msg = solver_result.get('error', 'Unknown error')
                error_types[error_msg] = error_types.get(error_msg, 0) + 1

        return {
            "timeout_count": len(timeout_questions),
            "error_count": len(error_questions),
            "timeout_questions": timeout_questions,
            "error_questions": error_questions,
            "error_types": error_types
        }

    def generate_improvement_roadmap(self, classification_report: Dict) -> Dict:
        """Generate structured improvement roadmap."""
        improvement_areas = classification_report.get('improvement_areas', {})

        # Prioritize improvements
        high_priority = []
        medium_priority = []
        low_priority = []

        # High priority: low-accuracy classifications
        for item in improvement_areas.get('low_accuracy_classifications', []):
            if item['accuracy'] < 0.3:
                high_priority.append({
                    "type": "critical_accuracy",
                    "target": item['classification'],
                    "current_accuracy": item['accuracy'],
                    "action": f"Redesign {item['classification']} agent logic and prompts",
                    "expected_impact": "High - directly improves success rate"
                })

        # High priority: high error rates
        for item in improvement_areas.get('high_error_rate_classifications', []):
            if item['error_rate'] > 0.4:
                high_priority.append({
                    "type": "stability",
                    "target": item['classification'],
                    "current_error_rate": item['error_rate'],
                    "action": f"Fix timeout and error handling for {item['classification']} questions",
                    "expected_impact": "High - reduces system failures"
                })

        # Medium priority: tool improvements
        for item in improvement_areas.get('ineffective_tools', []):
            if item['uses'] >= 5:  # Only tools with significant usage
                medium_priority.append({
                    "type": "tool_effectiveness",
                    "target": item['tool'],
                    "current_effectiveness": item['effectiveness'],
                    "action": f"Revise {item['tool']} tool implementation and error handling",
                    "expected_impact": "Medium - improves specific question types"
                })

        # Low priority: performance optimizations
        for item in improvement_areas.get('slow_processing_classifications', []):
            low_priority.append({
                "type": "performance",
                "target": item['classification'],
                "current_time": item['avg_time'],
                "action": f"Optimize processing pipeline for {item['classification']} questions",
                "expected_impact": "Low - improves user experience"
            })

        return {
            "high_priority": high_priority,
            "medium_priority": medium_priority,
            "low_priority": low_priority,
            "recommended_sequence": self.generate_implementation_sequence(
                high_priority, medium_priority, low_priority
            ),
            "effort_estimates": self.estimate_implementation_effort(
                high_priority, medium_priority, low_priority
            )
        }

    def generate_implementation_sequence(self, high_priority: List, medium_priority: List, low_priority: List) -> List[str]:
        """Generate recommended implementation sequence."""
        sequence = []

        # Start with the highest-impact accuracy improvements
        critical_accuracy = [item for item in high_priority if item['type'] == 'critical_accuracy']
        if critical_accuracy:
            worst_accuracy = min(critical_accuracy, key=lambda x: x['current_accuracy'])
            sequence.append(f"1. Fix {worst_accuracy['target']} agent (critical accuracy issue)")

        # Then stability issues
        stability_issues = [item for item in high_priority if item['type'] == 'stability']
        if stability_issues:
            sequence.append("2. Address high error rate classifications")

        # Then tool improvements that affect multiple classifications
        if medium_priority:
            sequence.append("3. Improve ineffective tools with high usage")

        # Finally performance optimizations
        if low_priority:
            sequence.append("4. Optimize processing performance")

        return sequence

    def estimate_implementation_effort(self, high_priority: List, medium_priority: List, low_priority: List) -> Dict:
        """Estimate implementation effort for improvements."""
        return {
            "high_priority_items": len(high_priority),
            "estimated_effort": {
                "agent_redesign": f"{len([i for i in high_priority if i['type'] == 'critical_accuracy'])} weeks",
                "stability_fixes": f"{len([i for i in high_priority if i['type'] == 'stability'])} days",
                "tool_improvements": f"{len(medium_priority)} days",
                "performance_optimization": f"{len(low_priority)} days"
            },
            "total_estimated_effort": f"{len(high_priority) * 5 + len(medium_priority) * 2 + len(low_priority)} person-days"
        }

    def generate_technical_insights(self, results: Dict, classification_report: Dict) -> Dict:
        """Generate technical insights and patterns."""
        # Question complexity vs. success rate
        complexity_analysis = self.analyze_complexity_patterns(results)

        # Classification accuracy patterns
        classification_patterns = self.analyze_classification_patterns(classification_report)

        # Tool usage patterns
        tool_patterns = self.analyze_tool_patterns(classification_report)

        return {
            "complexity_analysis": complexity_analysis,
            "classification_patterns": classification_patterns,
            "tool_patterns": tool_patterns,
            "system_limitations": self.identify_system_limitations(results, classification_report)
        }

    def analyze_complexity_patterns(self, results: Dict) -> Dict:
        """Analyze how question complexity affects success rate."""
        complexity_buckets = {}

        for result in results.values():
            classification = result.get('classification', {})
            complexity = classification.get('complexity', 0)
            validation = result.get('validation', {})
            success = validation.get('validation_status') == 'correct'

            if complexity not in complexity_buckets:
                complexity_buckets[complexity] = {'total': 0, 'successful': 0}

            complexity_buckets[complexity]['total'] += 1
            if success:
                complexity_buckets[complexity]['successful'] += 1

        # Calculate success rates by complexity
        complexity_success_rates = {}
        for complexity, data in complexity_buckets.items():
            success_rate = data['successful'] / data['total'] if data['total'] > 0 else 0
            complexity_success_rates[complexity] = {
                'success_rate': success_rate,
                'total_questions': data['total']
            }

        return complexity_success_rates

    def analyze_classification_patterns(self, classification_report: Dict) -> Dict:
        """Analyze patterns in classification performance."""
        performance_metrics = classification_report.get('performance_metrics', {})

        patterns = {
            "high_performers": [],
            "low_performers": [],
            "inconsistent_performers": []
        }

        for classification, metrics in performance_metrics.items():
            accuracy = metrics.get('accuracy', 0)
            error_rate = metrics.get('error_rate', 0)
            total_questions = metrics.get('total_questions', 0)

            if accuracy >= 0.8 and total_questions >= 3:
                patterns["high_performers"].append({
                    "classification": classification,
                    "accuracy": accuracy,
                    "questions": total_questions
                })
            elif accuracy <= 0.3 and total_questions >= 3:
                patterns["low_performers"].append({
                    "classification": classification,
                    "accuracy": accuracy,
                    "questions": total_questions
                })
            elif error_rate > 0.5:
                patterns["inconsistent_performers"].append({
                    "classification": classification,
                    "error_rate": error_rate,
                    "questions": total_questions
                })

        return patterns

    def analyze_tool_patterns(self, classification_report: Dict) -> Dict:
        """Analyze tool usage and effectiveness patterns."""
        tool_effectiveness = classification_report.get('tool_effectiveness', {})

        # Group tools by effectiveness
        highly_effective = []
        moderately_effective = []
        ineffective = []

        for tool, data in tool_effectiveness.items():
            effectiveness = data.get('overall_effectiveness', 0)
            uses = data.get('total_uses', 0)

            if uses >= 3:  # Only consider tools with meaningful usage
                if effectiveness >= 0.8:
                    highly_effective.append({
                        "tool": tool,
                        "effectiveness": effectiveness,
                        "uses": uses
                    })
                elif effectiveness >= 0.5:
                    moderately_effective.append({
                        "tool": tool,
                        "effectiveness": effectiveness,
                        "uses": uses
                    })
                else:
                    ineffective.append({
                        "tool": tool,
                        "effectiveness": effectiveness,
                        "uses": uses
                    })

        return {
            "highly_effective_tools": highly_effective,
            "moderately_effective_tools": moderately_effective,
            "ineffective_tools": ineffective
        }

    def identify_system_limitations(self, results: Dict, classification_report: Dict) -> List[str]:
        """Identify current system limitations."""
        limitations = []

        # Overall accuracy limitation
        overall_accuracy = sum(
            metrics.get('counts', {}).get('correct', 0)
            for metrics in classification_report.get('performance_metrics', {}).values()
        ) / len(results) if results else 0

        if overall_accuracy < 0.7:
            limitations.append(f"Overall accuracy ({overall_accuracy:.1%}) below production target (70%)")

        # High error rate limitation
        total_errors = sum(
            metrics.get('counts', {}).get('error', 0) + metrics.get('counts', {}).get('timeout', 0)
            for metrics in classification_report.get('performance_metrics', {}).values()
        )
        error_rate = total_errors / len(results) if results else 0

        if error_rate > 0.1:
            limitations.append(f"High error/timeout rate ({error_rate:.1%}) indicates stability issues")

        # Processing time limitation
        slow_classifications = classification_report.get('improvement_areas', {}).get('slow_processing_classifications', [])
        if slow_classifications:
            limitations.append("Slow processing times for some question types may affect user experience")

        # Tool effectiveness limitation
        ineffective_tools = classification_report.get('improvement_areas', {}).get('ineffective_tools', [])
        if len(ineffective_tools) > 3:
            limitations.append("Multiple tools showing low effectiveness, impacting overall system performance")

        return limitations

    def extract_key_findings(self, results: Dict, classification_report: Dict) -> List[str]:
        """Extract key findings from the analysis."""
        findings = []
        performance_metrics = classification_report.get('performance_metrics', {})

        # Best performing classification
        if performance_metrics:
            best_classification = max(performance_metrics.items(), key=lambda x: x[1].get('accuracy', 0))
            findings.append(f"Best performing agent: {best_classification[0]} ({best_classification[1].get('accuracy', 0):.1%} accuracy)")

        # Most problematic classification
        if performance_metrics:
            worst_classification = min(performance_metrics.items(), key=lambda x: x[1].get('accuracy', 0))
            if worst_classification[1].get('accuracy', 0) < 0.5:
                findings.append(f"Critical issue: {worst_classification[0]} agent has {worst_classification[1].get('accuracy', 0):.1%} accuracy")

        # Tool insights
        tool_effectiveness = classification_report.get('tool_effectiveness', {})
        if tool_effectiveness:
            most_effective_tool = max(tool_effectiveness.items(), key=lambda x: x[1].get('overall_effectiveness', 0))
            findings.append(f"Most effective tool: {most_effective_tool[0]} ({most_effective_tool[1].get('overall_effectiveness', 0):.1%} success rate)")

        return findings

    def generate_markdown_report(self, master_report: Dict) -> str:
        """Generate human-readable markdown report."""
        report = []

        # Header
        metadata = master_report.get('report_metadata', {})
        report.append("# GAIA Test System - Master Summary Report")
        report.append(f"**Generated:** {metadata.get('generated_at', 'Unknown')}")
        report.append(f"**Total Questions:** {metadata.get('total_questions', 0)}")
        report.append("")

        # Executive Summary
        exec_summary = master_report.get('executive_summary', {})
        overall_perf = exec_summary.get('overall_performance', {})
        report.append("## Executive Summary")
        report.append(f"- **Overall Accuracy:** {overall_perf.get('accuracy', 0):.1%}")
        report.append(f"- **Error Rate:** {overall_perf.get('error_rate', 0):.1%}")

        production = exec_summary.get('production_readiness', {})
        if production.get('ready', False):
            report.append("- **Status:** ✅ Production Ready")
        else:
            gap = production.get('gap_to_target', 0)
            report.append(f"- **Status:** ❌ Not Production Ready (need {gap:.1%} improvement)")
        report.append("")

        # Key Findings
        findings = exec_summary.get('key_findings', [])
        if findings:
            report.append("### Key Findings")
            for finding in findings:
                report.append(f"- {finding}")
            report.append("")

        # Improvement Roadmap
        roadmap = master_report.get('improvement_roadmap', {})
        high_priority = roadmap.get('high_priority', [])
        if high_priority:
            report.append("## High Priority Improvements")
            for i, item in enumerate(high_priority, 1):
                report.append(f"{i}. **{item.get('target', 'Unknown')}** - {item.get('action', 'No action specified')}")
                report.append(f"   - Current: {item.get('current_accuracy', item.get('current_error_rate', 'Unknown'))}")
                report.append(f"   - Impact: {item.get('expected_impact', 'Unknown')}")
            report.append("")

        # Implementation Sequence
        sequence = roadmap.get('recommended_sequence', [])
        if sequence:
            report.append("## Recommended Implementation Sequence")
            for step in sequence:
                report.append(f"- {step}")
            report.append("")

        return "\n".join(report)