entropy25 committed on
Commit f823c9f · verified · 1 Parent(s): ec43aa0

Create data_utils.py

Files changed (1)
  1. data_utils.py +191 -0
data_utils.py ADDED
@@ -0,0 +1,191 @@
+ import csv
+ import json
+ import io
+ import tempfile
+ import re
+ import numpy as np
+ from datetime import datetime
+ from functools import lru_cache
+ from collections import Counter
+ from typing import List, Dict, Optional, Tuple
+ import nltk
+ from nltk.corpus import stopwords
+
+ from config import config
+ from models import handle_errors
+
+ # Initialize NLTK; fall back to a minimal stopword list if the download fails
+ try:
+     nltk.download('stopwords', quiet=True)
+     nltk.download('punkt', quiet=True)
+     STOP_WORDS = set(stopwords.words('english'))
+ except Exception:
+     STOP_WORDS = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
+
+ # Simplified Text Processing
+ class TextProcessor:
+     """Optimized text processing with multi-language support"""
+
+     @staticmethod
+     @lru_cache(maxsize=config.CACHE_SIZE)
+     def clean_text(text: str, remove_punctuation: bool = True, remove_numbers: bool = False) -> str:
+         """Clean text with language awareness"""
+         text = text.strip()
+
+         # Don't clean Chinese text aggressively
+         if re.search(r'[\u4e00-\u9fff]', text):
+             return text
+
+         text = text.lower()
+
+         if remove_numbers:
+             text = re.sub(r'\d+', '', text)
+
+         if remove_punctuation:
+             text = re.sub(r'[^\w\s]', '', text)
+
+         words = text.split()
+         cleaned_words = [w for w in words if w not in STOP_WORDS and len(w) >= config.MIN_WORD_LENGTH]
+         return ' '.join(cleaned_words)
+
+     @staticmethod
+     def parse_batch_input(text: str) -> List[str]:
+         """Parse batch input from textarea"""
+         lines = text.strip().split('\n')
+         return [line.strip() for line in lines if line.strip()]
+
+ # Enhanced History Manager
+ class HistoryManager:
+     """Enhanced history management with filtering"""
+     def __init__(self):
+         self._history = []
+
+     def add(self, entry: Dict):
+         """Add entry with timestamp"""
+         entry['timestamp'] = datetime.now().isoformat()
+         self._history.append(entry)
+         if len(self._history) > config.MAX_HISTORY_SIZE:
+             self._history = self._history[-config.MAX_HISTORY_SIZE:]
+
+     def add_batch(self, entries: List[Dict]):
+         """Add multiple entries"""
+         for entry in entries:
+             self.add(entry)
+
+     def get_all(self) -> List[Dict]:
+         return self._history.copy()
+
+     def get_recent(self, n: int = 10) -> List[Dict]:
+         return self._history[-n:] if self._history else []
+
+     def filter_by(self, sentiment: Optional[str] = None, language: Optional[str] = None,
+                   min_confidence: Optional[float] = None) -> List[Dict]:
+         """Filter history by criteria"""
+         filtered = self._history
+
+         if sentiment:
+             filtered = [h for h in filtered if h['sentiment'] == sentiment]
+         if language:
+             filtered = [h for h in filtered if h.get('language', 'en') == language]
+         # Compare against None so a threshold of 0.0 is still applied
+         if min_confidence is not None:
+             filtered = [h for h in filtered if h['confidence'] >= min_confidence]
+
+         return filtered
+
+     def clear(self) -> int:
+         count = len(self._history)
+         self._history.clear()
+         return count
+
+     def size(self) -> int:
+         return len(self._history)
+
+     def get_stats(self) -> Dict:
+         """Get comprehensive statistics"""
+         if not self._history:
+             return {}
+
+         sentiments = [item['sentiment'] for item in self._history]
+         confidences = [item['confidence'] for item in self._history]
+         languages = [item.get('language', 'en') for item in self._history]
+
+         return {
+             'total_analyses': len(self._history),
+             'positive_count': sentiments.count('Positive'),
+             'negative_count': sentiments.count('Negative'),
+             'neutral_count': sentiments.count('Neutral'),
+             # Cast NumPy scalars to plain floats so the stats stay JSON-serializable
+             'avg_confidence': float(np.mean(confidences)),
+             'max_confidence': float(np.max(confidences)),
+             'min_confidence': float(np.min(confidences)),
+             'languages_detected': len(set(languages)),
+             'most_common_language': Counter(languages).most_common(1)[0][0] if languages else 'en'
+         }
+
+ # Universal Data Handler
+ class DataHandler:
+     """Enhanced data operations"""
+
+     @staticmethod
+     @handle_errors(default_return=(None, "Export failed"))
+     def export_data(data: List[Dict], format_type: str) -> Tuple[Optional[str], str]:
+         """Export data with comprehensive information"""
+         if not data:
+             return None, "No data to export"
+
+         # newline='' keeps the csv module from writing blank rows on Windows
+         temp_file = tempfile.NamedTemporaryFile(mode='w', delete=False, newline='',
+                                                 suffix=f'.{format_type}', encoding='utf-8')
+
+         if format_type == 'csv':
+             writer = csv.writer(temp_file)
+             writer.writerow(['Timestamp', 'Text', 'Sentiment', 'Confidence', 'Language',
+                              'Pos_Prob', 'Neg_Prob', 'Neu_Prob', 'Word_Count'])
+             for entry in data:
+                 writer.writerow([
+                     entry.get('timestamp', ''),
+                     entry.get('text', ''),
+                     entry.get('sentiment', ''),
+                     f"{entry.get('confidence', 0):.4f}",
+                     entry.get('language', 'en'),
+                     f"{entry.get('pos_prob', 0):.4f}",
+                     f"{entry.get('neg_prob', 0):.4f}",
+                     f"{entry.get('neu_prob', 0):.4f}",
+                     entry.get('word_count', 0)
+                 ])
+         elif format_type == 'json':
+             json.dump(data, temp_file, indent=2, ensure_ascii=False)
+
+         temp_file.close()
+         return temp_file.name, f"Exported {len(data)} entries"
+
+     @staticmethod
+     @handle_errors(default_return="")
+     def process_file(file) -> str:
+         """Process uploaded files"""
+         if not file:
+             return ""
+
+         content = file.read().decode('utf-8')
+
+         if file.name.endswith('.csv'):
+             csv_file = io.StringIO(content)
+             reader = csv.reader(csv_file)
+             try:
+                 next(reader)  # Skip header
+                 texts = []
+                 for row in reader:
+                     if row and row[0].strip():
+                         text = row[0].strip().strip('"')
+                         if text:
+                             texts.append(text)
+                 return '\n'.join(texts)
+             except (csv.Error, StopIteration):
+                 # Malformed CSV: fall back to naive line splitting, still skipping the header
+                 lines = content.strip().split('\n')[1:]
+                 texts = []
+                 for line in lines:
+                     if line.strip():
+                         text = line.strip().strip('"')
+                         if text:
+                             texts.append(text)
+                 return '\n'.join(texts)
+
+         return content
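
For reviewers, a minimal usage sketch of the three classes added here (not part of the commit). It assumes config exposes CACHE_SIZE, MIN_WORD_LENGTH, and MAX_HISTORY_SIZE, and that the handle_errors decorator from models returns its default_return when the wrapped function raises; neither module is included in this diff.

# Hypothetical usage -- sentiment/confidence values normally come from a model not in this file
from data_utils import TextProcessor, HistoryManager, DataHandler

history = HistoryManager()

# Split a textarea paste into lines, clean each one, and record a result
for text in TextProcessor.parse_batch_input("Great product!\nTerrible service."):
    cleaned = TextProcessor.clean_text(text)  # lowercased, stopwords and punctuation removed
    history.add({'text': text, 'clean_text': cleaned,
                 'sentiment': 'Positive', 'confidence': 0.91, 'language': 'en'})

print(history.get_stats())                         # counts plus avg/min/max confidence
confident = history.filter_by(min_confidence=0.9)  # a 0.0 threshold also works after the None check
path, message = DataHandler.export_data(history.get_all(), 'csv')
print(message, path)                               # "Exported 2 entries <temp file path>"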