WebCrawler / src /webui /components /browser_use_agent_tab.py.bak
Carlos Gonzalez
Add application file
b1f90a5
raw
history blame
56.6 kB
import asyncio
import json
import logging
import os
import uuid
from typing import Any, AsyncGenerator, Dict, Optional
import gradio as gr
# from browser_use.agent.service import Agent
from browser_use.agent.views import (
AgentHistoryList,
AgentOutput,
)
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.browser.views import BrowserState
from gradio.components import Component
from langchain_core.language_models.chat_models import BaseChatModel
from src.agent.browser_use.browser_use_agent import BrowserUseAgent
from src.browser.custom_browser import CustomBrowser
from src.controller.custom_controller import CustomController
from src.utils import llm_provider
from src.webui.webui_manager import WebuiManager
logger = logging.getLogger(__name__)
# Custom function to format task metrics as markdown
def format_task_metrics(metrics):
    """Render a task-metrics mapping as a Markdown summary string.

    Args:
        metrics: Mapping with the keys ``duration``, ``tokens``, ``result``,
            ``status`` and ``errors`` (all required when the mapping is
            non-empty) and an optional ``screenshot`` holding base64 image
            data. A falsy value means no task has produced metrics yet.

    Returns:
        Markdown text. A placeholder sentence is returned when ``metrics``
        is falsy.

    Raises:
        KeyError: If a non-empty ``metrics`` lacks one of the required keys.
    """
    if not metrics:
        return "No task metrics available yet. Run a task to see metrics here."

    # Collect the sections in display order and join them once at the end.
    sections = [
        "#### Task Execution Summary\n\n",
        f"⏱️ **Duration:** {metrics['duration']} seconds\n\n",
        f"🔢 **Tokens Used:** {metrics['tokens']}\n\n",
    ]

    result = metrics['result']
    if result:
        sections.append(f"📋 **Final Result:**\n\n```\n{result}\n```\n\n")

    sections.append(f"✅ **Status:** {metrics['status']}\n\n")

    errors = metrics['errors']
    # The error list may consist solely of empty entries; show it only
    # when at least one entry is truthy.
    if errors and any(errors):
        sections.append(f"❌ **Errors:**\n\n```\n{errors}\n```\n\n")

    # The screenshot is optional and embedded inline as a data URI.
    screenshot = metrics.get('screenshot')
    if screenshot:
        sections.append("📸 **Final Screenshot:**\n\n")
        sections.append(
            f"<img src=\"data:image/jpeg;base64,{screenshot}\" alt=\"Final Screenshot\" style=\"max-width:100%; border:1px solid #ccc;\" />\n\n"
        )

    return "".join(sections)
# Add this new function after the format_task_metrics function
def create_business_task(business_name, business_type, business_website=None, additional_info=None):
    """Build the standard natural-language task for analyzing a business.

    Args:
        business_name: Name of the business, quoted verbatim in the task.
        business_type: Industry label inserted into the opening sentence.
        business_website: Optional URL. When truthy the agent is told to
            start there; otherwise it is told to search online.
        additional_info: Optional free-form context appended before the
            closing instruction.

    Returns:
        The full task description as a single string.
    """
    # Either point the agent at the known site or tell it to search.
    starting_point = (
        f" Start by visiting their website at {business_website}."
        if business_website
        else " Search for information about this business online."
    )
    extra_context = (
        f" Additional context: {additional_info}" if additional_info else ""
    )
    return "".join(
        (
            f"Analyze the business '{business_name}' which is in the {business_type} industry.",
            starting_point,
            " Gather the following information: main products/services, contact information, location, hours of operation, and customer reviews.",
            extra_context,
            " Provide a comprehensive report with all findings.",
        )
    )
# --- Helper Functions --- (Defined at module level)
async def _initialize_llm(
    provider: Optional[str],
    model_name: Optional[str],
    temperature: float,
    base_url: Optional[str],
    api_key: Optional[str],
    num_ctx: Optional[int] = None,
) -> Optional[BaseChatModel]:
    """Create the chat model described by the given settings.

    Args:
        provider: LLM provider identifier; falsy means "no LLM".
        model_name: Model identifier; falsy means "no LLM".
        temperature: Sampling temperature forwarded to the provider.
        base_url: Optional endpoint override; empty values become ``None``.
        api_key: Optional API key; empty values become ``None``.
        num_ctx: Context size, forwarded only when ``provider`` is
            ``"ollama"``.

    Returns:
        The model returned by ``llm_provider.get_llm_model``, or ``None``
        when provider/model is missing or initialization fails. Failures are
        logged and surfaced to the user as a Gradio warning, not raised.
    """
    if not (provider and model_name):
        logger.info("LLM Provider or Model Name not specified, LLM will be None.")
        return None

    try:
        logger.info(
            f"Initializing LLM: Provider={provider}, Model={model_name}, Temp={temperature}"
        )
        llm_kwargs = {
            "provider": provider,
            "model_name": model_name,
            "temperature": temperature,
            # Empty strings from the UI are treated as "not set".
            "base_url": base_url or None,
            "api_key": api_key or None,
            # num_ctx is only passed through for ollama.
            "num_ctx": num_ctx if provider == "ollama" else None,
        }
        return llm_provider.get_llm_model(**llm_kwargs)
    except Exception as e:
        logger.error(f"Failed to initialize LLM: {e}", exc_info=True)
        gr.Warning(
            f"Failed to initialize LLM '{model_name}' for provider '{provider}'. Please check settings. Error: {e}"
        )
        return None
def _get_config_value(
    webui_manager: WebuiManager,
    comp_dict: Dict[gr.components.Component, Any],
    comp_id_suffix: str,
    default: Any = None,
) -> Any:
    """Look up a component's value by the suffix of its ``tab.name`` ID.

    The suffix is tried under the ``browser_use_agent`` tab first, then
    under ``agent_settings`` and ``browser_settings``. The first ID the
    manager resolves wins.

    Args:
        webui_manager: Manager used to resolve component IDs to components.
        comp_dict: Mapping from component objects to their current values.
        comp_id_suffix: Component name without the tab prefix.
        default: Value returned when the component is unknown or has no
            entry in ``comp_dict``.

    Returns:
        The component's value from ``comp_dict``, or ``default``.
    """
    # Same lookup order as before: the agent tab, then the settings tabs.
    for tab_prefix in ("browser_use_agent", "agent_settings", "browser_settings"):
        try:
            component = webui_manager.get_component_by_id(
                f"{tab_prefix}.{comp_id_suffix}"
            )
            return comp_dict.get(component, default)
        except KeyError:
            # Unknown ID under this prefix; try the next one.
            continue

    logger.warning(
        f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup."
    )
    return default
def _format_agent_output(model_output: AgentOutput) -> str:
    """Format an ``AgentOutput`` as an HTML ``<pre><code>`` JSON block.

    The output's ``action`` items and ``current_state`` are dumped with
    ``model_dump(exclude_none=True)`` and serialized as indented JSON.
    All text interpolated into the HTML is escaped (``&``, ``<``, ``>``)
    so that content originating from the model or from visited pages is
    displayed literally instead of being interpreted as markup.

    Args:
        model_output: The agent output for one step; a falsy value yields
            an empty string.

    Returns:
        The HTML snippet, or an HTML error block containing the raw string
        representation when the output cannot be dumped.
    """
    import html  # local import: only this function needs it

    content = ""
    if model_output:
        try:
            # Directly use model_dump if actions and current_state are Pydantic models
            action_dump = [
                action.model_dump(exclude_none=True) for action in model_output.action
            ]
            state_dump = model_output.current_state.model_dump(exclude_none=True)
            model_output_dump = {
                "current_state": state_dump,
                "action": action_dump,
            }
            # Dump to JSON string with indentation
            json_string = json.dumps(model_output_dump, indent=4, ensure_ascii=False)
            # Wrap in <pre><code> for proper display in HTML; escape so that
            # markup-like characters inside the JSON are shown verbatim.
            safe_json = html.escape(json_string, quote=False)
            content = f"<pre><code class='language-json'>{safe_json}</code></pre>"
        except AttributeError as ae:
            logger.error(
                f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'."
            )
            safe_error = html.escape(str(ae), quote=False)
            safe_raw = html.escape(str(model_output), quote=False)
            content = f"<pre><code>Error: Could not format agent output (AttributeError: {safe_error}).\nRaw output: {safe_raw}</code></pre>"
        except Exception as e:
            logger.error(f"Error formatting agent output: {e}", exc_info=True)
            # Fallback to simple string representation on error
            safe_raw = html.escape(str(model_output), quote=False)
            content = f"<pre><code>Error formatting agent output.\nRaw output:\n{safe_raw}</code></pre>"
    return content.strip()
# --- Updated Callback Implementation ---
async def _handle_new_step(
    webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int
):
    """Append one agent step (header, screenshot, JSON output) to the chat.

    Args:
        webui_manager: Holder of ``bu_chat_history``; the list is created
            here if the attribute is missing.
        state: Browser state for the step; its ``screenshot`` attribute, if
            present and plausible, is embedded as an inline image.
        output: Agent output for the step, rendered via
            ``_format_agent_output``.
        step_num: Step counter supplied by the agent; it is decremented by
            one before being displayed.
    """
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error(
            "Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message."
        )
        # Create the list so the append at the end still works.
        webui_manager.bu_chat_history = []

    step_num = step_num - 1
    logger.info(f"Step {step_num} completed.")

    # --- Screenshot Handling ---
    screenshot_html = ""
    screenshot_data = getattr(state, "screenshot", None)
    if not screenshot_data:
        logger.debug(f"No screenshot available for step {step_num}.")
    else:
        try:
            # Rough plausibility check: a string longer than 100 characters.
            plausible = isinstance(screenshot_data, str) and len(screenshot_data) > 100
            if plausible:
                screenshot_html = (
                    f'<img src="data:image/jpeg;base64,{screenshot_data}" alt="Step {step_num} Screenshot" style="max-width: 800px; max-height: 600px; object-fit:contain;" />'
                    + "<br/>"
                )
            else:
                data_len = (
                    len(screenshot_data) if isinstance(screenshot_data, str) else 'N/A'
                )
                logger.warning(
                    f"Screenshot for step {step_num} seems invalid (type: {type(screenshot_data)}, len: {data_len})."
                )
                screenshot_html = "**[Invalid screenshot data]**<br/>"
        except Exception as e:
            logger.error(
                f"Error processing or formatting screenshot for step {step_num}: {e}",
                exc_info=True,
            )
            screenshot_html = "**[Error displaying screenshot]**<br/>"

    # --- Combine header, image and JSON block, then append to chat ---
    pieces = (
        f"--- **Step {step_num}** ---",
        "<br/>",
        screenshot_html,
        _format_agent_output(output),
    )
    webui_manager.bu_chat_history.append(
        {"role": "assistant", "content": "".join(pieces).strip()}
    )

    # Brief pause after appending, as in the original implementation.
    await asyncio.sleep(0.05)
def _handle_done(webui_manager: WebuiManager, history: AgentHistoryList):
    """Record the final summary and metrics when the agent finishes.

    Appends a summary message to ``webui_manager.bu_chat_history`` and
    stores a metrics dict (duration, tokens, result, status, errors, last
    screenshot) on ``webui_manager.bu_task_metrics``.

    Args:
        webui_manager: Holder of the chat history and task metrics.
        history: Agent history queried for duration, input tokens, final
            result, errors and screenshots.
    """
    duration_text = f"{history.total_duration_seconds():.2f}"
    input_tokens = history.total_input_tokens()
    logger.info(
        f"Agent task finished. Duration: {duration_text}s, Tokens: {input_tokens}"
    )

    final_result = history.final_result()
    errors = history.errors()
    # The error list may contain only empty entries; that counts as success.
    failed = bool(errors and any(errors))

    summary_lines = [
        "**Task Completed**\n",
        f"- Duration: {duration_text} seconds\n",
        f"- Total Input Tokens: {input_tokens}\n",
    ]
    if final_result:
        summary_lines.append(f"- Final Result: {final_result}\n")
    if failed:
        summary_lines.append(f"- **Errors:**\n```\n{errors}\n```\n")
    else:
        summary_lines.append("- Status: Success\n")

    # Keep the last screenshot, if any, for the metrics panel.
    screenshots = history.screenshots()
    last_screenshot = screenshots[-1] if screenshots else None

    webui_manager.bu_task_metrics = {
        "duration": duration_text,
        "tokens": f"{input_tokens}",
        "result": final_result if final_result else "",
        "status": "Error" if failed else "Success",
        "errors": errors if failed else None,
        "screenshot": last_screenshot,
    }

    webui_manager.bu_chat_history.append(
        {"role": "assistant", "content": "".join(summary_lines)}
    )
async def _ask_assistant_callback(
    webui_manager: WebuiManager, query: str, browser_context: BrowserContext
) -> Dict[str, Any]:
    """Callback triggered by the agent's ask_for_assistant action.

    Posts the agent's question to the chat, then waits (up to one hour) for
    ``webui_manager.bu_response_event`` to be set by the submit handler.

    Args:
        webui_manager: Holder of the chat history and of the
            ``bu_response_event`` / ``bu_user_help_response`` hand-off state.
        query: The question or request the agent wants shown to the user.
        browser_context: Browser context supplied by the controller; not
            used by this callback.

    Returns:
        ``{"response": <text>}`` with the user's reply, a timeout notice, or
        an internal-error notice when the chat history is unavailable.
    """
    logger.info("Agent requires assistance. Waiting for user input.")

    # Guard the attribute that is actually used below (``bu_chat_history``);
    # checking a differently named attribute would either reject every help
    # request or fail to protect the append.
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error("Chat history not found in webui_manager during ask_assistant!")
        return {"response": "Internal Error: Cannot display help request."}

    webui_manager.bu_chat_history.append(
        {
            "role": "assistant",
            "content": f"**Need Help:** {query}\nPlease provide information or perform the required action in the browser, then type your response/confirmation below and click 'Submit Response'.",
        }
    )

    # Use state stored in webui_manager
    webui_manager.bu_response_event = asyncio.Event()
    webui_manager.bu_user_help_response = None  # Reset previous response

    try:
        logger.info("Waiting for user response event...")
        await asyncio.wait_for(
            webui_manager.bu_response_event.wait(), timeout=3600.0
        )  # Long timeout
        logger.info("User response event received.")
    except asyncio.TimeoutError:
        logger.warning("Timeout waiting for user assistance.")
        webui_manager.bu_chat_history.append(
            {
                "role": "assistant",
                "content": "**Timeout:** No response received. Trying to proceed.",
            }
        )
        return {"response": "Timeout: User did not respond."}  # Inform the agent
    finally:
        # Always clear the event (success, timeout, or cancellation) so the
        # UI loop does not see a stale pending help request.
        webui_manager.bu_response_event = None

    response = webui_manager.bu_user_help_response
    webui_manager.bu_chat_history.append(
        {"role": "user", "content": response}
    )  # Show user response in chat
    return {"response": response}
# --- Core Agent Execution Logic --- (Needs access to webui_manager)
async def run_agent_task(
    webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
) -> AsyncGenerator[Dict[gr.components.Component, Any], None]:
    """Handles the entire lifecycle of initializing and running the agent.

    This is an async generator; every yielded dict maps Gradio components to
    the updates that should be applied to them.

    Phases:
      1. Build the task text from the business form fields and lock the form.
      2. Read agent/browser settings from ``components`` and initialize LLMs.
      3. Create or reuse the controller, browser, browser context and agent.
      4. Run the agent as an asyncio task while polling for pause, stop,
         help requests, new chat messages and (when headless) screenshots.
      5. Save history/GIF, publish task metrics, optionally close the
         browser, and unlock the form.

    Everything after the form has been locked runs inside one ``try`` so
    that any failure not handled by the run phase is reported in the chat
    as a setup error and the form is unlocked again.

    Args:
        webui_manager: Shared UI state (components, chat history, browser,
            controller, agent, current task).
        components: Current values of the UI components, keyed by component.

    Yields:
        Dicts of component updates.
    """
    # --- Get Components ---
    # Need handles to specific UI components to update them
    business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name")
    business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website")
    business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type")
    additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info")
    run_button_comp = webui_manager.get_component_by_id("browser_use_agent.run_button")
    stop_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.stop_button"
    )
    pause_resume_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.pause_resume_button"
    )
    clear_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.clear_button"
    )
    chatbot_comp = webui_manager.get_component_by_id("browser_use_agent.chatbot")
    history_file_comp = webui_manager.get_component_by_id(
        "browser_use_agent.agent_history_file"
    )
    gif_comp = webui_manager.get_component_by_id("browser_use_agent.recording_gif")
    browser_view_comp = webui_manager.get_component_by_id(
        "browser_use_agent.browser_view"
    )

    # --- 1. Get Task and Initial UI Update ---
    # Text fields may hold None; coalesce before stripping.
    task = (components.get(business_name_comp) or "").strip()
    if not task:
        gr.Warning("Please enter a business name or task.")
        yield {run_button_comp: gr.update(interactive=True)}
        return

    # Set running state indirectly via _current_task
    if "Analyze the business" not in task:
        # If task isn't already formatted, create one from the business info
        business_name = task  # The business name was stored in the "task" variable
        business_website = (components.get(business_website_comp) or "").strip()
        business_type = components.get(business_type_comp, "Retail")
        additional_info = (components.get(additional_info_comp) or "").strip()
        task = create_business_task(
            business_name,
            business_type,
            business_website,
            additional_info
        )

    # We should already have added the task to chat history in handle_submit
    if not any(msg.get("content") == task for msg in webui_manager.bu_chat_history if msg.get("role") == "user"):
        webui_manager.bu_chat_history.append({"role": "user", "content": task})

    yield {
        business_name_comp: gr.Textbox(
            value=components.get(business_name_comp, ""), interactive=False
        ),
        business_website_comp: gr.Textbox(
            value=components.get(business_website_comp, ""), interactive=False
        ),
        business_type_comp: gr.update(interactive=False),
        additional_info_comp: gr.Textbox(
            value=components.get(additional_info_comp, ""), interactive=False
        ),
        run_button_comp: gr.Button(value="⏳ Running...", interactive=False),
        stop_button_comp: gr.Button(interactive=True),
        pause_resume_button_comp: gr.Button(value="⏸️ Pause", interactive=True),
        clear_button_comp: gr.Button(interactive=False),
        chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
        history_file_comp: gr.update(value=None),
        gif_comp: gr.update(value=None),
    }

    # The form is locked from here on, so every later failure must reach the
    # handler at the bottom that unlocks it again.
    try:
        # --- Agent Settings ---
        # Access settings values via components dict, getting IDs from webui_manager
        def get_setting(key, default=None):
            comp = webui_manager.id_to_component.get(f"agent_settings.{key}")
            return components.get(comp, default) if comp else default

        override_system_prompt = get_setting("override_system_prompt") or None
        extend_system_prompt = get_setting("extend_system_prompt") or None
        llm_provider_name = get_setting(
            "llm_provider", None
        )  # Default to None if not found
        llm_model_name = get_setting("llm_model_name", None)
        llm_temperature = get_setting("llm_temperature", 0.6)
        use_vision = get_setting("use_vision", True)
        ollama_num_ctx = get_setting("ollama_num_ctx", 16000)
        llm_base_url = get_setting("llm_base_url") or None
        llm_api_key = get_setting("llm_api_key") or None
        max_steps = get_setting("max_steps", 100)
        max_actions = get_setting("max_actions", 10)
        max_input_tokens = get_setting("max_input_tokens", 128000)
        tool_calling_str = get_setting("tool_calling_method", "auto")
        tool_calling_method = tool_calling_str if tool_calling_str != "None" else None

        mcp_server_config_comp = webui_manager.id_to_component.get(
            "agent_settings.mcp_server_config"
        )
        mcp_server_config_str = (
            components.get(mcp_server_config_comp) if mcp_server_config_comp else None
        )
        try:
            mcp_server_config = (
                json.loads(mcp_server_config_str) if mcp_server_config_str else None
            )
        except json.JSONDecodeError as err:
            # Re-raise with a message that tells the user which setting is bad.
            raise ValueError(f"Invalid MCP server config JSON: {err}") from err

        # Planner LLM Settings (Optional)
        planner_llm_provider_name = get_setting("planner_llm_provider") or None
        planner_llm = None
        planner_use_vision = False
        if planner_llm_provider_name:
            planner_llm_model_name = get_setting("planner_llm_model_name")
            planner_llm_temperature = get_setting("planner_llm_temperature", 0.6)
            planner_ollama_num_ctx = get_setting("planner_ollama_num_ctx", 16000)
            planner_llm_base_url = get_setting("planner_llm_base_url") or None
            planner_llm_api_key = get_setting("planner_llm_api_key") or None
            planner_use_vision = get_setting("planner_use_vision", False)

            planner_llm = await _initialize_llm(
                planner_llm_provider_name,
                planner_llm_model_name,
                planner_llm_temperature,
                planner_llm_base_url,
                planner_llm_api_key,
                planner_ollama_num_ctx if planner_llm_provider_name == "ollama" else None,
            )

        # --- Browser Settings ---
        def get_browser_setting(key, default=None):
            comp = webui_manager.id_to_component.get(f"browser_settings.{key}")
            return components.get(comp, default) if comp else default

        browser_binary_path = get_browser_setting("browser_binary_path") or None
        browser_user_data_dir = get_browser_setting("browser_user_data_dir") or None
        use_own_browser = get_browser_setting(
            "use_own_browser", False
        )  # Logic handled by CDP/WSS presence
        keep_browser_open = get_browser_setting("keep_browser_open", False)
        headless = get_browser_setting("headless", False)
        disable_security = get_browser_setting("disable_security", False)
        window_w = int(get_browser_setting("window_w", 1280))
        window_h = int(get_browser_setting("window_h", 1100))
        cdp_url = get_browser_setting("cdp_url") or None
        wss_url = get_browser_setting("wss_url") or None
        save_recording_path = get_browser_setting("save_recording_path") or None
        save_trace_path = get_browser_setting("save_trace_path") or None
        save_agent_history_path = get_browser_setting(
            "save_agent_history_path", "./tmp/agent_history"
        )
        save_download_path = get_browser_setting("save_download_path", "./tmp/downloads")

        # Size of the streamed browser view, in viewport units.
        stream_vw = 70
        stream_vh = int(70 * window_h // window_w)

        os.makedirs(save_agent_history_path, exist_ok=True)
        if save_recording_path:
            os.makedirs(save_recording_path, exist_ok=True)
        if save_trace_path:
            os.makedirs(save_trace_path, exist_ok=True)
        if save_download_path:
            os.makedirs(save_download_path, exist_ok=True)

        # --- 2. Initialize LLM ---
        main_llm = await _initialize_llm(
            llm_provider_name,
            llm_model_name,
            llm_temperature,
            llm_base_url,
            llm_api_key,
            ollama_num_ctx if llm_provider_name == "ollama" else None,
        )

        # Pass the webui_manager instance to the callback when wrapping it
        async def ask_callback_wrapper(
            query: str, browser_context: BrowserContext
        ) -> Dict[str, Any]:
            return await _ask_assistant_callback(webui_manager, query, browser_context)

        if not webui_manager.bu_controller:
            webui_manager.bu_controller = CustomController(
                ask_assistant_callback=ask_callback_wrapper
            )
            await webui_manager.bu_controller.setup_mcp_client(mcp_server_config)

        # --- 4. Initialize Browser and Context ---
        should_close_browser_on_finish = not keep_browser_open

        # Close existing resources if not keeping open
        if not keep_browser_open:
            if webui_manager.bu_browser_context:
                logger.info("Closing previous browser context.")
                await webui_manager.bu_browser_context.close()
                webui_manager.bu_browser_context = None
            if webui_manager.bu_browser:
                logger.info("Closing previous browser.")
                await webui_manager.bu_browser.close()
                webui_manager.bu_browser = None

        # Create Browser if needed
        if not webui_manager.bu_browser:
            logger.info("Launching new browser instance.")
            extra_args = []
            if use_own_browser:
                browser_binary_path = os.getenv("BROWSER_PATH", None) or browser_binary_path
                if browser_binary_path == "":
                    browser_binary_path = None
                browser_user_data = browser_user_data_dir or os.getenv("BROWSER_USER_DATA", None)
                if browser_user_data:
                    extra_args += [f"--user-data-dir={browser_user_data}"]
            else:
                browser_binary_path = None

            webui_manager.bu_browser = CustomBrowser(
                config=BrowserConfig(
                    headless=headless,
                    disable_security=disable_security,
                    browser_binary_path=browser_binary_path,
                    extra_browser_args=extra_args,
                    wss_url=wss_url,
                    cdp_url=cdp_url,
                    new_context_config=BrowserContextConfig(
                        window_width=window_w,
                        window_height=window_h,
                    )
                )
            )

        # Create Context if needed
        if not webui_manager.bu_browser_context:
            logger.info("Creating new browser context.")
            context_config = BrowserContextConfig(
                trace_path=save_trace_path if save_trace_path else None,
                save_recording_path=save_recording_path
                if save_recording_path
                else None,
                save_downloads_path=save_download_path if save_download_path else None,
                window_height=window_h,
                window_width=window_w,
            )
            if not webui_manager.bu_browser:
                raise ValueError("Browser not initialized, cannot create context.")
            webui_manager.bu_browser_context = (
                await webui_manager.bu_browser.new_context(config=context_config)
            )

        # --- 5. Initialize or Update Agent ---
        webui_manager.bu_agent_task_id = str(uuid.uuid4())  # New ID for this task run
        os.makedirs(
            os.path.join(save_agent_history_path, webui_manager.bu_agent_task_id),
            exist_ok=True,
        )
        history_file = os.path.join(
            save_agent_history_path,
            webui_manager.bu_agent_task_id,
            f"{webui_manager.bu_agent_task_id}.json",
        )
        gif_path = os.path.join(
            save_agent_history_path,
            webui_manager.bu_agent_task_id,
            f"{webui_manager.bu_agent_task_id}.gif",
        )

        # Pass the webui_manager to callbacks when wrapping them
        async def step_callback_wrapper(
            state: BrowserState, output: AgentOutput, step_num: int
        ):
            await _handle_new_step(webui_manager, state, output, step_num)

        def done_callback_wrapper(history: AgentHistoryList):
            _handle_done(webui_manager, history)

        if not webui_manager.bu_agent:
            logger.info(f"Initializing new agent for task: {task}")
            if not webui_manager.bu_browser or not webui_manager.bu_browser_context:
                raise ValueError(
                    "Browser or Context not initialized, cannot create agent."
                )
            webui_manager.bu_agent = BrowserUseAgent(
                task=task,
                llm=main_llm,
                browser=webui_manager.bu_browser,
                browser_context=webui_manager.bu_browser_context,
                controller=webui_manager.bu_controller,
                register_new_step_callback=step_callback_wrapper,
                register_done_callback=done_callback_wrapper,
                use_vision=use_vision,
                override_system_message=override_system_prompt,
                extend_system_message=extend_system_prompt,
                max_input_tokens=max_input_tokens,
                max_actions_per_step=max_actions,
                tool_calling_method=tool_calling_method,
                planner_llm=planner_llm,
                use_vision_for_planner=planner_use_vision if planner_llm else False,
                source="webui",
            )
            webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
            webui_manager.bu_agent.settings.generate_gif = gif_path
        else:
            # Reuse the existing agent: give it the new task and rebind resources.
            webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
            webui_manager.bu_agent.add_new_task(task)
            webui_manager.bu_agent.settings.generate_gif = gif_path
            webui_manager.bu_agent.browser = webui_manager.bu_browser
            webui_manager.bu_agent.browser_context = webui_manager.bu_browser_context
            webui_manager.bu_agent.controller = webui_manager.bu_controller

        # --- 6. Run Agent Task and Stream Updates ---
        agent_run_coro = webui_manager.bu_agent.run(max_steps=max_steps)
        agent_task = asyncio.create_task(agent_run_coro)
        webui_manager.bu_current_task = agent_task  # Store the task

        last_chat_len = len(webui_manager.bu_chat_history)
        while not agent_task.done():
            is_paused = webui_manager.bu_agent.state.paused
            is_stopped = webui_manager.bu_agent.state.stopped

            # Check for pause state
            if is_paused:
                yield {
                    pause_resume_button_comp: gr.update(
                        value="▶️ Resume", interactive=True
                    ),
                    stop_button_comp: gr.update(interactive=True),
                }
                # Wait until pause is released or task is stopped/done
                while is_paused and not agent_task.done():
                    # Re-check agent state in loop
                    is_paused = webui_manager.bu_agent.state.paused
                    is_stopped = webui_manager.bu_agent.state.stopped
                    if is_stopped:  # Stop signal received while paused
                        break
                    await asyncio.sleep(0.2)

                if (
                    agent_task.done() or is_stopped
                ):  # If stopped or task finished while paused
                    break

                # If resumed, yield UI update
                yield {
                    pause_resume_button_comp: gr.update(
                        value="⏸️ Pause", interactive=True
                    ),
                    run_button_comp: gr.update(
                        value="⏳ Running...", interactive=False
                    ),
                }

            # Check if agent stopped itself or stop button was pressed (which sets agent.state.stopped)
            if is_stopped:
                logger.info("Agent has stopped (internally or via stop button).")
                if not agent_task.done():
                    # Ensure the task coroutine finishes if agent just set flag
                    try:
                        await asyncio.wait_for(
                            agent_task, timeout=1.0
                        )  # Give it a moment to exit run()
                    except asyncio.TimeoutError:
                        logger.warning(
                            "Agent task did not finish quickly after stop signal, cancelling."
                        )
                        agent_task.cancel()
                    except Exception:  # Catch task exceptions if it errors on stop
                        # Not lost: finalization below re-raises it via
                        # agent_task.result() and reports it in the chat.
                        pass
                break  # Exit the streaming loop

            # Check if agent is asking for help (via response_event)
            update_dict = {}
            if webui_manager.bu_response_event is not None:
                last_chat_len = len(webui_manager.bu_chat_history)
                # Yield the help state directly instead of keeping it in
                # update_dict: otherwise it would be yielded a second time at
                # the end of this iteration and undo the restore below.
                yield {
                    business_name_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    business_website_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    business_type_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    additional_info_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    run_button_comp: gr.update(
                        value="✔️ Submit Response", interactive=True
                    ),
                    pause_resume_button_comp: gr.update(interactive=False),
                    stop_button_comp: gr.update(interactive=False),
                    chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
                }
                # Wait until response is submitted or task finishes
                while (
                    webui_manager.bu_response_event is not None
                    and not agent_task.done()
                ):
                    await asyncio.sleep(0.2)
                # Restore UI after response submitted or if task ended unexpectedly
                if not agent_task.done():
                    yield {
                        business_name_comp: gr.update(
                            placeholder="Enter business name", interactive=False
                        ),
                        business_website_comp: gr.update(
                            placeholder="Enter business website", interactive=False
                        ),
                        business_type_comp: gr.update(
                            placeholder="Enter business type", interactive=False
                        ),
                        additional_info_comp: gr.update(
                            placeholder="Enter additional information", interactive=False
                        ),
                        run_button_comp: gr.update(
                            value="⏳ Running...", interactive=False
                        ),
                        pause_resume_button_comp: gr.update(interactive=True),
                        stop_button_comp: gr.update(interactive=True),
                    }
                else:
                    break  # Task finished while waiting for response

            # Update Chatbot if new messages arrived via callbacks
            if len(webui_manager.bu_chat_history) > last_chat_len:
                update_dict[chatbot_comp] = gr.update(
                    value=webui_manager.bu_chat_history
                )
                last_chat_len = len(webui_manager.bu_chat_history)

            # Update Browser View
            if headless and webui_manager.bu_browser_context:
                try:
                    screenshot_b64 = (
                        await webui_manager.bu_browser_context.take_screenshot()
                    )
                    if screenshot_b64:
                        html_content = f'<img src="data:image/jpeg;base64,{screenshot_b64}" style="width:{stream_vw}vw; height:{stream_vh}vh ; border:1px solid #ccc;">'
                        update_dict[browser_view_comp] = gr.update(
                            value=html_content, visible=True
                        )
                    else:
                        html_content = f"<h1 style='width:{stream_vw}vw; height:{stream_vh}vh'>Waiting for browser session...</h1>"
                        update_dict[browser_view_comp] = gr.update(
                            value=html_content, visible=True
                        )
                except Exception as e:
                    logger.debug(f"Failed to capture screenshot: {e}")
                    update_dict[browser_view_comp] = gr.update(
                        value="<div style='...'>Error loading view...</div>",
                        visible=True,
                    )
            else:
                update_dict[browser_view_comp] = gr.update(visible=False)

            # Yield accumulated updates
            if update_dict:
                yield update_dict

            await asyncio.sleep(0.1)  # Polling interval

        # --- 7. Task Finalization ---
        webui_manager.bu_agent.state.paused = False
        webui_manager.bu_agent.state.stopped = False
        final_update = {}
        try:
            logger.info("Agent task completing...")
            # Await the task ensure completion and catch exceptions if not already caught
            if not agent_task.done():
                await agent_task  # Retrieve result/exception
            elif agent_task.exception():  # Check if task finished with exception
                agent_task.result()  # Raise the exception to be caught below
            logger.info("Agent task completed processing.")

            logger.info(f"Explicitly saving agent history to: {history_file}")
            webui_manager.bu_agent.save_history(history_file)

            if os.path.exists(history_file):
                final_update[history_file_comp] = gr.File(value=history_file)

            if gif_path and os.path.exists(gif_path):
                logger.info(f"GIF found at: {gif_path}")
                final_update[gif_comp] = gr.Image(value=gif_path)

            # Update task metrics display if metrics are available
            task_metrics_display_comp = webui_manager.get_component_by_id("browser_use_agent.task_metrics_display")
            if hasattr(webui_manager, 'bu_task_metrics') and webui_manager.bu_task_metrics:
                # If we have metrics but no screenshot, try to get the latest screenshot
                if not webui_manager.bu_task_metrics.get('screenshot') and webui_manager.bu_browser_context:
                    try:
                        final_screenshot = await webui_manager.bu_browser_context.take_screenshot()
                        if final_screenshot:
                            webui_manager.bu_task_metrics['screenshot'] = final_screenshot
                    except Exception as e:
                        logger.warning(f"Failed to capture final screenshot for metrics: {e}")

                # Format the metrics for display
                metrics_md = format_task_metrics(webui_manager.bu_task_metrics)
                final_update[task_metrics_display_comp] = gr.update(value=metrics_md)

        except asyncio.CancelledError:
            logger.info("Agent task was cancelled.")
            if not any(
                "Cancelled" in msg.get("content", "")
                for msg in webui_manager.bu_chat_history
                if msg.get("role") == "assistant"
            ):
                webui_manager.bu_chat_history.append(
                    {"role": "assistant", "content": "**Task Cancelled**."}
                )
            final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
        except Exception as e:
            logger.error(f"Error during agent execution: {e}", exc_info=True)
            error_message = (
                f"**Agent Execution Error:**\n```\n{type(e).__name__}: {e}\n```"
            )
            if not any(
                error_message in msg.get("content", "")
                for msg in webui_manager.bu_chat_history
                if msg.get("role") == "assistant"
            ):
                webui_manager.bu_chat_history.append(
                    {"role": "assistant", "content": error_message}
                )
            final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
            # gr.Error only has an effect when raised, and raising here would
            # skip the final UI reset; show a warning toast instead.
            gr.Warning(f"Agent execution failed: {e}")

        finally:
            webui_manager.bu_current_task = None  # Clear the task reference

            # Close browser/context if requested
            if should_close_browser_on_finish:
                if webui_manager.bu_browser_context:
                    logger.info("Closing browser context after task.")
                    await webui_manager.bu_browser_context.close()
                    webui_manager.bu_browser_context = None
                if webui_manager.bu_browser:
                    logger.info("Closing browser after task.")
                    await webui_manager.bu_browser.close()
                    webui_manager.bu_browser = None

            # --- 8. Final UI Update ---
            final_update.update(
                {
                    business_name_comp: gr.update(
                        value="",
                        interactive=True,
                        placeholder="Enter business name",
                    ),
                    business_website_comp: gr.update(
                        value="",
                        interactive=True,
                        placeholder="Enter business website",
                    ),
                    business_type_comp: gr.update(interactive=True),
                    additional_info_comp: gr.update(
                        value="",
                        interactive=True,
                        placeholder="Enter additional information",
                    ),
                    run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True),
                    stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
                    pause_resume_button_comp: gr.update(
                        value="⏸️ Pause", interactive=False
                    ),
                    clear_button_comp: gr.update(interactive=True),
                    # Ensure final chat history is shown
                    chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
                }
            )
            yield final_update

    except Exception as e:
        # Catch errors during setup (before agent run starts)
        logger.error(f"Error setting up agent task: {e}", exc_info=True)
        webui_manager.bu_current_task = None  # Ensure state is reset
        yield {
            business_name_comp: gr.update(
                interactive=True, placeholder="Enter business name"
            ),
            business_website_comp: gr.update(
                interactive=True, placeholder="Enter business website"
            ),
            business_type_comp: gr.update(interactive=True),
            additional_info_comp: gr.update(
                interactive=True, placeholder="Enter additional information"
            ),
            run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True),
            stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
            pause_resume_button_comp: gr.update(value="⏸️ Pause", interactive=False),
            clear_button_comp: gr.update(interactive=True),
            chatbot_comp: gr.update(
                value=webui_manager.bu_chat_history
                + [{"role": "assistant", "content": f"**Setup Error:** {e}"}]
            ),
        }
# --- Button Click Handlers --- (Need access to webui_manager)
async def handle_submit(
    webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
):
    """Handles clicks on the main 'Start Analysis' button.

    Reads the business form values out of ``components``, validates that a
    business name was supplied, and then does one of three things:

    * if the agent is waiting on ``bu_response_event``, releases it with a
      canned "continue" response and locks the form;
    * if a task is already running, informs the user and changes nothing;
    * otherwise builds the analysis task, records it in the chat history and
      streams the updates produced by ``run_agent_task``.

    Args:
        webui_manager: Manager holding the agent state and the component
            registry.
        components: Mapping of Gradio components to their current values.
            It is mutated: the generated task text is stored under the
            business-name component before the agent run starts.

    Yields:
        Dicts mapping Gradio components to ``gr.update(...)`` values.
    """
    # Get business information from the form
    business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name")
    business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website")
    business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type")
    additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info")

    def _text_value(comp) -> str:
        # dict.get's default only covers a *missing* key; a key that is
        # present with value None would otherwise crash on .strip().
        return (components.get(comp) or "").strip()

    business_name = _text_value(business_name_comp)
    business_website = _text_value(business_website_comp)
    # Same reasoning: fall back to the form's default type when the value is
    # missing or empty instead of passing None into the task template.
    business_type = components.get(business_type_comp) or "Retail"
    additional_info = _text_value(additional_info_comp)

    if not business_name:
        gr.Warning("Please enter a business name.")
        yield {business_name_comp: gr.update(value=business_name)}
        return

    # Generate the standardized task using our template
    task = create_business_task(
        business_name,
        business_type,
        business_website,
        additional_info,
    )

    # Check if waiting for user assistance
    if webui_manager.bu_response_event and not webui_manager.bu_response_event.is_set():
        logger.info("User submitted assistance")
        webui_manager.bu_user_help_response = "Continue with the current task."
        webui_manager.bu_response_event.set()
        # UI updates handled by the main loop reacting to the event being set
        yield {
            business_name_comp: gr.update(
                interactive=False,
            ),
            business_website_comp: gr.update(
                interactive=False,
            ),
            business_type_comp: gr.update(
                interactive=False,
            ),
            additional_info_comp: gr.update(
                interactive=False,
            ),
            webui_manager.get_component_by_id(
                "browser_use_agent.run_button"
            ): gr.update(value="⏳ Running...", interactive=False),
        }
    # Check if a task is currently running (using _current_task)
    elif webui_manager.bu_current_task and not webui_manager.bu_current_task.done():
        logger.warning(
            "Start button clicked while agent is already running and not asking for help."
        )
        gr.Info("Agent is currently running. Please wait or use Stop/Pause.")
        yield {}  # No change
    else:
        # Store the generated task under the business-name component before
        # running (presumably where run_agent_task reads the task text from
        # — confirm against run_agent_task).
        components[business_name_comp] = task
        # Handle submission for a new task
        logger.info(f"Starting analysis for business: {business_name}")
        # Update chat history with the business information
        webui_manager.bu_chat_history.append({"role": "user", "content": task})
        # Run the task using our agent
        async for update in run_agent_task(webui_manager, components):
            yield update
async def handle_stop(webui_manager: WebuiManager):
    """Handles clicks on the 'Stop' button.

    When an agent and an unfinished task are both present, flags the agent
    state as stopped (and un-paused) and returns updates that disable the
    stop, pause/resume and run buttons. Otherwise returns updates that put
    the buttons back into their idle state.

    Returns:
        Dict mapping Gradio components to ``gr.update(...)`` values.
    """
    logger.info("Stop button clicked.")
    lookup = webui_manager.get_component_by_id
    running_agent = webui_manager.bu_agent
    current_task = webui_manager.bu_current_task

    can_stop = running_agent and current_task and not current_task.done()
    if not can_stop:
        logger.warning("Stop clicked but agent is not running or task is already done.")
        # Nothing to stop: restore the idle button state in case the UI is stuck.
        return {
            lookup("browser_use_agent.run_button"): gr.update(interactive=True),
            lookup("browser_use_agent.stop_button"): gr.update(interactive=False),
            lookup("browser_use_agent.pause_resume_button"): gr.update(
                interactive=False
            ),
            lookup("browser_use_agent.clear_button"): gr.update(interactive=True),
        }

    # Signal the agent through its state flags; a stopped agent must not
    # remain marked as paused.
    running_agent.state.stopped = True
    running_agent.state.paused = False
    return {
        lookup("browser_use_agent.stop_button"): gr.update(
            interactive=False, value="⏹️ Stopping..."
        ),
        lookup("browser_use_agent.pause_resume_button"): gr.update(interactive=False),
        lookup("browser_use_agent.run_button"): gr.update(interactive=False),
    }
async def handle_pause_resume(webui_manager: WebuiManager):
    """Handles clicks on the 'Pause/Resume' button.

    Toggles the agent between paused and running when an agent and an
    unfinished task are present, and returns an optimistic update for the
    button label. Returns an empty dict (no UI change) otherwise.

    Returns:
        Dict mapping Gradio components to ``gr.update(...)`` values.
    """
    agent = webui_manager.bu_agent
    task = webui_manager.bu_current_task

    toggle_possible = agent and task and not task.done()
    if not toggle_possible:
        logger.warning(
            "Pause/Resume clicked but agent is not running or doesn't support state."
        )
        return {}  # No change

    if agent.state.paused:
        logger.info("Resume button clicked.")
        agent.resume()
        # Agent is running again, so the button offers "Pause" next.
        next_label = "⏸️ Pause"
    else:
        logger.info("Pause button clicked.")
        agent.pause()
        next_label = "▶️ Resume"

    # Optimistic update: the label is flipped immediately after the call.
    toggle_button = webui_manager.get_component_by_id(
        "browser_use_agent.pause_resume_button"
    )
    return {toggle_button: gr.update(value=next_label, interactive=True)}
async def handle_clear(webui_manager: WebuiManager):
    """Handles clicks on the 'Clear' button.

    Stops and cancels any unfinished task (waiting up to two seconds for it
    to finish), closes the controller's MCP client, resets the agent-related
    state stored on ``webui_manager`` and returns updates that put every tab
    component back into its initial state.

    Returns:
        Dict mapping Gradio components to ``gr.update(...)`` values.
    """
    logger.info("Clear button clicked.")
    # Stop any running task first
    task = webui_manager.bu_current_task
    if task and not task.done():
        logger.info("Clearing requires stopping the current task.")
        # The agent reference may be missing while the task is still alive;
        # guard it (as handle_stop does) so the reset below always runs.
        agent = webui_manager.bu_agent
        if agent is not None:
            agent.stop()
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=2.0)  # Wait briefly
        except (asyncio.CancelledError, asyncio.TimeoutError):
            # Expected outcomes of cancelling: the task was cancelled or did
            # not finish within the timeout.
            pass
        except Exception as e:
            logger.warning(f"Error stopping task on clear: {e}")
    webui_manager.bu_current_task = None

    if webui_manager.bu_controller:
        await webui_manager.bu_controller.close_mcp_client()
        webui_manager.bu_controller = None
    webui_manager.bu_agent = None

    # Reset state stored in manager
    webui_manager.bu_chat_history = []
    webui_manager.bu_response_event = None
    webui_manager.bu_user_help_response = None
    webui_manager.bu_agent_task_id = None
    webui_manager.bu_task_metrics = None  # Clear task metrics
    logger.info("Agent state and browser resources cleared.")

    # Reset UI components
    return {
        webui_manager.get_component_by_id("browser_use_agent.chatbot"): gr.update(
            value=[]
        ),
        webui_manager.get_component_by_id("browser_use_agent.business_name"): gr.update(
            value="", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.business_website"): gr.update(
            value="", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.business_type"): gr.update(
            value="Retail", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.additional_info"): gr.update(
            value="", interactive=True
        ),
        webui_manager.get_component_by_id(
            "browser_use_agent.agent_history_file"
        ): gr.update(value=None),
        webui_manager.get_component_by_id("browser_use_agent.recording_gif"): gr.update(
            value=None
        ),
        webui_manager.get_component_by_id("browser_use_agent.browser_view"): gr.update(
            value="<div style='...'>Browser Cleared</div>"
        ),
        webui_manager.get_component_by_id("browser_use_agent.run_button"): gr.update(
            value="▶️ Start Analysis", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.stop_button"): gr.update(
            interactive=False
        ),
        webui_manager.get_component_by_id(
            "browser_use_agent.pause_resume_button"
        ): gr.update(value="⏸️ Pause", interactive=False),
        webui_manager.get_component_by_id("browser_use_agent.clear_button"): gr.update(
            interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.task_metrics_display"): gr.update(
            value="No task metrics available yet. Run a task to see metrics here."
        ),
    }
# --- Tab Creation Function ---
def create_browser_use_agent_tab(webui_manager: WebuiManager):
    """
    Create the run agent tab with business-focused UI.

    Builds the chatbot, the business information form, the control buttons,
    the initially hidden browser view, the task metrics panel and the task
    output widgets. The components are then registered with
    ``webui_manager`` under the ``browser_use_agent`` prefix and the four
    buttons are wired to their click handlers.

    Args:
        webui_manager: Manager that stores the agent state and the component
            registry used by the handlers.
    """
    webui_manager.init_browser_use_agent()
    # Initialize task metrics if not already present
    if not hasattr(webui_manager, 'bu_task_metrics'):
        webui_manager.bu_task_metrics = None
    # --- Define UI Components ---
    tab_components = {}
    with gr.Column():
        chatbot = gr.Chatbot(
            lambda: webui_manager.bu_chat_history,  # Load history dynamically
            elem_id="browser_use_chatbot",
            label="Agent Interaction",
            type="messages",
            height=600,
            show_copy_button=True,
        )
        # Business information form
        with gr.Column(elem_id="business_form"):
            gr.Markdown("### Business Information")
            business_name = gr.Textbox(
                label="Business Name",
                placeholder="Enter business name",
                elem_id="business_name",
            )
            business_website = gr.Textbox(
                label="Business Website (optional)",
                placeholder="https://www.example.com",
                elem_id="business_website",
            )
            business_type = gr.Dropdown(
                label="Business Type",
                choices=["Retail", "Restaurant", "Service", "Healthcare", "Technology", "Other"],
                value="Retail",
                elem_id="business_type",
            )
            additional_info = gr.Textbox(
                label="Additional Information (optional)",
                placeholder="Any specific details about the business that might help the agent",
                lines=2,
                elem_id="additional_info",
            )
        # Control buttons; stop and pause start disabled because no task is running yet.
        with gr.Row():
            stop_button = gr.Button(
                "⏹️ Stop", interactive=False, variant="stop", scale=2
            )
            pause_resume_button = gr.Button(
                "⏸️ Pause", interactive=False, variant="secondary", scale=2, visible=True
            )
            clear_button = gr.Button(
                "🗑️ Clear", interactive=True, variant="secondary", scale=2
            )
            run_button = gr.Button("▶️ Start Analysis", variant="primary", scale=3)
        # Placeholder for the live browser view; created hidden.
        browser_view = gr.HTML(
            value="<div style='width:100%; height:50vh; display:flex; justify-content:center; align-items:center; border:1px solid #ccc; background-color:#f0f0f0;'><p>Browser View (Requires Headless=True)</p></div>",
            label="Browser Live View",
            elem_id="browser_view",
            visible=False,
        )
        # Task Metrics Section
        # NOTE: task_metrics_container is bound here but not referenced again
        # in the visible code.
        with gr.Column(visible=True) as task_metrics_container:
            gr.Markdown("### Task Metrics", elem_id="task_metrics_heading")
            # The value is a callable, so the markdown is rendered from the
            # manager's current bu_task_metrics rather than a fixed string.
            task_metrics_display = gr.Markdown(
                value=lambda: format_task_metrics(webui_manager.bu_task_metrics),
                elem_id="task_metrics_display",
            )
        with gr.Column():
            gr.Markdown("### Task Outputs")
            agent_history_file = gr.File(label="Agent History JSON", interactive=False)
            recording_gif = gr.Image(
                label="Task Recording GIF",
                format="gif",
                interactive=False,
                type="filepath",
            )
    # --- Store Components in Manager ---
    # The keys used here are the suffixes the handlers look up, e.g.
    # "browser_use_agent.run_button".
    tab_components.update(
        dict(
            chatbot=chatbot,
            business_name=business_name,
            business_website=business_website,
            business_type=business_type,
            additional_info=additional_info,
            clear_button=clear_button,
            run_button=run_button,
            stop_button=stop_button,
            pause_resume_button=pause_resume_button,
            agent_history_file=agent_history_file,
            recording_gif=recording_gif,
            browser_view=browser_view,
            task_metrics_display=task_metrics_display,
        )
    )
    webui_manager.add_components(
        "browser_use_agent", tab_components
    )  # Use "browser_use_agent" as tab_name prefix
    all_managed_components = set(
        webui_manager.get_components()
    )  # Get all components known to manager
    # Every handler may update any component of this tab, so all of them are
    # declared as outputs.
    run_tab_outputs = list(tab_components.values())

    # The wrappers below close over webui_manager so the event callbacks can
    # reach the manager without receiving it as a Gradio input.
    async def submit_wrapper(
        components_dict: Dict[Component, Any],
    ) -> AsyncGenerator[Dict[Component, Any], None]:
        """Wrapper for handle_submit that yields its results."""
        async for update in handle_submit(webui_manager, components_dict):
            yield update

    async def stop_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Wrapper for handle_stop."""
        update_dict = await handle_stop(webui_manager)
        yield update_dict

    async def pause_resume_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Wrapper for handle_pause_resume."""
        update_dict = await handle_pause_resume(webui_manager)
        yield update_dict

    async def clear_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Wrapper for handle_clear."""
        update_dict = await handle_clear(webui_manager)
        yield update_dict

    # --- Connect Event Handlers using the Wrappers --
    # inputs is a set of components; per Gradio's event-listener docs this
    # makes the handler receive one dict keyed by component, which matches
    # submit_wrapper's components_dict parameter.
    run_button.click(
        fn=submit_wrapper, inputs=all_managed_components, outputs=run_tab_outputs
    )
    stop_button.click(fn=stop_wrapper, inputs=None, outputs=run_tab_outputs)
    pause_resume_button.click(
        fn=pause_resume_wrapper, inputs=None, outputs=run_tab_outputs
    )
    clear_button.click(fn=clear_wrapper, inputs=None, outputs=run_tab_outputs)