#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import asyncio  # Manage asynchronous operations and concurrency
import aiohttp  # Asynchronous HTTP client used for reader API requests
import httpx  # Alternative asynchronous HTTP client used for search requests
from urllib.parse import quote_plus  # Percent-encode query strings so multi-word queries form valid URLs
from src.utils.ip_generator import generate_ip  # Generate random IP addresses for request headers
# Define the main SearchTools class that provides web searching and URL reading capabilities
class SearchTools:
""" | |
A robust asynchronous class that provides tools to perform web searches and read content from URLs. | |
This implementation uses both aiohttp and httpx libraries to demonstrate usage of multiple async HTTP clients. | |
It includes infinite retry mechanisms to ensure reliability when network errors or timeouts occur. | |
Attributes: | |
searxng_url (str): Base URL for the SearXNG search proxy service for Google and Bing searches. | |
baidu_url (str): Base URL for Baidu search engine for Chinese language queries. | |
timeout (int): Timeout duration in seconds for HTTP requests to prevent indefinite hanging. | |
reader_api (str): Base URL for the reader API service used to extract clean content from URLs. | |
Methods: | |
read_url(url): Asynchronously reads and returns textual content of the specified URL using the reader API. | |
search(query, engine): Asynchronously performs a web search with the given query on the specified search engine, | |
returning the raw HTML response text. | |
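
    Example (illustrative usage; both methods are coroutines and must be awaited
    inside an async function):
        tools = SearchTools()
        html = await tools.search("python asyncio", engine="google")
        text = await tools.read_url("https://example.com")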
""" | |
    def __init__(self):
        """
        Initialize the SearchTools instance with all necessary URLs and timeout settings.
        This sets up the base URLs for the search engines and reader API, along with a default timeout.
        """
        self.searxng_url = "https://paulgo.io/search"  # SearXNG proxy URL for Google/Bing searches
        self.baidu_url = "https://www.baidu.com/s"  # Baidu search engine base URL
        self.timeout = 30  # Timeout for HTTP requests in seconds
        self.reader_api = "https://r.jina.ai/"  # Reader API endpoint that extracts clean text from web pages
    async def _fetch_with_retry_aiohttp(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
        """
        Internal helper that performs an HTTP request using aiohttp, retrying indefinitely until it succeeds.

        Args:
            session (aiohttp.ClientSession): aiohttp session object for making requests.
            method (str): HTTP method to use ('get', 'post', etc.).
            url (str): The full URL to send the request to.
            **kwargs: Additional parameters passed to session.request (headers, data, etc.).

        Returns:
            str: The response body as text once a request succeeds.
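
        Example (illustrative):
            text = await self._fetch_with_retry_aiohttp(session, 'get', 'https://example.com')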
""" | |
while True: # Loop indefinitely until a successful response is received | |
try: | |
# Perform the HTTP request asynchronously using aiohttp session | |
async with session.request(method, url, **kwargs) as response: | |
# Raise exception if HTTP status indicates an error (4xx or 5xx) | |
response.raise_for_status() | |
# Return the response body as text | |
return await response.text() | |
except Exception as e: | |
# On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying | |
await asyncio.sleep(5) | |
    async def _fetch_with_retry_httpx(self, client: httpx.AsyncClient, method: str, url: str, **kwargs) -> str:
        """
        Internal helper that performs an HTTP request using httpx, retrying indefinitely until it succeeds.

        Args:
            client (httpx.AsyncClient): httpx asynchronous client instance.
            method (str): HTTP method to use ('get', 'post', etc.).
            url (str): The full URL to send the request to.
            **kwargs: Additional parameters passed to client.request (headers, data, etc.).

        Returns:
            str: The response body as text once a request succeeds.
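
        Example (illustrative):
            html = await self._fetch_with_retry_httpx(client, 'get', 'https://example.com')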
""" | |
while True: # Loop indefinitely until a successful response is received | |
try: | |
# Perform the HTTP request asynchronously using httpx client | |
response = await client.request(method, url, **kwargs) | |
# Raise exception if HTTP status indicates an error (4xx or 5xx) | |
response.raise_for_status() | |
# Return the response body as text | |
return response.text | |
except Exception as e: | |
# On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying | |
await asyncio.sleep(5) | |
    async def read_url(self, url: str) -> str:
        """
        Asynchronously read and extract the textual content of the given URL via the reader API.
        This method uses the aiohttp client with infinite retry to ensure reliable content retrieval.

        Args:
            url (str): The full URL of the webpage to extract content from.

        Returns:
            str: The clean textual content extracted by the reader API.
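
        Example (illustrative):
            text = await SearchTools().read_url("https://example.com")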
""" | |
# Prepare POST data payload containing the target URL for the reader API | |
data = {"url": url} | |
# Create aiohttp client session with a total timeout configured | |
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session: | |
# Use the internal retry helper to POST the URL to the reader API and return the extracted content | |
return await self._fetch_with_retry_aiohttp(session, 'post', self.reader_api, data=data) | |
    async def search(self, query: str, engine: str = "google") -> str:
        """
        Asynchronously perform a web search for the given query using the specified search engine.
        This method uses the httpx client with infinite retry to reliably fetch search results.

        Args:
            query (str): The search query string.
            engine (str, optional): The search engine to use ("google", "bing", or "baidu").
                Any value other than "google" or "baidu" falls through to Bing. Defaults to "google".

        Returns:
            str: The raw HTML content of the search results page.
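
        Example (illustrative):
            html = await SearchTools().search("python asyncio", engine="bing")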
""" | |
        if engine == "baidu":
            # Construct the full URL for a Baidu search, percent-encoding the query for the 'wd' parameter
            url = f"{self.reader_api}{self.baidu_url}?wd={quote_plus(query)}"
            # Set HTTP headers to target Baidu's main content container and spoof the client IP address
            headers = {
                "X-Target-Selector": "#content_left",  # CSS selector for Baidu's search results container
                "X-Forwarded-For": generate_ip()  # Random IP address to simulate different client origins
            }
        else:
            # Determine the SearXNG bang prefix for the engine: '!go' for Google, '!bi' for Bing
            prefix = "!go" if engine == "google" else "!bi"
            # Construct the full URL for the SearXNG proxy, percent-encoding the bang prefix and query
            url = f"{self.reader_api}{self.searxng_url}?q={quote_plus(f'{prefix} {query}')}"
            # Set HTTP headers to target SearXNG's results container and spoof the client IP address
            headers = {
                "X-Target-Selector": "#urls",  # CSS selector for SearXNG's search results container
                "X-Forwarded-For": generate_ip()  # Random IP address to simulate different client origins
            }
        # Create an httpx asynchronous client with the timeout configured
        async with httpx.AsyncClient(timeout=self.timeout) as client:
            # Use the internal retry helper to GET the search results and return the HTML content
            return await self._fetch_with_retry_httpx(client, 'get', url, headers=headers)
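
# Minimal usage sketch (an illustration, not part of the original module): run one search and
# read one page from a standalone script. Assumes src.utils.ip_generator is importable; note
# that the infinite-retry helpers will keep retrying if the remote services are unreachable.
if __name__ == "__main__":
    async def _demo():
        tools = SearchTools()
        # Fetch Google results for a sample query via the SearXNG proxy and reader API
        results = await tools.search("python asyncio tutorial", engine="google")
        print(results[:500])  # Show the first 500 characters of the results page
        # Extract clean text from a sample page via the reader API
        content = await tools.read_url("https://example.com")
        print(content[:500])  # Show the first 500 characters of the extracted content
    asyncio.run(_demo())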