# src/rag/ingestion.py
"""Document ingestion pipeline for RAG functionality."""
import os
import hashlib
from typing import List, Dict, Any, Optional, Tuple
from pathlib import Path
from datetime import datetime
from langchain_core.documents import Document
from src.rag.chunking import document_chunker
from src.rag.vector_store import vector_store_manager
from src.rag.embeddings import embedding_manager
from src.core.logging_config import get_logger
logger = get_logger(__name__)


class DocumentIngestionService:
    """Service for ingesting documents into the RAG system."""

    def __init__(self):
        """Initialize the document ingestion service."""
        logger.info("Document ingestion service initialized")

    def create_file_hash(self, content: str) -> str:
        """Create a full SHA-256 hash for file content to avoid duplicates."""
        return hashlib.sha256(content.encode('utf-8')).hexdigest()
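
    # For reference: the hash is deterministic, so re-uploads of identical
    # content map to the same key; e.g. create_file_hash("hello") returns the
    # SHA-256 digest of "hello", which begins "2cf24dba...".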

    def prepare_document_metadata(self,
                                  source_path: Optional[str] = None,
                                  doc_type: str = "markdown",
                                  additional_metadata: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        Prepare metadata for a document.

        Args:
            source_path: Original document path
            doc_type: Type of document (markdown, pdf, etc.)
            additional_metadata: Additional metadata to include

        Returns:
            Dictionary with document metadata
        """
        metadata = {
            "source": source_path or "user_upload",
            "doc_type": doc_type,
            "processed_at": datetime.now().isoformat(),
            "ingestion_version": "1.0"
        }
        if additional_metadata:
            metadata.update(additional_metadata)
        return metadata

    def check_duplicate_in_vector_store(self, file_hash: str) -> bool:
        """Check whether a document with the given file hash already exists in the vector store."""
        try:
            # Queries the backing collection directly via the private
            # `_collection` attribute (Chroma-style API), so this is coupled
            # to the store's internals.
            existing_docs = vector_store_manager.get_vector_store()._collection.get(
                where={"file_hash": file_hash},
                limit=1
            )
            return len(existing_docs.get('ids', [])) > 0
        except Exception as e:
            logger.error(f"Error checking for duplicates: {e}")
            return False

    def delete_existing_document(self, file_hash: str) -> bool:
        """Delete the existing document with the given file hash from the vector store."""
        try:
            vector_store_manager.get_vector_store()._collection.delete(
                where={"file_hash": file_hash}
            )
            logger.info(f"Deleted existing document with hash: {file_hash}")
            return True
        except Exception as e:
            logger.error(f"Error deleting existing document: {e}")
            return False

    def ingest_text_content(self,
                            text_content: str,
                            content_type: str = "markdown",
                            source_path: Optional[str] = None,
                            metadata: Optional[Dict[str, Any]] = None,
                            original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest text content (markdown or LaTeX) into the RAG system.

        Args:
            text_content: The text content to ingest (markdown or LaTeX)
            content_type: Type of content ("markdown" or "latex")
            source_path: Optional source path/filename
            metadata: Optional additional metadata
            original_file_content: Original file content for hash calculation

        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            if not text_content or not text_content.strip():
                return False, "No content provided for ingestion", {}

            # Hash the original file content when available so re-uploads of
            # the same file are detected even if the converted text differs
            file_content_for_hash = original_file_content or text_content
            file_hash = self.create_file_hash(file_content_for_hash)

            # Check for duplicates in the vector store
            is_duplicate = self.check_duplicate_in_vector_store(file_hash)
            replacement_mode = False
            if is_duplicate:
                logger.info(f"Document with hash {file_hash} already exists, replacing...")
                # Delete the existing document before re-ingesting
                if self.delete_existing_document(file_hash):
                    replacement_mode = True
                else:
                    return False, "Failed to replace existing document", {"status": "error"}

            # Prepare document metadata with the file hash
            doc_metadata = self.prepare_document_metadata(
                source_path=source_path,
                doc_type=content_type,
                additional_metadata=metadata
            )
            doc_metadata["file_hash"] = file_hash
            doc_metadata["content_length"] = len(text_content)
            doc_metadata["upload_timestamp"] = datetime.now().isoformat()

            # Chunk the document using text-aware chunking
            logger.info(f"Chunking {content_type} document: {file_hash}")
            chunks = document_chunker.chunk_document(text_content, doc_metadata)
            if not chunks:
                return False, "Failed to create document chunks", {}

            # Add chunks to the vector store
            logger.info(f"Adding {len(chunks)} chunks to vector store")
            doc_ids = vector_store_manager.add_documents(chunks)
            if not doc_ids:
                return False, "Failed to add documents to vector store", {}

            # Prepare ingestion statistics
            ingestion_stats = {
                "status": "success",
                "file_hash": file_hash,
                "total_chunks": len(chunks),
                "document_ids": doc_ids,
                "content_length": len(text_content),
                "has_tables": any(chunk.metadata.get("has_table", False) for chunk in chunks),
                "has_code": any(chunk.metadata.get("has_code", False) for chunk in chunks),
                "processed_at": datetime.now().isoformat(),
                "replacement_mode": replacement_mode
            }

            action = "Updated existing" if replacement_mode else "Successfully ingested"
            success_msg = f"{action} document with {len(chunks)} chunks"
            logger.info(f"{success_msg}: {file_hash}")
            return True, success_msg, ingestion_stats

        except Exception as e:
            error_msg = f"Error during document ingestion: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}

    def ingest_markdown_content(self,
                                markdown_content: str,
                                source_path: Optional[str] = None,
                                metadata: Optional[Dict[str, Any]] = None,
                                original_file_content: Optional[str] = None) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Backward-compatibility method for ingesting markdown content.
        """
        return self.ingest_text_content(
            text_content=markdown_content,
            content_type="markdown",
            source_path=source_path,
            metadata=metadata,
            original_file_content=original_file_content
        )

    def ingest_from_conversion_result(self, conversion_result: Dict[str, Any]) -> Tuple[bool, str, Dict[str, Any]]:
        """
        Ingest a document from a Markit conversion result.

        Args:
            conversion_result: Dictionary containing conversion results from Markit

        Returns:
            Tuple of (success, message, ingestion_stats)
        """
        try:
            # Extract markdown content from the conversion result
            markdown_content = conversion_result.get("markdown_content", "")
            if not markdown_content:
                return False, "No markdown content found in conversion result", {}

            # Extract metadata from the conversion result
            original_filename = conversion_result.get("original_filename", "unknown")
            conversion_method = conversion_result.get("conversion_method", "unknown")
            original_file_content = conversion_result.get("original_file_content")
            additional_metadata = {
                "original_filename": original_filename,
                "conversion_method": conversion_method,
                "file_size": conversion_result.get("file_size", 0),
                "conversion_time": conversion_result.get("conversion_time", 0)
            }

            # GOT-OCR conversions emit LaTeX; everything else is treated as markdown
            content_type = "latex" if "GOT-OCR" in conversion_method else "markdown"

            # Ingest the content, passing the original file content for stable hashing
            return self.ingest_text_content(
                text_content=markdown_content,
                content_type=content_type,
                source_path=original_filename,
                metadata=additional_metadata,
                original_file_content=original_file_content
            )
        except Exception as e:
            error_msg = f"Error ingesting from conversion result: {str(e)}"
            logger.error(error_msg)
            return False, error_msg, {"status": "error", "error": str(e)}
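
    # Illustrative shape of the `conversion_result` dict consumed above. The
    # field names come from the .get() calls in the method; the values here
    # are hypothetical:
    #
    #     {
    #         "markdown_content": "# Title\n...",
    #         "original_filename": "report.pdf",
    #         "conversion_method": "GOT-OCR",   # anything else => "markdown"
    #         "original_file_content": "<raw file content>",
    #         "file_size": 10240,
    #         "conversion_time": 2.5,
    #     }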

    def get_ingestion_status(self) -> Dict[str, Any]:
        """
        Get current ingestion system status.

        Returns:
            Dictionary with system status information
        """
        status = {
            "processed_documents": 0,
            "embedding_model_available": False,
            "vector_store_available": False,
            "system_ready": False
        }
        try:
            # Check the embedding model
            status["embedding_model_available"] = embedding_manager.test_embedding_model()

            # Check the vector store; the live document count is reported
            # under "total_documents_in_store"
            collection_info = vector_store_manager.get_collection_info()
            status["vector_store_available"] = "error" not in collection_info
            status["total_documents_in_store"] = collection_info.get("document_count", 0)

            # The system is ready only if both components are available
            status["system_ready"] = (
                status["embedding_model_available"] and
                status["vector_store_available"]
            )
        except Exception as e:
            logger.error(f"Error checking ingestion status: {e}")
            status["error"] = str(e)
        return status
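
    # Example of a healthy status payload, based on the keys set above
    # (values are illustrative):
    #
    #     {
    #         "processed_documents": 0,
    #         "embedding_model_available": True,
    #         "vector_store_available": True,
    #         "system_ready": True,
    #         "total_documents_in_store": 42,
    #     }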

    def test_ingestion_pipeline(self) -> Dict[str, Any]:
        """
        Test the complete ingestion pipeline with sample content.

        Returns:
            Dictionary with test results
        """
        test_results = {
            "pipeline_test": False,
            "chunking_test": False,
            "embedding_test": False,
            "vector_store_test": False,
            "errors": []
        }
        try:
            # Test with sample markdown content
            test_content = """# Test Document

This is a test document for the RAG ingestion pipeline.

## Features

- Document chunking
- Embedding generation
- Vector store integration

## Sample Table

| Feature | Status | Priority |
|---------|--------|----------|
| Chunking | βœ… | High |
| Embeddings | βœ… | High |
| Vector Store | βœ… | Medium |

```python
# Sample code block
def test_function():
    return "Hello, RAG!"
```

This document contains various markdown elements to test the ingestion pipeline.
"""

            # Test chunking
            metadata = self.prepare_document_metadata(
                source_path="test_document.md",
                doc_type="markdown"
            )
            chunks = document_chunker.chunk_document(test_content, metadata)
            test_results["chunking_test"] = len(chunks) > 0
            if not test_results["chunking_test"]:
                test_results["errors"].append("Chunking test failed: No chunks created")
                return test_results

            # Test embedding
            test_results["embedding_test"] = embedding_manager.test_embedding_model()
            if not test_results["embedding_test"]:
                test_results["errors"].append("Embedding test failed")
                return test_results

            # Test the vector store: add a single chunk, then retrieve it
            doc_ids = vector_store_manager.add_documents(chunks[:1])
            test_results["vector_store_test"] = len(doc_ids) > 0
            if test_results["vector_store_test"]:
                # Test retrieval
                search_results = vector_store_manager.similarity_search("test document", k=1)
                test_results["vector_store_test"] = len(search_results) > 0
            if not test_results["vector_store_test"]:
                test_results["errors"].append("Vector store test failed")
                return test_results

            # Overall pipeline test
            test_results["pipeline_test"] = (
                test_results["chunking_test"] and
                test_results["embedding_test"] and
                test_results["vector_store_test"]
            )
            logger.info(f"Ingestion pipeline test completed: {test_results['pipeline_test']}")
        except Exception as e:
            error_msg = f"Pipeline test error: {str(e)}"
            test_results["errors"].append(error_msg)
            logger.error(error_msg)
        return test_results


# Global document ingestion service instance
document_ingestion_service = DocumentIngestionService()
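

# Minimal usage sketch; assumes the embedding model and vector store
# configured via src.rag are available in this environment.
if __name__ == "__main__":
    # Check component availability before ingesting anything
    print(document_ingestion_service.get_ingestion_status())

    # Ingest a small markdown snippet and report the outcome
    ok, message, stats = document_ingestion_service.ingest_markdown_content(
        "# Hello\n\nA tiny test document.",
        source_path="hello.md"
    )
    print(ok, message, stats.get("total_chunks"))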