#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio # Import asyncio for managing asynchronous operations and concurrency
import aiohttp # Import aiohttp to perform asynchronous HTTP requests efficiently
import httpx # Import httpx library for asynchronous HTTP requests as an alternative client
from urllib.parse import quote_plus # Import quote_plus to safely URL-encode search query strings
from src.utils.ip_generator import generate_ip # Import function to generate random IP addresses for request headers
# Define the main SearchTools class that provides web searching and URL reading capabilities
class SearchTools:
"""
    A robust asynchronous class that provides tools to perform web searches and read content from URLs.
    This implementation uses both the aiohttp and httpx libraries to demonstrate two async HTTP clients side by side.
    Failed requests are retried indefinitely, so transient network errors or timeouts never propagate to the caller.
Attributes:
searxng_url (str): Base URL for the SearXNG search proxy service for Google and Bing searches.
baidu_url (str): Base URL for Baidu search engine for Chinese language queries.
timeout (int): Timeout duration in seconds for HTTP requests to prevent indefinite hanging.
reader_api (str): Base URL for the reader API service used to extract clean content from URLs.
Methods:
read_url(url): Asynchronously reads and returns textual content of the specified URL using the reader API.
search(query, engine): Asynchronously performs a web search with the given query on the specified search engine,
returning the raw HTML response text.
"""
def __init__(self):
"""
Initialize the SearchTools instance with all necessary URLs and timeout settings.
This sets up the base URLs for search engines and reader API, along with a default timeout.
"""
self.searxng_url = "https://paulgo.io/search" # SearXNG proxy URL for Google/Bing searches
self.baidu_url = "https://www.baidu.com/s" # Baidu search engine base URL
self.timeout = 30 # Timeout for HTTP requests in seconds
self.reader_api = "https://r.jina.ai/" # Reader API endpoint to extract clean text from web pages
async def _fetch_with_retry_aiohttp(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
"""
Internal helper method to perform HTTP requests using aiohttp with infinite retry until success.
Args:
session (aiohttp.ClientSession): aiohttp session object for making requests.
method (str): HTTP method to use ('get', 'post', etc.).
url (str): The full URL to send the request to.
**kwargs: Additional parameters passed to session.request (headers, data, etc.).
Returns:
str: The response text content upon successful request.
"""
while True: # Loop indefinitely until a successful response is received
try:
# Perform the HTTP request asynchronously using aiohttp session
async with session.request(method, url, **kwargs) as response:
# Raise exception if HTTP status indicates an error (4xx or 5xx)
response.raise_for_status()
# Return the response body as text
return await response.text()
            except Exception:
                # On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying
                await asyncio.sleep(5)
async def _fetch_with_retry_httpx(self, client: httpx.AsyncClient, method: str, url: str, **kwargs) -> str:
"""
Internal helper method to perform HTTP requests using httpx with infinite retry until success.
Args:
client (httpx.AsyncClient): httpx asynchronous client instance.
method (str): HTTP method to use ('get', 'post', etc.).
url (str): The full URL to send the request to.
**kwargs: Additional parameters passed to client.request (headers, data, etc.).
Returns:
str: The response text content upon successful request.
"""
while True: # Loop indefinitely until a successful response is received
try:
# Perform the HTTP request asynchronously using httpx client
response = await client.request(method, url, **kwargs)
# Raise exception if HTTP status indicates an error (4xx or 5xx)
response.raise_for_status()
# Return the response body as text
return response.text
            except Exception:
                # On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying
                await asyncio.sleep(5)
async def read_url(self, url: str) -> str:
"""
Asynchronously read and extract textual content from the specified URL using the reader API.
This method uses aiohttp client with infinite retry to ensure reliable content retrieval.
Args:
url (str): The full URL of the webpage to extract content from.
Returns:
str: The clean textual content extracted by the reader API.
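        Example:
            # Hypothetical usage sketch; assumes a running event loop and network access.
            tools = SearchTools()
            content = await tools.read_url("https://example.com/article")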
"""
# Prepare POST data payload containing the target URL for the reader API
data = {"url": url}
# Create aiohttp client session with a total timeout configured
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
# Use the internal retry helper to POST the URL to the reader API and return the extracted content
return await self._fetch_with_retry_aiohttp(session, 'post', self.reader_api, data=data)
async def search(self, query: str, engine: str = "google") -> str:
"""
Asynchronously perform a web search for the given query using the specified search engine.
This method uses httpx client with infinite retry to reliably fetch search results.
Args:
query (str): The search query string.
            engine (str, optional): The search engine to use ("google", "bing", or "baidu"). Defaults to "google".
Returns:
str: The raw HTML content of the search results page.
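        Example:
            # Hypothetical usage sketch; assumes a running event loop and network access.
            tools = SearchTools()
            html = await tools.search("python asyncio", engine="bing")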
"""
        if engine == "baidu":
            # Construct the full URL for Baidu search, URL-encoding the query passed via the 'wd' parameter
            url = f"{self.reader_api}{self.baidu_url}?wd={quote_plus(query)}"
# Set HTTP headers to target Baidu's main content container and spoof IP address
headers = {
"X-Target-Selector": "#content_left", # CSS selector for Baidu search results container
"X-Forwarded-For": generate_ip() # Random IP address to simulate different client origins
}
else:
            # Determine the SearXNG bang prefix for the engine: '!go' for Google, '!bi' for Bing
            prefix = "!go" if engine == "google" else "!bi"
            # Construct the full URL for the SearXNG proxy, URL-encoding the bang prefix together with the query
            url = f"{self.reader_api}{self.searxng_url}?q={quote_plus(f'{prefix} {query}')}"
# Set HTTP headers to target SearXNG search results container and spoof IP address
headers = {
"X-Target-Selector": "#urls", # CSS selector for SearXNG search results container
"X-Forwarded-For": generate_ip() # Random IP address to simulate different client origins
}
# Create httpx asynchronous client with timeout configured
async with httpx.AsyncClient(timeout=self.timeout) as client:
# Use the internal retry helper to GET the search results and return the HTML content
return await self._fetch_with_retry_httpx(client, 'get', url, headers=headers)
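# ------------------------------------------------------------------
# Minimal usage sketch, not part of the original class: it shows how
# the tools above can be driven from an asyncio entry point. The query
# and URL below are illustrative placeholders, and the sketch assumes
# the third-party endpoints configured in __init__ are reachable.
# ------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo() -> None:
        tools = SearchTools() # Instantiate the search tools with default endpoints
        # Fetch the raw HTML of Google results for a sample query via the SearXNG proxy
        results = await tools.search("python asyncio tutorial", engine="google")
        print(results[:500]) # Preview the first 500 characters of the results page
        # Extract clean text from an example page through the reader API
        content = await tools.read_url("https://example.com")
        print(content[:500]) # Preview the first 500 characters of the extracted content
    asyncio.run(_demo())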