#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#
import asyncio  # Manage asynchronous operations and concurrency
import aiohttp  # Perform asynchronous HTTP requests (primary client)
import httpx  # Perform asynchronous HTTP requests (alternative client)
from src.utils.ip_generator import generate_ip  # Generate random IP addresses for request headers


# Define the main SearchTools class that provides web searching and URL reading capabilities
class SearchTools:
    """
    A robust asynchronous class that provides tools to perform web searches and read content from URLs.

    This implementation uses both the aiohttp and httpx libraries to demonstrate usage of multiple
    async HTTP clients. It includes infinite retry mechanisms to ensure reliability when network
    errors or timeouts occur.

    Attributes:
        searxng_url (str): Base URL for the SearXNG search proxy service for Google and Bing searches.
        baidu_url (str): Base URL for the Baidu search engine for Chinese-language queries.
        timeout (int): Timeout duration in seconds for HTTP requests to prevent indefinite hanging.
        reader_api (str): Base URL for the reader API service used to extract clean content from URLs.

    Methods:
        read_url(url): Asynchronously reads and returns the textual content of the specified URL
            using the reader API.
        search(query, engine): Asynchronously performs a web search with the given query on the
            specified search engine, returning the raw HTML response text.
    """

    def __init__(self):
        """
        Initialize the SearchTools instance with all necessary URLs and timeout settings.

        This sets up the base URLs for the search engines and the reader API, along with a default timeout.
        """
        self.searxng_url = "https://paulgo.io/search"  # SearXNG proxy URL for Google/Bing searches
        self.baidu_url = "https://www.baidu.com/s"  # Baidu search engine base URL
        self.timeout = 30  # Timeout for HTTP requests in seconds
        self.reader_api = "https://r.jina.ai/"  # Reader API endpoint to extract clean text from web pages

    async def _fetch_with_retry_aiohttp(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
        """
        Internal helper method to perform HTTP requests using aiohttp, retrying indefinitely until success.

        Args:
            session (aiohttp.ClientSession): aiohttp session object for making requests.
            method (str): HTTP method to use ('get', 'post', etc.).
            url (str): The full URL to send the request to.
            **kwargs: Additional parameters passed to session.request (headers, data, etc.).

        Returns:
            str: The response text content upon successful request.
        """
        while True:  # Loop indefinitely until a successful response is received
            try:
                # Perform the HTTP request asynchronously using the aiohttp session
                async with session.request(method, url, **kwargs) as response:
                    # Raise an exception if the HTTP status indicates an error (4xx or 5xx)
                    response.raise_for_status()
                    # Return the response body as text
                    return await response.text()
            except Exception:
                # On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying
                await asyncio.sleep(5)

    async def _fetch_with_retry_httpx(self, client: httpx.AsyncClient, method: str, url: str, **kwargs) -> str:
        """
        Internal helper method to perform HTTP requests using httpx, retrying indefinitely until success.

        Args:
            client (httpx.AsyncClient): httpx asynchronous client instance.
            method (str): HTTP method to use ('get', 'post', etc.).
            url (str): The full URL to send the request to.
            **kwargs: Additional parameters passed to client.request (headers, data, etc.).

        Returns:
            str: The response text content upon successful request.
        """
        while True:  # Loop indefinitely until a successful response is received
            try:
                # Perform the HTTP request asynchronously using the httpx client
                response = await client.request(method, url, **kwargs)
                # Raise an exception if the HTTP status indicates an error (4xx or 5xx)
                response.raise_for_status()
                # Return the response body as text
                return response.text
            except Exception:
                # On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying
                await asyncio.sleep(5)

    async def read_url(self, url: str) -> str:
        """
        Asynchronously read and extract textual content from the specified URL using the reader API.

        This method uses the aiohttp client with infinite retry to ensure reliable content retrieval.

        Args:
            url (str): The full URL of the webpage to extract content from.

        Returns:
            str: The clean textual content extracted by the reader API.
        """
        # Prepare the POST data payload containing the target URL for the reader API
        data = {"url": url}
        # Create an aiohttp client session with a total timeout configured
        async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
            # POST the URL to the reader API via the retry helper and return the extracted content
            return await self._fetch_with_retry_aiohttp(session, 'post', self.reader_api, data=data)

    async def search(self, query: str, engine: str = "google") -> str:
        """
        Asynchronously perform a web search for the given query using the specified search engine.

        This method uses the httpx client with infinite retry to reliably fetch search results.

        Args:
            query (str): The search query string.
            engine (str, optional): The search engine to use ("google", "bing", or "baidu").
                Defaults to "google".

        Returns:
            str: The raw HTML content of the search results page.
        """
        if engine == "baidu":
            # Construct the full URL for a Baidu search, passing the query via the 'wd' parameter
            # and routing the request through the reader API
            url = f"{self.reader_api}{self.baidu_url}?wd={query}"
            # Set HTTP headers to target Baidu's main results container and spoof the client IP address
            headers = {
                "X-Target-Selector": "#content_left",  # CSS selector for Baidu search results container
                "X-Forwarded-For": generate_ip()  # Random IP address to simulate different client origins
            }
        else:
            # Choose the SearXNG engine prefix: '!go' for Google, '!bi' for Bing
            prefix = "!go" if engine == "google" else "!bi"
            # Construct the full URL for the SearXNG proxy search, routed through the reader API
            url = f"{self.reader_api}{self.searxng_url}?q={prefix} {query}"
            # Set HTTP headers to target the SearXNG results container and spoof the client IP address
            headers = {
                "X-Target-Selector": "#urls",  # CSS selector for SearXNG search results container
                "X-Forwarded-For": generate_ip()  # Random IP address to simulate different client origins
            }
        # Create an httpx asynchronous client with the timeout configured
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # GET the search results via the retry helper and return the HTML content
            return await self._fetch_with_retry_httpx(client, 'get', url, headers=headers)
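

# Example usage (illustrative sketch, not part of the original module): running this file
# directly performs one sample search and reads one page through the reader API. The query
# and URL below are placeholders chosen for demonstration; this assumes the module runs in
# an environment where src.utils.ip_generator is importable and outbound network access is
# available. Because the retry helpers loop forever, a failing network will block here.
async def _demo() -> None:
    tools = SearchTools()
    # Perform a Google search via the SearXNG proxy and print a short preview of the HTML
    results = await tools.search("open source search engines", engine="google")
    print(results[:500])
    # Extract clean text from a single page via the reader API and print a short preview
    content = await tools.read_url("https://example.com")
    print(content[:500])


if __name__ == "__main__":
    # Entry-point guard so importing SearchTools elsewhere does not trigger the demo
    asyncio.run(_demo())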