"""
Modal Labs GPU Processing

GPU-accelerated sentiment analysis and text processing
"""

import asyncio
import logging
import os
from typing import Any, Dict, List

import modal
import numpy as np
from dotenv import load_dotenv

load_dotenv()

logger = logging.getLogger(__name__)

app = modal.App("product-feature-agent")

# Container image with the ML/NLP dependencies; NLTK and TextBlob corpora are
# downloaded at build time so they are baked into the image.
image = (
    modal.Image.debian_slim()
    .pip_install([
        "torch>=2.1.0",
        "transformers>=4.35.0",
        "numpy>=1.24.0",
        "scikit-learn>=1.3.0",
        "textblob>=0.17.1",
        "vaderSentiment>=3.3.2",
        "pandas>=2.1.0",
        "accelerate>=0.24.0",
        "python-dotenv>=1.0.0"
    ])
    .run_commands([
        "python -c \"import nltk; nltk.download('punkt'); nltk.download('punkt_tab'); nltk.download('vader_lexicon'); nltk.download('averaged_perceptron_tagger')\"",
        "python -m textblob.download_corpora",
        "python -c \"import textblob; textblob.TextBlob('test').tags\""
    ])
)

# Persistent volume used to cache downloaded model weights across runs.
model_volume = modal.Volume.from_name("feature-agent-models", create_if_missing=True)


@app.function(
    image=image,
    gpu="T4",
    memory=4096,
    timeout=300,
    volumes={"/models": model_volume},
    min_containers=1
)
def gpu_batch_sentiment_analysis(texts: List[str], batch_size: int = 32) -> Dict[str, Any]:
    """
    Perform GPU-accelerated sentiment analysis on a batch of texts

    Args:
        texts: List of text strings to analyze
        batch_size: Batch size for processing

    Returns:
        Dict with per-text results, a batch summary, and processing stats
    """
    import torch
    from transformers.pipelines import pipeline
    from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
    from textblob import TextBlob
    import time

    start_time = time.time()
    logger.info(f"Starting GPU sentiment analysis for {len(texts)} texts")

    try:
        vader_analyzer = SentimentIntensityAnalyzer()

        # Transformer model is cached on the shared volume mounted at /models.
        model_name = "cardiffnlp/twitter-roberta-base-sentiment-latest"
        cache_dir = "/models/sentiment"

        device = 0 if torch.cuda.is_available() else -1

        sentiment_pipeline = pipeline(
            "sentiment-analysis",
            model=model_name,
            tokenizer=model_name,
            device=device,
            model_kwargs={"cache_dir": cache_dir},
            max_length=512,
            truncation=True
        )

        results = []

        for i in range(0, len(texts), batch_size):
            batch_texts = texts[i:i + batch_size]
            batch_results = []

            # Run the whole batch through the transformer pipeline at once.
            transformer_results = sentiment_pipeline(batch_texts)

            for j, text in enumerate(batch_texts):
                if not text or len(text.strip()) == 0:
                    batch_results.append({
                        "text": text,
                        "error": "Empty text",
                        "processing_time": 0
                    })
                    continue

                text_start = time.time()

                vader_scores = vader_analyzer.polarity_scores(text)

                try:
                    blob = TextBlob(text)
                    textblob_sentiment = blob.sentiment
                except Exception:
                    textblob_sentiment = None

                transformer_result = transformer_results[j]

                # Normalize the transformer label to positive/negative/neutral.
                transformer_label = transformer_result["label"].lower()
                if transformer_label in ["positive", "pos"]:
                    transformer_sentiment = "positive"
                elif transformer_label in ["negative", "neg"]:
                    transformer_sentiment = "negative"
                else:
                    transformer_sentiment = "neutral"

                combined_result = {
                    "text": text[:100] + "..." if len(text) > 100 else text,
                    "vader": {
                        "compound": vader_scores["compound"],
                        "positive": vader_scores["pos"],
                        "negative": vader_scores["neg"],
                        "neutral": vader_scores["neu"]
                    },
                    "textblob": {
                        "polarity": textblob_sentiment.polarity if textblob_sentiment else 0,
                        "subjectivity": textblob_sentiment.subjectivity if textblob_sentiment else 0
                    },
                    "transformer": {
                        "label": transformer_sentiment,
                        "confidence": transformer_result["score"]
                    },
                    "consensus": _calculate_consensus_sentiment(
                        vader_scores["compound"],
                        textblob_sentiment.polarity if textblob_sentiment else 0,
                        transformer_sentiment,
                        transformer_result["score"]
                    ),
                    "processing_time": time.time() - text_start
                }

                batch_results.append(combined_result)

            results.extend(batch_results)

            processed = min(i + batch_size, len(texts))
            logger.info(f"Processed {processed}/{len(texts)} texts")

        total_time = time.time() - start_time
        logger.info(f"GPU sentiment analysis completed in {total_time:.2f}s")

        summary = _calculate_batch_summary(results)

        return {
            "results": results,
            "summary": summary,
            "processing_stats": {
                "total_texts": len(texts),
                "total_time": total_time,
                "texts_per_second": len(texts) / total_time if total_time > 0 else 0,
                "gpu_used": torch.cuda.is_available(),
                "model_used": model_name
            }
        }

    except Exception as e:
        logger.error(f"Error in GPU sentiment analysis: {str(e)}")
        return {
            "error": str(e),
            "processing_stats": {
                "total_texts": len(texts),
                "total_time": time.time() - start_time,
                "gpu_used": False
            }
        }
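
# A minimal sketch of the payload gpu_batch_sentiment_analysis returns on
# success, based on the keys built above (values are illustrative, not real output):
#
#   {
#       "results": [
#           {
#               "text": "This product is amazing! ...",
#               "vader": {"compound": 0.62, "positive": 0.45, "negative": 0.0, "neutral": 0.55},
#               "textblob": {"polarity": 0.55, "subjectivity": 0.7},
#               "transformer": {"label": "positive", "confidence": 0.97},
#               "consensus": {"label": "positive", "score": 0.78, "confidence": 0.91},
#               "processing_time": 0.004
#           },
#           ...
#       ],
#       "summary": {...},  # see _calculate_batch_summary below
#       "processing_stats": {"total_texts": 1, "total_time": 1.2, "texts_per_second": 0.83,
#                            "gpu_used": True, "model_used": "cardiffnlp/twitter-roberta-base-sentiment-latest"}
#   }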


def _calculate_consensus_sentiment(vader_compound: float, textblob_polarity: float,
                                   transformer_label: str, transformer_confidence: float) -> Dict[str, Any]:
    """Calculate consensus sentiment from multiple models"""

    # Map the transformer label onto a signed score comparable to the others.
    transformer_score = 0
    if transformer_label == "positive":
        transformer_score = transformer_confidence
    elif transformer_label == "negative":
        transformer_score = -transformer_confidence

    # The transformer gets the largest weight; VADER and TextBlob act as checks.
    weights = {
        "vader": 0.3,
        "textblob": 0.2,
        "transformer": 0.5
    }

    weighted_score = (
        vader_compound * weights["vader"] +
        textblob_polarity * weights["textblob"] +
        transformer_score * weights["transformer"]
    )

    if weighted_score >= 0.1:
        consensus_label = "positive"
    elif weighted_score <= -0.1:
        consensus_label = "negative"
    else:
        consensus_label = "neutral"

    # Agreement is higher when the three model scores are close together.
    scores = [vader_compound, textblob_polarity, transformer_score]
    agreement = 1.0 - (np.std(scores) / 2.0)

    return {
        "label": consensus_label,
        "score": weighted_score,
        "confidence": max(0.0, min(1.0, agreement))
    }
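
# Worked example of the consensus calculation above (illustrative numbers):
# with vader_compound = 0.6, textblob_polarity = 0.4 and a "positive"
# transformer label at confidence 0.9 (so transformer_score = 0.9):
#
#   weighted_score = 0.6 * 0.3 + 0.4 * 0.2 + 0.9 * 0.5 = 0.71  ->  "positive"
#   agreement      = 1.0 - std([0.6, 0.4, 0.9]) / 2 ≈ 1.0 - 0.21 / 2 ≈ 0.90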


def _calculate_batch_summary(results: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Calculate summary statistics for batch results"""
    if not results:
        return {}

    valid_results = [r for r in results if "error" not in r]

    if not valid_results:
        return {"error": "No valid results"}

    sentiment_counts = {"positive": 0, "negative": 0, "neutral": 0}
    total_confidence = 0
    total_processing_time = 0

    for result in valid_results:
        consensus = result.get("consensus", {})
        label = consensus.get("label", "neutral")
        confidence = consensus.get("confidence", 0)

        sentiment_counts[label] += 1
        total_confidence += confidence
        total_processing_time += result.get("processing_time", 0)

    total_valid = len(valid_results)

    return {
        "total_analyzed": total_valid,
        "sentiment_distribution": {
            "positive": sentiment_counts["positive"],
            "negative": sentiment_counts["negative"],
            "neutral": sentiment_counts["neutral"],
            "positive_pct": (sentiment_counts["positive"] / total_valid) * 100,
            "negative_pct": (sentiment_counts["negative"] / total_valid) * 100,
            "neutral_pct": (sentiment_counts["neutral"] / total_valid) * 100
        },
        "average_confidence": total_confidence / total_valid,
        "average_processing_time": total_processing_time / total_valid,
        "dominant_sentiment": max(sentiment_counts, key=sentiment_counts.get)
    }
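
# Illustrative summary for a batch of 4 valid results (2 positive, 1 negative,
# 1 neutral); confidence and timing values are made up for the example:
#
#   {
#       "total_analyzed": 4,
#       "sentiment_distribution": {"positive": 2, "negative": 1, "neutral": 1,
#                                  "positive_pct": 50.0, "negative_pct": 25.0, "neutral_pct": 25.0},
#       "average_confidence": 0.82,
#       "average_processing_time": 0.003,
#       "dominant_sentiment": "positive"
#   }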


@app.function(
    image=image,
    gpu="T4",
    memory=2048,
    timeout=180
)
def gpu_keyword_extraction(texts: List[str], max_keywords: int = 10) -> Dict[str, Any]:
    """
    Extract keywords by combining TextBlob noun phrases with TF-IDF scores
    """
    from textblob import TextBlob
    from sklearn.feature_extraction.text import TfidfVectorizer
    import time

    start_time = time.time()
    logger.info(f"Starting GPU keyword extraction for {len(texts)} texts")

    try:
        combined_text = " ".join(texts)

        # Candidate phrases from TextBlob's noun-phrase extraction.
        blob = TextBlob(combined_text)
        noun_phrases = list(blob.noun_phrases)

        # TF-IDF over the individual texts, with uni- to tri-grams.
        vectorizer = TfidfVectorizer(
            max_features=max_keywords * 2,
            ngram_range=(1, 3),
            stop_words="english"
        )

        if texts:
            tfidf_matrix = vectorizer.fit_transform(texts)
            feature_names = vectorizer.get_feature_names_out()
            tfidf_scores = tfidf_matrix.sum(axis=0).A1

            top_indices = tfidf_scores.argsort()[-max_keywords:][::-1]
            tfidf_keywords = [(feature_names[i], tfidf_scores[i]) for i in top_indices]
        else:
            tfidf_keywords = []

        # Merge the two sources: noun-phrase counts plus TF-IDF scores.
        all_keywords = {}

        for phrase in noun_phrases:
            if len(phrase) > 3:
                all_keywords[phrase] = all_keywords.get(phrase, 0) + 1

        for term, score in tfidf_keywords:
            all_keywords[term] = all_keywords.get(term, 0) + score

        sorted_keywords = sorted(all_keywords.items(), key=lambda x: x[1], reverse=True)

        result = {
            "keywords": sorted_keywords[:max_keywords],
            "total_texts": len(texts),
            "processing_time": time.time() - start_time,
            "method": "hybrid_tfidf_nlp"
        }

        logger.info(f"Keyword extraction completed in {result['processing_time']:.2f}s")
        return result

    except Exception as e:
        logger.error(f"Error in keyword extraction: {str(e)}")
        return {"error": str(e)}
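
# A minimal sketch of gpu_keyword_extraction's output (values illustrative).
# Note that keyword scores mix noun-phrase counts with TF-IDF sums, so they are
# a ranking signal rather than directly comparable quantities:
#
#   {
#       "keywords": [("battery life", 3.42), ("customer service", 2.10), ("great camera", 1.87)],
#       "total_texts": 25,
#       "processing_time": 0.84,
#       "method": "hybrid_tfidf_nlp"
#   }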


class GPUProcessor:
    """Client interface for Modal GPU processing"""

    def __init__(self):
        """Initialize GPU processor client"""
        self.app = app
        self.modal_available = False
        self.setup_modal_client()

    def setup_modal_client(self):
        """Setup Modal client with credentials and test connection"""
        logger.info("⚙️ Modal Labs client setup - Started")

        try:
            modal_token = os.getenv("MODAL_TOKEN")

            if not modal_token:
                error_msg = "No Modal token found in environment variables (MODAL_TOKEN)"
                logger.error(f"❌ Modal Labs API failed: {error_msg}")
                self.modal_available = False
                return

            logger.info(f"🔌 Attempting to connect to Modal Labs API with token ending in ...{modal_token[-4:]}")

            try:
                # Lightweight sanity check: constructing an App verifies the SDK
                # is importable and configured, but does not make a network call.
                modal.App("connection-test")
                logger.info("✅ Modal Labs API connected successfully - client initialized")
                self.modal_available = True
                logger.info("⚙️ Modal Labs client setup - Completed")

            except Exception as modal_error:
                error_msg = f"Modal Labs connection test failed: {str(modal_error)}"
                logger.error(f"❌ Modal Labs API failed: {error_msg}")
                self.modal_available = False

        except Exception as e:
            error_msg = f"Modal Labs client setup failed: {str(e)}"
            logger.error(f"❌ Modal Labs API failed: {error_msg}")
            logger.error("⚙️ Modal Labs client setup - Failed")
            self.modal_available = False

    async def batch_sentiment_analysis(self, data_sources: List[Any]) -> Dict[str, Any]:
        """
        Process multiple data sources with sentiment analysis (GPU if available, fallback to local)

        Args:
            data_sources: List of data from different collectors

        Returns:
            Comprehensive sentiment analysis organized by source
        """
        try:
            # Flatten all sources into a single text list, remembering which
            # index range belongs to which source so results can be re-split.
            all_texts = []
            source_mapping = {}

            for i, source in enumerate(data_sources):
                source_texts = self._extract_texts_from_source(source)

                start_idx = len(all_texts)
                all_texts.extend(source_texts)
                end_idx = len(all_texts)

                source_mapping[f"source_{i}"] = {
                    "start": start_idx,
                    "end": end_idx,
                    "count": len(source_texts),
                    "type": self._identify_source_type(source),
                    "data": source
                }

            if not all_texts:
                return {"error": "No texts found in data sources"}

            if self.modal_available:
                try:
                    logger.info(f"Sending {len(all_texts)} texts to Modal GPU processing")
                    with app.run():
                        sentiment_results = gpu_batch_sentiment_analysis.remote(all_texts)

                    organized_results = self._organize_results_by_source(
                        sentiment_results, source_mapping
                    )

                    return organized_results

                except Exception as modal_error:
                    logger.warning(f"Modal GPU processing failed: {str(modal_error)}")
                    logger.info("Falling back to local sentiment analysis")

            logger.info(f"Processing {len(all_texts)} texts with local sentiment analysis")
            sentiment_results = await self._local_sentiment_analysis(all_texts)

            organized_results = self._organize_results_by_source(
                sentiment_results, source_mapping
            )

            return organized_results

        except Exception as e:
            logger.error(f"Error in batch sentiment analysis: {str(e)}")
            return {"error": str(e)}
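
    # Illustrative source_mapping entry built in batch_sentiment_analysis above:
    # texts 0-2 of all_texts came from the first collector, which looked like
    # Reddit data:
    #
    #   source_mapping["source_0"] = {
    #       "start": 0, "end": 3, "count": 3,
    #       "type": "reddit",
    #       "data": {"posts": [...]}
    #   }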

    async def extract_keywords(self, texts: List[str], max_keywords: int = 20) -> Dict[str, Any]:
        """Extract keywords via the Modal keyword-extraction function (no local fallback)"""
        try:
            with app.run():
                result = gpu_keyword_extraction.remote(texts, max_keywords)
            return result
        except Exception as e:
            logger.error(f"Error in keyword extraction: {str(e)}")
            return {"error": str(e)}

    async def _local_sentiment_analysis(self, texts: List[str]) -> Dict[str, Any]:
        """
        Local sentiment analysis fallback using VADER and TextBlob
        """
        try:
            from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer
            from textblob import TextBlob
            import time

            start_time = time.time()
            vader_analyzer = SentimentIntensityAnalyzer()

            results = []

            for text in texts:
                if not text or len(text.strip()) == 0:
                    results.append({
                        "text": text,
                        "error": "Empty text",
                        "processing_time": 0
                    })
                    continue

                text_start = time.time()

                vader_scores = vader_analyzer.polarity_scores(text)

                try:
                    blob = TextBlob(text)
                    textblob_sentiment = blob.sentiment
                except Exception:
                    textblob_sentiment = None

                vader_compound = vader_scores["compound"]
                textblob_polarity = textblob_sentiment.polarity if textblob_sentiment else 0

                # Two-model consensus: VADER weighted slightly higher than TextBlob.
                weighted_score = (vader_compound * 0.6) + (textblob_polarity * 0.4)

                if weighted_score >= 0.1:
                    consensus_label = "positive"
                elif weighted_score <= -0.1:
                    consensus_label = "negative"
                else:
                    consensus_label = "neutral"

                # Confidence reflects how closely the two models agree.
                agreement = 1.0 - abs(vader_compound - textblob_polarity) / 2.0
                confidence = max(0.0, min(1.0, agreement))

                result = {
                    "text": text[:100] + "..." if len(text) > 100 else text,
                    "vader": {
                        "compound": vader_compound,
                        "positive": vader_scores["pos"],
                        "negative": vader_scores["neg"],
                        "neutral": vader_scores["neu"]
                    },
                    "textblob": {
                        "polarity": textblob_polarity,
                        "subjectivity": textblob_sentiment.subjectivity if textblob_sentiment else 0
                    },
                    "consensus": {
                        "label": consensus_label,
                        "score": weighted_score,
                        "confidence": confidence
                    },
                    "processing_time": time.time() - text_start
                }

                results.append(result)

            total_time = time.time() - start_time

            summary = _calculate_batch_summary(results)

            return {
                "results": results,
                "summary": summary,
                "processing_stats": {
                    "total_texts": len(texts),
                    "total_time": total_time,
                    "texts_per_second": len(texts) / total_time if total_time > 0 else 0,
                    "gpu_used": False,
                    "model_used": "local_vader_textblob"
                }
            }

        except Exception as e:
            logger.error(f"Error in local sentiment analysis: {str(e)}")
            return {"error": str(e)}

    def _extract_texts_from_source(self, source: Any) -> List[str]:
        """Extract text content from different data source formats"""
        texts = []

        if isinstance(source, dict):
            # App store data: {"apps": {app_name: {"reviews": [...]}}}
            if "apps" in source:
                for app_name, app_data in source["apps"].items():
                    if "reviews" in app_data:
                        for review in app_data["reviews"]:
                            title = review.get("title", "")
                            content = review.get("content", "")
                            combined = f"{title} {content}".strip()
                            if combined:
                                texts.append(combined)

            # Reddit data: {"posts": [...]}
            elif "posts" in source:
                for post in source["posts"]:
                    title = post.get("title", "")
                    selftext = post.get("selftext", "")
                    combined = f"{title} {selftext}".strip()
                    if combined:
                        texts.append(combined)

            # News data: {"articles": [...]}
            elif "articles" in source:
                for article in source["articles"]:
                    title = article.get("title", "")
                    description = article.get("description", "")
                    combined = f"{title} {description}".strip()
                    if combined:
                        texts.append(combined)

            # News data keyed by search term: {"search_results": {term: {"articles": [...]}}}
            elif "search_results" in source:
                for search_term, results in source["search_results"].items():
                    if "articles" in results:
                        for article in results["articles"]:
                            title = article.get("title", "")
                            description = article.get("description", "")
                            combined = f"{title} {description}".strip()
                            if combined:
                                texts.append(combined)

            # Reddit data keyed by query: {"query_results": {query: {"posts": [...]}}}
            elif "query_results" in source:
                for query, result in source["query_results"].items():
                    if "posts" in result:
                        for post in result["posts"]:
                            title = post.get("title", "")
                            selftext = post.get("selftext", "")
                            combined = f"{title} {selftext}".strip()
                            if combined:
                                texts.append(combined)

        return texts
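
    # Minimal example of the extraction above: a Reddit-style source
    #   {"posts": [{"title": "Love it", "selftext": "Works great"}]}
    # yields
    #   ["Love it Works great"]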

    def _identify_source_type(self, source: Any) -> str:
        """Identify the type of data source"""
        if isinstance(source, dict):
            if "apps" in source:
                return "app_store"
            elif "posts" in source:
                return "reddit"
            elif "articles" in source:
                return "news"
        # Sources keyed by "search_results" or "query_results" fall through here.
        return "unknown"

    def _organize_results_by_source(self, sentiment_results: Dict[str, Any],
                                    source_mapping: Dict[str, Any]) -> Dict[str, Any]:
        """Organize sentiment results by original data source"""
        if "error" in sentiment_results:
            return sentiment_results

        results = sentiment_results.get("results", [])
        summary = sentiment_results.get("summary", {})
        processing_stats = sentiment_results.get("processing_stats", {})

        organized = {
            "by_source": {},
            "overall_summary": summary,
            "processing_stats": processing_stats
        }

        for source_id, mapping in source_mapping.items():
            start_idx = mapping["start"]
            end_idx = mapping["end"]
            source_results = results[start_idx:end_idx]

            source_summary = _calculate_batch_summary(source_results)

            organized["by_source"][source_id] = {
                "type": mapping["type"],
                "count": mapping["count"],
                "results": source_results,
                "summary": source_summary
            }

        return organized
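
    # Sketch of the organized output (illustrative): per-source slices of the
    # flat results list, each with its own summary, plus the overall stats:
    #
    #   {
    #       "by_source": {
    #           "source_0": {"type": "reddit", "count": 3, "results": [...], "summary": {...}},
    #           "source_1": {"type": "news", "count": 2, "results": [...], "summary": {...}}
    #       },
    #       "overall_summary": {...},
    #       "processing_stats": {...}
    #   }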


async def test_gpu_processor():
    """Test function for GPU processor"""
    processor = GPUProcessor()

    test_texts = [
        "This product is amazing! I love using it every day.",
        "Terrible experience, would not recommend to anyone.",
        "It's okay, nothing special but does the job.",
        "Outstanding features and great customer service!",
        "Complete waste of money, very disappointed."
    ]

    print("Testing GPU sentiment analysis...")
    mock_sources = [
        {"posts": [{"title": text, "selftext": ""} for text in test_texts[:3]]},
        {"articles": [{"title": text, "description": ""} for text in test_texts[3:]]}
    ]

    sentiment_result = await processor.batch_sentiment_analysis(mock_sources)
    print(f"Sentiment analysis completed: {sentiment_result.get('processing_stats', {}).get('total_texts', 0)} texts processed")

    print("Testing GPU keyword extraction...")
    keyword_result = await processor.extract_keywords(test_texts)
    print(f"Keyword extraction: {len(keyword_result.get('keywords', []))} keywords found")

    return sentiment_result, keyword_result


if __name__ == "__main__":
    asyncio.run(test_gpu_processor())