"""
NLP Query Enhancer

Advanced query processing using spaCy and NLTK for better search results.
"""

import asyncio
import logging
import re
from typing import Any, Dict, List

import nltk
import numpy as np
import spacy
from nltk.corpus import stopwords, wordnet
from nltk.stem import WordNetLemmatizer
from nltk.tokenize import word_tokenize
from sentence_transformers import SentenceTransformer

logger = logging.getLogger(__name__)


class NLPQueryEnhancer:
    """Enhanced query processing using advanced NLP techniques."""

    def __init__(self):
        """Initialize NLP models and resources."""
        self.nlp = None
        self.sentence_model = None
        self.lemmatizer = None
        self.stop_words = None
        self._initialized = False
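
        # Hand-curated domain ontology: maps shorthand terms to common
        # expansions, used by _add_industry_terms() to broaden queries.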
        self.industry_ontology = {
            'technology': {
                'ai': ['artificial intelligence', 'machine learning', 'deep learning', 'neural networks'],
                'app': ['application', 'software', 'mobile app', 'web app', 'platform'],
                'api': ['application programming interface', 'web service', 'endpoint'],
                'saas': ['software as a service', 'cloud software', 'subscription software'],
                'iot': ['internet of things', 'connected devices', 'smart devices'],
                'blockchain': ['distributed ledger', 'cryptocurrency', 'smart contracts']
            },
            'business': {
                'startup': ['new business', 'entrepreneur', 'venture', 'company'],
                'revenue': ['income', 'earnings', 'sales', 'profit'],
                'customer': ['client', 'user', 'consumer', 'buyer'],
                'market': ['industry', 'sector', 'segment', 'niche'],
                'competition': ['competitor', 'rival', 'alternative', 'substitute']
            },
            'health': {
                'fitness': ['exercise', 'workout', 'training', 'physical activity'],
                'nutrition': ['diet', 'food', 'eating', 'meal planning'],
                'wellness': ['health', 'wellbeing', 'lifestyle', 'self-care'],
                'medical': ['healthcare', 'clinical', 'treatment', 'diagnosis']
            },
            'finance': {
                'fintech': ['financial technology', 'digital banking', 'payment systems'],
                'investment': ['portfolio', 'trading', 'stocks', 'assets'],
                'banking': ['financial services', 'credit', 'loans', 'deposits'],
                'insurance': ['coverage', 'policy', 'claims', 'risk management']
            }
        }
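
    # Heavy resources (spaCy model, sentence transformer, NLTK corpora) are
    # loaded lazily: public entry points await initialize() before use, so
    # constructing the class stays cheap.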
    async def initialize(self):
        """Initialize NLP models asynchronously."""
        if self._initialized:
            return

        try:
            logger.info("Initializing NLP models...")

            await self._download_nltk_data()

            # The spaCy model is optional; fall back to basic tokenization.
            try:
                self.nlp = spacy.load("en_core_web_sm")
            except OSError:
                logger.warning("spaCy model 'en_core_web_sm' not found. Using basic tokenization.")
                self.nlp = None

            # The sentence transformer is optional; semantic query
            # generation is skipped without it.
            try:
                self.sentence_model = SentenceTransformer('all-MiniLM-L6-v2')
            except Exception as e:
                logger.warning(f"Could not load sentence transformer: {e}")
                self.sentence_model = None

            self.lemmatizer = WordNetLemmatizer()
            self.stop_words = set(stopwords.words('english'))

            self._initialized = True
            logger.info("NLP models initialized successfully")

        except Exception as e:
            logger.error(f"Error initializing NLP models: {e}")
            # Fall back to minimal resources so the enhancer stays usable.
            # (The original `'WordNetLemmatizer' in locals()` check always
            # failed, since the class is imported at module level.)
            try:
                self.lemmatizer = WordNetLemmatizer()
            except Exception:
                self.lemmatizer = None
            self.stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on',
                               'at', 'to', 'for', 'of', 'with', 'by'}

    async def _download_nltk_data(self):
        """Download required NLTK data if it is not already present."""
        # nltk.data.find() needs the resource category, which differs per
        # item; only 'punkt' lives under tokenizers/.
        required_data = {
            'punkt': 'tokenizers/punkt',
            'stopwords': 'corpora/stopwords',
            'wordnet': 'corpora/wordnet',
            'averaged_perceptron_tagger': 'taggers/averaged_perceptron_tagger',
            'omw-1.4': 'corpora/omw-1.4',
        }

        # Note: nltk.download() blocks; the method stays async to match the
        # initializer's interface.
        for name, path in required_data.items():
            try:
                nltk.data.find(path)
            except LookupError:
                try:
                    nltk.download(name, quiet=True)
                except Exception as e:
                    logger.warning(f"Could not download NLTK data '{name}': {e}")
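
    # Query-enhancement pipeline: extract concepts, then derive semantic
    # variants, synonym expansions, industry terms, market-focused queries,
    # and combined queries from them.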
    async def enhance_query(self, feature_description: str, target_market: str = "") -> Dict[str, Any]:
        """
        Enhance a feature description into multiple optimized search queries.

        Args:
            feature_description: Original feature description
            target_market: Target market description

        Returns:
            Enhanced query data with multiple search strategies
        """
        await self.initialize()

        try:
            # 1. Extract key concepts (nouns, verbs, entities, noun chunks).
            concepts = await self._extract_key_concepts(feature_description)

            # 2. Paraphrase the description via synonym substitution.
            semantic_queries = await self._generate_semantic_queries(feature_description)

            # 3. Expand core terms with WordNet synonyms.
            expanded_queries = await self._expand_with_synonyms(concepts)

            # 4. Add industry-specific terminology from the ontology.
            industry_queries = await self._add_industry_terms(concepts, target_market)

            # 5. Cross feature terms with target-market terms.
            market_queries = await self._generate_market_queries(feature_description, target_market)

            all_queries = {
                'core_concepts': concepts['core_terms'],
                'semantic_variations': semantic_queries,
                'synonym_expansions': expanded_queries,
                'industry_specific': industry_queries,
                'market_focused': market_queries,
                'combined_queries': self._combine_queries(concepts, target_market)
            }

            return {
                'original_description': feature_description,
                'target_market': target_market,
                'extracted_concepts': concepts,
                'enhanced_queries': all_queries,
                'query_metadata': {
                    'total_queries': sum(len(v) if isinstance(v, list) else 1 for v in all_queries.values()),
                    'confidence_score': self._calculate_confidence(concepts),
                    'processing_method': 'advanced_nlp' if self.nlp else 'basic_processing'
                }
            }

        except Exception as e:
            logger.error(f"Error enhancing query: {e}")
            return await self._basic_query_enhancement(feature_description, target_market)
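
    # Concept extraction prefers spaCy (entities, POS tags, noun chunks) and
    # falls back to NLTK tokenization + lemmatization when no model is loaded.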
    async def _extract_key_concepts(self, text: str) -> Dict[str, Any]:
        """Extract key concepts using spaCy NLP."""
        concepts = {
            'core_terms': [],
            'entities': [],
            'technologies': [],
            'business_terms': [],
            'action_verbs': [],
            'descriptors': []
        }

        if self.nlp:
            doc = self.nlp(text)

            # Named entities. ('TECHNOLOGY' is not a default en_core_web_sm
            # label; it is kept for custom pipelines that add it.)
            for ent in doc.ents:
                if ent.label_ in ['ORG', 'PRODUCT', 'TECHNOLOGY']:
                    concepts['entities'].append(ent.text.lower())

            # Content words, bucketed by part of speech and dependency role.
            for token in doc:
                if token.is_stop or token.is_punct or len(token.text) < 3:
                    continue

                lemma = token.lemma_.lower()

                if token.pos_ == 'NOUN' and token.dep_ in ['nsubj', 'dobj', 'pobj']:
                    concepts['core_terms'].append(lemma)
                elif token.pos_ == 'VERB' and token.tag_ not in ['MD', 'VBZ']:
                    concepts['action_verbs'].append(lemma)
                elif token.pos_ == 'ADJ':
                    concepts['descriptors'].append(lemma)

            # Short noun chunks make good multi-word search terms.
            for chunk in doc.noun_chunks:
                if len(chunk.text.split()) <= 3:
                    concepts['core_terms'].append(chunk.text.lower())

        else:
            # Fallback: NLTK tokenization + lemmatization.
            words = word_tokenize(text.lower())
            if self.lemmatizer:
                words = [self.lemmatizer.lemmatize(word) for word in words if word.isalpha()]

            words = [word for word in words if word not in self.stop_words and len(word) > 2]
            concepts['core_terms'] = words[:10]

        concepts = self._categorize_terms(concepts)

        # Deduplicate while preserving order; cap each category at five terms.
        for key in concepts:
            if isinstance(concepts[key], list):
                concepts[key] = list(dict.fromkeys(concepts[key]))[:5]

        return concepts

    def _categorize_terms(self, concepts: Dict[str, Any]) -> Dict[str, Any]:
        """Categorize terms into technology, business, etc."""
        tech_keywords = {'ai', 'app', 'software', 'platform', 'api', 'system', 'tool',
                         'technology', 'digital', 'online', 'mobile', 'web'}
        business_keywords = {'market', 'customer', 'business', 'service', 'product',
                             'solution', 'company', 'industry', 'revenue', 'profit'}

        all_terms = concepts['core_terms'] + concepts['entities']

        for term in all_terms:
            term_lower = term.lower()
            if any(tech in term_lower for tech in tech_keywords):
                concepts['technologies'].append(term)
            elif any(biz in term_lower for biz in business_keywords):
                concepts['business_terms'].append(term)

        return concepts
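
    # Semantic variants: swap single content words for WordNet synonyms, then
    # rank the candidates by embedding similarity to the original description
    # so the most faithful paraphrases come first.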
    async def _generate_semantic_queries(self, text: str) -> List[str]:
        """Generate semantically similar queries using sentence transformers."""
        if not self.sentence_model:
            return []

        try:
            # Build candidate variations by swapping content words for synonyms.
            variations = []
            words = text.split()

            for i, word in enumerate(words):
                if len(word) > 4 and word.lower() not in self.stop_words:
                    for synonym in self._get_wordnet_synonyms(word)[:2]:
                        new_words = words.copy()
                        new_words[i] = synonym
                        variations.append(' '.join(new_words))

            if not variations:
                return []

            # Rank candidates by cosine similarity to the original text.
            # (The original code computed the base embedding but never used
            # it; here it drives the ranking.)
            base = self.sentence_model.encode([text])[0]
            candidates = self.sentence_model.encode(variations)
            scores = candidates @ base / (
                np.linalg.norm(candidates, axis=1) * np.linalg.norm(base)
            )
            ranked = [v for _, v in sorted(zip(scores, variations), reverse=True)]
            return ranked[:5]

        except Exception as e:
            logger.warning(f"Error generating semantic queries: {e}")
            return []

    async def _expand_with_synonyms(self, concepts: Dict[str, Any]) -> List[str]:
        """Expand core terms with synonyms using WordNet."""
        expanded = []

        for term in concepts['core_terms'][:5]:
            synonyms = self._get_wordnet_synonyms(term)
            if synonyms:
                expanded.extend(synonyms[:3])

        # Deduplicate while keeping a stable order.
        return list(dict.fromkeys(expanded))

    def _get_wordnet_synonyms(self, word: str) -> List[str]:
        """Get synonyms from WordNet."""
        synonyms = set()

        try:
            for syn in wordnet.synsets(word):
                for lemma in syn.lemmas():
                    synonym = lemma.name().replace('_', ' ')
                    if synonym.lower() != word.lower() and len(synonym) > 2:
                        synonyms.add(synonym)
        except Exception as e:
            logger.debug(f"Error getting synonyms for '{word}': {e}")

        # Sort for deterministic output before truncating.
        return sorted(synonyms)[:5]

    async def _add_industry_terms(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
        """Add industry-specific terminology."""
        industry_queries = []

        detected_industries = self._detect_industries(concepts, target_market)

        for industry in detected_industries:
            if industry in self.industry_ontology:
                ontology = self.industry_ontology[industry]

                # Expand any core term that overlaps an ontology key.
                for concept in concepts['core_terms'][:3]:
                    for term, expansions in ontology.items():
                        if concept.lower() in term or term in concept.lower():
                            industry_queries.extend(expansions[:2])

        return list(dict.fromkeys(industry_queries))[:8]

    def _detect_industries(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
        """Detect relevant industries from concepts and target market."""
        industries = []
        all_text = ' '.join(concepts['core_terms'] + [target_market]).lower()

        industry_keywords = {
            'technology': ['app', 'software', 'ai', 'tech', 'digital', 'platform', 'api'],
            'health': ['fitness', 'health', 'medical', 'wellness', 'nutrition', 'exercise'],
            'business': ['business', 'startup', 'company', 'market', 'customer'],
            'finance': ['finance', 'money', 'payment', 'banking', 'investment']
        }

        for industry, keywords in industry_keywords.items():
            if any(keyword in all_text for keyword in keywords):
                industries.append(industry)

        return industries[:2]

    async def _generate_market_queries(self, feature_description: str, target_market: str) -> List[str]:
        """Generate market-focused search queries."""
        if not target_market:
            return []

        market_queries = []

        feature_words = [word for word in feature_description.split()
                         if len(word) > 3 and word.lower() not in self.stop_words][:3]

        market_words = [word for word in target_market.split()
                        if len(word) > 3 and word.lower() not in self.stop_words][:3]

        # Cross feature terms with market terms in both orders.
        for feature_word in feature_words:
            for market_word in market_words:
                market_queries.append(f"{feature_word} {market_word}")
                market_queries.append(f"{market_word} {feature_word}")

        # Common "market + category" search patterns.
        market_patterns = [
            f"{target_market} solutions",
            f"{target_market} tools",
            f"{target_market} software",
            f"{target_market} apps",
            f"{target_market} technology"
        ]

        market_queries.extend(market_patterns)

        return list(dict.fromkeys(market_queries))[:6]
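
    # Query combination: join the strongest core terms, pair complementary
    # tech/business terms, and mix core terms with market words, filtering
    # out combinations that are redundant or too generic.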
    def _combine_queries(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
        """Combine different concepts into comprehensive, contextually relevant queries."""
        combined = []

        core_terms = concepts['core_terms'][:3]
        technologies = concepts['technologies'][:2]
        business_terms = concepts['business_terms'][:2]

        if len(core_terms) >= 2:
            main_concept = ' '.join(core_terms[:2])
            combined.append(main_concept)

        if len(core_terms) >= 3:
            extended_concept = ' '.join(core_terms[:3])
            if len(extended_concept.split()) <= 4:
                combined.append(extended_concept)

        for tech in technologies:
            for biz in business_terms:
                tech_biz_combo = f"{tech} {biz}"
                if self._is_meaningful_combination(tech, biz):
                    combined.append(tech_biz_combo)

        if target_market:
            market_words = [word for word in target_market.split() if len(word) > 3][:2]
            for market_word in market_words:
                for term in core_terms[:2]:
                    market_combo = f"{term} {market_word}"
                    if self._is_meaningful_combination(term, market_word):
                        combined.append(market_combo)

        # Deduplicate (order-preserving), then keep only relevant queries.
        unique_combined = list(dict.fromkeys(combined))
        filtered_combined = [combo for combo in unique_combined if self._is_contextually_relevant(combo)]

        return filtered_combined[:5]
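
    # The heuristics below accept pairs of complementary term types (tech +
    # business), reject near-duplicates, and require overlap-free wording for
    # same-type pairs.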
    def _is_meaningful_combination(self, term1: str, term2: str) -> bool:
        """Check if two terms create a meaningful combination."""
        # Identical terms are never meaningful together.
        if term1.lower() == term2.lower():
            return False

        # Reject pairs where one term contains the other.
        if term1.lower() in term2.lower() or term2.lower() in term1.lower():
            return False

        tech_terms = {'ai', 'ml', 'algorithm', 'neural', 'deep', 'machine', 'learning',
                      'data', 'model', 'system', 'network', 'automation', 'software',
                      'digital', 'technology', 'computer', 'intelligence'}
        business_terms = {'business', 'market', 'customer', 'service', 'product',
                          'solution', 'company', 'industry', 'enterprise', 'commercial',
                          'professional', 'management', 'strategy'}

        term1_is_tech = any(tech in term1.lower() for tech in tech_terms)
        term2_is_tech = any(tech in term2.lower() for tech in tech_terms)
        term1_is_business = any(biz in term1.lower() for biz in business_terms)
        term2_is_business = any(biz in term2.lower() for biz in business_terms)

        # Tech + business pairs are the most useful combinations.
        if (term1_is_tech and term2_is_business) or (term1_is_business and term2_is_tech):
            return True

        # Same-type pairs are allowed only if they share no words.
        if (term1_is_tech and term2_is_tech) or (term1_is_business and term2_is_business):
            return len(set(term1.lower().split()) & set(term2.lower().split())) == 0

        return True

    def _is_contextually_relevant(self, query: str) -> bool:
        """Check if a query maintains contextual relevance."""
        words = query.lower().split()

        # Reasonable query length: one to five words.
        if len(words) < 1 or len(words) > 5:
            return False

        # Fall back to a minimal stop-word list if initialize() has not run.
        stop_words = self.stop_words or {'the', 'a', 'an', 'and', 'or', 'but', 'in',
                                         'on', 'at', 'to', 'for', 'of', 'with', 'by'}
        meaningful_words = [word for word in words if word not in stop_words and len(word) > 2]

        if len(meaningful_words) == 0:
            return False

        # Require at least one substantial word.
        if not any(len(word) > 3 for word in meaningful_words):
            return False

        return True

    def _calculate_confidence(self, concepts: Dict[str, Any]) -> float:
        """Calculate confidence score for the extraction."""
        total_concepts = sum(len(v) if isinstance(v, list) else 0 for v in concepts.values())

        # More extracted concepts imply a richer, more reliable query set.
        if total_concepts >= 15:
            return 0.9
        elif total_concepts >= 10:
            return 0.8
        elif total_concepts >= 5:
            return 0.7
        else:
            return 0.6

    async def _basic_query_enhancement(self, feature_description: str, target_market: str) -> Dict[str, Any]:
        """Fallback basic query enhancement when NLP models fail."""
        # Simple regex tokenization: words of three or more characters.
        words = re.findall(r'\b\w{3,}\b', feature_description.lower())
        words = [word for word in words if word not in self.stop_words][:5]

        return {
            'original_description': feature_description,
            'target_market': target_market,
            'extracted_concepts': {'core_terms': words},
            'enhanced_queries': {
                'core_concepts': words,
                'semantic_variations': [],
                'synonym_expansions': [],
                'industry_specific': [],
                'market_focused': [target_market] if target_market else [],
                'combined_queries': [' '.join(words[:2])] if len(words) >= 2 else words
            },
            'query_metadata': {
                'total_queries': len(words) + (1 if target_market else 0),
                'confidence_score': 0.5,
                'processing_method': 'basic_fallback'
            }
        }

    def get_optimized_queries_for_platform(self, enhanced_data: Dict[str, Any], platform: str) -> List[str]:
        """
        Get platform-optimized queries from enhanced data.

        Args:
            enhanced_data: Result from enhance_query()
            platform: Target platform ('news', 'reddit', etc.)

        Returns:
            List of optimized queries for the specific platform
        """
        queries = enhanced_data.get('enhanced_queries', {})

        # Each platform favors different query styles. Strategy keys must
        # match those produced by enhance_query().
        platform_strategies = {
            'news': ['core_concepts', 'industry_specific', 'combined_queries'],
            'reddit': ['semantic_variations', 'market_focused', 'core_concepts'],
            'linkedin': ['industry_specific', 'combined_queries', 'market_focused']
        }

        strategy = platform_strategies.get(platform, ['core_concepts', 'combined_queries'])
        optimized_queries = []

        for strategy_key in strategy:
            if strategy_key in queries:
                query_list = queries[strategy_key]
                if isinstance(query_list, list):
                    optimized_queries.extend(query_list)
                else:
                    optimized_queries.append(str(query_list))

        # Deduplicate while preserving order.
        seen = set()
        unique_queries = []
        for query in optimized_queries:
            if query not in seen:
                seen.add(query)
                unique_queries.append(query)

        return unique_queries[:8]
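

# Smoke test; the first run may download NLTK corpora and the sentence
# transformer, so network access is assumed.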
async def test_nlp_enhancer():
    """Test the NLP query enhancer."""
    enhancer = NLPQueryEnhancer()

    feature = "AI-powered voice ordering system for restaurants"
    market = "small to medium restaurants, food service industry"

    print("Testing NLP Query Enhancement...")
    enhanced = await enhancer.enhance_query(feature, market)

    print(f"Original: {enhanced['original_description']}")
    print(f"Market: {enhanced['target_market']}")
    print(f"Core concepts: {enhanced['extracted_concepts']['core_terms']}")
    print(f"Confidence: {enhanced['query_metadata']['confidence_score']}")

    for platform in ['news', 'reddit']:
        queries = enhancer.get_optimized_queries_for_platform(enhanced, platform)
        print(f"{platform.title()} queries: {queries[:3]}")

    return enhanced


if __name__ == "__main__":
    asyncio.run(test_nlp_enhancer())