import asyncio import json import logging import os import uuid from typing import Any, AsyncGenerator, Dict, Optional import gradio as gr # from browser_use.agent.service import Agent from browser_use.agent.views import ( AgentHistoryList, AgentOutput, ) from browser_use.browser.browser import BrowserConfig from browser_use.browser.context import BrowserContext, BrowserContextConfig from browser_use.browser.views import BrowserState from gradio.components import Component from langchain_core.language_models.chat_models import BaseChatModel from src.agent.browser_use.browser_use_agent import BrowserUseAgent from src.browser.custom_browser import CustomBrowser from src.controller.custom_controller import CustomController from src.utils import llm_provider from src.webui.webui_manager import WebuiManager logger = logging.getLogger(__name__) # Custom function to format task metrics as markdown def format_task_metrics(metrics): if not metrics: return "No task metrics available yet. Run a task to see metrics here." md = "#### Task Execution Summary\n\n" md += f"⏱️ **Duration:** {metrics['duration']} seconds\n\n" md += f"🔢 **Tokens Used:** {metrics['tokens']}\n\n" if metrics['result']: md += f"📋 **Final Result:**\n\n```\n{metrics['result']}\n```\n\n" md += f"✅ **Status:** {metrics['status']}\n\n" if metrics['errors'] and any(metrics['errors']): md += f"❌ **Errors:**\n\n```\n{metrics['errors']}\n```\n\n" # Display screenshot if available if metrics.get('screenshot'): md += f"📸 **Final Screenshot:**\n\n" md += f"\"Final\n\n" return md # Add this new function after the format_task_metrics function def create_business_task(business_name, business_type, business_website=None, additional_info=None): """Creates a standardized task description for analyzing a business.""" task = f"Analyze the business '{business_name}' which is in the {business_type} industry." if business_website: task += f" Start by visiting their website at {business_website}." else: task += f" Search for information about this business online." task += " Gather the following information: main products/services, contact information, location, hours of operation, and customer reviews." if additional_info: task += f" Additional context: {additional_info}" task += " Provide a comprehensive report with all findings." return task # --- Helper Functions --- (Defined at module level) async def _initialize_llm( provider: Optional[str], model_name: Optional[str], temperature: float, base_url: Optional[str], api_key: Optional[str], num_ctx: Optional[int] = None, ) -> Optional[BaseChatModel]: """Initializes the LLM based on settings. Returns None if provider/model is missing.""" if not provider or not model_name: logger.info("LLM Provider or Model Name not specified, LLM will be None.") return None try: # Use your actual LLM provider logic here logger.info( f"Initializing LLM: Provider={provider}, Model={model_name}, Temp={temperature}" ) # Example using a placeholder function llm = llm_provider.get_llm_model( provider=provider, model_name=model_name, temperature=temperature, base_url=base_url or None, api_key=api_key or None, # Add other relevant params like num_ctx for ollama num_ctx=num_ctx if provider == "ollama" else None, ) return llm except Exception as e: logger.error(f"Failed to initialize LLM: {e}", exc_info=True) gr.Warning( f"Failed to initialize LLM '{model_name}' for provider '{provider}'. Please check settings. Error: {e}" ) return None def _get_config_value( webui_manager: WebuiManager, comp_dict: Dict[gr.components.Component, Any], comp_id_suffix: str, default: Any = None, ) -> Any: """Safely get value from component dictionary using its ID suffix relative to the tab.""" # Assumes component ID format is "tab_name.comp_name" tab_name = "browser_use_agent" # Hardcode or derive if needed comp_id = f"{tab_name}.{comp_id_suffix}" # Need to find the component object first using the ID from the manager try: comp = webui_manager.get_component_by_id(comp_id) return comp_dict.get(comp, default) except KeyError: # Try accessing settings tabs as well for prefix in ["agent_settings", "browser_settings"]: try: comp_id = f"{prefix}.{comp_id_suffix}" comp = webui_manager.get_component_by_id(comp_id) return comp_dict.get(comp, default) except KeyError: continue logger.warning( f"Component with suffix '{comp_id_suffix}' not found in manager for value lookup." ) return default def _format_agent_output(model_output: AgentOutput) -> str: """Formats AgentOutput for display in the chatbot using JSON.""" content = "" if model_output: try: # Directly use model_dump if actions and current_state are Pydantic models action_dump = [ action.model_dump(exclude_none=True) for action in model_output.action ] state_dump = model_output.current_state.model_dump(exclude_none=True) model_output_dump = { "current_state": state_dump, "action": action_dump, } # Dump to JSON string with indentation json_string = json.dumps(model_output_dump, indent=4, ensure_ascii=False) # Wrap in
 for proper display in HTML
            content = f"
{json_string}
" except AttributeError as ae: logger.error( f"AttributeError during model dump: {ae}. Check if 'action' or 'current_state' or their items support 'model_dump'." ) content = f"
Error: Could not format agent output (AttributeError: {ae}).\nRaw output: {str(model_output)}
" except Exception as e: logger.error(f"Error formatting agent output: {e}", exc_info=True) # Fallback to simple string representation on error content = f"
Error formatting agent output.\nRaw output:\n{str(model_output)}
" return content.strip() # --- Updated Callback Implementation --- async def _handle_new_step( webui_manager: WebuiManager, state: BrowserState, output: AgentOutput, step_num: int ): """Callback for each step taken by the agent, including screenshot display.""" # Use the correct chat history attribute name from the user's code if not hasattr(webui_manager, "bu_chat_history"): logger.error( "Attribute 'bu_chat_history' not found in webui_manager! Cannot add chat message." ) # Initialize it maybe? Or raise an error? For now, log and potentially skip chat update. webui_manager.bu_chat_history = [] # Initialize if missing (consider if this is the right place) # return # Or stop if this is critical step_num -= 1 logger.info(f"Step {step_num} completed.") # --- Screenshot Handling --- screenshot_html = "" # Ensure state.screenshot exists and is not empty before proceeding # Use getattr for safer access screenshot_data = getattr(state, "screenshot", None) if screenshot_data: try: # Basic validation: check if it looks like base64 if ( isinstance(screenshot_data, str) and len(screenshot_data) > 100 ): # Arbitrary length check # *** UPDATED STYLE: Removed centering, adjusted width *** img_tag = f'Step {step_num} Screenshot' screenshot_html = ( img_tag + "
" ) # Use
for line break after inline-block image else: logger.warning( f"Screenshot for step {step_num} seems invalid (type: {type(screenshot_data)}, len: {len(screenshot_data) if isinstance(screenshot_data, str) else 'N/A'})." ) screenshot_html = "**[Invalid screenshot data]**
" except Exception as e: logger.error( f"Error processing or formatting screenshot for step {step_num}: {e}", exc_info=True, ) screenshot_html = "**[Error displaying screenshot]**
" else: logger.debug(f"No screenshot available for step {step_num}.") # --- Format Agent Output --- formatted_output = _format_agent_output(output) # Use the updated function # --- Combine and Append to Chat --- step_header = f"--- **Step {step_num}** ---" # Combine header, image (with line break), and JSON block final_content = step_header + "
" + screenshot_html + formatted_output chat_message = { "role": "assistant", "content": final_content.strip(), # Remove leading/trailing whitespace } # Append to the correct chat history list webui_manager.bu_chat_history.append(chat_message) await asyncio.sleep(0.05) def _handle_done(webui_manager: WebuiManager, history: AgentHistoryList): """Callback when the agent finishes the task (success or failure).""" logger.info( f"Agent task finished. Duration: {history.total_duration_seconds():.2f}s, Tokens: {history.total_input_tokens()}" ) final_summary = "**Task Completed**\n" final_summary += f"- Duration: {history.total_duration_seconds():.2f} seconds\n" final_summary += f"- Total Input Tokens: {history.total_input_tokens()}\n" # Or total tokens if available final_result = history.final_result() if final_result: final_summary += f"- Final Result: {final_result}\n" errors = history.errors() if errors and any(errors): final_summary += f"- **Errors:**\n```\n{errors}\n```\n" else: final_summary += "- Status: Success\n" # Get the last screenshot if available screenshots = history.screenshots() final_screenshot = screenshots[-1] if screenshots and len(screenshots) > 0 else None # Store task metrics separately for the metrics display webui_manager.bu_task_metrics = { "duration": f"{history.total_duration_seconds():.2f}", "tokens": f"{history.total_input_tokens()}", "result": final_result if final_result else "", "status": "Error" if (errors and any(errors)) else "Success", "errors": errors if (errors and any(errors)) else None, "screenshot": final_screenshot # Add the final screenshot to the metrics } webui_manager.bu_chat_history.append( {"role": "assistant", "content": final_summary} ) async def _ask_assistant_callback( webui_manager: WebuiManager, query: str, browser_context: BrowserContext ) -> Dict[str, Any]: """Callback triggered by the agent's ask_for_assistant action.""" logger.info("Agent requires assistance. Waiting for user input.") if not hasattr(webui_manager, "_chat_history"): logger.error("Chat history not found in webui_manager during ask_assistant!") return {"response": "Internal Error: Cannot display help request."} webui_manager.bu_chat_history.append( { "role": "assistant", "content": f"**Need Help:** {query}\nPlease provide information or perform the required action in the browser, then type your response/confirmation below and click 'Submit Response'.", } ) # Use state stored in webui_manager webui_manager.bu_response_event = asyncio.Event() webui_manager.bu_user_help_response = None # Reset previous response try: logger.info("Waiting for user response event...") await asyncio.wait_for( webui_manager.bu_response_event.wait(), timeout=3600.0 ) # Long timeout logger.info("User response event received.") except asyncio.TimeoutError: logger.warning("Timeout waiting for user assistance.") webui_manager.bu_chat_history.append( { "role": "assistant", "content": "**Timeout:** No response received. Trying to proceed.", } ) webui_manager.bu_response_event = None # Clear the event return {"response": "Timeout: User did not respond."} # Inform the agent response = webui_manager.bu_user_help_response webui_manager.bu_chat_history.append( {"role": "user", "content": response} ) # Show user response in chat webui_manager.bu_response_event = ( None # Clear the event for the next potential request ) return {"response": response} # --- Core Agent Execution Logic --- (Needs access to webui_manager) async def run_agent_task( webui_manager: WebuiManager, components: Dict[gr.components.Component, Any] ) -> AsyncGenerator[Dict[gr.components.Component, Any], None]: """Handles the entire lifecycle of initializing and running the agent.""" # --- Get Components --- # Need handles to specific UI components to update them business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name") business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website") business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type") additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info") run_button_comp = webui_manager.get_component_by_id("browser_use_agent.run_button") stop_button_comp = webui_manager.get_component_by_id( "browser_use_agent.stop_button" ) pause_resume_button_comp = webui_manager.get_component_by_id( "browser_use_agent.pause_resume_button" ) clear_button_comp = webui_manager.get_component_by_id( "browser_use_agent.clear_button" ) chatbot_comp = webui_manager.get_component_by_id("browser_use_agent.chatbot") history_file_comp = webui_manager.get_component_by_id( "browser_use_agent.agent_history_file" ) gif_comp = webui_manager.get_component_by_id("browser_use_agent.recording_gif") browser_view_comp = webui_manager.get_component_by_id( "browser_use_agent.browser_view" ) # --- 1. Get Task and Initial UI Update --- task = components.get(business_name_comp, "").strip() if not task: gr.Warning("Please enter a business name or task.") yield {run_button_comp: gr.update(interactive=True)} return # Set running state indirectly via _current_task if "Analyze the business" not in task: # If task isn't already formatted, create one from the business info business_name = task # The business name was stored in the "task" variable business_website = components.get(business_website_comp, "").strip() business_type = components.get(business_type_comp, "Retail") additional_info = components.get(additional_info_comp, "").strip() task = create_business_task( business_name, business_type, business_website, additional_info ) # We should already have added the task to chat history in handle_submit if not any(msg.get("content") == task for msg in webui_manager.bu_chat_history if msg.get("role") == "user"): webui_manager.bu_chat_history.append({"role": "user", "content": task}) yield { business_name_comp: gr.Textbox( value=components.get(business_name_comp, ""), interactive=False ), business_website_comp: gr.Textbox( value=components.get(business_website_comp, ""), interactive=False ), business_type_comp: gr.update(interactive=False), additional_info_comp: gr.Textbox( value=components.get(additional_info_comp, ""), interactive=False ), run_button_comp: gr.Button(value="⏳ Running...", interactive=False), stop_button_comp: gr.Button(interactive=True), pause_resume_button_comp: gr.Button(value="⏸️ Pause", interactive=True), clear_button_comp: gr.Button(interactive=False), chatbot_comp: gr.update(value=webui_manager.bu_chat_history), history_file_comp: gr.update(value=None), gif_comp: gr.update(value=None), } # --- Agent Settings --- # Access settings values via components dict, getting IDs from webui_manager def get_setting(key, default=None): comp = webui_manager.id_to_component.get(f"agent_settings.{key}") return components.get(comp, default) if comp else default override_system_prompt = get_setting("override_system_prompt") or None extend_system_prompt = get_setting("extend_system_prompt") or None llm_provider_name = get_setting( "llm_provider", None ) # Default to None if not found llm_model_name = get_setting("llm_model_name", None) llm_temperature = get_setting("llm_temperature", 0.6) use_vision = get_setting("use_vision", True) ollama_num_ctx = get_setting("ollama_num_ctx", 16000) llm_base_url = get_setting("llm_base_url") or None llm_api_key = get_setting("llm_api_key") or None max_steps = get_setting("max_steps", 100) max_actions = get_setting("max_actions", 10) max_input_tokens = get_setting("max_input_tokens", 128000) tool_calling_str = get_setting("tool_calling_method", "auto") tool_calling_method = tool_calling_str if tool_calling_str != "None" else None mcp_server_config_comp = webui_manager.id_to_component.get( "agent_settings.mcp_server_config" ) mcp_server_config_str = ( components.get(mcp_server_config_comp) if mcp_server_config_comp else None ) mcp_server_config = ( json.loads(mcp_server_config_str) if mcp_server_config_str else None ) # Planner LLM Settings (Optional) planner_llm_provider_name = get_setting("planner_llm_provider") or None planner_llm = None planner_use_vision = False if planner_llm_provider_name: planner_llm_model_name = get_setting("planner_llm_model_name") planner_llm_temperature = get_setting("planner_llm_temperature", 0.6) planner_ollama_num_ctx = get_setting("planner_ollama_num_ctx", 16000) planner_llm_base_url = get_setting("planner_llm_base_url") or None planner_llm_api_key = get_setting("planner_llm_api_key") or None planner_use_vision = get_setting("planner_use_vision", False) planner_llm = await _initialize_llm( planner_llm_provider_name, planner_llm_model_name, planner_llm_temperature, planner_llm_base_url, planner_llm_api_key, planner_ollama_num_ctx if planner_llm_provider_name == "ollama" else None, ) # --- Browser Settings --- def get_browser_setting(key, default=None): comp = webui_manager.id_to_component.get(f"browser_settings.{key}") return components.get(comp, default) if comp else default browser_binary_path = get_browser_setting("browser_binary_path") or None browser_user_data_dir = get_browser_setting("browser_user_data_dir") or None use_own_browser = get_browser_setting( "use_own_browser", False ) # Logic handled by CDP/WSS presence keep_browser_open = get_browser_setting("keep_browser_open", False) headless = get_browser_setting("headless", False) disable_security = get_browser_setting("disable_security", False) window_w = int(get_browser_setting("window_w", 1280)) window_h = int(get_browser_setting("window_h", 1100)) cdp_url = get_browser_setting("cdp_url") or None wss_url = get_browser_setting("wss_url") or None save_recording_path = get_browser_setting("save_recording_path") or None save_trace_path = get_browser_setting("save_trace_path") or None save_agent_history_path = get_browser_setting( "save_agent_history_path", "./tmp/agent_history" ) save_download_path = get_browser_setting("save_download_path", "./tmp/downloads") stream_vw = 70 stream_vh = int(70 * window_h // window_w) os.makedirs(save_agent_history_path, exist_ok=True) if save_recording_path: os.makedirs(save_recording_path, exist_ok=True) if save_trace_path: os.makedirs(save_trace_path, exist_ok=True) if save_download_path: os.makedirs(save_download_path, exist_ok=True) # --- 2. Initialize LLM --- main_llm = await _initialize_llm( llm_provider_name, llm_model_name, llm_temperature, llm_base_url, llm_api_key, ollama_num_ctx if llm_provider_name == "ollama" else None, ) # Pass the webui_manager instance to the callback when wrapping it async def ask_callback_wrapper( query: str, browser_context: BrowserContext ) -> Dict[str, Any]: return await _ask_assistant_callback(webui_manager, query, browser_context) if not webui_manager.bu_controller: webui_manager.bu_controller = CustomController( ask_assistant_callback=ask_callback_wrapper ) await webui_manager.bu_controller.setup_mcp_client(mcp_server_config) # --- 4. Initialize Browser and Context --- should_close_browser_on_finish = not keep_browser_open try: # Close existing resources if not keeping open if not keep_browser_open: if webui_manager.bu_browser_context: logger.info("Closing previous browser context.") await webui_manager.bu_browser_context.close() webui_manager.bu_browser_context = None if webui_manager.bu_browser: logger.info("Closing previous browser.") await webui_manager.bu_browser.close() webui_manager.bu_browser = None # Create Browser if needed if not webui_manager.bu_browser: logger.info("Launching new browser instance.") extra_args = [] if use_own_browser: browser_binary_path = os.getenv("BROWSER_PATH", None) or browser_binary_path if browser_binary_path == "": browser_binary_path = None browser_user_data = browser_user_data_dir or os.getenv("BROWSER_USER_DATA", None) if browser_user_data: extra_args += [f"--user-data-dir={browser_user_data}"] else: browser_binary_path = None webui_manager.bu_browser = CustomBrowser( config=BrowserConfig( headless=headless, disable_security=disable_security, browser_binary_path=browser_binary_path, extra_browser_args=extra_args, wss_url=wss_url, cdp_url=cdp_url, new_context_config=BrowserContextConfig( window_width=window_w, window_height=window_h, ) ) ) # Create Context if needed if not webui_manager.bu_browser_context: logger.info("Creating new browser context.") context_config = BrowserContextConfig( trace_path=save_trace_path if save_trace_path else None, save_recording_path=save_recording_path if save_recording_path else None, save_downloads_path=save_download_path if save_download_path else None, window_height=window_h, window_width=window_w, ) if not webui_manager.bu_browser: raise ValueError("Browser not initialized, cannot create context.") webui_manager.bu_browser_context = ( await webui_manager.bu_browser.new_context(config=context_config) ) # --- 5. Initialize or Update Agent --- webui_manager.bu_agent_task_id = str(uuid.uuid4()) # New ID for this task run os.makedirs( os.path.join(save_agent_history_path, webui_manager.bu_agent_task_id), exist_ok=True, ) history_file = os.path.join( save_agent_history_path, webui_manager.bu_agent_task_id, f"{webui_manager.bu_agent_task_id}.json", ) gif_path = os.path.join( save_agent_history_path, webui_manager.bu_agent_task_id, f"{webui_manager.bu_agent_task_id}.gif", ) # Pass the webui_manager to callbacks when wrapping them async def step_callback_wrapper( state: BrowserState, output: AgentOutput, step_num: int ): await _handle_new_step(webui_manager, state, output, step_num) def done_callback_wrapper(history: AgentHistoryList): _handle_done(webui_manager, history) if not webui_manager.bu_agent: logger.info(f"Initializing new agent for task: {task}") if not webui_manager.bu_browser or not webui_manager.bu_browser_context: raise ValueError( "Browser or Context not initialized, cannot create agent." ) webui_manager.bu_agent = BrowserUseAgent( task=task, llm=main_llm, browser=webui_manager.bu_browser, browser_context=webui_manager.bu_browser_context, controller=webui_manager.bu_controller, register_new_step_callback=step_callback_wrapper, register_done_callback=done_callback_wrapper, use_vision=use_vision, override_system_message=override_system_prompt, extend_system_message=extend_system_prompt, max_input_tokens=max_input_tokens, max_actions_per_step=max_actions, tool_calling_method=tool_calling_method, planner_llm=planner_llm, use_vision_for_planner=planner_use_vision if planner_llm else False, source="webui", ) webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id webui_manager.bu_agent.settings.generate_gif = gif_path else: webui_manager.bu_agent.state.agent_id = webui_manager.bu_agent_task_id webui_manager.bu_agent.add_new_task(task) webui_manager.bu_agent.settings.generate_gif = gif_path webui_manager.bu_agent.browser = webui_manager.bu_browser webui_manager.bu_agent.browser_context = webui_manager.bu_browser_context webui_manager.bu_agent.controller = webui_manager.bu_controller # --- 6. Run Agent Task and Stream Updates --- agent_run_coro = webui_manager.bu_agent.run(max_steps=max_steps) agent_task = asyncio.create_task(agent_run_coro) webui_manager.bu_current_task = agent_task # Store the task last_chat_len = len(webui_manager.bu_chat_history) while not agent_task.done(): is_paused = webui_manager.bu_agent.state.paused is_stopped = webui_manager.bu_agent.state.stopped # Check for pause state if is_paused: yield { pause_resume_button_comp: gr.update( value="▶️ Resume", interactive=True ), stop_button_comp: gr.update(interactive=True), } # Wait until pause is released or task is stopped/done while is_paused and not agent_task.done(): # Re-check agent state in loop is_paused = webui_manager.bu_agent.state.paused is_stopped = webui_manager.bu_agent.state.stopped if is_stopped: # Stop signal received while paused break await asyncio.sleep(0.2) if ( agent_task.done() or is_stopped ): # If stopped or task finished while paused break # If resumed, yield UI update yield { pause_resume_button_comp: gr.update( value="⏸️ Pause", interactive=True ), run_button_comp: gr.update( value="⏳ Running...", interactive=False ), } # Check if agent stopped itself or stop button was pressed (which sets agent.state.stopped) if is_stopped: logger.info("Agent has stopped (internally or via stop button).") if not agent_task.done(): # Ensure the task coroutine finishes if agent just set flag try: await asyncio.wait_for( agent_task, timeout=1.0 ) # Give it a moment to exit run() except asyncio.TimeoutError: logger.warning( "Agent task did not finish quickly after stop signal, cancelling." ) agent_task.cancel() except Exception: # Catch task exceptions if it errors on stop pass break # Exit the streaming loop # Check if agent is asking for help (via response_event) update_dict = {} if webui_manager.bu_response_event is not None: update_dict = { business_name_comp: gr.update( placeholder="Agent needs help. Enter response and submit.", interactive=True, ), business_website_comp: gr.update( placeholder="Agent needs help. Enter response and submit.", interactive=True, ), business_type_comp: gr.update( placeholder="Agent needs help. Enter response and submit.", interactive=True, ), additional_info_comp: gr.update( placeholder="Agent needs help. Enter response and submit.", interactive=True, ), run_button_comp: gr.update( value="✔️ Submit Response", interactive=True ), pause_resume_button_comp: gr.update(interactive=False), stop_button_comp: gr.update(interactive=False), chatbot_comp: gr.update(value=webui_manager.bu_chat_history), } last_chat_len = len(webui_manager.bu_chat_history) yield update_dict # Wait until response is submitted or task finishes while ( webui_manager.bu_response_event is not None and not agent_task.done() ): await asyncio.sleep(0.2) # Restore UI after response submitted or if task ended unexpectedly if not agent_task.done(): yield { business_name_comp: gr.update( placeholder="Enter business name", interactive=False ), business_website_comp: gr.update( placeholder="Enter business website", interactive=False ), business_type_comp: gr.update( placeholder="Enter business type", interactive=False ), additional_info_comp: gr.update( placeholder="Enter additional information", interactive=False ), run_button_comp: gr.update( value="⏳ Running...", interactive=False ), pause_resume_button_comp: gr.update(interactive=True), stop_button_comp: gr.update(interactive=True), } else: break # Task finished while waiting for response # Update Chatbot if new messages arrived via callbacks if len(webui_manager.bu_chat_history) > last_chat_len: update_dict[chatbot_comp] = gr.update( value=webui_manager.bu_chat_history ) last_chat_len = len(webui_manager.bu_chat_history) # Update Browser View if headless and webui_manager.bu_browser_context: try: screenshot_b64 = ( await webui_manager.bu_browser_context.take_screenshot() ) if screenshot_b64: html_content = f'' update_dict[browser_view_comp] = gr.update( value=html_content, visible=True ) else: html_content = f"

Waiting for browser session...

" update_dict[browser_view_comp] = gr.update( value=html_content, visible=True ) except Exception as e: logger.debug(f"Failed to capture screenshot: {e}") update_dict[browser_view_comp] = gr.update( value="
Error loading view...
", visible=True, ) else: update_dict[browser_view_comp] = gr.update(visible=False) # Yield accumulated updates if update_dict: yield update_dict await asyncio.sleep(0.1) # Polling interval # --- 7. Task Finalization --- webui_manager.bu_agent.state.paused = False webui_manager.bu_agent.state.stopped = False final_update = {} try: logger.info("Agent task completing...") # Await the task ensure completion and catch exceptions if not already caught if not agent_task.done(): await agent_task # Retrieve result/exception elif agent_task.exception(): # Check if task finished with exception agent_task.result() # Raise the exception to be caught below logger.info("Agent task completed processing.") logger.info(f"Explicitly saving agent history to: {history_file}") webui_manager.bu_agent.save_history(history_file) if os.path.exists(history_file): final_update[history_file_comp] = gr.File(value=history_file) if gif_path and os.path.exists(gif_path): logger.info(f"GIF found at: {gif_path}") final_update[gif_comp] = gr.Image(value=gif_path) # Update task metrics display if metrics are available task_metrics_display_comp = webui_manager.get_component_by_id("browser_use_agent.task_metrics_display") if hasattr(webui_manager, 'bu_task_metrics') and webui_manager.bu_task_metrics: # If we have metrics but no screenshot, try to get the latest screenshot if not webui_manager.bu_task_metrics.get('screenshot') and webui_manager.bu_browser_context: try: final_screenshot = await webui_manager.bu_browser_context.take_screenshot() if final_screenshot: webui_manager.bu_task_metrics['screenshot'] = final_screenshot except Exception as e: logger.warning(f"Failed to capture final screenshot for metrics: {e}") # Format the metrics for display metrics_md = format_task_metrics(webui_manager.bu_task_metrics) final_update[task_metrics_display_comp] = gr.update(value=metrics_md) except asyncio.CancelledError: logger.info("Agent task was cancelled.") if not any( "Cancelled" in msg.get("content", "") for msg in webui_manager.bu_chat_history if msg.get("role") == "assistant" ): webui_manager.bu_chat_history.append( {"role": "assistant", "content": "**Task Cancelled**."} ) final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history) except Exception as e: logger.error(f"Error during agent execution: {e}", exc_info=True) error_message = ( f"**Agent Execution Error:**\n```\n{type(e).__name__}: {e}\n```" ) if not any( error_message in msg.get("content", "") for msg in webui_manager.bu_chat_history if msg.get("role") == "assistant" ): webui_manager.bu_chat_history.append( {"role": "assistant", "content": error_message} ) final_update[chatbot_comp] = gr.update(value=webui_manager.bu_chat_history) gr.Error(f"Agent execution failed: {e}") finally: webui_manager.bu_current_task = None # Clear the task reference # Close browser/context if requested if should_close_browser_on_finish: if webui_manager.bu_browser_context: logger.info("Closing browser context after task.") await webui_manager.bu_browser_context.close() webui_manager.bu_browser_context = None if webui_manager.bu_browser: logger.info("Closing browser after task.") await webui_manager.bu_browser.close() webui_manager.bu_browser = None # --- 8. Final UI Update --- final_update.update( { business_name_comp: gr.update( value="", interactive=True, placeholder="Enter business name", ), business_website_comp: gr.update( value="", interactive=True, placeholder="Enter business website", ), business_type_comp: gr.update(interactive=True), additional_info_comp: gr.update( value="", interactive=True, placeholder="Enter additional information", ), run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True), stop_button_comp: gr.update(value="⏹️ Stop", interactive=False), pause_resume_button_comp: gr.update( value="⏸️ Pause", interactive=False ), clear_button_comp: gr.update(interactive=True), # Ensure final chat history is shown chatbot_comp: gr.update(value=webui_manager.bu_chat_history), } ) yield final_update except Exception as e: # Catch errors during setup (before agent run starts) logger.error(f"Error setting up agent task: {e}", exc_info=True) webui_manager.bu_current_task = None # Ensure state is reset yield { business_name_comp: gr.update( interactive=True, placeholder="Enter business name" ), business_website_comp: gr.update( interactive=True, placeholder="Enter business website" ), business_type_comp: gr.update(interactive=True), additional_info_comp: gr.update( interactive=True, placeholder="Enter additional information" ), run_button_comp: gr.update(value="▶️ Start Analysis", interactive=True), stop_button_comp: gr.update(value="⏹️ Stop", interactive=False), pause_resume_button_comp: gr.update(value="⏸️ Pause", interactive=False), clear_button_comp: gr.update(interactive=True), chatbot_comp: gr.update( value=webui_manager.bu_chat_history + [{"role": "assistant", "content": f"**Setup Error:** {e}"}] ), } # --- Button Click Handlers --- (Need access to webui_manager) async def handle_submit( webui_manager: WebuiManager, components: Dict[gr.components.Component, Any] ): """Handles clicks on the main 'Start Analysis' button.""" # Get business information from the form business_name_comp = webui_manager.get_component_by_id("browser_use_agent.business_name") business_website_comp = webui_manager.get_component_by_id("browser_use_agent.business_website") business_type_comp = webui_manager.get_component_by_id("browser_use_agent.business_type") additional_info_comp = webui_manager.get_component_by_id("browser_use_agent.additional_info") business_name = components.get(business_name_comp, "").strip() business_website = components.get(business_website_comp, "").strip() business_type = components.get(business_type_comp, "Retail") additional_info = components.get(additional_info_comp, "").strip() if not business_name: gr.Warning("Please enter a business name.") yield {business_name_comp: gr.update(value=business_name)} return # Generate the standardized task using our template task = create_business_task( business_name, business_type, business_website, additional_info ) # Check if waiting for user assistance if webui_manager.bu_response_event and not webui_manager.bu_response_event.is_set(): logger.info(f"User submitted assistance") webui_manager.bu_user_help_response = "Continue with the current task." webui_manager.bu_response_event.set() # UI updates handled by the main loop reacting to the event being set yield { business_name_comp: gr.update( interactive=False, ), business_website_comp: gr.update( interactive=False, ), business_type_comp: gr.update( interactive=False, ), additional_info_comp: gr.update( interactive=False, ), webui_manager.get_component_by_id( "browser_use_agent.run_button" ): gr.update(value="⏳ Running...", interactive=False), } # Check if a task is currently running (using _current_task) elif webui_manager.bu_current_task and not webui_manager.bu_current_task.done(): logger.warning( "Start button clicked while agent is already running and not asking for help." ) gr.Info("Agent is currently running. Please wait or use Stop/Pause.") yield {} # No change else: # Store the task in the user input field before running components[business_name_comp] = task # Handle submission for a new task logger.info(f"Starting analysis for business: {business_name}") # Update chat history with the business information webui_manager.bu_chat_history.append({"role": "user", "content": task}) # Run the task using our agent async for update in run_agent_task(webui_manager, components): yield update async def handle_stop(webui_manager: WebuiManager): """Handles clicks on the 'Stop' button.""" logger.info("Stop button clicked.") agent = webui_manager.bu_agent task = webui_manager.bu_current_task if agent and task and not task.done(): # Signal the agent to stop by setting its internal flag agent.state.stopped = True agent.state.paused = False # Ensure not paused if stopped return { webui_manager.get_component_by_id( "browser_use_agent.stop_button" ): gr.update(interactive=False, value="⏹️ Stopping..."), webui_manager.get_component_by_id( "browser_use_agent.pause_resume_button" ): gr.update(interactive=False), webui_manager.get_component_by_id( "browser_use_agent.run_button" ): gr.update(interactive=False), } else: logger.warning("Stop clicked but agent is not running or task is already done.") # Reset UI just in case it's stuck return { webui_manager.get_component_by_id( "browser_use_agent.run_button" ): gr.update(interactive=True), webui_manager.get_component_by_id( "browser_use_agent.stop_button" ): gr.update(interactive=False), webui_manager.get_component_by_id( "browser_use_agent.pause_resume_button" ): gr.update(interactive=False), webui_manager.get_component_by_id( "browser_use_agent.clear_button" ): gr.update(interactive=True), } async def handle_pause_resume(webui_manager: WebuiManager): """Handles clicks on the 'Pause/Resume' button.""" agent = webui_manager.bu_agent task = webui_manager.bu_current_task if agent and task and not task.done(): if agent.state.paused: logger.info("Resume button clicked.") agent.resume() # UI update happens in main loop return { webui_manager.get_component_by_id( "browser_use_agent.pause_resume_button" ): gr.update(value="⏸️ Pause", interactive=True) } # Optimistic update else: logger.info("Pause button clicked.") agent.pause() return { webui_manager.get_component_by_id( "browser_use_agent.pause_resume_button" ): gr.update(value="▶️ Resume", interactive=True) } # Optimistic update else: logger.warning( "Pause/Resume clicked but agent is not running or doesn't support state." ) return {} # No change async def handle_clear(webui_manager: WebuiManager): """Handles clicks on the 'Clear' button.""" logger.info("Clear button clicked.") # Stop any running task first task = webui_manager.bu_current_task if task and not task.done(): logger.info("Clearing requires stopping the current task.") webui_manager.bu_agent.stop() task.cancel() try: await asyncio.wait_for(task, timeout=2.0) # Wait briefly except (asyncio.CancelledError, asyncio.TimeoutError): pass except Exception as e: logger.warning(f"Error stopping task on clear: {e}") webui_manager.bu_current_task = None if webui_manager.bu_controller: await webui_manager.bu_controller.close_mcp_client() webui_manager.bu_controller = None webui_manager.bu_agent = None # Reset state stored in manager webui_manager.bu_chat_history = [] webui_manager.bu_response_event = None webui_manager.bu_user_help_response = None webui_manager.bu_agent_task_id = None webui_manager.bu_task_metrics = None # Clear task metrics logger.info("Agent state and browser resources cleared.") # Reset UI components return { webui_manager.get_component_by_id("browser_use_agent.chatbot"): gr.update( value=[] ), webui_manager.get_component_by_id("browser_use_agent.business_name"): gr.update( value="", interactive=True ), webui_manager.get_component_by_id("browser_use_agent.business_website"): gr.update( value="", interactive=True ), webui_manager.get_component_by_id("browser_use_agent.business_type"): gr.update( value="Retail", interactive=True ), webui_manager.get_component_by_id("browser_use_agent.additional_info"): gr.update( value="", interactive=True ), webui_manager.get_component_by_id( "browser_use_agent.agent_history_file" ): gr.update(value=None), webui_manager.get_component_by_id("browser_use_agent.recording_gif"): gr.update( value=None ), webui_manager.get_component_by_id("browser_use_agent.browser_view"): gr.update( value="
Browser Cleared
" ), webui_manager.get_component_by_id("browser_use_agent.run_button"): gr.update( value="▶️ Start Analysis", interactive=True ), webui_manager.get_component_by_id("browser_use_agent.stop_button"): gr.update( interactive=False ), webui_manager.get_component_by_id( "browser_use_agent.pause_resume_button" ): gr.update(value="⏸️ Pause", interactive=False), webui_manager.get_component_by_id("browser_use_agent.clear_button"): gr.update( interactive=True ), webui_manager.get_component_by_id("browser_use_agent.task_metrics_display"): gr.update( value="No task metrics available yet. Run a task to see metrics here." ), } # --- Tab Creation Function --- def create_browser_use_agent_tab(webui_manager: WebuiManager): """ Create the run agent tab with business-focused UI. """ webui_manager.init_browser_use_agent() # Initialize task metrics if not already present if not hasattr(webui_manager, 'bu_task_metrics'): webui_manager.bu_task_metrics = None # --- Define UI Components --- tab_components = {} with gr.Column(): chatbot = gr.Chatbot( lambda: webui_manager.bu_chat_history, # Load history dynamically elem_id="browser_use_chatbot", label="Agent Interaction", type="messages", height=600, show_copy_button=True, ) # Business information form with gr.Column(elem_id="business_form"): gr.Markdown("### Business Information") business_name = gr.Textbox( label="Business Name", placeholder="Enter business name", elem_id="business_name", ) business_website = gr.Textbox( label="Business Website (optional)", placeholder="https://www.example.com", elem_id="business_website", ) business_type = gr.Dropdown( label="Business Type", choices=["Retail", "Restaurant", "Service", "Healthcare", "Technology", "Other"], value="Retail", elem_id="business_type", ) additional_info = gr.Textbox( label="Additional Information (optional)", placeholder="Any specific details about the business that might help the agent", lines=2, elem_id="additional_info", ) with gr.Row(): stop_button = gr.Button( "⏹️ Stop", interactive=False, variant="stop", scale=2 ) pause_resume_button = gr.Button( "⏸️ Pause", interactive=False, variant="secondary", scale=2, visible=True ) clear_button = gr.Button( "🗑️ Clear", interactive=True, variant="secondary", scale=2 ) run_button = gr.Button("▶️ Start Analysis", variant="primary", scale=3) browser_view = gr.HTML( value="

Browser View (Requires Headless=True)

", label="Browser Live View", elem_id="browser_view", visible=False, ) # Task Metrics Section with gr.Column(visible=True) as task_metrics_container: gr.Markdown("### Task Metrics", elem_id="task_metrics_heading") task_metrics_display = gr.Markdown( value=lambda: format_task_metrics(webui_manager.bu_task_metrics), elem_id="task_metrics_display", ) with gr.Column(): gr.Markdown("### Task Outputs") agent_history_file = gr.File(label="Agent History JSON", interactive=False) recording_gif = gr.Image( label="Task Recording GIF", format="gif", interactive=False, type="filepath", ) # --- Store Components in Manager --- tab_components.update( dict( chatbot=chatbot, business_name=business_name, business_website=business_website, business_type=business_type, additional_info=additional_info, clear_button=clear_button, run_button=run_button, stop_button=stop_button, pause_resume_button=pause_resume_button, agent_history_file=agent_history_file, recording_gif=recording_gif, browser_view=browser_view, task_metrics_display=task_metrics_display, ) ) webui_manager.add_components( "browser_use_agent", tab_components ) # Use "browser_use_agent" as tab_name prefix all_managed_components = set( webui_manager.get_components() ) # Get all components known to manager run_tab_outputs = list(tab_components.values()) async def submit_wrapper( components_dict: Dict[Component, Any], ) -> AsyncGenerator[Dict[Component, Any], None]: """Wrapper for handle_submit that yields its results.""" async for update in handle_submit(webui_manager, components_dict): yield update async def stop_wrapper() -> AsyncGenerator[Dict[Component, Any], None]: """Wrapper for handle_stop.""" update_dict = await handle_stop(webui_manager) yield update_dict async def pause_resume_wrapper() -> AsyncGenerator[Dict[Component, Any], None]: """Wrapper for handle_pause_resume.""" update_dict = await handle_pause_resume(webui_manager) yield update_dict async def clear_wrapper() -> AsyncGenerator[Dict[Component, Any], None]: """Wrapper for handle_clear.""" update_dict = await handle_clear(webui_manager) yield update_dict # --- Connect Event Handlers using the Wrappers -- run_button.click( fn=submit_wrapper, inputs=all_managed_components, outputs=run_tab_outputs ) stop_button.click(fn=stop_wrapper, inputs=None, outputs=run_tab_outputs) pause_resume_button.click( fn=pause_resume_wrapper, inputs=None, outputs=run_tab_outputs ) clear_button.click(fn=clear_wrapper, inputs=None, outputs=run_tab_outputs)