import os
import base64
import logging
import socket
from io import BytesIO
from os import getenv

import gradio as gr
import requests
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler

# Load environment
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)


# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    try:
        return requests.get(url, timeout=5).status_code == 200
    except (requests.RequestException, socket.error):
        return False


# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Add your domain
        "X-Title": "Image Analysis App",
    }
    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }
    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30,
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"


# Initialize LLM with streaming enabled
def init_llm():
    if not test_connectivity():
        raise RuntimeError("No connection to OpenRouter. Check your network and API keys.")
    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs={
            "extra_headers": {"Helicone-Auth": f"Bearer {getenv('HELICONE_API_KEY')}"}
        },
    )


# Try to initialize the LLM but handle failures gracefully
try:
    llm = init_llm()
except Exception as init_error:
    logging.warning(f"LLM initialization failed: {init_error}")
    llm = None


# Helpers
def encode_image_to_base64(pil_image):
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()


# Core logic
def generate_response(message, chat_history, image):
    # Convert chat history to the standard role/content format
    formatted_history = []
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            formatted_history.append({"role": "user", "content": content})
        else:
            formatted_history.append({"role": "assistant", "content": content})

    # Prepare system message
    system_prompt = "You are an expert image analysis assistant. Answer succinctly."
    system_msg = {"role": "system", "content": system_prompt}

    # Prepare the latest message, attaching the image if provided
    if image is not None:
        base64_image = encode_image_to_base64(image)
        image_url = f"data:image/png;base64,{base64_image}"
        # Format for the direct API call (OpenRouter/OpenAI format)
        api_messages = [system_msg] + formatted_history + [{
            "role": "user",
            "content": [
                {"type": "text", "text": message},
                {"type": "image_url", "image_url": {"url": image_url}},
            ],
        }]
        # Same content in LangChain's multimodal format
        content_for_langchain = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": image_url}},
        ]
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message

    # Build LangChain messages
    lc_messages = [SystemMessage(content=system_prompt)]
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            lc_messages.append(HumanMessage(content=content))
        else:
            lc_messages.append(AIMessage(content=content))
    lc_messages.append(HumanMessage(content=content_for_langchain))

    try:
        # First try LangChain: streaming, then non-streaming, then the raw HTTP fallback
        if llm:
            try:
                partial = ""
                for chunk in llm.stream(lc_messages):
                    if chunk is None:
                        continue
                    content = getattr(chunk, 'content', None)
                    if content is None:
                        continue
                    partial += content
                    yield partial
                # If we got this far, streaming worked
                return
            except Exception as stream_error:
                print(f"Streaming failed: {stream_error}. Falling back to non-streaming mode")
                # Try non-streaming
                try:
                    response = llm.invoke(lc_messages)
                    yield response.content
                    return
                except Exception as invoke_error:
                    print(f"LangChain call failed: {invoke_error}. Falling back to direct API call")

        # Last resort: call OpenRouter directly over HTTP
        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL"),
        )
        yield response_text
    except Exception as e:
        import traceback
        logging.error(traceback.format_exc())
        yield f"⚠️ Error generating response: {str(e)}. Please try again later."


# Gradio interface
def process_message(message, chat_history, image):
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({'role': 'assistant', 'content': 'Please upload an image.'})
        yield "", chat_history
        return
    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Processing...'})
    yield "", chat_history
    # Pass only the prior turns; the current question (and the placeholder)
    # were appended above and are added again inside generate_response
    for chunk in generate_response(message, chat_history[:-2], image):
        chat_history[-1]['content'] = chunk
        yield "", chat_history


with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Message", placeholder="Type your question...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image")
            info = gr.Textbox(label="Image Info", interactive=False)

    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(
        lambda img: f"Size: {img.size}" if img is not None else "No image.",
        [image_input],
        [info],
    )

demo.launch()