import os from dotenv import load_dotenv from langchain_community.document_loaders import WebBaseLoader from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_community.vectorstores import Chroma from langchain_ollama import ChatOllama from langchain.prompts import PromptTemplate from langchain_core.output_parsers import StrOutputParser from langchain_core.retrievers import BaseRetriever from langchain_core.runnables import Runnable from langchain_core.documents import Document from langchain_core.embeddings import Embeddings import chromadb import numpy as np from sklearn.feature_extraction.text import TfidfVectorizer from sklearn.metrics.pairwise import cosine_similarity import pandas as pd from typing import Optional, List import re # Disable ChromaDB telemetry to avoid the error os.environ["ANONYMIZED_TELEMETRY"] = "False" os.environ["CHROMA_SERVER_HOST"] = "localhost" os.environ["CHROMA_SERVER_HTTP_PORT"] = "8000" class ImprovedTFIDFEmbeddings(Embeddings): """Improved TF-IDF based embedding function with better preprocessing.""" def __init__(self): self.vectorizer = TfidfVectorizer( max_features=5000, stop_words='english', ngram_range=(1, 3), min_df=1, max_df=0.85, lowercase=True, strip_accents='unicode', analyzer='word' ) self.fitted = False self.documents = [] def embed_documents(self, texts): """Create embeddings for a list of texts.""" if not self.fitted: self.documents = texts self.vectorizer.fit(texts) self.fitted = True # Transform texts to TF-IDF vectors tfidf_matrix = self.vectorizer.transform(texts) # Convert to dense arrays and normalize embeddings = [] for i in range(tfidf_matrix.shape[0]): embedding = tfidf_matrix[i].toarray().flatten() # Normalize the embedding norm = np.linalg.norm(embedding) if norm > 0: embedding = embedding / norm # Pad or truncate to 512 dimensions if len(embedding) < 512: embedding = np.pad(embedding, (0, 512 - len(embedding))) else: embedding = embedding[:512] embeddings.append(embedding.tolist()) return embeddings def embed_query(self, text): """Create embedding for a single query text.""" if not self.fitted: # If not fitted, fit with just this text self.vectorizer.fit([text]) self.fitted = True # Transform query to TF-IDF vector tfidf_matrix = self.vectorizer.transform([text]) embedding = tfidf_matrix[0].toarray().flatten() # Normalize the embedding norm = np.linalg.norm(embedding) if norm > 0: embedding = embedding / norm # Pad or truncate to 512 dimensions if len(embedding) < 512: embedding = np.pad(embedding, (0, 512 - len(embedding))) else: embedding = embedding[:512] return embedding.tolist() class SmartFAQRetriever(BaseRetriever): """Smart retriever optimized for FAQ datasets with semantic similarity.""" def __init__(self, documents: List[Document], k: int = 4): super().__init__() self._documents = documents self._k = k self._vectorizer = None # Use private attribute @property def documents(self): return self._documents @property def k(self): return self._k def _get_relevant_documents(self, query: str) -> List[Document]: """Retrieve documents based on semantic similarity.""" # Ensure vectorizer is fitted if not hasattr(self, '_vectorizer') or self._vectorizer is None or not hasattr(self._vectorizer, 'vocabulary_') or not self._vectorizer.vocabulary_: print("[SmartFAQRetriever] Fitting vectorizer...") self._vectorizer = TfidfVectorizer( max_features=3000, stop_words='english', ngram_range=(1, 2), min_df=1, max_df=0.9 ) questions = [] for doc in self._documents: if "QUESTION:" in doc.page_content: question_part = doc.page_content.split("ANSWER:")[0] question = question_part.replace("QUESTION:", "").strip() questions.append(question) else: questions.append(doc.page_content) self._vectorizer.fit(questions) query_lower = query.lower().strip() # Extract questions from documents questions = [] for doc in self._documents: if "QUESTION:" in doc.page_content: question_part = doc.page_content.split("ANSWER:")[0] question = question_part.replace("QUESTION:", "").strip() questions.append(question) else: questions.append(doc.page_content) # Transform query and questions to TF-IDF vectors query_vector = self._vectorizer.transform([query_lower]) question_vectors = self._vectorizer.transform(questions) # Calculate cosine similarities similarities = cosine_similarity(query_vector, question_vectors).flatten() # Get top k documents top_indices = similarities.argsort()[-self._k:][::-1] # Return documents with highest similarity scores relevant_docs = [self._documents[i] for i in top_indices if similarities[i] > 0.1] if not relevant_docs: # Fallback to first k documents if no good matches relevant_docs = self._documents[:self._k] return relevant_docs async def _aget_relevant_documents(self, query: str) -> List[Document]: """Async version of get_relevant_documents.""" return self._get_relevant_documents(query) def setup_retriever(use_kaggle_data: bool = False, kaggle_dataset: Optional[str] = None, kaggle_username: Optional[str] = None, kaggle_key: Optional[str] = None, use_local_mental_health_data: bool = False) -> BaseRetriever: """ Creates a vector store with documents from test data, Kaggle datasets, or local mental health data. Args: use_kaggle_data: Whether to load Kaggle data instead of test documents kaggle_dataset: Kaggle dataset name (e.g., 'username/dataset-name') kaggle_username: Your Kaggle username (optional if using kaggle.json) kaggle_key: Your Kaggle API key (optional if using kaggle.json) use_local_mental_health_data: Whether to load local mental health FAQ data """ print("Setting up the retriever...") if use_local_mental_health_data: try: print("Loading mental health FAQ data from local file...") mental_health_file = "data/Mental_Health_FAQ.csv" if not os.path.exists(mental_health_file): print(f"Mental health FAQ file not found: {mental_health_file}") use_local_mental_health_data = False else: # Load mental health FAQ data df = pd.read_csv(mental_health_file) documents = [] for _, row in df.iterrows(): question = row['Questions'] answer = row['Answers'] # Create document in FAQ format content = f"QUESTION: {question}\nANSWER: {answer}" documents.append(Document(page_content=content)) print(f"Loaded {len(documents)} mental health FAQ documents") for i, doc in enumerate(documents[:3]): print(f"Sample FAQ {i+1}: {doc.page_content[:200]}...") except Exception as e: print(f"Error loading mental health data: {e}") use_local_mental_health_data = False if use_kaggle_data and kaggle_dataset: try: from src.kaggle_loader import KaggleDataLoader print(f"Loading Kaggle dataset: {kaggle_dataset}") # Create loader without parameters - it will auto-load from kaggle.json loader = KaggleDataLoader() # Download the dataset dataset_path = loader.download_dataset(kaggle_dataset) # Load documents based on file type - only process files from this specific dataset documents = [] # Get the dataset name to identify the correct files dataset_name = kaggle_dataset.split('/')[-1] print(f"Processing files in dataset directory: {dataset_path}") for file in os.listdir(dataset_path): file_path = os.path.join(dataset_path, file) if file.endswith('.csv'): print(f"Loading CSV file: {file}") # For FAQ datasets, use the improved loading method if 'faq' in file.lower() or 'mental' in file.lower(): documents.extend(loader.load_csv_dataset(file_path, [], chunk_size=50)) else: # For other CSV files, use first few columns as text df = pd.read_csv(file_path) text_columns = df.columns[:3].tolist() # Use first 3 columns documents.extend(loader.load_csv_dataset(file_path, text_columns, chunk_size=50)) elif file.endswith('.json'): print(f"Loading JSON file: {file}") documents.extend(loader.load_json_dataset(file_path)) elif file.endswith('.txt'): print(f"Loading text file: {file}") documents.extend(loader.load_text_dataset(file_path)) print(f"Loaded {len(documents)} documents from Kaggle dataset") for i, doc in enumerate(documents[:3]): print(f"Sample doc {i+1}: {doc.page_content[:200]}") except Exception as e: print(f"Error loading Kaggle data: {e}") print("Falling back to test documents...") use_kaggle_data = False if not use_kaggle_data and not use_local_mental_health_data: # No test documents - use mental health data as default print("No specific data source specified, loading mental health FAQ data as default...") try: mental_health_file = "data/Mental_Health_FAQ.csv" if not os.path.exists(mental_health_file): raise FileNotFoundError(f"Mental health FAQ file not found: {mental_health_file}") # Load mental health FAQ data df = pd.read_csv(mental_health_file) documents = [] for _, row in df.iterrows(): question = row['Questions'] answer = row['Answers'] # Create document in FAQ format content = f"QUESTION: {question}\nANSWER: {answer}" documents.append(Document(page_content=content)) print(f"Loaded {len(documents)} mental health FAQ documents") for i, doc in enumerate(documents[:3]): print(f"Sample FAQ {i+1}: {doc.page_content[:200]}...") except Exception as e: print(f"Error loading mental health data: {e}") raise Exception("No valid data source available. Please ensure mental health FAQ data is present or provide Kaggle credentials.") print("Creating TF-IDF embeddings...") embeddings = ImprovedTFIDFEmbeddings() print("Creating ChromaDB vector store...") client = chromadb.PersistentClient(path="./src/chroma_db") # Clear existing collections to prevent mixing old and new data try: collections = client.list_collections() for collection in collections: print(f"Deleting existing collection: {collection.name}") client.delete_collection(collection.name) except Exception as e: print(f"Warning: Could not clear existing collections: {e}") print(f"Processing {len(documents)} documents...") # Check if this is a FAQ dataset and use smart retriever if any("QUESTION:" in doc.page_content for doc in documents): print("Using SmartFAQRetriever for better semantic matching...") return SmartFAQRetriever(documents, k=4) else: # Use vector store for non-FAQ datasets vectorstore = Chroma.from_documents( documents=documents, embedding=embeddings, client=client ) print("Retriever setup complete.") return vectorstore.as_retriever(k=4) def setup_rag_chain() -> Runnable: """Sets up the RAG chain with a prompt template and an LLM.""" # Define the prompt template for the LLM prompt = PromptTemplate( template="""You are an assistant for question-answering tasks. Use the following documents to answer the question. If you don't know the answer, just say that you don't know. Use three sentences maximum and keep the answer concise: Question: {question} Documents: {documents} Answer: """, input_variables=["question", "documents"], ) # Initialize the LLM with dolphin-llama3:8b model # Note: This requires the Ollama server to be running with the specified model llm = ChatOllama( model="dolphin-llama3:8b", temperature=0, ) # Create a chain combining the prompt template and LLM return prompt | llm | StrOutputParser() # Define the RAG application class class RAGApplication: def __init__(self, retriever: BaseRetriever, rag_chain: Runnable): self.retriever = retriever self.rag_chain = rag_chain def run(self, question: str) -> str: """Runs the RAG pipeline for a given question.""" # Retrieve relevant documents documents = self.retriever.invoke(question) # Debug: Print retrieved documents print(f"\nDEBUG: Retrieved {len(documents)} documents for question: '{question}'") for i, doc in enumerate(documents): print(f"DEBUG: Document {i+1}: {doc.page_content[:200]}...") # Extract content from retrieved documents doc_texts = "\n\n".join([doc.page_content for doc in documents]) # Debug: Print the combined document text print(f"DEBUG: Combined document text: {doc_texts[:300]}...") # Get the answer from the language model answer = self.rag_chain.invoke({"question": question, "documents": doc_texts}) return answer # Main execution block if __name__ == "__main__": load_dotenv() # 1. Setup the components retriever = setup_retriever() rag_chain = setup_rag_chain() # 2. Initialize the RAG application rag_application = RAGApplication(retriever, rag_chain) # 3. Run an example query question = "What is prompt engineering" print("\n--- Running RAG Application ---") print(f"Question: {question}") answer = rag_application.run(question) print(f"Answer: {answer}")