"""Text chunking strategies for RAG document processing.""" import re from typing import List, Dict, Any, Tuple from langchain_text_splitters import RecursiveCharacterTextSplitter from langchain_core.documents import Document from src.core.logging_config import get_logger logger = get_logger(__name__) class LaTeXAwareChunker: """Handles LaTeX-aware document chunking that preserves LaTeX structures.""" def __init__(self, chunk_size: int = 1200, chunk_overlap: int = 150): """ Initialize the LaTeX-aware document chunker. Args: chunk_size: Maximum size of each chunk in characters chunk_overlap: Number of characters to overlap between chunks """ self.chunk_size = chunk_size self.chunk_overlap = chunk_overlap # Initialize the text splitter with LaTeX-aware settings self.text_splitter = RecursiveCharacterTextSplitter( chunk_size=chunk_size, chunk_overlap=chunk_overlap, length_function=len, separators=[ "\n\\section{", # Section headers "\n\\subsection{", # Subsection headers "\n\\subsubsection{", # Subsubsection headers "\n\\title{", # Title commands "\n\\begin{", # Begin environments "\n\\end{", # End environments "\n\n", # Paragraph breaks "\n", # Line breaks ". ", # Sentence breaks " ", # Word breaks "" # Character breaks ], keep_separator=True, add_start_index=True ) # Regex patterns for LaTeX structures self.latex_table_pattern = re.compile( r'\\begin\{tabular\}.*?\\end\{tabular\}', re.DOTALL | re.MULTILINE ) self.latex_title_pattern = re.compile( r'\\title\{[^}]*\}', re.MULTILINE ) self.latex_section_pattern = re.compile( r'\\(?:sub)*section\*?\{[^}]*\}', re.MULTILINE ) self.latex_environment_pattern = re.compile( r'\\begin\{[^}]+\}.*?\\end\{[^}]+\}', re.DOTALL | re.MULTILINE ) logger.info(f"LaTeX-aware chunker initialized with chunk_size={chunk_size}, overlap={chunk_overlap}") def extract_latex_structures(self, content: str) -> Tuple[List[Tuple[int, int, str]], str]: """ Extract LaTeX tables and environments, replacing them with placeholders. 

    def extract_latex_structures(self, content: str) -> Tuple[List[Tuple[int, int, str, str]], str]:
        """
        Extract LaTeX tables and environments, replacing them with placeholders.

        Args:
            content: Original LaTeX content

        Returns:
            Tuple of (structures_list, content_with_placeholders) where
            structures_list contains (start, end, type, content) tuples
        """
        structures = []

        # Find all tabular environments (highest priority)
        for match in self.latex_table_pattern.finditer(content):
            structures.append((
                match.start(), match.end(), "latex_table", match.group()
            ))

        # Find other LaTeX environments (avoid overlapping with tables)
        for match in self.latex_environment_pattern.finditer(content):
            # Check if this environment overlaps with any table
            overlaps_with_table = any(
                table_start <= match.start() < table_end or
                table_start < match.end() <= table_end
                for table_start, table_end, struct_type, _ in structures
                if struct_type == "latex_table"
            )
            if not overlaps_with_table and "tabular" not in match.group():
                structures.append((
                    match.start(), match.end(), "latex_environment", match.group()
                ))

        # Find titles and sections
        for match in self.latex_title_pattern.finditer(content):
            structures.append((
                match.start(), match.end(), "latex_title", match.group()
            ))
        for match in self.latex_section_pattern.finditer(content):
            structures.append((
                match.start(), match.end(), "latex_section", match.group()
            ))

        # Sort by start position
        structures.sort(key=lambda x: x[0])

        # Replace structures with placeholders
        content_with_placeholders = content
        offset = 0
        for i, (start, end, struct_type, struct_content) in enumerate(structures):
            placeholder = f"\n\n__LATEX_STRUCTURE_{i}_{struct_type.upper()}__\n\n"
            # Adjust positions based on previous replacements
            adjusted_start = start - offset
            adjusted_end = end - offset
            content_with_placeholders = (
                content_with_placeholders[:adjusted_start]
                + placeholder
                + content_with_placeholders[adjusted_end:]
            )
            # Update offset for next replacement
            offset += (end - start) - len(placeholder)

        return structures, content_with_placeholders

    def restore_latex_structures(self, chunks: List[str], structures: List[Tuple[int, int, str, str]]) -> List[str]:
        """
        Restore LaTeX structures in chunks, keeping tables and environments intact.

        Args:
            chunks: List of text chunks with placeholders
            structures: List of original structures

        Returns:
            List of chunks with restored structures
        """
        restored_chunks = []
        placeholder_pattern = re.compile(r'__LATEX_STRUCTURE_(\d+)_(\w+)__')
        for chunk in chunks:
            restored_chunk = chunk
            # Find placeholders in this chunk and swap back the original content
            for match in placeholder_pattern.finditer(chunk):
                structure_index = int(match.group(1))
                if structure_index < len(structures):
                    original_structure = structures[structure_index][3]
                    restored_chunk = restored_chunk.replace(match.group(), original_structure)
            restored_chunks.append(restored_chunk)
        return restored_chunks

    def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """
        Chunk a LaTeX document while preserving LaTeX structures.

        Args:
            content: The LaTeX content to chunk
            source_metadata: Metadata about the source document

        Returns:
            List of Document objects with chunked content and enhanced metadata
        """
        try:
            # Extract LaTeX structures and replace with placeholders
            structures, content_with_placeholders = self.extract_latex_structures(content)

            # Create a document object with placeholders
            doc = Document(
                page_content=content_with_placeholders,
                metadata=source_metadata
            )

            # Split the document into chunks
            chunks = self.text_splitter.split_documents([doc])

            # Restore LaTeX structures in chunks
            chunk_contents = [chunk.page_content for chunk in chunks]
            restored_contents = self.restore_latex_structures(chunk_contents, structures)

            # Create enhanced chunks with restored content
            enhanced_chunks = []
            for i, (chunk, restored_content) in enumerate(zip(chunks, restored_contents)):
                # Add chunk-specific metadata
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "chunk_size": len(restored_content),
                    "chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}",
                    "has_latex_table": "\\begin{tabular}" in restored_content,
                    "has_latex_environment": "\\begin{" in restored_content and "\\end{" in restored_content,
                    "has_latex_math": "\\(" in restored_content or "$" in restored_content,
                    "content_type": "latex"
                })
                # Update the chunk content with restored structures
                chunk.page_content = restored_content
                enhanced_chunks.append(chunk)

            logger.info(f"LaTeX document chunked into {len(enhanced_chunks)} structure-aware pieces")
            return enhanced_chunks
        except Exception as e:
            logger.error(f"Error chunking LaTeX document: {e}")
            # Fall back to regular chunking if LaTeX processing fails
            return self._fallback_chunk(content, source_metadata)

    def _fallback_chunk(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """Fallback chunking method if LaTeX-aware chunking fails."""
        try:
            doc = Document(page_content=content, metadata=source_metadata)
            chunks = self.text_splitter.split_documents([doc])
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "chunk_size": len(chunk.page_content),
                    "chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}",
                    "content_type": "latex"
                })
            logger.warning(f"Used fallback chunking for LaTeX content: {len(chunks)} pieces")
            return chunks
        except Exception as e:
            logger.error(f"Error in LaTeX fallback chunking: {e}")
            raise
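

# A minimal, illustrative sketch (not part of the original module): exercises the
# extract/restore round-trip on a hand-written tabular snippet. The helper name
# `_demo_latex_round_trip` and the sample text are hypothetical; the function is
# only run when called explicitly.
def _demo_latex_round_trip() -> None:
    sample = (
        "\\section{Results}\n"
        "Some prose before the table.\n"
        "\\begin{tabular}{ll}\na & b \\\\\nc & d \\\\\n\\end{tabular}\n"
        "Some prose after the table."
    )
    chunker = LaTeXAwareChunker(chunk_size=200, chunk_overlap=20)
    structures, with_placeholders = chunker.extract_latex_structures(sample)
    # The tabular block is now a single opaque placeholder, so the splitter
    # cannot cut through it.
    assert "__LATEX_STRUCTURE_" in with_placeholders
    restored = chunker.restore_latex_structures([with_placeholders], structures)
    # Restoring the placeholders yields the original tabular content back.
    assert "\\begin{tabular}" in restored[0]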


class MarkdownAwareChunker:
    """Handles markdown-aware document chunking that preserves tables and structures."""

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize the markdown-aware document chunker.

        Args:
            chunk_size: Maximum size of each chunk in characters
            chunk_overlap: Number of characters to overlap between chunks
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap

        # Initialize the text splitter with markdown-aware settings.
        # Separators are tried in order, so headers and horizontal rules are
        # listed before plain paragraph breaks; otherwise "\n\n" would always
        # match first and the structural separators would never be used.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=[
                "\n# ",          # H1 headers
                "\n## ",         # H2 headers
                "\n### ",        # H3 headers
                "\n\n---\n\n",   # Horizontal rules
                "\n\n",          # Paragraphs
                "\n",            # Lines
                ". ",            # Sentences
                ", ",            # Clauses
                " ",             # Words
                ""               # Characters
            ],
            keep_separator=True,
            add_start_index=True
        )

        # Regex patterns for markdown structures
        self.table_pattern = re.compile(
            r'(\|.*\|.*\n)+(\|[-\s|:]+\|.*\n)(\|.*\|.*\n)*',
            re.MULTILINE
        )
        self.code_block_pattern = re.compile(
            r'```[\s\S]*?```',
            re.MULTILINE
        )

        logger.info(f"Markdown-aware chunker initialized with chunk_size={chunk_size}, overlap={chunk_overlap}")

    def extract_markdown_structures(self, content: str) -> Tuple[List[Tuple[int, int, str, str]], str]:
        """
        Extract markdown tables and code blocks, replacing them with placeholders.

        Args:
            content: Original markdown content

        Returns:
            Tuple of (structures_list, content_with_placeholders) where
            structures_list contains (start, end, type, content) tuples
        """
        structures = []

        # Find all tables
        for match in self.table_pattern.finditer(content):
            structures.append((
                match.start(), match.end(), "table", match.group()
            ))

        # Find all code blocks
        for match in self.code_block_pattern.finditer(content):
            structures.append((
                match.start(), match.end(), "code_block", match.group()
            ))

        # Sort by start position
        structures.sort(key=lambda x: x[0])

        # Replace structures with placeholders
        content_with_placeholders = content
        offset = 0
        for i, (start, end, struct_type, struct_content) in enumerate(structures):
            placeholder = f"\n\n__STRUCTURE_{i}_{struct_type.upper()}__\n\n"
            # Adjust positions based on previous replacements
            adjusted_start = start - offset
            adjusted_end = end - offset
            content_with_placeholders = (
                content_with_placeholders[:adjusted_start]
                + placeholder
                + content_with_placeholders[adjusted_end:]
            )
            # Update offset for next replacement
            offset += (end - start) - len(placeholder)

        return structures, content_with_placeholders

    def restore_structures(self, chunks: List[str], structures: List[Tuple[int, int, str, str]]) -> List[str]:
        """
        Restore markdown structures in chunks, keeping tables and code blocks intact.

        Args:
            chunks: List of text chunks with placeholders
            structures: List of original structures

        Returns:
            List of chunks with restored structures
        """
        restored_chunks = []
        placeholder_pattern = re.compile(r'__STRUCTURE_(\d+)_(\w+)__')
        for chunk in chunks:
            restored_chunk = chunk
            # Find placeholders in this chunk and swap back the original content
            for match in placeholder_pattern.finditer(chunk):
                structure_index = int(match.group(1))
                if structure_index < len(structures):
                    original_structure = structures[structure_index][3]
                    restored_chunk = restored_chunk.replace(match.group(), original_structure)
            restored_chunks.append(restored_chunk)
        return restored_chunks

    def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """
        Chunk a markdown document while preserving tables and code blocks.

        Args:
            content: The markdown content to chunk
            source_metadata: Metadata about the source document

        Returns:
            List of Document objects with chunked content and enhanced metadata
        """
        try:
            # Extract markdown structures (tables, code blocks) and replace with placeholders
            structures, content_with_placeholders = self.extract_markdown_structures(content)

            # Create a document object with placeholders
            doc = Document(
                page_content=content_with_placeholders,
                metadata=source_metadata
            )

            # Split the document into chunks
            chunks = self.text_splitter.split_documents([doc])

            # Restore markdown structures in chunks
            chunk_contents = [chunk.page_content for chunk in chunks]
            restored_contents = self.restore_structures(chunk_contents, structures)

            # Create enhanced chunks with restored content
            enhanced_chunks = []
            for i, (chunk, restored_content) in enumerate(zip(chunks, restored_contents)):
                # Add chunk-specific metadata; detect tables with the same
                # pattern used for extraction rather than a keyword heuristic
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "chunk_size": len(restored_content),
                    "chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}",
                    "has_table": bool(self.table_pattern.search(restored_content)),
                    "has_code": "```" in restored_content
                })
                # Update the chunk content with restored structures
                chunk.page_content = restored_content
                enhanced_chunks.append(chunk)

            logger.info(f"Document chunked into {len(enhanced_chunks)} markdown-aware pieces")
            return enhanced_chunks
        except Exception as e:
            logger.error(f"Error chunking markdown document: {e}")
            # Fall back to regular chunking if markdown processing fails
            return self._fallback_chunk(content, source_metadata)

    def _fallback_chunk(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """Fallback chunking method if markdown-aware chunking fails."""
        try:
            doc = Document(page_content=content, metadata=source_metadata)
            chunks = self.text_splitter.split_documents([doc])
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": len(chunks),
                    "chunk_size": len(chunk.page_content),
                    "chunk_id": f"{source_metadata.get('source_id', 'unknown')}_{i}"
                })
            logger.warning(f"Used fallback chunking for {len(chunks)} pieces")
            return chunks
        except Exception as e:
            logger.error(f"Error in fallback chunking: {e}")
            raise

    def chunk_multiple_documents(self, documents: List[Dict[str, Any]]) -> List[Document]:
        """
        Chunk multiple documents for batch processing.

        Args:
            documents: List of dictionaries with 'content' and 'metadata' keys

        Returns:
            List of chunked Document objects
        """
        all_chunks = []
        for doc_data in documents:
            content = doc_data.get('content', '')
            metadata = doc_data.get('metadata', {})
            if content.strip():  # Only process non-empty content
                chunks = self.chunk_document(content, metadata)
                all_chunks.extend(chunks)
        logger.info(f"Chunked {len(documents)} documents into {len(all_chunks)} total chunks")
        return all_chunks

    def get_chunk_preview(self, chunks: List[Document], max_chunks: int = 5) -> str:
        """
        Generate a preview of chunks for debugging/logging.

        Args:
            chunks: List of Document chunks
            max_chunks: Maximum number of chunks to include in preview

        Returns:
            String preview of chunks
        """
        preview = f"Document Chunks Preview ({len(chunks)} total chunks):\n"
        preview += "=" * 50 + "\n"
        for i, chunk in enumerate(chunks[:max_chunks]):
            has_table = chunk.metadata.get('has_table', False)
            has_code = chunk.metadata.get('has_code', False)
            preview += f"Chunk {i + 1}:\n"
            preview += f"  Length: {len(chunk.page_content)} characters\n"
            preview += f"  Has Table: {has_table}, Has Code: {has_code}\n"
            preview += f"  Metadata: {chunk.metadata}\n"
            preview += f"  Content preview: {chunk.page_content[:100]}...\n"
            preview += "-" * 30 + "\n"
        if len(chunks) > max_chunks:
            preview += f"... and {len(chunks) - max_chunks} more chunks\n"
        return preview
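

# A minimal, illustrative sketch (not part of the original module): shows that a
# pipe table survives chunking in one piece. The helper name `_demo_markdown_table`
# and the sample text are hypothetical; the function is only run when called
# explicitly.
def _demo_markdown_table() -> None:
    sample = (
        "# Report\n\n"
        "Intro paragraph.\n\n"
        "| metric | value |\n"
        "|--------|-------|\n"
        "| recall | 0.91  |\n"
        "\n"
        "Closing paragraph.\n"
    )
    chunker = MarkdownAwareChunker(chunk_size=120, chunk_overlap=20)
    chunks = chunker.chunk_document(sample, {"source_id": "demo"})
    # At least one chunk should carry the intact table, flagged in its metadata.
    table_chunks = [c for c in chunks if c.metadata.get("has_table")]
    assert any("| metric |" in c.page_content for c in table_chunks)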


class UnifiedDocumentChunker:
    """Unified chunker that handles both Markdown and LaTeX content types."""

    def __init__(self):
        """Initialize the unified chunker with both markdown and LaTeX chunkers."""
        self.markdown_chunker = MarkdownAwareChunker(chunk_size=1000, chunk_overlap=200)
        self.latex_chunker = LaTeXAwareChunker(chunk_size=1200, chunk_overlap=150)
        logger.info("Unified document chunker initialized with both Markdown and LaTeX support")

    def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """
        Chunk a document using the appropriate chunker based on content type.

        Args:
            content: The document content to chunk
            source_metadata: Metadata about the source document

        Returns:
            List of Document objects with chunked content and enhanced metadata
        """
        # Determine content type from metadata or content analysis
        content_type = source_metadata.get('doc_type', 'markdown').lower()

        # Override content type detection for GOT-OCR results
        if source_metadata.get('conversion_method', '').startswith('GOT-OCR'):
            content_type = 'latex'

        # Auto-detect content type if not specified
        if content_type not in ['markdown', 'latex']:
            if self._is_latex_content(content):
                content_type = 'latex'
            else:
                content_type = 'markdown'

        # Use the appropriate chunker
        if content_type == 'latex':
            logger.info("Using LaTeX-aware chunker for document")
            return self.latex_chunker.chunk_document(content, source_metadata)
        else:
            logger.info("Using Markdown-aware chunker for document")
            return self.markdown_chunker.chunk_document(content, source_metadata)

    def _is_latex_content(self, content: str) -> bool:
        """
        Auto-detect if content is LaTeX based on common LaTeX commands.

        Args:
            content: Content to analyze

        Returns:
            True if content appears to be LaTeX, False otherwise
        """
        latex_indicators = [
            r'\\begin\{',
            r'\\end\{',
            r'\\title\{',
            r'\\section',
            r'\\subsection',
            r'\\hline',
            r'\\multirow',
            r'\\multicolumn'
        ]
        # Count LaTeX indicators
        latex_count = sum(1 for indicator in latex_indicators if re.search(indicator, content))
        # If we find multiple LaTeX indicators, treat as LaTeX
        return latex_count >= 2

    def chunk_multiple_documents(self, documents: List[Dict[str, Any]]) -> List[Document]:
        """
        Chunk multiple documents using appropriate chunkers.

        Args:
            documents: List of dictionaries with 'content' and 'metadata' keys

        Returns:
            List of chunked Document objects
        """
        all_chunks = []
        for doc_data in documents:
            content = doc_data.get('content', '')
            metadata = doc_data.get('metadata', {})
            if content.strip():  # Only process non-empty content
                chunks = self.chunk_document(content, metadata)
                all_chunks.extend(chunks)
        logger.info(f"Chunked {len(documents)} documents into {len(all_chunks)} total chunks")
        return all_chunks

    def get_chunk_preview(self, chunks: List[Document], max_chunks: int = 5) -> str:
        """
        Generate a preview of chunks for debugging/logging.

        Args:
            chunks: List of Document chunks
            max_chunks: Maximum number of chunks to include in preview

        Returns:
            String preview of chunks
        """
        preview = f"Document Chunks Preview ({len(chunks)} total chunks):\n"
        preview += "=" * 50 + "\n"
        for i, chunk in enumerate(chunks[:max_chunks]):
            content_type = chunk.metadata.get('content_type', 'unknown')
            has_table = chunk.metadata.get('has_table', False) or chunk.metadata.get('has_latex_table', False)
            has_code = chunk.metadata.get('has_code', False) or chunk.metadata.get('has_latex_environment', False)
            preview += f"Chunk {i + 1} ({content_type}):\n"
            preview += f"  Length: {len(chunk.page_content)} characters\n"
            preview += f"  Has Table: {has_table}, Has Code/Environment: {has_code}\n"
            preview += f"  Metadata: {chunk.metadata}\n"
            preview += f"  Content preview: {chunk.page_content[:100]}...\n"
            preview += "-" * 30 + "\n"
        if len(chunks) > max_chunks:
            preview += f"... and {len(chunks) - max_chunks} more chunks\n"
        return preview


# Global unified chunker instance that supports both Markdown and LaTeX
document_chunker = UnifiedDocumentChunker()
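

# Usage sketch (illustrative, not part of the original module): routes a markdown
# and a LaTeX snippet through the shared `document_chunker`. The `doc_type` and
# `source_id` metadata keys follow the conventions used above; the sample texts
# are hypothetical.
if __name__ == "__main__":
    markdown_doc = {
        "content": "# Title\n\nSome markdown prose.\n",
        "metadata": {"doc_type": "markdown", "source_id": "md-example"},
    }
    latex_doc = {
        "content": "\\title{Demo}\n\\section{Intro}\nSome LaTeX prose.\n",
        "metadata": {"doc_type": "latex", "source_id": "tex-example"},
    }
    chunks = document_chunker.chunk_multiple_documents([markdown_doc, latex_doc])
    print(document_chunker.get_chunk_preview(chunks))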