# WebCrawler/src/webui/components/browser_use_agent_tab.py.bak
# Carlos Gonzalez
# Add application file
# b1f90a5
import asyncio
import json
import logging
import os
import uuid
from typing import Any, AsyncGenerator, Dict, Optional
import gradio as gr
# from browser_use.agent.service import Agent
from browser_use.agent.views import (
AgentHistoryList,
AgentOutput,
)
from browser_use.browser.browser import BrowserConfig
from browser_use.browser.context import BrowserContext, BrowserContextConfig
from browser_use.browser.views import BrowserState
from gradio.components import Component
from langchain_core.language_models.chat_models import BaseChatModel
from src.agent.browser_use.browser_use_agent import BrowserUseAgent
from src.browser.custom_browser import CustomBrowser
from src.controller.custom_controller import CustomController
from src.utils import llm_provider
from src.webui.webui_manager import WebuiManager
logger = logging.getLogger(__name__)
# Custom function to format task metrics as markdown
def format_task_metrics(metrics):
    """Render the stored task metrics as a Markdown summary.

    Args:
        metrics: Mapping that may carry the keys ``duration``, ``tokens``,
            ``result``, ``status``, ``errors`` and ``screenshot`` (embedded
            as a base64 JPEG data URI), or a falsy value when nothing has
            been recorded yet.

    Returns:
        The Markdown text, or a placeholder sentence when ``metrics`` is
        falsy.
    """
    if not metrics:
        return "No task metrics available yet. Run a task to see metrics here."

    status = metrics.get("status", "Unknown")
    # A run that did not succeed must not be shown with a success check mark.
    status_icon = "✅" if status == "Success" else "❌"

    sections = [
        "#### Task Execution Summary\n\n",
        f"⏱️ **Duration:** {metrics.get('duration', 'N/A')} seconds\n\n",
        f"🔢 **Tokens Used:** {metrics.get('tokens', 'N/A')}\n\n",
    ]

    result = metrics.get("result")
    if result:
        sections.append(f"📋 **Final Result:**\n\n```\n{result}\n```\n\n")

    sections.append(f"{status_icon} **Status:** {status}\n\n")

    # The error list may consist solely of empty entries; only show real ones.
    errors = metrics.get("errors")
    if errors and any(errors):
        sections.append(f"❌ **Errors:**\n\n```\n{errors}\n```\n\n")

    # Display screenshot if available
    screenshot = metrics.get("screenshot")
    if screenshot:
        sections.append("📸 **Final Screenshot:**\n\n")
        sections.append(
            f'<img src="data:image/jpeg;base64,{screenshot}" '
            'alt="Final Screenshot" '
            'style="max-width:100%; border:1px solid #ccc;" />\n\n'
        )

    return "".join(sections)
# Builds the standardized natural-language task prompt for a business analysis run
def create_business_task(business_name, business_type, business_website=None, additional_info=None):
    """Compose the standardized analysis prompt for a single business.

    Args:
        business_name: Name of the business, quoted verbatim in the prompt.
        business_type: Industry label inserted into the opening sentence.
        business_website: Optional URL; when truthy the prompt tells the
            agent to start there, otherwise to search online.
        additional_info: Optional free-form context appended when truthy.

    Returns:
        The prompt as a single string.
    """
    # The prompt is assembled sentence by sentence; every sentence after the
    # first carries its own leading space.
    sentences = [
        f"Analyze the business '{business_name}' which is in the {business_type} industry."
    ]

    sentences.append(
        f" Start by visiting their website at {business_website}."
        if business_website
        else " Search for information about this business online."
    )

    sentences.append(
        " Gather the following information: main products/services, contact information, location, hours of operation, and customer reviews."
    )

    if additional_info:
        sentences.append(f" Additional context: {additional_info}")

    sentences.append(" Provide a comprehensive report with all findings.")
    return "".join(sentences)
# --- Helper Functions --- (Defined at module level)
async def _initialize_llm(
    provider: Optional[str],
    model_name: Optional[str],
    temperature: float,
    base_url: Optional[str],
    api_key: Optional[str],
    num_ctx: Optional[int] = None,
) -> Optional[BaseChatModel]:
    """Build a chat model through ``llm_provider.get_llm_model``.

    Args:
        provider: Provider identifier; a falsy value disables the LLM.
        model_name: Model identifier; a falsy value disables the LLM.
        temperature: Sampling temperature forwarded to the provider.
        base_url: Optional endpoint override; empty values become ``None``.
        api_key: Optional credential; empty values become ``None``.
        num_ctx: Context size, forwarded only when ``provider`` is "ollama".

    Returns:
        The model instance, or ``None`` when provider/model is missing or
        construction fails (the failure is logged and shown as a warning).
    """
    if not (provider and model_name):
        logger.info("LLM Provider or Model Name not specified, LLM will be None.")
        return None

    try:
        logger.info(
            f"Initializing LLM: Provider={provider}, Model={model_name}, Temp={temperature}"
        )
        llm_kwargs = {
            "provider": provider,
            "model_name": model_name,
            "temperature": temperature,
            "base_url": base_url or None,
            "api_key": api_key or None,
            # Every provider other than ollama receives num_ctx=None.
            "num_ctx": num_ctx if provider == "ollama" else None,
        }
        return llm_provider.get_llm_model(**llm_kwargs)
    except Exception as e:
        # Best effort: report the failure and let the caller deal with None.
        logger.error(f"Failed to initialize LLM: {e}", exc_info=True)
        gr.Warning(
            f"Failed to initialize LLM '{model_name}' for provider '{provider}'. Please check settings. Error: {e}"
        )
        return None
def _get_config_value(
    webui_manager: WebuiManager,
    comp_dict: Dict[gr.components.Component, Any],
    comp_id_suffix: str,
    default: Any = None,
) -> Any:
    """Look up a component's submitted value by the suffix of its ID.

    Component IDs are tried as "<tab>.<suffix>", first for the agent tab and
    then for the two settings tabs.

    Args:
        webui_manager: Manager used to resolve component IDs to components.
        comp_dict: Mapping of components to their current values.
        comp_id_suffix: The component name without its tab prefix.
        default: Value returned when nothing is found.

    Returns:
        The value stored for the first component that resolves, or
        ``default`` (with a warning logged) when no prefix resolves.
    """
    # Order matters: the tab's own components win over the settings tabs.
    for tab_prefix in ("browser_use_agent", "agent_settings", "browser_settings"):
        try:
            component = webui_manager.get_component_by_id(
                f"{tab_prefix}.{comp_id_suffix}"
            )
            return comp_dict.get(component, default)
        except KeyError:
            # Unknown ID under this prefix; fall through to the next one.
            continue

    logger.warning(
        f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup."
    )
    return default
def _format_agent_output(model_output: AgentOutput) -> str:
    """Format an AgentOutput as an HTML ``<pre><code>`` JSON block.

    The dumped JSON (and, on failure, the raw ``str()`` of the output) is
    HTML-escaped before being embedded, so markup characters contained in the
    agent's output are displayed literally instead of being interpreted as
    HTML by the chatbot.

    Args:
        model_output: Object exposing ``action`` (an iterable of items with
            ``model_dump``) and ``current_state`` (with ``model_dump``).

    Returns:
        The HTML snippet, or an empty string when ``model_output`` is falsy.
    """
    import html  # stdlib; imported locally so the block stays self-contained

    def _esc(text: str) -> str:
        # Quotes are left alone: the text goes into element content, not an
        # attribute, and JSON stays readable in the page source.
        return html.escape(text, quote=False)

    content = ""
    if model_output:
        try:
            # Directly use model_dump if actions and current_state are Pydantic models
            action_dump = [
                action.model_dump(exclude_none=True) for action in model_output.action
            ]
            state_dump = model_output.current_state.model_dump(exclude_none=True)
            model_output_dump = {
                "current_state": state_dump,
                "action": action_dump,
            }
            # Dump to JSON string with indentation
            json_string = json.dumps(model_output_dump, indent=4, ensure_ascii=False)
            # Wrap in <pre><code> for proper display in HTML
            content = f"<pre><code class='language-json'>{_esc(json_string)}</code></pre>"
        except AttributeError as ae:
            logger.error(
                f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'."
            )
            content = f"<pre><code>Error: Could not format agent output (AttributeError: {_esc(str(ae))}).\nRaw output: {_esc(str(model_output))}</code></pre>"
        except Exception as e:
            logger.error(f"Error formatting agent output: {e}", exc_info=True)
            # Fallback to simple string representation on error
            content = f"<pre><code>Error formatting agent output.\nRaw output:\n{_esc(str(model_output))}</code></pre>"
    return content.strip()
# --- Updated Callback Implementation ---
async def _handle_new_step(
    webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int
):
    """Append one agent step (header, screenshot, JSON output) to the chat.

    Args:
        webui_manager: Holder of ``bu_chat_history``; the list is created
            here (after logging an error) if the attribute is missing.
        state: Browser state; its ``screenshot`` attribute, when present and
            non-empty, is embedded as a base64 JPEG image.
        output: Agent output rendered through ``_format_agent_output``.
        step_num: Step number supplied by the callback; it is decremented by
            one before being displayed.
    """
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error(
            "Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message."
        )
        # Keep going with a fresh list rather than dropping the message.
        webui_manager.bu_chat_history = []

    step_num -= 1
    logger.info(f"Step {step_num} completed.")

    # --- Screenshot Handling ---
    raw_shot = getattr(state, "screenshot", None)
    shot_html = ""
    if not raw_shot:
        logger.debug(f"No screenshot available for step {step_num}.")
    else:
        try:
            # Rough plausibility check only: a string longer than 100 chars.
            if isinstance(raw_shot, str) and len(raw_shot) > 100:
                shot_html = (
                    f'<img src="data:image/jpeg;base64,{raw_shot}" alt="Step {step_num} Screenshot" style="max-width: 800px; max-height: 600px; object-fit:contain;" />'
                    + "<br/>"
                )
            else:
                shot_len = len(raw_shot) if isinstance(raw_shot, str) else "N/A"
                logger.warning(
                    f"Screenshot for step {step_num} seems invalid (type: {type(raw_shot)}, len: {shot_len})."
                )
                shot_html = "**[Invalid screenshot data]**<br/>"
        except Exception as e:
            logger.error(
                f"Error processing or formatting screenshot for step {step_num}: {e}",
                exc_info=True,
            )
            shot_html = "**[Error displaying screenshot]**<br/>"

    # --- Combine header, image and JSON block into one assistant message ---
    message_body = (
        f"--- **Step {step_num}** ---"
        + "<br/>"
        + shot_html
        + _format_agent_output(output)
    )
    webui_manager.bu_chat_history.append(
        {"role": "assistant", "content": message_body.strip()}
    )

    await asyncio.sleep(0.05)
def _handle_done(webui_manager: WebuiManager, history: AgentHistoryList):
    """Record the outcome of a finished agent run.

    Appends a summary message to ``webui_manager.bu_chat_history`` and stores
    a metrics dict on ``webui_manager.bu_task_metrics`` for the metrics
    display.

    Args:
        webui_manager: Object receiving the chat message and the metrics.
        history: Finished run; its duration, input-token count, final result,
            errors and screenshots are read once each.
    """
    # Read every accessor once so the log line, the chat summary and the
    # metrics dict are guaranteed to show the same values.
    duration = f"{history.total_duration_seconds():.2f}"
    input_tokens = history.total_input_tokens()
    final_result = history.final_result()
    errors = history.errors()
    # The error list may hold only empty entries; those do not count.
    has_errors = bool(errors) and any(errors)

    logger.info(
        f"Agent task finished. Duration: {duration}s, Tokens: {input_tokens}"
    )

    summary_lines = [
        "**Task Completed**\n",
        f"- Duration: {duration} seconds\n",
        f"- Total Input Tokens: {input_tokens}\n",  # Or total tokens if available
    ]
    if final_result:
        summary_lines.append(f"- Final Result: {final_result}\n")
    if has_errors:
        summary_lines.append(f"- **Errors:**\n```\n{errors}\n```\n")
    else:
        summary_lines.append("- Status: Success\n")
    final_summary = "".join(summary_lines)

    # Get the last screenshot if available
    screenshots = history.screenshots()
    final_screenshot = screenshots[-1] if screenshots else None

    # Store task metrics separately for the metrics display
    webui_manager.bu_task_metrics = {
        "duration": duration,
        "tokens": f"{input_tokens}",
        "result": final_result if final_result else "",
        "status": "Error" if has_errors else "Success",
        "errors": errors if has_errors else None,
        "screenshot": final_screenshot,  # Add the final screenshot to the metrics
    }

    webui_manager.bu_chat_history.append(
        {"role": "assistant", "content": final_summary}
    )
async def _ask_assistant_callback(
    webui_manager: WebuiManager, query: str, browser_context: BrowserContext
) -> Dict[str, Any]:
    """Ask the user for help on behalf of the agent and wait for the reply.

    Posts the request to the chat, creates ``webui_manager.bu_response_event``
    and waits up to one hour for it to be set. The event is always cleared
    again before returning, including on timeout or cancellation.

    Args:
        webui_manager: Holder of the chat history, the response event and the
            user's reply (``bu_user_help_response``).
        query: The agent's question, shown to the user.
        browser_context: Unused here; part of the callback signature.

    Returns:
        ``{"response": <text>}`` with the user's reply, a timeout notice, or
        an internal-error notice when no chat history exists.
    """
    logger.info("Agent requires assistance. Waiting for user input.")

    # The chat log is stored as `bu_chat_history`; checking any other
    # attribute name would reject every help request.
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error("Chat history not found in webui_manager during ask_assistant!")
        return {"response": "Internal Error: Cannot display help request."}

    webui_manager.bu_chat_history.append(
        {
            "role": "assistant",
            "content": f"**Need Help:** {query}\nPlease provide information or perform the required action in the browser, then type your response/confirmation below and click 'Submit Response'.",
        }
    )

    # Use state stored in webui_manager
    webui_manager.bu_response_event = asyncio.Event()
    webui_manager.bu_user_help_response = None  # Reset previous response

    try:
        logger.info("Waiting for user response event...")
        await asyncio.wait_for(
            webui_manager.bu_response_event.wait(), timeout=3600.0
        )  # Long timeout
        logger.info("User response event received.")
    except asyncio.TimeoutError:
        logger.warning("Timeout waiting for user assistance.")
        webui_manager.bu_chat_history.append(
            {
                "role": "assistant",
                "content": "**Timeout:** No response received. Trying to proceed.",
            }
        )
        return {"response": "Timeout: User did not respond."}  # Inform the agent
    else:
        response = webui_manager.bu_user_help_response
        webui_manager.bu_chat_history.append(
            {"role": "user", "content": response}
        )  # Show user response in chat
        return {"response": response}
    finally:
        # Clear the event on every exit path (reply, timeout, cancellation)
        # so a stale event is never mistaken for a pending help request.
        webui_manager.bu_response_event = None
# --- Core Agent Execution Logic --- (Needs access to webui_manager)
async def run_agent_task(
    webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
) -> AsyncGenerator[Dict[gr.components.Component, Any], None]:
    """Run one agent task end to end, streaming UI updates while it runs.

    The generator reads the form and settings values from ``components``,
    prepares LLM, controller, browser, context and agent on
    ``webui_manager``, starts the agent as an asyncio task, and then polls it
    while yielding Gradio updates (chat, browser view, button states). When
    the task ends it saves the history, publishes metrics and restores the UI.

    Args:
        webui_manager: Shared state holder (components by ID, chat history,
            browser, agent, controller, current task, response event).
        components: Mapping of Gradio components to their submitted values.

    Yields:
        Dicts mapping Gradio components to update objects.
    """
    # --- Get Components ---
    # Need handles to specific UI components to update them
    business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name")
    business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website")
    business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type")
    additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info")
    run_button_comp = webui_manager.get_component_by_id("browser_use_agent.run_button")
    stop_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.stop_button"
    )
    pause_resume_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.pause_resume_button"
    )
    clear_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.clear_button"
    )
    chatbot_comp = webui_manager.get_component_by_id("browser_use_agent.chatbot")
    history_file_comp = webui_manager.get_component_by_id(
        "browser_use_agent.agent_history_file"
    )
    gif_comp = webui_manager.get_component_by_id("browser_use_agent.recording_gif")
    browser_view_comp = webui_manager.get_component_by_id(
        "browser_use_agent.browser_view"
    )

    # --- 1. Get Task and Initial UI Update ---
    task = components.get(business_name_comp, "").strip()
    if not task:
        gr.Warning("Please enter a business name or task.")
        yield {run_button_comp: gr.update(interactive=True)}
        return

    # Validate the MCP server config before the controls are locked: parsing
    # it later, outside any try block, would let malformed JSON abort the
    # generator with the UI still disabled.
    mcp_server_config_comp = webui_manager.id_to_component.get(
        "agent_settings.mcp_server_config"
    )
    mcp_server_config_str = (
        components.get(mcp_server_config_comp) if mcp_server_config_comp else None
    )
    try:
        mcp_server_config = (
            json.loads(mcp_server_config_str) if mcp_server_config_str else None
        )
    except (TypeError, ValueError) as e:  # JSONDecodeError is a ValueError
        logger.error(f"Invalid MCP server configuration: {e}")
        gr.Warning(f"Invalid MCP server configuration: {e}")
        yield {run_button_comp: gr.update(interactive=True)}
        return

    # Set running state indirectly via _current_task
    if "Analyze the business" not in task:
        # If task isn't already formatted, create one from the business info
        business_name = task  # The business name was stored in the "task" variable
        business_website = components.get(business_website_comp, "").strip()
        business_type = components.get(business_type_comp, "Retail")
        additional_info = components.get(additional_info_comp, "").strip()
        task = create_business_task(
            business_name,
            business_type,
            business_website,
            additional_info
        )

    # We should already have added the task to chat history in handle_submit
    if not any(msg.get("content") == task for msg in webui_manager.bu_chat_history if msg.get("role") == "user"):
        webui_manager.bu_chat_history.append({"role": "user", "content": task})

    # Lock the form and switch the buttons to their "running" state.
    yield {
        business_name_comp: gr.Textbox(
            value=components.get(business_name_comp, ""), interactive=False
        ),
        business_website_comp: gr.Textbox(
            value=components.get(business_website_comp, ""), interactive=False
        ),
        business_type_comp: gr.update(interactive=False),
        additional_info_comp: gr.Textbox(
            value=components.get(additional_info_comp, ""), interactive=False
        ),
        run_button_comp: gr.Button(value="⏳ Running...", interactive=False),
        stop_button_comp: gr.Button(interactive=True),
        pause_resume_button_comp: gr.Button(value="⏸️ Pause", interactive=True),
        clear_button_comp: gr.Button(interactive=False),
        chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
        history_file_comp: gr.update(value=None),
        gif_comp: gr.update(value=None),
    }

    # --- Agent Settings ---
    # Access settings values via components dict, getting IDs from webui_manager
    def get_setting(key, default=None):
        comp = webui_manager.id_to_component.get(f"agent_settings.{key}")
        return components.get(comp, default) if comp else default

    override_system_prompt = get_setting("override_system_prompt") or None
    extend_system_prompt = get_setting("extend_system_prompt") or None
    llm_provider_name = get_setting(
        "llm_provider", None
    )  # Default to None if not found
    llm_model_name = get_setting("llm_model_name", None)
    llm_temperature = get_setting("llm_temperature", 0.6)
    use_vision = get_setting("use_vision", True)
    ollama_num_ctx = get_setting("ollama_num_ctx", 16000)
    llm_base_url = get_setting("llm_base_url") or None
    llm_api_key = get_setting("llm_api_key") or None
    max_steps = get_setting("max_steps", 100)
    max_actions = get_setting("max_actions", 10)
    max_input_tokens = get_setting("max_input_tokens", 128000)
    tool_calling_str = get_setting("tool_calling_method", "auto")
    # The literal string "None" stands for "no tool-calling method".
    tool_calling_method = tool_calling_str if tool_calling_str != "None" else None

    # Planner LLM Settings (Optional)
    planner_llm_provider_name = get_setting("planner_llm_provider") or None
    planner_llm = None
    planner_use_vision = False
    if planner_llm_provider_name:
        planner_llm_model_name = get_setting("planner_llm_model_name")
        planner_llm_temperature = get_setting("planner_llm_temperature", 0.6)
        planner_ollama_num_ctx = get_setting("planner_ollama_num_ctx", 16000)
        planner_llm_base_url = get_setting("planner_llm_base_url") or None
        planner_llm_api_key = get_setting("planner_llm_api_key") or None
        planner_use_vision = get_setting("planner_use_vision", False)

        planner_llm = await _initialize_llm(
            planner_llm_provider_name,
            planner_llm_model_name,
            planner_llm_temperature,
            planner_llm_base_url,
            planner_llm_api_key,
            planner_ollama_num_ctx if planner_llm_provider_name == "ollama" else None,
        )

    # --- Browser Settings ---
    def get_browser_setting(key, default=None):
        comp = webui_manager.id_to_component.get(f"browser_settings.{key}")
        return components.get(comp, default) if comp else default

    browser_binary_path = get_browser_setting("browser_binary_path") or None
    browser_user_data_dir = get_browser_setting("browser_user_data_dir") or None
    use_own_browser = get_browser_setting(
        "use_own_browser", False
    )  # Logic handled by CDP/WSS presence
    keep_browser_open = get_browser_setting("keep_browser_open", False)
    headless = get_browser_setting("headless", False)
    disable_security = get_browser_setting("disable_security", False)
    window_w = int(get_browser_setting("window_w", 1280))
    window_h = int(get_browser_setting("window_h", 1100))
    cdp_url = get_browser_setting("cdp_url") or None
    wss_url = get_browser_setting("wss_url") or None
    save_recording_path = get_browser_setting("save_recording_path") or None
    save_trace_path = get_browser_setting("save_trace_path") or None
    save_agent_history_path = get_browser_setting(
        "save_agent_history_path", "./tmp/agent_history"
    )
    save_download_path = get_browser_setting("save_download_path", "./tmp/downloads")

    # Size of the streamed browser view: 70vw wide, height scaled by the
    # window's aspect ratio.
    stream_vw = 70
    stream_vh = int(70 * window_h // window_w)

    os.makedirs(save_agent_history_path, exist_ok=True)
    if save_recording_path:
        os.makedirs(save_recording_path, exist_ok=True)
    if save_trace_path:
        os.makedirs(save_trace_path, exist_ok=True)
    if save_download_path:
        os.makedirs(save_download_path, exist_ok=True)

    # --- 2. Initialize LLM ---
    main_llm = await _initialize_llm(
        llm_provider_name,
        llm_model_name,
        llm_temperature,
        llm_base_url,
        llm_api_key,
        ollama_num_ctx if llm_provider_name == "ollama" else None,
    )

    # Pass the webui_manager instance to the callback when wrapping it
    async def ask_callback_wrapper(
        query: str, browser_context: BrowserContext
    ) -> Dict[str, Any]:
        return await _ask_assistant_callback(webui_manager, query, browser_context)

    # The controller is created once and then reused by later runs.
    if not webui_manager.bu_controller:
        webui_manager.bu_controller = CustomController(
            ask_assistant_callback=ask_callback_wrapper
        )
        await webui_manager.bu_controller.setup_mcp_client(mcp_server_config)

    # --- 4. Initialize Browser and Context ---
    should_close_browser_on_finish = not keep_browser_open

    try:
        # Close existing resources if not keeping open
        if not keep_browser_open:
            if webui_manager.bu_browser_context:
                logger.info("Closing previous browser context.")
                await webui_manager.bu_browser_context.close()
                webui_manager.bu_browser_context = None
            if webui_manager.bu_browser:
                logger.info("Closing previous browser.")
                await webui_manager.bu_browser.close()
                webui_manager.bu_browser = None

        # Create Browser if needed
        if not webui_manager.bu_browser:
            logger.info("Launching new browser instance.")
            extra_args = []
            if use_own_browser:
                # The BROWSER_PATH environment variable wins over the UI value;
                # for the user-data dir the UI value wins over BROWSER_USER_DATA.
                browser_binary_path = os.getenv("BROWSER_PATH", None) or browser_binary_path
                if browser_binary_path == "":
                    browser_binary_path = None
                browser_user_data = browser_user_data_dir or os.getenv("BROWSER_USER_DATA", None)
                if browser_user_data:
                    extra_args += [f"--user-data-dir={browser_user_data}"]
            else:
                browser_binary_path = None

            webui_manager.bu_browser = CustomBrowser(
                config=BrowserConfig(
                    headless=headless,
                    disable_security=disable_security,
                    browser_binary_path=browser_binary_path,
                    extra_browser_args=extra_args,
                    wss_url=wss_url,
                    cdp_url=cdp_url,
                    new_context_config=BrowserContextConfig(
                        window_width=window_w,
                        window_height=window_h,
                    )
                )
            )

        # Create Context if needed
        if not webui_manager.bu_browser_context:
            logger.info("Creating new browser context.")
            context_config = BrowserContextConfig(
                trace_path=save_trace_path if save_trace_path else None,
                save_recording_path=save_recording_path
                if save_recording_path
                else None,
                save_downloads_path=save_download_path if save_download_path else None,
                window_height=window_h,
                window_width=window_w,
            )
            if not webui_manager.bu_browser:
                raise ValueError("Browser not initialized, cannot create context.")
            webui_manager.bu_browser_context = (
                await webui_manager.bu_browser.new_context(config=context_config)
            )

        # --- 5. Initialize or Update Agent ---
        webui_manager.bu_agent_task_id = str(uuid.uuid4())  # New ID for this task run
        os.makedirs(
            os.path.join(save_agent_history_path, webui_manager.bu_agent_task_id),
            exist_ok=True,
        )
        history_file = os.path.join(
            save_agent_history_path,
            webui_manager.bu_agent_task_id,
            f"{webui_manager.bu_agent_task_id}.json",
        )
        gif_path = os.path.join(
            save_agent_history_path,
            webui_manager.bu_agent_task_id,
            f"{webui_manager.bu_agent_task_id}.gif",
        )

        # Pass the webui_manager to callbacks when wrapping them
        async def step_callback_wrapper(
            state: BrowserState, output: AgentOutput, step_num: int
        ):
            await _handle_new_step(webui_manager, state, output, step_num)

        def done_callback_wrapper(history: AgentHistoryList):
            _handle_done(webui_manager, history)

        if not webui_manager.bu_agent:
            logger.info(f"Initializing new agent for task: {task}")
            if not webui_manager.bu_browser or not webui_manager.bu_browser_context:
                raise ValueError(
                    "Browser or Context not initialized, cannot create agent."
                )
            webui_manager.bu_agent = BrowserUseAgent(
                task=task,
                llm=main_llm,
                browser=webui_manager.bu_browser,
                browser_context=webui_manager.bu_browser_context,
                controller=webui_manager.bu_controller,
                register_new_step_callback=step_callback_wrapper,
                register_done_callback=done_callback_wrapper,
                use_vision=use_vision,
                override_system_message=override_system_prompt,
                extend_system_message=extend_system_prompt,
                max_input_tokens=max_input_tokens,
                max_actions_per_step=max_actions,
                tool_calling_method=tool_calling_method,
                planner_llm=planner_llm,
                use_vision_for_planner=planner_use_vision if planner_llm else False,
                source="webui",
            )
            webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
            webui_manager.bu_agent.settings.generate_gif = gif_path
        else:
            # Reuse the existing agent: hand it the new task and re-point it
            # at the current browser, context and controller.
            webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
            webui_manager.bu_agent.add_new_task(task)
            webui_manager.bu_agent.settings.generate_gif = gif_path
            webui_manager.bu_agent.browser = webui_manager.bu_browser
            webui_manager.bu_agent.browser_context = webui_manager.bu_browser_context
            webui_manager.bu_agent.controller = webui_manager.bu_controller

        # --- 6. Run Agent Task and Stream Updates ---
        agent_run_coro = webui_manager.bu_agent.run(max_steps=max_steps)
        agent_task = asyncio.create_task(agent_run_coro)
        webui_manager.bu_current_task = agent_task  # Store the task

        last_chat_len = len(webui_manager.bu_chat_history)
        while not agent_task.done():
            is_paused = webui_manager.bu_agent.state.paused
            is_stopped = webui_manager.bu_agent.state.stopped

            # Check for pause state
            if is_paused:
                yield {
                    pause_resume_button_comp: gr.update(
                        value="▶️ Resume", interactive=True
                    ),
                    stop_button_comp: gr.update(interactive=True),
                }
                # Wait until pause is released or task is stopped/done
                while is_paused and not agent_task.done():
                    # Re-check agent state in loop
                    is_paused = webui_manager.bu_agent.state.paused
                    is_stopped = webui_manager.bu_agent.state.stopped
                    if is_stopped:  # Stop signal received while paused
                        break
                    await asyncio.sleep(0.2)

                if (
                    agent_task.done() or is_stopped
                ):  # If stopped or task finished while paused
                    break

                # If resumed, yield UI update
                yield {
                    pause_resume_button_comp: gr.update(
                        value="⏸️ Pause", interactive=True
                    ),
                    run_button_comp: gr.update(
                        value="⏳ Running...", interactive=False
                    ),
                }

            # Check if agent stopped itself or stop button was pressed (which sets agent.state.stopped)
            if is_stopped:
                logger.info("Agent has stopped (internally or via stop button).")
                if not agent_task.done():
                    # Ensure the task coroutine finishes if agent just set flag
                    try:
                        await asyncio.wait_for(
                            agent_task, timeout=1.0
                        )  # Give it a moment to exit run()
                    except asyncio.TimeoutError:
                        logger.warning(
                            "Agent task did not finish quickly after stop signal, cancelling."
                        )
                        agent_task.cancel()
                    except Exception:  # Catch task exceptions if it errors on stop
                        # Deliberately ignored here; the finalization step
                        # below inspects the task and reports its exception.
                        pass
                break  # Exit the streaming loop

            # Check if agent is asking for help (via response_event)
            update_dict = {}
            if webui_manager.bu_response_event is not None:
                update_dict = {
                    business_name_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    business_website_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    business_type_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    additional_info_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    run_button_comp: gr.update(
                        value="✔️ Submit Response", interactive=True
                    ),
                    pause_resume_button_comp: gr.update(interactive=False),
                    stop_button_comp: gr.update(interactive=False),
                    chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
                }
                last_chat_len = len(webui_manager.bu_chat_history)
                yield update_dict
                # Wait until response is submitted or task finishes
                while (
                    webui_manager.bu_response_event is not None
                    and not agent_task.done()
                ):
                    await asyncio.sleep(0.2)
                # Restore UI after response submitted or if task ended unexpectedly
                if not agent_task.done():
                    yield {
                        business_name_comp: gr.update(
                            placeholder="Enter business name", interactive=False
                        ),
                        business_website_comp: gr.update(
                            placeholder="Enter business website", interactive=False
                        ),
                        business_type_comp: gr.update(
                            placeholder="Enter business type", interactive=False
                        ),
                        additional_info_comp: gr.update(
                            placeholder="Enter additional information", interactive=False
                        ),
                        run_button_comp: gr.update(
                            value="⏳ Running...", interactive=False
                        ),
                        pause_resume_button_comp: gr.update(interactive=True),
                        stop_button_comp: gr.update(interactive=True),
                    }
                else:
                    break  # Task finished while waiting for response

            # Update Chatbot if new messages arrived via callbacks
            if len(webui_manager.bu_chat_history) > last_chat_len:
                update_dict[chatbot_comp] = gr.update(
                    value=webui_manager.bu_chat_history
                )
                last_chat_len = len(webui_manager.bu_chat_history)

            # Update Browser View
            if headless and webui_manager.bu_browser_context:
                try:
                    screenshot_b64 = (
                        await webui_manager.bu_browser_context.take_screenshot()
                    )
                    if screenshot_b64:
                        html_content = f'<img src="data:image/jpeg;base64,{screenshot_b64}" style="width:{stream_vw}vw; height:{stream_vh}vh ; border:1px solid #ccc;">'
                        update_dict[browser_view_comp] = gr.update(
                            value=html_content, visible=True
                        )
                    else:
                        html_content = f"<h1 style='width:{stream_vw}vw; height:{stream_vh}vh'>Waiting for browser session...</h1>"
                        update_dict[browser_view_comp] = gr.update(
                            value=html_content, visible=True
                        )
                except Exception as e:
                    logger.debug(f"Failed to capture screenshot: {e}")
                    update_dict[browser_view_comp] = gr.update(
                        value="<div style='...'>Error loading view...</div>",
                        visible=True,
                    )
            else:
                update_dict[browser_view_comp] = gr.update(visible=False)

            # Yield accumulated updates
            if update_dict:
                yield update_dict

            await asyncio.sleep(0.1)  # Polling interval

        # --- 7. Task Finalization ---
        # Reset the control flags so the reused agent starts its next run clean.
        webui_manager.bu_agent.state.paused = False
        webui_manager.bu_agent.state.stopped = False
        final_update = {}
        try:
            logger.info("Agent task completing...")
            # Await the task ensure completion and catch exceptions if not already caught
            if not agent_task.done():
                await agent_task  # Retrieve result/exception
            elif agent_task.exception():  # Check if task finished with exception
                agent_task.result()  # Raise the exception to be caught below
            logger.info("Agent task completed processing.")

            logger.info(f"Explicitly saving agent history to: {history_file}")
            webui_manager.bu_agent.save_history(history_file)

            if os.path.exists(history_file):
                final_update[history_file_comp] = gr.File(value=history_file)

            if gif_path and os.path.exists(gif_path):
                logger.info(f"GIF found at: {gif_path}")
                final_update[gif_comp] = gr.Image(value=gif_path)

            # Update task metrics display if metrics are available
            task_metrics_display_comp = webui_manager.get_component_by_id("browser_use_agent.task_metrics_display")
            if hasattr(webui_manager, 'bu_task_metrics') and webui_manager.bu_task_metrics:
                # If we have metrics but no screenshot, try to get the latest screenshot
                if not webui_manager.bu_task_metrics.get('screenshot') and webui_manager.bu_browser_context:
                    try:
                        final_screenshot = await webui_manager.bu_browser_context.take_screenshot()
                        if final_screenshot:
                            webui_manager.bu_task_metrics['screenshot'] = final_screenshot
                    except Exception as e:
                        logger.warning(f"Failed to capture final screenshot for metrics: {e}")

                # Format the metrics for display
                metrics_md = format_task_metrics(webui_manager.bu_task_metrics)
                final_update[task_metrics_display_comp] = gr.update(value=metrics_md)

        except asyncio.CancelledError:
            logger.info("Agent task was cancelled.")
            # Avoid a duplicate notice if some assistant message already
            # mentions the cancellation.
            if not any(
                "Cancelled" in msg.get("content", "")
                for msg in webui_manager.bu_chat_history
                if msg.get("role") == "assistant"
            ):
                webui_manager.bu_chat_history.append(
                    {"role": "assistant", "content": "**Task Cancelled**."}
                )
            final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
        except Exception as e:
            logger.error(f"Error during agent execution: {e}", exc_info=True)
            error_message = (
                f"**Agent Execution Error:**\n```\n{type(e).__name__}: {e}\n```"
            )
            if not any(
                error_message in msg.get("content", "")
                for msg in webui_manager.bu_chat_history
                if msg.get("role") == "assistant"
            ):
                webui_manager.bu_chat_history.append(
                    {"role": "assistant", "content": error_message}
                )
            final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
            # gr.Error is an exception class and shows nothing unless raised;
            # raising here would skip the final UI reset, so surface the
            # failure as a warning toast instead.
            gr.Warning(f"Agent execution failed: {e}")

        finally:
            webui_manager.bu_current_task = None  # Clear the task reference

            # Close browser/context if requested
            if should_close_browser_on_finish:
                if webui_manager.bu_browser_context:
                    logger.info("Closing browser context after task.")
                    await webui_manager.bu_browser_context.close()
                    webui_manager.bu_browser_context = None
                if webui_manager.bu_browser:
                    logger.info("Closing browser after task.")
                    await webui_manager.bu_browser.close()
                    webui_manager.bu_browser = None

            # --- 8. Final UI Update ---
            final_update.update(
                {
                    business_name_comp: gr.update(
                        value="",
                        interactive=True,
                        placeholder="Enter business name",
                    ),
                    business_website_comp: gr.update(
                        value="",
                        interactive=True,
                        placeholder="Enter business website",
                    ),
                    business_type_comp: gr.update(interactive=True),
                    additional_info_comp: gr.update(
                        value="",
                        interactive=True,
                        placeholder="Enter additional information",
                    ),
                    run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True),
                    stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
                    pause_resume_button_comp: gr.update(
                        value="⏸️ Pause", interactive=False
                    ),
                    clear_button_comp: gr.update(interactive=True),
                    # Ensure final chat history is shown
                    chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
                }
            )
            yield final_update

    except Exception as e:
        # Catch errors during setup (before agent run starts)
        logger.error(f"Error setting up agent task: {e}", exc_info=True)
        webui_manager.bu_current_task = None  # Ensure state is reset
        yield {
            business_name_comp: gr.update(
                interactive=True, placeholder="Enter business name"
            ),
            business_website_comp: gr.update(
                interactive=True, placeholder="Enter business website"
            ),
            business_type_comp: gr.update(interactive=True),
            additional_info_comp: gr.update(
                interactive=True, placeholder="Enter additional information"
            ),
            run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True),
            stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
            pause_resume_button_comp: gr.update(value="⏸️ Pause", interactive=False),
            clear_button_comp: gr.update(interactive=True),
            # The setup error is shown in the chat view only; it is not
            # appended to the stored history.
            chatbot_comp: gr.update(
                value=webui_manager.bu_chat_history
                + [{"role": "assistant", "content": f"**Setup Error:** {e}"}]
            ),
        }
# --- Button Click Handlers --- (Need access to webui_manager)
async def handle_submit(
    webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
):
    """Handle a click on the main 'Start Analysis' button.

    Reads the business form values out of ``components`` and then takes one
    of four paths:

    * empty business name -> show a warning and stop;
    * the agent is waiting on ``bu_response_event`` -> release it with a
      canned "continue" answer and lock the form;
    * a task is already running -> show an info toast, change nothing;
    * otherwise -> build the task text and stream ``run_agent_task``.

    Args:
        webui_manager: Shared manager holding the response event, the
            current task, the chat history and the registered components.
        components: Mapping of component -> current value supplied by the
            click event. When a new run starts, the business-name entry is
            overwritten with the generated task text.

    Yields:
        Dicts mapping components to ``gr.update(...)`` objects.
    """
    # Get business information from the form
    business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name")
    business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website")
    business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type")
    additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info")

    def _text_value(comp) -> str:
        # A ``.get(comp, "")`` default only covers a missing key; a key that is
        # present with a None (or other non-string) value would crash on
        # ``.strip()``, so normalise anything that is not a string to "".
        raw = components.get(comp)
        return raw.strip() if isinstance(raw, str) else ""

    business_name = _text_value(business_name_comp)
    business_website = _text_value(business_website_comp)
    # Fall back to the default type when the entry is missing or empty/None.
    business_type = components.get(business_type_comp) or "Retail"
    additional_info = _text_value(additional_info_comp)

    if not business_name:
        gr.Warning("Please enter a business name.")
        yield {business_name_comp: gr.update(value=business_name)}
        return

    # Check if waiting for user assistance
    if webui_manager.bu_response_event and not webui_manager.bu_response_event.is_set():
        logger.info("User submitted assistance")
        webui_manager.bu_user_help_response = "Continue with the current task."
        webui_manager.bu_response_event.set()
        # UI updates handled by the main loop reacting to the event being set
        yield {
            business_name_comp: gr.update(
                interactive=False,
            ),
            business_website_comp: gr.update(
                interactive=False,
            ),
            business_type_comp: gr.update(
                interactive=False,
            ),
            additional_info_comp: gr.update(
                interactive=False,
            ),
            webui_manager.get_component_by_id(
                "browser_use_agent.run_button"
            ): gr.update(value="⏳ Running...", interactive=False),
        }
    # Check if a task is currently running (using _current_task)
    elif webui_manager.bu_current_task and not webui_manager.bu_current_task.done():
        logger.warning(
            "Start button clicked while agent is already running and not asking for help."
        )
        gr.Info("Agent is currently running. Please wait or use Stop/Pause.")
        yield {}  # No change
    else:
        # Generate the standardized task using our template. Only this branch
        # needs the task text, so it is built here rather than up front.
        task = create_business_task(
            business_name,
            business_type,
            business_website,
            additional_info,
        )
        # Store the task in the business-name entry before running
        # (presumably run_agent_task reads its task text from there —
        # confirm against run_agent_task).
        components[business_name_comp] = task
        # Handle submission for a new task
        logger.info(f"Starting analysis for business: {business_name}")
        # Update chat history with the business information
        webui_manager.bu_chat_history.append({"role": "user", "content": task})
        # Run the task using our agent
        async for update in run_agent_task(webui_manager, components):
            yield update
async def handle_stop(webui_manager: WebuiManager):
    """React to the 'Stop' button.

    If an agent is attached and its task has not finished, flag the agent
    state as stopped (and not paused) and return updates that disable the
    stop, pause/resume and run buttons. Otherwise log a warning and return
    updates that put the buttons back into their idle configuration.

    Returns:
        Dict mapping button components to ``gr.update(...)`` objects.
    """
    logger.info("Stop button clicked.")
    current_agent = webui_manager.bu_agent
    current_task = webui_manager.bu_current_task

    def _build(spec):
        # Resolve each component id and pair it with its update, in order.
        return {
            webui_manager.get_component_by_id(comp_id): gr.update(**kwargs)
            for comp_id, kwargs in spec
        }

    still_running = current_agent and current_task and not current_task.done()
    if not still_running:
        logger.warning("Stop clicked but agent is not running or task is already done.")
        # Reset UI just in case it's stuck
        return _build(
            (
                ("browser_use_agent.run_button", {"interactive": True}),
                ("browser_use_agent.stop_button", {"interactive": False}),
                ("browser_use_agent.pause_resume_button", {"interactive": False}),
                ("browser_use_agent.clear_button", {"interactive": True}),
            )
        )

    # Ask the agent to stop through its state flags; a stopped agent must
    # not remain marked as paused.
    current_agent.state.stopped = True
    current_agent.state.paused = False
    return _build(
        (
            (
                "browser_use_agent.stop_button",
                {"interactive": False, "value": "⏹️ Stopping..."},
            ),
            ("browser_use_agent.pause_resume_button", {"interactive": False}),
            ("browser_use_agent.run_button", {"interactive": False}),
        )
    )
async def handle_pause_resume(webui_manager: WebuiManager):
    """React to the 'Pause/Resume' toggle button.

    With a live agent and an unfinished task, resume the agent if its state
    is paused, otherwise pause it, and return an optimistic update giving
    the button its next label. Without a live run, log a warning and return
    an empty dict (no UI change).

    Returns:
        Dict mapping the pause/resume button to a ``gr.update(...)``, or
        ``{}`` when nothing is running.
    """
    active_agent = webui_manager.bu_agent
    active_task = webui_manager.bu_current_task

    if not (active_agent and active_task and not active_task.done()):
        logger.warning(
            "Pause/Resume clicked but agent is not running or doesn't support state."
        )
        return {}  # No change

    if active_agent.state.paused:
        logger.info("Resume button clicked.")
        active_agent.resume()
        # UI update happens in main loop; the button flips back to "Pause".
        next_label = "⏸️ Pause"
    else:
        logger.info("Pause button clicked.")
        active_agent.pause()
        next_label = "▶️ Resume"

    # Optimistic update: show the new label right away.
    toggle_button = webui_manager.get_component_by_id(
        "browser_use_agent.pause_resume_button"
    )
    return {toggle_button: gr.update(value=next_label, interactive=True)}
async def handle_clear(webui_manager: WebuiManager):
    """Handle a click on the 'Clear' button.

    Steps, in order:

    1. If a task is still pending, stop the agent (when one is attached),
       cancel the task and wait up to two seconds for it to finish.
    2. Close the controller's MCP client, if a controller exists.
    3. Reset the agent-related state kept on ``webui_manager`` (agent,
       controller, chat history, response event, help response, task id,
       task metrics).
    4. Return updates that restore every tab component to its initial state.

    Both the task stop and the MCP-client shutdown are best effort: failures
    are logged as warnings so the reset always completes.

    Returns:
        Dict mapping components to ``gr.update(...)`` objects.
    """
    logger.info("Clear button clicked.")

    # Stop any running task first
    task = webui_manager.bu_current_task
    if task and not task.done():
        logger.info("Clearing requires stopping the current task.")
        # Guard against a pending task with no agent attached, which would
        # otherwise raise AttributeError before the task is cancelled.
        agent = webui_manager.bu_agent
        if agent is not None:
            agent.stop()
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=2.0)  # Wait briefly
        except (asyncio.CancelledError, asyncio.TimeoutError):
            # Expected outcomes of cancelling / timing out; nothing to report.
            pass
        except Exception as e:
            logger.warning(f"Error stopping task on clear: {e}")
    webui_manager.bu_current_task = None

    controller = webui_manager.bu_controller
    if controller:
        try:
            await controller.close_mcp_client()
        except Exception as e:
            # Keep going: a failed shutdown must not leave the UI half-reset.
            logger.warning(f"Error closing MCP client on clear: {e}")
        finally:
            webui_manager.bu_controller = None
    webui_manager.bu_agent = None

    # Reset state stored in manager
    webui_manager.bu_chat_history = []
    webui_manager.bu_response_event = None
    webui_manager.bu_user_help_response = None
    webui_manager.bu_agent_task_id = None
    webui_manager.bu_task_metrics = None  # Clear task metrics
    logger.info("Agent state and browser resources cleared.")

    # Reset UI components
    return {
        webui_manager.get_component_by_id("browser_use_agent.chatbot"): gr.update(
            value=[]
        ),
        webui_manager.get_component_by_id("browser_use_agent.business_name"): gr.update(
            value="", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.business_website"): gr.update(
            value="", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.business_type"): gr.update(
            value="Retail", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.additional_info"): gr.update(
            value="", interactive=True
        ),
        webui_manager.get_component_by_id(
            "browser_use_agent.agent_history_file"
        ): gr.update(value=None),
        webui_manager.get_component_by_id("browser_use_agent.recording_gif"): gr.update(
            value=None
        ),
        webui_manager.get_component_by_id("browser_use_agent.browser_view"): gr.update(
            value="<div style='...'>Browser Cleared</div>"
        ),
        webui_manager.get_component_by_id("browser_use_agent.run_button"): gr.update(
            value="▶️ Start Analysis", interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.stop_button"): gr.update(
            interactive=False
        ),
        webui_manager.get_component_by_id(
            "browser_use_agent.pause_resume_button"
        ): gr.update(value="⏸️ Pause", interactive=False),
        webui_manager.get_component_by_id("browser_use_agent.clear_button"): gr.update(
            interactive=True
        ),
        webui_manager.get_component_by_id("browser_use_agent.task_metrics_display"): gr.update(
            value="No task metrics available yet. Run a task to see metrics here."
        ),
    }
# --- Tab Creation Function ---
def create_browser_use_agent_tab(webui_manager: WebuiManager):
    """
    Build the business-focused agent tab and wire up its buttons.

    Creates the chatbot, the business information form, the control buttons,
    the (initially hidden) browser view, the task metrics panel and the task
    output widgets; registers them with ``webui_manager`` under the
    "browser_use_agent" prefix; and attaches click handlers to the four
    buttons.
    """
    webui_manager.init_browser_use_agent()

    # Initialize task metrics if not already present
    if not hasattr(webui_manager, 'bu_task_metrics'):
        webui_manager.bu_task_metrics = None

    def _current_chat_history():
        # Passed as a callable so the history is looked up on the manager
        # each time instead of being captured once.
        return webui_manager.bu_chat_history

    def _current_metrics_markdown():
        # Passed as a callable for the same reason as the chat history.
        return format_task_metrics(webui_manager.bu_task_metrics)

    # --- Define UI Components ---
    with gr.Column():
        chatbot = gr.Chatbot(
            _current_chat_history,
            elem_id="browser_use_chatbot",
            label="Agent Interaction",
            type="messages",
            height=600,
            show_copy_button=True,
        )

        # Business information form
        with gr.Column(elem_id="business_form"):
            gr.Markdown("### Business Information")
            business_name = gr.Textbox(
                label="Business Name",
                placeholder="Enter business name",
                elem_id="business_name",
            )
            business_website = gr.Textbox(
                label="Business Website (optional)",
                placeholder="https://www.example.com",
                elem_id="business_website",
            )
            business_type = gr.Dropdown(
                label="Business Type",
                choices=["Retail", "Restaurant", "Service", "Healthcare", "Technology", "Other"],
                value="Retail",
                elem_id="business_type",
            )
            additional_info = gr.Textbox(
                label="Additional Information (optional)",
                placeholder="Any specific details about the business that might help the agent",
                lines=2,
                elem_id="additional_info",
            )

        # Control buttons
        with gr.Row():
            stop_button = gr.Button(
                "⏹️ Stop", interactive=False, variant="stop", scale=2
            )
            pause_resume_button = gr.Button(
                "⏸️ Pause", interactive=False, variant="secondary", scale=2, visible=True
            )
            clear_button = gr.Button(
                "🗑️ Clear", interactive=True, variant="secondary", scale=2
            )
            run_button = gr.Button("▶️ Start Analysis", variant="primary", scale=3)

        browser_view = gr.HTML(
            value="<div style='width:100%; height:50vh; display:flex; justify-content:center; align-items:center; border:1px solid #ccc; background-color:#f0f0f0;'><p>Browser View (Requires Headless=True)</p></div>",
            label="Browser Live View",
            elem_id="browser_view",
            visible=False,
        )

        # Task Metrics Section
        with gr.Column(visible=True):
            gr.Markdown("### Task Metrics", elem_id="task_metrics_heading")
            task_metrics_display = gr.Markdown(
                value=_current_metrics_markdown,
                elem_id="task_metrics_display",
            )

        with gr.Column():
            gr.Markdown("### Task Outputs")
            agent_history_file = gr.File(label="Agent History JSON", interactive=False)
            recording_gif = gr.Image(
                label="Task Recording GIF",
                format="gif",
                interactive=False,
                type="filepath",
            )

    # --- Store Components in Manager ---
    tab_components = {
        "chatbot": chatbot,
        "business_name": business_name,
        "business_website": business_website,
        "business_type": business_type,
        "additional_info": additional_info,
        "clear_button": clear_button,
        "run_button": run_button,
        "stop_button": stop_button,
        "pause_resume_button": pause_resume_button,
        "agent_history_file": agent_history_file,
        "recording_gif": recording_gif,
        "browser_view": browser_view,
        "task_metrics_display": task_metrics_display,
    }
    # Use "browser_use_agent" as tab_name prefix
    webui_manager.add_components("browser_use_agent", tab_components)

    # Every component known to the manager is an input of the run button;
    # only this tab's own components are outputs.
    all_managed_components = set(webui_manager.get_components())
    run_tab_outputs = list(tab_components.values())

    async def submit_wrapper(
        components_dict: Dict[Component, Any],
    ) -> AsyncGenerator[Dict[Component, Any], None]:
        """Relay every update produced by handle_submit."""
        async for ui_update in handle_submit(webui_manager, components_dict):
            yield ui_update

    async def stop_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Yield the single update dict produced by handle_stop."""
        yield await handle_stop(webui_manager)

    async def pause_resume_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Yield the single update dict produced by handle_pause_resume."""
        yield await handle_pause_resume(webui_manager)

    async def clear_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Yield the single update dict produced by handle_clear."""
        yield await handle_clear(webui_manager)

    # --- Connect Event Handlers using the Wrappers --
    run_button.click(
        fn=submit_wrapper, inputs=all_managed_components, outputs=run_tab_outputs
    )
    stop_button.click(fn=stop_wrapper, inputs=None, outputs=run_tab_outputs)
    pause_resume_button.click(
        fn=pause_resume_wrapper, inputs=None, outputs=run_tab_outputs
    )
    clear_button.click(fn=clear_wrapper, inputs=None, outputs=run_tab_outputs)