import os
import logging
from typing import List, Tuple, Optional

import numpy as np
import faiss
import fitz  # PyMuPDF
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS

# Logger setup
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# app/embeddings.py
import os
from typing import List, Dict

from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.docstore.in_memory import InMemoryDocstore
from langchain_core.documents import Document

embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

# Path to the FAISS index
FAISS_INDEX_DIR = "vector_index"
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)
def embed_file(file_path: str) -> bool:
    """
    Reads a file, embeds it into a FAISS vector store, and saves it.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"File not found: {file_path}")

    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    texts = [text]
    metadatas = [{"source": file_path}]

    # Create and save the vector store
    vector_store = FAISS.from_texts(texts, embedding_model, metadatas=metadatas)
    vector_store.save_local(FAISS_INDEX_DIR)
    return True
def query_file_chunks(query: str, k: int = 3) -> List[Document]:
    """
    Loads the FAISS vector store and performs a semantic search.
    """
    try:
        # Recent langchain_community releases require explicitly opting in to
        # pickle deserialization when loading a local index.
        vector_store = FAISS.load_local(
            FAISS_INDEX_DIR, embedding_model, allow_dangerous_deserialization=True
        )
    except Exception as e:
        raise RuntimeError(f"Failed to load vector store: {e}") from e

    return vector_store.similarity_search(query, k=k)
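
# NOTE: embed_file() above indexes the whole file as a single text, so
# query_file_chunks() effectively searches one big document. Below is a minimal
# sketch of a variant that actually splits the file into chunks before
# indexing. It assumes the langchain-text-splitters package is installed, and
# the chunk sizes are illustrative, not tuned values from this module.
def embed_file_chunked(file_path: str, chunk_size: int = 500, overlap: int = 50) -> bool:
    from langchain_text_splitters import RecursiveCharacterTextSplitter

    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size, chunk_overlap=overlap)
    chunks = splitter.split_text(text)
    # Tag each chunk with its source and position so results stay traceable.
    metadatas = [{"source": file_path, "chunk": i} for i in range(len(chunks))]

    store = FAISS.from_texts(chunks, embedding_model, metadatas=metadatas)
    store.save_local(FAISS_INDEX_DIR)
    return True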
# Optionally define an in-memory docstore for manual use.
# Renamed from "DocStore" so it does not collide with the DocStore class below.
in_memory_docstore = InMemoryDocstore({})
# === PDF Document Store using FAISS & SentenceTransformers ===
class DocStore:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", embedding_dim: int = 384):
        self.model = SentenceTransformer(model_name)
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.texts: List[str] = []
        self.metadata: List[str] = []

    def _chunk_text(self, text: str, chunk_size: int = 1000) -> List[str]:
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

    def add_document(self, filepath: str):
        doc = fitz.open(filepath)
        full_text = "\n".join(page.get_text() for page in doc)
        chunks = self._chunk_text(full_text)
        # Encode all chunks in one batch instead of one model call per chunk.
        embeddings = self.model.encode(chunks)
        self.index.add(np.asarray(embeddings, dtype=np.float32))
        self.texts.extend(chunks)
        self.metadata.extend([filepath] * len(chunks))
        logger.info(f"Added {len(chunks)} chunks from {filepath} to FAISS index.")

    def retrieve(self, query: str, top_k: int = 3) -> List[Tuple[str, str]]:
        query_vector = self.model.encode(query).astype(np.float32)
        distances, indices = self.index.search(np.array([query_vector]), top_k)
        results = []
        for idx in indices[0]:
            # FAISS pads missing results with -1 when the index holds fewer
            # than top_k vectors, so guard both bounds.
            if 0 <= idx < len(self.texts):
                results.append((self.metadata[idx], self.texts[idx]))
        return results
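
# Persistence sketch for DocStore: faiss.write_index()/read_index() serialize
# only the vectors, so texts and metadata are written alongside as JSON here.
# The "docstore.index" path and these helper names are illustrative, not part
# of the original module.
def save_doc_store(store: "DocStore", index_path: str = "docstore.index") -> None:
    import json
    faiss.write_index(store.index, index_path)
    with open(index_path + ".json", "w", encoding="utf-8") as f:
        json.dump({"texts": store.texts, "metadata": store.metadata}, f)

def load_doc_store(store: "DocStore", index_path: str = "docstore.index") -> None:
    import json
    store.index = faiss.read_index(index_path)
    with open(index_path + ".json", "r", encoding="utf-8") as f:
        data = json.load(f)
    store.texts, store.metadata = data["texts"], data["metadata"]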
# === Utility to Add Documents to a LangChain VectorStore ===
def add_to_vector_store(documents: List[Document | dict], vector_store) -> bool:
    try:
        # Accept plain dicts and coerce them into Document objects.
        if documents and isinstance(documents[0], dict):
            documents = [Document(**doc) for doc in documents]
        logger.info(f"Adding {len(documents)} documents to vector store...")
        vector_store.add_documents(documents)
        logger.info("Documents added successfully.")
        return True
    except Exception as e:
        logger.error(f"Error adding to vector store: {e}", exc_info=True)
        return False
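
# Illustrative usage of add_to_vector_store() with plain dicts. The keys mirror
# Document's constructor (page_content, metadata); the sample strings are made
# up, and vector_store stands in for any LangChain store with add_documents(),
# e.g. a FAISS instance:
#
#     docs = [
#         {"page_content": "Refunds are accepted within 30 days.", "metadata": {"source": "faq.txt"}},
#         {"page_content": "Shipping takes 3-5 business days.", "metadata": {"source": "faq.txt"}},
#     ]
#     add_to_vector_store(docs, vector_store)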
# === Local In-Memory Embedding + Search ===
class LocalEmbeddingStore:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", chunk_size: int = 300):
        self.model = SentenceTransformer(model_name)
        self.chunk_size = chunk_size
        self.store: dict[str, List[Tuple[str, np.ndarray]]] = {}

    def embed_file(self, filepath: str) -> dict:
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        chunks = [content[i:i + self.chunk_size] for i in range(0, len(content), self.chunk_size)]
        vectors = self.model.encode(chunks)
        self.store[filepath] = list(zip(chunks, vectors))
        logger.info(f"Embedded {len(chunks)} chunks from {filepath}.")
        return {"chunks": len(chunks)}

    def query(self, filename: str, query: str, top_k: int = 3) -> dict:
        if filename not in self.store:
            return {"error": "File not embedded"}
        chunks_vectors = self.store[filename]
        query_vec = self.model.encode([query])[0]
        similarities = [(text, cosine_similarity([query_vec], [vec])[0][0]) for text, vec in chunks_vectors]
        top_chunks = sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
        return {"answer": "\n\n".join(chunk for chunk, _ in top_chunks)}
# === LangChain-Compatible FAISS Vector Store Manager ===
class VectorStoreManager:
    def __init__(self, embedding_model=None, index_path: str = "db_index"):
        self.embedding_model = embedding_model or HuggingFaceEmbeddings()
        self.index_path = index_path
        self.db: Optional[FAISS] = None

    def init_vector_store(self):
        if os.path.exists(self.index_path):
            self.db = FAISS.load_local(
                self.index_path, self.embedding_model, allow_dangerous_deserialization=True
            )
            logger.info(f"Loaded existing FAISS index from {self.index_path}")
        else:
            logger.warning(f"No index found at {self.index_path}. It will be created on first add.")

    def add_texts(self, texts: List[str], ids: Optional[List[str]] = None):
        if self.db is None:
            self.db = FAISS.from_texts(texts, self.embedding_model, ids=ids)
        else:
            self.db.add_texts(texts=texts, ids=ids)
        self.db.save_local(self.index_path)
        logger.info(f"Saved FAISS index with {len(texts)} texts to {self.index_path}")

    def similarity_search(self, query: str, k: int = 3) -> List[Document]:
        if self.db is None:
            logger.warning("Vector store not initialized.")
            return []
        return self.db.similarity_search(query, k=k)
# === Test Usage ===
if __name__ == "__main__":
    sample_pdf = "sample.pdf"
    sample_txt = "data/sample.txt"

    # FAISS PDF store
    store = DocStore()
    if os.path.exists(sample_pdf):
        store.add_document(sample_pdf)
        results = store.retrieve("What is the return policy?")
        for meta, chunk in results:
            print(f"\nFile: {meta}\nSnippet: {chunk[:200]}...\n")

    # Local text store
    local_store = LocalEmbeddingStore()
    if os.path.exists(sample_txt):
        print(local_store.embed_file(sample_txt))
        print(local_store.query(sample_txt, "discount offers"))

    # VectorStoreManager test
    vsm = VectorStoreManager()
    vsm.init_vector_store()
    vsm.add_texts(["This is a test document."], ids=["test_doc_1"])
    print(vsm.similarity_search("test"))