import asyncio import json import logging import os from typing import Any, AsyncGenerator, Dict, List, Optional from datetime import datetime import gradio as gr from gradio.components import Component from src.agent.browser_use.browser_use_agent import BrowserUseAgent from src.browser.custom_browser import CustomBrowser from src.controller.custom_controller import CustomController from src.utils import llm_provider from src.webui.webui_manager import WebuiManager from browser_use.browser.browser import BrowserConfig from browser_use.browser.context import BrowserContext, BrowserContextConfig logger = logging.getLogger(__name__) # Import environment variables from dotenv import load_dotenv load_dotenv() # This ensures environment variables are loaded # Get Vayner credentials from environment VAYNER_USERNAME = os.getenv("VAYNER_USERNAME", "") VAYNER_PASSWORD = os.getenv("VAYNER_PASSWORD", "") VAYNER_CLIENT_TEMPLATE = """ Task: Research Vayner Commerce data for business: "{business_name}" 1. Log in to https://local.vaynercommerce.com/myclients - Username: admin@vaynercommerce.com - Password: oKLl4li-HY - Use these credentials on the login form 2. After successful login, search for the business named "{business_name}" in the search box 3. Click on the business in the search results **Part 1: Keyword Performance Table** 4. Extract the keyword performance table (columns: Keyword, Performance, Status) - Return this as a formatted table **Part 2: Keyword Ranking History Analysis Table** 5. For the first keyword in the list: a. Click on the keyword to open its detail view b. Look for the **History** section c. Click the **last row (earliest date)** in the History section: -extract: - Top 3 Rank → This is the **Initial Top 3 Rank (SOV)** - Coverage → This is the **Initial Coverage** - Then look for the **Your Rankings** section (while this row is selected), extract: - ARP → This is the **Initial ARP** d. Then again the table under the **History** section, click the **first row (most recent date)** in the History section: - From the **History section**, extract: - Top 3 Rank → This is the **Current Rank (SOV) in our table** - Coverage → This is the **Current Coverage** - Then look for the **Your Rankings** section (while this row is selected), extract: - ARP → This is the **Current Scan ARP** 6. Go back to the Keywords list and repeat Step 5 for the second keyword in the list. Please provide: - The complete keyword performance data as a 1 table. - Another new table that Return all of information from **Part 2: Keyword Ranking History Analysis Table** as a second table with the following columns: - Keyword - Initial ARP - Initial Top 3 Rank (SOV) - Initial Coverage - Current Scan ARP - Current Rank (SOV) - Current Coverage At the bottom of the table, compute and include a final row labeled "Average" showing the average of all numeric columns (excluding the "Keyword" column). """ # Function to generate PDF-like report from task results def generate_pdf_report(business_name, history): """ Generate HTML for a PDF-like report based on the agent's history data """ # Extract relevant information from history final_result = history.final_result() or {} screenshots = [] keyword_data = [] ranking_data = [] performance_data = [] # Process agent history to extract information try: # The history object is itself iterable for item in history: try: # Extract screenshot if available if hasattr(item, "state") and hasattr(item.state, "screenshot"): if item.state.screenshot and isinstance(item.state.screenshot, str) and len(item.state.screenshot) > 100: screenshots.append(item.state.screenshot) # Extract data from actions if hasattr(item, "output") and item.output: for action in item.output.action: if hasattr(action, "thought"): thought = action.thought.lower() if action.thought else "" # Look for keyword data in thoughts if "keyword" in thought and ("performance" in thought or "score" in thought): keyword_data.append(action.thought) # Check if action contains ranking data elif "ranking" in thought or "rank" in thought: ranking_data.append(action.thought) # Check if action contains performance data elif "performance" in thought and "score" in thought: performance_data.append(action.thought) # Check for extracted data in observe action results if hasattr(action, "result") and action.result: if isinstance(action.result, str): result = action.result.lower() if "keyword" in result or "performance" in result: if action.result not in keyword_data and len(action.result.strip()) > 5: keyword_data.append(action.result) if "ranking" in result or "rank" in result: if action.result not in ranking_data and len(action.result.strip()) > 5: ranking_data.append(action.result) except Exception as e: logger.error(f"Error processing history item: {e}") continue except Exception as e: logger.error(f"Error iterating through history: {e}") # Generate HTML for PDF-like report html = f"""

Vayner Client Research Report

{business_name}

Generated on {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

Executive Summary

This report contains research data for {business_name} extracted from Vayner Commerce platform. We analyzed keyword performance data and geographic rankings.

""" # If no specific data is found, try to extract from all output if not keyword_data and not ranking_data and not performance_data: try: all_text = [] for item in history: if hasattr(item, "output") and item.output: for action in item.output.action: if hasattr(action, "thought") and action.thought: all_text.append(action.thought) if hasattr(action, "result") and action.result: all_text.append(action.result) # Look for sections in the text for text in all_text: if "keyword" in text.lower() or "score" in text.lower() or "performance" in text.lower(): keyword_data.append(text) if "ranking" in text.lower() or "rank" in text.lower(): ranking_data.append(text) except Exception as e: logger.error(f"Error extracting all text: {e}") # Add performance data section if performance_data or keyword_data: html += """

Keyword Performance Data

""" # Try to parse data into a table format table_data = [] try: combined_data = performance_data + keyword_data for data in combined_data: lines = data.split("\n") for line in lines: if ":" in line: parts = line.split(":", 1) if len(parts) == 2: keyword, value = parts table_data.append((keyword.strip(), value.strip())) elif "-" in line and not line.strip().startswith("-"): parts = line.split("-", 1) if len(parts) == 2: keyword, value = parts table_data.append((keyword.strip(), value.strip())) if table_data: html += """ """ for keyword, value in table_data: html += f""" """ html += """
Keyword Performance/Score
{keyword} {value}
""" else: # Display raw data if table parsing failed for data in combined_data: html += f"""
{data}
""" except Exception as e: logger.error(f"Error formatting table data: {e}") # Fallback to raw display for data in performance_data + keyword_data: html += f"""
{data}
""" html += """
""" # Add rankings data section if ranking_data: html += """

Geographic Rankings

""" for data in ranking_data: html += f"""
{data}
""" html += """
""" # Add screenshots section if screenshots: html += """

Map Visualizations

""" for idx, screenshot in enumerate(screenshots): if isinstance(screenshot, str) and len(screenshot) > 100: html += f"""
Map {idx+1}

Map Visualization {idx+1}

""" html += """
""" # If no data was found, show a message if not keyword_data and not performance_data and not ranking_data and not screenshots: html += """

No data extracted

The agent was unable to extract specific data for this report. Please check the chat logs for more details on what was found.

""" # Add footer html += """

Generated by Vayner Client Research Agent | Browser-Use WebUI

""" return html # Function to generate live PDF-like report updated during the task def generate_live_report(business_name, business_info, keyword_data, ranking_data, screenshots, keyword_table_rows=None, final_result=None): """ Generate HTML for a live-updating PDF-like report based on data collected so far Only show the first three pages: cover, second page, and keyword table with final result. """ if keyword_table_rows is None: keyword_table_rows = [] # Cover page (black background, business name, VaynerCommerce logo) html = f'''
{business_name}
X
VAYNERCOMMERCE
''' # Second page (logo, business name, service, date, image) html += f'''
{business_name}
BEHAVIORAL HEALTH
SEO Services
Weeks of 04/07/25 - 04/21/25
VAYNERCOMMERCE
''' # Third page: Final Result and Keyword Table html += f'''
Final Research Results
''' # Display Final Result if available if final_result: # Determine if final_result is likely a table is_table = False if isinstance(final_result, str): lines = final_result.strip().split('\n') if any('|' in line for line in lines) or any('keyword' in line.lower() and 'performance' in line.lower() for line in lines): is_table = True if is_table: # Format as a table try: html += '
' # Split the table data lines = [line.strip() for line in final_result.split('\n') if line.strip()] # Find the header row header_row_index = -1 for i, line in enumerate(lines): if ('keyword' in line.lower() and 'performance' in line.lower()) or ('keyword' in line.lower() and 'sov' in line.lower()): header_row_index = i break if header_row_index != -1: # Create an HTML table html += '' # Format the header row header = lines[header_row_index] header_cells = [cell.strip() for cell in header.strip('|').split('|')] html += '' for cell in header_cells: html += f'' html += '' # Skip the separator row if it exists data_start = header_row_index + 2 if header_row_index + 1 < len(lines) and '---' in lines[header_row_index + 1] else header_row_index + 1 # Format the data rows for i in range(data_start, len(lines)): row = lines[i] if '|' in row: cells = [cell.strip() for cell in row.strip('|').split('|')] bg_color = '#111' if i % 2 == 0 else '#181818' html += f'' for cell in cells: html += f'' html += '' html += '
{cell}
{cell}
' else: # If no proper header found, just display the text in a pre tag html += f'
{final_result}
' html += '
' except Exception: # If parsing fails, just display the raw text html += f'
{final_result}
' else: # Format as regular text html += f'
' if isinstance(final_result, str): # Format the text with proper paragraphs paragraphs = final_result.split('\n\n') for paragraph in paragraphs: if paragraph.strip(): paragraph_html = paragraph.replace("\n", "
") html += f'

{paragraph_html}

' elif isinstance(final_result, list): # Handle list of items html += '
    ' for item in final_result: html += f'
  • {item}
  • ' html += '
' elif isinstance(final_result, dict): # Handle dictionary html += '
' for key, value in final_result.items(): html += f'
{key}: {value}
' html += '
' else: # Generic string representation html += f'

{str(final_result)}

' html += '
' else: html += '''

Results will appear here when the task is completed.

''' # Additional keyword table display if keyword_table_rows: html += '''
Keyword Performance Summary
''' for row in keyword_table_rows: html += f'''''' html += '''
Keyword Performance SOV
{row['keyword']} {row['performance']} {row['sov']}
''' html += '
' return html async def run_vayner_research( webui_manager: WebuiManager, components: Dict[gr.components.Component, Any], business_name: str ) -> AsyncGenerator[Dict[gr.components.Component, Any], None]: """ Runs a Vayner client research task and yields UI updates. """ # Get all required UI components run_button_comp = webui_manager.get_component_by_id("vayner_client_research.run_button") stop_button_comp = webui_manager.get_component_by_id("vayner_client_research.stop_button") chatbot_comp = webui_manager.get_component_by_id("vayner_client_research.chatbot") browser_view_comp = webui_manager.get_component_by_id("vayner_client_research.browser_view") pdf_report_comp = webui_manager.get_component_by_id("vayner_client_research.pdf_report") # Create the task using the template with credentials task = VAYNER_CLIENT_TEMPLATE.format( business_name=business_name, vayner_username=VAYNER_USERNAME, vayner_password=VAYNER_PASSWORD ) # Initialize chat history if needed if not hasattr(webui_manager, "vayner_chat_history"): webui_manager.vayner_chat_history = [] # Show the business being researched webui_manager.vayner_chat_history.append( {"role": "user", "content": f"Research business: {business_name}"} ) webui_manager.vayner_chat_history.append( {"role": "assistant", "content": f"Starting research for {business_name}..."} ) yield { k: v for k, v in { chatbot_comp: gr.update(value=webui_manager.vayner_chat_history), run_button_comp: gr.update(value="⏳ Researching...", interactive=False), stop_button_comp: gr.update(interactive=True), pdf_report_comp: gr.update(visible=False) }.items() if k is not None } # Get settings from agent settings def get_setting(name, default=None): comp = webui_manager.get_component_by_id(f"agent_settings.{name}") return components.get(comp, default) if comp else default # LLM Settings llm_provider_name = get_setting("llm_provider", "openai") llm_model_name = get_setting("llm_model_name", "gpt-4o") llm_temperature = get_setting("llm_temperature", 0.6) use_vision = True # Always need vision for this task llm_base_url = get_setting("llm_base_url", "") llm_api_key = get_setting("llm_api_key", "") if not llm_api_key: llm_api_key = os.getenv("OPENAI_API_KEY", "") # Browser Settings def get_browser_setting(key, default=None): comp = webui_manager.get_component_by_id(f"browser_settings.{key}") return components.get(comp, default) if comp else default headless = True # Force headless mode for this agent disable_security = get_browser_setting("disable_security", False) window_w = int(get_browser_setting("window_w", 1920)) window_h = int(get_browser_setting("window_h", 1080)) save_recording_path = get_browser_setting("save_recording_path") or "./tmp/vayner_recordings" save_download_path = get_browser_setting("save_download_path", "./tmp/downloads") # Make sure paths exist os.makedirs(save_recording_path, exist_ok=True) if save_download_path: os.makedirs(save_download_path, exist_ok=True) # Stream settings for view stream_vw = 80 stream_vh = int(80 * window_h // window_w) # Get LLM for agent main_llm = llm_provider.get_llm_model( provider="openai", # Force OpenAI for vision capabilities model_name=str(llm_model_name) if llm_model_name else "gpt-4o", temperature=float(llm_temperature), base_url=str(llm_base_url) if llm_base_url else None, api_key=str(llm_api_key) if llm_api_key else None, ) if main_llm is None: raise ValueError("Failed to initialize LLM. Please check your OpenAI API key and model settings in Agent Settings.") # Step and done callbacks async def step_callback(state, output, step_num): step_num -= 1 logger.info(f"Step {step_num} completed.") # Process screenshot if available (for PDF only, not chat) screenshot_data = getattr(state, "screenshot", None) if screenshot_data: try: if isinstance(screenshot_data, str) and len(screenshot_data) > 100: # Store screenshot for report if not hasattr(webui_manager, "vayner_screenshots"): webui_manager.vayner_screenshots = [] webui_manager.vayner_screenshots.append(screenshot_data) except Exception as e: logger.error(f"Error processing screenshot: {e}") # Extract information for real-time PDF report try: if not hasattr(webui_manager, "vayner_business_info"): webui_manager.vayner_business_info = [] if not hasattr(webui_manager, "vayner_keyword_data"): webui_manager.vayner_keyword_data = [] if not hasattr(webui_manager, "vayner_ranking_data"): webui_manager.vayner_ranking_data = [] # Extract business info, keywords, and rankings from this step for action in output.action: if hasattr(action, "thought") and action.thought: thought = action.thought.lower() # Extract business info if "business" in thought and any(x in thought for x in ["name", "address", "info", "details", "about"]): if action.thought not in webui_manager.vayner_business_info: webui_manager.vayner_business_info.append(action.thought) # Extract keyword data if "keyword" in thought and any(x in thought for x in ["performance", "score", "data"]): if action.thought not in webui_manager.vayner_keyword_data: webui_manager.vayner_keyword_data.append(action.thought) # Extract ranking data if any(x in thought for x in ["ranking", "rank", "geography", "location"]): if action.thought not in webui_manager.vayner_ranking_data: webui_manager.vayner_ranking_data.append(action.thought) # Also check action results for structured data if hasattr(action, "result") and action.result and isinstance(action.result, str): result = action.result.lower() # Extract structured data from results if "business" in result and len(action.result) > 10: if action.result not in webui_manager.vayner_business_info: webui_manager.vayner_business_info.append(action.result) if "keyword" in result and len(action.result) > 10: if action.result not in webui_manager.vayner_keyword_data: webui_manager.vayner_keyword_data.append(action.result) if "rank" in result and len(action.result) > 10: if action.result not in webui_manager.vayner_ranking_data: webui_manager.vayner_ranking_data.append(action.result) # Extract current URL for page context if hasattr(state, "url") and state.url: page_url = state.url if "business" in page_url.lower() and not any(page_url in info for info in webui_manager.vayner_business_info): webui_manager.vayner_business_info.append(f"Page URL: {page_url}") # Extract visible text from the page if available if hasattr(state, "text_content") and state.text_content: # Extract table-like data or lists that might contain keywords or rankings if "keyword" in state.text_content.lower() and len(state.text_content) > 20: if state.text_content not in webui_manager.vayner_keyword_data: webui_manager.vayner_keyword_data.append(state.text_content) # Extract keyword table data if not hasattr(webui_manager, "vayner_keyword_table_rows"): webui_manager.vayner_keyword_table_rows = [] for action in output.action: # Try to extract keyword, performance, SOV from action.thought or action.result for field in [getattr(action, "thought", None), getattr(action, "result", None)]: if field and isinstance(field, str): # Simple regex/parse for lines like: "keyword: X, performance: Y, sov: Z" import re match = re.search(r"keyword[:\s]+([\w\- ]+)[,;\s]+performance[:\s]+([\w\-\.]+)[,;\s]+sov[:\s]+([\w\-\.]+)", field, re.IGNORECASE) if match: keyword = match.group(1).strip() performance = match.group(2).strip() sov = match.group(3).strip() # Only add if not already present if not any(row["keyword"].lower() == keyword.lower() for row in webui_manager.vayner_keyword_table_rows): webui_manager.vayner_keyword_table_rows.append({ "keyword": keyword, "performance": performance, "sov": sov }) # Update the PDF report with the latest data business_name = getattr(webui_manager, "vayner_current_business", "Unknown Business") webui_manager.vayner_pdf_report = generate_live_report( business_name, webui_manager.vayner_business_info, webui_manager.vayner_keyword_data, webui_manager.vayner_ranking_data, webui_manager.vayner_screenshots, webui_manager.vayner_keyword_table_rows, history.final_result() ) # Get the PDF report component and update it in real-time pdf_report_comp = webui_manager.get_component_by_id("vayner_client_research.pdf_report") if pdf_report_comp and hasattr(webui_manager, "update_queue"): webui_manager.update_queue.append({ pdf_report_comp: gr.update( value=webui_manager.vayner_pdf_report, visible=True ) }) except Exception as e: logger.error(f"Error updating PDF report: {e}") # Format logs similar to the screenshot (NO screenshots in chat) try: log_html = f'''
agent {datetime.now().strftime('%H:%M:%S')}
''' # Extract actions actions_text = [] has_content = False # Get full json output action_dump = [action.model_dump(exclude_none=True) for action in output.action] state_dump = output.current_state.model_dump(exclude_none=True) # Step info log_html += f'
🔶 Step {step_num}
' # Add current URL if available if hasattr(state, "url") and state.url: log_html += f'
URL: {state.url}
' # Add actions for action in action_dump: has_content = True if 'action_type' in action: action_type = action['action_type'].upper() # Icon based on action type if action_type == "CLICK": icon = "🖱️" elif action_type == "TYPE": icon = "⌨️" elif action_type == "NAVIGATE": icon = "🔗" elif action_type == "EXTRACT": icon = "📋" elif action_type == "WAIT_FOR_ELEMENT": icon = "⏳" else: icon = "⚙️" # Format based on action type if action_type == "CLICK" and 'selector' in action: log_html += f'
{icon} CLICK: {action["selector"]}
' elif action_type == "TYPE" and 'text' in action: text = action['text'] if len(text) > 50: text = text[:47] + "..." log_html += f'
{icon} TYPE: "{text}"
' elif action_type == "NAVIGATE" and 'url' in action: log_html += f'
{icon} NAVIGATE: {action["url"]}
' elif action_type == "EXTRACT": log_html += f'
{icon} EXTRACT DATA
' elif action_type == "WAIT_FOR_ELEMENT" and 'selector' in action: log_html += f'
{icon} WAIT FOR: {action["selector"]}
' else: details = ", ".join([f"{k}={v}" for k, v in action.items() if k != 'action_type' and k != 'thought']) log_html += f'
{icon} {action_type}: {details}
' # Include thoughts with thinking emoji if 'thought' in action and action['thought']: thought = action['thought'].strip() if len(thought) > 150: thought = thought[:147] + "..." log_html += f'
💭 {thought}
' # Close log div log_html += '
' # If no actions found if not has_content: log_html = f'''
agent {datetime.now().strftime('%H:%M:%S')}
⚠️ Step {step_num} - No actions recorded
''' except Exception as e: logger.error(f"Error formatting step output: {e}") log_html = f'''
error
⚠️ Error formatting Step {step_num}
{str(e)}
''' # Add to chat history webui_manager.vayner_chat_history.append( {"role": "assistant", "content": log_html} ) def done_callback(history): logger.info(f"Vayner research task finished. Duration: {history.total_duration_seconds():.2f}s") final_summary = "**Task Completed**\n" final_summary += f"- Duration: {history.total_duration_seconds():.2f} seconds\n" final_result = history.final_result() if final_result: final_summary += f"- Final Result: {final_result}\n" # --- FIX: Parse final_result for keywords and update table --- import re if not hasattr(webui_manager, "vayner_keyword_table_rows"): webui_manager.vayner_keyword_table_rows = [] # Accept both string and dict/list results if isinstance(final_result, str): # 1. Parse markdown/pipe table lines = [line.strip() for line in final_result.splitlines() if line.strip()] table_start = -1 for i, line in enumerate(lines): if re.match(r"\|?\s*keyword\s*\|\s*performance\s*\|\s*sov\s*\|?", line, re.IGNORECASE): table_start = i break if table_start != -1 and table_start + 2 < len(lines): # Table header, separator, then data rows for row in lines[table_start+2:]: if not row.startswith("|"): continue cells = [c.strip() for c in row.strip("|").split("|")] if len(cells) >= 3: keyword, performance, sov = cells[:3] if keyword and performance and sov: if not any(row_item["keyword"].lower() == keyword.lower() for row_item in webui_manager.vayner_keyword_table_rows): webui_manager.vayner_keyword_table_rows.append({ "keyword": keyword, "performance": performance, "sov": sov }) # 2. Also parse lines like: "keyword: X, performance: Y, sov: Z" for line in lines: match = re.search(r"keyword[:\s]+([\w\- ]+)[,;\s]+performance[:\s]+([\w\-\.]+)[,;\s]+sov[:\s]+([\w\-\.]+)", line, re.IGNORECASE) if match: keyword = match.group(1).strip() performance = match.group(2).strip() sov = match.group(3).strip() if not any(row_item["keyword"].lower() == keyword.lower() for row_item in webui_manager.vayner_keyword_table_rows): webui_manager.vayner_keyword_table_rows.append({ "keyword": keyword, "performance": performance, "sov": sov }) elif isinstance(final_result, list): for item in final_result: if isinstance(item, dict): keyword = item.get("keyword") performance = item.get("performance") sov = item.get("sov") if keyword and performance and sov: if not any(row_item["keyword"].lower() == keyword.lower() for row_item in webui_manager.vayner_keyword_table_rows): webui_manager.vayner_keyword_table_rows.append({ "keyword": keyword, "performance": performance, "sov": sov }) errors = history.errors() if errors and any(errors): final_summary += f"- **Errors:**\n```\n{errors}\n```\n" else: final_summary += "- Status: Success\n" webui_manager.vayner_chat_history.append( {"role": "assistant", "content": final_summary} ) # Generate PDF report using the current live data collections try: business_name = getattr(webui_manager, "vayner_current_business", "Unknown Business") webui_manager.vayner_pdf_report = generate_live_report( business_name, webui_manager.vayner_business_info, webui_manager.vayner_keyword_data, webui_manager.vayner_ranking_data, webui_manager.vayner_screenshots, webui_manager.vayner_keyword_table_rows, final_result ) except Exception as e: logger.error(f"Error generating PDF report: {e}", exc_info=True) webui_manager.vayner_pdf_report = f"
Error generating report: {str(e)}
" # Initialize controller and browser try: if not webui_manager.vayner_controller: webui_manager.vayner_controller = CustomController() if not webui_manager.vayner_browser: webui_manager.vayner_browser = CustomBrowser( config=BrowserConfig( headless=headless, disable_security=disable_security, browser_binary_path=None, new_context_config=BrowserContextConfig( window_width=window_w, window_height=window_h, ) ) ) if not webui_manager.vayner_browser_context: context_config = BrowserContextConfig( save_recording_path=save_recording_path, save_downloads_path=save_download_path, window_height=window_h, window_width=window_w, ) webui_manager.vayner_browser_context = ( await webui_manager.vayner_browser.new_context(config=context_config) ) # Initialize agent if not webui_manager.vayner_agent: webui_manager.vayner_agent = BrowserUseAgent( task=task, llm=main_llm, browser=webui_manager.vayner_browser, browser_context=webui_manager.vayner_browser_context, controller=webui_manager.vayner_controller, register_new_step_callback=step_callback, register_done_callback=done_callback, use_vision=use_vision, max_input_tokens=128000, max_actions_per_step=10, source="vayner_research", ) else: webui_manager.vayner_agent.add_new_task(task) # Run the agent agent_run_coro = webui_manager.vayner_agent.run(max_steps=50) agent_task = asyncio.create_task(agent_run_coro) webui_manager.vayner_current_task = agent_task # Monitor the task and update UI last_chat_len = len(webui_manager.vayner_chat_history) while not agent_task.done(): # Update Chatbot if new messages arrived if len(webui_manager.vayner_chat_history) > last_chat_len: yield { chatbot_comp: gr.update(value=webui_manager.vayner_chat_history) } last_chat_len = len(webui_manager.vayner_chat_history) # Update Browser View if webui_manager.vayner_browser_context: try: screenshot_b64 = await webui_manager.vayner_browser_context.take_screenshot() if screenshot_b64: html_content = f'' yield { browser_view_comp: gr.update(value=html_content, visible=True) } except Exception as e: logger.debug(f"Failed to capture screenshot: {e}") await asyncio.sleep(0.5) # Polling interval # Wait for the task to complete await agent_task # Show PDF Report if generated if hasattr(webui_manager, "vayner_pdf_report") and webui_manager.vayner_pdf_report: yield { run_button_comp: gr.update(value="▶️ Research Client", interactive=True), stop_button_comp: gr.update(interactive=False), chatbot_comp: gr.update(value=webui_manager.vayner_chat_history), pdf_report_comp: gr.update(value=webui_manager.vayner_pdf_report, visible=True) } else: # Update UI when complete without PDF report yield { run_button_comp: gr.update(value="▶️ Research Client", interactive=True), stop_button_comp: gr.update(interactive=False), chatbot_comp: gr.update(value=webui_manager.vayner_chat_history) } except Exception as e: logger.error(f"Error during Vayner research: {e}", exc_info=True) error_message = f"**Error during research:**\n```\n{str(e)}\n```" webui_manager.vayner_chat_history.append( {"role": "assistant", "content": error_message} ) yield { chatbot_comp: gr.update(value=webui_manager.vayner_chat_history), run_button_comp: gr.update(value="▶️ Research Client", interactive=True), stop_button_comp: gr.update(interactive=False), pdf_report_comp: gr.update(visible=False) } gr.Error(f"Research task failed: {e}") async def handle_submit(webui_manager: WebuiManager, business_name: str): """Handles click on the Research Client button.""" if not business_name.strip(): gr.Warning("Please enter a business name") yield {} else: # Store the current business name webui_manager.vayner_current_business = business_name.strip() # Reset report data collections webui_manager.vayner_screenshots = [] webui_manager.vayner_business_info = [] webui_manager.vayner_keyword_data = [] webui_manager.vayner_ranking_data = [] webui_manager.vayner_keyword_table_rows = [] # Reset keyword table rows # Initialize empty report (cover and second page only) webui_manager.vayner_pdf_report = generate_live_report( business_name.strip(), [], [], [], [], [] ) # Get PDF report component pdf_report_comp = webui_manager.get_component_by_id("vayner_client_research.pdf_report") # Show the cover/second page immediately yield { pdf_report_comp: gr.update( value=webui_manager.vayner_pdf_report, visible=True ) } # Initialize update queue webui_manager.update_queue = [] # Use async generator to stream updates components = {} # Will be populated by components in run_vayner_research async for update in run_vayner_research(webui_manager, components, business_name.strip()): # Include any queued PDF report updates while webui_manager.update_queue: pdf_updates = webui_manager.update_queue.pop(0) update.update(pdf_updates) yield update async def handle_stop(webui_manager: WebuiManager): """Handles clicks on the 'Stop' button.""" logger.info("Stop button clicked.") agent = webui_manager.vayner_agent task = webui_manager.vayner_current_task if agent and task and not task.done(): # Safely try to stop the agent try: if hasattr(agent, 'stop'): agent.stop() else: # Alternative method agent.state.stopped = True agent.state.paused = False except Exception as e: logger.warning(f"Error stopping agent: {e}") task.cancel() try: await asyncio.wait_for(task, timeout=2.0) except (asyncio.CancelledError, asyncio.TimeoutError, Exception): pass run_button_comp = webui_manager.get_component_by_id("vayner_client_research.run_button") stop_button_comp = webui_manager.get_component_by_id("vayner_client_research.stop_button") yield { run_button_comp: gr.update(value="▶️ Research Client", interactive=True), stop_button_comp: gr.update(interactive=False) } else: yield {} async def handle_clear(webui_manager: WebuiManager): """Handles clicks on the 'Clear' button.""" logger.info("Clear button clicked.") # Stop any running task task = webui_manager.vayner_current_task if task and not task.done(): # Stop the agent instead of using handle_stop try: agent = webui_manager.vayner_agent if agent and hasattr(agent, 'stop'): agent.stop() elif agent: agent.state.stopped = True agent.state.paused = False # Cancel the task task.cancel() try: await asyncio.wait_for(task, timeout=1.0) except (asyncio.CancelledError, asyncio.TimeoutError, Exception): pass except Exception as e: logger.warning(f"Error stopping agent: {e}") # Reset the chat history and PDF report webui_manager.vayner_chat_history = [] webui_manager.vayner_pdf_report = generate_live_report( "Business Name", [], [], [], [], [] ) # Reset data collections for PDF report webui_manager.vayner_screenshots = [] webui_manager.vayner_business_info = [] webui_manager.vayner_keyword_data = [] webui_manager.vayner_ranking_data = [] webui_manager.vayner_keyword_table_rows = [] webui_manager.vayner_current_business = "Business Name" webui_manager.update_queue = [] # Get components chatbot_comp = webui_manager.get_component_by_id("vayner_client_research.chatbot") run_button_comp = webui_manager.get_component_by_id("vayner_client_research.run_button") stop_button_comp = webui_manager.get_component_by_id("vayner_client_research.stop_button") browser_view_comp = webui_manager.get_component_by_id("vayner_client_research.browser_view") business_name_comp = webui_manager.get_component_by_id("vayner_client_research.business_name") pdf_report_comp = webui_manager.get_component_by_id("vayner_client_research.pdf_report") yield { chatbot_comp: gr.update(value=[]), run_button_comp: gr.update(value="▶️ Research Client", interactive=True), stop_button_comp: gr.update(interactive=False), browser_view_comp: gr.update(value="
Browser View
"), business_name_comp: gr.update(value=""), pdf_report_comp: gr.update(value=webui_manager.vayner_pdf_report, visible=True) } def create_vayner_client_research_tab(webui_manager: WebuiManager): """ Create the Vayner Client Research tab with specialized agent functionality. """ # Initialize manager for Vayner client research webui_manager.init_vayner_client_research() # Create UI layout with left panel for agent interaction and right panel for browser view with gr.Row(elem_id="vayner_client_research_container"): # Left Panel - Agent Interaction (30% width) with gr.Column(scale=3): gr.Markdown("### Vayner Client Research Agent") chatbot = gr.Chatbot( value=webui_manager.vayner_chat_history, label="Agent Interaction", height=700, show_copy_button=True, type="messages" ) with gr.Row(): business_name = gr.Textbox( label="Business Name", placeholder="Enter the business name to research", lines=1 ) with gr.Row(): run_button = gr.Button("▶️ Research Client", variant="primary", scale=3) stop_button = gr.Button("⏹️ Stop", interactive=False, variant="stop", scale=2) clear_button = gr.Button("🗑️ Clear", variant="secondary", scale=2) # Right Panel - Browser View (70% width) with gr.Column(scale=7): with gr.Tabs(): with gr.TabItem("Browser View"): browser_view = gr.HTML( value="

Browser view will appear here during research

", label="Browser Live View", ) with gr.TabItem("PDF Report"): pdf_report = gr.HTML( value="

PDF Report will appear here after task completion

", label="Research Report", visible=False ) # Store components in manager tab_components = { "chatbot": chatbot, "business_name": business_name, "run_button": run_button, "stop_button": stop_button, "clear_button": clear_button, "browser_view": browser_view, "pdf_report": pdf_report } webui_manager.add_components("vayner_client_research", tab_components) # Wrapper functions for button handlers async def submit_wrapper(business_name_value): async for update in handle_submit(webui_manager, business_name_value): yield update async def stop_wrapper(): async for update in handle_stop(webui_manager): yield update async def clear_wrapper(): async for update in handle_clear(webui_manager): yield update # Connect event handlers run_button.click( fn=submit_wrapper, inputs=[business_name], outputs=list(tab_components.values()) ) business_name.submit( fn=submit_wrapper, inputs=[business_name], outputs=list(tab_components.values()) ) stop_button.click( fn=stop_wrapper, inputs=None, outputs=list(tab_components.values()) ) clear_button.click( fn=clear_wrapper, inputs=None, outputs=list(tab_components.values()) )