"""Document converter UI component and logic."""
import threading
import time
import gradio as gr
import logging
from pathlib import Path
from src.core.converter import convert_file
from src.core.logging_config import get_logger
from src.services.document_service import DocumentService
from src.rag import document_ingestion_service
from src.ui.utils.file_validation import validate_file_for_parser
from src.ui.utils.threading_utils import (
conversion_cancelled,
monitor_cancellation,
reset_cancellation,
set_cancellation
)
from src.ui.formatters.content_formatters import format_markdown_content, format_latex_content
logger = get_logger(__name__)
def run_conversion_thread(file_path, parser_name, ocr_method_name, output_format):
"""Run the conversion in a separate thread and return the thread object"""
# Reset the cancellation flag
reset_cancellation()
# Create a container for the results
results = {"content": None, "download_file": None, "error": None}
def conversion_worker():
try:
content, download_file = convert_file(file_path, parser_name, ocr_method_name, output_format)
results["content"] = content
results["download_file"] = download_file
except Exception as e:
logger.error(f"Error during conversion: {str(e)}")
results["error"] = str(e)
# Create and start the thread
thread = threading.Thread(target=conversion_worker)
thread.daemon = True
thread.start()
return thread, results
def run_conversion_thread_multi(file_paths, parser_name, ocr_method_name, output_format, processing_type):
"""Run the conversion in a separate thread for multiple files."""
# Results will be shared between threads
results = {"content": None, "download_file": None, "error": None}
def conversion_worker():
try:
logger.info(f"Starting multi-file conversion thread for {len(file_paths)} files")
# Use the new document service unified method
document_service = DocumentService()
document_service.set_cancellation_flag(conversion_cancelled)
# Call the unified convert_documents method
content, output_file = document_service.convert_documents(
file_paths=file_paths,
parser_name=parser_name,
ocr_method_name=ocr_method_name,
output_format=output_format,
processing_type=processing_type
)
logger.info(f"Multi-file conversion completed successfully for {len(file_paths)} files")
results["content"] = content
results["download_file"] = output_file
except Exception as e:
logger.error(f"Error during multi-file conversion: {str(e)}")
results["error"] = str(e)
# Create and start the thread
thread = threading.Thread(target=conversion_worker)
thread.daemon = True
thread.start()
return thread, results
def handle_convert(files, parser_name, ocr_method_name, output_format, processing_type, is_cancelled):
"""Handle file conversion for single or multiple files."""
# Check if we should cancel before starting
if is_cancelled:
logger.info("Conversion cancelled before starting")
return "Conversion cancelled.", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Validate files input
if not files or len(files) == 0:
error_msg = "No files uploaded. Please upload at least one document."
logger.error(error_msg)
return f"Error: {error_msg}", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Convert Gradio file objects to file paths
file_paths = []
for file in files:
if hasattr(file, 'name'):
file_paths.append(file.name)
else:
file_paths.append(str(file))
# Validate file types for the selected parser
for file_path in file_paths:
is_valid, error_msg = validate_file_for_parser(file_path, parser_name)
if not is_valid:
logger.error(f"File validation error: {error_msg}")
return f"Error: {error_msg}", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
logger.info(f"Starting conversion of {len(file_paths)} file(s) with cancellation flag cleared")
# Start the conversion in a separate thread
thread, results = run_conversion_thread_multi(file_paths, parser_name, ocr_method_name, output_format, processing_type)
# Start the monitoring thread
monitor_thread = threading.Thread(target=monitor_cancellation)
monitor_thread.daemon = True
monitor_thread.start()
# Wait for the thread to complete or be cancelled
while thread.is_alive():
# Check if cancellation was requested
if conversion_cancelled.is_set():
logger.info("Cancellation detected, waiting for thread to finish")
# Give the thread a chance to clean up
thread.join(timeout=0.5)
if thread.is_alive():
logger.warning("Thread did not finish within timeout")
return "Conversion cancelled.", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Sleep briefly to avoid busy waiting
time.sleep(0.1)
# Thread has completed, check results
if results["error"]:
return f"Error: {results['error']}", None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
content = results["content"]
download_file = results["download_file"]
# If conversion returned a cancellation message
if content == "Conversion cancelled.":
logger.info("Converter returned cancellation message")
return content, None, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
# Format the content based on parser type
if "GOT-OCR" in parser_name:
# For GOT-OCR, display as LaTeX
formatted_content = format_latex_content(str(content))
html_output = f"
{formatted_content}
"
else:
# For other parsers, display as Markdown
formatted_content = format_markdown_content(str(content))
html_output = f"{formatted_content}
"
logger.info("Conversion completed successfully")
# Auto-ingest the converted document for RAG
try:
# For multi-file conversion, use the first file for metadata
file_path = file_paths[0] if file_paths else None
# Read original file content for proper deduplication hashing
original_file_content = None
if file_path and Path(file_path).exists():
try:
with open(file_path, 'rb') as f:
original_file_content = f.read().decode('utf-8', errors='ignore')
except Exception as e:
logger.warning(f"Could not read original file content: {e}")
conversion_result = {
"markdown_content": content,
"original_filename": Path(file_path).name if file_path else "unknown",
"conversion_method": parser_name,
"file_size": Path(file_path).stat().st_size if file_path and Path(file_path).exists() else 0,
"conversion_time": 0, # Could be tracked if needed
"original_file_content": original_file_content
}
success, ingestion_msg, stats = document_ingestion_service.ingest_from_conversion_result(conversion_result)
if success:
logger.info(f"Document auto-ingested for RAG: {ingestion_msg}")
else:
logger.warning(f"Document ingestion failed: {ingestion_msg}")
except Exception as e:
logger.error(f"Error during auto-ingestion: {e}")
return html_output, download_file, gr.update(visible=False), gr.update(visible=True), gr.update(visible=False)
def create_document_converter_tab():
"""Create the document converter tab UI."""
with gr.TabItem("đ Document Converter"):
with gr.Column(elem_classes=["chat-tab-container"]):
# Modern header matching other tabs
gr.HTML("""
""")
# State to track if cancellation is requested
cancel_requested = gr.State(False)
# State to store the conversion thread
conversion_thread = gr.State(None)
# State to store the output format (fixed to Markdown)
output_format_state = gr.State("Markdown")
# Multi-file input (supports single and multiple files)
files_input = gr.Files(
label="Upload Document(s) - Single file or up to 5 files (20MB max combined)",
file_count="multiple",
file_types=[".pdf", ".png", ".jpg", ".jpeg", ".tiff", ".bmp", ".webp", ".docx", ".doc", ".pptx", ".ppt", ".xlsx", ".xls", ".txt", ".md", ".html", ".htm", ".csv"]
)
# Processing type selector (visible only for multiple files)
processing_type_selector = gr.Radio(
choices=["combined", "individual", "summary", "comparison"],
value="combined",
label="Multi-Document Processing Type",
info="How to process multiple documents together",
visible=False
)
# Status text to show file count and processing mode
file_status_text = gr.HTML(
value="Upload documents to begin
",
label=""
)
# Provider and OCR options below the file input
with gr.Row(elem_classes=["provider-options-row"]):
with gr.Column(scale=1):
from src.parsers.parser_registry import ParserRegistry
parser_names = ParserRegistry.get_parser_names()
# Make MarkItDown the default parser if available
default_parser = next((p for p in parser_names if p == "MarkItDown"), parser_names[0] if parser_names else "PyPdfium")
provider_dropdown = gr.Dropdown(
label="Provider",
choices=parser_names,
value=default_parser,
interactive=True
)
with gr.Column(scale=1):
default_ocr_options = ParserRegistry.get_ocr_options(default_parser)
default_ocr = default_ocr_options[0] if default_ocr_options else "No OCR"
ocr_dropdown = gr.Dropdown(
label="OCR Options",
choices=default_ocr_options,
value=default_ocr,
interactive=True
)
# Processing controls row with consistent styling
with gr.Row(elem_classes=["control-buttons"]):
convert_button = gr.Button("đ Convert", elem_classes=["control-btn", "btn-primary"])
cancel_button = gr.Button("âšī¸ Cancel", elem_classes=["control-btn", "btn-clear-data"], visible=False)
# Simple output container with just one scrollbar
file_display = gr.HTML(
value="",
label="Converted Content"
)
file_download = gr.File(label="Download File")
# Event handlers
from src.ui.utils.file_validation import update_ui_for_file_count
# Update UI when files are uploaded
files_input.change(
fn=update_ui_for_file_count,
inputs=[files_input],
outputs=[processing_type_selector, file_status_text]
)
provider_dropdown.change(
lambda p: gr.Dropdown(
choices=["Plain Text", "Formatted Text"] if "GOT-OCR" in p else ParserRegistry.get_ocr_options(p),
value="Plain Text" if "GOT-OCR" in p else (ParserRegistry.get_ocr_options(p)[0] if ParserRegistry.get_ocr_options(p) else None)
),
inputs=[provider_dropdown],
outputs=[ocr_dropdown]
)
# Reset cancel flag when starting conversion
def start_conversion():
from src.ui.utils.threading_utils import conversion_cancelled
conversion_cancelled.clear()
logger.info("Starting conversion with cancellation flag cleared")
return gr.update(visible=False), gr.update(visible=True), False
# Set cancel flag and terminate thread when cancel button is clicked
def request_cancellation(thread):
from src.ui.utils.threading_utils import conversion_cancelled
conversion_cancelled.set()
logger.info("Cancel button clicked, cancellation flag set")
# Try to join the thread with a timeout
if thread is not None:
logger.info(f"Attempting to join conversion thread: {thread}")
thread.join(timeout=0.5)
if thread.is_alive():
logger.warning("Thread did not finish within timeout")
# Add immediate feedback to the user
return gr.update(visible=True), gr.update(visible=False), True, None
# Start conversion sequence
convert_button.click(
fn=start_conversion,
inputs=[],
outputs=[convert_button, cancel_button, cancel_requested],
queue=False # Execute immediately
).then(
fn=handle_convert,
inputs=[files_input, provider_dropdown, ocr_dropdown, output_format_state, processing_type_selector, cancel_requested],
outputs=[file_display, file_download, convert_button, cancel_button, conversion_thread]
)
# Handle cancel button click
cancel_button.click(
fn=request_cancellation,
inputs=[conversion_thread],
outputs=[convert_button, cancel_button, cancel_requested, conversion_thread],
queue=False # Execute immediately
)