"""Document ingestion pipeline for RAG functionality.""" | |
import os | |
import hashlib | |
from typing import List, Dict, Any, Optional, Tuple | |
from pathlib import Path | |
from datetime import datetime | |
from langchain_core.documents import Document | |
from src.rag.chunking import document_chunker | |
from src.rag.vector_store import vector_store_manager | |
from src.rag.embeddings import embedding_manager | |
from src.core.logging_config import get_logger | |
logger = get_logger(__name__) | |
class DocumentIngestionService:
    """Service for ingesting documents into the RAG system."""

    def __init__(self):
        """Initialize the document ingestion service."""
        logger.info("Document ingestion service initialized")

    def create_file_hash(self, content: str) -> str:
        """Create a full SHA-256 hash of file content to avoid duplicates."""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()

    def prepare_document_metadata(self,
                                  source_path: Optional[str] = None,
                                  doc_type: str = "markdown",
                                  additional_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Prepare metadata for a document.

        Args:
            source_path: Original document path
            doc_type: Type of document (markdown, pdf, etc.)
            additional_metadata: Additional metadata to include

        Returns:
            Dictionary with document metadata
        """
        metadata = {
            "source": source_path or "user_upload",
            "doc_type": doc_type,
            "processed_at": datetime.now().isoformat(),
            "ingestion_version": "1.0"
        }
        if additional_metadata:
            metadata.update(additional_metadata)
        return metadata

    def check_duplicate_in_vector_store(self, file_hash: str) -> bool:
        """Check whether a document with the given file hash already exists in the vector store."""
        try:
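            # Query the vector store's underlying collection directly (its private
            # `_collection` handle) for at most one chunk tagged with this file hash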
            existing_docs = vector_store_manager.get_vector_store()._collection.get(
                where={"file_hash": file_hash},
                limit=1
            )
            return len(existing_docs.get('ids', [])) > 0
        except Exception as e:
            logger.error(f"Error checking for duplicates: {e}")
            return False

    def delete_existing_document(self, file_hash: str) -> bool:
        """Delete the existing document with the given file hash from the vector store."""
        try:
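            # Remove every chunk carrying this file hash, via the same private
            # collection handle used in the duplicate check above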
            vector_store_manager.get_vector_store()._collection.delete(
                where={"file_hash": file_hash}
            )
            logger.info(f"Deleted existing document with hash: {file_hash}")
            return True
        except Exception as e:
            logger.error(f"Error deleting existing document: {e}")
            return False

    def ingest_text_content(self,
                            text_content: str,
                            content_type: str = "markdown",
                            source_path: Optional[str] = None,
                            metadata: Optional[Dict[str, Any]] = None,
                            original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest text content (markdown or LaTeX) into the RAG system.

        Args:
            text_content: The text content to ingest (markdown or LaTeX)
            content_type: Type of content ("markdown" or "latex")
            source_path: Optional source path/filename
            metadata: Optional additional metadata
            original_file_content: Original file content for hash calculation

        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            if not text_content or not text_content.strip():
                return False, "No content provided for ingestion", {}

            # Create file hash using original content if available, otherwise use text content
            file_content_for_hash = original_file_content or text_content
            file_hash = self.create_file_hash(file_content_for_hash)

            # Check for duplicates in vector store
            is_duplicate = self.check_duplicate_in_vector_store(file_hash)
            replacement_mode = False
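            # Replace-on-reingest: drop the stored copy first so the new chunks
            # fully supersede it instead of accumulating as duplicates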
            if is_duplicate:
                logger.info(f"Document with hash {file_hash} already exists, replacing...")
                # Delete existing document
                if self.delete_existing_document(file_hash):
                    replacement_mode = True
                else:
                    return False, "Failed to replace existing document", {"status": "error"}

            # Prepare document metadata with file hash
            doc_metadata = self.prepare_document_metadata(
                source_path=source_path,
                doc_type=content_type,  # Use content_type instead of hardcoded "markdown"
                additional_metadata=metadata
            )
            doc_metadata["file_hash"] = file_hash
            doc_metadata["content_length"] = len(text_content)
            doc_metadata["upload_timestamp"] = datetime.now().isoformat()

            # Chunk the document using text-aware chunking
            logger.info(f"Chunking {content_type} document: {file_hash}")
            chunks = document_chunker.chunk_document(text_content, doc_metadata)
            if not chunks:
                return False, "Failed to create document chunks", {}

            # Add chunks to vector store
            logger.info(f"Adding {len(chunks)} chunks to vector store")
            doc_ids = vector_store_manager.add_documents(chunks)
            if not doc_ids:
                return False, "Failed to add documents to vector store", {}

            # Prepare ingestion statistics
            ingestion_stats = {
                "status": "success",
                "file_hash": file_hash,
                "total_chunks": len(chunks),
                "document_ids": doc_ids,
                "content_length": len(text_content),
                "has_tables": any(chunk.metadata.get("has_table", False) for chunk in chunks),
                "has_code": any(chunk.metadata.get("has_code", False) for chunk in chunks),
                "processed_at": datetime.now().isoformat(),
                "replacement_mode": replacement_mode
            }

            action = "Updated existing" if replacement_mode else "Successfully ingested"
            success_msg = f"{action} document with {len(chunks)} chunks"
            logger.info(f"{success_msg}: {file_hash}")
            return True, success_msg, ingestion_stats

        except Exception as e:
            error_msg = f"Error during document ingestion: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}

    def ingest_markdown_content(self,
                                markdown_content: str,
                                source_path: Optional[str] = None,
                                metadata: Optional[Dict[str, Any]] = None,
                                original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Backward-compatibility wrapper for ingesting markdown content.
        """
        return self.ingest_text_content(
            text_content=markdown_content,
            content_type="markdown",
            source_path=source_path,
            metadata=metadata,
            original_file_content=original_file_content
        )

    def ingest_from_conversion_result(self, conversion_result: Dict[str, Any]) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest a document from a Markit conversion result.

        Args:
            conversion_result: Dictionary containing conversion results from Markit

        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            # Extract markdown content from conversion result
            markdown_content = conversion_result.get("markdown_content", "")
            if not markdown_content:
                return False, "No markdown content found in conversion result", {}

            # Extract metadata from conversion result
            original_filename = conversion_result.get("original_filename", "unknown")
            conversion_method = conversion_result.get("conversion_method", "unknown")
            original_file_content = conversion_result.get("original_file_content")

            additional_metadata = {
                "original_filename": original_filename,
                "conversion_method": conversion_method,
                "file_size": conversion_result.get("file_size", 0),
                "conversion_time": conversion_result.get("conversion_time", 0)
            }
            # Determine content type based on conversion method
            content_type = "latex" if "GOT-OCR" in conversion_method else "markdown"

            # Ingest the content with original file content for proper hashing
            return self.ingest_text_content(
                text_content=markdown_content,
                content_type=content_type,
                source_path=original_filename,
                metadata=additional_metadata,
                original_file_content=original_file_content
            )

        except Exception as e:
            error_msg = f"Error ingesting from conversion result: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}

    def get_ingestion_status(self) -> Dict[str, Any]:
        """
        Get current ingestion system status.

        Returns:
            Dictionary with system status information
        """
        status = {
            "processed_documents": 0,  # Will be updated from vector store
            "embedding_model_available": False,
            "vector_store_available": False,
            "system_ready": False
        }
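        # Probe each dependency in turn; failures are recorded in the status
        # dict rather than raised to the caller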
        try:
            # Check embedding model
            status["embedding_model_available"] = embedding_manager.test_embedding_model()

            # Check vector store
            collection_info = vector_store_manager.get_collection_info()
            status["vector_store_available"] = "error" not in collection_info
            status["total_documents_in_store"] = collection_info.get("document_count", 0)

            # System is ready if both components are available
            status["system_ready"] = (
                status["embedding_model_available"] and
                status["vector_store_available"]
            )
        except Exception as e:
            logger.error(f"Error checking ingestion status: {e}")
            status["error"] = str(e)
        return status

    def test_ingestion_pipeline(self) -> Dict[str, Any]:
        """
        Test the complete ingestion pipeline with sample content.

        Returns:
            Dictionary with test results
        """
        test_results = {
            "pipeline_test": False,
            "chunking_test": False,
            "embedding_test": False,
            "vector_store_test": False,
            "errors": []
        }
        try:
            # Test with sample markdown content
            test_content = """# Test Document

This is a test document for the RAG ingestion pipeline.

## Features

- Document chunking
- Embedding generation
- Vector store integration

## Sample Table

| Feature | Status | Priority |
|---------|--------|----------|
| Chunking | ✅ | High |
| Embeddings | ✅ | High |
| Vector Store | ✅ | Medium |

```python
# Sample code block
def test_function():
    return "Hello, RAG!"
```

This document contains various markdown elements to test the ingestion pipeline.
"""

            # Test chunking
            metadata = self.prepare_document_metadata(
                source_path="test_document.md",
                doc_type="markdown"
            )
            chunks = document_chunker.chunk_document(test_content, metadata)
            test_results["chunking_test"] = len(chunks) > 0
            if not test_results["chunking_test"]:
                test_results["errors"].append("Chunking test failed: No chunks created")
                return test_results

            # Test embedding
            test_results["embedding_test"] = embedding_manager.test_embedding_model()
            if not test_results["embedding_test"]:
                test_results["errors"].append("Embedding test failed")
                return test_results

            # Test vector store (add and retrieve)
            doc_ids = vector_store_manager.add_documents(chunks[:1])  # Test with one chunk
            test_results["vector_store_test"] = len(doc_ids) > 0
            if test_results["vector_store_test"]:
                # Test retrieval
                search_results = vector_store_manager.similarity_search("test document", k=1)
                test_results["vector_store_test"] = len(search_results) > 0
            if not test_results["vector_store_test"]:
                test_results["errors"].append("Vector store test failed")
                return test_results

            # Overall pipeline test
            test_results["pipeline_test"] = (
                test_results["chunking_test"] and
                test_results["embedding_test"] and
                test_results["vector_store_test"]
            )
            logger.info(f"Ingestion pipeline test completed: {test_results['pipeline_test']}")
        except Exception as e:
            error_msg = f"Pipeline test error: {str(e)}"
            test_results["errors"].append(error_msg)
            logger.error(error_msg)
        return test_results

# Global document ingestion service instance
document_ingestion_service = DocumentIngestionService()
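
if __name__ == "__main__":
    # Minimal smoke-test sketch (an addition, not part of the original module):
    # it exercises the public entry points against whatever embedding model and
    # vector store this package is configured with, so it assumes a working setup.
    status = document_ingestion_service.get_ingestion_status()
    print(f"System ready: {status.get('system_ready')}")

    if status.get("system_ready"):
        ok, message, stats = document_ingestion_service.ingest_markdown_content(
            markdown_content="# Hello\n\nA tiny sample document.",
            source_path="sample.md"
        )
        print(f"Ingestion: {ok} - {message} ({stats.get('total_chunks', 0)} chunks)")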