""" NLP Query Enhancer Advanced query processing using spaCy and NLTK for better search results """ import spacy import nltk from nltk.corpus import wordnet from nltk.tokenize import word_tokenize from nltk.corpus import stopwords from nltk.stem import WordNetLemmatizer from sentence_transformers import SentenceTransformer import numpy as np from typing import List, Dict, Set, Tuple, Any import re import logging from collections import defaultdict, Counter import asyncio import aiohttp logger = logging.getLogger(__name__) class NLPQueryEnhancer: """Enhanced query processing using advanced NLP techniques""" def __init__(self): """Initialize NLP models and resources""" self.nlp = None self.sentence_model = None self.lemmatizer = None self.stop_words = None self._initialized = False # Industry-specific term mappings self.industry_ontology = { 'technology': { 'ai': ['artificial intelligence', 'machine learning', 'deep learning', 'neural networks'], 'app': ['application', 'software', 'mobile app', 'web app', 'platform'], 'api': ['application programming interface', 'web service', 'endpoint'], 'saas': ['software as a service', 'cloud software', 'subscription software'], 'iot': ['internet of things', 'connected devices', 'smart devices'], 'blockchain': ['distributed ledger', 'cryptocurrency', 'smart contracts'] }, 'business': { 'startup': ['new business', 'entrepreneur', 'venture', 'company'], 'revenue': ['income', 'earnings', 'sales', 'profit'], 'customer': ['client', 'user', 'consumer', 'buyer'], 'market': ['industry', 'sector', 'segment', 'niche'], 'competition': ['competitor', 'rival', 'alternative', 'substitute'] }, 'health': { 'fitness': ['exercise', 'workout', 'training', 'physical activity'], 'nutrition': ['diet', 'food', 'eating', 'meal planning'], 'wellness': ['health', 'wellbeing', 'lifestyle', 'self-care'], 'medical': ['healthcare', 'clinical', 'treatment', 'diagnosis'] }, 'finance': { 'fintech': ['financial technology', 'digital banking', 'payment systems'], 'investment': ['portfolio', 'trading', 'stocks', 'assets'], 'banking': ['financial services', 'credit', 'loans', 'deposits'], 'insurance': ['coverage', 'policy', 'claims', 'risk management'] } } async def initialize(self): """Initialize NLP models asynchronously""" if self._initialized: return try: logger.info("Initializing NLP models...") # Download required NLTK data await self._download_nltk_data() # Initialize spaCy model try: self.nlp = spacy.load("en_core_web_sm") except OSError: logger.warning("spaCy model 'en_core_web_sm' not found. 
    async def _download_nltk_data(self):
        """Download required NLTK data"""
        # Each dataset lives in a different part of the NLTK data tree, so the
        # existence check must use the matching path prefix.
        required_data = {
            'punkt': 'tokenizers/punkt',
            'stopwords': 'corpora/stopwords',
            'wordnet': 'corpora/wordnet',
            'averaged_perceptron_tagger': 'taggers/averaged_perceptron_tagger',
            'omw-1.4': 'corpora/omw-1.4',
        }
        for name, path in required_data.items():
            try:
                nltk.data.find(path)
            except LookupError:
                try:
                    nltk.download(name, quiet=True)
                except Exception as e:
                    logger.warning(f"Could not download NLTK data '{name}': {e}")

    async def enhance_query(self, feature_description: str, target_market: str = "") -> Dict[str, Any]:
        """
        Enhance a feature description into multiple optimized search queries

        Args:
            feature_description: Original feature description
            target_market: Target market description

        Returns:
            Enhanced query data with multiple search strategies
        """
        await self.initialize()

        try:
            # Extract key concepts using NLP
            concepts = await self._extract_key_concepts(feature_description)

            # Generate semantic variations
            semantic_queries = await self._generate_semantic_queries(feature_description)

            # Expand with synonyms and related terms
            expanded_queries = await self._expand_with_synonyms(concepts)

            # Add industry-specific terms
            industry_queries = await self._add_industry_terms(concepts, target_market)

            # Generate market-specific queries
            market_queries = await self._generate_market_queries(feature_description, target_market)

            # Combine and rank queries
            all_queries = {
                'core_concepts': concepts['core_terms'],
                'semantic_variations': semantic_queries,
                'synonym_expansions': expanded_queries,
                'industry_specific': industry_queries,
                'market_focused': market_queries,
                'combined_queries': self._combine_queries(concepts, target_market)
            }

            return {
                'original_description': feature_description,
                'target_market': target_market,
                'extracted_concepts': concepts,
                'enhanced_queries': all_queries,
                'query_metadata': {
                    'total_queries': sum(len(v) if isinstance(v, list) else 1
                                         for v in all_queries.values()),
                    'confidence_score': self._calculate_confidence(concepts),
                    'processing_method': 'advanced_nlp' if self.nlp else 'basic_processing'
                }
            }

        except Exception as e:
            logger.error(f"Error enhancing query: {e}")
            # Fallback to basic processing
            return await self._basic_query_enhancement(feature_description, target_market)
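    # Illustrative shape of the dict returned by enhance_query(); the concrete
    # terms depend on the input text and on which models loaded:
    #
    #     {
    #         'original_description': '...',
    #         'target_market': '...',
    #         'extracted_concepts': {'core_terms': [...], 'entities': [...],
    #                                'technologies': [...], 'business_terms': [...],
    #                                'action_verbs': [...], 'descriptors': [...]},
    #         'enhanced_queries': {'core_concepts': [...], 'semantic_variations': [...],
    #                              'synonym_expansions': [...], 'industry_specific': [...],
    #                              'market_focused': [...], 'combined_queries': [...]},
    #         'query_metadata': {'total_queries': ..., 'confidence_score': ...,
    #                            'processing_method': 'advanced_nlp' or 'basic_processing'}
    #     }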
    async def _extract_key_concepts(self, text: str) -> Dict[str, Any]:
        """Extract key concepts using spaCy NLP"""
        concepts = {
            'core_terms': [],
            'entities': [],
            'technologies': [],
            'business_terms': [],
            'action_verbs': [],
            'descriptors': []
        }

        if self.nlp:
            # Use spaCy for advanced processing
            doc = self.nlp(text)

            # Extract named entities ('TECHNOLOGY' is not a default
            # en_core_web_sm label; it is kept for custom NER pipelines)
            for ent in doc.ents:
                if ent.label_ in ['ORG', 'PRODUCT', 'TECHNOLOGY']:
                    concepts['entities'].append(ent.text.lower())

            # Extract key terms by POS tags
            for token in doc:
                if token.is_stop or token.is_punct or len(token.text) < 3:
                    continue

                lemma = token.lemma_.lower()

                if token.pos_ == 'NOUN' and token.dep_ in ['nsubj', 'dobj', 'pobj']:
                    concepts['core_terms'].append(lemma)
                elif token.pos_ == 'VERB' and token.tag_ not in ['MD', 'VBZ']:  # Skip modals and 3rd-person-singular forms
                    concepts['action_verbs'].append(lemma)
                elif token.pos_ == 'ADJ':
                    concepts['descriptors'].append(lemma)

            # Extract noun phrases
            for chunk in doc.noun_chunks:
                if len(chunk.text.split()) <= 3:  # Keep phrases short
                    concepts['core_terms'].append(chunk.text.lower())
        else:
            # Fallback to basic processing
            words = word_tokenize(text.lower())
            if self.lemmatizer:
                words = [self.lemmatizer.lemmatize(word) for word in words if word.isalpha()]

            # Filter out stop words
            words = [word for word in words if word not in self.stop_words and len(word) > 2]
            concepts['core_terms'] = words[:10]  # Limit to top 10

        # Categorize terms
        concepts = self._categorize_terms(concepts)

        # Remove duplicates and sort by importance
        for key in concepts:
            if isinstance(concepts[key], list):
                concepts[key] = list(dict.fromkeys(concepts[key]))[:5]  # Top 5 per category

        return concepts

    def _categorize_terms(self, concepts: Dict[str, Any]) -> Dict[str, Any]:
        """Categorize terms into technology, business, etc."""
        tech_keywords = {'ai', 'app', 'software', 'platform', 'api', 'system', 'tool',
                         'technology', 'digital', 'online', 'mobile', 'web'}
        business_keywords = {'market', 'customer', 'business', 'service', 'product',
                             'solution', 'company', 'industry', 'revenue', 'profit'}

        all_terms = concepts['core_terms'] + concepts['entities']

        for term in all_terms:
            term_lower = term.lower()

            if any(tech in term_lower for tech in tech_keywords):
                concepts['technologies'].append(term)
            elif any(biz in term_lower for biz in business_keywords):
                concepts['business_terms'].append(term)

        return concepts

    async def _generate_semantic_queries(self, text: str) -> List[str]:
        """Generate semantically similar queries using sentence transformers"""
        if not self.sentence_model:
            return []

        try:
            base_embedding = self.sentence_model.encode([text])[0]

            # Create variations by replacing key terms with synonyms
            variations = []
            words = text.split()

            for i, word in enumerate(words):
                if len(word) > 4 and word.lower() not in self.stop_words:
                    # Try to find synonyms
                    synonyms = self._get_wordnet_synonyms(word)
                    for synonym in synonyms[:2]:  # Limit to 2 synonyms per word
                        new_text = words.copy()
                        new_text[i] = synonym
                        variations.append(' '.join(new_text))

            if not variations:
                return []

            # Rank variations by cosine similarity to the original text so the
            # closest paraphrases come first
            variation_embeddings = self.sentence_model.encode(variations)
            similarities = variation_embeddings @ base_embedding / (
                np.linalg.norm(variation_embeddings, axis=1) * np.linalg.norm(base_embedding)
            )
            ranked = [v for _, v in sorted(zip(similarities, variations),
                                           key=lambda pair: pair[0], reverse=True)]

            return ranked[:5]  # Return top 5 variations

        except Exception as e:
            logger.warning(f"Error generating semantic queries: {e}")
            return []

    async def _expand_with_synonyms(self, concepts: Dict[str, Any]) -> List[str]:
        """Expand core terms with synonyms using WordNet"""
        expanded = []

        for term in concepts['core_terms'][:5]:  # Limit to top 5 terms
            synonyms = self._get_wordnet_synonyms(term)
            if synonyms:
                # Create queries with synonyms
                expanded.extend(synonyms[:3])  # Top 3 synonyms per term

        return list(set(expanded))  # Remove duplicates

    def _get_wordnet_synonyms(self, word: str) -> List[str]:
        """Get synonyms from WordNet"""
        synonyms = set()

        try:
            for syn in wordnet.synsets(word):
                for lemma in syn.lemmas():
                    synonym = lemma.name().replace('_', ' ')
                    if synonym.lower() != word.lower() and len(synonym) > 2:
                        synonyms.add(synonym)
        except Exception as e:
            logger.debug(f"Error getting synonyms for '{word}': {e}")

        return list(synonyms)[:5]  # Limit to 5 synonyms
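    # Hypothetical example (actual output depends on the installed WordNet
    # corpus, and set ordering makes the selection non-deterministic):
    #
    #     _get_wordnet_synonyms('restaurant')
    #     # e.g. ['eating house', 'eating place', 'eatery']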
    async def _add_industry_terms(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
        """Add industry-specific terminology"""
        industry_queries = []

        # Detect industry from concepts and target market
        detected_industries = self._detect_industries(concepts, target_market)

        for industry in detected_industries:
            if industry in self.industry_ontology:
                ontology = self.industry_ontology[industry]

                # Match concepts to industry terms
                for concept in concepts['core_terms'][:3]:
                    for term, expansions in ontology.items():
                        if concept.lower() in term or term in concept.lower():
                            industry_queries.extend(expansions[:2])  # Add 2 expansions

        return list(set(industry_queries))[:8]  # Limit to 8 industry terms

    def _detect_industries(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
        """Detect relevant industries from concepts and target market"""
        industries = []
        all_text = ' '.join(concepts['core_terms'] + [target_market]).lower()

        # Simple keyword matching for industry detection
        industry_keywords = {
            'technology': ['app', 'software', 'ai', 'tech', 'digital', 'platform', 'api'],
            'health': ['fitness', 'health', 'medical', 'wellness', 'nutrition', 'exercise'],
            'business': ['business', 'startup', 'company', 'market', 'customer'],
            'finance': ['finance', 'money', 'payment', 'banking', 'investment']
        }

        for industry, keywords in industry_keywords.items():
            if any(keyword in all_text for keyword in keywords):
                industries.append(industry)

        return industries[:2]  # Limit to 2 industries

    async def _generate_market_queries(self, feature_description: str, target_market: str) -> List[str]:
        """Generate market-focused search queries"""
        if not target_market:
            return []

        market_queries = []

        # Extract key terms from feature description
        feature_words = [word for word in feature_description.split()
                         if len(word) > 3 and word.lower() not in self.stop_words][:3]

        # Extract market segments
        market_words = [word for word in target_market.split()
                        if len(word) > 3 and word.lower() not in self.stop_words][:3]

        # Combine feature + market
        for feature_word in feature_words:
            for market_word in market_words:
                market_queries.append(f"{feature_word} {market_word}")
                market_queries.append(f"{market_word} {feature_word}")

        # Add market-specific patterns
        market_patterns = [
            f"{target_market} solutions",
            f"{target_market} tools",
            f"{target_market} software",
            f"{target_market} apps",
            f"{target_market} technology"
        ]
        market_queries.extend(market_patterns)

        return list(set(market_queries))[:6]  # Limit to 6 market queries
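    # Hypothetical example: with feature words ['voice', 'ordering'] and market
    # words ['restaurants'], the cross-product step yields 'voice restaurants',
    # 'restaurants voice', 'ordering restaurants', and 'restaurants ordering',
    # and the pattern step appends queries such as '<target_market> solutions'
    # and '<target_market> software'. The final list(set(...))[:6] keeps six of
    # them in no guaranteed order.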
    def _combine_queries(self, concepts: Dict[str, Any], target_market: str) -> List[str]:
        """Combine different concepts into comprehensive, contextually relevant queries"""
        combined = []

        core_terms = concepts['core_terms'][:3]
        technologies = concepts['technologies'][:2]
        business_terms = concepts['business_terms'][:2]

        # Create contextual phrase combinations (preserve meaning)
        if len(core_terms) >= 2:
            # Keep related terms together
            main_concept = ' '.join(core_terms[:2])
            combined.append(main_concept)

            # Add third term only if it's contextually related
            if len(core_terms) >= 3:
                extended_concept = ' '.join(core_terms[:3])
                # Only add if it makes semantic sense
                if len(extended_concept.split()) <= 4:  # Avoid overly long phrases
                    combined.append(extended_concept)

        # Combine technology + business terms meaningfully
        for tech in technologies:
            for biz in business_terms:
                # Create meaningful combinations
                tech_biz_combo = f"{tech} {biz}"
                # Ensure the combination makes sense (not just random word pairs)
                if self._is_meaningful_combination(tech, biz):
                    combined.append(tech_biz_combo)

        # Add contextual market combinations
        if target_market:
            market_words = [word for word in target_market.split() if len(word) > 3][:2]
            for market_word in market_words:
                for term in core_terms[:2]:
                    # Create market-specific queries that maintain context
                    market_combo = f"{term} {market_word}"
                    if self._is_meaningful_combination(term, market_word):
                        combined.append(market_combo)

        # Remove duplicates and filter for relevance
        unique_combined = list(dict.fromkeys(combined))  # Preserve order while removing duplicates

        # Filter out combinations that are too generic or meaningless
        filtered_combined = [combo for combo in unique_combined
                             if self._is_contextually_relevant(combo)]

        return filtered_combined[:5]  # Limit to 5 most relevant combined queries

    def _is_meaningful_combination(self, term1: str, term2: str) -> bool:
        """Check if two terms create a meaningful combination"""
        # Avoid combining very similar terms
        if term1.lower() == term2.lower():
            return False

        # Avoid combining terms that are substrings of each other
        if term1.lower() in term2.lower() or term2.lower() in term1.lower():
            return False

        # Check for semantic compatibility
        tech_terms = {'ai', 'ml', 'algorithm', 'neural', 'deep', 'machine', 'learning',
                      'data', 'model', 'system', 'network', 'automation', 'software',
                      'digital', 'technology', 'computer', 'intelligence'}
        business_terms = {'business', 'market', 'customer', 'service', 'product',
                          'solution', 'company', 'industry', 'enterprise', 'commercial',
                          'professional', 'management', 'strategy'}

        term1_is_tech = any(tech in term1.lower() for tech in tech_terms)
        term2_is_tech = any(tech in term2.lower() for tech in tech_terms)
        term1_is_business = any(biz in term1.lower() for biz in business_terms)
        term2_is_business = any(biz in term2.lower() for biz in business_terms)

        # Good combinations: tech + business, or related terms
        if (term1_is_tech and term2_is_business) or (term1_is_business and term2_is_tech):
            return True

        # Both are tech terms or both are business terms - can be good if not too similar
        if (term1_is_tech and term2_is_tech) or (term1_is_business and term2_is_business):
            return len(set(term1.lower().split()) & set(term2.lower().split())) == 0  # No overlapping words

        return True  # Default to allowing the combination

    def _is_contextually_relevant(self, query: str) -> bool:
        """Check if a query maintains contextual relevance"""
        words = query.lower().split()

        # Filter out queries that are too short or too long
        if len(words) < 1 or len(words) > 5:
            return False

        # Filter out queries with only stop words
        stop_words = {'the', 'a', 'an', 'and', 'or', 'but', 'in', 'on', 'at',
                      'to', 'for', 'of', 'with', 'by'}
        meaningful_words = [word for word in words if word not in stop_words and len(word) > 2]

        if len(meaningful_words) == 0:
            return False

        # Ensure at least one word is substantial (length > 3)
        if not any(len(word) > 3 for word in meaningful_words):
            return False

        return True
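    # A few concrete traces of the two filters above (hypothetical terms):
    #
    #     _is_meaningful_combination('app', 'mobile app')  # False: substring overlap
    #     _is_meaningful_combination('ai', 'customer')     # True: tech + business pair
    #     _is_contextually_relevant('for of the')          # False: stop words only
    #     _is_contextually_relevant('voice ordering')      # True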
    def _calculate_confidence(self, concepts: Dict[str, Any]) -> float:
        """Calculate confidence score for the extraction"""
        total_concepts = sum(len(v) if isinstance(v, list) else 0 for v in concepts.values())

        # Base confidence on number of extracted concepts
        if total_concepts >= 15:
            return 0.9
        elif total_concepts >= 10:
            return 0.8
        elif total_concepts >= 5:
            return 0.7
        else:
            return 0.6

    async def _basic_query_enhancement(self, feature_description: str, target_market: str) -> Dict[str, Any]:
        """Fallback basic query enhancement when NLP models fail"""
        # Simple word extraction
        words = re.findall(r'\b\w{3,}\b', feature_description.lower())
        words = [word for word in words if word not in self.stop_words][:5]

        return {
            'original_description': feature_description,
            'target_market': target_market,
            'extracted_concepts': {'core_terms': words},
            'enhanced_queries': {
                'core_concepts': words,
                'semantic_variations': [],
                'synonym_expansions': [],
                'industry_specific': [],
                'market_focused': [target_market] if target_market else [],
                'combined_queries': [' '.join(words[:2])] if len(words) >= 2 else words
            },
            'query_metadata': {
                'total_queries': len(words) + (1 if target_market else 0),
                'confidence_score': 0.5,
                'processing_method': 'basic_fallback'
            }
        }

    def get_optimized_queries_for_platform(self, enhanced_data: Dict[str, Any], platform: str) -> List[str]:
        """
        Get platform-optimized queries from enhanced data

        Args:
            enhanced_data: Result from enhance_query()
            platform: Target platform ('news', 'reddit', etc.)

        Returns:
            List of optimized queries for the specific platform
        """
        queries = enhanced_data.get('enhanced_queries', {})

        platform_strategies = {
            'news': ['core_concepts', 'industry_specific', 'combined_queries'],
            'reddit': ['semantic_variations', 'market_focused', 'core_concepts'],
            'linkedin': ['industry_specific', 'business_terms', 'market_focused']
        }

        strategy = platform_strategies.get(platform, ['core_concepts', 'combined_queries'])

        optimized_queries = []
        for strategy_key in strategy:
            # Strategy keys missing from enhanced_queries (e.g. 'business_terms',
            # which lives under extracted_concepts) are simply skipped
            if strategy_key in queries:
                query_list = queries[strategy_key]
                if isinstance(query_list, list):
                    optimized_queries.extend(query_list)
                else:
                    optimized_queries.append(str(query_list))

        # Remove duplicates while preserving order
        seen = set()
        unique_queries = []
        for query in optimized_queries:
            if query not in seen:
                seen.add(query)
                unique_queries.append(query)

        return unique_queries[:8]  # Limit to 8 queries per platform


# Example usage and testing
async def test_nlp_enhancer():
    """Test the NLP query enhancer"""
    enhancer = NLPQueryEnhancer()

    # Test query enhancement
    feature = "AI-powered voice ordering system for restaurants"
    market = "small to medium restaurants, food service industry"

    print("Testing NLP Query Enhancement...")
    enhanced = await enhancer.enhance_query(feature, market)

    print(f"Original: {enhanced['original_description']}")
    print(f"Market: {enhanced['target_market']}")
    print(f"Core concepts: {enhanced['extracted_concepts']['core_terms']}")
    print(f"Confidence: {enhanced['query_metadata']['confidence_score']}")

    # Test platform-specific queries
    for platform in ['news', 'reddit']:
        queries = enhancer.get_optimized_queries_for_platform(enhanced, platform)
        print(f"{platform.title()} queries: {queries[:3]}")

    return enhanced


if __name__ == "__main__":
    asyncio.run(test_nlp_enhancer())