Spaces:
Running
on
Zero
Running
on
Zero
File size: 14,788 Bytes
dda982a 63f3b68 dda982a 9caa2c7 dda982a 9caa2c7 dda982a 5b7f920 dda982a 9caa2c7 3f1b4af dda982a 9b25e42 dda982a 9caa2c7 a4f1c9e dda982a a4f1c9e dda982a 111954a 4a97b0c 111954a dda982a 4a97b0c dda982a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 |
from pathlib import Path
from typing import Dict, List, Optional, Any, Union
import os
import json
import tempfile
import base64
from PIL import Image
import io
from src.parsers.parser_interface import DocumentParser
from src.parsers.parser_registry import ParserRegistry
from src.core.config import config
# The Gemini client is an optional dependency: record whether it could be
# imported instead of letting a missing package break module import.
try:
    from google import genai
except ImportError:
    GEMINI_AVAILABLE = False
else:
    GEMINI_AVAILABLE = True

# The API key is read from the environment once, when the module is imported.
api_key = os.environ.get("GOOGLE_API_KEY")

# A missing (or empty) key only produces a warning at this point.
if api_key in (None, ""):
    print("Warning: GOOGLE_API_KEY environment variable not found. Gemini Flash parser may not work.")
class GeminiFlashParser(DocumentParser):
    """Parser that uses Google's Gemini Flash 2.0 to convert documents to markdown.

    The model name, temperature and output-token limit are read from
    ``config.model``; the API key is the module-level ``api_key``.
    Failures while reading files or calling the API are not raised to the
    caller: ``parse`` and ``parse_multiple`` print the error and return a
    markdown-formatted error message instead.
    """

    # Limits enforced by ``_validate_batch_files``.
    _MAX_BATCH_FILES = 5
    _MAX_FILE_BYTES = 10 * 1024 * 1024   # 10MB per file
    _MAX_TOTAL_BYTES = 20 * 1024 * 1024  # 20MB for the whole batch

    # MIME type reported for extensions that are not in ``_MIME_TYPES``.
    _UNKNOWN_MIME_TYPE = "application/octet-stream"

    # Shared tail of the "unsupported file" error messages.
    _SUPPORTED_TYPES_HINT = (
        "Gemini supports: PDF, TXT, HTML, CSS, MD, CSV, XML, RTF, JS, PY, and image files."
    )

    # Extension (lower-case, with leading dot) -> MIME type.
    _MIME_TYPES = {
        ".pdf": "application/pdf",
        ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        ".doc": "application/msword",
        ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        ".ppt": "application/vnd.ms-powerpoint",
        ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        ".xls": "application/vnd.ms-excel",
        ".txt": "text/plain",
        ".md": "text/markdown",
        ".html": "text/html",
        ".htm": "text/html",
        ".csv": "text/csv",
        # The validation error messages advertise these formats as supported,
        # so they need a mapping; without one they are rejected as unknown.
        ".css": "text/css",
        ".xml": "text/xml",
        ".rtf": "text/rtf",
        ".js": "text/javascript",
        ".py": "text/x-python",
        ".jpg": "image/jpeg",
        ".jpeg": "image/jpeg",
        ".png": "image/png",
        ".gif": "image/gif",
        ".bmp": "image/bmp",
        ".tiff": "image/tiff",
        ".tif": "image/tiff",
    }

    # Office formats are recognised by extension but rejected for batch
    # processing by ``_validate_batch_files``.
    _OFFICE_MIME_TYPES = frozenset({
        "application/vnd.openxmlformats-officedocument.wordprocessingml.document",
        "application/msword",
        "application/vnd.openxmlformats-officedocument.presentationml.presentation",
        "application/vnd.ms-powerpoint",
        "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
        "application/vnd.ms-excel",
    })

    # Prompt sent together with a single document.
    _SINGLE_PROMPT = (
        "\n"
        "Convert this document to markdown format.\n"
        "Preserve the structure, headings, lists, tables, and formatting as much as possible.\n"
        "For images, include a brief description in markdown image syntax.\n"
        "Return only the markdown content, no other text.\n"
    )

    # Fallback used when an unknown ``processing_type`` is requested.
    _DEFAULT_PROCESSING_TYPE = "combined"

    # processing_type -> instructions appended to the batch prompt preamble.
    _BATCH_INSTRUCTIONS = {
        "combined": (
            "Please convert all documents to a single, cohesive markdown document.\n"
            "Merge the content logically, remove duplicate information, and create a unified structure with clear headings.\n"
            "Preserve important formatting, tables, lists, and structure from all documents.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return only the combined markdown content, no other text."
        ),
        "individual": (
            "Please convert each document to markdown format and present them as separate sections.\n"
            "For each document, create a clear section header with the document name.\n"
            "Preserve the structure, headings, lists, tables, and formatting within each section.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return the content in this format:\n"
            "# Document 1: [filename]\n"
            "[converted content]\n"
            "# Document 2: [filename]\n"
            "[converted content]\n"
            "Return only the markdown content, no other text."
        ),
        "summary": (
            "Please create a comprehensive analysis with two parts:\n"
            "1. EXECUTIVE SUMMARY: A concise overview summarizing the key points from all documents\n"
            "2. DETAILED SECTIONS: Individual converted sections for each document\n"
            "Structure the output as:\n"
            "# Executive Summary\n"
            "[Brief summary of key findings and themes across all documents]\n"
            "# Detailed Analysis\n"
            "## Document 1: [filename]\n"
            "[converted content]\n"
            "## Document 2: [filename]\n"
            "[converted content]\n"
            "Preserve formatting, tables, lists, and structure throughout.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return only the markdown content, no other text."
        ),
        "comparison": (
            "Please create a comparative analysis of these documents:\n"
            "1. Create a comparison table highlighting key differences and similarities\n"
            "2. Provide individual document summaries\n"
            "3. Include a section on cross-document insights\n"
            "Structure the output as:\n"
            "# Document Comparison Analysis\n"
            "## Comparison Table\n"
            "| Aspect | Document 1 | Document 2 | Document 3 | ... |\n"
            "|--------|------------|------------|------------|-----|\n"
            "| [Key aspects found across documents] | | | | |\n"
            "## Individual Document Summaries\n"
            "### Document 1: [filename]\n"
            "[Key points and content summary]\n"
            "### Document 2: [filename]\n"
            "[Key points and content summary]\n"
            "## Cross-Document Insights\n"
            "[Analysis of patterns, contradictions, or complementary information across documents]\n"
            "Preserve important formatting and structure.\n"
            "For images, include brief descriptions in markdown image syntax.\n"
            "Return only the markdown content, no other text."
        ),
    }

    @classmethod
    def get_name(cls) -> str:
        """Return the display name of this parser."""
        return "Gemini Flash"

    @classmethod
    def get_supported_ocr_methods(cls) -> List[Dict[str, Any]]:
        """Return the OCR method descriptors; only the ``"none"`` entry is offered."""
        return [
            {
                "id": "none",
                "name": "None",
                "default_params": {},
            }
        ]

    @classmethod
    def get_description(cls) -> str:
        """Return a one-line description of this parser."""
        return "Gemini Flash 2.0 parser for converting documents and images to markdown"

    def parse(self, file_path: Union[str, Path], ocr_method: Optional[str] = None, **kwargs) -> str:
        """Parse a document using Gemini Flash 2.0.

        Args:
            file_path: Path of the document or image to convert.
            ocr_method: Not used by this parser.
            **kwargs: Not used by this parser.

        Returns:
            The markdown returned by the model, or a markdown error message
            (starting with ``# Error``) when reading the file or calling the
            API fails, or when the model returns no text.

        Raises:
            ImportError: If the ``google-genai`` package is not installed.
            ValueError: If ``GOOGLE_API_KEY`` is not set.
        """
        self._ensure_ready()

        try:
            file_path = Path(file_path)
            file_content = file_path.read_bytes()
            mime_type = self._get_mime_type(file_path.suffix.lower())

            client = genai.Client(api_key=api_key)
            response = client.models.generate_content(
                model=config.model.gemini_model,
                contents=[
                    self._SINGLE_PROMPT,
                    genai.types.Part.from_bytes(data=file_content, mime_type=mime_type),
                ],
                config=self._generation_config(),
            )
            return self._response_text(response)
        except Exception as e:
            # Deliberate best-effort boundary: callers receive the failure as
            # markdown text rather than as an exception.
            error_message = f"Error parsing document with Gemini Flash: {e}"
            print(error_message)
            return f"# Error\n\n{error_message}\n\nPlease check your API key and try again."

    def parse_multiple(self, file_paths: List[Union[str, Path]], processing_type: str = "combined", original_filenames: Optional[List[str]] = None, **kwargs) -> str:
        """Parse multiple documents using Gemini Flash 2.0 in a single API call.

        Args:
            file_paths: Paths of the documents to send (validated by
                ``_validate_batch_files``).
            processing_type: One of ``"combined"``, ``"individual"``,
                ``"summary"`` or ``"comparison"``; any other value is
                prompted like ``"combined"``.
            original_filenames: Optional names to show in the prompt and in
                the output header instead of the names of ``file_paths``.
            **kwargs: Not used by this parser.

        Returns:
            The model output prefixed with an HTML-comment metadata header,
            ``"Conversion cancelled."`` if cancellation is detected, or a
            markdown error message (starting with ``# Error``) on failure,
            including validation failures.

        Raises:
            ImportError: If the ``google-genai`` package is not installed.
            ValueError: If ``GOOGLE_API_KEY`` is not set.
        """
        self._ensure_ready()

        try:
            path_objects = [Path(fp) for fp in file_paths]
            self._validate_batch_files(path_objects)

            # _check_cancellation is not defined in this class; presumably it
            # is provided by DocumentParser.
            if self._check_cancellation():
                return "Conversion cancelled."

            client = genai.Client(api_key=api_key)
            contents = self._create_batch_contents(path_objects, processing_type, original_filenames)

            # Re-check right before the API call.
            if self._check_cancellation():
                return "Conversion cancelled."

            response = client.models.generate_content(
                model=config.model.gemini_model,
                contents=contents,
                config=self._generation_config(),
            )
            return self._format_batch_output(
                self._response_text(response), path_objects, processing_type, original_filenames
            )
        except Exception as e:
            # Deliberate best-effort boundary, same contract as ``parse``.
            error_message = f"Error parsing multiple documents with Gemini Flash: {e}"
            print(error_message)
            return f"# Error\n\n{error_message}\n\nPlease check your API key and try again."

    @staticmethod
    def _ensure_ready() -> None:
        """Raise if the Gemini client or the API key is unavailable."""
        if not GEMINI_AVAILABLE:
            raise ImportError(
                "The Google Gemini API client is not installed. "
                "Please install it with 'pip install google-genai'."
            )
        if not api_key:
            raise ValueError(
                "GOOGLE_API_KEY environment variable is not set. "
                "Please set it to your Gemini API key."
            )

    @staticmethod
    def _generation_config() -> Dict[str, Any]:
        """Return the generation settings shared by single and batch calls."""
        return {
            "temperature": config.model.temperature,
            "top_p": 0.95,
            "top_k": 40,
            "max_output_tokens": config.model.max_tokens,
        }

    @staticmethod
    def _response_text(response: Any) -> str:
        """Return ``response.text``, raising ``ValueError`` when it is ``None``."""
        text = response.text
        if text is None:
            # Fail loudly instead of returning None from a ``-> str`` method
            # or concatenating None onto the batch header.
            raise ValueError("Gemini returned no text content")
        return text

    @staticmethod
    def _display_names(file_paths: List[Path], original_filenames: Optional[List[str]] = None) -> List[str]:
        """Return ``original_filenames`` if given (and non-empty), else the path names."""
        if original_filenames:
            return original_filenames
        return [fp.name for fp in file_paths]

    def _validate_batch_files(self, file_paths: List[Path]) -> None:
        """Validate batch of files for multi-document processing.

        Checks, in order: file count, existence and per-file size, combined
        size, and finally file types.

        Raises:
            ValueError: On the first violated constraint.
        """
        if not file_paths:
            raise ValueError("No files provided for processing")
        if len(file_paths) > self._MAX_BATCH_FILES:
            raise ValueError(f"Maximum {self._MAX_BATCH_FILES} files allowed for batch processing")

        mib = 1024 * 1024
        total_size = 0
        for file_path in file_paths:
            if not file_path.exists():
                raise ValueError(f"File not found: {file_path}")
            file_size = file_path.stat().st_size
            total_size += file_size
            if file_size > self._MAX_FILE_BYTES:
                raise ValueError(
                    f"Individual file size exceeds {self._MAX_FILE_BYTES // mib}MB: {file_path.name}"
                )

        if total_size > self._MAX_TOTAL_BYTES:
            raise ValueError(
                f"Combined file size ({total_size / mib:.1f}MB) exceeds {self._MAX_TOTAL_BYTES // mib}MB limit"
            )

        for file_path in file_paths:
            mime_type = self._get_mime_type(file_path.suffix.lower())
            if mime_type == self._UNKNOWN_MIME_TYPE:
                raise ValueError(
                    f"Unsupported file type: {file_path.name}. {self._SUPPORTED_TYPES_HINT}"
                )
            if mime_type in self._OFFICE_MIME_TYPES:
                raise ValueError(
                    f"File type not supported by Gemini: {file_path.name}. {self._SUPPORTED_TYPES_HINT}"
                )

    def _create_batch_contents(self, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> List[Any]:
        """Create contents list for batch API call.

        Returns:
            A list whose first item is the prompt string, followed by one
            ``genai.types.Part`` per file, in the order of ``file_paths``.
        """
        contents: List[Any] = [
            self._create_batch_prompt(file_paths, processing_type, original_filenames)
        ]
        contents.extend(
            genai.types.Part.from_bytes(
                data=file_path.read_bytes(),
                mime_type=self._get_mime_type(file_path.suffix.lower()),
            )
            for file_path in file_paths
        )
        return contents

    def _create_batch_prompt(self, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        """Create appropriate prompt for batch processing.

        The prompt is a preamble listing the document names followed by the
        instructions for ``processing_type``. Unknown processing types use
        the ``"combined"`` instructions while still listing
        ``original_filenames`` when they were supplied.
        """
        file_names = self._display_names(file_paths, original_filenames)
        file_list = "\n".join(f"- {name}" for name in file_names)
        base_prompt = (
            f"I will provide you with {len(file_paths)} documents to process:\n"
            f"{file_list}\n"
        )
        instructions = self._BATCH_INSTRUCTIONS.get(
            processing_type,
            self._BATCH_INSTRUCTIONS[self._DEFAULT_PROCESSING_TYPE],
        )
        return base_prompt + instructions

    def _format_batch_output(self, response_text: str, file_paths: List[Path], processing_type: str, original_filenames: Optional[List[str]] = None) -> str:
        """Format the batch processing output.

        Prepends an HTML-comment header (processing type, file count and
        file names) to ``response_text``.
        """
        file_names = self._display_names(file_paths, original_filenames)
        header = (
            "<!-- Multi-Document Processing Results -->\n"
            f"<!-- Processing Type: {processing_type} -->\n"
            f"<!-- Files Processed: {len(file_paths)} -->\n"
            f"<!-- File Names: {', '.join(file_names)} -->\n"
        )
        return header + response_text

    def _get_mime_type(self, file_extension: str) -> str:
        """Get the MIME type for a file extension.

        Args:
            file_extension: Extension including the leading dot, e.g.
                ``".pdf"``; matching is case-insensitive.

        Returns:
            The mapped MIME type, or ``"application/octet-stream"`` when the
            extension is unknown.
        """
        return self._MIME_TYPES.get(file_extension.lower(), self._UNKNOWN_MIME_TYPE)
# Make the parser discoverable through the registry, but only when the
# Gemini client could be imported; otherwise just report why it was skipped.
if not GEMINI_AVAILABLE:
    print("Gemini Flash parser not registered: google-genai package not installed")
else:
    ParserRegistry.register(GeminiFlashParser)