Spaces:
Running
on
Zero
Running
on
Zero
"""Text chunking strategies for RAG document processing.""" | |
import re
from typing import List, Dict, Any, Tuple

from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.documents import Document

from src.core.logging_config import get_logger

# Module-level logger shared by every chunker defined in this file.
logger = get_logger(__name__)
class LaTeXAwareChunker:
    """Handles LaTeX-aware document chunking that preserves LaTeX structures.

    Tables, environments, titles and section headers are swapped for short
    placeholders before splitting and substituted back afterwards, so a
    restored chunk may be longer than ``chunk_size``.
    """

    # Regex patterns for the LaTeX structures that must not be split.
    latex_table_pattern = re.compile(
        r'\\begin\{tabular\}.*?\\end\{tabular\}',
        re.DOTALL | re.MULTILINE
    )
    latex_title_pattern = re.compile(
        r'\\title\{[^}]*\}',
        re.MULTILINE
    )
    latex_section_pattern = re.compile(
        r'\\(?:sub)*section\*?\{[^}]*\}',
        re.MULTILINE
    )
    latex_environment_pattern = re.compile(
        r'\\begin\{[^}]+\}.*?\\end\{[^}]+\}',
        re.DOTALL | re.MULTILINE
    )
    # Matches the placeholders written by extract_latex_structures().
    _placeholder_pattern = re.compile(r'__LATEX_STRUCTURE_(\d+)_(\w+)__')

    def __init__(self, chunk_size: int = 1200, chunk_overlap: int = 150):
        """
        Initialize the LaTeX-aware document chunker.

        Args:
            chunk_size: Maximum size of each chunk in characters
            chunk_overlap: Number of characters to overlap between chunks
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Initialize the text splitter with LaTeX-aware settings
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=[
                "\n\\section{",        # Section headers
                "\n\\subsection{",     # Subsection headers
                "\n\\subsubsection{",  # Subsubsection headers
                "\n\\title{",          # Title commands
                "\n\\begin{",          # Begin environments
                "\n\\end{",            # End environments
                "\n\n",                # Paragraph breaks
                "\n",                  # Line breaks
                ". ",                  # Sentence breaks
                " ",                   # Word breaks
                ""                     # Character breaks
            ],
            keep_separator=True,
            add_start_index=True
        )
        logger.info(f"LaTeX-aware chunker initialized with chunk_size={chunk_size}, overlap={chunk_overlap}")

    @staticmethod
    def _drop_overlapping(
        candidates: List[Tuple[int, int, str, str]]
    ) -> List[Tuple[int, int, str, str]]:
        """Return candidates sorted by start with overlapping spans removed.

        When two spans overlap, the one that starts first wins (the longer one
        on a tie); the other is already contained in the winner's text and is
        restored together with it.
        """
        kept = []
        covered_until = 0
        for structure in sorted(candidates, key=lambda s: (s[0], -s[1])):
            if structure[0] < covered_until:
                continue
            kept.append(structure)
            covered_until = structure[1]
        return kept

    def extract_latex_structures(self, content: str) -> Tuple[List[Tuple[int, int, str, str]], str]:
        """
        Extract LaTeX tables and environments, replacing them with placeholders.

        Args:
            content: Original LaTeX content

        Returns:
            Tuple of (structures_list, content_with_placeholders) where
            structures_list holds non-overlapping (start, end, type, text)
            tuples sorted by start; placeholder ``i`` refers to
            ``structures_list[i]``.
        """
        # Tabular environments have the highest priority.
        tables = [
            (m.start(), m.end(), "latex_table", m.group())
            for m in self.latex_table_pattern.finditer(content)
        ]
        candidates = list(tables)

        # Other environments, skipping any that touch or contain a table.
        for m in self.latex_environment_pattern.finditer(content):
            touches_table = any(
                t_start < m.end() and m.start() < t_end
                for t_start, t_end, _, _ in tables
            )
            if not touches_table and "tabular" not in m.group():
                candidates.append((m.start(), m.end(), "latex_environment", m.group()))

        # Titles and section headers.
        for kind, pattern in (
            ("latex_title", self.latex_title_pattern),
            ("latex_section", self.latex_section_pattern),
        ):
            candidates.extend(
                (m.start(), m.end(), kind, m.group())
                for m in pattern.finditer(content)
            )

        # A header nested inside an environment would overlap it; replacing
        # overlapping spans independently corrupts the text, so keep only
        # disjoint spans.
        structures = self._drop_overlapping(candidates)

        # Stitch the untouched text and the placeholders together in one pass.
        pieces = []
        cursor = 0
        for i, (start, end, struct_type, _) in enumerate(structures):
            pieces.append(content[cursor:start])
            pieces.append(f"\n\n__LATEX_STRUCTURE_{i}_{struct_type.upper()}__\n\n")
            cursor = end
        pieces.append(content[cursor:])
        return structures, "".join(pieces)

    def restore_latex_structures(self, chunks: List[str], structures: List[Tuple[int, int, str, str]]) -> List[str]:
        """
        Restore LaTeX structures in chunks, keeping tables and environments intact.

        Args:
            chunks: List of text chunks with placeholders
            structures: List of original structures

        Returns:
            List of chunks with restored structures
        """
        def original_text(match):
            index = int(match.group(1))
            if index < len(structures):
                return structures[index][3]
            # Unknown index: leave the placeholder as it is.
            return match.group(0)

        # A callable replacement is inserted verbatim, so the backslashes in
        # the LaTeX source are not interpreted as regex escapes.
        return [self._placeholder_pattern.sub(original_text, chunk) for chunk in chunks]

    def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """
        Chunk a LaTeX document while preserving LaTeX structures.

        Args:
            content: The LaTeX content to chunk
            source_metadata: Metadata about the source document

        Returns:
            List of Document objects with chunked content and enhanced metadata
        """
        try:
            # Extract LaTeX structures and replace with placeholders
            structures, content_with_placeholders = self.extract_latex_structures(content)
            doc = Document(
                page_content=content_with_placeholders,
                metadata=source_metadata
            )
            # NOTE(review): the splitter runs on the placeholder text, so any
            # start index it records presumably refers to that text rather
            # than to the original content - confirm before relying on it.
            chunks = self.text_splitter.split_documents([doc])
            restored_contents = self.restore_latex_structures(
                [chunk.page_content for chunk in chunks], structures
            )

            total_chunks = len(chunks)
            source_id = source_metadata.get('source_id', 'unknown')
            enhanced_chunks = []
            for i, (chunk, restored_content) in enumerate(zip(chunks, restored_contents)):
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": total_chunks,
                    "chunk_size": len(restored_content),
                    "chunk_id": f"{source_id}_{i}",
                    "has_latex_table": "\\begin{tabular}" in restored_content,
                    "has_latex_environment": "\\begin{" in restored_content and "\\end{" in restored_content,
                    "has_latex_math": "\\(" in restored_content or "$" in restored_content,
                    "content_type": "latex"
                })
                chunk.page_content = restored_content
                enhanced_chunks.append(chunk)
            logger.info(f"LaTeX document chunked into {len(enhanced_chunks)} structure-aware pieces")
            return enhanced_chunks
        except Exception as e:
            # Deliberate best effort: any failure falls back to plain splitting.
            logger.error(f"Error chunking LaTeX document: {e}")
            return self._fallback_chunk(content, source_metadata)

    def _fallback_chunk(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """Fallback chunking method if LaTeX-aware chunking fails.

        Splits the raw content without protecting any structure and re-raises
        whatever the splitter raises.
        """
        try:
            doc = Document(page_content=content, metadata=source_metadata)
            chunks = self.text_splitter.split_documents([doc])
            total_chunks = len(chunks)
            source_id = source_metadata.get('source_id', 'unknown')
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": total_chunks,
                    "chunk_size": len(chunk.page_content),
                    "chunk_id": f"{source_id}_{i}",
                    "content_type": "latex"
                })
            logger.warning(f"Used fallback chunking for LaTeX content: {len(chunks)} pieces")
            return chunks
        except Exception as e:
            logger.error(f"Error in LaTeX fallback chunking: {e}")
            raise
class MarkdownAwareChunker:
    """Handles markdown-aware document chunking that preserves tables and structures.

    Tables and fenced code blocks are swapped for short placeholders before
    splitting and substituted back afterwards, so a restored chunk may be
    longer than ``chunk_size``.
    """

    # Regex patterns for the markdown structures that must not be split.
    table_pattern = re.compile(
        r'(\|.*\|.*\n)+(\|[-\s|:]+\|.*\n)(\|.*\|.*\n)*',
        re.MULTILINE
    )
    code_block_pattern = re.compile(
        r'```[\s\S]*?```',
        re.MULTILINE
    )
    # Matches the placeholders written by extract_markdown_structures().
    _placeholder_pattern = re.compile(r'__STRUCTURE_(\d+)_(\w+)__')
    # A table delimiter row such as "|---|:-:|": pipes, dashes, colons only.
    _table_delimiter_pattern = re.compile(
        r'^[ \t]*\|[-|: \t]*-[-|: \t]*\|[ \t]*$',
        re.MULTILINE
    )

    def __init__(self, chunk_size: int = 1000, chunk_overlap: int = 200):
        """
        Initialize the markdown-aware document chunker.

        Args:
            chunk_size: Maximum size of each chunk in characters
            chunk_overlap: Number of characters to overlap between chunks
        """
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        # Initialize the text splitter with markdown-aware settings.
        # NOTE(review): "\n\n" is listed before the header and rule
        # separators, which presumably means those are rarely or never
        # reached - confirm this order is intended.
        self.text_splitter = RecursiveCharacterTextSplitter(
            chunk_size=chunk_size,
            chunk_overlap=chunk_overlap,
            length_function=len,
            separators=[
                "\n\n",         # Paragraphs and sections
                "\n# ",         # H1 headers
                "\n## ",        # H2 headers
                "\n### ",       # H3 headers
                "\n\n---\n\n",  # Horizontal rules
                "\n",           # Lines
                " ",            # Words
                ".",            # Sentences
                ",",            # Clauses
                ""              # Characters
            ],
            keep_separator=True,
            add_start_index=True
        )
        logger.info(f"Markdown-aware chunker initialized with chunk_size={chunk_size}, overlap={chunk_overlap}")

    @staticmethod
    def _drop_overlapping(
        candidates: List[Tuple[int, int, str, str]]
    ) -> List[Tuple[int, int, str, str]]:
        """Return candidates sorted by start with overlapping spans removed.

        When two spans overlap, the one that starts first wins (the longer one
        on a tie); the other is already contained in the winner's text and is
        restored together with it.
        """
        kept = []
        covered_until = 0
        for structure in sorted(candidates, key=lambda s: (s[0], -s[1])):
            if structure[0] < covered_until:
                continue
            kept.append(structure)
            covered_until = structure[1]
        return kept

    @classmethod
    def _has_markdown_table(cls, text: str) -> bool:
        """Return True when text contains a markdown table delimiter row."""
        return cls._table_delimiter_pattern.search(text) is not None

    def extract_markdown_structures(self, content: str) -> Tuple[List[Tuple[int, int, str, str]], str]:
        """
        Extract markdown tables and code blocks, replacing them with placeholders.

        Args:
            content: Original markdown content

        Returns:
            Tuple of (structures_list, content_with_placeholders)
            where structures_list contains non-overlapping
            (start, end, type, content) tuples sorted by start; placeholder
            ``i`` refers to ``structures_list[i]``.
        """
        candidates = []
        for kind, pattern in (
            ("table", self.table_pattern),
            ("code_block", self.code_block_pattern),
        ):
            candidates.extend(
                (m.start(), m.end(), kind, m.group())
                for m in pattern.finditer(content)
            )

        # Table-looking text inside a fenced code block would overlap the
        # block; replacing overlapping spans independently corrupts the text,
        # so keep only disjoint spans.
        structures = self._drop_overlapping(candidates)

        # Stitch the untouched text and the placeholders together in one pass.
        pieces = []
        cursor = 0
        for i, (start, end, struct_type, _) in enumerate(structures):
            pieces.append(content[cursor:start])
            pieces.append(f"\n\n__STRUCTURE_{i}_{struct_type.upper()}__\n\n")
            cursor = end
        pieces.append(content[cursor:])
        return structures, "".join(pieces)

    def restore_structures(self, chunks: List[str], structures: List[Tuple[int, int, str, str]]) -> List[str]:
        """
        Restore markdown structures in chunks, keeping tables and code blocks intact.

        Args:
            chunks: List of text chunks with placeholders
            structures: List of original structures

        Returns:
            List of chunks with restored structures
        """
        def original_text(match):
            index = int(match.group(1))
            if index < len(structures):
                return structures[index][3]
            # Unknown index: leave the placeholder as it is.
            return match.group(0)

        return [self._placeholder_pattern.sub(original_text, chunk) for chunk in chunks]

    def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """
        Chunk a markdown document while preserving tables and code blocks.

        Args:
            content: The markdown content to chunk
            source_metadata: Metadata about the source document

        Returns:
            List of Document objects with chunked content and enhanced metadata
        """
        try:
            # Extract markdown structures (tables, code blocks) and replace with placeholders
            structures, content_with_placeholders = self.extract_markdown_structures(content)
            doc = Document(
                page_content=content_with_placeholders,
                metadata=source_metadata
            )
            chunks = self.text_splitter.split_documents([doc])
            restored_contents = self.restore_structures(
                [chunk.page_content for chunk in chunks], structures
            )

            total_chunks = len(chunks)
            source_id = source_metadata.get('source_id', 'unknown')
            enhanced_chunks = []
            for i, (chunk, restored_content) in enumerate(zip(chunks, restored_contents)):
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": total_chunks,
                    "chunk_size": len(restored_content),
                    "chunk_id": f"{source_id}_{i}",
                    # Detect the table itself; the chunk need not contain the
                    # word "table" to hold one.
                    "has_table": self._has_markdown_table(restored_content),
                    "has_code": "```" in restored_content
                })
                chunk.page_content = restored_content
                enhanced_chunks.append(chunk)
            logger.info(f"Document chunked into {len(enhanced_chunks)} markdown-aware pieces")
            return enhanced_chunks
        except Exception as e:
            # Deliberate best effort: any failure falls back to plain splitting.
            logger.error(f"Error chunking markdown document: {e}")
            return self._fallback_chunk(content, source_metadata)

    def _fallback_chunk(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """Fallback chunking method if markdown-aware chunking fails.

        Splits the raw content without protecting any structure and re-raises
        whatever the splitter raises.
        """
        try:
            doc = Document(page_content=content, metadata=source_metadata)
            chunks = self.text_splitter.split_documents([doc])
            total_chunks = len(chunks)
            source_id = source_metadata.get('source_id', 'unknown')
            for i, chunk in enumerate(chunks):
                chunk.metadata.update({
                    "chunk_index": i,
                    "total_chunks": total_chunks,
                    "chunk_size": len(chunk.page_content),
                    "chunk_id": f"{source_id}_{i}"
                })
            logger.warning(f"Used fallback chunking for {len(chunks)} pieces")
            return chunks
        except Exception as e:
            logger.error(f"Error in fallback chunking: {e}")
            raise

    def chunk_multiple_documents(self, documents: List[Dict[str, Any]]) -> List[Document]:
        """
        Chunk multiple documents for batch processing.

        Args:
            documents: List of dictionaries with 'content' and 'metadata' keys

        Returns:
            List of chunked Document objects
        """
        all_chunks = []
        for doc_data in documents:
            # Treat an explicit None like a missing key instead of crashing.
            content = doc_data.get('content') or ''
            metadata = doc_data.get('metadata')
            if metadata is None:
                metadata = {}
            if content.strip():  # Only process non-empty content
                all_chunks.extend(self.chunk_document(content, metadata))
        logger.info(f"Chunked {len(documents)} documents into {len(all_chunks)} total chunks")
        return all_chunks

    def get_chunk_preview(self, chunks: List[Document], max_chunks: int = 5) -> str:
        """
        Generate a preview of chunks for debugging/logging.

        Args:
            chunks: List of Document chunks
            max_chunks: Maximum number of chunks to include in preview

        Returns:
            String preview of chunks
        """
        parts = [
            f"Document Chunks Preview ({len(chunks)} total chunks):\n",
            "=" * 50 + "\n",
        ]
        for i, chunk in enumerate(chunks[:max_chunks]):
            has_table = chunk.metadata.get('has_table', False)
            has_code = chunk.metadata.get('has_code', False)
            parts.append(f"Chunk {i + 1}:\n")
            parts.append(f" Length: {len(chunk.page_content)} characters\n")
            parts.append(f" Has Table: {has_table}, Has Code: {has_code}\n")
            parts.append(f" Metadata: {chunk.metadata}\n")
            parts.append(f" Content preview: {chunk.page_content[:100]}...\n")
            parts.append("-" * 30 + "\n")
        if len(chunks) > max_chunks:
            parts.append(f"... and {len(chunks) - max_chunks} more chunks\n")
        return "".join(parts)
class UnifiedDocumentChunker:
    """Unified chunker that handles both Markdown and LaTeX content types."""

    # Content types that are routed without inspecting the content.
    _SUPPORTED_TYPES = ('markdown', 'latex')
    # Each pattern counts at most once towards the LaTeX auto-detection score.
    _LATEX_INDICATORS = tuple(
        re.compile(pattern)
        for pattern in (
            r'\\begin\{',
            r'\\end\{',
            r'\\title\{',
            r'\\section',
            r'\\subsection',
            r'\\hline',
            r'\\multirow',
            r'\\multicolumn',
        )
    )

    def __init__(self):
        """Initialize the unified chunker with both markdown and LaTeX chunkers."""
        self.markdown_chunker = MarkdownAwareChunker(chunk_size=1000, chunk_overlap=200)
        self.latex_chunker = LaTeXAwareChunker(chunk_size=1200, chunk_overlap=150)
        logger.info("Unified document chunker initialized with both Markdown and LaTeX support")

    def chunk_document(self, content: str, source_metadata: Dict[str, Any]) -> List[Document]:
        """
        Chunk a document using the appropriate chunker based on content type.

        The type comes from ``source_metadata['doc_type']``; a
        ``conversion_method`` starting with ``GOT-OCR`` forces LaTeX, and an
        unrecognised or None ``doc_type`` is resolved by inspecting the content.

        Args:
            content: The document content to chunk
            source_metadata: Metadata about the source document

        Returns:
            List of Document objects with chunked content and enhanced metadata
        """
        # NOTE(review): a missing 'doc_type' defaults to 'markdown', so
        # auto-detection only runs for unrecognised or None values - confirm
        # that this is the intended behaviour.
        declared_type = source_metadata.get('doc_type', 'markdown')
        content_type = '' if declared_type is None else str(declared_type).lower()

        # Override content type detection for GOT-OCR results
        conversion_method = source_metadata.get('conversion_method') or ''
        if str(conversion_method).startswith('GOT-OCR'):
            content_type = 'latex'

        # Auto-detect content type when the declared one is not usable
        if content_type not in self._SUPPORTED_TYPES:
            content_type = 'latex' if self._is_latex_content(content) else 'markdown'

        if content_type == 'latex':
            logger.info("Using LaTeX-aware chunker for document")
            return self.latex_chunker.chunk_document(content, source_metadata)
        logger.info("Using Markdown-aware chunker for document")
        return self.markdown_chunker.chunk_document(content, source_metadata)

    def _is_latex_content(self, content: str) -> bool:
        """
        Auto-detect if content is LaTeX based on common LaTeX commands.

        Args:
            content: Content to analyze

        Returns:
            True if at least two distinct indicator patterns occur in the
            content, False otherwise
        """
        latex_count = sum(
            1 for indicator in self._LATEX_INDICATORS if indicator.search(content)
        )
        # If we find multiple LaTeX indicators, treat as LaTeX
        return latex_count >= 2

    def chunk_multiple_documents(self, documents: List[Dict[str, Any]]) -> List[Document]:
        """
        Chunk multiple documents using appropriate chunkers.

        Args:
            documents: List of dictionaries with 'content' and 'metadata' keys

        Returns:
            List of chunked Document objects
        """
        all_chunks = []
        for doc_data in documents:
            # Treat an explicit None like a missing key instead of crashing.
            content = doc_data.get('content') or ''
            metadata = doc_data.get('metadata')
            if metadata is None:
                metadata = {}
            if content.strip():  # Only process non-empty content
                all_chunks.extend(self.chunk_document(content, metadata))
        logger.info(f"Chunked {len(documents)} documents into {len(all_chunks)} total chunks")
        return all_chunks

    def get_chunk_preview(self, chunks: List[Document], max_chunks: int = 5) -> str:
        """
        Generate a preview of chunks for debugging/logging.

        Args:
            chunks: List of Document chunks
            max_chunks: Maximum number of chunks to include in preview

        Returns:
            String preview of chunks
        """
        parts = [
            f"Document Chunks Preview ({len(chunks)} total chunks):\n",
            "=" * 50 + "\n",
        ]
        for i, chunk in enumerate(chunks[:max_chunks]):
            content_type = chunk.metadata.get('content_type', 'unknown')
            has_table = chunk.metadata.get('has_table', False) or chunk.metadata.get('has_latex_table', False)
            has_code = chunk.metadata.get('has_code', False) or chunk.metadata.get('has_latex_environment', False)
            parts.append(f"Chunk {i + 1} ({content_type}):\n")
            parts.append(f" Length: {len(chunk.page_content)} characters\n")
            parts.append(f" Has Table: {has_table}, Has Code/Environment: {has_code}\n")
            parts.append(f" Metadata: {chunk.metadata}\n")
            parts.append(f" Content preview: {chunk.page_content[:100]}...\n")
            parts.append("-" * 30 + "\n")
        if len(chunks) > max_chunks:
            parts.append(f"... and {len(chunks) - max_chunks} more chunks\n")
        return "".join(parts)
# Global unified chunker instance that supports both Markdown and LaTeX.
# It is constructed at import time, which also builds both underlying chunkers.
document_chunker = UnifiedDocumentChunker()