File size: 14,473 Bytes
575f1c7
 
 
 
 
 
 
 
21c909d
 
575f1c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21c909d
 
575f1c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21c909d
 
 
 
 
575f1c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21c909d
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
575f1c7
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
f46dfbd
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
21c909d
 
 
 
f46dfbd
 
 
 
 
 
575f1c7
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
"""Vector store management using Chroma for document storage and retrieval."""

import hashlib
import os
from typing import List, Optional, Dict, Any, Tuple
from pathlib import Path

from langchain_chroma import Chroma
from langchain_core.documents import Document
from langchain_core.vectorstores import VectorStoreRetriever
from langchain_community.retrievers import BM25Retriever
from langchain.retrievers import EnsembleRetriever

from src.rag.embeddings import embedding_manager
from src.core.config import config
from src.core.logging_config import get_logger

logger = get_logger(__name__)

class VectorStoreManager:
    """Manage a persistent Chroma collection plus a lazily built BM25 index over it.

    The Chroma collection is treated as the source of truth for stored documents.
    The BM25 keyword retriever is built on demand from the collection's contents
    and is invalidated whenever this manager changes the collection, so semantic,
    keyword and hybrid retrieval all search the same corpus.
    """

    # Reported by get_collection_info when the embedding object exposes no model name.
    _DEFAULT_EMBEDDING_MODEL_NAME = "text-embedding-3-small"

    def __init__(self, persist_directory: Optional[str] = None, collection_name: str = "markit_documents"):
        """
        Initialize the vector store manager.

        Only the persistence directory is resolved and created here; the Chroma
        store itself is opened lazily by ``get_vector_store``.

        Args:
            persist_directory: Directory to persist the vector database.
                Defaults to ``config.rag.vector_store_path`` when None.
            collection_name: Name of the collection in the vector store
        """
        self.collection_name = collection_name

        if persist_directory is None:
            persist_directory = config.rag.vector_store_path

        # Resolve to an absolute path so later working-directory changes do not
        # redirect where the database is persisted.
        self.persist_directory = str(Path(persist_directory).resolve())
        os.makedirs(self.persist_directory, exist_ok=True)

        self._vector_store: Optional[Chroma] = None
        # Corpus backing the BM25 index. An empty list means "reload from Chroma
        # on the next BM25 build".
        self._documents_cache: List[Document] = []
        self._bm25_retriever: Optional[BM25Retriever] = None

        logger.info(f"VectorStoreManager initialized with persist_directory={self.persist_directory}")

    def get_vector_store(self) -> Chroma:
        """Return the Chroma vector store, creating it on first use.

        Raises:
            Exception: Whatever the embedding model or Chroma constructor raises;
                it is logged and re-raised.
        """
        if self._vector_store is None:
            try:
                embedding_model = embedding_manager.get_embedding_model()

                self._vector_store = Chroma(
                    collection_name=self.collection_name,
                    embedding_function=embedding_model,
                    persist_directory=self.persist_directory,
                    collection_metadata={"hnsw:space": "cosine"}  # Use cosine similarity
                )

                logger.info(f"Vector store initialized with collection '{self.collection_name}'")

            except Exception as e:
                logger.error(f"Failed to initialize vector store: {e}")
                raise

        return self._vector_store

    # ------------------------------------------------------------------ helpers

    def _invalidate_keyword_index(self) -> None:
        """Drop the cached BM25 corpus and retriever so the next build reloads from Chroma."""
        self._documents_cache.clear()
        self._bm25_retriever = None

    @staticmethod
    def _make_document_id(index: int, content: str) -> str:
        """Return a deterministic ID for the ``index``-th document of an add batch.

        ``hashlib.sha256`` is used instead of the built-in ``hash()``. String
        hashing is salted per process (PYTHONHASHSEED), so ``hash()``-based IDs
        for identical content differed from one run to the next.
        """
        digest = hashlib.sha256(content.encode("utf-8", errors="surrogatepass")).hexdigest()
        return f"doc_{index}_{digest}"

    def _load_documents_from_store(self) -> List[Document]:
        """Reconstruct ``Document`` objects for everything stored in the Chroma collection."""
        collection = self.get_vector_store()._collection
        all_docs = collection.get()
        if not all_docs:
            return []

        contents = all_docs.get("documents") or []
        # Metadata entries may presumably be None (documents stored without
        # metadata), while Document expects a dict, so fall back to {}.
        metadatas = all_docs.get("metadatas") or [None] * len(contents)
        return [
            Document(page_content=content, metadata=metadata or {})
            for content, metadata in zip(contents, metadatas)
            if content is not None
        ]

    @classmethod
    def _describe_embedding_model(cls, embeddings: Any) -> str:
        """Best-effort name of an embedding object, falling back to the historical default."""
        for attr in ("model", "model_name"):
            value = getattr(embeddings, attr, None)
            if isinstance(value, str) and value:
                return value
        return cls._DEFAULT_EMBEDDING_MODEL_NAME

    # --------------------------------------------------------------- public API

    def add_documents(self, documents: List[Document]) -> List[str]:
        """
        Add documents to the vector store.

        IDs are derived from each document's position in ``documents`` and a
        SHA-256 digest of its content, so they are stable across process runs.

        Args:
            documents: List of Document objects to add

        Returns:
            List of document IDs that were added (empty if ``documents`` is empty)

        Raises:
            Exception: Any error from the vector store; it is logged and re-raised.
        """
        try:
            if not documents:
                logger.warning("No documents provided to add to vector store")
                return []

            vector_store = self.get_vector_store()

            doc_ids = [self._make_document_id(i, doc.page_content) for i, doc in enumerate(documents)]

            added_ids = vector_store.add_documents(documents=documents, ids=doc_ids)

            # Invalidate rather than append. The cache may never have been loaded
            # from Chroma (e.g. documents persisted by an earlier run), and
            # appending would then leave BM25 covering only this session's additions.
            self._invalidate_keyword_index()

            logger.info(f"Added {len(added_ids)} documents to vector store")
            return added_ids

        except Exception as e:
            logger.error(f"Error adding documents to vector store: {e}")
            raise

    def similarity_search(self, query: str, k: int = 4, score_threshold: Optional[float] = None) -> List[Document]:
        """
        Search for similar documents using semantic similarity.

        Args:
            query: Search query
            k: Number of documents to return
            score_threshold: Minimum relevance score. When given, results are
                taken from ``similarity_search_with_relevance_scores``.

        Returns:
            List of similar documents; an empty list if the search fails (the
            error is logged, not raised).
        """
        try:
            vector_store = self.get_vector_store()

            if score_threshold is not None:
                docs_with_scores = vector_store.similarity_search_with_relevance_scores(
                    query=query,
                    k=k,
                    score_threshold=score_threshold
                )
                documents = [doc for doc, _score in docs_with_scores]
            else:
                documents = vector_store.similarity_search(query=query, k=k)

            logger.info(f"Found {len(documents)} similar documents for query: '{query[:50]}...'")
            return documents

        except Exception as e:
            logger.error(f"Error performing similarity search: {e}")
            return []

    def get_retriever(self, search_type: str = "similarity", search_kwargs: Optional[Dict[str, Any]] = None) -> VectorStoreRetriever:
        """
        Get a retriever for the vector store.

        Args:
            search_type: Type of search ("similarity", "mmr", "similarity_score_threshold")
            search_kwargs: Additional search parameters; defaults to ``{"k": 4}``

        Returns:
            VectorStoreRetriever object

        Raises:
            Exception: Any error creating the retriever; it is logged and re-raised.
        """
        try:
            vector_store = self.get_vector_store()

            if search_kwargs is None:
                search_kwargs = {"k": 4}

            retriever = vector_store.as_retriever(
                search_type=search_type,
                search_kwargs=search_kwargs
            )

            logger.info(f"Created retriever with search_type='{search_type}' and kwargs={search_kwargs}")
            return retriever

        except Exception as e:
            logger.error(f"Error creating retriever: {e}")
            raise

    def get_bm25_retriever(self, k: int = 4) -> BM25Retriever:
        """
        Get or create a BM25 retriever for keyword-based search.

        The corpus is loaded from the Chroma collection whenever the cached
        corpus is empty, so documents persisted by earlier runs are included.

        Args:
            k: Number of documents to return

        Returns:
            BM25Retriever object. When the collection is empty, this is a
            retriever over a single empty placeholder document.

        Raises:
            Exception: Any error loading documents or building the index; it is
                logged and re-raised.
        """
        try:
            if self._bm25_retriever is None or not self._documents_cache:
                if not self._documents_cache:
                    self._documents_cache = self._load_documents_from_store()

                if self._documents_cache:
                    self._bm25_retriever = BM25Retriever.from_documents(
                        documents=self._documents_cache,
                        k=k
                    )
                    logger.info(f"Created BM25 retriever with {len(self._documents_cache)} documents")
                else:
                    logger.warning("No documents available for BM25 retriever")
                    # Placeholder corpus (an empty corpus is presumably not accepted
                    # by BM25). Queries against it will surface this empty document.
                    # It is rebuilt on every call until real documents exist.
                    self._bm25_retriever = BM25Retriever.from_documents(
                        documents=[Document(page_content="", metadata={})],
                        k=k
                    )

            # NOTE: the retriever instance is shared, so retrievers handed out
            # earlier (including inside hybrid ensembles) also observe this k.
            if hasattr(self._bm25_retriever, 'k'):
                self._bm25_retriever.k = k

            return self._bm25_retriever

        except Exception as e:
            logger.error(f"Error creating BM25 retriever: {e}")
            raise

    def get_hybrid_retriever(self,
                             k: int = 4,
                             semantic_weight: float = 0.7,
                             keyword_weight: float = 0.3,
                             search_type: str = "similarity",
                             search_kwargs: Optional[Dict[str, Any]] = None) -> EnsembleRetriever:
        """
        Get a hybrid retriever that combines semantic (vector) and keyword (BM25) search.

        Weights are normalized to sum to 1. If both weights are zero, the
        defaults 0.7 / 0.3 are used.

        Args:
            k: Number of documents to return from each underlying retriever
            semantic_weight: Weight for semantic search (must be >= 0)
            keyword_weight: Weight for keyword search (must be >= 0)
            search_type: Type of semantic search ("similarity", "mmr", "similarity_score_threshold")
            search_kwargs: Additional search parameters for the semantic retriever;
                copied, with ``k`` overridden by the ``k`` argument

        Returns:
            EnsembleRetriever object combining both approaches

        Raises:
            ValueError: If either weight is negative.
            Exception: Any error building the retrievers; it is logged and re-raised.
        """
        try:
            if semantic_weight < 0 or keyword_weight < 0:
                raise ValueError(
                    f"Retriever weights must be non-negative, got semantic={semantic_weight}, keyword={keyword_weight}"
                )

            total_weight = semantic_weight + keyword_weight
            if total_weight == 0:
                semantic_weight, keyword_weight = 0.7, 0.3
            else:
                semantic_weight = semantic_weight / total_weight
                keyword_weight = keyword_weight / total_weight

            # Copy so the caller's dict is not mutated by the k override.
            search_kwargs = {} if search_kwargs is None else dict(search_kwargs)
            search_kwargs["k"] = k

            semantic_retriever = self.get_retriever(
                search_type=search_type,
                search_kwargs=search_kwargs
            )
            keyword_retriever = self.get_bm25_retriever(k=k)

            ensemble_retriever = EnsembleRetriever(
                retrievers=[semantic_retriever, keyword_retriever],
                weights=[semantic_weight, keyword_weight]
            )

            logger.info(f"Created hybrid retriever with weights: semantic={semantic_weight:.2f}, keyword={keyword_weight:.2f}")
            return ensemble_retriever

        except Exception as e:
            logger.error(f"Error creating hybrid retriever: {e}")
            raise

    def get_collection_info(self) -> Dict[str, Any]:
        """
        Get information about the current collection.

        Returns:
            Dictionary with ``collection_name``, ``persist_directory``,
            ``document_count`` and ``embedding_model``. On failure, returns
            ``{"error": <message>}`` instead of raising.
        """
        try:
            vector_store = self.get_vector_store()

            count = vector_store._collection.count()
            # Report the model actually configured on the store, when it exposes one.
            embedding_model_name = self._describe_embedding_model(getattr(vector_store, "embeddings", None))

            info = {
                "collection_name": self.collection_name,
                "persist_directory": self.persist_directory,
                "document_count": count,
                "embedding_model": embedding_model_name
            }

            logger.info(f"Collection info: {info}")
            return info

        except Exception as e:
            logger.error(f"Error getting collection info: {e}")
            return {"error": str(e)}

    def delete_collection(self) -> bool:
        """
        Delete the current collection and reset the vector store.

        The store is opened first if necessary, so the persisted collection is
        deleted even if this manager had not used it yet. The BM25 index is
        invalidated as well.

        Returns:
            True if successful, False otherwise (the error is logged)
        """
        try:
            self.get_vector_store().delete_collection()
            self._vector_store = None
            self._invalidate_keyword_index()

            logger.info(f"Deleted collection '{self.collection_name}'")
            return True

        except Exception as e:
            logger.error(f"Error deleting collection: {e}")
            return False

    def search_with_metadata_filter(self, query: str, metadata_filter: Dict[str, Any], k: int = 4) -> List[Document]:
        """
        Search documents with metadata filtering.

        Args:
            query: Search query
            metadata_filter: Metadata filter conditions, passed to Chroma as ``filter``
            k: Number of documents to return

        Returns:
            List of filtered documents; an empty list if the search fails (the
            error is logged, not raised).
        """
        try:
            vector_store = self.get_vector_store()

            documents = vector_store.similarity_search(
                query=query,
                k=k,
                filter=metadata_filter
            )

            logger.info(f"Found {len(documents)} documents with metadata filter: {metadata_filter}")
            return documents

        except Exception as e:
            logger.error(f"Error searching with metadata filter: {e}")
            return []

    def clear_all_documents(self) -> bool:
        """
        Clear all documents from the vector store collection.

        Unlike ``delete_collection``, the collection itself is kept; only its
        entries are deleted.

        Returns:
            True if successful (including when there was nothing to clear),
            False otherwise (the error is logged)
        """
        try:
            vector_store = self.get_vector_store()
            collection = vector_store._collection

            # Only the IDs are needed; include=[] avoids fetching documents and metadata.
            all_docs = collection.get(include=[])

            if not all_docs or not all_docs.get('ids'):
                self._invalidate_keyword_index()
                logger.info("No documents found in vector store to clear")
                return True

            collection.delete(ids=all_docs['ids'])

            # Reset the vector store instance to ensure clean state
            self._vector_store = None
            self._invalidate_keyword_index()

            logger.info(f"Successfully cleared {len(all_docs['ids'])} documents from vector store")
            return True

        except Exception as e:
            logger.error(f"Error clearing all documents: {e}")
            return False

# Shared module-level instance, created at import time. Construction resolves
# the persist directory (falling back to the configured default) and creates it
# on disk; the Chroma store itself is only opened on first use.
vector_store_manager = VectorStoreManager(persist_directory=None)