# DEPENDENCIES
import time
from typing import Any
from typing import Dict
from typing import List
from loguru import logger
from typing import Optional
from dataclasses import dataclass
from config.settings import settings
from metrics.entropy import EntropyMetric
from config.threshold_config import Domain
from metrics.base_metric import MetricResult
from detector.ensemble import EnsembleResult
from metrics.perplexity import PerplexityMetric
from metrics.linguistic import LinguisticMetric
from metrics.structural import StructuralMetric
from detector.ensemble import EnsembleClassifier
from processors.text_processor import TextProcessor
from processors.text_processor import ProcessedText
from processors.domain_classifier import DomainClassifier
from processors.domain_classifier import DomainPrediction
from processors.language_detector import LanguageDetector
from metrics.semantic_analysis import SemanticAnalysisMetric
from processors.language_detector import LanguageDetectionResult
from metrics.multi_perturbation_stability import MultiPerturbationStabilityMetric

@dataclass
class DetectionResult:
    """
    Complete detection result with all metadata
    """
    # Final results
    ensemble_result        : EnsembleResult
    # Input metadata
    processed_text         : ProcessedText
    domain_prediction      : DomainPrediction
    language_result        : Optional[LanguageDetectionResult]
    # Metric details
    metric_results         : Dict[str, MetricResult]
    # Performance metrics
    processing_time        : float
    metrics_execution_time : Dict[str, float]
    # Warnings and errors
    warnings               : List[str]
    errors                 : List[str]

    def to_dict(self) -> Dict[str, Any]:
        """
        Convert to dictionary for JSON serialization
        """
        return {"prediction"  : {"verdict"           : self.ensemble_result.final_verdict,
                                 "ai_probability"    : round(self.ensemble_result.ai_probability, 4),
                                 "human_probability" : round(self.ensemble_result.human_probability, 4),
                                 "mixed_probability" : round(self.ensemble_result.mixed_probability, 4),
                                 "confidence"        : round(self.ensemble_result.overall_confidence, 4),
                                 },
                "analysis"    : {"domain"              : self.domain_prediction.primary_domain.value,
                                 "domain_confidence"   : round(self.domain_prediction.confidence, 4),
                                 "language"            : self.language_result.primary_language.value if self.language_result else "unknown",
                                 "language_confidence" : round(self.language_result.confidence, 4) if self.language_result else 0.0,
                                 "text_length"         : self.processed_text.word_count,
                                 "sentence_count"      : self.processed_text.sentence_count,
                                 },
                "metrics"     : {name: result.to_dict() for name, result in self.metric_results.items()},
                "ensemble"    : self.ensemble_result.to_dict(),
                "performance" : {"total_time"   : round(self.processing_time, 3),
                                 "metrics_time" : {name: round(t, 3) for name, t in self.metrics_execution_time.items()},
                                 },
                "warnings"    : self.warnings,
                "errors"      : self.errors,
                }

class DetectionOrchestrator:
    """
    Coordinates the entire detection pipeline from text input to final results.
    Pipeline:
    1. Text preprocessing
    2. Domain classification
    3. Language detection (optional)
    4. Metric execution (parallel/sequential)
    5. Ensemble aggregation
    6. Result generation
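
    Example:
    --------
    (Illustrative sketch; verdicts depend on the models loaded at runtime)
    >>> orchestrator = DetectionOrchestrator()
    >>> orchestrator.initialize()
    >>> result = orchestrator.analyze("Some text to check...")
    >>> result.ensemble_result.final_verdict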
| """ | |

    def __init__(self, enable_language_detection: bool = False, parallel_execution: bool = False, skip_expensive_metrics: bool = False):
        """
        Initialize detection orchestrator

        Arguments:
        ----------
        enable_language_detection { bool } : Enable language detection step
        parallel_execution        { bool } : Execute metrics in parallel (future feature)
        skip_expensive_metrics    { bool } : Skip computationally expensive metrics
        """
        self.enable_language_detection = enable_language_detection
        self.parallel_execution        = parallel_execution
        self.skip_expensive_metrics    = skip_expensive_metrics
        # Initialize processors
        self.text_processor = TextProcessor(min_text_length = settings.MIN_TEXT_LENGTH,
                                            max_text_length = settings.MAX_TEXT_LENGTH,
                                            )
        self.domain_classifier = DomainClassifier()
        if self.enable_language_detection:
            self.language_detector = LanguageDetector(use_model = True)
        else:
            self.language_detector = None
        # Initialize metrics
        self.metrics = self._initialize_metrics()
        # Initialize ensemble
        self.ensemble = EnsembleClassifier(primary_method       = "confidence_calibrated",
                                           fallback_method      = "domain_weighted",
                                           use_ml_ensemble      = False,
                                           min_metrics_required = 3,
                                           )
        logger.info(f"DetectionOrchestrator initialized (language_detection={enable_language_detection}, skip_expensive={skip_expensive_metrics})")

    def _initialize_metrics(self) -> Dict[str, Any]:
        """
        Initialize all enabled metrics
        """
        metric_classes = {"structural"        : StructuralMetric,        # Statistical analysis
                          "entropy"           : EntropyMetric,
                          "perplexity"        : PerplexityMetric,
                          "semantic_analysis" : SemanticAnalysisMetric,
                          "linguistic"        : LinguisticMetric,
                          # MultiPerturbationStability is computationally expensive
                          "multi_perturbation_stability" : MultiPerturbationStabilityMetric,
                          }
        metrics = dict()
        for name, metric_class in metric_classes.items():
            try:
                metrics[name] = metric_class()
                logger.debug(f"{name} metric initialized")
            except Exception as e:
                logger.error(f"Failed to initialize {name} metric: {repr(e)}")
        logger.info(f"Initialized {len(metrics)} metrics: {list(metrics.keys())}")
        return metrics

    def initialize(self) -> bool:
        """
        Initialize all components (load models, etc.)

        Returns:
        --------
        { bool } : True if successful, False otherwise
        """
        try:
            logger.info("Initializing detection pipeline...")
            # Initialize domain classifier
            if not self.domain_classifier.initialize():
                logger.warning("Domain classifier initialization failed")
            # Initialize language detector
            if self.language_detector:
                if not self.language_detector.initialize():
                    logger.warning("Language detector initialization failed")
            # Initialize metrics
            successful_metrics = 0
            for name, metric in self.metrics.items():
                try:
                    if metric.initialize():
                        successful_metrics += 1
                        logger.debug(f"Metric {name} initialized successfully")
                    else:
                        logger.warning(f"Metric {name} initialization failed")
                except Exception as e:
                    logger.error(f"Error initializing metric {name}: {repr(e)}")
            logger.success(f"Detection pipeline initialized: {successful_metrics}/{len(self.metrics)} metrics ready")
            # Need at least 3 metrics for reliable detection
            return (successful_metrics >= 3)
        except Exception as e:
            logger.error(f"Failed to initialize detection pipeline: {repr(e)}")
            return False

    def analyze(self, text: str, domain: Optional[Domain] = None, **kwargs) -> DetectionResult:
        """
        Analyze text and detect if AI-generated

        Arguments:
        ----------
        text   { str }    : Input text to analyze
        domain { Domain } : Override automatic domain detection
        **kwargs          : Additional options

        Returns:
        --------
        { DetectionResult } : DetectionResult with complete analysis
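
        Example:
        --------
        (Illustrative sketch; Domain.GENERAL is the only enum member referenced in this module)
        >>> result = orchestrator.analyze("Sample text...", domain = Domain.GENERAL)
        >>> result.to_dict()["prediction"]["verdict"]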
| """ | |
| start_time = time.time() | |
| warnings = list() | |
| errors = list() | |
| try: | |
| # Preprocess text | |
| logger.info("Step 1: Preprocessing text...") | |
| processed_text = self.text_processor.process(text = text) | |
| if not processed_text.is_valid: | |
| logger.warning(f"Text validation failed: {processed_text.validation_errors}") | |
| warnings.extend(processed_text.validation_errors) | |
| # Continue anyway if text is present | |
| # Detect language | |
| language_result = None | |
| if self.language_detector: | |
| logger.info("Step 2: Detecting language...") | |
| try: | |
| language_result = self.language_detector.detect(processed_text.cleaned_text) | |
| if (language_result.primary_language.value != "en"): | |
| warnings.append(f"Non-English text detected ({language_result.primary_language.value}). Detection accuracy may be reduced.") | |
| if (language_result.is_multilingual): | |
| warnings.append("Multilingual content detected") | |
| if (language_result.confidence < 0.7): | |
| warnings.append(f"Low language detection confidence ({language_result.confidence:.2f})") | |
| except Exception as e: | |
| logger.warning(f"Language detection failed: {repr(e)}") | |
| warnings.append("Language detection failed") | |
| # Classify domain | |
| logger.info("Step 3: Classifying domain...") | |
| if domain is None: | |
| try: | |
| domain_prediction = self.domain_classifier.classify(processed_text.cleaned_text) | |
| domain = domain_prediction.primary_domain | |
| if (domain_prediction.confidence < 0.5): | |
| warnings.append(f"Low domain classification confidence ({domain_prediction.confidence:.2f})") | |
| except Exception as e: | |
| logger.warning(f"Domain classification failed: {repr(e)}") | |
| domain_prediction = DomainPrediction(primary_domain = Domain.GENERAL, | |
| secondary_domain = None, | |
| confidence = 0.5, | |
| domain_scores = {}, | |
| ) | |
| domain = Domain.GENERAL | |
| warnings.append("Domain classification failed, using GENERAL") | |
| else: | |
| # Use provided domain | |
| domain_prediction = DomainPrediction(primary_domain = domain, | |
| secondary_domain = None, | |
| confidence = 1.0, | |
| domain_scores = {domain.value: 1.0}, | |
| ) | |
| logger.info(f"Detected domain: {domain.value} (confidence: {domain_prediction.confidence:.2f})") | |
| # Execute metrics calculations | |
| logger.info("Step 4: Executing detection metrics calculations...") | |
| metric_results = dict() | |
| metrics_execution_time = dict() | |
| for name, metric in self.metrics.items(): | |
| metric_start = time.time() | |
| try: | |
| # Check if we should skip expensive metrics | |
| if (self.skip_expensive_metrics and (name == "multi_perturbation_stability")): | |
| logger.info(f"Skipping expensive metric: {name}") | |
| continue | |
| logger.debug(f"Computing metric: {name}") | |
| result = metric.compute(text = processed_text.cleaned_text, | |
| domain = domain, | |
| skip_expensive = self.skip_expensive_metrics, | |
| ) | |
| metric_results[name] = result | |
| if result.error: | |
| warnings.append(f"{name} metric error: {result.error}") | |
| except Exception as e: | |
| logger.error(f"Error computing metric {name}: {repr(e)}") | |
| errors.append(f"{name}: {repr(e)}") | |
| # Create error result | |
| metric_results[name] = MetricResult(metric_name = name, | |
| ai_probability = 0.5, | |
| human_probability = 0.5, | |
| mixed_probability = 0.0, | |
| confidence = 0.0, | |
| error = repr(e), | |
| ) | |
| finally: | |
| metrics_execution_time[name] = time.time() - metric_start | |
| logger.info(f"Executed {len(metric_results)} metrics successfully") | |
| # Ensemble aggregation | |
| logger.info("Step 5: Aggregating results with ensemble...") | |
| try: | |
| ensemble_result = self.ensemble.predict(metric_results = metric_results, | |
| domain = domain, | |
| ) | |
| except Exception as e: | |
| logger.error(f"Ensemble prediction failed: {repr(e)}") | |
| errors.append(f"Ensemble: {repr(e)}") | |
| # Create fallback result | |
| ensemble_result = EnsembleResult(final_verdict = "Error", | |
| ai_probability = 0.5, | |
| human_probability = 0.5, | |
| mixed_probability = 0.0, | |
| overall_confidence = 0.0, | |
| domain = domain, | |
| metric_results = metric_results, | |
| metric_weights = {}, | |
| weighted_scores = {}, | |
| reasoning = ["Ensemble aggregation failed"], | |
| uncertainty_score = 1.0, | |
| consensus_level = 0.0, | |
| ) | |
| # Calculate total processing time | |
| processing_time = time.time() - start_time | |
| logger.success(f"Analysis complete: {ensemble_result.final_verdict} " | |
| f"(AI probability: {ensemble_result.ai_probability:.1%}, " | |
| f"confidence: {ensemble_result.overall_confidence:.2f}) " | |
| f"in {processing_time:.2f}s") | |
| return DetectionResult(ensemble_result = ensemble_result, | |
| processed_text = processed_text, | |
| domain_prediction = domain_prediction, | |
| language_result = language_result, | |
| metric_results = metric_results, | |
| processing_time = processing_time, | |
| metrics_execution_time = metrics_execution_time, | |
| warnings = warnings, | |
| errors = errors, | |
| ) | |
        except Exception as e:
            logger.error(f"Fatal error in detection pipeline: {repr(e)}")
            processing_time = time.time() - start_time
            # Return a neutral error result
            return self._error_result(text            = text,
                                      reasoning       = f"Fatal error: {str(e)}",
                                      error           = f"Fatal error: {repr(e)}",
                                      processing_time = processing_time,
                                      )

    def _error_result(self, text: str, reasoning: str, error: str, processing_time: float) -> DetectionResult:
        """
        Build a neutral fallback DetectionResult for a failed analysis
        """
        return DetectionResult(ensemble_result = EnsembleResult(final_verdict      = "Error",
                                                                ai_probability     = 0.5,
                                                                human_probability  = 0.5,
                                                                mixed_probability  = 0.0,
                                                                overall_confidence = 0.0,
                                                                domain             = Domain.GENERAL,
                                                                metric_results     = {},
                                                                metric_weights     = {},
                                                                weighted_scores    = {},
                                                                reasoning          = [reasoning],
                                                                uncertainty_score  = 1.0,
                                                                consensus_level    = 0.0,
                                                                ),
                               processed_text = ProcessedText(original_text       = text,
                                                              cleaned_text        = "",
                                                              sentences           = [],
                                                              words               = [],
                                                              paragraphs          = [],
                                                              char_count          = 0,
                                                              word_count          = 0,
                                                              sentence_count      = 0,
                                                              paragraph_count     = 0,
                                                              avg_sentence_length = 0.0,
                                                              avg_word_length     = 0.0,
                                                              is_valid            = False,
                                                              validation_errors   = ["Processing failed"],
                                                              metadata            = {},
                                                              ),
                               domain_prediction = DomainPrediction(primary_domain   = Domain.GENERAL,
                                                                    secondary_domain = None,
                                                                    confidence       = 0.0,
                                                                    domain_scores    = {},
                                                                    ),
                               language_result        = None,
                               metric_results         = {},
                               processing_time        = processing_time,
                               metrics_execution_time = {},
                               warnings               = [],
                               errors                 = [error],
                               )

    def batch_analyze(self, texts: List[str], domain: Optional[Domain] = None) -> List[DetectionResult]:
        """
        Analyze multiple texts

        Arguments:
        ----------
        texts  { list }   : List of texts to analyze
        domain { Domain } : Override automatic domain detection

        Returns:
        --------
        { list } : List of DetectionResult objects
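
        Example:
        --------
        (Illustrative sketch)
        >>> results = orchestrator.batch_analyze(["first text", "second text"])
        >>> [r.ensemble_result.final_verdict for r in results]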
| """ | |
| logger.info(f"Batch analyzing {len(texts)} texts...") | |
| results = list() | |
| for i, text in enumerate(texts): | |
| logger.info(f"Analyzing text {i+1}/{len(texts)}...") | |
| try: | |
| result = self.analyze(text = text, | |
| domain = domain, | |
| ) | |
| results.append(result) | |
| except Exception as e: | |
| logger.error(f"Error analyzing text {i+1}: {repr(e)}") | |
| # Create error result for this text | |
| error_result = DetectionResult(ensemble_result = EnsembleResult(final_verdict = "Error", | |
| ai_probability = 0.5, | |
| human_probability = 0.5, | |
| mixed_probability = 0.0, | |
| overall_confidence = 0.0, | |
| domain = Domain.GENERAL, | |
| metric_results = {}, | |
| metric_weights = {}, | |
| weighted_scores = {}, | |
| reasoning = [f"Analysis failed: {str(e)}"], | |
| uncertainty_score = 1.0, | |
| consensus_level = 0.0, | |
| ), | |
| processed_text = ProcessedText(original_text = text, | |
| cleaned_text = "", | |
| sentences = [], | |
| words = [], | |
| paragraphs = [], | |
| char_count = 0, | |
| word_count = 0, | |
| sentence_count = 0, | |
| paragraph_count = 0, | |
| avg_sentence_length = 0.0, | |
| avg_word_length = 0.0, | |
| is_valid = False, | |
| validation_errors = ["Processing failed"], | |
| metadata = {}, | |
| ), | |
| domain_prediction = DomainPrediction(primary_domain = Domain.GENERAL, | |
| secondary_domain = None, | |
| confidence = 0.0, | |
| domain_scores = {}, | |
| ), | |
| language_result = None, | |
| metric_results = {}, | |
| processing_time = 0.0, | |
| metrics_execution_time = {}, | |
| warnings = [], | |
| errors = [f"Analysis failed: {repr(e)}"], | |
| ) | |
| results.append(error_result) | |
| logger.info(f"Batch analysis complete: {len(results)}/{len(texts)} processed") | |
| return results | |

    def cleanup(self):
        """
        Clean up resources
        """
        logger.info("Cleaning up detection orchestrator...")
        for name, metric in self.metrics.items():
            try:
                metric.cleanup()
                logger.debug(f"Cleaned up metric: {name}")
            except Exception as e:
                logger.warning(f"Error cleaning up metric {name}: {repr(e)}")
        if self.domain_classifier:
            try:
                self.domain_classifier.cleanup()
                logger.debug("Cleaned up domain classifier")
            except Exception as e:
                logger.warning(f"Error cleaning up domain classifier: {repr(e)}")
        if self.language_detector:
            try:
                self.language_detector.cleanup()
                logger.debug("Cleaned up language detector")
            except Exception as e:
                logger.warning(f"Error cleaning up language detector: {repr(e)}")
        logger.info("Cleanup complete")

# Export
__all__ = ["DetectionResult",
           "DetectionOrchestrator",
           ]
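
# Example usage (a minimal sketch; assumes the config/, metrics/, processors/, and
# detector/ packages imported above are on the path and their models are available):
if __name__ == "__main__":
    orchestrator = DetectionOrchestrator(skip_expensive_metrics = True)
    if orchestrator.initialize():
        detection = orchestrator.analyze("Paste the text you want to analyze here.")
        print(detection.to_dict()["prediction"])
    orchestrator.cleanup()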