"""Text processing, history tracking, and import/export utilities for the sentiment analysis app."""

import csv
import io
import json
import re
import tempfile
from collections import Counter
from datetime import datetime
from functools import lru_cache
from typing import Dict, List, Optional, Tuple

import nltk
import numpy as np
from nltk.corpus import stopwords

from config import config
from models import handle_errors

# Initialize NLTK; fall back to a minimal built-in stop word list if the
# download or corpus lookup fails (e.g. offline environments).
try:
    nltk.download('stopwords', quiet=True)
    nltk.download('punkt', quiet=True)
    STOP_WORDS = set(stopwords.words('english'))
except Exception:
    STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at',
                  'to', 'for', 'of', 'with', 'by'}


# Simplified Text Processing
class TextProcessor:
    """Optimized text processing with multi-language support"""

    @staticmethod
    @lru_cache(maxsize=config.CACHE_SIZE)
    def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
        """Clean text with language awareness"""
        text = text.strip()
        # Don't clean Chinese text aggressively
        if re.search(r'[\u4e00-\u9fff]', text):
            return text
        text = text.lower()
        if remove_numbers:
            text = re.sub(r'\d+', '', text)
        if remove_punctuation:
            text = re.sub(r'[^\w\s]', '', text)
        words = text.split()
        cleaned_words = [w for w in words
                         if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
        return ' '.join(cleaned_words)

    @staticmethod
    def parse_batch_input(text: str) -> List[str]:
        """Parse batch input from a textarea, one text per non-empty line"""
        lines = text.strip().split('\n')
        return [line.strip() for line in lines if line.strip()]


# Enhanced History Manager
class HistoryManager:
    """Enhanced history management with filtering"""

    def __init__(self):
        self._history: List[Dict] = []

    def add(self, entry: Dict):
        """Add entry with timestamp, trimming to the configured maximum size"""
        entry['timestamp'] = datetime.now().isoformat()
        self._history.append(entry)
        if len(self._history) > config.MAX_HISTORY_SIZE:
            self._history = self._history[-config.MAX_HISTORY_SIZE:]

    def add_batch(self, entries: List[Dict]):
        """Add multiple entries"""
        for entry in entries:
            self.add(entry)

    def get_all(self) -> List[Dict]:
        return self._history.copy()

    def get_recent(self, n: int = 10) -> List[Dict]:
        return self._history[-n:] if self._history else []

    def filter_by(self, sentiment: Optional[str] = None, language: Optional[str] = None,
                  min_confidence: Optional[float] = None) -> List[Dict]:
        """Filter history by sentiment, language, and/or minimum confidence"""
        filtered = self._history
        if sentiment:
            filtered = [h for h in filtered if h['sentiment'] == sentiment]
        if language:
            filtered = [h for h in filtered if h.get('language', 'en') == language]
        if min_confidence is not None:
            filtered = [h for h in filtered if h['confidence'] >= min_confidence]
        return filtered

    def clear(self) -> int:
        """Remove all entries and return how many were removed"""
        count = len(self._history)
        self._history.clear()
        return count

    def size(self) -> int:
        return len(self._history)

    def get_stats(self) -> Dict:
        """Get comprehensive statistics"""
        if not self._history:
            return {}
        sentiments = [item['sentiment'] for item in self._history]
        confidences = [item['confidence'] for item in self._history]
        languages = [item.get('language', 'en') for item in self._history]
        return {
            'total_analyses': len(self._history),
            'positive_count': sentiments.count('Positive'),
            'negative_count': sentiments.count('Negative'),
            'neutral_count': sentiments.count('Neutral'),
            # Cast NumPy scalars to built-in floats so the stats serialize cleanly
            'avg_confidence': float(np.mean(confidences)),
            'max_confidence': float(np.max(confidences)),
            'min_confidence': float(np.min(confidences)),
            'languages_detected': len(set(languages)),
            'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en',
        }


# Universal Data Handler
class DataHandler:
    """Enhanced data operations"""

    @staticmethod
    @handle_errors(default_return=(None, "Export failed"))
    def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
        """Export data with comprehensive information"""
        if not data:
            return None, "No data to export"
        # newline='' keeps csv.writer from emitting blank rows on Windows
        temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='',
                                                suffix=f'.{format_type}', encoding='utf-8')
        if format_type == 'csv':
            writer = csv.writer(temp_file)
            writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
                             'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
            for entry in data:
                writer.writerow([
                    entry.get('timestamp', ''),
                    entry.get('text', ''),
                    entry.get('sentiment', ''),
                    f"{entry.get('confidence', 0):.4f}",
                    entry.get('language', 'en'),
                    f"{entry.get('pos_prob', 0):.4f}",
                    f"{entry.get('neg_prob', 0):.4f}",
                    f"{entry.get('neu_prob', 0):.4f}",
                    entry.get('word_count', 0),
                ])
        elif format_type == 'json':
            json.dump(data, temp_file, indent=2, ensure_ascii=False)
        temp_file.close()
        return temp_file.name, f"Exported {len(data)} entries"

    @staticmethod
    @handle_errors(default_return="")
    def process_file(file) -> str:
        """Process uploaded files into newline-separated texts"""
        if not file:
            return ""
        content = file.read().decode('utf-8')
        if file.name.endswith('.csv'):
            csv_file = io.StringIO(content)
            reader = csv.reader(csv_file)
            try:
                next(reader)  # Skip header
                texts = []
                for row in reader:
                    if row and row[0].strip():
                        text = row[0].strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
            except Exception:
                # Fall back to naive line splitting if CSV parsing fails
                lines = content.strip().split('\n')[1:]
                texts = []
                for line in lines:
                    if line.strip():
                        text = line.strip().strip('"')
                        if text:
                            texts.append(text)
                return '\n'.join(texts)
        return content
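

# --- Usage sketch (illustrative) ------------------------------------------
# A minimal, hedged demo of the helpers above. It assumes `config` exposes
# CACHE_SIZE, MIN_WORD_LENGTH and MAX_HISTORY_SIZE and that
# `models.handle_errors` is the decorator imported at the top. The entry
# fields mirror the keys consumed by get_stats() and export_data(); they are
# an example payload, not a fixed schema.
if __name__ == "__main__":
    # Clean a single text (stop words removed, lowercased)
    print(TextProcessor.clean_text("The movie was surprisingly good!"))

    # Record one analysis result and report aggregate statistics
    history = HistoryManager()
    history.add({
        'text': 'The movie was surprisingly good!',
        'sentiment': 'Positive',
        'confidence': 0.91,
        'language': 'en',
        'pos_prob': 0.91,
        'neg_prob': 0.03,
        'neu_prob': 0.06,
        'word_count': 5,
    })
    print(history.get_stats())

    # Export the history to a temporary JSON file
    path, message = DataHandler.export_data(history.get_all(), 'json')
    print(message, '->', path)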