# Commit 0c0e1de (Javier-Jimenez99): update the README.md processing to strip
# the Hugging Face metadata section and handle HTML code blocks correctly.
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_mcp_adapters.sessions import SSEConnection
from langgraph.prebuilt import create_react_agent
from langchain_ollama.chat_models import ChatOllama
from langchain_anthropic import ChatAnthropic
import gradio as gr
import re
from dotenv import load_dotenv
import os
import json
from datetime import datetime
from typing import List, Any
import re
load_dotenv()
# Global variable to store execution history.
# Each entry is a dict with keys "timestamp", "type", "data", "tab_id";
# appended by add_to_execution_history() and rendered by format_execution_history().
execution_history = []
def format_message_for_display(message):
    """Render a chat message as display text, appending any tool-call details."""
    # Use the message's .content when available; otherwise fall back to str().
    text = message.content if hasattr(message, 'content') else str(message)

    # When the message carries tool calls, append a formatted summary of each.
    tool_calls = getattr(message, 'tool_calls', None)
    if tool_calls:
        details = []
        for call in tool_calls:
            details.append(f"🔧 **Tool Call**: {call['name']}")
            if 'args' in call:
                details.append(f" **Args**: {json.dumps(call['args'], indent=2)}")
        text += "\n\n" + "\n".join(details)
    return text
def add_to_execution_history(step_type: str, data: Any, tab_id: str = None):
    """Append one step (user input, tool call, result, error, ...) to the global log."""
    execution_history.append(
        {
            "timestamp": datetime.now().strftime("%H:%M:%S"),
            "type": step_type,
            "data": data,
            "tab_id": tab_id,
        }
    )
def format_execution_history():
    """Render the global execution history as one markdown string."""
    if not execution_history:
        return "No execution history yet."

    sections = []
    for entry in execution_history:
        ts = entry["timestamp"]
        kind = entry["type"]
        tab = entry.get("tab_id", "N/A")
        if kind == "user_input":
            sections.append(f"**[{ts}] 👤 User (Tab: {tab})**\n\n{entry['data']}\n\n")
        elif kind == "agent_response":
            sections.append(f"**[{ts}] 🤖 Agent**\n\n{entry['data']}\n\n")
        elif kind == "tool_call":
            call = entry['data']
            sections.append(
                f"**[{ts}] 🔧 Tool Call**\n\n**Tool**: {call['name']}\n\n**Arguments**: \n\n```json\n{json.dumps(call.get('args', {}), indent=2)}\n```\n\n"
            )
        elif kind == "tool_result":
            sections.append(f"**[{ts}] ✅ Tool Result**\n\n```\n{entry['data']}\n```\n\n")
        elif kind == "error":
            sections.append(f"**[{ts}] ❌ Error**\n\n{entry['data']}\n\n")
        # A separator follows every entry, whatever its type.
        sections.append("---\n\n")
    return "".join(sections)
async def initialize_tools():
    """
    Open the SSE connection to the MCP server and load the MCP tools.

    The tools do not depend on the Anthropic API key, so they can be loaded
    once and reused across every agent instance.
    """
    conn = SSEConnection(url=os.getenv("MCP_SERVER_URL"), transport="sse")
    return await load_mcp_tools(session=None, connection=conn)
async def create_agent_with_llm(llm_provider: str, anthropic_key: str | None, ollama_model: str | None, tools):
    """
    Build a langgraph ReAct agent on top of the selected LLM provider.

    Args:
        llm_provider: "anthropic", or anything else to use Ollama.
        anthropic_key: API key supplied by the caller; falls back to the
            ANTHROPIC_API_KEY environment variable when empty.
        ollama_model: Ollama model name; falls back to the OLLAMA_MODEL
            environment variable (default "qwen3:8b").
        tools: MCP tools previously loaded by initialize_tools().

    Returns:
        The compiled ReAct agent.

    Raises:
        ValueError: when the Anthropic provider is selected but no API key is
            available from either the caller or the environment.
    """
    if llm_provider == "anthropic":
        # Prefer the key passed by the client; fall back to the environment.
        # (The original passed the already-falsy anthropic_key as the getenv
        # default, which was redundant.)
        key = anthropic_key or os.getenv("ANTHROPIC_API_KEY")
        if not key:
            raise ValueError("Anthropic API key is required for the 'anthropic' provider.")
        llm = ChatAnthropic(
            model=os.getenv("ANTHROPIC_MODEL", "claude-3-sonnet-20240229"),
            anthropic_api_key=key,
        )
    else:
        # Ollama runs locally, so no API key is needed.
        llm = ChatOllama(model=ollama_model or os.getenv("OLLAMA_MODEL", "qwen3:8b"))

    # The system prompt lives in a separate file; an explicit encoding avoids
    # platform-dependent default encodings.
    with open("prompt.txt", "r", encoding="utf-8") as f:
        prompt = f.read()
    return create_react_agent(llm, tools, prompt=prompt)
# We can initialize the tools only once, as they don't depend on the key.
# asyncio.run() creates and tears down a fresh event loop for this one call,
# replacing the asyncio.get_event_loop().run_until_complete() pattern that is
# deprecated since Python 3.10.
tools = asyncio.run(initialize_tools())
async def chat(history: list, tab_id: str = None, anthropic_api_key: str = None):
    """
    Original API endpoint, kept for compatibility — now with history tracking.

    history: list of messages [{"role": "user"/"assistant", "content": "..."}]
    tab_id: a string that the client wants to correlate
    anthropic_api_key: the key sent by the client in each request
    """
    # Log the latest user message and tag it with the client's tab id.
    if history:
        add_to_execution_history("user_input", history[-1]["content"], tab_id)
        if tab_id:
            history[-1]["content"] += f"\nThis is your tab_id: {tab_id}"

    provider = os.getenv("LLM_PROVIDER", "ollama").lower()
    model = os.getenv("OLLAMA_MODEL", "qwen3:8b")
    try:
        agent = await create_agent_with_llm(provider, anthropic_api_key, model, tools)
    except ValueError as exc:
        failure = str(exc)
        add_to_execution_history("error", failure, tab_id)
        return failure

    try:
        result = await agent.ainvoke({"messages": history})
        all_messages = result["messages"]

        # Record every tool invocation and every tool result the agent produced.
        for msg in all_messages:
            if getattr(msg, 'tool_calls', None):
                for call in msg.tool_calls:
                    add_to_execution_history(
                        "tool_call",
                        {"name": call.get("name", "unknown"), "args": call.get("args", {})},
                        tab_id,
                    )
            # Messages carrying a name are tool messages (tool execution output).
            if getattr(msg, 'name', None):
                add_to_execution_history("tool_result", msg.content, tab_id)

        # Strip <think>...</think> reasoning blocks before returning the answer.
        answer = re.sub(r'<think>.*?</think>', '', all_messages[-1].content, flags=re.DOTALL).strip()
        add_to_execution_history("agent_response", answer, tab_id)
        return answer
    except Exception as exc:
        failure = f"Error during execution: {str(exc)}"
        add_to_execution_history("error", failure, tab_id)
        return failure
async def chat_with_history_tracking(message: str, history: List, tab_id: str = None, anthropic_api_key: str = None):
    """
    Enhanced chat function that tracks all execution steps.

    Args:
        message: the new user message.
        history: existing chat history — either a list of
            {"role", "content"} dicts or [user_text, assistant_text] pairs
            (old Gradio Chatbot format); mutated in place with the new turn.
        tab_id: optional client correlation id, appended to the prompt text.
        anthropic_api_key: optional per-request key for the Anthropic provider.

    Returns:
        (updated history, formatted execution-history markdown) — a pair
        suitable for the [chatbot, execution_display] Gradio outputs.
    """
    # Add user input to execution history
    add_to_execution_history("user_input", message, tab_id)
    # Convert history format for LangGraph (keeping compatibility)
    messages = []
    for h in history:
        if isinstance(h, dict):
            messages.append(h)
        else:
            # Convert tuple format to dict format
            role = "user" if h[0] == "user" else "assistant"
            messages.append({"role": role, "content": h[1]})
    # Add current message
    messages.append({"role": "user", "content": message})
    if tab_id:
        # Tag the outgoing prompt so the agent can echo the tab id to tools.
        messages[-1]["content"] += f"\nThis is your tab_id: {tab_id}"
    llm_provider = os.getenv("LLM_PROVIDER", "ollama").lower()
    ollama_model = os.getenv("OLLAMA_MODEL", "qwen3:8b")
    try:
        agent = await create_agent_with_llm(llm_provider, anthropic_api_key, ollama_model, tools)
    except ValueError as e:
        # Missing Anthropic key: surface the error in the chat instead of raising.
        error_msg = str(e)
        add_to_execution_history("error", error_msg, tab_id)
        history.append([message, error_msg])
        return history, format_execution_history()
    try:
        # Stream the agent execution to capture intermediate steps
        result = await agent.ainvoke({"messages": messages})
        # Process all messages in the result
        all_messages = result["messages"]
        # Track tool calls and responses
        for msg in all_messages:
            if hasattr(msg, 'tool_calls') and msg.tool_calls:
                for tool_call in msg.tool_calls:
                    add_to_execution_history("tool_call", {
                        "name": tool_call.get("name", "unknown"),
                        "args": tool_call.get("args", {})
                    }, tab_id)
            # Check if it's a tool message (result of tool execution)
            if hasattr(msg, 'name') and msg.name:
                add_to_execution_history("tool_result", msg.content, tab_id)
        # Get the final output; strip <think>...</think> reasoning blocks.
        output = all_messages[-1].content
        cleaned = re.sub(r'<think>.*?</think>', '', output, flags=re.DOTALL).strip()
        add_to_execution_history("agent_response", cleaned, tab_id)
        history.append([message, cleaned])
        return history, format_execution_history()
    except Exception as e:
        # Any runtime failure becomes an assistant turn so the UI stays usable.
        error_msg = f"Error during execution: {str(e)}"
        add_to_execution_history("error", error_msg, tab_id)
        history.append([message, error_msg])
        return history, format_execution_history()
def clear_history():
    """Reset the global execution log and return cleared values for the UI."""
    global execution_history
    execution_history = []
    return [], "Execution history cleared."
# Create the enhanced Gradio interface: chat on the left, a live view of the
# execution history on the right.
with gr.Blocks(title="OwlBear Agent - Complete History", theme=gr.themes.Default()) as demo:
    gr.Markdown("# 🦉 OwlBear Agent - Complete Execution View")
    gr.Markdown("This interface shows the complete agent execution process, including tool calls and intermediate steps.")
    gr.Markdown("**Note:** All messages sent to the original API also appear here automatically.")
    with gr.Row():
        # Left column: the conversation itself plus its inputs.
        with gr.Column(scale=1):
            gr.Markdown("## 💬 Chat")
            chatbot = gr.Chatbot(
                label="Conversation",
                height=400,
                show_label=True,
                container=True,
            )
            with gr.Row():
                msg = gr.Textbox(
                    label="Message",
                    placeholder="Type your message here...",
                    lines=2,
                    scale=4
                )
                send_btn = gr.Button("Send", variant="primary", scale=1)
            with gr.Row():
                tab_id = gr.Textbox(
                    label="Tab ID",
                    placeholder="Tab ID (optional)",
                    value="main",
                    scale=1
                )
                anthropic_key = gr.Textbox(
                    label="Anthropic API Key",
                    placeholder="Anthropic API Key (optional)",
                    type="password",
                    scale=2
                )
            clear_btn = gr.Button("Clear Chat", variant="secondary")
        # Right column: the detailed execution log.
        with gr.Column(scale=1):
            gr.Markdown("## 📊 Detailed Execution History")
            gr.Markdown("*Updates automatically every 2 seconds*")
            execution_display = gr.Markdown(
                value="No execution history yet.",
                label="Complete History",
                height=600,
                container=True,
            )
            refresh_btn = gr.Button("Refresh History", variant="secondary")
            clear_history_btn = gr.Button("Clear History", variant="secondary")
    # Auto-refresh timer for execution history
    timer = gr.Timer(value=2)  # Refresh every 2 seconds
    timer.tick(lambda: format_execution_history(), outputs=[execution_display], show_api=False)

    # Event handlers
    def send_message(message, history, tab_id, anthropic_key):
        """Sync wrapper that runs the async chat pipeline for Gradio events."""
        # Ignore empty submissions but still refresh the history pane.
        if not message.strip():
            return history, "", format_execution_history()
        # Run the async function.
        # NOTE(review): a fresh event loop is created per call — presumably
        # because this sync handler runs in a thread with no running loop.
        import asyncio
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        try:
            new_history, execution_history_display = loop.run_until_complete(
                chat_with_history_tracking(message, history, tab_id, anthropic_key)
            )
            return new_history, "", execution_history_display
        finally:
            loop.close()

    # Send on button click or Enter in the textbox; both clear the input.
    send_btn.click(
        send_message,
        inputs=[msg, chatbot, tab_id, anthropic_key],
        outputs=[chatbot, msg, execution_display],
        show_api=False
    )
    msg.submit(
        send_message,
        inputs=[msg, chatbot, tab_id, anthropic_key],
        outputs=[chatbot, msg, execution_display],
        show_api=False
    )
    # Clearing the chat does not clear the execution history.
    clear_btn.click(
        lambda: ([], ""),
        outputs=[chatbot, msg],
        show_api=False
    )
    refresh_btn.click(
        lambda: format_execution_history(),
        outputs=[execution_display],
        show_api=False
    )
    # clear_history resets both the chatbot and the history display.
    clear_history_btn.click(
        clear_history,
        outputs=[chatbot, execution_display],
        show_api=False
    )
# Plain API surface kept for programmatic clients: accepts the whole message
# history as JSON and returns the agent's final answer as text.
api_demo = gr.Interface(
    fn=chat,
    inputs=[
        gr.JSON(label="history"),
        gr.Textbox(label="tab_id"),
        gr.Textbox(label="anthropic_api_key"),
    ],
    outputs="text", title="OwlBear Agent - Original API"
)
# Build the README tab: strip the Hugging Face Spaces front matter and render
# any ```html fenced blocks with gr.HTML instead of gr.Markdown.
with open("README.md", "r", encoding="utf-8") as f:
    readme = f.read()
# Drop the leading "---" ... "---" YAML metadata section, if present.
if readme.startswith("---"):
    parts = readme.split("---", 2)
    if len(parts) >= 3:
        readme = parts[2]
# Swap each fenced html block for a {HTML_BLOCK_i} placeholder so the
# surrounding markdown can be split and rendered around it.
html_blocks = re.findall(r'```html\n(.*?)\n```', readme, re.DOTALL)
for i, html_block in enumerate(html_blocks):
    readme = readme.replace(f"```html\n{html_block}\n```", f"{{HTML_BLOCK_{i}}}")
with gr.Blocks() as intro_demo:
    # Interleave HTML and markdown components in original document order.
    parts = re.split(r'({HTML_BLOCK_\d+})', readme)
    for part in parts:
        if part.startswith("{HTML_BLOCK_"):
            block_idx = int(part.replace("{HTML_BLOCK_", "").replace("}", ""))
            gr.HTML(html_blocks[block_idx])
        else:
            if part.strip():
                gr.Markdown(part)
# Combined interface with tabs
combined_demo = gr.TabbedInterface(
    [intro_demo, demo, api_demo],
    ["README", "Complete View with History", "Original API"],
    title="🧙🏼‍♂️ LLM Game Master - Agent"
)
if __name__ == "__main__":
    # GRADIO_PORT overrides the default Gradio port 7860.
    combined_demo.launch(server_port=int(os.getenv("GRADIO_PORT", 7860)))