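"""Gradio image-analysis chat app.

Sends user messages (optionally with an uploaded image) to a multimodal model on
OpenRouter, routed through Helicone, using LangChain with streaming and a direct
REST fallback when the LangChain client is unavailable.
"""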
import os
import gradio as gr
from os import getenv
import base64
from io import BytesIO
from dotenv import load_dotenv
import requests
import socket
import logging
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler
# Load environment
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)
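# Environment variables read below (loaded from .env):
#   OPENROUTER_API_KEY   - OpenRouter API key
#   OPENROUTER_BASE_URL  - OpenAI-compatible base URL (here, the Helicone proxy)
#   HELICONE_API_KEY     - optional; adds the Helicone-Auth header when set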
# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    try:
        return requests.get(url, timeout=5).status_code == 200
    except (requests.RequestException, socket.error):
        return False
# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Add your domain
        "X-Title": "Image Analysis App"
    }
    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }
    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"
# Initialize LLM with streaming and retry logic
def init_llm():
    if not test_connectivity():
        raise RuntimeError("No connection to OpenRouter. Check your network and API keys.")
    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs={
            "extra_headers": {"Helicone-Auth": f"Bearer {getenv('HELICONE_API_KEY')}"}
        },
    )
# Try to initialize LLM but handle failures gracefully
try:
    llm = init_llm()
except Exception as e:
    logging.warning(f"Failed to initialize LLM: {e}")
    llm = None
# Helpers
def encode_image_to_base64(pil_image):
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()
# Core logic
def generate_response(message, chat_history, image):
    """Generate a reply, streaming through LangChain when possible and
    falling back to a direct OpenRouter API call when it fails."""
    # Convert chat history to the OpenAI-style role/content format
    formatted_history = []
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            formatted_history.append({"role": "user", "content": content})
        else:
            formatted_history.append({"role": "assistant", "content": content})

    # Prepare system message
    system_prompt = "You are an expert image analysis assistant. Answer succinctly."
    system_msg = {"role": "system", "content": system_prompt}

    # Prepare the latest message, attaching the image as a base64 data URL if provided
    if image:
        base64_image = encode_image_to_base64(image)
        multimodal_content = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
        ]
        # Format for direct API call (OpenRouter/OpenAI format)
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": multimodal_content}]
        # The same content structure works for LangChain's multimodal HumanMessage
        content_for_langchain = multimodal_content
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message

    # Build LangChain messages
    lc_messages = [SystemMessage(content=system_prompt)]
    for msg in chat_history:
        if msg.get('role') == 'user':
            lc_messages.append(HumanMessage(content=msg.get('content')))
        else:
            lc_messages.append(AIMessage(content=msg.get('content')))
    lc_messages.append(HumanMessage(content=content_for_langchain))
    try:
        if llm:
            # First try streaming with LangChain
            try:
                partial = ""
                for chunk in llm.stream(lc_messages):
                    content = getattr(chunk, 'content', None)
                    if not content:
                        continue
                    partial += content
                    yield partial
                # If we got this far, streaming worked
                return
            except Exception as e:
                print(f"Streaming failed: {e}. Falling back to non-streaming mode")
            # Then try a single non-streaming call
            try:
                response = llm.invoke(lc_messages)
                yield response.content
                return
            except Exception as e:
                print(f"Non-streaming call failed: {e}. Falling back to direct API call")
        # Last resort: call the OpenRouter REST API directly
        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL")
        )
        yield response_text
    except Exception as e:
        yield f"⚠️ Error generating the response: {str(e)}. Please try again later."
# Gradio interface
def process_message(message, chat_history, image):
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({'role': 'assistant', 'content': 'Please upload an image.'})
        yield "", chat_history
        return
    # Snapshot the history before appending the new turn so the model does not
    # see the current message twice or the placeholder reply
    previous_history = list(chat_history)
    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Processing...'})
    yield "", chat_history
    for chunk in generate_response(message, previous_history, image):
        chat_history[-1]['content'] = chunk
        yield "", chat_history
with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Message", placeholder="Type your question...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image")
            info = gr.Textbox(label="Image Info", interactive=False)
    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(lambda img: f"Size: {img.size}" if img else "No image.", [image_input], [info])

demo.launch()
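# Run the app directly, e.g. `python app.py` (file name assumed); demo.launch()
# starts the Gradio server on its default host and port.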