# src/webui/components/browser_use_agent_tab.py
import asyncio
import json
import logging
import os
import uuid
from typing import Any, AsyncGenerator, Dict, Optional
import gradio as gr
# from browser_use.agent.service import Agent
from browser_use.agent.views import (
AgentHistoryList,
AgentOutput,
)
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.browser.views import BrowserState
from gradio.components import Component
from langchain_core.language_models.chat_models import BaseChatModel
from src.agent.browser_use.browser_use_agent import BrowserUseAgent
from src.browser.custom_browser import CustomBrowser
from src.controller.custom_controller import CustomController
from src.utils import llm_provider
from src.webui.webui_manager import WebuiManager
logger = logging.getLogger(__name__)
# Custom function to format task metrics as markdown
def format_task_metrics(metrics):
if not metrics:
return "No task metrics available yet. Run a task to see metrics here."
md = "#### Task Execution Summary\n\n"
md += f"⏱️ **Duration:** {metrics['duration']} seconds\n\n"
md += f"🔢 **Tokens Used:** {metrics['tokens']}\n\n"
if metrics['result']:
md += f"📋 **Final Result:**\n\n```\n{metrics['result']}\n```\n\n"
md += f"✅ **Status:** {metrics['status']}\n\n"
if metrics['errors'] and any(metrics['errors']):
md += f"❌ **Errors:**\n\n```\n{metrics['errors']}\n```\n\n"
return md
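# Illustrative example (not executed): _handle_done below builds the metrics dict
# with string-formatted numbers and an optional error list, e.g.
#
#   format_task_metrics({
#       "duration": "12.34",
#       "tokens": "2048",
#       "result": "Found 3 matching pages.",
#       "status": "Success",
#       "errors": None,
#   })
#
# returns a markdown summary suitable for the task_metrics_display component.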
# --- Helper Functions --- (Defined at module level)
def get_llm(provider: str, model_name: str, temperature: float = 0.0, base_url: str = "", api_key: str = "", num_ctx: Optional[int] = None):
"""
Get LLM based on provider name, model name, and other parameters.
Only OpenAI is supported now.
Note: This function was simplified to remove dependencies on browser_use.browser_controller
which was causing import errors.
"""
if not model_name:
logger.info("Model Name not specified, LLM will be None.")
return None
try:
# Always use OpenAI
from langchain_openai import ChatOpenAI
from pydantic import SecretStr
        # Ensure a sane default temperature (model_name was already validated above)
        if temperature is None:
            temperature = 0.0
# Convert the API key to SecretStr if provided
api_key_value = SecretStr(api_key) if api_key else None
# Create OpenAI model
llm = ChatOpenAI(
model=model_name,
temperature=temperature,
base_url=base_url if base_url else None,
api_key=api_key_value,
)
logger.info(f"Created OpenAI LLM: {model_name}")
return llm
except Exception as e:
logger.error(f"Failed to create OpenAI LLM: {model_name} - {e}")
return None
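# Illustrative usage (a sketch; assumes an OpenAI key is available, with
# OPENAI_API_KEY as the conventional environment variable, which this module
# does not read itself):
#
#   llm = get_llm(
#       provider="openai",
#       model_name="gpt-4o",
#       temperature=0.0,
#       api_key=os.getenv("OPENAI_API_KEY", ""),
#   )
#
# Returns a langchain ChatOpenAI instance, or None if creation fails.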
def _get_config_value(
webui_manager: WebuiManager,
comp_dict: Dict[gr.components.Component, Any],
comp_id_suffix: str,
default: Any = None,
) -> Any:
"""Safely get value from component dictionary using its ID suffix relative to the tab."""
# Assumes component ID format is "tab_name.comp_name"
tab_name = "browser_use_agent" # Hardcode or derive if needed
comp_id = f"{tab_name}.{comp_id_suffix}"
# Need to find the component object first using the ID from the manager
try:
comp = webui_manager.get_component_by_id(comp_id)
return comp_dict.get(comp, default)
except KeyError:
# Try accessing settings tabs as well
for prefix in ["agent_settings", "browser_settings"]:
try:
comp_id = f"{prefix}.{comp_id_suffix}"
comp = webui_manager.get_component_by_id(comp_id)
return comp_dict.get(comp, default)
except KeyError:
continue
logger.warning(
f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup."
)
return default
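# Example lookup: _get_config_value(webui_manager, components, "max_steps", 100)
# first tries the component registered as "browser_use_agent.max_steps", then falls
# back to "agent_settings.max_steps" and "browser_settings.max_steps" before
# returning the default.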
def _format_agent_output(model_output: AgentOutput) -> str:
"""Formats AgentOutput for display in the chatbot using JSON."""
content = ""
if model_output:
try:
# Directly use model_dump if actions and current_state are Pydantic models
action_dump = [
action.model_dump(exclude_none=True) for action in model_output.action
]
state_dump = model_output.current_state.model_dump(exclude_none=True)
model_output_dump = {
"current_state": state_dump,
"action": action_dump,
}
# Dump to JSON string with indentation
json_string = json.dumps(model_output_dump, indent=4, ensure_ascii=False)
# Wrap in <pre><code> for proper display in HTML
content = f"<pre><code class='language-json'>{json_string}</code></pre>"
except AttributeError as ae:
logger.error(
f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'."
)
content = f"<pre><code>Error: Could not format agent output (AttributeError: {ae}).\nRaw output: {str(model_output)}</code></pre>"
except Exception as e:
logger.error(f"Error formatting agent output: {e}", exc_info=True)
# Fallback to simple string representation on error
content = f"<pre><code>Error formatting agent output.\nRaw output:\n{str(model_output)}</code></pre>"
return content.strip()
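# The returned HTML wraps a JSON document shaped like
#   {"current_state": {...}, "action": [{...}, ...]}
# so the chatbot shows the agent's reasoning state and proposed actions for each step.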
# --- Updated Callback Implementation ---
async def _handle_new_step(
webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int
):
"""Callback for each step taken by the agent, including screenshot display."""
# Use the correct chat history attribute name from the user's code
if not hasattr(webui_manager, "bu_chat_history"):
logger.error(
"Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message."
)
        # Initialize it here so the step can still be recorded, though it should normally be set up before the agent runs.
        webui_manager.bu_chat_history = []
    step_num -= 1  # Adjust so the displayed number matches the step that just completed.
logger.info(f"Step {step_num} completed.")
# --- Screenshot Handling ---
screenshot_html = ""
# Ensure state.screenshot exists and is not empty before proceeding
# Use getattr for safer access
screenshot_data = getattr(state, "screenshot", None)
if screenshot_data:
try:
# Basic validation: check if it looks like base64
if (
isinstance(screenshot_data, str) and len(screenshot_data) > 100
): # Arbitrary length check
# *** UPDATED STYLE: Removed centering, adjusted width ***
img_tag = f'<img src="data:image/jpeg;base64,{screenshot_data}" alt="Step {step_num} Screenshot" style="max-width: 800px; max-height: 600px; object-fit:contain;" />'
screenshot_html = (
img_tag + "<br/>"
) # Use <br/> for line break after inline-block image
else:
logger.warning(
f"Screenshot for step {step_num} seems invalid (type: {type(screenshot_data)}, len: {len(screenshot_data) if isinstance(screenshot_data, str) else 'N/A'})."
)
screenshot_html = "**[Invalid screenshot data]**<br/>"
except Exception as e:
logger.error(
f"Error processing or formatting screenshot for step {step_num}: {e}",
exc_info=True,
)
screenshot_html = "**[Error displaying screenshot]**<br/>"
else:
logger.debug(f"No screenshot available for step {step_num}.")
# --- Format Agent Output ---
formatted_output = _format_agent_output(output) # Use the updated function
# --- Combine and Append to Chat ---
step_header = f"--- **Step {step_num}** ---"
# Combine header, image (with line break), and JSON block
final_content = step_header + "<br/>" + screenshot_html + formatted_output
chat_message = {
"role": "assistant",
"content": final_content.strip(), # Remove leading/trailing whitespace
}
# Append to the correct chat history list
webui_manager.bu_chat_history.append(chat_message)
await asyncio.sleep(0.05)
def _handle_done(webui_manager: WebuiManager, history: AgentHistoryList):
"""Callback when the agent finishes the task (success or failure)."""
logger.info(
f"Agent task finished. Duration: {history.total_duration_seconds():.2f}s, Tokens: {history.total_input_tokens()}"
)
final_summary = "**Task Completed**\n"
final_summary += f"- Duration: {history.total_duration_seconds():.2f} seconds\n"
final_summary += f"- Total Input Tokens: {history.total_input_tokens()}\n" # Or total tokens if available
final_result = history.final_result()
if final_result:
final_summary += f"- Final Result: {final_result}\n"
errors = history.errors()
if errors and any(errors):
final_summary += f"- **Errors:**\n```\n{errors}\n```\n"
else:
final_summary += "- Status: Success\n"
# Store task metrics separately for the metrics display
webui_manager.bu_task_metrics = {
"duration": f"{history.total_duration_seconds():.2f}",
"tokens": f"{history.total_input_tokens()}",
"result": final_result if final_result else "",
"status": "Error" if (errors and any(errors)) else "Success",
"errors": errors if (errors and any(errors)) else None
}
webui_manager.bu_chat_history.append(
{"role": "assistant", "content": final_summary}
)
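# bu_task_metrics set here is rendered by format_task_metrics() and pushed to the
# "task_metrics_display" markdown component when run_agent_task finalizes the task.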
async def _ask_assistant_callback(
webui_manager: WebuiManager, query: str, browser_context: BrowserContext
) -> Dict[str, Any]:
"""Callback triggered by the agent's ask_for_assistant action."""
logger.info("Agent requires assistance. Waiting for user input.")
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error("Chat history ('bu_chat_history') not found in webui_manager during ask_assistant!")
        return {"response": "Internal Error: Cannot display help request."}
webui_manager.bu_chat_history.append(
{
"role": "assistant",
"content": f"**Need Help:** {query}\nPlease provide information or perform the required action in the browser, then type your response/confirmation below and click 'Submit Response'.",
}
)
# Use state stored in webui_manager
webui_manager.bu_response_event = asyncio.Event()
webui_manager.bu_user_help_response = None # Reset previous response
try:
logger.info("Waiting for user response event...")
await asyncio.wait_for(
webui_manager.bu_response_event.wait(), timeout=3600.0
) # Long timeout
logger.info("User response event received.")
except asyncio.TimeoutError:
logger.warning("Timeout waiting for user assistance.")
webui_manager.bu_chat_history.append(
{
"role": "assistant",
"content": "**Timeout:** No response received. Trying to proceed.",
}
)
webui_manager.bu_response_event = None # Clear the event
return {"response": "Timeout: User did not respond."} # Inform the agent
response = webui_manager.bu_user_help_response
webui_manager.bu_chat_history.append(
{"role": "user", "content": response}
) # Show user response in chat
webui_manager.bu_response_event = (
None # Clear the event for the next potential request
)
return {"response": response}
# --- Core Agent Execution Logic --- (Needs access to webui_manager)
async def run_agent_task(
webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
) -> AsyncGenerator[Dict[gr.components.Component, Any], None]:
"""
Runs a browser agent task and yields UI updates.
"""
# Get user task from input field
user_input_comp = webui_manager.get_component_by_id("browser_use_agent.user_input")
task = components.get(user_input_comp, "").strip()
if not task:
logger.warning("Empty task submitted")
task = "Help me browse the web" # Default task if empty
# Get all required UI components
run_button_comp = webui_manager.get_component_by_id("browser_use_agent.run_button")
stop_button_comp = webui_manager.get_component_by_id("browser_use_agent.stop_button")
pause_resume_button_comp = webui_manager.get_component_by_id("browser_use_agent.pause_resume_button")
clear_button_comp = webui_manager.get_component_by_id("browser_use_agent.clear_button")
chatbot_comp = webui_manager.get_component_by_id("browser_use_agent.chatbot")
history_file_comp = webui_manager.get_component_by_id("browser_use_agent.agent_history_file")
gif_comp = webui_manager.get_component_by_id("browser_use_agent.recording_gif")
browser_view_comp = webui_manager.get_component_by_id("browser_use_agent.browser_view")
    # --- 1. Read Agent Settings from UI Components ---
# Helper function to check if OpenAI model supports vision
def is_openai_vision_model(model_name: str) -> bool:
"""Check if an OpenAI model supports vision capabilities"""
vision_models = ["gpt-4-vision", "gpt-4o", "gpt-4-turbo"]
return any(vision_model in model_name.lower() for vision_model in vision_models)
def get_setting(name, default=None):
# Get component from UI
return _get_config_value(webui_manager, components, name, default)
override_system_prompt = get_setting("override_system_prompt", "")
extend_system_prompt = get_setting("extend_system_prompt", "")
llm_provider_name = get_setting("llm_provider", "openai")
llm_model_name = get_setting("llm_model_name", "gpt-4o")
llm_temperature = get_setting("llm_temperature", 0.6)
use_vision = get_setting("use_vision", True)
ollama_num_ctx = get_setting("ollama_num_ctx", 16000)
llm_base_url = get_setting("llm_base_url", "")
llm_api_key = get_setting("llm_api_key", "")
max_steps = get_setting("max_steps", 100)
max_actions = get_setting("max_actions", 10)
max_input_tokens = get_setting("max_input_tokens", 128000)
tool_calling_method = get_setting("tool_calling_method", "auto")
tool_calling_method = tool_calling_method if tool_calling_method != "None" else None
mcp_server_config_comp = webui_manager.get_component_by_id(
"agent_settings.mcp_server_config"
)
mcp_server_config_str = (
components.get(mcp_server_config_comp) if mcp_server_config_comp else None
)
mcp_server_config = (
json.loads(mcp_server_config_str) if mcp_server_config_str else None
)
# Planner LLM Settings (Optional)
planner_llm_provider_name = get_setting("planner_llm_provider") or None
planner_llm = None
planner_use_vision = False
if planner_llm_provider_name:
planner_llm_model_name = get_setting("planner_llm_model_name")
planner_llm_temperature = get_setting("planner_llm_temperature", 0.6)
planner_ollama_num_ctx = get_setting("planner_ollama_num_ctx", 16000)
planner_llm_base_url = get_setting("planner_llm_base_url") or None
planner_llm_api_key = get_setting("planner_llm_api_key") or None
planner_use_vision = get_setting("planner_use_vision", False)
# Always use OpenAI for planner too
planner_llm_provider_name = "openai"
planner_llm = get_llm(
provider=planner_llm_provider_name,
model_name=str(planner_llm_model_name) if planner_llm_model_name else "gpt-4o",
temperature=float(planner_llm_temperature),
base_url=str(planner_llm_base_url) if planner_llm_base_url else "",
api_key=str(planner_llm_api_key) if planner_llm_api_key else "",
)
# --- Browser Settings ---
def get_browser_setting(key, default=None):
comp = webui_manager.get_component_by_id(f"browser_settings.{key}")
return components.get(comp, default) if comp else default
browser_binary_path = get_browser_setting("browser_binary_path") or None
browser_user_data_dir = get_browser_setting("browser_user_data_dir") or None
use_own_browser = get_browser_setting(
"use_own_browser", False
) # Logic handled by CDP/WSS presence
keep_browser_open = get_browser_setting("keep_browser_open", False)
headless = get_browser_setting("headless", True)
disable_security = get_browser_setting("disable_security", False)
window_w = int(get_browser_setting("window_w", 1920))
window_h = int(get_browser_setting("window_h", 1080))
cdp_url = get_browser_setting("cdp_url") or None
wss_url = get_browser_setting("wss_url") or None
save_recording_path = get_browser_setting("save_recording_path") or None
save_trace_path = get_browser_setting("save_trace_path") or None
save_agent_history_path = get_browser_setting(
"save_agent_history_path", "./tmp/agent_history"
)
save_download_path = get_browser_setting("save_download_path", "./tmp/downloads")
stream_vw = 80
stream_vh = int(80 * window_h // window_w)
os.makedirs(save_agent_history_path, exist_ok=True)
if save_recording_path:
os.makedirs(save_recording_path, exist_ok=True)
if save_trace_path:
os.makedirs(save_trace_path, exist_ok=True)
if save_download_path:
os.makedirs(save_download_path, exist_ok=True)
# --- 2. Initialize LLM ---
# Always use OpenAI
llm_provider_name = "openai"
main_llm = get_llm(
provider=llm_provider_name,
model_name=str(llm_model_name) if llm_model_name else "gpt-4o",
temperature=float(llm_temperature),
base_url=str(llm_base_url) if llm_base_url else "",
api_key=str(llm_api_key) if llm_api_key else "",
)
    # --- 3. Initialize Controller ---
    # Pass the webui_manager instance to the ask-assistant callback when wrapping it
async def ask_callback_wrapper(
query: str, browser_context: BrowserContext
) -> Dict[str, Any]:
return await _ask_assistant_callback(webui_manager, query, browser_context)
if not webui_manager.bu_controller:
webui_manager.bu_controller = CustomController(
ask_assistant_callback=ask_callback_wrapper
)
await webui_manager.bu_controller.setup_mcp_client(mcp_server_config)
# --- 4. Initialize Browser and Context ---
should_close_browser_on_finish = not keep_browser_open
try:
# Close existing resources if not keeping open
if not keep_browser_open:
if webui_manager.bu_browser_context:
logger.info("Closing previous browser context.")
await webui_manager.bu_browser_context.close()
webui_manager.bu_browser_context = None
if webui_manager.bu_browser:
logger.info("Closing previous browser.")
await webui_manager.bu_browser.close()
webui_manager.bu_browser = None
# Create Browser if needed
if not webui_manager.bu_browser:
logger.info("Launching new browser instance.")
extra_args = []
if use_own_browser:
browser_binary_path = os.getenv("BROWSER_PATH", None) or browser_binary_path
if browser_binary_path == "":
browser_binary_path = None
browser_user_data = browser_user_data_dir or os.getenv("BROWSER_USER_DATA", None)
if browser_user_data:
extra_args += [f"--user-data-dir={browser_user_data}"]
else:
browser_binary_path = None
webui_manager.bu_browser = CustomBrowser(
config=BrowserConfig(
headless=headless,
disable_security=disable_security,
browser_binary_path=browser_binary_path,
extra_browser_args=extra_args,
wss_url=wss_url,
cdp_url=cdp_url,
new_context_config=BrowserContextConfig(
window_width=window_w,
window_height=window_h,
)
)
)
# Create Context if needed
if not webui_manager.bu_browser_context:
logger.info("Creating new browser context.")
context_config = BrowserContextConfig(
trace_path=save_trace_path if save_trace_path else None,
save_recording_path=save_recording_path
if save_recording_path
else None,
save_downloads_path=save_download_path if save_download_path else None,
window_height=window_h,
window_width=window_w,
)
if not webui_manager.bu_browser:
raise ValueError("Browser not initialized, cannot create context.")
webui_manager.bu_browser_context = (
await webui_manager.bu_browser.new_context(config=context_config)
)
# --- 5. Initialize or Update Agent ---
webui_manager.bu_agent_task_id = str(uuid.uuid4()) # New ID for this task run
os.makedirs(
os.path.join(save_agent_history_path, webui_manager.bu_agent_task_id),
exist_ok=True,
)
history_file = os.path.join(
save_agent_history_path,
webui_manager.bu_agent_task_id,
f"{webui_manager.bu_agent_task_id}.json",
)
gif_path = os.path.join(
save_agent_history_path,
webui_manager.bu_agent_task_id,
f"{webui_manager.bu_agent_task_id}.gif",
)
# Pass the webui_manager to callbacks when wrapping them
async def step_callback_wrapper(
state: BrowserState, output: AgentOutput, step_num: int
):
await _handle_new_step(webui_manager, state, output, step_num)
def done_callback_wrapper(history: AgentHistoryList):
_handle_done(webui_manager, history)
if not webui_manager.bu_agent:
logger.info(f"Initializing new agent for task: {task}")
if not webui_manager.bu_browser or not webui_manager.bu_browser_context:
raise ValueError(
"Browser or Context not initialized, cannot create agent."
)
webui_manager.bu_agent = BrowserUseAgent(
task=task,
llm=main_llm,
browser=webui_manager.bu_browser,
browser_context=webui_manager.bu_browser_context,
controller=webui_manager.bu_controller,
register_new_step_callback=step_callback_wrapper,
register_done_callback=done_callback_wrapper,
use_vision=use_vision,
override_system_message=override_system_prompt,
extend_system_message=extend_system_prompt,
max_input_tokens=max_input_tokens,
max_actions_per_step=max_actions,
tool_calling_method=tool_calling_method,
planner_llm=planner_llm,
use_vision_for_planner=planner_use_vision if planner_llm else False,
source="webui",
)
webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
webui_manager.bu_agent.settings.generate_gif = gif_path
else:
webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
webui_manager.bu_agent.add_new_task(task)
webui_manager.bu_agent.settings.generate_gif = gif_path
webui_manager.bu_agent.browser = webui_manager.bu_browser
webui_manager.bu_agent.browser_context = webui_manager.bu_browser_context
webui_manager.bu_agent.controller = webui_manager.bu_controller
# --- 6. Run Agent Task and Stream Updates ---
agent_run_coro = webui_manager.bu_agent.run(max_steps=max_steps)
agent_task = asyncio.create_task(agent_run_coro)
webui_manager.bu_current_task = agent_task # Store the task
last_chat_len = len(webui_manager.bu_chat_history)
while not agent_task.done():
is_paused = webui_manager.bu_agent.state.paused
is_stopped = webui_manager.bu_agent.state.stopped
# Check for pause state
if is_paused:
yield {
pause_resume_button_comp: gr.update(
value="▶️ Resume", interactive=True
),
stop_button_comp: gr.update(interactive=True),
}
# Wait until pause is released or task is stopped/done
while is_paused and not agent_task.done():
# Re-check agent state in loop
is_paused = webui_manager.bu_agent.state.paused
is_stopped = webui_manager.bu_agent.state.stopped
if is_stopped: # Stop signal received while paused
break
await asyncio.sleep(0.2)
if (
agent_task.done() or is_stopped
): # If stopped or task finished while paused
break
# If resumed, yield UI update
yield {
pause_resume_button_comp: gr.update(
value="⏸️ Pause", interactive=True
),
run_button_comp: gr.update(
value="⏳ Running...", interactive=False
),
}
# Check if agent stopped itself or stop button was pressed (which sets agent.state.stopped)
if is_stopped:
logger.info("Agent has stopped (internally or via stop button).")
if not agent_task.done():
# Ensure the task coroutine finishes if agent just set flag
try:
await asyncio.wait_for(
agent_task, timeout=1.0
) # Give it a moment to exit run()
except asyncio.TimeoutError:
logger.warning(
"Agent task did not finish quickly after stop signal, cancelling."
)
agent_task.cancel()
except Exception: # Catch task exceptions if it errors on stop
pass
break # Exit the streaming loop
# Check if agent is asking for help (via response_event)
update_dict = {}
if webui_manager.bu_response_event is not None:
update_dict = {
user_input_comp: gr.update(
placeholder="Agent needs help. Enter response and submit.",
interactive=True,
),
run_button_comp: gr.update(
value="✔️ Submit Response", interactive=True
),
pause_resume_button_comp: gr.update(interactive=False),
stop_button_comp: gr.update(interactive=False),
chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
}
last_chat_len = len(webui_manager.bu_chat_history)
yield update_dict
# Wait until response is submitted or task finishes
while (
webui_manager.bu_response_event is not None
and not agent_task.done()
):
await asyncio.sleep(0.2)
# Restore UI after response submitted or if task ended unexpectedly
if not agent_task.done():
yield {
user_input_comp: gr.update(
placeholder="Agent is running...", interactive=False
),
run_button_comp: gr.update(
value="⏳ Running...", interactive=False
),
pause_resume_button_comp: gr.update(interactive=True),
stop_button_comp: gr.update(interactive=True),
}
else:
break # Task finished while waiting for response
# Update Chatbot if new messages arrived via callbacks
if len(webui_manager.bu_chat_history) > last_chat_len:
update_dict[chatbot_comp] = gr.update(
value=webui_manager.bu_chat_history
)
last_chat_len = len(webui_manager.bu_chat_history)
# Update Browser View
if headless and webui_manager.bu_browser_context:
try:
screenshot_b64 = (
await webui_manager.bu_browser_context.take_screenshot()
)
if screenshot_b64:
html_content = f'<img src="data:image/jpeg;base64,{screenshot_b64}" style="width:{stream_vw}vw; height:{stream_vh}vh ; border:1px solid #ccc;">'
update_dict[browser_view_comp] = gr.update(
value=html_content, visible=True
)
else:
html_content = f"<h1 style='width:{stream_vw}vw; height:{stream_vh}vh'>Waiting for browser session...</h1>"
update_dict[browser_view_comp] = gr.update(
value=html_content, visible=True
)
except Exception as e:
logger.debug(f"Failed to capture screenshot: {e}")
update_dict[browser_view_comp] = gr.update(
value="<div style='...'>Error loading view...</div>",
visible=True,
)
else:
update_dict[browser_view_comp] = gr.update(visible=False)
# Yield accumulated updates
if update_dict:
yield update_dict
await asyncio.sleep(0.1) # Polling interval
# --- 7. Task Finalization ---
webui_manager.bu_agent.state.paused = False
webui_manager.bu_agent.state.stopped = False
final_update = {}
try:
logger.info("Agent task completing...")
# Await the task ensure completion and catch exceptions if not already caught
if not agent_task.done():
await agent_task # Retrieve result/exception
elif agent_task.exception(): # Check if task finished with exception
agent_task.result() # Raise the exception to be caught below
logger.info("Agent task completed processing.")
logger.info(f"Explicitly saving agent history to: {history_file}")
webui_manager.bu_agent.save_history(history_file)
if os.path.exists(history_file):
final_update[history_file_comp] = gr.File(value=history_file)
if gif_path and os.path.exists(gif_path):
logger.info(f"GIF found at: {gif_path}")
final_update[gif_comp] = gr.Image(value=gif_path)
# Update task metrics display if metrics are available
task_metrics_display_comp = webui_manager.get_component_by_id("browser_use_agent.task_metrics_display")
if hasattr(webui_manager, 'bu_task_metrics') and webui_manager.bu_task_metrics:
# For Markdown display, we need to format the metrics
metrics_md = format_task_metrics(webui_manager.bu_task_metrics)
final_update[task_metrics_display_comp] = gr.update(value=metrics_md)
except asyncio.CancelledError:
logger.info("Agent task was cancelled.")
if not any(
msg.get("role") == "assistant" and
msg.get("content", "") and
"Cancelled" in msg.get("content", "")
for msg in webui_manager.bu_chat_history
):
webui_manager.bu_chat_history.append(
{"role": "assistant", "content": "**Task Cancelled**."}
)
final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
except Exception as e:
logger.error(f"Error during agent execution: {e}", exc_info=True)
error_message = (
f"**Agent Execution Error:**\n```\n{type(e).__name__}: {e}\n```"
)
if not any(
msg.get("role") == "assistant" and
msg.get("content", "") and
error_message in msg.get("content", "")
for msg in webui_manager.bu_chat_history
):
webui_manager.bu_chat_history.append(
{"role": "assistant", "content": error_message}
)
final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
gr.Error(f"Agent execution failed: {e}")
finally:
webui_manager.bu_current_task = None # Clear the task reference
# Close browser/context if requested
if should_close_browser_on_finish:
if webui_manager.bu_browser_context:
logger.info("Closing browser context after task.")
await webui_manager.bu_browser_context.close()
webui_manager.bu_browser_context = None
if webui_manager.bu_browser:
logger.info("Closing browser after task.")
await webui_manager.bu_browser.close()
webui_manager.bu_browser = None
# --- 8. Final UI Update ---
final_update.update(
{
user_input_comp: gr.update(
value="",
interactive=True,
placeholder="Enter your next task...",
),
run_button_comp: gr.update(value="▶️ Submit Task", interactive=True),
stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
pause_resume_button_comp: gr.update(
value="⏸️ Pause", interactive=False
),
clear_button_comp: gr.update(interactive=True),
# Ensure final chat history is shown
chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
}
)
yield final_update
except Exception as e:
# Catch errors during setup (before agent run starts)
logger.error(f"Error setting up agent task: {e}", exc_info=True)
webui_manager.bu_current_task = None # Ensure state is reset
yield {
user_input_comp: gr.update(
interactive=True, placeholder="Error during setup. Enter task..."
),
run_button_comp: gr.update(value="▶️ Submit Task", interactive=True),
stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
pause_resume_button_comp: gr.update(value="⏸️ Pause", interactive=False),
clear_button_comp: gr.update(interactive=True),
chatbot_comp: gr.update(
value=webui_manager.bu_chat_history
+ [{"role": "assistant", "content": f"**Setup Error:** {e}"}]
),
}
# --- Button Click Handlers --- (Need access to webui_manager)
async def handle_submit(
webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
):
"""Handles clicks on the main 'Submit' button."""
user_input_comp = webui_manager.get_component_by_id("browser_use_agent.user_input")
run_button_comp = webui_manager.get_component_by_id("browser_use_agent.run_button")
user_input_value = components.get(user_input_comp, "").strip()
# Check if waiting for user assistance
if webui_manager.bu_response_event and not webui_manager.bu_response_event.is_set():
logger.info(f"User submitted assistance: {user_input_value}")
webui_manager.bu_user_help_response = (
user_input_value if user_input_value else "User provided no text response."
)
webui_manager.bu_response_event.set()
# UI updates handled by the main loop reacting to the event being set
yield {
user_input_comp: gr.update(
value="",
interactive=False,
placeholder="Waiting for agent to continue...",
),
run_button_comp: gr.update(value="⏳ Running...", interactive=False),
}
# Check if a task is currently running (using _current_task)
elif webui_manager.bu_current_task and not webui_manager.bu_current_task.done():
logger.warning(
"Submit button clicked while agent is already running and not asking for help."
)
gr.Info("Agent is currently running. Please wait or use Stop/Pause.")
yield {} # No change
else:
# Handle submission for a new task
logger.info("Submit button clicked for new task.")
# Use async generator to stream updates from run_agent_task
async for update in run_agent_task(webui_manager, components):
yield update
async def handle_stop(webui_manager: WebuiManager):
"""Handles clicks on the 'Stop' button."""
logger.info("Stop button clicked.")
agent = webui_manager.bu_agent
task = webui_manager.bu_current_task
if agent and task and not task.done():
# Signal the agent to stop by setting its internal flag
agent.state.stopped = True
agent.state.paused = False # Ensure not paused if stopped
return {
webui_manager.get_component_by_id(
"browser_use_agent.stop_button"
): gr.update(interactive=False, value="⏹️ Stopping..."),
webui_manager.get_component_by_id(
"browser_use_agent.pause_resume_button"
): gr.update(interactive=False),
webui_manager.get_component_by_id(
"browser_use_agent.run_button"
): gr.update(interactive=False),
}
else:
logger.warning("Stop clicked but agent is not running or task is already done.")
# Reset UI just in case it's stuck
return {
webui_manager.get_component_by_id(
"browser_use_agent.run_button"
): gr.update(interactive=True),
webui_manager.get_component_by_id(
"browser_use_agent.stop_button"
): gr.update(interactive=False),
webui_manager.get_component_by_id(
"browser_use_agent.pause_resume_button"
): gr.update(interactive=False),
webui_manager.get_component_by_id(
"browser_use_agent.clear_button"
): gr.update(interactive=True),
}
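# Note: stopping is cooperative. handle_stop only flips agent.state.stopped; the
# polling loop in run_agent_task detects the flag, waits briefly for run() to exit,
# and cancels the asyncio task if it does not.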
async def handle_pause_resume(webui_manager: WebuiManager):
"""Handles clicks on the 'Pause/Resume' button."""
agent = webui_manager.bu_agent
task = webui_manager.bu_current_task
if agent and task and not task.done():
if agent.state.paused:
logger.info("Resume button clicked.")
agent.resume()
# UI update happens in main loop
return {
webui_manager.get_component_by_id(
"browser_use_agent.pause_resume_button"
): gr.update(value="⏸️ Pause", interactive=True)
} # Optimistic update
else:
logger.info("Pause button clicked.")
agent.pause()
return {
webui_manager.get_component_by_id(
"browser_use_agent.pause_resume_button"
): gr.update(value="▶️ Resume", interactive=True)
} # Optimistic update
else:
        logger.warning("Pause/Resume clicked but no agent task is currently running.")
return {} # No change
async def handle_clear(webui_manager: WebuiManager):
"""Handles clicks on the 'Clear' button."""
logger.info("Clear button clicked.")
# Stop any running task first
task = webui_manager.bu_current_task
if task and not task.done():
logger.info("Clearing requires stopping the current task.")
# Safely try to stop the agent if it exists
if webui_manager.bu_agent:
try:
if hasattr(webui_manager.bu_agent, 'stop'):
webui_manager.bu_agent.stop()
else:
# Alternative method if stop isn't available
webui_manager.bu_agent.state.stopped = True
webui_manager.bu_agent.state.paused = False
except Exception as e:
logger.warning(f"Error stopping agent: {e}")
task.cancel()
try:
await asyncio.wait_for(task, timeout=2.0) # Wait briefly
except (asyncio.CancelledError, asyncio.TimeoutError):
pass
except Exception as e:
logger.warning(f"Error stopping task on clear: {e}")
webui_manager.bu_current_task = None
if webui_manager.bu_controller:
await webui_manager.bu_controller.close_mcp_client()
webui_manager.bu_controller = None
webui_manager.bu_agent = None
# Reset state stored in manager
webui_manager.bu_chat_history = []
webui_manager.bu_response_event = None
webui_manager.bu_user_help_response = None
webui_manager.bu_agent_task_id = None
webui_manager.bu_task_metrics = None # Clear task metrics
logger.info("Agent state and browser resources cleared.")
# Reset UI components
return {
webui_manager.get_component_by_id("browser_use_agent.chatbot"): gr.update(
value=[]
),
webui_manager.get_component_by_id("browser_use_agent.user_input"): gr.update(
value="", placeholder="Enter your task here..."
),
webui_manager.get_component_by_id(
"browser_use_agent.agent_history_file"
): gr.update(value=None),
webui_manager.get_component_by_id("browser_use_agent.recording_gif"): gr.update(
value=None
),
webui_manager.get_component_by_id("browser_use_agent.browser_view"): gr.update(
value="<div style='...'>Browser Cleared</div>"
),
webui_manager.get_component_by_id("browser_use_agent.run_button"): gr.update(
value="▶️ Submit Task", interactive=True
),
webui_manager.get_component_by_id("browser_use_agent.stop_button"): gr.update(
interactive=False
),
webui_manager.get_component_by_id(
"browser_use_agent.pause_resume_button"
): gr.update(value="⏸️ Pause", interactive=False),
webui_manager.get_component_by_id("browser_use_agent.clear_button"): gr.update(
interactive=True
),
webui_manager.get_component_by_id("browser_use_agent.task_metrics_display"): gr.update(
value="No task metrics available yet. Run a task to see metrics here."
),
}
# --- Tab Creation Function ---
def create_browser_use_agent_tab(webui_manager: WebuiManager):
"""
Create the run agent tab, defining UI, state, and handlers.
"""
webui_manager.init_browser_use_agent()
# Initialize task metrics if not already present
if not hasattr(webui_manager, 'bu_task_metrics'):
webui_manager.bu_task_metrics = None
# --- Define UI Components ---
tab_components = {}
with gr.Column():
chatbot = gr.Chatbot(
lambda: webui_manager.bu_chat_history, # Load history dynamically
elem_id="browser_use_chatbot",
label="Agent Interaction",
type="messages",
height=600,
show_copy_button=True,
)
user_input = gr.Textbox(
label="Your Task or Response",
placeholder="Enter your task here or provide assistance when asked.",
lines=3,
interactive=True,
elem_id="user_input",
)
with gr.Row():
stop_button = gr.Button(
"⏹️ Stop", interactive=False, variant="stop", scale=2
)
pause_resume_button = gr.Button(
"⏸️ Pause", interactive=False, variant="secondary", scale=2, visible=True
)
clear_button = gr.Button(
"🗑️ Clear", interactive=True, variant="secondary", scale=2
)
run_button = gr.Button("▶️ Submit Task", variant="primary", scale=3)
browser_view = gr.HTML(
value="<div style='width:100%; height:50vh; display:flex; justify-content:center; align-items:center; border:1px solid #ccc; background-color:#f0f0f0;'><p>Browser View (Requires Headless=True)</p></div>",
label="Browser Live View",
elem_id="browser_view",
visible=False,
)
# Task Metrics Section
with gr.Column(visible=True) as task_metrics_container:
gr.Markdown("### Task Metrics", elem_id="task_metrics_heading")
task_metrics_display = gr.Markdown(
value=lambda: format_task_metrics(webui_manager.bu_task_metrics),
elem_id="task_metrics_display",
)
with gr.Column():
gr.Markdown("### Task Outputs")
agent_history_file = gr.File(label="Agent History JSON", interactive=False)
recording_gif = gr.Image(
label="Task Recording GIF",
format="gif",
interactive=False,
type="filepath",
)
# --- Store Components in Manager ---
tab_components.update(
dict(
chatbot=chatbot,
user_input=user_input,
clear_button=clear_button,
run_button=run_button,
stop_button=stop_button,
pause_resume_button=pause_resume_button,
agent_history_file=agent_history_file,
recording_gif=recording_gif,
browser_view=browser_view,
task_metrics_display=task_metrics_display,
)
)
webui_manager.add_components(
"browser_use_agent", tab_components
) # Use "browser_use_agent" as tab_name prefix
    all_managed_components = set(
        webui_manager.get_components()
    )  # All components known to the manager; this same set instance provides both the event inputs and submit_wrapper's zip, so iteration order stays aligned
run_tab_outputs = list(tab_components.values())
async def submit_wrapper(
*args,
) -> AsyncGenerator[Dict[Component, Any], None]:
"""Wrapper for handle_submit that yields its results.
This receives all component values as separate arguments.
"""
# Convert the individual args into a components dictionary
components_dict = {}
if args and len(args) == len(all_managed_components):
# Combine the args and components into a dictionary
components_dict = dict(zip(all_managed_components, args))
async for update in handle_submit(webui_manager, components_dict):
yield update
async def stop_wrapper(*args) -> AsyncGenerator[Dict[Component, Any], None]:
"""Wrapper for handle_stop."""
update_dict = await handle_stop(webui_manager)
if update_dict: # Only yield if there's an update to return
yield update_dict
else:
yield {} # Yield empty dict to avoid errors
async def pause_resume_wrapper(*args) -> AsyncGenerator[Dict[Component, Any], None]:
"""Wrapper for handle_pause_resume."""
update_dict = await handle_pause_resume(webui_manager)
if update_dict: # Only yield if there's an update to return
yield update_dict
else:
yield {} # Yield empty dict to avoid errors
async def clear_wrapper(*args) -> AsyncGenerator[Dict[Component, Any], None]:
"""Wrapper for handle_clear."""
update_dict = await handle_clear(webui_manager)
if update_dict: # Only yield if there's an update to return
yield update_dict
else:
yield {} # Yield empty dict to avoid errors
    # --- Connect Event Handlers using the Wrappers ---
run_button.click(
fn=submit_wrapper, inputs=list(all_managed_components), outputs=run_tab_outputs
)
user_input.submit(
fn=submit_wrapper, inputs=list(all_managed_components), outputs=run_tab_outputs
)
stop_button.click(fn=stop_wrapper, inputs=None, outputs=run_tab_outputs)
pause_resume_button.click(
fn=pause_resume_wrapper, inputs=None, outputs=run_tab_outputs
)
clear_button.click(fn=clear_wrapper, inputs=None, outputs=run_tab_outputs)
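# Illustrative wiring (a sketch; assumes WebuiManager() can be constructed directly
# and that the "agent_settings" / "browser_settings" tabs are registered elsewhere):
#
#   manager = WebuiManager()
#   with gr.Blocks() as demo:
#       with gr.Tab("Run Agent"):
#           create_browser_use_agent_tab(manager)
#   demo.queue().launch()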