import torch
import numpy as np
import gc
import threading
import langdetect
import logging
from collections import OrderedDict
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from concurrent.futures import ThreadPoolExecutor
from functools import wraps
from contextlib import contextmanager
from typing import List, Dict, Callable
import re

from config import config

logger = logging.getLogger(__name__)

# Decorators and Context Managers
def handle_errors(default_return=None):
    """Centralized error handling decorator"""
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except Exception as e:
                logger.error(f"{func.__name__} failed: {e}")
                return default_return if default_return is not None else f"Error: {str(e)}"
        return wrapper
    return decorator

@contextmanager
def memory_cleanup():
    """Context manager for memory cleanup"""
    try:
        yield
    finally:
        gc.collect()
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
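
# The decorator and context manager above are meant to compose: handle_errors
# converts exceptions into logged fallback values, while memory_cleanup releases
# Python and CUDA memory once a block finishes. A minimal illustrative sketch
# follows; the helper name is hypothetical and it is not used elsewhere in this
# module.
def _example_guarded_inference(fn: Callable, *args, **kwargs):
    """Illustration only: run `fn` with error handling and memory cleanup."""
    guarded = handle_errors(default_return=None)(fn)
    with memory_cleanup():
        return guarded(*args, **kwargs)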

class ThemeContext:
    """Theme management context"""
    def __init__(self, theme: str = 'default'):
        self.theme = theme
        self.colors = config.THEMES.get(theme, config.THEMES['default'])

class LRUModelCache:
    """LRU Cache for models with memory management"""
    def __init__(self, max_size: int = 2):
        self.max_size = max_size
        self.cache = OrderedDict()
        self.lock = threading.Lock()
    
    def get(self, key):
        with self.lock:
            if key in self.cache:
                # Move to end (most recently used)
                self.cache.move_to_end(key)
                return self.cache[key]
            return None
    
    def put(self, key, value):
        with self.lock:
            if key in self.cache:
                self.cache.move_to_end(key)
            elif len(self.cache) >= self.max_size:
                # Evict the least recently used entry and release its memory
                oldest_key = next(iter(self.cache))
                old_model, old_tokenizer = self.cache.pop(oldest_key)
                del old_model, old_tokenizer
                gc.collect()
                if torch.cuda.is_available():
                    torch.cuda.empty_cache()

            # Store (or refresh) the entry as the most recently used item
            self.cache[key] = value
    
    def clear(self):
        with self.lock:
            for model, tokenizer in self.cache.values():
                del model, tokenizer
            self.cache.clear()
            gc.collect()
            if torch.cuda.is_available():
                torch.cuda.empty_cache()
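
# Illustrative sketch of the cache's eviction behaviour (hypothetical helper,
# not called anywhere in this module): entries are (model, tokenizer) tuples
# keyed by language group, and once max_size is reached the least recently
# used pair is evicted and its memory released.
def _example_lru_cache_usage() -> None:
    """Illustration only: demonstrates LRUModelCache eviction order."""
    cache = LRUModelCache(max_size=2)
    cache.put('multilingual', (object(), object()))  # stand-ins for (model, tokenizer)
    cache.put('zh', (object(), object()))
    cache.get('multilingual')                        # 'multilingual' becomes most recent
    cache.put('de', (object(), object()))            # evicts 'zh', the least recently used
    assert cache.get('zh') is None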

# Enhanced Model Manager with Optimized Memory Management
class ModelManager:
    """Optimized multi-language model manager with LRU cache and lazy loading"""
    _instance = None
    
    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._initialized = False
        return cls._instance
    
    def __init__(self):
        if not self._initialized:
            self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
            self.model_cache = LRUModelCache(config.MODEL_CACHE_SIZE)
            self.loading_lock = threading.Lock()
            self._initialized = True
            logger.info(f"ModelManager initialized on device: {self.device}")
    
    def _load_model(self, model_name: str, cache_key: str):
        """Load model with memory optimization"""
        try:
            logger.info(f"Loading model: {model_name}")
            
            # Load with memory optimization
            tokenizer = AutoTokenizer.from_pretrained(model_name)
            model = AutoModelForSequenceClassification.from_pretrained(
                model_name,
                torch_dtype=torch.float16 if torch.cuda.is_available() else torch.float32,
                device_map="auto" if torch.cuda.is_available() else None
            )
            
            if not torch.cuda.is_available():
                model.to(self.device)
            
            # Set to eval mode to save memory
            model.eval()
            
            # Cache the model
            self.model_cache.put(cache_key, (model, tokenizer))
            logger.info(f"Model {model_name} loaded and cached successfully")
            
            return model, tokenizer
            
        except Exception as e:
            logger.error(f"Failed to load model {model_name}: {e}")
            raise
    
    def get_model(self, language='en'):
        """Get model for specific language with lazy loading and caching"""
        # Determine cache key and model name
        if language == 'zh':
            cache_key = 'zh'
            model_name = config.MODELS['zh']
        else:
            cache_key = 'multilingual'
            model_name = config.MODELS['multilingual']
        
        # Try to get from cache first
        cached_model = self.model_cache.get(cache_key)
        if cached_model is not None:
            return cached_model
        
        # Load model if not in cache (with thread safety)
        with self.loading_lock:
            # Double-check pattern
            cached_model = self.model_cache.get(cache_key)
            if cached_model is not None:
                return cached_model
            
            return self._load_model(model_name, cache_key)
    
    @staticmethod
    def detect_language(text: str) -> str:
        """Detect text language"""
        try:
            detected = langdetect.detect(text)
            language_mapping = {
                'zh-cn': 'zh',
                'zh-tw': 'zh'
            }
            detected = language_mapping.get(detected, detected)
            return detected if detected in config.SUPPORTED_LANGUAGES else 'en'
        except Exception:
            # langdetect can fail on very short or ambiguous input; default to English
            return 'en'
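
    # Language routing sketch (illustrative, not executed): detect_language
    # normalises langdetect's regional Chinese codes to 'zh' and falls back to
    # 'en' for anything outside config.SUPPORTED_LANGUAGES, so get_model only
    # ever sees the 'zh' and 'multilingual' cache keys. For example:
    #
    #     ModelManager.detect_language("这是一个很好的产品")  # typically 'zh'
    #     ModelManager.detect_language("Great product!")      # typically 'en'
    #     ModelManager().get_model('zh')  # loads/caches config.MODELS['zh']
    #     ModelManager().get_model('fr')  # routed to config.MODELS['multilingual']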

# Core Sentiment Analysis Engine with Performance Optimizations
class SentimentEngine:
    """Optimized multi-language sentiment analysis engine"""
    
    def __init__(self):
        self.model_manager = ModelManager()
        self.executor = ThreadPoolExecutor(max_workers=4)
    
    @handle_errors(default_return={'sentiment': 'Unknown', 'confidence': 0.0})
    def analyze_single(self, text: str, language: str = 'auto', preprocessing_options: Dict = None) -> Dict:
        """Optimized single text analysis"""
        if not text.strip():
            raise ValueError("Empty text provided")
        
        # Detect language
        if language == 'auto':
            detected_lang = self.model_manager.detect_language(text)
        else:
            detected_lang = language
        
        # Get appropriate model
        model, tokenizer = self.model_manager.get_model(detected_lang)
        
        # Preprocessing
        options = preprocessing_options or {}
        processed_text = text
        if options.get('clean_text', False) and not re.search(r'[\u4e00-\u9fff]', text):
            from data_utils import TextProcessor
            processed_text = TextProcessor.clean_text(
                text, 
                options.get('remove_punctuation', True),
                options.get('remove_numbers', False)
            )
        
        # Tokenize and analyze with memory optimization
        inputs = tokenizer(processed_text, return_tensors="pt", padding=True, 
                         truncation=True, max_length=config.MAX_TEXT_LENGTH).to(self.model_manager.device)
        
        # Use no_grad for inference to save memory
        with torch.no_grad():
            outputs = model(**inputs)
            probs = torch.nn.functional.softmax(outputs.logits, dim=-1).cpu().numpy()[0]
        
        # Clear GPU cache after inference
        if torch.cuda.is_available():
            torch.cuda.empty_cache()
        
        # Handle different model outputs
        if len(probs) == 3:  # negative, neutral, positive
            sentiment_idx = np.argmax(probs)
            sentiment_labels = ['Negative', 'Neutral', 'Positive']
            sentiment = sentiment_labels[sentiment_idx]
            confidence = float(probs[sentiment_idx])
            
            result = {
                'sentiment': sentiment,
                'confidence': confidence,
                'neg_prob': float(probs[0]),
                'neu_prob': float(probs[1]),
                'pos_prob': float(probs[2]),
                'has_neutral': True
            }
        else:  # negative, positive
            pred = np.argmax(probs)
            sentiment = "Positive" if pred == 1 else "Negative"
            confidence = float(probs[pred])
            
            result = {
                'sentiment': sentiment,
                'confidence': confidence,
                'neg_prob': float(probs[0]),
                'pos_prob': float(probs[1]),
                'neu_prob': 0.0,
                'has_neutral': False
            }
        
        # Add metadata
        result.update({
            'language': detected_lang,
            'word_count': len(text.split()),
            'char_count': len(text)
        })
        
        return result
    
    def _analyze_text_batch(self, text: str, language: str, preprocessing_options: Dict, index: int) -> Dict:
        """Single text analysis for batch processing"""
        try:
            result = self.analyze_single(text, language, preprocessing_options)
            result['batch_index'] = index
            result['text'] = text[:100] + '...' if len(text) > 100 else text
            result['full_text'] = text
            return result
        except Exception as e:
            return {
                'sentiment': 'Error',
                'confidence': 0.0,
                'error': str(e),
                'batch_index': index,
                'text': text[:100] + '...' if len(text) > 100 else text,
                'full_text': text
            }

    @handle_errors(default_return=[])
    def analyze_batch(self, texts: List[str], language: str = 'auto', 
                     preprocessing_options: Dict = None, progress_callback=None) -> List[Dict]:
        """Optimized parallel batch processing"""
        if len(texts) > config.BATCH_SIZE_LIMIT:
            logger.warning(f"Batch truncated from {len(texts)} to {config.BATCH_SIZE_LIMIT} texts")
            texts = texts[:config.BATCH_SIZE_LIMIT]
        
        if not texts:
            return []
        
        # Pre-load model to avoid race conditions
        self.model_manager.get_model(language if language != 'auto' else 'en')
        
        # Use ThreadPoolExecutor for parallel processing
        with ThreadPoolExecutor(max_workers=min(4, len(texts))) as executor:
            futures = []
            for i, text in enumerate(texts):
                future = executor.submit(
                    self._analyze_text_batch, 
                    text, language, preprocessing_options, i
                )
                futures.append(future)
            
            results = []
            for i, future in enumerate(futures):
                if progress_callback:
                    progress_callback((i + 1) / len(futures))
                
                try:
                    result = future.result(timeout=30)  # 30 second timeout per text
                    results.append(result)
                except Exception as e:
                    results.append({
                        'sentiment': 'Error',
                        'confidence': 0.0,
                        'error': f"Timeout or error: {str(e)}",
                        'batch_index': i,
                        'text': texts[i][:100] + '...' if len(texts[i]) > 100 else texts[i],
                        'full_text': texts[i]
                    })
        
        return results
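

# A minimal usage sketch, assuming the config module supplies the settings this
# file already references (MODELS, SUPPORTED_LANGUAGES, MODEL_CACHE_SIZE,
# MAX_TEXT_LENGTH, BATCH_SIZE_LIMIT) and that the referenced models can be
# downloaded. Guarded so it only runs when this file is executed directly.
if __name__ == "__main__":
    engine = SentimentEngine()

    # Single-text analysis with automatic language detection
    single = engine.analyze_single("The battery life is fantastic.")
    print(single['sentiment'], single['confidence'])

    # Parallel batch analysis; errors are reported per item rather than raised
    batch = engine.analyze_batch([
        "Terrible customer service.",
        "Absolutely love it!",
    ])
    for item in batch:
        print(item.get('sentiment'), item.get('confidence'))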