"""Document ingestion pipeline for RAG functionality."""

import hashlib
from typing import Dict, Any, Optional, Tuple
from datetime import datetime
from src.rag.chunking import document_chunker
from src.rag.vector_store import vector_store_manager
from src.rag.embeddings import embedding_manager
from src.core.logging_config import get_logger

logger = get_logger(__name__)

class DocumentIngestionService:
    """Service for ingesting documents into the RAG system."""
    
    def __init__(self):
        """Initialize the document ingestion service."""
        logger.info("Document ingestion service initialized")
    
    def create_file_hash(self, content: str) -> str:
        """Create a full SHA-256 hash for file content to avoid duplicates."""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()
    
    def prepare_document_metadata(self, 
                                source_path: Optional[str] = None, 
                                doc_type: str = "markdown",
                                additional_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Prepare metadata for a document.
        
        Args:
            source_path: Original document path
            doc_type: Type of document (markdown, pdf, etc.)
            additional_metadata: Additional metadata to include
            
        Returns:
            Dictionary with document metadata
        """
        metadata = {
            "source": source_path or "user_upload",
            "doc_type": doc_type,
            "processed_at": datetime.now().isoformat(),
            "ingestion_version": "1.0"
        }
        
        if additional_metadata:
            metadata.update(additional_metadata)
            
        return metadata
    
    def check_duplicate_in_vector_store(self, file_hash: str) -> bool:
        """Check if document with given file hash already exists in vector store."""
        try:
            existing_docs = vector_store_manager.get_vector_store()._collection.get(
                where={"file_hash": file_hash},
                limit=1
            )
            return len(existing_docs.get('ids', [])) > 0
        except Exception as e:
            logger.error(f"Error checking for duplicates: {e}")
            return False
    
    def delete_existing_document(self, file_hash: str) -> bool:
        """Delete existing document with given file hash from vector store."""
        try:
            vector_store_manager.get_vector_store()._collection.delete(
                where={"file_hash": file_hash}
            )
            logger.info(f"Deleted existing document with hash: {file_hash}")
            return True
        except Exception as e:
            logger.error(f"Error deleting existing document: {e}")
            return False
    
    def ingest_text_content(self, 
                           text_content: str, 
                           content_type: str = "markdown",
                           source_path: Optional[str] = None,
                           metadata: Optional[Dict[str, Any]] = None,
                           original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest text content (markdown or LaTeX) into the RAG system.
        
        Args:
            text_content: The text content to ingest (markdown or LaTeX)
            content_type: Type of content ("markdown" or "latex")
            source_path: Optional source path/filename
            metadata: Optional additional metadata
            original_file_content: Original file content for hash calculation
            
        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            if not text_content or not text_content.strip():
                return False, "No content provided for ingestion", {}
            
            # Create file hash using original content if available, otherwise use text content
            file_content_for_hash = original_file_content or text_content
            file_hash = self.create_file_hash(file_content_for_hash)
            
            # Check for duplicates in vector store
            is_duplicate = self.check_duplicate_in_vector_store(file_hash)
            replacement_mode = False
            
            if is_duplicate:
                logger.info(f"Document with hash {file_hash} already exists, replacing...")
                # Delete existing document
                if self.delete_existing_document(file_hash):
                    replacement_mode = True
                else:
                    return False, "Failed to replace existing document", {"status": "error"}
            
            # Prepare document metadata with file hash
            doc_metadata = self.prepare_document_metadata(
                source_path=source_path,
                doc_type=content_type,  # "markdown" or "latex"
                additional_metadata=metadata
            )
            doc_metadata["file_hash"] = file_hash
            doc_metadata["content_length"] = len(text_content)
            doc_metadata["upload_timestamp"] = datetime.now().isoformat()
            
            # Chunk the document using text-aware chunking
            logger.info(f"Chunking {content_type} document: {file_hash}")
            chunks = document_chunker.chunk_document(text_content, doc_metadata)
            
            if not chunks:
                return False, "Failed to create document chunks", {}
            
            # Add chunks to vector store
            logger.info(f"Adding {len(chunks)} chunks to vector store")
            doc_ids = vector_store_manager.add_documents(chunks)
            
            if not doc_ids:
                return False, "Failed to add documents to vector store", {}
            
            # Prepare ingestion statistics
            ingestion_stats = {
                "status": "success",
                "file_hash": file_hash,
                "total_chunks": len(chunks),
                "document_ids": doc_ids,
                "content_length": len(text_content),
                "has_tables": any(chunk.metadata.get("has_table", False) for chunk in chunks),
                "has_code": any(chunk.metadata.get("has_code", False) for chunk in chunks),
                "processed_at": datetime.now().isoformat(),
                "replacement_mode": replacement_mode
            }
            
            action = "Updated existing" if replacement_mode else "Successfully ingested"
            success_msg = f"{action} document with {len(chunks)} chunks"
            logger.info(f"{success_msg}: {file_hash}")
            
            return True, success_msg, ingestion_stats
            
        except Exception as e:
            error_msg = f"Error during document ingestion: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}
    
    def ingest_markdown_content(self, 
                              markdown_content: str, 
                              source_path: Optional[str] = None,
                              metadata: Optional[Dict[str, Any]] = None,
                              original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Backward-compatible wrapper that forwards markdown content to ingest_text_content.
        """
        return self.ingest_text_content(
            text_content=markdown_content,
            content_type="markdown",
            source_path=source_path,
            metadata=metadata,
            original_file_content=original_file_content
        )
    
    def ingest_from_conversion_result(self, conversion_result: Dict[str, Any]) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest a document from a Markit conversion result.
        
        Args:
            conversion_result: Dictionary containing conversion results from Markit
            
        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            # Extract markdown content from conversion result
            markdown_content = conversion_result.get("markdown_content", "")
            
            if not markdown_content:
                return False, "No markdown content found in conversion result", {}
            
            # Extract metadata from conversion result
            original_filename = conversion_result.get("original_filename", "unknown")
            conversion_method = conversion_result.get("conversion_method", "unknown")
            original_file_content = conversion_result.get("original_file_content")
            
            additional_metadata = {
                "original_filename": original_filename,
                "conversion_method": conversion_method,
                "file_size": conversion_result.get("file_size", 0),
                "conversion_time": conversion_result.get("conversion_time", 0)
            }
            
            # Determine content type based on conversion method
            content_type = "latex" if "GOT-OCR" in conversion_method else "markdown"
            
            # Ingest the content with original file content for proper hashing
            return self.ingest_text_content(
                text_content=markdown_content,
                content_type=content_type,
                source_path=original_filename,
                metadata=additional_metadata,
                original_file_content=original_file_content
            )
            
        except Exception as e:
            error_msg = f"Error ingesting from conversion result: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}
    
    def get_ingestion_status(self) -> Dict[str, Any]:
        """
        Get current ingestion system status.
        
        Returns:
            Dictionary with system status information
        """
        status = {
            "processed_documents": 0,  # Will be updated from vector store
            "embedding_model_available": False,
            "vector_store_available": False,
            "system_ready": False
        }
        
        try:
            # Check embedding model
            status["embedding_model_available"] = embedding_manager.test_embedding_model()
            
            # Check vector store
            collection_info = vector_store_manager.get_collection_info()
            status["vector_store_available"] = "error" not in collection_info
            status["total_documents_in_store"] = collection_info.get("document_count", 0)
            
            # System is ready if both components are available
            status["system_ready"] = (
                status["embedding_model_available"] and 
                status["vector_store_available"]
            )
            
        except Exception as e:
            logger.error(f"Error checking ingestion status: {e}")
            status["error"] = str(e)
        
        return status
    def test_ingestion_pipeline(self) -> Dict[str, Any]:
        """
        Test the complete ingestion pipeline with sample content.
        
        Returns:
            Dictionary with test results
        """
        test_results = {
            "pipeline_test": False,
            "chunking_test": False,
            "embedding_test": False,
            "vector_store_test": False,
            "errors": []
        }
        
        try:
            # Test with sample markdown content
            test_content = """# Test Document

This is a test document for the RAG ingestion pipeline.

## Features

- Document chunking
- Embedding generation
- Vector store integration

## Sample Table

| Feature | Status | Priority |
|---------|--------|----------|
| Chunking | βœ… | High |
| Embeddings | βœ… | High |
| Vector Store | βœ… | Medium |

```python
# Sample code block
def test_function():
    return "Hello, RAG!"
```

This document contains various markdown elements to test the ingestion pipeline.
"""
            
            # Test chunking
            metadata = self.prepare_document_metadata(
                source_path="test_document.md",
                doc_type="markdown"
            )
            
            chunks = document_chunker.chunk_document(test_content, metadata)
            test_results["chunking_test"] = len(chunks) > 0
            
            if not test_results["chunking_test"]:
                test_results["errors"].append("Chunking test failed: No chunks created")
                return test_results
            
            # Test embedding
            test_results["embedding_test"] = embedding_manager.test_embedding_model()
            
            if not test_results["embedding_test"]:
                test_results["errors"].append("Embedding test failed")
                return test_results
            
            # Test vector store (add and retrieve)
            doc_ids = vector_store_manager.add_documents(chunks[:1])  # Test with one chunk
            test_results["vector_store_test"] = len(doc_ids) > 0
            
            if test_results["vector_store_test"]:
                # Test retrieval
                search_results = vector_store_manager.similarity_search("test document", k=1)
                test_results["vector_store_test"] = len(search_results) > 0
            
            if not test_results["vector_store_test"]:
                test_results["errors"].append("Vector store test failed")
                return test_results
            
            # Overall pipeline test
            test_results["pipeline_test"] = (
                test_results["chunking_test"] and 
                test_results["embedding_test"] and 
                test_results["vector_store_test"]
            )
            
            logger.info(f"Ingestion pipeline test completed: {test_results['pipeline_test']}")
            
        except Exception as e:
            error_msg = f"Pipeline test error: {str(e)}"
            test_results["errors"].append(error_msg)
            logger.error(error_msg)
        
        return test_results

# Global document ingestion service instance
document_ingestion_service = DocumentIngestionService()
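

if __name__ == "__main__":
    # Minimal smoke-test sketch: assumes a configured embedding model and a
    # reachable vector store; only this module's own public methods are used.
    print(document_ingestion_service.get_ingestion_status())
    ok, msg, stats = document_ingestion_service.ingest_markdown_content(
        "# Smoke Test\n\nA tiny document for a quick end-to-end check.",
        source_path="smoke_test.md",
    )
    print(ok, msg, stats.get("total_chunks"))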