Spaces:
Running
on
CPU Upgrade
Running
on
CPU Upgrade
| import json | |
| import os | |
| import sys | |
| from functools import cache | |
| from pathlib import Path | |
| import torch | |
| from langchain_community.retrievers import QdrantSparseVectorRetriever | |
| from langchain_community.vectorstores import Qdrant | |
| from langchain_core.documents import Document | |
| from langchain_openai.embeddings import OpenAIEmbeddings | |
| from qdrant_client import QdrantClient, models | |
| from transformers import AutoModelForMaskedLM, AutoTokenizer | |
| from data_processing import excel_to_dataframe | |
| class DataProcessor: | |
| def __init__(self, data_dir: Path): | |
| self.data_dir = data_dir | |
| def load_practitioners_data(self): | |
| try: | |
| df = excel_to_dataframe(self.data_dir) | |
| practitioners_data = [] | |
| for idx, row in df.iterrows(): | |
| # I am using dot as a separator for text embeddings | |
| content = ". ".join(f"{key}: {value}" for key, value in row.items()) | |
| doc = Document(page_content=content, metadata={"row": idx}) | |
| practitioners_data.append(doc) | |
| return practitioners_data | |
| except FileNotFoundError: | |
| sys.exit( | |
| "Directory or Excel file not found. Please check the path and try again." | |
| ) | |
| def load_tall_tree_data(self): | |
| # Check if the file has a .json extension | |
| json_files = [ | |
| file for file in self.data_dir.iterdir() if file.suffix == ".json" | |
| ] | |
| if not json_files: | |
| raise FileNotFoundError("No JSON files found in the specified directory.") | |
| if len(json_files) > 1: | |
| raise ValueError( | |
| "More than one JSON file found in the specified directory." | |
| ) | |
| path = json_files[0] | |
| data = self.load_json_file(path) | |
| tall_tree_data = self.process_json_data(data) | |
| return tall_tree_data | |
| def load_json_file(self, path): | |
| try: | |
| with open(path, "r") as f: | |
| data = json.load(f) | |
| return data | |
| except json.JSONDecodeError: | |
| raise ValueError(f"The file {path} is not a valid JSON file.") | |
| def process_json_data(self, data): | |
| tall_tree_data = [] | |
| for idx, (key, value) in enumerate(data.items()): | |
| content = f"{key}: {value}" | |
| doc = Document(page_content=content, metadata={"row": idx}) | |
| tall_tree_data.append(doc) | |
| return tall_tree_data | |
| class ValidateQdrantClient: | |
| """Base class for retriever clients to ensure environment variables are set.""" | |
| def __init__(self): | |
| self.validate_environment_variables() | |
| def validate_environment_variables(self): | |
| """Check if the Qdrant environment variables are set.""" | |
| required_vars = ["QDRANT_API_KEY", "QDRANT_URL"] | |
| missing_vars = [var for var in required_vars if not os.getenv(var)] | |
| if missing_vars: | |
| raise EnvironmentError( | |
| f"Missing environment variable(s): {', '.join(missing_vars)}" | |
| ) | |
| class DenseVectorStore(ValidateQdrantClient): | |
| """Store dense data in Qdrant vector database.""" | |
| TEXT_EMBEDDING_MODELS = [ | |
| "text-embedding-ada-002", | |
| "text-embedding-3-small", | |
| "text-embedding-3-large", | |
| ] | |
| def __init__( | |
| self, | |
| documents: list[Document], | |
| embeddings_model: str = "text-embedding-3-small", | |
| collection_name: str = "practitioners_db", | |
| ): | |
| super().__init__() | |
| if embeddings_model not in self.TEXT_EMBEDDING_MODELS: | |
| raise ValueError( | |
| f"Invalid embeddings model: {embeddings_model}. Valid options are {', '.join(self.TEXT_EMBEDDING_MODELS)}." | |
| ) | |
| self.documents = documents | |
| self.embeddings_model = embeddings_model | |
| self.collection_name = collection_name | |
| self._qdrant_db = None | |
| def qdrant_db(self): | |
| if self._qdrant_db is None: | |
| self._qdrant_db = Qdrant.from_documents( | |
| self.documents, | |
| OpenAIEmbeddings(model=self.embeddings_model), | |
| url=os.getenv("QDRANT_URL"), | |
| api_key=os.getenv("QDRANT_API_KEY"), | |
| prefer_grpc=True, | |
| collection_name=self.collection_name, | |
| force_recreate=True, | |
| ) | |
| return self._qdrant_db | |
| class SparseVectorStore(ValidateQdrantClient): | |
| """Store sparse vectors in Qdrant vector database using SPLADE neural retrieval model.""" | |
| def __init__( | |
| self, | |
| documents: list[Document], | |
| collection_name: str, | |
| vector_name: str, | |
| k: int = 4, | |
| splade_model_id: str = "naver/splade-cocondenser-ensembledistil", | |
| ): | |
| # Validate Qdrant client | |
| super().__init__() | |
| self.client = QdrantClient( | |
| url=os.getenv("QDRANT_URL"), | |
| api_key=os.getenv("QDRANT_API_KEY"), | |
| ) # TODO: prefer_grpc=True is not working | |
| self.model_id = splade_model_id | |
| self._tokenizer = None | |
| self._model = None | |
| self.collection_name = collection_name | |
| self.vector_name = vector_name | |
| self.k = k | |
| self.sparse_retriever = self.create_sparse_retriever() | |
| self.add_documents(documents) | |
| def tokenizer(self): | |
| """Initialize the tokenizer.""" | |
| if self._tokenizer is None: | |
| self._tokenizer = AutoTokenizer.from_pretrained(self.model_id) | |
| return self._tokenizer | |
| def model(self): | |
| """Initialize the SPLADE neural retrieval model.""" | |
| if self._model is None: | |
| self._model = AutoModelForMaskedLM.from_pretrained(self.model_id) | |
| return self._model | |
| def sparse_encoder(self, text: str) -> tuple[list[int], list[float]]: | |
| """Encode the input text into a sparse vector.""" | |
| tokens = self.tokenizer( | |
| text, | |
| return_tensors="pt", | |
| max_length=512, | |
| padding="max_length", | |
| truncation=True, | |
| ) | |
| with torch.no_grad(): | |
| logits = self.model(**tokens).logits | |
| relu_log = torch.log1p(torch.relu(logits)) | |
| weighted_log = relu_log * tokens.attention_mask.unsqueeze(-1) | |
| max_val = torch.max(weighted_log, dim=1).values.squeeze() | |
| indices = torch.nonzero(max_val, as_tuple=False).squeeze().cpu().numpy() | |
| values = max_val[indices].cpu().numpy() | |
| return indices.tolist(), values.tolist() | |
| def create_sparse_retriever(self): | |
| self.client.recreate_collection( | |
| self.collection_name, | |
| vectors_config={}, | |
| sparse_vectors_config={ | |
| self.vector_name: models.SparseVectorParams( | |
| index=models.SparseIndexParams( | |
| on_disk=False, | |
| ) | |
| ) | |
| }, | |
| ) | |
| return QdrantSparseVectorRetriever( | |
| client=self.client, | |
| collection_name=self.collection_name, | |
| sparse_vector_name=self.vector_name, | |
| sparse_encoder=self.sparse_encoder, | |
| k=self.k, | |
| ) | |
| def add_documents(self, documents): | |
| self.sparse_retriever.add_documents(documents) | |
| def main(): | |
| data_dir = Path().resolve().parent / "data" | |
| if not data_dir.exists(): | |
| sys.exit(f"The directory {data_dir} does not exist.") | |
| processor = DataProcessor(data_dir) | |
| print("Loading and cleaning Practitioners data...") | |
| practitioners_dataset = processor.load_practitioners_data() | |
| print("Loading Tall Tree data from json file...") | |
| tall_tree_dataset = processor.load_tall_tree_data() | |
| # Set OpenAI embeddings model | |
| # TODO: Test new OpenAI text embeddings models | |
| # text-embedding-3-large | |
| # text-embedding-3-small | |
| EMBEDDINGS_MODEL = "text-embedding-3-small" | |
| # Store both datasets in Qdrant | |
| print(f"Storing dense vectors in Qdrant using {EMBEDDINGS_MODEL}...") | |
| practitioners_db = DenseVectorStore( | |
| practitioners_dataset, EMBEDDINGS_MODEL, collection_name="practitioners_db" | |
| ).qdrant_db | |
| tall_tree_db = DenseVectorStore( | |
| tall_tree_dataset, EMBEDDINGS_MODEL, collection_name="tall_tree_db" | |
| ).qdrant_db | |
| print(f"Storing sparse vectors in Qdrant using SPLADE neural retrieval model...") | |
| practitioners_sparse_vector_db = SparseVectorStore( | |
| documents=practitioners_dataset, | |
| collection_name="practitioners_db_sparse_collection", | |
| vector_name="sparse_vector", | |
| k=15, | |
| splade_model_id="naver/splade-cocondenser-ensembledistil", | |
| ) | |
| if __name__ == "__main__": | |
| main() | |