# app/embeddings.py
import os
import logging
from typing import List, Optional, Tuple

import numpy as np
import faiss
import fitz  # PyMuPDF
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from langchain_core.documents import Document
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.vectorstores import FAISS
from langchain_community.docstore.in_memory import InMemoryDocstore

# Logger setup
logger = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")

# Path to FAISS index
FAISS_INDEX_DIR = "vector_index"
os.makedirs(FAISS_INDEX_DIR, exist_ok=True)
def embed_file(file_path: str) -> bool:
    """
    Reads a file, embeds it into a FAISS vector store, and saves the index to disk.
    """
    if not os.path.exists(file_path):
        raise FileNotFoundError(f"❌ File not found: {file_path}")

    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    texts = [text]
    metadatas = [{"source": file_path}]

    # Create and save vector store
    vector_store = FAISS.from_texts(texts, embedding_model, metadatas=metadatas)
    vector_store.save_local(FAISS_INDEX_DIR)
    return True

def query_file_chunks(query: str, k: int = 3) -> List[Document]:
    """
    Loads the FAISS vector store and performs a semantic search.
    """
    try:
        # Newer langchain_community releases may also require
        # allow_dangerous_deserialization=True here.
        vector_store = FAISS.load_local(FAISS_INDEX_DIR, embedding_model)
    except Exception as e:
        raise RuntimeError(f"❌ Failed to load vector store: {e}") from e

    results = vector_store.similarity_search(query, k=k)
    return results

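# Minimal usage sketch for the two helpers above ("data/notes.txt" is a
# hypothetical path; any UTF-8 text file works):
#
#   embed_file("data/notes.txt")
#   for doc in query_file_chunks("refund policy", k=2):
#       print(doc.metadata["source"], doc.page_content[:100])
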
# Optional in-memory docstore for manual use, named so it does not shadow
# the DocStore class below.
in_memory_docstore = InMemoryDocstore({})

# === PDF Document Store using FAISS & SentenceTransformers ===
class DocStore:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", embedding_dim: int = 384):
        self.model = SentenceTransformer(model_name)
        self.index = faiss.IndexFlatL2(embedding_dim)
        self.texts: List[str] = []
        self.metadata: List[str] = []

    def _chunk_text(self, text: str, chunk_size: int = 1000) -> List[str]:
        return [text[i:i + chunk_size] for i in range(0, len(text), chunk_size)]

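    # Fixed-size slicing can cut sentences in half. An overlapping variant
    # (a sketch, not what _chunk_text above does) keeps shared context
    # between neighboring chunks:
    #
    #   def _chunk_with_overlap(self, text: str, size: int = 1000,
    #                           overlap: int = 100) -> List[str]:
    #       step = size - overlap
    #       return [text[i:i + size] for i in range(0, len(text), step)]
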
    def add_document(self, filepath: str):
        doc = fitz.open(filepath)
        full_text = "\n".join(page.get_text() for page in doc)
        chunks = self._chunk_text(full_text)
        for chunk in chunks:
            embedding = self.model.encode(chunk)
            self.texts.append(chunk)
            self.metadata.append(filepath)
            self.index.add(np.array([embedding], dtype=np.float32))
        logger.info(f"📄 Added {len(chunks)} chunks from {filepath} to FAISS index.")

    def retrieve(self, query: str, top_k: int = 3) -> List[Tuple[str, str]]:
        query_vector = self.model.encode(query).astype(np.float32)
        distances, indices = self.index.search(np.array([query_vector]), top_k)
        results = []
        for idx in indices[0]:
            # faiss pads missing results with -1, so guard both bounds.
            if 0 <= idx < len(self.texts):
                results.append((self.metadata[idx], self.texts[idx]))
        return results

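# DocStore's IndexFlatL2 ranks by Euclidean distance. If cosine ranking is
# preferred, a common pattern is an inner-product index over L2-normalized
# vectors. The helper below is a minimal standalone sketch of that idea, not
# part of DocStore itself:
def cosine_search_sketch(model: SentenceTransformer, texts: List[str],
                         query: str, top_k: int = 3) -> List[Tuple[str, float]]:
    vectors = model.encode(texts).astype(np.float32)
    faiss.normalize_L2(vectors)                    # in-place unit-length scaling
    index = faiss.IndexFlatIP(vectors.shape[1])    # inner product == cosine on unit vectors
    index.add(vectors)
    query_vec = model.encode([query]).astype(np.float32)
    faiss.normalize_L2(query_vec)
    scores, ids = index.search(query_vec, top_k)
    return [(texts[i], float(s)) for i, s in zip(ids[0], scores[0]) if i != -1]
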
# === Utility to Add Documents to LangChain VectorStore ===
def add_to_vector_store(documents: List[Document | dict], vector_store) -> bool:
    try:
        # Coerce plain dicts into Document objects.
        if documents and isinstance(documents[0], dict):
            documents = [Document(**doc) for doc in documents]
        logger.info(f"📦 Adding {len(documents)} documents to vector store...")
        vector_store.add_documents(documents)
        logger.info("✅ Documents added successfully.")
        return True
    except Exception as e:
        logger.error(f"❌ Error adding to vector store: {e}", exc_info=True)
        return False

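# add_to_vector_store accepts Document objects or plain dicts with the same
# fields; a usage sketch ("returns.txt" is a hypothetical source name):
#
#   docs = [{"page_content": "Returns are accepted within 30 days.",
#            "metadata": {"source": "returns.txt"}}]
#   add_to_vector_store(docs, vector_store)
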
# === Local In-Memory Embedding + Search ===
class LocalEmbeddingStore:
    def __init__(self, model_name: str = "all-MiniLM-L6-v2", chunk_size: int = 300):
        self.model = SentenceTransformer(model_name)
        self.chunk_size = chunk_size
        self.store: dict[str, List[Tuple[str, np.ndarray]]] = {}

    def embed_file(self, filepath: str) -> dict:
        with open(filepath, "r", encoding="utf-8") as f:
            content = f.read()
        chunks = [content[i:i + self.chunk_size] for i in range(0, len(content), self.chunk_size)]
        vectors = self.model.encode(chunks)
        self.store[filepath] = list(zip(chunks, vectors))
        logger.info(f"📄 Embedded {len(chunks)} chunks from {filepath}.")
        return {"chunks": len(chunks)}

    def query(self, filename: str, query: str, top_k: int = 3) -> dict:
        if filename not in self.store:
            return {"error": "File not embedded"}
        chunks_vectors = self.store[filename]
        query_vec = self.model.encode([query])[0]
        similarities = [(text, cosine_similarity([query_vec], [vec])[0][0]) for text, vec in chunks_vectors]
        top_chunks = sorted(similarities, key=lambda x: x[1], reverse=True)[:top_k]
        return {"answer": "\n\n".join(chunk for chunk, _ in top_chunks)}

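# LocalEmbeddingStore.query calls cosine_similarity once per chunk. For larger
# files the same ranking can be computed in one vectorized call; a minimal
# sketch over the same (text, vector) layout the store uses:
def rank_chunks_vectorized(query_vec: np.ndarray,
                           chunks_vectors: List[Tuple[str, np.ndarray]],
                           top_k: int = 3) -> List[Tuple[str, float]]:
    texts = [text for text, _ in chunks_vectors]
    matrix = np.stack([vec for _, vec in chunks_vectors])    # (n_chunks, dim)
    scores = cosine_similarity(query_vec.reshape(1, -1), matrix)[0]
    order = np.argsort(scores)[::-1][:top_k]                 # highest similarity first
    return [(texts[i], float(scores[i])) for i in order]
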
# === LangChain-Compatible FAISS Vector Store Manager ===
class VectorStoreManager:
    def __init__(self, embedding_model=None, index_path: str = "db_index"):
        self.embedding_model = embedding_model or HuggingFaceEmbeddings()
        self.index_path = index_path
        self.db: Optional[FAISS] = None

    def init_vector_store(self):
        if os.path.exists(self.index_path):
            # Newer langchain_community releases may also require
            # allow_dangerous_deserialization=True here.
            self.db = FAISS.load_local(self.index_path, self.embedding_model)
            logger.info(f"📂 Loaded existing FAISS index from {self.index_path}")
        else:
            logger.warning(f"⚠️ No index found at {self.index_path}. It will be created on first add.")

    def add_texts(self, texts: List[str], ids: Optional[List[str]] = None):
        if self.db is None:
            self.db = FAISS.from_texts(texts, self.embedding_model, ids=ids)
        else:
            self.db.add_texts(texts=texts, ids=ids)
        self.db.save_local(self.index_path)
        logger.info(f"✅ Saved FAISS index with {len(texts)} texts to {self.index_path}")

    def similarity_search(self, query: str, k: int = 3) -> List[Document]:
        if self.db is None:
            logger.warning("⚠️ Vector store not initialized.")
            return []
        return self.db.similarity_search(query, k=k)

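# The LangChain FAISS wrapper also exposes scored search; a usage sketch,
# assuming the index has been initialized as above:
#
#   vsm = VectorStoreManager()
#   vsm.init_vector_store()
#   if vsm.db is not None:
#       for doc, score in vsm.db.similarity_search_with_score("test", k=3):
#           print(score, doc.page_content[:80])
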
# === Test Usage ===
if __name__ == "__main__":
    sample_pdf = "sample.pdf"
    sample_txt = "data/sample.txt"

    # FAISS PDF store
    store = DocStore()
    if os.path.exists(sample_pdf):
        store.add_document(sample_pdf)
        results = store.retrieve("What is the return policy?")
        for meta, chunk in results:
            print(f"\n📄 File: {meta}\n🔍 Snippet: {chunk[:200]}...\n")

    # Local text store
    local_store = LocalEmbeddingStore()
    if os.path.exists(sample_txt):
        print(local_store.embed_file(sample_txt))
        print(local_store.query(sample_txt, "discount offers"))

    # VectorStoreManager test
    vsm = VectorStoreManager()
    vsm.init_vector_store()
    vsm.add_texts(["This is a test document."], ids=["test_doc_1"])
    print(vsm.similarity_search("test"))