Spaces:
Sleeping
Sleeping
#!/usr/bin/env python3 | |
# -*- coding: utf-8 -*- | |
""" | |
Agent Manager | |
This module provides the main orchestrator for the Code Review Agent. | |
It coordinates the review process and manages the state of the application. | |
""" | |
import os | |
import time | |
import logging | |
import tempfile | |
import json | |
from datetime import datetime | |
import gradio as gr | |
from src.core.language_detector import LanguageDetector | |
from src.services.code_analyzer import CodeAnalyzer | |
from src.services.report_generator import ReportGenerator | |
from src.services.repository_service import RepositoryService | |
from src.services.security_scanner import SecurityScanner | |
from src.services.performance_analyzer import PerformanceAnalyzer | |
logger = logging.getLogger(__name__) | |
class AgentManager: | |
""" | |
Main orchestrator for the Code Review Agent. | |
This class coordinates the review process, manages the application state, | |
and provides the interface between the UI and the business logic. | |
""" | |
def __init__(self): | |
""" | |
Initialize the AgentManager. | |
""" | |
# Initialize state management | |
self.state = { | |
'repo_url': None, | |
'progress': {}, | |
'results': {}, | |
'current_step': None | |
} | |
# Initialize services | |
self.language_detector = LanguageDetector() | |
self.code_analyzer = CodeAnalyzer() | |
self.report_generator = ReportGenerator() | |
self.repository_service = RepositoryService() | |
self.security_scanner = SecurityScanner() | |
self.performance_analyzer = PerformanceAnalyzer() | |
self.temp_dir = tempfile.mkdtemp(prefix="code_review_agent_") | |
logger.info(f"Initialized AgentManager with temp directory: {self.temp_dir}") | |
def start_review(self, repo_url, github_token=None, selected_languages=None): | |
""" | |
Start the code review process for a GitHub repository. | |
Args: | |
repo_url (str): The URL of the GitHub repository to review. | |
github_token (str, optional): GitHub authentication token for private repositories. | |
selected_languages (list, optional): List of languages to analyze. If None, | |
languages will be auto-detected. | |
Returns: | |
tuple: (progress_group, overall_progress, status_message, results_dashboard) - Updated UI components. | |
""" | |
# Initialize progress components outside the try block | |
progress_group = gr.Group(visible=True) | |
overall_progress = gr.Slider(value=0) | |
status_message = gr.Markdown("*Starting review...*") | |
try: | |
# Initialize state for new review | |
self.state = { | |
'repo_url': repo_url, | |
'progress': {}, | |
'results': {}, | |
'current_step': None | |
} | |
# Clone repository | |
self._update_progress("Repository Cloning", 0, overall_progress, status_message) | |
repo_path = self._clone_repository(repo_url, github_token) | |
self._update_progress("Repository Cloning", 100, overall_progress, status_message) | |
# Detect languages | |
self._update_progress("Language Detection", 0, overall_progress, status_message) | |
if selected_languages and len(selected_languages) > 0: | |
languages = selected_languages | |
logger.info(f"Using selected languages: {languages}") | |
else: | |
languages = self.language_detector.detect_languages(repo_path) | |
logger.info(f"Auto-detected languages: {languages}") | |
self.state['languages'] = languages | |
self._update_progress("Language Detection", 100, overall_progress, status_message) | |
# Perform code analysis | |
self._update_progress("Code Analysis", 0, overall_progress, status_message) | |
code_analysis_results = self.code_analyzer.analyze_repository(repo_path, languages) | |
self.state['results']['code_analysis'] = code_analysis_results | |
self._update_progress("Code Analysis", 100, overall_progress, status_message) | |
# Perform security scanning | |
self._update_progress("Security Scanning", 0, overall_progress, status_message) | |
security_results = self.security_scanner.scan_repository(repo_path, languages) | |
self.state['results']['security'] = security_results | |
self._update_progress("Security Scanning", 100, overall_progress, status_message) | |
# Perform performance analysis | |
self._update_progress("Performance Analysis", 0, overall_progress, status_message) | |
performance_results = self.performance_analyzer.analyze_repository(repo_path, languages) | |
self.state['results']['performance'] = performance_results | |
self._update_progress("Performance Analysis", 100, overall_progress, status_message) | |
# Perform AI review | |
self._update_progress("AI Review", 0, overall_progress, status_message) | |
ai_review_results = self._perform_ai_review(repo_path, languages) | |
self.state['results']['ai_review'] = ai_review_results | |
self._update_progress("AI Review", 100, overall_progress, status_message) | |
# Generate report | |
self._update_progress("Report Generation", 0, overall_progress, status_message) | |
repo_name = repo_url.split('/')[-1].replace('.git', '') | |
report_paths = self.report_generator.generate_report( | |
repo_name, self.state['results'] | |
) | |
self.state['report_paths'] = report_paths | |
self._update_progress("Report Generation", 100, overall_progress, status_message) | |
# Update results dashboard | |
results_dashboard = self._create_results_dashboard(self.state['results']) | |
results_dashboard.update(visible=True) | |
return progress_group, overall_progress, status_message, results_dashboard | |
except Exception as e: | |
logger.exception(f"Error during code review: {e}") | |
# Update progress components with error | |
status_message.update(value=f"*Error: {str(e)}*") | |
return progress_group, overall_progress, status_message, None | |
def export_report(self, results_dashboard, export_format): | |
""" | |
Export the code review report in the specified format. | |
Args: | |
results_dashboard: The results dashboard component. | |
export_format (str): The format to export the report in ('pdf', 'json', 'html', 'csv'). | |
Returns: | |
str: The path to the exported file. | |
""" | |
try: | |
if not self.state.get('results'): | |
logger.warning("No results available to export") | |
return None | |
# Get the actual format value from the textbox component | |
format_value = export_format.value if hasattr(export_format, 'value') else export_format | |
# Create exports directory if it doesn't exist | |
exports_dir = os.path.join(os.path.dirname(__file__), '..', '..', 'exports') | |
os.makedirs(exports_dir, exist_ok=True) | |
# Generate filename with timestamp | |
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") | |
repo_name = self.state['repo_url'].split('/')[-1].replace('.git', '') | |
filename = f"{repo_name}_review_{timestamp}.{format_value}" | |
filepath = os.path.join(exports_dir, filename) | |
# Export report in the specified format using report_generator | |
report_paths = self.report_generator.generate_report( | |
repo_name, self.state['results'], format_value | |
) | |
if format_value in report_paths: | |
return report_paths[format_value] | |
else: | |
logger.warning(f"Unsupported export format: {format_value}") | |
return None | |
logger.info(f"Exported report to {filepath}") | |
return filepath | |
except Exception as e: | |
logger.exception(f"Error exporting report: {e}") | |
return None | |
def _clone_repository(self, repo_url, github_token=None): | |
""" | |
Clone the GitHub repository to a temporary directory. | |
Args: | |
repo_url (str): The URL of the GitHub repository to clone. | |
github_token (str, optional): GitHub authentication token for private repositories. | |
Returns: | |
str: The path to the cloned repository. | |
""" | |
# Import the repository service here to avoid circular imports | |
from src.services.repository_service import RepositoryService | |
# Create a repository service instance | |
repo_service = RepositoryService(base_temp_dir=self.temp_dir) | |
# Clone the repository using the service | |
try: | |
# If a GitHub token is provided, use it for authentication | |
if github_token and github_token.strip(): | |
# Modify the URL to include the token for authentication | |
auth_url = repo_url.replace('https://', f'https://{github_token}@') | |
repo_path = repo_service.clone_repository(auth_url) | |
logger.info(f"Cloned repository using GitHub token authentication") | |
else: | |
# Clone without authentication (for public repositories) | |
repo_path = repo_service.clone_repository(repo_url) | |
logger.info(f"Cloned repository without authentication") | |
return repo_path | |
except Exception as e: | |
logger.error(f"Error cloning repository: {e}") | |
raise | |
def _perform_ai_review(self, repo_path, languages): | |
""" | |
Perform AI-powered code review. | |
Args: | |
repo_path (str): The path to the repository. | |
languages (list): List of programming languages to analyze. | |
Returns: | |
dict: AI review results. | |
""" | |
try: | |
# This is a placeholder for AI review functionality | |
# In a real implementation, this would use the MCP AI review service | |
from src.mcp.ai_review import AIReviewMCP | |
ai_reviewer = AIReviewMCP() | |
results = ai_reviewer.review_repository(repo_path, languages) | |
logger.info(f"AI review completed for {len(languages)} languages") | |
return results | |
except Exception as e: | |
logger.error(f"Error during AI review: {e}") | |
return { | |
'error': str(e), | |
'suggestions': [], | |
'issues': [] | |
} | |
def _update_progress(self, step, value, overall_progress, status_message): | |
""" | |
Update the progress components for a specific step. | |
Args: | |
step (str): The step to update. | |
value (int): The progress value (0-100). | |
overall_progress: The overall progress slider component. | |
status_message: The status message markdown component. | |
""" | |
# Update state | |
self.state['progress'][step] = value | |
self.state['current_step'] = step | |
# Calculate overall progress | |
total_steps = 7 # Total number of steps in the review process | |
completed_steps = sum(1 for v in self.state['progress'].values() if v == 100) | |
current_step_progress = value if step in self.state['progress'] else 0 | |
overall_value = (completed_steps * 100 + current_step_progress) / total_steps | |
# Update UI components | |
overall_progress.update(value=overall_value) | |
status_message.update(value=f"*{step}: {value}%*") | |
logger.info(f"Progress update: {step} - {value}% (Overall: {overall_value:.1f}%)") | |
time.sleep(0.5) # Simulate progress update time | |
def _create_results_dashboard(self, report): | |
""" | |
Create a results dashboard component for the UI. | |
Args: | |
report (dict): The code review report. | |
Returns: | |
object: A results dashboard component. | |
""" | |
# This is a placeholder. In a real implementation, this would create a | |
# results dashboard component for the UI. | |
class ResultsDashboard: | |
def __init__(self): | |
self.visible = False | |
def update(self, visible=None): | |
if visible is not None: | |
self.visible = visible | |
return self | |
return ResultsDashboard() |