Markit_v2 / src /rag /chunking.py
AnseMin's picture
Refactor document ingestion and chunking to support LaTeX content
63279a9
"""Text chunking strategies for RAG document processing."""
import re
from typing import List, Dict, Any, Tuple
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document
from src.core.logging_config import get_logger
logger = get_logger(__name__)
class LaTeXAwareChunker:
"""Handles LaTeX-aware document chunking that preserves LaTeX structures."""
def __init__(self, chunk_size: int = 1200, chunk_overlap: int = 150):
"""
Initialize the LaTeX-aware document chunker.
Args:
chunk_size: Maximum size of each chunk in characters
chunk_overlap: Number of characters to overlap between chunks
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Initialize the text splitter with LaTeX-aware settings
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=[
"\n\\section{", # Section headers
"\n\\subsection{", # Subsection headers
"\n\\subsubsection{", # Subsubsection headers
"\n\\title{", # Title commands
"\n\\begin{", # Begin environments
"\n\\end{", # End environments
"\n\n", # Paragraph breaks
"\n", # Line breaks
". ", # Sentence breaks
" ", # Word breaks
"" # Character breaks
],
keep_separator=True,
add_start_index=True
)
# Regex patterns for LaTeX structures
self.latex_table_pattern = re.compile(
r'\\begin\{tabular\}.*?\\end\{tabular\}',
re.DOTALL | re.MULTILINE
)
self.latex_title_pattern = re.compile(
r'\\title\{[^}]*\}',
re.MULTILINE
)
self.latex_section_pattern = re.compile(
r'\\(?:sub)*section\*?\{[^}]*\}',
re.MULTILINE
)
self.latex_environment_pattern = re.compile(
r'\\begin\{[^}]+\}.*?\\end\{[^}]+\}',
re.DOTALL | re.MULTILINE
)
logger.info(f"LaTeX-aware chunker initialized with chunk_size={chunk_size}, overlap={chunk_overlap}")
def extract_latex_structures(self, content: str) -> Tuple[List[Tuple[int, int, str]], str]:
"""
Extract LaTeX tables and environments, replacing them with placeholders.
Args:
content: Original LaTeX content
Returns:
Tuple of (structures_list, content_with_placeholders)
"""
structures = []
# Find all tabular environments (highest priority)
for match in self.latex_table_pattern.finditer(content):
structures.append((
match.start(),
match.end(),
"latex_table",
match.group()
))
# Find other LaTeX environments (avoid overlapping with tables)
for match in self.latex_environment_pattern.finditer(content):
# Check if this environment overlaps with any table
overlaps_with_table = any(
table_start <= match.start() < table_end or
table_start < match.end() <= table_end
for table_start, table_end, struct_type, _ in structures
if struct_type == "latex_table"
)
if not overlaps_with_table and "tabular" not in match.group():
structures.append((
match.start(),
match.end(),
"latex_environment",
match.group()
))
# Find titles and sections
for match in self.latex_title_pattern.finditer(content):
structures.append((
match.start(),
match.end(),
"latex_title",
match.group()
))
for match in self.latex_section_pattern.finditer(content):
structures.append((
match.start(),
match.end(),
"latex_section",
match.group()
))
# Sort by start position
structures.sort(key=lambda x: x[0])
# Replace structures with placeholders
content_with_placeholders = content
offset = 0
for i, (start, end, struct_type, struct_content) in enumerate(structures):
placeholder = f"\n\n__LATEX_STRUCTURE_{i}_{struct_type.upper()}__\n\n"
# Adjust positions based on previous replacements
adjusted_start = start - offset
adjusted_end = end - offset
content_with_placeholders = (
content_with_placeholders[:adjusted_start] +
placeholder +
content_with_placeholders[adjusted_end:]
)
# Update offset for next replacement
offset += (end - start) - len(placeholder)
return structures, content_with_placeholders
def restore_latex_structures(self, chunks: List[str], structures: List[Tuple[int, int, str, str]]) -> List[str]:
"""
Restore LaTeX structures in chunks, keeping tables and environments intact.
Args:
chunks: List of text chunks with placeholders
structures: List of original structures
Returns:
List of chunks with restored structures
"""
restored_chunks = []
for chunk in chunks:
restored_chunk = chunk
# Find placeholders in this chunk
placeholder_pattern = re.compile(r'__LATEX_STRUCTURE_(\d+)_(\w+)__')
for match in placeholder_pattern.finditer(chunk):
structure_index = int(match.group(1))
if structure_index < len(structures):
original_structure = structures[structure_index][3]
restored_chunk = restored_chunk.replace(match.group(), original_structure)
restored_chunks.append(restored_chunk)
return restored_chunks
def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
"""
Chunk a LaTeX document while preserving LaTeX structures.
Args:
content: The LaTeX content to chunk
source_metadata: Metadata about the source document
Returns:
List of Document objects with chunked content and enhanced metadata
"""
try:
# Extract LaTeX structures and replace with placeholders
structures, content_with_placeholders = self.extract_latex_structures(content)
# Create a document object with placeholders
doc = Document(
page_content=content_with_placeholders,
metadata=source_metadata
)
# Split the document into chunks
chunks = self.text_splitter.split_documents([doc])
# Restore LaTeX structures in chunks
chunk_contents = [chunk.page_content for chunk in chunks]
restored_contents = self.restore_latex_structures(chunk_contents, structures)
# Create enhanced chunks with restored content
enhanced_chunks = []
for i, (chunk, restored_content) in enumerate(zip(chunks, restored_contents)):
# Add chunk-specific metadata
chunk.metadata.update({
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(restored_content),
"chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}",
"has_latex_table": "\\begin{tabular}" in restored_content,
"has_latex_environment": "\\begin{" in restored_content and "\\end{" in restored_content,
"has_latex_math": "\\(" in restored_content or "$" in restored_content,
"content_type": "latex"
})
# Update the chunk content with restored structures
chunk.page_content = restored_content
enhanced_chunks.append(chunk)
logger.info(f"LaTeX document chunked into {len(enhanced_chunks)} structure-aware pieces")
return enhanced_chunks
except Exception as e:
logger.error(f"Error chunking LaTeX document: {e}")
# Fallback to regular chunking if LaTeX processing fails
return self._fallback_chunk(content, source_metadata)
def _fallback_chunk(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
"""Fallback chunking method if LaTeX-aware chunking fails."""
try:
doc = Document(page_content=content, metadata=source_metadata)
chunks = self.text_splitter.split_documents([doc])
for i, chunk in enumerate(chunks):
chunk.metadata.update({
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk.page_content),
"chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}",
"content_type": "latex"
})
logger.warning(f"Used fallback chunking for LaTeX content: {len(chunks)} pieces")
return chunks
except Exception as e:
logger.error(f"Error in LaTeX fallback chunking: {e}")
raise
class MarkdownAwareChunker:
"""Handles markdown-aware document chunking that preserves tables and structures."""
def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
"""
Initialize the markdown-aware document chunker.
Args:
chunk_size: Maximum size of each chunk in characters
chunk_overlap: Number of characters to overlap between chunks
"""
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
# Initialize the text splitter with markdown-aware settings
self.text_splitter = RecursiveCharacterTextSplitter(
chunk_size=chunk_size,
chunk_overlap=chunk_overlap,
length_function=len,
separators=[
"\n\n", # Paragraphs and sections
"\n# ", # H1 headers
"\n## ", # H2 headers
"\n### ", # H3 headers
"\n\n---\n\n", # Horizontal rules
"\n", # Lines
" ", # Words
".", # Sentences
",", # Clauses
"" # Characters
],
keep_separator=True,
add_start_index=True
)
# Regex patterns for markdown structures
self.table_pattern = re.compile(
r'(\|.*\|.*\n)+(\|[-\s|:]+\|.*\n)(\|.*\|.*\n)*',
re.MULTILINE
)
self.code_block_pattern = re.compile(
r'```[\s\S]*?```',
re.MULTILINE
)
logger.info(f"Markdown-aware chunker initialized with chunk_size={chunk_size}, overlap={chunk_overlap}")
def extract_markdown_structures(self, content: str) -> Tuple[List[Tuple[int, int, str]], str]:
"""
Extract markdown tables and code blocks, replacing them with placeholders.
Args:
content: Original markdown content
Returns:
Tuple of (structures_list, content_with_placeholders)
where structures_list contains (start, end, type, content) tuples
"""
structures = []
# Find all tables
for match in self.table_pattern.finditer(content):
structures.append((
match.start(),
match.end(),
"table",
match.group()
))
# Find all code blocks
for match in self.code_block_pattern.finditer(content):
structures.append((
match.start(),
match.end(),
"code_block",
match.group()
))
# Sort by start position
structures.sort(key=lambda x: x[0])
# Replace structures with placeholders
content_with_placeholders = content
offset = 0
for i, (start, end, struct_type, struct_content) in enumerate(structures):
placeholder = f"\n\n__STRUCTURE_{i}_{struct_type.upper()}__\n\n"
# Adjust positions based on previous replacements
adjusted_start = start - offset
adjusted_end = end - offset
content_with_placeholders = (
content_with_placeholders[:adjusted_start] +
placeholder +
content_with_placeholders[adjusted_end:]
)
# Update offset for next replacement
offset += (end - start) - len(placeholder)
return structures, content_with_placeholders
def restore_structures(self, chunks: List[str], structures: List[Tuple[int, int, str, str]]) -> List[str]:
"""
Restore markdown structures in chunks, keeping tables and code blocks intact.
Args:
chunks: List of text chunks with placeholders
structures: List of original structures
Returns:
List of chunks with restored structures
"""
restored_chunks = []
for chunk in chunks:
restored_chunk = chunk
# Find placeholders in this chunk
placeholder_pattern = re.compile(r'__STRUCTURE_(\d+)_(\w+)__')
for match in placeholder_pattern.finditer(chunk):
structure_index = int(match.group(1))
if structure_index < len(structures):
original_structure = structures[structure_index][3]
restored_chunk = restored_chunk.replace(match.group(), original_structure)
restored_chunks.append(restored_chunk)
return restored_chunks
def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
"""
Chunk a markdown document while preserving tables and code blocks.
Args:
content: The markdown content to chunk
source_metadata: Metadata about the source document
Returns:
List of Document objects with chunked content and enhanced metadata
"""
try:
# Extract markdown structures (tables, code blocks) and replace with placeholders
structures, content_with_placeholders = self.extract_markdown_structures(content)
# Create a document object with placeholders
doc = Document(
page_content=content_with_placeholders,
metadata=source_metadata
)
# Split the document into chunks
chunks = self.text_splitter.split_documents([doc])
# Restore markdown structures in chunks
chunk_contents = [chunk.page_content for chunk in chunks]
restored_contents = self.restore_structures(chunk_contents, structures)
# Create enhanced chunks with restored content
enhanced_chunks = []
for i, (chunk, restored_content) in enumerate(zip(chunks, restored_contents)):
# Add chunk-specific metadata
chunk.metadata.update({
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(restored_content),
"chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}",
"has_table": "table" in restored_content.lower() and "|" in restored_content,
"has_code": "```" in restored_content
})
# Update the chunk content with restored structures
chunk.page_content = restored_content
enhanced_chunks.append(chunk)
logger.info(f"Document chunked into {len(enhanced_chunks)} markdown-aware pieces")
return enhanced_chunks
except Exception as e:
logger.error(f"Error chunking markdown document: {e}")
# Fallback to regular chunking if markdown processing fails
return self._fallback_chunk(content, source_metadata)
def _fallback_chunk(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
"""Fallback chunking method if markdown-aware chunking fails."""
try:
doc = Document(page_content=content, metadata=source_metadata)
chunks = self.text_splitter.split_documents([doc])
for i, chunk in enumerate(chunks):
chunk.metadata.update({
"chunk_index": i,
"total_chunks": len(chunks),
"chunk_size": len(chunk.page_content),
"chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}"
})
logger.warning(f"Used fallback chunking for {len(chunks)} pieces")
return chunks
except Exception as e:
logger.error(f"Error in fallback chunking: {e}")
raise
def chunk_multiple_documents(self, documents: List[Dict[str, Any]]) -> List[Document]:
"""
Chunk multiple documents for batch processing.
Args:
documents: List of dictionaries with 'content' and 'metadata' keys
Returns:
List of chunked Document objects
"""
all_chunks = []
for doc_data in documents:
content = doc_data.get('content', '')
metadata = doc_data.get('metadata', {})
if content.strip(): # Only process non-empty content
chunks = self.chunk_document(content, metadata)
all_chunks.extend(chunks)
logger.info(f"Chunked {len(documents)} documents into {len(all_chunks)} total chunks")
return all_chunks
def get_chunk_preview(self, chunks: List[Document], max_chunks: int = 5) -> str:
"""
Generate a preview of chunks for debugging/logging.
Args:
chunks: List of Document chunks
max_chunks: Maximum number of chunks to include in preview
Returns:
String preview of chunks
"""
preview = f"Document Chunks Preview ({len(chunks)} total chunks):\n"
preview += "=" * 50 + "\n"
for i, chunk in enumerate(chunks[:max_chunks]):
has_table = chunk.metadata.get('has_table', False)
has_code = chunk.metadata.get('has_code', False)
preview += f"Chunk {i + 1}:\n"
preview += f" Length: {len(chunk.page_content)} characters\n"
preview += f" Has Table: {has_table}, Has Code: {has_code}\n"
preview += f" Metadata: {chunk.metadata}\n"
preview += f" Content preview: {chunk.page_content[:100]}...\n"
preview += "-" * 30 + "\n"
if len(chunks) > max_chunks:
preview += f"... and {len(chunks) - max_chunks} more chunks\n"
return preview
class UnifiedDocumentChunker:
"""Unified chunker that handles both Markdown and LaTeX content types."""
def __init__(self):
"""Initialize the unified chunker with both markdown and LaTeX chunkers."""
self.markdown_chunker = MarkdownAwareChunker(chunk_size=1000, chunk_overlap=200)
self.latex_chunker = LaTeXAwareChunker(chunk_size=1200, chunk_overlap=150)
logger.info("Unified document chunker initialized with both Markdown and LaTeX support")
def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
"""
Chunk a document using the appropriate chunker based on content type.
Args:
content: The document content to chunk
source_metadata: Metadata about the source document
Returns:
List of Document objects with chunked content and enhanced metadata
"""
# Determine content type from metadata or content analysis
content_type = source_metadata.get('doc_type', 'markdown').lower()
# Override content type detection for GOT-OCR results
if source_metadata.get('conversion_method', '').startswith('GOT-OCR'):
content_type = 'latex'
# Auto-detect content type if not specified
if content_type not in ['markdown', 'latex']:
if self._is_latex_content(content):
content_type = 'latex'
else:
content_type = 'markdown'
# Use appropriate chunker
if content_type == 'latex':
logger.info("Using LaTeX-aware chunker for document")
return self.latex_chunker.chunk_document(content, source_metadata)
else:
logger.info("Using Markdown-aware chunker for document")
return self.markdown_chunker.chunk_document(content, source_metadata)
def _is_latex_content(self, content: str) -> bool:
"""
Auto-detect if content is LaTeX based on common LaTeX commands.
Args:
content: Content to analyze
Returns:
True if content appears to be LaTeX, False otherwise
"""
latex_indicators = [
r'\\begin\{',
r'\\end\{',
r'\\title\{',
r'\\section',
r'\\subsection',
r'\\hline',
r'\\multirow',
r'\\multicolumn'
]
# Count LaTeX indicators
latex_count = sum(1 for indicator in latex_indicators if re.search(indicator, content))
# If we find multiple LaTeX indicators, treat as LaTeX
return latex_count >= 2
def chunk_multiple_documents(self, documents: List[Dict[str, Any]]) -> List[Document]:
"""
Chunk multiple documents using appropriate chunkers.
Args:
documents: List of dictionaries with 'content' and 'metadata' keys
Returns:
List of chunked Document objects
"""
all_chunks = []
for doc_data in documents:
content = doc_data.get('content', '')
metadata = doc_data.get('metadata', {})
if content.strip(): # Only process non-empty content
chunks = self.chunk_document(content, metadata)
all_chunks.extend(chunks)
logger.info(f"Chunked {len(documents)} documents into {len(all_chunks)} total chunks")
return all_chunks
def get_chunk_preview(self, chunks: List[Document], max_chunks: int = 5) -> str:
"""
Generate a preview of chunks for debugging/logging.
Args:
chunks: List of Document chunks
max_chunks: Maximum number of chunks to include in preview
Returns:
String preview of chunks
"""
preview = f"Document Chunks Preview ({len(chunks)} total chunks):\n"
preview += "=" * 50 + "\n"
for i, chunk in enumerate(chunks[:max_chunks]):
content_type = chunk.metadata.get('content_type', 'unknown')
has_table = chunk.metadata.get('has_table', False) or chunk.metadata.get('has_latex_table', False)
has_code = chunk.metadata.get('has_code', False) or chunk.metadata.get('has_latex_environment', False)
preview += f"Chunk {i + 1} ({content_type}):\n"
preview += f" Length: {len(chunk.page_content)} characters\n"
preview += f" Has Table: {has_table}, Has Code/Environment: {has_code}\n"
preview += f" Metadata: {chunk.metadata}\n"
preview += f" Content preview: {chunk.page_content[:100]}...\n"
preview += "-" * 30 + "\n"
if len(chunks) > max_chunks:
preview += f"... and {len(chunks) - max_chunks} more chunks\n"
return preview
# Global unified chunker instance that supports both Markdown and LaTeX
document_chunker = UnifiedDocumentChunker()