|
|
|
|
|
import google.generativeai as genai |
|
import logging |
|
import json |
|
from PIL import Image |
|
import io |
|
|
|
|
|
# Configure the root logger once at import time so the INFO-level messages
# emitted by the helpers below are visible.
logging.basicConfig(level=logging.INFO)

# Image preprocessing limits used by resize_image().
# Longest allowed edge, in pixels, before the image is downscaled.
MAX_IMAGE_DIMENSION = 3500
# Size (in MiB) above which the compressed image triggers a warning.
MAX_IMAGE_MB = 16
# JPEG quality setting passed to Pillow when re-encoding.
TARGET_COMPRESSION_QUALITY = 35
|
|
|
|
|
def configure_gemini(api_key, model_name='gemini-2.5-flash-preview-04-17'):
    """Configure the Gemini client library and return a model instance.

    Args:
        api_key: API key passed to ``genai.configure``. A falsy value
            (``None`` or ``""``) short-circuits with a warning.
        model_name: Model identifier handed to ``genai.GenerativeModel``.
            Defaults to the model this function has always used, so existing
            callers are unaffected.

    Returns:
        The ``genai.GenerativeModel`` instance, or ``None`` when ``api_key``
        is falsy or when configuration / model construction raises.
    """
    if not api_key:
        logging.warning("GOOGLE_API_KEY not found. Gemini API cannot be configured.")
        return None

    try:
        genai.configure(api_key=api_key)
        model = genai.GenerativeModel(model_name)
    except Exception as e:
        # Best-effort by design: callers receive None instead of an exception.
        logging.error(f"Failed to configure Gemini API or initialize model: {e}", exc_info=True)
        return None
    else:
        logging.info(f"Gemini model initialized successfully ({model_name}).")
        return model
|
|
|
|
|
def _flatten_to_rgb(img):
    """Return ``img`` in RGB mode, compositing transparency onto white."""
    if img.mode in ('RGBA', 'P', 'LA'):
        logging.info(f"Converting image mode {img.mode} to RGB for JPEG saving.")
        background = Image.new('RGB', img.size, (255, 255, 255))
        try:
            if img.mode == 'P':
                # Palette images are expanded to RGBA so the last band can
                # serve as the paste mask below.
                img = img.convert('RGBA')
            background.paste(img, mask=img.split()[-1])
            return background
        except Exception as paste_err:
            logging.warning(f"Could not properly handle transparency during conversion, falling back to simple RGB conversion: {paste_err}")
            return img.convert('RGB')
    if img.mode != 'RGB':
        return img.convert('RGB')
    return img


def resize_image(image_bytes):
    """Resize and compress image bytes using Pillow.

    The image is downscaled so neither edge exceeds ``MAX_IMAGE_DIMENSION``,
    converted to RGB, and re-encoded as JPEG at ``TARGET_COMPRESSION_QUALITY``.
    A warning is logged (nothing more) if the result is still larger than
    ``MAX_IMAGE_MB``.

    Args:
        image_bytes: Encoded image data.

    Returns:
        A ``(bytes, format)`` tuple. On success this is the JPEG bytes and
        ``'JPEG'``. If anything fails, the input bytes are returned unchanged
        together with the detected original format, or ``None`` when the
        image could not even be opened and no format is known.
    """
    # Bound before the try so the except handler can always reference it;
    # previously a failure inside Image.open left the name undefined and the
    # fallback itself raised.
    original_format = None
    try:
        img = Image.open(io.BytesIO(image_bytes))
        original_format = img.format or 'PNG'
        logging.info(f"Opened image for processing. Original format: {original_format}, Original mode: {img.mode}")

        width, height = img.size
        if width > MAX_IMAGE_DIMENSION or height > MAX_IMAGE_DIMENSION:
            logging.info(f"Resizing image from {width}x{height} to max {MAX_IMAGE_DIMENSION}px dimension.")
            # thumbnail() resizes in place and preserves the aspect ratio.
            img.thumbnail((MAX_IMAGE_DIMENSION, MAX_IMAGE_DIMENSION), Image.Resampling.LANCZOS)

        save_format = 'JPEG'
        # JPEG has no alpha channel, so normalise the mode before saving.
        img = _flatten_to_rgb(img)

        output_buffer = io.BytesIO()
        img.save(output_buffer, format=save_format, quality=TARGET_COMPRESSION_QUALITY, optimize=True)
        compressed_bytes = output_buffer.getvalue()

        final_size_mb = len(compressed_bytes) / (1024 * 1024)
        if final_size_mb > MAX_IMAGE_MB:
            logging.warning(f"Compressed image size ({final_size_mb:.2f} MB) still exceeds target ({MAX_IMAGE_MB} MB). Consider adjusting quality further if needed.")

        return compressed_bytes, save_format

    except Exception as e:
        logging.error(f"Error processing image: {e}", exc_info=True)
        # Best-effort fallback: hand back the untouched input.
        return image_bytes, original_format
|
|
|
|
|
def process_error_response(text_response):
    """Wrap a non-JSON Gemini reply in an error-shaped dictionary.

    Args:
        text_response: The raw text received from the model.

    Returns:
        A dict with an ``analysis_error`` message and the unmodified reply
        under ``raw_text``.
    """
    logging.warning("Gemini response was not valid JSON. Returning raw text.")

    fallback = {}
    fallback["analysis_error"] = "Response was not valid JSON"
    fallback["raw_text"] = text_response
    return fallback
|
|
|
def _strip_code_fence(text):
    """Remove a surrounding Markdown code fence, with or without a json tag."""
    cleaned = text.strip()
    if cleaned.startswith('```'):
        cleaned = cleaned[3:]
        # Drop an optional language tag; compare case-insensitively so
        # "JSON" is handled the same as "json".
        if cleaned[:4].lower() == 'json':
            cleaned = cleaned[4:]
    return cleaned.removesuffix('```').strip()


def analyze_input_with_gemini(gemini_model, prompt, image_bytes=None, text_content=None,
                              image_mime_type="image/jpeg"):
    """Send the prompt plus an image or text to Gemini and parse the JSON reply.

    Args:
        gemini_model: Object exposing ``generate_content(payload)``; the
            returned response must provide ``resolve()`` and ``text``.
        prompt: Prompt placed first in the request payload.
        image_bytes: Encoded image data. Used when non-empty; takes
            precedence over ``text_content``.
        text_content: Text to analyse when no image is supplied.
        image_mime_type: MIME type declared for ``image_bytes``. Defaults to
            ``"image/jpeg"``, the value previously hard-coded.

    Returns:
        The parsed JSON value. When it is a dict, an ``input_type`` key
        (``"Image"`` or ``"Text"``) is added. If the reply is not valid JSON,
        the result of ``process_error_response`` is returned instead.

    Raises:
        ValueError: If the model is falsy or no usable input was provided.
        RuntimeError: If the API call or response handling fails; the
            original exception is chained as ``__cause__``.
    """
    if not gemini_model:
        raise ValueError("Gemini model not configured.")

    # Use the same truthiness test as the dispatch below, so empty image
    # bytes without text are rejected instead of sending None as content.
    if not image_bytes and text_content is None:
        raise ValueError("No input (image or text) provided for analysis.")

    content_payload = [prompt]

    try:
        if image_bytes:
            input_type = "Image"
            content_payload.append({"mime_type": image_mime_type, "data": image_bytes})
            logging.info(f"Preparing Gemini request with processed image ({len(image_bytes)/(1024*1024):.2f} MB)")
        else:
            input_type = "Text"
            content_payload.append(text_content)
            logging.info("Preparing Gemini request with text content.")

        response = gemini_model.generate_content(content_payload)
        response.resolve()
        logging.info("Received response from Gemini API.")

        # Read the text once so parsing and the fallback see the same value.
        raw_text = response.text

        try:
            result = json.loads(_strip_code_fence(raw_text))
        except (json.JSONDecodeError, AttributeError) as e:
            logging.error(f"Failed to decode or process JSON response: {e}")
            return process_error_response(raw_text)

        # Only a JSON object can carry the extra key; other JSON values
        # (lists, scalars) are returned unchanged.
        if isinstance(result, dict):
            result['input_type'] = input_type
        logging.info("Successfully parsed JSON response from Gemini.")
        return result

    except Exception as e:
        logging.error(f"Gemini API call failed: {e}", exc_info=True)
        raise RuntimeError(f"Analysis failed due to API error: {e}") from e