File size: 7,141 Bytes
7af9a4a |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 |
import os
import gradio as gr
from os import getenv
import base64
from io import BytesIO
from dotenv import load_dotenv
import requests
import socket
import logging
import json
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler
# Load environment
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)
# Connectivity test
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
try:
return requests.get(url, timeout=5).status_code == 200
except (requests.RequestException, socket.error):
return False
# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {api_key}",
"HTTP-Referer": "https://your-app-domain.com", # Add your domain
"X-Title": "Image Analysis App"
}
if getenv("HELICONE_API_KEY"):
headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
payload = {
"model": "google/gemini-flash-1.5",
"messages": messages,
"stream": False,
}
try:
response = requests.post(
f"{base_url}/chat/completions",
headers=headers,
json=payload,
timeout=30
)
response.raise_for_status()
return response.json()["choices"][0]["message"]["content"]
except Exception as e:
return f"Error: {str(e)}"
# Initialize LLM with streaming and retry logic
def init_llm():
if not test_connectivity():
raise RuntimeError("No hay conexión a OpenRouter. Verifica red y claves.")
return ChatOpenAI(
openai_api_key=getenv("OPENROUTER_API_KEY"),
openai_api_base=getenv("OPENROUTER_BASE_URL"),
model_name="google/gemini-flash-1.5",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()],
model_kwargs={
"extra_headers": {"Helicone-Auth": f"Bearer {getenv('HELICONE_API_KEY')}"}
},
)
# Try to initialize LLM but handle failures gracefully
try:
llm = init_llm()
except Exception as e:
llm = None
# Helpers
def encode_image_to_base64(pil_image):
buffer = BytesIO()
pil_image.save(buffer, format="PNG")
return base64.b64encode(buffer.getvalue()).decode()
# Core logic
def generate_response(message, chat_history, image):
# Convert chat history to standard format
formatted_history = []
for msg in chat_history:
role = msg.get('role')
content = msg.get('content')
if role == 'user':
formatted_history.append({"role": "user", "content": content})
else:
formatted_history.append({"role": "assistant", "content": content})
# Prepare system message
system_msg = {"role": "system", "content": "You are an expert image analysis assistant. Answer succinctly."}
# Prepare the latest message with image if provided
if image:
base64_image = encode_image_to_base64(image)
# Format for direct API call (OpenRouter/OpenAI format)
api_messages = [system_msg] + formatted_history + [{
"role": "user",
"content": [
{"type": "text", "text": message},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
]
}]
# For LangChain format
content_for_langchain = [
{"type": "text", "text": message},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
]
else:
api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
content_for_langchain = message
# Build LangChain messages
lc_messages = [HumanMessage(content="You are an expert image analysis assistant. Answer succinctly.")]
for msg in chat_history:
role = msg.get('role')
content = msg.get('content')
if role == 'user':
lc_messages.append(HumanMessage(content=content))
else:
lc_messages.append(AIMessage(content=content))
lc_messages.append(HumanMessage(content=content_for_langchain))
try:
# First try with LangChain
if llm:
try:
try:
stream_iter = llm.stream(lc_messages)
partial = ""
for chunk in stream_iter:
if chunk is None:
continue
content = getattr(chunk, 'content', None)
if content is None:
continue
partial += content
yield partial
# If we got this far, streaming worked
return
except Exception as e:
print(f"Streaming failed: {e}. Falling back to non-streaming mode")
# Try non-streaming
try:
response = llm.invoke(lc_messages)
yield response.content
return
except Exception as e:
raise e
except Exception as e:
raise e
response_text = direct_api_call(
api_messages,
getenv("OPENROUTER_API_KEY"),
getenv("OPENROUTER_BASE_URL")
)
yield response_text
except Exception as e:
import traceback
error_trace = traceback.format_exc()
yield f"⚠️ Error al generar respuesta: {str(e)}. Intenta más tarde."
# Gradio interface
def process_message(message, chat_history, image):
if chat_history is None:
chat_history = []
if image is None:
chat_history.append({'role':'assistant','content':'Por favor sube una imagen.'})
return "", chat_history
chat_history.append({'role':'user','content':message})
chat_history.append({'role':'assistant','content':'⏳ Procesando...'})
yield "", chat_history
for chunk in generate_response(message, chat_history, image):
chat_history[-1]['content'] = chunk
yield "", chat_history
return "", chat_history
with gr.Blocks() as demo:
with gr.Row():
with gr.Column(scale=2):
chatbot = gr.Chatbot(type='messages', height=600)
msg = gr.Textbox(label="Mensaje", placeholder="Escribe tu pregunta...")
clear = gr.ClearButton([msg, chatbot])
with gr.Column(scale=1):
image_input = gr.Image(type="pil", label="Sube Imagen")
info = gr.Textbox(label="Info Imagen", interactive=False)
msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
image_input.change(lambda img: f"Tamaño: {img.size}" if img else "Sin imagen.", [image_input], [info])
demo.launch() |