"""Gradio app for image Q&A: sends an uploaded image plus a question to
google/gemini-flash-1.5 through OpenRouter (optionally proxied via Helicone),
streaming the answer with LangChain and falling back to a direct HTTP call.

Requires: gradio, requests, python-dotenv, langchain-openai, langchain-core,
and Pillow (used through Gradio's PIL image type).
"""

import base64
import logging
import os
import socket
import traceback
from io import BytesIO
from os import getenv

import gradio as gr
import requests
from dotenv import load_dotenv
from langchain_core.callbacks import StreamingStdOutCallbackHandler
from langchain_core.messages import AIMessage, HumanMessage, SystemMessage
from langchain_openai import ChatOpenAI

# Load environment variables from the .env file that sits next to this script.
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)
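
# The code below reads these variables; a minimal .env might look like the
# following (values are placeholders, not real keys):
#
#   OPENROUTER_API_KEY=<your OpenRouter API key>
#   OPENROUTER_BASE_URL=https://openrouter.helicone.ai/api/v1
#   HELICONE_API_KEY=<your Helicone API key>   # optional; enables Helicone logging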


def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    try:
        return requests.get(url, timeout=5).status_code == 200
    except (requests.RequestException, socket.error):
        return False


def direct_api_call(messages, api_key, base_url):
    """Call the OpenRouter chat completions endpoint directly over HTTP."""
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",
        "X-Title": "Image Analysis App"
    }

    # Forward Helicone authentication only if a key is configured.
    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"

    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }

    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"


def init_llm():
    """Build a streaming ChatOpenAI client pointed at OpenRouter."""
    if not test_connectivity():
        raise RuntimeError("No connection to OpenRouter. Check your network and API keys.")

    # Only attach the Helicone header when a key is actually configured.
    extra_headers = {}
    if getenv("HELICONE_API_KEY"):
        extra_headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"

    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs={"extra_headers": extra_headers},
    )


# Initialise the LangChain client once at import time; fall back to the direct
# HTTP path (llm = None) if that fails.
try:
    llm = init_llm()
except Exception as e:
    logging.warning(f"Could not initialise the LangChain client: {e}")
    llm = None


def encode_image_to_base64(pil_image):
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()


def generate_response(message, chat_history, image):
    """Yield progressively longer partial answers to `message` about `image`.

    `chat_history` should contain only the *previous* turns; the caller strips
    the turn currently being answered before passing it in.
    """
    # History in OpenAI chat format, used by the direct HTTP fallback.
    formatted_history = []
    for msg in chat_history:
        role = "user" if msg.get('role') == 'user' else "assistant"
        formatted_history.append({"role": role, "content": msg.get('content')})

    system_prompt = "You are an expert image analysis assistant. Answer succinctly."
    system_msg = {"role": "system", "content": system_prompt}

    if image:
        base64_image = encode_image_to_base64(image)
        # Multimodal user turn: the question plus the image as a base64 data URL.
        user_content = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
        ]
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": user_content}]
        content_for_langchain = user_content
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message

    # The same conversation as LangChain message objects, for the ChatOpenAI client.
    lc_messages = [SystemMessage(content=system_prompt)]
    for msg in chat_history:
        if msg.get('role') == 'user':
            lc_messages.append(HumanMessage(content=msg.get('content')))
        else:
            lc_messages.append(AIMessage(content=msg.get('content')))
    lc_messages.append(HumanMessage(content=content_for_langchain))

    try:
        if llm:
            # Preferred path: stream tokens through LangChain.
            try:
                partial = ""
                for chunk in llm.stream(lc_messages):
                    content = getattr(chunk, 'content', None)
                    if not content:
                        continue
                    partial += content
                    yield partial
                return
            except Exception as e:
                logging.warning(f"Streaming failed: {e}. Falling back to non-streaming mode.")

            # Second attempt: a single blocking LangChain call.
            try:
                response = llm.invoke(lc_messages)
                yield response.content
                return
            except Exception as e:
                logging.warning(f"Non-streaming call failed: {e}. Falling back to the direct API.")

        # Last resort (also used when the client could not be initialised):
        # call the OpenRouter endpoint directly.
        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL")
        )
        yield response_text

    except Exception as e:
        logging.error(traceback.format_exc())
        yield f"⚠️ Error generating the response: {str(e)}. Please try again later."


def process_message(message, chat_history, image):
    """Gradio handler: record the new turn and stream the assistant's reply."""
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({'role': 'user', 'content': message})
        chat_history.append({'role': 'assistant', 'content': 'Please upload an image.'})
        yield "", chat_history
        return
    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Processing...'})
    yield "", chat_history
    # Pass only the previous turns to the model; the two entries appended above
    # are the turn currently being answered.
    for chunk in generate_response(message, chat_history[:-2], image):
        chat_history[-1]['content'] = chunk
        yield "", chat_history

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Message", placeholder="Type your question...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image")
            info = gr.Textbox(label="Image Info", interactive=False)

    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(
        lambda img: f"Size: {img.size}" if img is not None else "No image.",
        [image_input],
        [info]
    )
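
# Gradio prints the local URL it serves on at startup; options such as
# share=True or server_port=... can be passed to demo.launch() below if needed.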

if __name__ == "__main__":
    demo.launch()