# Spaces: Sleeping  (page-header residue captured together with the source; not program code)
import os
import re
import subprocess
import traceback
from typing import Optional, List

import chromadb
import numpy as np
import pandas as pd
import torch
from dotenv import load_dotenv
from langchain.llms import HuggingFacePipeline
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.documents import Document
from langchain_core.embeddings import Embeddings
from langchain_core.output_parsers import StrOutputParser
from langchain_core.retrievers import BaseRetriever
from langchain_core.runnables import Runnable
from langchain_ollama import ChatOllama
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Load tokenizer and model separately to configure properly
from transformers import AutoTokenizer, AutoModelForCausalLM
# OPTION 1: Use Hugging Face Pipeline (Recommended for HF Spaces)
from transformers import pipeline
# Disable ChromaDB telemetry to avoid the error.
# These environment overrides are applied at import time, unconditionally.
os.environ["ANONYMIZED_TELEMETRY"] = "False"
os.environ["CHROMA_SERVER_HOST"] = "localhost"
os.environ["CHROMA_SERVER_HTTP_PORT"] = "8000"
class ImprovedTFIDFEmbeddings(Embeddings):
    """TF-IDF based embedding function producing fixed-width, unit-length vectors.

    The vectorizer is fitted lazily on the first batch of texts it sees.
    Every TF-IDF row is then resized to ``dimensions`` columns (zero-padded
    when the vocabulary is smaller, cut to the first ``dimensions`` feature
    columns when it is larger) and L2-normalised *after* resizing, so the
    vectors handed to the vector store really have length 1.

    Args:
        dimensions: Width of every returned embedding. Defaults to 512.

    Raises:
        ValueError: If ``dimensions`` is not a positive integer.
    """

    def __init__(self, dimensions: int = 512):
        if dimensions <= 0:
            raise ValueError("dimensions must be a positive integer")
        self.dimensions = dimensions
        self.vectorizer = TfidfVectorizer(
            max_features=5000,
            stop_words='english',
            ngram_range=(1, 3),
            min_df=1,
            max_df=0.85,
            lowercase=True,
            strip_accents='unicode',
            analyzer='word',
        )
        self.fitted = False
        self.documents = []

    def _to_fixed_width(self, tfidf_matrix) -> np.ndarray:
        """Resize a sparse TF-IDF matrix to ``dimensions`` columns and L2-normalise each row."""
        vocabulary_width = tfidf_matrix.shape[1]
        # Slice while still sparse so only the columns that are kept get densified.
        dense = tfidf_matrix[:, :self.dimensions].toarray().astype(float)
        if vocabulary_width < self.dimensions:
            dense = np.pad(dense, ((0, 0), (0, self.dimensions - vocabulary_width)))
        # Normalise after resizing: truncation removes mass, so normalising
        # beforehand would leave the returned vectors shorter than unit length.
        norms = np.linalg.norm(dense, axis=1, keepdims=True)
        norms[norms == 0] = 1.0  # keep all-zero rows as zeros instead of dividing by 0
        return dense / norms

    def embed_documents(self, texts):
        """Create embeddings for a list of texts.

        The first call fits the vectorizer on ``texts`` and remembers them in
        ``self.documents``; later calls reuse the fitted vocabulary.

        Returns:
            One list of ``dimensions`` floats per input text (an empty list
            for empty input).
        """
        texts = list(texts)
        if not texts:
            return []
        if not self.fitted:
            self.documents = texts
            self.vectorizer.fit(texts)
            self.fitted = True
        return self._to_fixed_width(self.vectorizer.transform(texts)).tolist()

    def embed_query(self, text):
        """Create the embedding for a single query text.

        Returns:
            A list of ``dimensions`` floats.
        """
        if not self.fitted:
            # If not fitted, fit with just this text.
            # NOTE(review): with max_df=0.85 and min_df=1, scikit-learn is
            # expected to reject a one-document fit -- confirm, and prefer
            # calling embed_documents() on the corpus first.
            self.vectorizer.fit([text])
            self.fitted = True
        return self._to_fixed_width(self.vectorizer.transform([text]))[0].tolist()
class SmartFAQRetriever(BaseRetriever):
    """Retriever for FAQ-style documents that ranks by TF-IDF cosine similarity.

    Only the question part of each document (the text between ``QUESTION:``
    and ``ANSWER:``) is indexed; documents without a ``QUESTION:`` marker are
    indexed by their full content. The TF-IDF index is built lazily on the
    first query and then reused.

    Args:
        documents: Documents to search.
        k: Maximum number of documents to return per query.
        min_similarity: Matches must score strictly above this cosine
            similarity to be returned. Defaults to 0.1.
    """

    def __init__(self, documents: List[Document], k: int = 4, min_similarity: float = 0.1):
        super().__init__()
        self._documents = documents
        self._k = k
        self._min_similarity = min_similarity
        self._vectorizer = None  # Use private attribute
        self._question_vectors = None  # TF-IDF matrix of the indexed questions

    def documents(self):
        """Return the documents this retriever searches."""
        return self._documents

    def k(self):
        """Return the maximum number of documents returned per query."""
        return self._k

    def _question_text(self, doc: Document) -> str:
        """Return the text used to index ``doc`` (its question part when marked)."""
        content = doc.page_content
        if "QUESTION:" not in content:
            return content
        return content.split("ANSWER:")[0].replace("QUESTION:", "").strip()

    def _ensure_index(self) -> None:
        """Fit the vectorizer and cache the question vectors if not done yet."""
        vectorizer = getattr(self, "_vectorizer", None)
        vectors = getattr(self, "_question_vectors", None)
        if (
            vectorizer is not None
            and getattr(vectorizer, "vocabulary_", None)
            and vectors is not None
            and vectors.shape[0] == len(self._documents)  # rebuild if the corpus changed size
        ):
            return

        print("[SmartFAQRetriever] Fitting vectorizer...")
        questions = [self._question_text(doc) for doc in self._documents]
        self._vectorizer = TfidfVectorizer(
            max_features=3000,
            stop_words='english',
            ngram_range=(1, 2),
            min_df=1,
            # A float max_df is a proportion of the corpus; with a single
            # question 0.9 falls below min_df=1 and the fit is rejected.
            max_df=0.9 if len(questions) > 1 else 1.0,
        )
        self._question_vectors = self._vectorizer.fit_transform(questions)

    def get_documents_with_confidence(self, query: str) -> List[dict]:
        """Return top documents and their confidence (similarity) scores.

        Returns:
            A list of ``{"document": <page content>, "confidence": <score
            rounded to 3 decimals>}`` dicts, best match first.
        """
        results = self._get_relevant_documents_with_scores(query)
        return [{"document": doc.page_content, "confidence": round(score, 3)} for doc, score in results]

    def _get_relevant_documents_with_scores(self, query: str) -> List[tuple[Document, float]]:
        """Retrieve up to ``k`` documents with their similarity scores.

        Only matches scoring above ``min_similarity`` are returned, so the
        result may be empty. An empty corpus yields an empty list.
        """
        if not self._documents:
            return []
        self._ensure_index()

        query_vector = self._vectorizer.transform([query.lower().strip()])
        similarities = cosine_similarity(query_vector, self._question_vectors).flatten()
        # Highest score first; max() keeps k <= 0 from selecting everything.
        top_indices = similarities.argsort()[::-1][:max(self._k, 0)]
        return [
            (self._documents[i], float(similarities[i]))
            for i in top_indices
            if similarities[i] > self._min_similarity
        ]

    def _get_relevant_documents(self, query: str) -> List[Document]:
        """Retrieve documents by similarity, falling back to the first ``k`` documents.

        Unlike :meth:`_get_relevant_documents_with_scores`, this never returns
        an empty list for a non-empty corpus with a positive ``k``.
        """
        relevant_docs = [doc for doc, _ in self._get_relevant_documents_with_scores(query)]
        if not relevant_docs:
            # Fallback to first k documents if no good matches
            relevant_docs = self._documents[:self._k]
        return relevant_docs

    async def _aget_relevant_documents(self, query: str) -> List[Document]:
        """Async version of get_relevant_documents (delegates to the sync path)."""
        return self._get_relevant_documents(query)
def _load_mental_health_faq(csv_path: str = "data/Mental_Health_FAQ.csv") -> List[Document]:
    """Load the mental health FAQ CSV into ``QUESTION: ...\\nANSWER: ...`` documents."""
    if not os.path.exists(csv_path):
        raise FileNotFoundError(f"Mental health FAQ file not found: {csv_path}")

    df = pd.read_csv(csv_path)
    # Rows missing either column would otherwise become "nan" documents.
    rows = df.dropna(subset=["Questions", "Answers"])
    documents = [
        Document(page_content=f"QUESTION: {question}\nANSWER: {answer}")
        for question, answer in zip(rows["Questions"], rows["Answers"])
    ]
    print(f"Loaded {len(documents)} mental health FAQ documents")
    for i, doc in enumerate(documents[:3]):
        print(f"Sample FAQ {i+1}: {doc.page_content[:200]}...")
    return documents


def _load_kaggle_documents(kaggle_dataset: str) -> List[Document]:
    """Download a Kaggle dataset and load its CSV, JSON and text files as documents."""
    from src.kaggle_loader import KaggleDataLoader

    print(f"Loading Kaggle dataset: {kaggle_dataset}")
    # Create loader without parameters - it will auto-load from kaggle.json
    loader = KaggleDataLoader()
    dataset_path = loader.download_dataset(kaggle_dataset)

    documents = []
    print(f"Processing files in dataset directory: {dataset_path}")
    for file_name in sorted(os.listdir(dataset_path)):
        file_path = os.path.join(dataset_path, file_name)
        lowered = file_name.lower()
        if lowered.endswith('.csv'):
            print(f"Loading CSV file: {file_name}")
            if 'faq' in lowered or 'mental' in lowered:
                # For FAQ datasets, let the loader pick the columns itself.
                text_columns = []
            else:
                # For other CSV files, use the first 3 columns as text;
                # nrows=0 reads only the header.
                text_columns = pd.read_csv(file_path, nrows=0).columns[:3].tolist()
            documents.extend(loader.load_csv_dataset(file_path, text_columns, chunk_size=50))
        elif lowered.endswith('.json'):
            print(f"Loading JSON file: {file_name}")
            documents.extend(loader.load_json_dataset(file_path))
        elif lowered.endswith('.txt'):
            print(f"Loading text file: {file_name}")
            documents.extend(loader.load_text_dataset(file_path))

    print(f"Loaded {len(documents)} documents from Kaggle dataset")
    for i, doc in enumerate(documents[:3]):
        print(f"Sample doc {i+1}: {doc.page_content[:200]}")
    return documents


def setup_retriever(use_kaggle_data: bool = False, kaggle_dataset: Optional[str] = None,
                    kaggle_username: Optional[str] = None, kaggle_key: Optional[str] = None,
                    use_local_mental_health_data: bool = False) -> BaseRetriever:
    """
    Builds a retriever over Kaggle data or the local mental health FAQ data.

    Source selection: Kaggle documents win when they load successfully;
    otherwise the local FAQ documents are used when requested and loaded;
    otherwise the local FAQ file is loaded as the default. FAQ-style
    documents (containing ``QUESTION:``) are served by ``SmartFAQRetriever``;
    anything else goes through a Chroma vector store with TF-IDF embeddings.

    Args:
        use_kaggle_data: Whether to load Kaggle data
        kaggle_dataset: Kaggle dataset name (e.g., 'username/dataset-name')
        kaggle_username: Your Kaggle username (accepted for compatibility; not
            read by this function -- credentials come from kaggle.json)
        kaggle_key: Your Kaggle API key (accepted for compatibility; not read
            by this function)
        use_local_mental_health_data: Whether to load local mental health FAQ data

    Returns:
        A retriever returning up to 4 documents per query.

    Raises:
        Exception: If no data source yields any documents.
    """
    print("Setting up the retriever...")
    documents = []

    if use_local_mental_health_data:
        try:
            print("Loading mental health FAQ data from local file...")
            documents = _load_mental_health_faq()
        except Exception as e:
            print(f"Error loading mental health data: {e}")

    if use_kaggle_data and kaggle_dataset:
        try:
            kaggle_documents = _load_kaggle_documents(kaggle_dataset)
        except Exception as e:
            print(f"Error loading Kaggle data: {e}")
            print("Falling back to other data sources...")
        else:
            # Only replace what is already loaded when Kaggle produced something.
            if kaggle_documents:
                documents = kaggle_documents

    if not documents:
        # Covers "nothing requested", "requested source failed" and
        # "use_kaggle_data=True without a dataset name".
        print("No specific data source specified, loading mental health FAQ data as default...")
        try:
            documents = _load_mental_health_faq()
            if not documents:
                raise ValueError("Mental health FAQ file contains no usable rows")
        except Exception as e:
            print(f"Error loading mental health data: {e}")
            raise Exception("No valid data source available. Please ensure mental health FAQ data is present or provide Kaggle credentials.") from e

    print("Creating TF-IDF embeddings...")
    embeddings = ImprovedTFIDFEmbeddings()
    print("Creating ChromaDB vector store...")
    client = chromadb.PersistentClient(path="./tmp/chroma_db")

    # Clear existing collections to prevent mixing old and new data
    try:
        for collection in client.list_collections():
            # Depending on the chromadb version, entries are collection
            # objects or plain names.
            collection_name = getattr(collection, "name", collection)
            print(f"Deleting existing collection: {collection_name}")
            client.delete_collection(collection_name)
    except Exception as e:
        print(f"Warning: Could not clear existing collections: {e}")

    print(f"Processing {len(documents)} documents...")

    # Check if this is a FAQ dataset and use smart retriever
    if any("QUESTION:" in doc.page_content for doc in documents):
        print("Using SmartFAQRetriever for better semantic matching...")
        return SmartFAQRetriever(documents, k=4)

    # Use vector store for non-FAQ datasets
    vectorstore = Chroma.from_documents(
        documents=documents,
        embedding=embeddings,
        client=client,
    )
    print("Retriever setup complete.")
    # The result count is a search option, so it must go through search_kwargs.
    return vectorstore.as_retriever(search_kwargs={"k": 4})
# def setup_rag_chain() -> Runnable: | |
# """Sets up the RAG chain with a prompt template and an LLM.""" | |
# # Define the prompt template for the LLM | |
# prompt = PromptTemplate( | |
# template="""You are an assistant for question-answering tasks. | |
# Use the following documents to answer the question. | |
# If you don't know the answer, just say that you don't know. | |
# Use three sentences maximum and keep the answer concise: | |
# Question: {question} | |
# Documents: {documents} | |
# Answer: | |
# """, | |
# input_variables=["question", "documents"], | |
# ) | |
# # Initialize the LLM with dolphin-llama3:8b model | |
# # Note: This requires the Ollama server to be running with the specified model | |
# llm = ChatOllama( | |
# model="deepseek-ai/DeepSeek-R1-0528-Qwen3-8B", | |
# temperature=0, | |
# ) | |
# # Create a chain combining the prompt template and LLM | |
# return prompt | llm | StrOutputParser() | |
def _smoke_test_pipeline(hf_pipeline, label: str = "pipeline"):
    """Run one short generation through ``hf_pipeline`` so setup problems surface early."""
    test_input = "What is diabetes?"
    print(f"Testing {label} with: {test_input}")
    test_result = hf_pipeline(test_input)
    print(f"{label.capitalize()} test successful: {test_result}")


def setup_rag_chain(model_name: str = "HuggingFaceTB/SmolLM3-3B",
                    fallback_model_name: str = "distilgpt2",
                    max_new_tokens: int = 50) -> Runnable:
    """Sets up the RAG chain with a prompt template and an LLM.

    The primary model is loaded and exercised once; if anything in that step
    fails, a pipeline for ``fallback_model_name`` is built and exercised
    instead. Errors raised by the fallback are not caught.

    Args:
        model_name: Hugging Face model id of the primary causal LM.
        fallback_model_name: Model id used when the primary model fails.
        max_new_tokens: Generation budget for either pipeline.

    Returns:
        ``prompt | llm | StrOutputParser()``; invoke it with a dict holding
        the ``question`` and ``documents`` keys.
    """
    # Define the prompt template for the LLM
    prompt = PromptTemplate(
        template="""Context: You are a medical information assistant that answers health questions using verified medical documents.
Primary Task: Answer the medical question using ONLY the provided documents.
Instructions:
1. For medical questions: Provide a clear, accurate answer based solely on the document content
2. If documents lack sufficient information: "I don't have enough information in the provided documents to answer this question"
3. For non-medical questions: "I specialize in medical information. Please ask a health-related question."
4. For identity questions: "I am a medical information assistant designed to help answer health-related questions based on verified medical documents."
5. Always use patient-friendly language
6. Keep responses 2-4 sentences maximum
7. For serious symptoms, recommend consulting healthcare professionals
Documents: {documents}
Question: {question}
Medical Answer:""",
        input_variables=["question", "documents"],
    )

    try:
        tokenizer = AutoTokenizer.from_pretrained(model_name)
        model = AutoModelForCausalLM.from_pretrained(
            model_name,
            device_map="auto",
            torch_dtype=torch.float16,
        )
        # Generation needs a pad token; reuse EOS when the tokenizer has none.
        if tokenizer.pad_token is None:
            tokenizer.pad_token = tokenizer.eos_token
        print(f"Tokenizer pad_token_id: {tokenizer.pad_token_id}")
        print(f"Tokenizer eos_token_id: {tokenizer.eos_token_id}")

        hf_pipeline = pipeline(
            "text-generation",
            model=model,
            tokenizer=tokenizer,
            max_new_tokens=max_new_tokens,
            temperature=0.2,
            return_full_text=False,
            do_sample=True,
            # Use actual tokenizer token IDs, not hardcoded values
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
            clean_up_tokenization_spaces=True,
        )
        _smoke_test_pipeline(hf_pipeline, "pipeline")
    except Exception as e:
        # Deliberate best-effort: any failure of the primary model (download,
        # memory, dtype support, generation) switches to the fallback model.
        print(f"Error setting up {model_name}: {e}")
        print(f"Falling back to {fallback_model_name}...")
        hf_pipeline = pipeline(
            "text-generation",
            model=fallback_model_name,
            max_new_tokens=max_new_tokens,
            temperature=0.2,
            return_full_text=False,
            do_sample=True,
            clean_up_tokenization_spaces=True,
        )
        _smoke_test_pipeline(hf_pipeline, "fallback pipeline")

    # Wrap it in LangChain
    llm = HuggingFacePipeline(pipeline=hf_pipeline)
    # Create a chain combining the prompt template and LLM
    return prompt | llm | StrOutputParser()
# Also update the RAG application class with better error handling | |
class RAGApplication:
    """Glue between a retriever and a RAG chain: retrieve, truncate, generate.

    Args:
        retriever: Object exposing ``invoke(question)`` returning documents
            with a ``page_content`` attribute. If it also exposes
            ``get_documents_with_confidence(question)``, that is used instead
            and the scores are printed.
        rag_chain: Object exposing ``invoke({"question", "documents"})`` that
            returns the answer text.
        max_context_chars: Upper bound on the characters of retrieved text
            passed to the chain. Defaults to 500.
    """

    def __init__(self, retriever: BaseRetriever, rag_chain: Runnable, max_context_chars: int = 500):
        self.retriever = retriever
        self.rag_chain = rag_chain
        self.max_context_chars = max_context_chars

    def run(self, question: str) -> str:
        """Runs the RAG pipeline for a given question.

        Returns:
            The stripped answer followed by a fixed footer note; a fixed
            prompt to provide a question when ``question`` is ``None`` or
            blank; or an apology message containing the error text when
            anything in the pipeline raises (this method does not propagate
            exceptions).
        """
        try:
            # `not question` keeps None (and "") from reaching .strip().
            if not question or not question.strip():
                return "Please provide a valid question."
            question = question.strip()
            print(f"\nProcessing question: '{question}'")

            if hasattr(self.retriever, "get_documents_with_confidence"):
                scored = self.retriever.get_documents_with_confidence(question)
                passages = [entry["document"] for entry in scored]
                confidence_info = "\n".join(
                    f"- Score: {entry['confidence']}, Snippet: {entry['document'][:100]}..."
                    for entry in scored
                )
            else:
                passages = [doc.page_content for doc in self.retriever.invoke(question)]
                confidence_info = "Confidence scoring not available."
            print(f"Retrieved {len(passages)} documents")
            print(confidence_info)

            doc_texts = "\n\n".join(passages)
            # Limit the total input length to prevent token overflow.
            if len(doc_texts) > self.max_context_chars:
                doc_texts = doc_texts[:self.max_context_chars] + "..."

            answer = self.rag_chain.invoke({"question": question, "documents": doc_texts})
            # Append confidence footer
            footer = "\n\n(Note: This answer is based on documents with confidence scores. Review full context if critical.)"
            return answer.strip() + footer
        except Exception as e:
            print(f"Error in RAG application: {str(e)}")
            traceback.print_exc()
            return f"I apologize, but I encountered an error processing your question: {str(e)}. Please try rephrasing it or ask a different question."
# Main execution block | |
if __name__ == "__main__":
    # Pick up settings from a local .env file before anything else runs.
    load_dotenv()

    # 1. Setup the components
    retriever = setup_retriever()
    rag_chain = setup_rag_chain()

    # 2. Initialize the RAG application
    rag_application = RAGApplication(retriever, rag_chain)

    # 3. Run an example query
    question = "What is terminal illness?"
    print("\n--- Running RAG Application ---")
    print(f"Question: {question}")
    answer = rag_application.run(question)
    print(f"Answer: {answer}")