"""
NLP Query Enhancer
Advanced query processing using spaCy and NLTK for better search results
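
Typical usage (sketch):
    enhancer = NLPQueryEnhancer()
    enhanced = await enhancer.enhance_query("AI meal planner", "busy parents")
    news_queries = enhancer.get_optimized_queries_for_platform(enhanced, "news")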
"""
import spacy
import nltk
from nltk.corpus import wordnet
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
import numpy as np
from typing import Any, Dict, List
import re
import logging
import asyncio
logger = logging.getLogger(__name__)
class NLPQueryEnhancer:
"""Enhanced query processing using advanced NLP techniques"""
def __init__(self):
"""Initialize NLP models and resources"""
self.nlp = None
self.sentence_model = None
self.lemmatizer = None
self.stop_words = None
self._initialized = False
        # Industry-specific term mappings, used by _add_industry_terms()
        # to expand detected concepts with domain vocabulary
self.industry_ontology = {
'technology': {
'ai': ['artificial intelligence', 'machine learning', 'deep learning', 'neural networks'],
'app': ['application', 'software', 'mobile app', 'web app', 'platform'],
'api': ['application programming interface', 'web service', 'endpoint'],
'saas': ['software as a service', 'cloud software', 'subscription software'],
'iot': ['internet of things', 'connected devices', 'smart devices'],
'blockchain': ['distributed ledger', 'cryptocurrency', 'smart contracts']
},
'business': {
'startup': ['new business', 'entrepreneur', 'venture', 'company'],
'revenue': ['income', 'earnings', 'sales', 'profit'],
'customer': ['client', 'user', 'consumer', 'buyer'],
'market': ['industry', 'sector', 'segment', 'niche'],
'competition': ['competitor', 'rival', 'alternative', 'substitute']
},
'health': {
'fitness': ['exercise', 'workout', 'training', 'physical activity'],
'nutrition': ['diet', 'food', 'eating', 'meal planning'],
'wellness': ['health', 'wellbeing', 'lifestyle', 'self-care'],
'medical': ['healthcare', 'clinical', 'treatment', 'diagnosis']
},
'finance': {
'fintech': ['financial technology', 'digital banking', 'payment systems'],
'investment': ['portfolio', 'trading', 'stocks', 'assets'],
'banking': ['financial services', 'credit', 'loans', 'deposits'],
'insurance': ['coverage', 'policy', 'claims', 'risk management']
}
}
async def initialize(self):
"""Initialize NLP models asynchronously"""
if self._initialized:
return
try:
logger.info("Initializing NLP models...")
# Download required NLTK data
await self._download_nltk_data()
# Initialize spaCy model
try:
self.nlp = spacy.load("en_core_web_sm")
except OSError:
logger.warning("spaCy model 'en_core_web_sm' not found. Using basic tokenization.")
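                # Install it with: python -m spacy download en_core_web_sm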
self.nlp = None
# Initialize sentence transformer
try:
self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
except Exception as e:
logger.warning(f"Could not load sentence transformer: {e}")
self.sentence_model = None
# Initialize NLTK components
self.lemmatizer = WordNetLemmatizer()
self.stop_words = set(stopwords.words('english'))
self._initialized = True
logger.info("NLP models initialized successfully")
        except Exception as e:
            logger.error(f"Error initializing NLP models: {e}")
            # Set basic fallbacks so the enhancer can still run in degraded mode
            try:
                self.lemmatizer = WordNetLemmatizer()
            except Exception:
                self.lemmatizer = None
            self.stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
    async def _download_nltk_data(self):
        """Download required NLTK data if it is not already present"""
        # nltk.data.find() needs each resource's category prefix, which
        # differs per resource (only 'punkt' lives under tokenizers/)
        required_data = {
            'punkt': 'tokenizers/punkt',
            'stopwords': 'corpora/stopwords',
            'wordnet': 'corpora/wordnet',
            'omw-1.4': 'corpora/omw-1.4',
            'averaged_perceptron_tagger': 'taggers/averaged_perceptron_tagger',
        }
        for data, path in required_data.items():
            try:
                nltk.data.find(path)
            except LookupError:
                try:
                    nltk.download(data, quiet=True)
                except Exception as e:
                    logger.warning(f"Could not download NLTK data '{data}': {e}")
async def enhance_query(self, feature_description: str, target_market: str = "") -> Dict[str, Any]:
"""
Enhance a feature description into multiple optimized search queries
Args:
feature_description: Original feature description
target_market: Target market description
Returns:
Enhanced query data with multiple search strategies
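        Example (illustrative; exact queries depend on the loaded models):
            enhanced = await enhancer.enhance_query(
                "AI-powered voice ordering system", "restaurants")
            enhanced['enhanced_queries']['core_concepts']
            # e.g. ['voice ordering', 'system', 'restaurant']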
"""
await self.initialize()
try:
# Extract key concepts using NLP
concepts = await self._extract_key_concepts(feature_description)
# Generate semantic variations
semantic_queries = await self._generate_semantic_queries(feature_description)
# Expand with synonyms and related terms
expanded_queries = await self._expand_with_synonyms(concepts)
# Add industry-specific terms
industry_queries = await self._add_industry_terms(concepts, target_market)
# Generate market-specific queries
market_queries = await self._generate_market_queries(feature_description, target_market)
# Combine and rank queries
all_queries = {
'core_concepts': concepts['core_terms'],
'semantic_variations': semantic_queries,
'synonym_expansions': expanded_queries,
'industry_specific': industry_queries,
'market_focused': market_queries,
'combined_queries': self._combine_queries(concepts, target_market)
}
return {
'original_description': feature_description,
'target_market': target_market,
'extracted_concepts': concepts,
'enhanced_queries': all_queries,
'query_metadata': {
'total_queries': sum(len(v) if isinstance(v, list) else 1 for v in all_queries.values()),
'confidence_score': self._calculate_confidence(concepts),
'processing_method': 'advanced_nlp' if self.nlp else 'basic_processing'
}
}
except Exception as e:
logger.error(f"Error enhancing query: {e}")
# Fallback to basic processing
return await self._basic_query_enhancement(feature_description, target_market)
async def _extract_key_concepts(self, text: str) -> Dict[str, Any]:
"""Extract key concepts using spaCy NLP"""
concepts = {
'core_terms': [],
'entities': [],
'technologies': [],
'business_terms': [],
'action_verbs': [],
'descriptors': []
}
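        # Illustrative: "AI-powered voice ordering for restaurants" would
        # typically yield core_terms like ['voice ordering', 'restaurant']
        # (exact output depends on the spaCy model in use)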
if self.nlp:
# Use spaCy for advanced processing
doc = self.nlp(text)
            # Extract named entities ('TECHNOLOGY' is not a label in
            # en_core_web_sm, so ORG and PRODUCT cover the useful cases)
            for ent in doc.ents:
                if ent.label_ in ['ORG', 'PRODUCT']:
                    concepts['entities'].append(ent.text.lower())
# Extract key terms by POS tags
for token in doc:
if token.is_stop or token.is_punct or len(token.text) < 3:
continue
lemma = token.lemma_.lower()
if token.pos_ == 'NOUN' and token.dep_ in ['nsubj', 'dobj', 'pobj']:
concepts['core_terms'].append(lemma)
                elif token.pos_ == 'VERB' and token.dep_ not in ('aux', 'auxpass'):  # Skip auxiliary verbs
                    concepts['action_verbs'].append(lemma)
elif token.pos_ == 'ADJ':
concepts['descriptors'].append(lemma)
# Extract noun phrases
for chunk in doc.noun_chunks:
if len(chunk.text.split()) <= 3: # Keep phrases short
concepts['core_terms'].append(chunk.text.lower())
else:
# Fallback to basic processing
words = word_tokenize(text.lower())
if self.lemmatizer:
words = [self.lemmatizer.lemmatize(word) for word in words if word.isalpha()]
# Filter out stop words
words = [word for word in words if word not in self.stop_words and len(word) > 2]
concepts['core_terms'] = words[:10] # Limit to top 10
# Categorize terms
concepts = self._categorize_terms(concepts)
# Remove duplicates and sort by importance
for key in concepts:
if isinstance(concepts[key], list):
concepts[key] = list(dict.fromkeys(concepts[key]))[:5] # Top 5 per category
return concepts
def _categorize_terms(self, concepts: Dict[str, Any]) -> Dict[str, Any]:
"""Categorize terms into technology, business, etc."""
tech_keywords = {'ai', 'app', 'software', 'platform', 'api', 'system', 'tool', 'technology', 'digital', 'online', 'mobile', 'web'}
business_keywords = {'market', 'customer', 'business', 'service', 'product', 'solution', 'company', 'industry', 'revenue', 'profit'}
all_terms = concepts['core_terms'] + concepts['entities']
for term in all_terms:
term_lower = term.lower()
if any(tech in term_lower for tech in tech_keywords):
concepts['technologies'].append(term)
elif any(biz in term_lower for biz in business_keywords):
concepts['business_terms'].append(term)
return concepts
async def _generate_semantic_queries(self, text: str) -> List[str]:
"""Generate semantically similar queries using sentence transformers"""
if not self.sentence_model:
return []
        try:
            # Create variations by replacing key terms with WordNet synonyms
            variations = []
            words = text.split()
            for i, word in enumerate(words):
                if len(word) > 4 and word.lower() not in self.stop_words:
                    synonyms = self._get_wordnet_synonyms(word)
                    for synonym in synonyms[:2]:  # Limit to 2 synonyms per word
                        new_text = words.copy()
                        new_text[i] = synonym
                        variations.append(' '.join(new_text))
            if not variations:
                return []
            # Rank the variations by cosine similarity to the original text so
            # the closest paraphrases are returned first
            base_embedding = self.sentence_model.encode([text])[0]
            variation_embeddings = self.sentence_model.encode(variations)
            similarities = variation_embeddings @ base_embedding / (
                np.linalg.norm(variation_embeddings, axis=1)
                * np.linalg.norm(base_embedding)
            )
            ranked = [v for _, v in sorted(
                zip(similarities.tolist(), variations), reverse=True)]
            return ranked[:5]  # Return the top 5 variations
except Exception as e:
logger.warning(f"Error generating semantic queries: {e}")
return []
async def _expand_with_synonyms(self, concepts: Dict[str, Any]) -> List[str]:
"""Expand core terms with synonyms using WordNet"""
expanded = []
for term in concepts['core_terms'][:5]: # Limit to top 5 terms
synonyms = self._get_wordnet_synonyms(term)
if synonyms:
# Create queries with synonyms
expanded.extend(synonyms[:3]) # Top 3 synonyms per term
return list(set(expanded)) # Remove duplicates
def _get_wordnet_synonyms(self, word: str) -> List[str]:
"""Get synonyms from WordNet"""
synonyms = set()
try:
for syn in wordnet.synsets(word):
for lemma in syn.lemmas():
synonym = lemma.name().replace('_', ' ')
if synonym.lower() != word.lower() and len(synonym) > 2:
synonyms.add(synonym)
except Exception as e:
logger.debug(f"Error getting synonyms for '{word}': {e}")
return list(synonyms)[:5] # Limit to 5 synonyms
async def _add_industry_terms(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
"""Add industry-specific terminology"""
industry_queries = []
# Detect industry from concepts and target market
detected_industries = self._detect_industries(concepts, target_market)
for industry in detected_industries:
if industry in self.industry_ontology:
ontology = self.industry_ontology[industry]
# Match concepts to industry terms
for concept in concepts['core_terms'][:3]:
for term, expansions in ontology.items():
if concept.lower() in term or term in concept.lower():
industry_queries.extend(expansions[:2]) # Add 2 expansions
return list(set(industry_queries))[:8] # Limit to 8 industry terms
def _detect_industries(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
"""Detect relevant industries from concepts and target market"""
industries = []
all_text = ' '.join(concepts['core_terms'] + [target_market]).lower()
# Simple keyword matching for industry detection
industry_keywords = {
'technology': ['app', 'software', 'ai', 'tech', 'digital', 'platform', 'api'],
'health': ['fitness', 'health', 'medical', 'wellness', 'nutrition', 'exercise'],
'business': ['business', 'startup', 'company', 'market', 'customer'],
'finance': ['finance', 'money', 'payment', 'banking', 'investment']
}
for industry, keywords in industry_keywords.items():
if any(keyword in all_text for keyword in keywords):
industries.append(industry)
return industries[:2] # Limit to 2 industries
async def _generate_market_queries(self, feature_description: str, target_market: str) -> List[str]:
"""Generate market-focused search queries"""
if not target_market:
return []
market_queries = []
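        # Illustrative: feature "voice ordering system" with market
        # "small restaurants" yields combos like "voice restaurants" and
        # patterns like "small restaurants software"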
# Extract key terms from feature description
feature_words = [word for word in feature_description.split()
if len(word) > 3 and word.lower() not in self.stop_words][:3]
# Extract market segments
market_words = [word for word in target_market.split()
if len(word) > 3 and word.lower() not in self.stop_words][:3]
# Combine feature + market
for feature_word in feature_words:
for market_word in market_words:
market_queries.append(f"{feature_word} {market_word}")
market_queries.append(f"{market_word} {feature_word}")
# Add market-specific patterns
market_patterns = [
f"{target_market} solutions",
f"{target_market} tools",
f"{target_market} software",
f"{target_market} apps",
f"{target_market} technology"
]
market_queries.extend(market_patterns)
return list(set(market_queries))[:6] # Limit to 6 market queries
def _combine_queries(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
"""Combine different concepts into comprehensive, contextually relevant queries"""
combined = []
core_terms = concepts['core_terms'][:3]
technologies = concepts['technologies'][:2]
business_terms = concepts['business_terms'][:2]
# Create contextual phrase combinations (preserve meaning)
if len(core_terms) >= 2:
# Keep related terms together
main_concept = ' '.join(core_terms[:2])
combined.append(main_concept)
# Add third term only if it's contextually related
if len(core_terms) >= 3:
extended_concept = ' '.join(core_terms[:3])
# Only add if it makes semantic sense
if len(extended_concept.split()) <= 4: # Avoid overly long phrases
combined.append(extended_concept)
# Combine technology + business terms meaningfully
for tech in technologies:
for biz in business_terms:
# Create meaningful combinations
tech_biz_combo = f"{tech} {biz}"
# Ensure the combination makes sense (not just random word pairs)
if self._is_meaningful_combination(tech, biz):
combined.append(tech_biz_combo)
# Add contextual market combinations
if target_market:
market_words = [word for word in target_market.split() if len(word) > 3][:2]
for market_word in market_words:
for term in core_terms[:2]:
# Create market-specific queries that maintain context
market_combo = f"{term} {market_word}"
if self._is_meaningful_combination(term, market_word):
combined.append(market_combo)
# Remove duplicates and filter for relevance
unique_combined = list(dict.fromkeys(combined)) # Preserve order while removing duplicates
# Filter out combinations that are too generic or meaningless
filtered_combined = [combo for combo in unique_combined if self._is_contextually_relevant(combo)]
return filtered_combined[:5] # Limit to 5 most relevant combined queries
def _is_meaningful_combination(self, term1: str, term2: str) -> bool:
"""Check if two terms create a meaningful combination"""
# Avoid combining very similar terms
if term1.lower() == term2.lower():
return False
# Avoid combining terms that are substrings of each other
if term1.lower() in term2.lower() or term2.lower() in term1.lower():
return False
# Check for semantic compatibility
tech_terms = {'ai', 'ml', 'algorithm', 'neural', 'deep', 'machine', 'learning', 'data', 'model', 'system', 'network', 'automation', 'software', 'digital', 'technology', 'computer', 'intelligence'}
business_terms = {'business', 'market', 'customer', 'service', 'product', 'solution', 'company', 'industry', 'enterprise', 'commercial', 'professional', 'management', 'strategy'}
term1_is_tech = any(tech in term1.lower() for tech in tech_terms)
term2_is_tech = any(tech in term2.lower() for tech in tech_terms)
term1_is_business = any(biz in term1.lower() for biz in business_terms)
term2_is_business = any(biz in term2.lower() for biz in business_terms)
# Good combinations: tech + business, or related terms
if (term1_is_tech and term2_is_business) or (term1_is_business and term2_is_tech):
return True
# Both are tech terms or both are business terms - can be good if not too similar
if (term1_is_tech and term2_is_tech) or (term1_is_business and term2_is_business):
return len(set(term1.lower().split()) & set(term2.lower().split())) == 0 # No overlapping words
return True # Default to allowing the combination
def _is_contextually_relevant(self, query: str) -> bool:
"""Check if a query maintains contextual relevance"""
words = query.lower().split()
# Filter out queries that are too short or too long
if len(words) < 1 or len(words) > 5:
return False
        # Filter out queries with only stop words (use the full NLTK list when available)
        stop_words = self.stop_words or {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at', 'to', 'for', 'of', 'with', 'by'}
        meaningful_words = [word for word in words if word not in stop_words and len(word) > 2]
if len(meaningful_words) == 0:
return False
# Ensure at least one word is substantial (length > 3)
if not any(len(word) > 3 for word in meaningful_words):
return False
return True
def _calculate_confidence(self, concepts: Dict[str, Any]) -> float:
"""Calculate confidence score for the extraction"""
total_concepts = sum(len(v) if isinstance(v, list) else 0 for v in concepts.values())
# Base confidence on number of extracted concepts
if total_concepts >= 15:
return 0.9
elif total_concepts >= 10:
return 0.8
elif total_concepts >= 5:
return 0.7
else:
return 0.6
async def _basic_query_enhancement(self, feature_description: str, target_market: str) -> Dict[str, Any]:
"""Fallback basic query enhancement when NLP models fail"""
# Simple word extraction
words = re.findall(r'\b\w{3,}\b', feature_description.lower())
words = [word for word in words if word not in self.stop_words][:5]
return {
'original_description': feature_description,
'target_market': target_market,
'extracted_concepts': {'core_terms': words},
'enhanced_queries': {
'core_concepts': words,
'semantic_variations': [],
'synonym_expansions': [],
'industry_specific': [],
'market_focused': [target_market] if target_market else [],
'combined_queries': [' '.join(words[:2])] if len(words) >= 2 else words
},
'query_metadata': {
'total_queries': len(words) + (1 if target_market else 0),
'confidence_score': 0.5,
'processing_method': 'basic_fallback'
}
}
def get_optimized_queries_for_platform(self, enhanced_data: Dict[str, Any], platform: str) -> List[str]:
"""
Get platform-optimized queries from enhanced data
Args:
enhanced_data: Result from enhance_query()
platform: Target platform ('news', 'reddit', etc.)
Returns:
List of optimized queries for the specific platform
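        Example (illustrative):
            enhancer.get_optimized_queries_for_platform(enhanced, 'reddit')
            # e.g. ['voice ordering restaurants', 'speech ordering', ...]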
"""
queries = enhanced_data.get('enhanced_queries', {})
        platform_strategies = {
            'news': ['core_concepts', 'industry_specific', 'combined_queries'],
            'reddit': ['semantic_variations', 'market_focused', 'core_concepts'],
            # note: enhanced_queries has no 'business_terms' key, so use
            # combined_queries for LinkedIn instead
            'linkedin': ['industry_specific', 'combined_queries', 'market_focused']
        }
strategy = platform_strategies.get(platform, ['core_concepts', 'combined_queries'])
optimized_queries = []
for strategy_key in strategy:
if strategy_key in queries:
query_list = queries[strategy_key]
if isinstance(query_list, list):
optimized_queries.extend(query_list)
else:
optimized_queries.append(str(query_list))
# Remove duplicates while preserving order
seen = set()
unique_queries = []
for query in optimized_queries:
if query not in seen:
seen.add(query)
unique_queries.append(query)
return unique_queries[:8] # Limit to 8 queries per platform
# Example usage and testing
async def test_nlp_enhancer():
"""Test the NLP query enhancer"""
enhancer = NLPQueryEnhancer()
# Test query enhancement
feature = "AI-powered voice ordering system for restaurants"
market = "small to medium restaurants, food service industry"
print("Testing NLP Query Enhancement...")
enhanced = await enhancer.enhance_query(feature, market)
print(f"Original: {enhanced['original_description']}")
print(f"Market: {enhanced['target_market']}")
print(f"Core concepts: {enhanced['extracted_concepts']['core_terms']}")
print(f"Confidence: {enhanced['query_metadata']['confidence_score']}")
# Test platform-specific queries
for platform in ['news', 'reddit']:
queries = enhancer.get_optimized_queries_for_platform(enhanced, platform)
print(f"{platform.title()} queries: {queries[:3]}")
return enhanced
if __name__ == "__main__":
asyncio.run(test_nlp_enhancer())