import os
import gradio as gr
from os import getenv
import base64
from io import BytesIO
from dotenv import load_dotenv
import requests
import socket

from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, AIMessage, SystemMessage
from langchain_core.callbacks import StreamingStdOutCallbackHandler

# Load environment
dotenv_path = os.path.join(os.path.dirname(__file__), '.env')
load_dotenv(dotenv_path=dotenv_path)
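
# Expected .env entries (variable names taken from the getenv() calls below;
# example values are placeholders):
#   OPENROUTER_API_KEY=sk-or-...
#   OPENROUTER_BASE_URL=https://openrouter.helicone.ai/api/v1
#   HELICONE_API_KEY=...   # optional; enables Helicone request logging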

# Connectivity test: any HTTP response means the host is reachable
# (the API root may legitimately return a non-200 status)
def test_connectivity(url="https://openrouter.helicone.ai/api/v1"):
    try:
        requests.get(url, timeout=5)
        return True
    except (requests.RequestException, socket.error):
        return False

# Helper to make direct API calls to OpenRouter when LangChain fails
def direct_api_call(messages, api_key, base_url):
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {api_key}",
        "HTTP-Referer": "https://your-app-domain.com",  # Add your domain
        "X-Title": "Image Analysis App"
    }
    
    if getenv("HELICONE_API_KEY"):
        headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    
    payload = {
        "model": "google/gemini-flash-1.5",
        "messages": messages,
        "stream": False,
    }
    
    try:
        response = requests.post(
            f"{base_url}/chat/completions",
            headers=headers,
            json=payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()["choices"][0]["message"]["content"]
    except Exception as e:
        return f"Error: {str(e)}"

# Initialize LLM with streaming enabled; fail fast if OpenRouter is unreachable
def init_llm():
    if not test_connectivity():
        raise RuntimeError("No connection to OpenRouter. Check your network and API keys.")
    extra_headers = {}
    if getenv("HELICONE_API_KEY"):
        extra_headers["Helicone-Auth"] = f"Bearer {getenv('HELICONE_API_KEY')}"
    return ChatOpenAI(
        openai_api_key=getenv("OPENROUTER_API_KEY"),
        openai_api_base=getenv("OPENROUTER_BASE_URL"),
        model_name="google/gemini-flash-1.5",
        streaming=True,
        callbacks=[StreamingStdOutCallbackHandler()],
        model_kwargs={"extra_headers": extra_headers},
    )

# Try to initialize the LLM but handle failures gracefully (fall back to direct API calls)
try:
    llm = init_llm()
except Exception as e:
    print(f"LLM initialization failed, using direct API fallback: {e}")
    llm = None

# Helpers
def encode_image_to_base64(pil_image):
    buffer = BytesIO()
    pil_image.save(buffer, format="PNG")
    return base64.b64encode(buffer.getvalue()).decode()
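
# The returned string is embedded as a data URL in the vision messages below,
# e.g. f"data:image/png;base64,{encode_image_to_base64(img)}" (PNG assumed)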

# Core logic
def generate_response(message, chat_history, image):
    # Convert chat history to standard format
    formatted_history = []
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            formatted_history.append({"role": "user", "content": content})
        else:
            formatted_history.append({"role": "assistant", "content": content})
    
    # Prepare system message
    system_msg = {"role": "system", "content": "You are an expert image analysis assistant. Answer succinctly."}
    
    # Prepare the latest message with image if provided
    if image:
        base64_image = encode_image_to_base64(image)
        
        # Format for direct API call (OpenRouter/OpenAI format)
        api_messages = [system_msg] + formatted_history + [{
            "role": "user",
            "content": [
                {"type": "text", "text": message},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
            ]
        }]
        
        # For LangChain format
        content_for_langchain = [
            {"type": "text", "text": message},
            {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{base64_image}"}}
        ]
    else:
        api_messages = [system_msg] + formatted_history + [{"role": "user", "content": message}]
        content_for_langchain = message

    # Build LangChain messages (system prompt first)
    lc_messages = [SystemMessage(content="You are an expert image analysis assistant. Answer succinctly.")]
    for msg in chat_history:
        role = msg.get('role')
        content = msg.get('content')
        if role == 'user':
            lc_messages.append(HumanMessage(content=content))
        else:
            lc_messages.append(AIMessage(content=content))
    
    lc_messages.append(HumanMessage(content=content_for_langchain))

    try:
        # Try LangChain streaming first, then non-streaming, then fall back
        # to a direct OpenRouter API call
        if llm:
            try:
                partial = ""
                for chunk in llm.stream(lc_messages):
                    content = getattr(chunk, 'content', None)
                    if not content:
                        continue
                    partial += content
                    yield partial
                # If we got this far, streaming worked
                return
            except Exception as e:
                print(f"Streaming failed: {e}. Falling back to non-streaming mode")

            try:
                response = llm.invoke(lc_messages)
                yield response.content
                return
            except Exception as e:
                print(f"Non-streaming call failed: {e}. Falling back to direct API")

        response_text = direct_api_call(
            api_messages,
            getenv("OPENROUTER_API_KEY"),
            getenv("OPENROUTER_BASE_URL")
        )
        yield response_text

    except Exception as e:
        import traceback
        print(traceback.format_exc())
        yield f"⚠️ Error generating response: {str(e)}. Please try again later."

# Gradio interface
def process_message(message, chat_history, image):
    if chat_history is None:
        chat_history = []
    if image is None:
        chat_history.append({'role': 'assistant', 'content': 'Please upload an image first.'})
        yield "", chat_history
        return
    chat_history.append({'role': 'user', 'content': message})
    chat_history.append({'role': 'assistant', 'content': '⏳ Processing...'})
    yield "", chat_history
    # Exclude the two entries just appended so the new user turn is not
    # duplicated in the model context and the placeholder is not sent
    for chunk in generate_response(message, chat_history[:-2], image):
        chat_history[-1]['content'] = chunk
        yield "", chat_history

with gr.Blocks() as demo:
    with gr.Row():
        with gr.Column(scale=2):
            chatbot = gr.Chatbot(type='messages', height=600)
            msg = gr.Textbox(label="Message", placeholder="Type your question...")
            clear = gr.ClearButton([msg, chatbot])
        with gr.Column(scale=1):
            image_input = gr.Image(type="pil", label="Upload Image")
            info = gr.Textbox(label="Image Info", interactive=False)

    msg.submit(process_message, [msg, chatbot, image_input], [msg, chatbot])
    image_input.change(lambda img: f"Size: {img.size}" if img else "No image.", [image_input], [info])

demo.launch()
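
# Note: streamed yields depend on Gradio's event queue. Recent Gradio versions
# enable it by default; on older versions it may need enabling explicitly (assumption):
#   demo.queue().launch()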