Spaces:
Running
Running
import asyncio | |
import json | |
import logging | |
import os | |
import uuid | |
from typing import Any, AsyncGenerator, Dict, Optional | |
import gradio as gr | |
# from browser_use.agent.service import Agent | |
from browser_use.agent.views import ( | |
AgentHistoryList, | |
AgentOutput, | |
) | |
from browser_use.browser.browser import BrowserConfig | |
from browser_use.browser.context import BrowserContext, BrowserContextConfig | |
from browser_use.browser.views import BrowserState | |
from gradio.components import Component | |
from langchain_core.language_models.chat_models import BaseChatModel | |
from src.agent.browser_use.browser_use_agent import BrowserUseAgent | |
from src.browser.custom_browser import CustomBrowser | |
from src.controller.custom_controller import CustomController | |
from src.utils import llm_provider | |
from src.webui.webui_manager import WebuiManager | |
logger = logging.getLogger(__name__) | |
# Custom function to format task metrics as markdown | |
def format_task_metrics(metrics):
    """Render a task-metrics dict as a Markdown summary.

    Args:
        metrics: Mapping that may contain the keys ``duration``, ``tokens``,
            ``result``, ``status``, ``errors`` and ``screenshot``. A falsy
            value (``None`` or an empty dict) means no task has run yet.

    Returns:
        A Markdown string. Missing keys are tolerated: duration/tokens fall
        back to ``N/A`` and status to ``Unknown``; the optional sections
        (result, errors, screenshot) are simply omitted.
    """
    if not metrics:
        return "No task metrics available yet. Run a task to see metrics here."

    sections = [
        "#### Task Execution Summary\n\n",
        f"⏱️ **Duration:** {metrics.get('duration', 'N/A')} seconds\n\n",
        f"🔢 **Tokens Used:** {metrics.get('tokens', 'N/A')}\n\n",
    ]

    result = metrics.get("result")
    if result:
        sections.append(f"📋 **Final Result:**\n\n```\n{result}\n```\n\n")

    sections.append(f"✅ **Status:** {metrics.get('status', 'Unknown')}\n\n")

    # The error list may hold only None placeholders; any() filters that case.
    errors = metrics.get("errors")
    if errors and any(errors):
        sections.append(f"❌ **Errors:**\n\n```\n{errors}\n```\n\n")

    # The screenshot value is embedded directly in a base64 data URI.
    screenshot = metrics.get("screenshot")
    if screenshot:
        sections.append("📸 **Final Screenshot:**\n\n")
        sections.append(
            f'<img src="data:image/jpeg;base64,{screenshot}" alt="Final Screenshot" '
            'style="max-width:100%; border:1px solid #ccc;" />\n\n'
        )

    return "".join(sections)
# Builds the standardized business-analysis task prompt passed to the agent.
def create_business_task(business_name, business_type, business_website=None, additional_info=None):
    """Build the standardized prompt asking the agent to analyze a business.

    Args:
        business_name: Name of the business, quoted verbatim in the prompt.
        business_type: Industry label inserted into the opening sentence.
        business_website: Optional URL; when truthy the agent is told to start
            there, otherwise it is told to search online.
        additional_info: Optional free text appended as extra context.

    Returns:
        The assembled task description as a single string.
    """
    # Either point the agent at the known site or have it search.
    starting_point = (
        f" Start by visiting their website at {business_website}."
        if business_website
        else " Search for information about this business online."
    )

    segments = [
        f"Analyze the business '{business_name}' which is in the {business_type} industry.",
        starting_point,
        " Gather the following information: main products/services, contact information, location, hours of operation, and customer reviews.",
    ]
    if additional_info:
        segments.append(f" Additional context: {additional_info}")
    segments.append(" Provide a comprehensive report with all findings.")

    return "".join(segments)
# --- Helper Functions --- (Defined at module level) | |
async def _initialize_llm(
    provider: Optional[str],
    model_name: Optional[str],
    temperature: float,
    base_url: Optional[str],
    api_key: Optional[str],
    num_ctx: Optional[int] = None,
) -> Optional[BaseChatModel]:
    """Create the chat model described by the given settings.

    Args:
        provider: LLM provider name; a falsy value disables the LLM.
        model_name: Model name; a falsy value disables the LLM.
        temperature: Sampling temperature forwarded to the provider factory.
        base_url: Optional endpoint override; empty values are sent as None.
        api_key: Optional API key; empty values are sent as None.
        num_ctx: Context size, forwarded only when the provider is "ollama".

    Returns:
        The model returned by ``llm_provider.get_llm_model``, or None when the
        provider/model is missing or the factory raises (in which case the
        error is logged and a Gradio warning is shown).
    """
    if not (provider and model_name):
        logger.info("LLM Provider or Model Name not specified, LLM will be None.")
        return None

    factory_kwargs = {
        "provider": provider,
        "model_name": model_name,
        "temperature": temperature,
        "base_url": base_url or None,
        "api_key": api_key or None,
        # Only Ollama receives a context-window size.
        "num_ctx": num_ctx if provider == "ollama" else None,
    }

    try:
        logger.info(
            f"Initializing LLM: Provider={provider}, Model={model_name}, Temp={temperature}"
        )
        return llm_provider.get_llm_model(**factory_kwargs)
    except Exception as exc:
        logger.error(f"Failed to initialize LLM: {exc}", exc_info=True)
        gr.Warning(
            f"Failed to initialize LLM '{model_name}' for provider '{provider}'. Please check settings. Error: {exc}"
        )
        return None
def _get_config_value(
    webui_manager: WebuiManager,
    comp_dict: Dict[gr.components.Component, Any],
    comp_id_suffix: str,
    default: Any = None,
) -> Any:
    """Look up a component's value by its ID suffix across the known tabs.

    Component IDs are tried as ``"<tab>.<suffix>"`` for the agent tab first and
    then for the two settings tabs; the first ID the manager knows wins.

    Args:
        webui_manager: Manager used to resolve component IDs to components.
        comp_dict: Mapping from component objects to their current values.
        comp_id_suffix: Component name without the tab prefix.
        default: Value returned when the component or its value is missing.

    Returns:
        The value stored for the component, or ``default``.
    """
    for tab_prefix in ("browser_use_agent", "agent_settings", "browser_settings"):
        try:
            component = webui_manager.get_component_by_id(
                f"{tab_prefix}.{comp_id_suffix}"
            )
            return comp_dict.get(component, default)
        except KeyError:
            # Unknown ID under this tab; fall through to the next prefix.
            continue

    logger.warning(
        f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup."
    )
    return default
def _format_agent_output(model_output: AgentOutput) -> str:
    """Format an AgentOutput as an HTML ``<pre><code>`` block of JSON.

    The agent's state and actions can contain text taken from web pages, so
    everything interpolated into the HTML is escaped; otherwise a stray ``<``
    or ``&`` would be interpreted as markup by the chatbot.

    Args:
        model_output: The agent output to display; a falsy value yields "".

    Returns:
        An HTML snippet with the JSON dump, or an HTML error snippet containing
        the raw ``str(model_output)`` when the dump fails.
    """
    import html  # function-scope import keeps this block self-contained

    if not model_output:
        return ""

    try:
        action_dump = [
            action.model_dump(exclude_none=True) for action in model_output.action
        ]
        state_dump = model_output.current_state.model_dump(exclude_none=True)
        json_string = json.dumps(
            {"current_state": state_dump, "action": action_dump},
            indent=4,
            ensure_ascii=False,
        )
        # quote=False keeps the JSON quotes readable; only &, < and > matter here.
        content = (
            "<pre><code class='language-json'>"
            f"{html.escape(json_string, quote=False)}</code></pre>"
        )
    except AttributeError as ae:
        logger.error(
            f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'."
        )
        content = (
            "<pre><code>Error: Could not format agent output "
            f"(AttributeError: {html.escape(str(ae), quote=False)}).\n"
            f"Raw output: {html.escape(str(model_output), quote=False)}</code></pre>"
        )
    except Exception as e:
        logger.error(f"Error formatting agent output: {e}", exc_info=True)
        # Fall back to the plain string representation.
        content = (
            "<pre><code>Error formatting agent output.\nRaw output:\n"
            f"{html.escape(str(model_output), quote=False)}</code></pre>"
        )

    return content.strip()
# --- Updated Callback Implementation --- | |
async def _handle_new_step(
    webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int
):
    """Per-step agent callback: append the step's screenshot and output to the chat.

    Args:
        webui_manager: Holder of ``bu_chat_history``; the list is created here
            (after logging an error) if the attribute is missing.
        state: Browser state; its ``screenshot`` attribute, if present, is
            embedded as a base64 JPEG image.
        output: Agent output rendered via ``_format_agent_output``.
        step_num: Step counter as reported by the agent; it is decremented by
            one before being logged and displayed.
    """
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error(
            "Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message."
        )
        webui_manager.bu_chat_history = []

    step_num -= 1
    logger.info(f"Step {step_num} completed.")

    # --- Screenshot ---
    shot = getattr(state, "screenshot", None)
    image_html = ""
    if not shot:
        logger.debug(f"No screenshot available for step {step_num}.")
    else:
        try:
            # Rough sanity check only: a string longer than 100 characters.
            plausible = isinstance(shot, str) and len(shot) > 100
            if plausible:
                image_html = (
                    f'<img src="data:image/jpeg;base64,{shot}" alt="Step {step_num} Screenshot" style="max-width: 800px; max-height: 600px; object-fit:contain;" />'
                    + "<br/>"
                )
            else:
                logger.warning(
                    f"Screenshot for step {step_num} seems invalid (type: {type(shot)}, len: {len(shot) if isinstance(shot, str) else 'N/A'})."
                )
                image_html = "**[Invalid screenshot data]**<br/>"
        except Exception as exc:
            logger.error(
                f"Error processing or formatting screenshot for step {step_num}: {exc}",
                exc_info=True,
            )
            image_html = "**[Error displaying screenshot]**<br/>"

    # --- Header + image + JSON block ---
    message_body = (
        f"--- **Step {step_num}** ---"
        + "<br/>"
        + image_html
        + _format_agent_output(output)
    )
    webui_manager.bu_chat_history.append(
        {"role": "assistant", "content": message_body.strip()}
    )

    await asyncio.sleep(0.05)
def _handle_done(webui_manager: WebuiManager, history: AgentHistoryList):
    """Completion callback: record task metrics and post a summary to the chat.

    Args:
        webui_manager: Receives ``bu_task_metrics`` (a dict with duration,
            tokens, result, status, errors and screenshot) and gets the summary
            appended to ``bu_chat_history``.
        history: Agent history queried for duration, input tokens, final
            result, errors and screenshots.
    """
    duration_text = f"{history.total_duration_seconds():.2f}"
    input_tokens = history.total_input_tokens()
    logger.info(
        f"Agent task finished. Duration: {duration_text}s, Tokens: {input_tokens}"
    )

    final_result = history.final_result()
    errors = history.errors()
    # The error list may contain only None entries; that counts as success.
    has_errors = bool(errors and any(errors))

    summary_parts = [
        "**Task Completed**\n",
        f"- Duration: {duration_text} seconds\n",
        f"- Total Input Tokens: {input_tokens}\n",
    ]
    if final_result:
        summary_parts.append(f"- Final Result: {final_result}\n")
    if has_errors:
        summary_parts.append(f"- **Errors:**\n```\n{errors}\n```\n")
    else:
        summary_parts.append("- Status: Success\n")

    # Last screenshot of the run, if any were captured.
    screenshots = history.screenshots()
    final_screenshot = screenshots[-1] if screenshots else None

    webui_manager.bu_task_metrics = {
        "duration": duration_text,
        "tokens": f"{input_tokens}",
        "result": final_result or "",
        "status": "Error" if has_errors else "Success",
        "errors": errors if has_errors else None,
        "screenshot": final_screenshot,
    }

    webui_manager.bu_chat_history.append(
        {"role": "assistant", "content": "".join(summary_parts)}
    )
async def _ask_assistant_callback(
    webui_manager: WebuiManager, query: str, browser_context: BrowserContext
) -> Dict[str, Any]:
    """Callback for the agent's ask_for_assistant action: wait for the user.

    Posts the help request to the chat, publishes a fresh ``asyncio.Event`` on
    ``webui_manager.bu_response_event`` and waits (up to one hour) for it to be
    set. The submitted text is read from ``webui_manager.bu_user_help_response``.

    Args:
        webui_manager: Shared UI state (chat history, response event/response).
        query: The agent's question, shown to the user.
        browser_context: Unused here; part of the callback signature.

    Returns:
        ``{"response": <text>}`` with the user's reply, a timeout notice, or an
        internal-error notice when the chat history is unavailable.
    """
    logger.info("Agent requires assistance. Waiting for user input.")

    # Guard on the attribute this function actually uses; the previous check
    # looked for "_chat_history", which is not the attribute used below.
    if not hasattr(webui_manager, "bu_chat_history"):
        logger.error("Chat history not found in webui_manager during ask_assistant!")
        return {"response": "Internal Error: Cannot display help request."}

    webui_manager.bu_chat_history.append(
        {
            "role": "assistant",
            "content": f"**Need Help:** {query}\nPlease provide information or perform the required action in the browser, then type your response/confirmation below and click 'Submit Response'.",
        }
    )

    response_event = asyncio.Event()
    webui_manager.bu_response_event = response_event
    webui_manager.bu_user_help_response = None  # Reset previous response

    try:
        logger.info("Waiting for user response event...")
        await asyncio.wait_for(response_event.wait(), timeout=3600.0)
        logger.info("User response event received.")
    except asyncio.TimeoutError:
        logger.warning("Timeout waiting for user assistance.")
        webui_manager.bu_chat_history.append(
            {
                "role": "assistant",
                "content": "**Timeout:** No response received. Trying to proceed.",
            }
        )
        return {"response": "Timeout: User did not respond."}  # Inform the agent
    finally:
        # Always clear the event (success, timeout, or cancellation) so no
        # stale pending request is left on the manager.
        webui_manager.bu_response_event = None

    response = webui_manager.bu_user_help_response
    webui_manager.bu_chat_history.append(
        {"role": "user", "content": response}
    )  # Show user response in chat
    return {"response": response}
# --- Core Agent Execution Logic --- (Needs access to webui_manager) | |
async def run_agent_task(
    webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
) -> AsyncGenerator[Dict[gr.components.Component, Any], None]:
    """Initialize and run the browser agent, streaming UI updates while it works.

    The generator builds the task text from the business form fields, reads the
    agent and browser settings, creates (or reuses) the controller, browser,
    context and agent stored on ``webui_manager``, starts ``agent.run`` as an
    asyncio task, and polls it, yielding component-update dicts for pause/stop,
    assistance requests, new chat messages and the headless browser view.

    Args:
        webui_manager: Shared UI/agent state and component registry.
        components: Current values keyed by Gradio component.

    Yields:
        Dicts mapping Gradio components to updates. A final dict re-enables the
        form; on a setup or run failure a "Setup Error" chat message is yielded
        instead.
    """
    # --- Get Components ---
    business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name")
    business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website")
    business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type")
    additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info")
    run_button_comp = webui_manager.get_component_by_id("browser_use_agent.run_button")
    stop_button_comp = webui_manager.get_component_by_id("browser_use_agent.stop_button")
    pause_resume_button_comp = webui_manager.get_component_by_id(
        "browser_use_agent.pause_resume_button"
    )
    clear_button_comp = webui_manager.get_component_by_id("browser_use_agent.clear_button")
    chatbot_comp = webui_manager.get_component_by_id("browser_use_agent.chatbot")
    history_file_comp = webui_manager.get_component_by_id(
        "browser_use_agent.agent_history_file"
    )
    gif_comp = webui_manager.get_component_by_id("browser_use_agent.recording_gif")
    browser_view_comp = webui_manager.get_component_by_id("browser_use_agent.browser_view")

    # --- 1. Get Task and Initial UI Update ---
    # `or ""` guards against a None value stored for an empty field.
    task = (components.get(business_name_comp) or "").strip()
    if not task:
        gr.Warning("Please enter a business name or task.")
        yield {run_button_comp: gr.update(interactive=True)}
        return

    if "Analyze the business" not in task:
        # The field holds a bare business name; expand it into the full prompt.
        business_name = task
        business_website = (components.get(business_website_comp) or "").strip()
        business_type = components.get(business_type_comp, "Retail")
        additional_info = (components.get(additional_info_comp) or "").strip()
        task = create_business_task(
            business_name, business_type, business_website, additional_info
        )

    # The submit handler may already have added the task to the chat history.
    if not any(
        msg.get("content") == task
        for msg in webui_manager.bu_chat_history
        if msg.get("role") == "user"
    ):
        webui_manager.bu_chat_history.append({"role": "user", "content": task})

    yield {
        business_name_comp: gr.Textbox(
            value=components.get(business_name_comp, ""), interactive=False
        ),
        business_website_comp: gr.Textbox(
            value=components.get(business_website_comp, ""), interactive=False
        ),
        business_type_comp: gr.update(interactive=False),
        additional_info_comp: gr.Textbox(
            value=components.get(additional_info_comp, ""), interactive=False
        ),
        run_button_comp: gr.Button(value="⏳ Running...", interactive=False),
        stop_button_comp: gr.Button(interactive=True),
        pause_resume_button_comp: gr.Button(value="⏸️ Pause", interactive=True),
        clear_button_comp: gr.Button(interactive=False),
        chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
        history_file_comp: gr.update(value=None),
        gif_comp: gr.update(value=None),
    }

    # Everything after the UI has been locked runs inside this try so that any
    # setup failure (bad MCP JSON, unusable paths, controller setup, ...) is
    # reported and the form is re-enabled instead of staying in "Running".
    try:
        # --- Agent Settings ---
        def get_setting(key, default=None):
            comp = webui_manager.id_to_component.get(f"agent_settings.{key}")
            return components.get(comp, default) if comp else default

        override_system_prompt = get_setting("override_system_prompt") or None
        extend_system_prompt = get_setting("extend_system_prompt") or None
        llm_provider_name = get_setting("llm_provider", None)
        llm_model_name = get_setting("llm_model_name", None)
        llm_temperature = get_setting("llm_temperature", 0.6)
        use_vision = get_setting("use_vision", True)
        ollama_num_ctx = get_setting("ollama_num_ctx", 16000)
        llm_base_url = get_setting("llm_base_url") or None
        llm_api_key = get_setting("llm_api_key") or None
        max_steps = get_setting("max_steps", 100)
        max_actions = get_setting("max_actions", 10)
        max_input_tokens = get_setting("max_input_tokens", 128000)
        tool_calling_str = get_setting("tool_calling_method", "auto")
        # The UI encodes "no tool calling method" as the string "None".
        tool_calling_method = tool_calling_str if tool_calling_str != "None" else None

        mcp_server_config_comp = webui_manager.id_to_component.get(
            "agent_settings.mcp_server_config"
        )
        mcp_server_config_str = (
            components.get(mcp_server_config_comp) if mcp_server_config_comp else None
        )
        mcp_server_config = (
            json.loads(mcp_server_config_str) if mcp_server_config_str else None
        )

        # Planner LLM Settings (Optional)
        planner_llm_provider_name = get_setting("planner_llm_provider") or None
        planner_llm = None
        planner_use_vision = False
        if planner_llm_provider_name:
            planner_llm_model_name = get_setting("planner_llm_model_name")
            planner_llm_temperature = get_setting("planner_llm_temperature", 0.6)
            planner_ollama_num_ctx = get_setting("planner_ollama_num_ctx", 16000)
            planner_llm_base_url = get_setting("planner_llm_base_url") or None
            planner_llm_api_key = get_setting("planner_llm_api_key") or None
            planner_use_vision = get_setting("planner_use_vision", False)

            planner_llm = await _initialize_llm(
                planner_llm_provider_name,
                planner_llm_model_name,
                planner_llm_temperature,
                planner_llm_base_url,
                planner_llm_api_key,
                planner_ollama_num_ctx if planner_llm_provider_name == "ollama" else None,
            )

        # --- Browser Settings ---
        def get_browser_setting(key, default=None):
            comp = webui_manager.id_to_component.get(f"browser_settings.{key}")
            return components.get(comp, default) if comp else default

        browser_binary_path = get_browser_setting("browser_binary_path") or None
        browser_user_data_dir = get_browser_setting("browser_user_data_dir") or None
        use_own_browser = get_browser_setting("use_own_browser", False)
        keep_browser_open = get_browser_setting("keep_browser_open", False)
        headless = get_browser_setting("headless", False)
        disable_security = get_browser_setting("disable_security", False)
        window_w = int(get_browser_setting("window_w", 1280))
        window_h = int(get_browser_setting("window_h", 1100))
        cdp_url = get_browser_setting("cdp_url") or None
        wss_url = get_browser_setting("wss_url") or None
        save_recording_path = get_browser_setting("save_recording_path") or None
        save_trace_path = get_browser_setting("save_trace_path") or None
        save_agent_history_path = get_browser_setting(
            "save_agent_history_path", "./tmp/agent_history"
        )
        save_download_path = get_browser_setting("save_download_path", "./tmp/downloads")

        # Size of the streamed headless view: fixed width, height scaled by the
        # window's aspect ratio.
        stream_vw = 70
        stream_vh = int(70 * window_h // window_w)

        os.makedirs(save_agent_history_path, exist_ok=True)
        if save_recording_path:
            os.makedirs(save_recording_path, exist_ok=True)
        if save_trace_path:
            os.makedirs(save_trace_path, exist_ok=True)
        if save_download_path:
            os.makedirs(save_download_path, exist_ok=True)

        # --- 2. Initialize LLM ---
        main_llm = await _initialize_llm(
            llm_provider_name,
            llm_model_name,
            llm_temperature,
            llm_base_url,
            llm_api_key,
            ollama_num_ctx if llm_provider_name == "ollama" else None,
        )

        # --- 3. Initialize Controller ---
        # Bind webui_manager into the assistance callback.
        async def ask_callback_wrapper(
            query: str, browser_context: BrowserContext
        ) -> Dict[str, Any]:
            return await _ask_assistant_callback(webui_manager, query, browser_context)

        if not webui_manager.bu_controller:
            webui_manager.bu_controller = CustomController(
                ask_assistant_callback=ask_callback_wrapper
            )
            await webui_manager.bu_controller.setup_mcp_client(mcp_server_config)

        # --- 4. Initialize Browser and Context ---
        should_close_browser_on_finish = not keep_browser_open

        # Close existing resources if not keeping open
        if not keep_browser_open:
            if webui_manager.bu_browser_context:
                logger.info("Closing previous browser context.")
                await webui_manager.bu_browser_context.close()
                webui_manager.bu_browser_context = None
            if webui_manager.bu_browser:
                logger.info("Closing previous browser.")
                await webui_manager.bu_browser.close()
                webui_manager.bu_browser = None

        # Create Browser if needed
        if not webui_manager.bu_browser:
            logger.info("Launching new browser instance.")
            extra_args = []
            if use_own_browser:
                # The BROWSER_PATH environment variable wins over the UI setting.
                browser_binary_path = os.getenv("BROWSER_PATH", None) or browser_binary_path
                if browser_binary_path == "":
                    browser_binary_path = None
                browser_user_data = browser_user_data_dir or os.getenv(
                    "BROWSER_USER_DATA", None
                )
                if browser_user_data:
                    extra_args += [f"--user-data-dir={browser_user_data}"]
            else:
                browser_binary_path = None

            webui_manager.bu_browser = CustomBrowser(
                config=BrowserConfig(
                    headless=headless,
                    disable_security=disable_security,
                    browser_binary_path=browser_binary_path,
                    extra_browser_args=extra_args,
                    wss_url=wss_url,
                    cdp_url=cdp_url,
                    new_context_config=BrowserContextConfig(
                        window_width=window_w,
                        window_height=window_h,
                    ),
                )
            )

        # Create Context if needed
        if not webui_manager.bu_browser_context:
            logger.info("Creating new browser context.")
            context_config = BrowserContextConfig(
                trace_path=save_trace_path if save_trace_path else None,
                save_recording_path=save_recording_path if save_recording_path else None,
                save_downloads_path=save_download_path if save_download_path else None,
                window_height=window_h,
                window_width=window_w,
            )
            if not webui_manager.bu_browser:
                raise ValueError("Browser not initialized, cannot create context.")
            webui_manager.bu_browser_context = (
                await webui_manager.bu_browser.new_context(config=context_config)
            )

        # --- 5. Initialize or Update Agent ---
        webui_manager.bu_agent_task_id = str(uuid.uuid4())  # New ID for this task run
        os.makedirs(
            os.path.join(save_agent_history_path, webui_manager.bu_agent_task_id),
            exist_ok=True,
        )
        history_file = os.path.join(
            save_agent_history_path,
            webui_manager.bu_agent_task_id,
            f"{webui_manager.bu_agent_task_id}.json",
        )
        gif_path = os.path.join(
            save_agent_history_path,
            webui_manager.bu_agent_task_id,
            f"{webui_manager.bu_agent_task_id}.gif",
        )

        # Bind webui_manager into the agent callbacks.
        async def step_callback_wrapper(
            state: BrowserState, output: AgentOutput, step_num: int
        ):
            await _handle_new_step(webui_manager, state, output, step_num)

        def done_callback_wrapper(history: AgentHistoryList):
            _handle_done(webui_manager, history)

        if not webui_manager.bu_agent:
            logger.info(f"Initializing new agent for task: {task}")
            if not webui_manager.bu_browser or not webui_manager.bu_browser_context:
                raise ValueError(
                    "Browser or Context not initialized, cannot create agent."
                )
            webui_manager.bu_agent = BrowserUseAgent(
                task=task,
                llm=main_llm,
                browser=webui_manager.bu_browser,
                browser_context=webui_manager.bu_browser_context,
                controller=webui_manager.bu_controller,
                register_new_step_callback=step_callback_wrapper,
                register_done_callback=done_callback_wrapper,
                use_vision=use_vision,
                override_system_message=override_system_prompt,
                extend_system_message=extend_system_prompt,
                max_input_tokens=max_input_tokens,
                max_actions_per_step=max_actions,
                tool_calling_method=tool_calling_method,
                planner_llm=planner_llm,
                use_vision_for_planner=planner_use_vision if planner_llm else False,
                source="webui",
            )
            webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
            webui_manager.bu_agent.settings.generate_gif = gif_path
        else:
            # Reuse the existing agent: give it the new task and current resources.
            webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id
            webui_manager.bu_agent.add_new_task(task)
            webui_manager.bu_agent.settings.generate_gif = gif_path
            webui_manager.bu_agent.browser = webui_manager.bu_browser
            webui_manager.bu_agent.browser_context = webui_manager.bu_browser_context
            webui_manager.bu_agent.controller = webui_manager.bu_controller

        # --- 6. Run Agent Task and Stream Updates ---
        agent_run_coro = webui_manager.bu_agent.run(max_steps=max_steps)
        agent_task = asyncio.create_task(agent_run_coro)
        webui_manager.bu_current_task = agent_task  # Store the task

        last_chat_len = len(webui_manager.bu_chat_history)
        while not agent_task.done():
            is_paused = webui_manager.bu_agent.state.paused
            is_stopped = webui_manager.bu_agent.state.stopped

            # Check for pause state
            if is_paused:
                yield {
                    pause_resume_button_comp: gr.update(
                        value="▶️ Resume", interactive=True
                    ),
                    stop_button_comp: gr.update(interactive=True),
                }
                # Wait until pause is released or task is stopped/done
                while is_paused and not agent_task.done():
                    is_paused = webui_manager.bu_agent.state.paused
                    is_stopped = webui_manager.bu_agent.state.stopped
                    if is_stopped:  # Stop signal received while paused
                        break
                    await asyncio.sleep(0.2)

                if agent_task.done() or is_stopped:
                    break

                # Resumed: restore the buttons.
                yield {
                    pause_resume_button_comp: gr.update(
                        value="⏸️ Pause", interactive=True
                    ),
                    run_button_comp: gr.update(
                        value="⏳ Running...", interactive=False
                    ),
                }

            # Agent stopped itself or the stop button set agent.state.stopped.
            if is_stopped:
                logger.info("Agent has stopped (internally or via stop button).")
                if not agent_task.done():
                    # Give run() a moment to exit before cancelling.
                    try:
                        await asyncio.wait_for(agent_task, timeout=1.0)
                    except asyncio.TimeoutError:
                        logger.warning(
                            "Agent task did not finish quickly after stop signal, cancelling."
                        )
                        agent_task.cancel()
                    except Exception:
                        # Deliberately ignored here: the task's exception is
                        # retrieved and reported during finalization below.
                        pass
                break  # Exit the streaming loop

            # Check if agent is asking for help (via response_event)
            update_dict = {}
            if webui_manager.bu_response_event is not None:
                update_dict = {
                    business_name_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    business_website_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    business_type_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    additional_info_comp: gr.update(
                        placeholder="Agent needs help. Enter response and submit.",
                        interactive=True,
                    ),
                    run_button_comp: gr.update(
                        value="✔️ Submit Response", interactive=True
                    ),
                    pause_resume_button_comp: gr.update(interactive=False),
                    stop_button_comp: gr.update(interactive=False),
                    chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
                }
                last_chat_len = len(webui_manager.bu_chat_history)
                yield update_dict

                # Wait until response is submitted or task finishes
                while (
                    webui_manager.bu_response_event is not None
                    and not agent_task.done()
                ):
                    await asyncio.sleep(0.2)

                if agent_task.done():
                    break  # Task finished while waiting for response

                # Restore UI after the response was submitted.
                yield {
                    business_name_comp: gr.update(
                        placeholder="Enter business name", interactive=False
                    ),
                    business_website_comp: gr.update(
                        placeholder="Enter business website", interactive=False
                    ),
                    business_type_comp: gr.update(
                        placeholder="Enter business type", interactive=False
                    ),
                    additional_info_comp: gr.update(
                        placeholder="Enter additional information", interactive=False
                    ),
                    run_button_comp: gr.update(
                        value="⏳ Running...", interactive=False
                    ),
                    pause_resume_button_comp: gr.update(interactive=True),
                    stop_button_comp: gr.update(interactive=True),
                }
                # Drop the help-request updates; re-yielding them below would
                # undo the restore that was just sent.
                update_dict = {}

            # Update Chatbot if new messages arrived via callbacks
            if len(webui_manager.bu_chat_history) > last_chat_len:
                update_dict[chatbot_comp] = gr.update(
                    value=webui_manager.bu_chat_history
                )
                last_chat_len = len(webui_manager.bu_chat_history)

            # Update Browser View (only streamed when running headless)
            if headless and webui_manager.bu_browser_context:
                try:
                    screenshot_b64 = (
                        await webui_manager.bu_browser_context.take_screenshot()
                    )
                    if screenshot_b64:
                        html_content = f'<img src="data:image/jpeg;base64,{screenshot_b64}" style="width:{stream_vw}vw; height:{stream_vh}vh ; border:1px solid #ccc;">'
                    else:
                        html_content = f"<h1 style='width:{stream_vw}vw; height:{stream_vh}vh'>Waiting for browser session...</h1>"
                    update_dict[browser_view_comp] = gr.update(
                        value=html_content, visible=True
                    )
                except Exception as e:
                    logger.debug(f"Failed to capture screenshot: {e}")
                    update_dict[browser_view_comp] = gr.update(
                        value="<div style='...'>Error loading view...</div>",
                        visible=True,
                    )
            else:
                update_dict[browser_view_comp] = gr.update(visible=False)

            # Yield accumulated updates
            if update_dict:
                yield update_dict

            await asyncio.sleep(0.1)  # Polling interval

        # --- 7. Task Finalization ---
        webui_manager.bu_agent.state.paused = False
        webui_manager.bu_agent.state.stopped = False
        final_update = {}
        try:
            logger.info("Agent task completing...")
            # Make sure the task is finished and surface its exception, if any.
            if not agent_task.done():
                await agent_task
            elif agent_task.exception():
                agent_task.result()  # Re-raise to be handled below
            logger.info("Agent task completed processing.")

            logger.info(f"Explicitly saving agent history to: {history_file}")
            webui_manager.bu_agent.save_history(history_file)

            if os.path.exists(history_file):
                final_update[history_file_comp] = gr.File(value=history_file)

            if gif_path and os.path.exists(gif_path):
                logger.info(f"GIF found at: {gif_path}")
                final_update[gif_comp] = gr.Image(value=gif_path)

            # Update task metrics display if metrics are available
            task_metrics_display_comp = webui_manager.get_component_by_id(
                "browser_use_agent.task_metrics_display"
            )
            if hasattr(webui_manager, "bu_task_metrics") and webui_manager.bu_task_metrics:
                # No screenshot in the history: try to grab one from the live context.
                if (
                    not webui_manager.bu_task_metrics.get("screenshot")
                    and webui_manager.bu_browser_context
                ):
                    try:
                        final_screenshot = (
                            await webui_manager.bu_browser_context.take_screenshot()
                        )
                        if final_screenshot:
                            webui_manager.bu_task_metrics["screenshot"] = final_screenshot
                    except Exception as e:
                        logger.warning(
                            f"Failed to capture final screenshot for metrics: {e}"
                        )

                metrics_md = format_task_metrics(webui_manager.bu_task_metrics)
                final_update[task_metrics_display_comp] = gr.update(value=metrics_md)

        except asyncio.CancelledError:
            logger.info("Agent task was cancelled.")
            if not any(
                "Cancelled" in msg.get("content", "")
                for msg in webui_manager.bu_chat_history
                if msg.get("role") == "assistant"
            ):
                webui_manager.bu_chat_history.append(
                    {"role": "assistant", "content": "**Task Cancelled**."}
                )
            final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
        except Exception as e:
            logger.error(f"Error during agent execution: {e}", exc_info=True)
            error_message = (
                f"**Agent Execution Error:**\n```\n{type(e).__name__}: {e}\n```"
            )
            if not any(
                error_message in msg.get("content", "")
                for msg in webui_manager.bu_chat_history
                if msg.get("role") == "assistant"
            ):
                webui_manager.bu_chat_history.append(
                    {"role": "assistant", "content": error_message}
                )
            final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history)
            # gr.Error is an exception and shows nothing unless raised; raising
            # would skip the final UI reset, so surface a warning toast instead.
            gr.Warning(f"Agent execution failed: {e}")
        finally:
            webui_manager.bu_current_task = None  # Clear the task reference

            # Close browser/context if requested
            if should_close_browser_on_finish:
                if webui_manager.bu_browser_context:
                    logger.info("Closing browser context after task.")
                    await webui_manager.bu_browser_context.close()
                    webui_manager.bu_browser_context = None
                if webui_manager.bu_browser:
                    logger.info("Closing browser after task.")
                    await webui_manager.bu_browser.close()
                    webui_manager.bu_browser = None

        # --- 8. Final UI Update ---
        final_update.update(
            {
                business_name_comp: gr.update(
                    value="",
                    interactive=True,
                    placeholder="Enter business name",
                ),
                business_website_comp: gr.update(
                    value="",
                    interactive=True,
                    placeholder="Enter business website",
                ),
                business_type_comp: gr.update(interactive=True),
                additional_info_comp: gr.update(
                    value="",
                    interactive=True,
                    placeholder="Enter additional information",
                ),
                run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True),
                stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
                pause_resume_button_comp: gr.update(
                    value="⏸️ Pause", interactive=False
                ),
                clear_button_comp: gr.update(interactive=True),
                # Ensure final chat history is shown
                chatbot_comp: gr.update(value=webui_manager.bu_chat_history),
            }
        )
        yield final_update

    except Exception as e:
        # Errors during setup or streaming: report and re-enable the form.
        logger.error(f"Error setting up agent task: {e}", exc_info=True)
        webui_manager.bu_current_task = None  # Ensure state is reset
        yield {
            business_name_comp: gr.update(
                interactive=True, placeholder="Enter business name"
            ),
            business_website_comp: gr.update(
                interactive=True, placeholder="Enter business website"
            ),
            business_type_comp: gr.update(interactive=True),
            additional_info_comp: gr.update(
                interactive=True, placeholder="Enter additional information"
            ),
            run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True),
            stop_button_comp: gr.update(value="⏹️ Stop", interactive=False),
            pause_resume_button_comp: gr.update(value="⏸️ Pause", interactive=False),
            clear_button_comp: gr.update(interactive=True),
            chatbot_comp: gr.update(
                value=webui_manager.bu_chat_history
                + [{"role": "assistant", "content": f"**Setup Error:** {e}"}]
            ),
        }
# --- Button Click Handlers --- (Need access to webui_manager) | |
async def handle_submit(
    webui_manager: WebuiManager, components: Dict[gr.components.Component, Any]
):
    """Handle a click on the main 'Start Analysis' button.

    The click is interpreted, in this order, as one of:

    1. A reply to a pending assistance request (``bu_response_event`` exists
       and is not set): a canned response is stored, the event is set and the
       form is locked. The form values are not used for this, so this is
       checked before any form validation.
    2. A click while ``bu_current_task`` is still running: the user is
       informed and nothing changes.
    3. A new analysis: the business name is validated, the form is turned
       into a task with ``create_business_task`` and handed to
       ``run_agent_task``.

    Args:
        webui_manager: Holds the agent state and the component registry.
        components: Current value of each managed component, keyed by
            component. Mutated in case 3: the business-name entry is
            overwritten with the generated task text.

    Yields:
        Dicts mapping components to ``gr.update(...)`` payloads.
    """
    get_component = webui_manager.get_component_by_id
    business_name_comp = get_component("browser_use_agent.business_name")
    business_website_comp = get_component("browser_use_agent.business_website")
    business_type_comp = get_component("browser_use_agent.business_type")
    additional_info_comp = get_component("browser_use_agent.additional_info")

    # --- 1. Reply to a pending assistance request ---
    response_event = webui_manager.bu_response_event
    if response_event and not response_event.is_set():
        logger.info("User submitted assistance")
        webui_manager.bu_user_help_response = "Continue with the current task."
        response_event.set()
        # UI updates handled by the main loop reacting to the event being set
        yield {
            business_name_comp: gr.update(interactive=False),
            business_website_comp: gr.update(interactive=False),
            business_type_comp: gr.update(interactive=False),
            additional_info_comp: gr.update(interactive=False),
            get_component("browser_use_agent.run_button"): gr.update(
                value="⏳ Running...", interactive=False
            ),
        }
        return

    # --- 2. A task is already running and is not asking for help ---
    current_task = webui_manager.bu_current_task
    if current_task and not current_task.done():
        logger.warning(
            "Start button clicked while agent is already running and not asking for help."
        )
        gr.Info("Agent is currently running. Please wait or use Stop/Pause.")
        yield {}  # No change
        return

    # --- 3. New analysis ---
    def _text(comp) -> str:
        # A missing entry and an explicit None are both treated as empty text,
        # so .strip() can never be called on None.
        return (components.get(comp) or "").strip()

    business_name = _text(business_name_comp)
    if not business_name:
        gr.Warning("Please enter a business name.")
        yield {business_name_comp: gr.update(value=business_name)}
        return

    # Generate the standardized task using our template
    task = create_business_task(
        business_name,
        components.get(business_type_comp) or "Retail",
        _text(business_website_comp),
        _text(additional_info_comp),
    )

    # The generated task replaces the business-name value in the mapping that
    # is passed on (presumably where run_agent_task reads its task text from;
    # confirm against run_agent_task).
    components[business_name_comp] = task

    logger.info(f"Starting analysis for business: {business_name}")
    # Update chat history with the business information
    webui_manager.bu_chat_history.append({"role": "user", "content": task})

    # Run the task using our agent
    async for update in run_agent_task(webui_manager, components):
        yield update
async def handle_stop(webui_manager: WebuiManager):
    """React to the 'Stop' button.

    When an agent exists and its task has not finished, the agent's state is
    flagged as stopped (and un-paused) and the stop, pause/resume and run
    buttons are disabled. Otherwise the buttons are put back into their idle
    configuration.

    Returns:
        A dict mapping button components to ``gr.update(...)`` payloads.
    """
    logger.info("Stop button clicked.")
    component = webui_manager.get_component_by_id
    agent = webui_manager.bu_agent
    running_task = webui_manager.bu_current_task

    if not (agent and running_task and not running_task.done()):
        logger.warning("Stop clicked but agent is not running or task is already done.")
        # Reset UI just in case it's stuck
        idle_layout = (
            ("browser_use_agent.run_button", True),
            ("browser_use_agent.stop_button", False),
            ("browser_use_agent.pause_resume_button", False),
            ("browser_use_agent.clear_button", True),
        )
        return {
            component(component_id): gr.update(interactive=enabled)
            for component_id, enabled in idle_layout
        }

    # Signal the agent to stop by setting its internal flag
    state = agent.state
    state.stopped = True
    state.paused = False  # Ensure not paused if stopped

    stop_update = gr.update(interactive=False, value="⏹️ Stopping...")
    return {
        component("browser_use_agent.stop_button"): stop_update,
        component("browser_use_agent.pause_resume_button"): gr.update(
            interactive=False
        ),
        component("browser_use_agent.run_button"): gr.update(interactive=False),
    }
async def handle_pause_resume(webui_manager: WebuiManager):
    """React to the 'Pause/Resume' button.

    Toggles the agent between paused and running when an agent exists and its
    task has not finished, and relabels the button to show the next available
    action. Returns an empty dict (no UI change) otherwise.

    Returns:
        A dict with at most one entry: the pause/resume button update.
    """
    agent = webui_manager.bu_agent
    running_task = webui_manager.bu_current_task

    if not (agent and running_task and not running_task.done()):
        logger.warning(
            "Pause/Resume clicked but agent is not running or doesn't support state."
        )
        return {}  # No change

    if agent.state.paused:
        logger.info("Resume button clicked.")
        agent.resume()
        next_label = "⏸️ Pause"
    else:
        logger.info("Pause button clicked.")
        agent.pause()
        next_label = "▶️ Resume"

    # Optimistic update: the label is switched right away.
    toggle_button = webui_manager.get_component_by_id(
        "browser_use_agent.pause_resume_button"
    )
    return {toggle_button: gr.update(value=next_label, interactive=True)}
async def handle_clear(webui_manager: WebuiManager):
    """Handle a click on the 'Clear' button.

    Steps:
        1. If a task is still running, ask the agent to stop (when one
           exists), cancel the task and wait up to two seconds for it.
        2. Close the controller's MCP client, if a controller exists.
        3. Reset the chat history, assistance state, task id and task metrics
           stored on the manager.
        4. Return updates that put every component of the tab back into its
           initial state.

    Returns:
        A dict mapping components to ``gr.update(...)`` payloads.
    """
    logger.info("Clear button clicked.")

    # Stop any running task first
    task = webui_manager.bu_current_task
    if task and not task.done():
        logger.info("Clearing requires stopping the current task.")
        # Guard against a missing agent, as handle_stop and
        # handle_pause_resume do, so clearing cannot fail with AttributeError
        # before the state below is reset.
        agent = webui_manager.bu_agent
        if agent:
            agent.stop()
        task.cancel()
        try:
            await asyncio.wait_for(task, timeout=2.0)  # Wait briefly
        except (asyncio.CancelledError, asyncio.TimeoutError):
            # Expected outcomes of cancelling; clearing is best-effort.
            pass
        except Exception as e:
            logger.warning(f"Error stopping task on clear: {e}")
    webui_manager.bu_current_task = None

    if webui_manager.bu_controller:
        await webui_manager.bu_controller.close_mcp_client()
        webui_manager.bu_controller = None
    webui_manager.bu_agent = None

    # Reset state stored in manager
    webui_manager.bu_chat_history = []
    webui_manager.bu_response_event = None
    webui_manager.bu_user_help_response = None
    webui_manager.bu_agent_task_id = None
    webui_manager.bu_task_metrics = None  # Clear task metrics

    # NOTE(review): nothing in this function closes a browser or browser
    # context, although the message mentions browser resources - confirm.
    logger.info("Agent state and browser resources cleared.")

    # Reset UI components
    component = webui_manager.get_component_by_id
    return {
        component("browser_use_agent.chatbot"): gr.update(value=[]),
        component("browser_use_agent.business_name"): gr.update(
            value="", interactive=True
        ),
        component("browser_use_agent.business_website"): gr.update(
            value="", interactive=True
        ),
        component("browser_use_agent.business_type"): gr.update(
            value="Retail", interactive=True
        ),
        component("browser_use_agent.additional_info"): gr.update(
            value="", interactive=True
        ),
        component("browser_use_agent.agent_history_file"): gr.update(value=None),
        component("browser_use_agent.recording_gif"): gr.update(value=None),
        component("browser_use_agent.browser_view"): gr.update(
            value="<div style='...'>Browser Cleared</div>"
        ),
        component("browser_use_agent.run_button"): gr.update(
            value="▶️ Start Analysis", interactive=True
        ),
        component("browser_use_agent.stop_button"): gr.update(interactive=False),
        component("browser_use_agent.pause_resume_button"): gr.update(
            value="⏸️ Pause", interactive=False
        ),
        component("browser_use_agent.clear_button"): gr.update(interactive=True),
        component("browser_use_agent.task_metrics_display"): gr.update(
            value="No task metrics available yet. Run a task to see metrics here."
        ),
    }
# --- Tab Creation Function --- | |
def create_browser_use_agent_tab(webui_manager: WebuiManager):
    """Build the business-analysis agent tab and wire up its buttons.

    Creates the chatbot, the business information form, the control buttons,
    the (initially hidden) browser view, the task metrics panel and the task
    output widgets. The widgets are registered with ``webui_manager`` under
    the ``browser_use_agent`` prefix and the four buttons are connected to
    their handlers.

    NOTE(review): the nesting of the layout containers below was reconstructed
    from a listing without indentation - confirm against the real layout.
    """
    webui_manager.init_browser_use_agent()

    # Initialize task metrics if not already present
    if not hasattr(webui_manager, 'bu_task_metrics'):
        webui_manager.bu_task_metrics = None

    business_type_choices = [
        "Retail", "Restaurant", "Service", "Healthcare", "Technology", "Other",
    ]
    browser_placeholder_html = "<div style='width:100%; height:50vh; display:flex; justify-content:center; align-items:center; border:1px solid #ccc; background-color:#f0f0f0;'><p>Browser View (Requires Headless=True)</p></div>"

    # --- Define UI Components ---
    with gr.Column():
        chatbot = gr.Chatbot(
            lambda: webui_manager.bu_chat_history,  # Load history dynamically
            elem_id="browser_use_chatbot",
            label="Agent Interaction",
            type="messages",
            height=600,
            show_copy_button=True,
        )

        # Business information form
        with gr.Column(elem_id="business_form"):
            gr.Markdown("### Business Information")
            business_name = gr.Textbox(
                label="Business Name",
                placeholder="Enter business name",
                elem_id="business_name",
            )
            business_website = gr.Textbox(
                label="Business Website (optional)",
                placeholder="https://www.example.com",
                elem_id="business_website",
            )
            business_type = gr.Dropdown(
                label="Business Type",
                choices=business_type_choices,
                value="Retail",
                elem_id="business_type",
            )
            additional_info = gr.Textbox(
                label="Additional Information (optional)",
                placeholder="Any specific details about the business that might help the agent",
                lines=2,
                elem_id="additional_info",
            )

        with gr.Row():
            stop_button = gr.Button(
                "⏹️ Stop", interactive=False, variant="stop", scale=2
            )
            pause_resume_button = gr.Button(
                "⏸️ Pause", interactive=False, variant="secondary", scale=2, visible=True
            )
            clear_button = gr.Button(
                "🗑️ Clear", interactive=True, variant="secondary", scale=2
            )
            run_button = gr.Button("▶️ Start Analysis", variant="primary", scale=3)

        browser_view = gr.HTML(
            value=browser_placeholder_html,
            label="Browser Live View",
            elem_id="browser_view",
            visible=False,
        )

        # Task Metrics Section
        with gr.Column(visible=True):
            gr.Markdown("### Task Metrics", elem_id="task_metrics_heading")
            task_metrics_display = gr.Markdown(
                value=lambda: format_task_metrics(webui_manager.bu_task_metrics),
                elem_id="task_metrics_display",
            )

        with gr.Column():
            gr.Markdown("### Task Outputs")
            agent_history_file = gr.File(label="Agent History JSON", interactive=False)
            recording_gif = gr.Image(
                label="Task Recording GIF",
                format="gif",
                interactive=False,
                type="filepath",
            )

    # --- Store Components in Manager ---
    # Insertion order matters: it is also the order of the event outputs.
    tab_components = {
        "chatbot": chatbot,
        "business_name": business_name,
        "business_website": business_website,
        "business_type": business_type,
        "additional_info": additional_info,
        "clear_button": clear_button,
        "run_button": run_button,
        "stop_button": stop_button,
        "pause_resume_button": pause_resume_button,
        "agent_history_file": agent_history_file,
        "recording_gif": recording_gif,
        "browser_view": browser_view,
        "task_metrics_display": task_metrics_display,
    }
    # Use "browser_use_agent" as tab_name prefix
    webui_manager.add_components("browser_use_agent", tab_components)

    # Every component known to the manager is an input of the run button.
    all_managed_components = set(webui_manager.get_components())
    run_tab_outputs = list(tab_components.values())

    async def submit_wrapper(
        components_dict: Dict[Component, Any],
    ) -> AsyncGenerator[Dict[Component, Any], None]:
        """Forward every update produced by handle_submit."""
        async for ui_update in handle_submit(webui_manager, components_dict):
            yield ui_update

    async def stop_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Yield the single update produced by handle_stop."""
        yield await handle_stop(webui_manager)

    async def pause_resume_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Yield the single update produced by handle_pause_resume."""
        yield await handle_pause_resume(webui_manager)

    async def clear_wrapper() -> AsyncGenerator[Dict[Component, Any], None]:
        """Yield the single update produced by handle_clear."""
        yield await handle_clear(webui_manager)

    # --- Connect Event Handlers using the Wrappers ---
    run_button.click(
        fn=submit_wrapper, inputs=all_managed_components, outputs=run_tab_outputs
    )
    input_free_handlers = (
        (stop_button, stop_wrapper),
        (pause_resume_button, pause_resume_wrapper),
        (clear_button, clear_wrapper),
    )
    for button, handler in input_free_handlers:
        button.click(fn=handler, inputs=None, outputs=run_tab_outputs)