hadadrjt commited on
Commit
e96f98e
·
1 Parent(s): 5952826

ai: Use asyncio with httpx and aiohttp for all requests.

Browse files

* The private server support HTTP/2.

* Tools operate over HTTP/1.1 to ensure compatibility.

Super fast connection means happy users.

Combining these protocols with asyncio, httpx, and aiohttp creates a highly stable and performant asynchronous system.

Files changed (4) hide show
  1. src/core/server.py +130 -48
  2. src/tools/audio.py +92 -32
  3. src/tools/deep_search.py +79 -70
  4. src/tools/image.py +77 -64
src/core/server.py CHANGED
@@ -3,20 +3,22 @@
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
- import json # Import JSON module to parse and handle JSON data
7
- import uuid # Import UUID module to generate unique identifiers for sessions
8
- from typing import List, Dict, Any # Import type hints for lists, dictionaries, and generic types
9
- from datetime import datetime # Import datetime to get and format current date/time
10
- from config import * # Import all configuration variables including 'auth' and 'restrictions'
11
- from src.utils.session_mapping import get_host # Import function to get server info by session ID
12
- from src.utils.ip_generator import generate_ip # Import function to generate random IP for headers
13
- from src.utils.helper import mark # Import function to mark a server as busy/unavailable
14
- from src.ui.reasoning import styles # Import function to apply CSS styling to reasoning output
15
- import httpx # Import httpx for async HTTP requests with streaming support
 
 
16
 
17
  async def jarvis(
18
  session_id: str, # Unique session identifier to maintain consistent server assignment
19
- model: str, # AI model name to specify which model to use
20
  history: List[Dict[str, str]], # List of previous conversation messages with roles and content
21
  user_message: str, # Latest user input message to send to the AI model
22
  mode: str, # Mode string to guide AI behavior, e.g., '/think' or '/no_think'
@@ -35,6 +37,9 @@ async def jarvis(
35
  the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed
36
  first inside a styled HTML block, followed by the main content streamed normally.
37
 
 
 
 
38
  Args:
39
  session_id (str): Identifier for the user session to maintain consistent server assignment.
40
  model (str): Name of the AI model to use for generating the response.
@@ -58,47 +63,48 @@ async def jarvis(
58
  If the server returns a specific error code indicating it is busy, it retries with another server.
59
  If all servers are busy or fail, it yields a message indicating the server is busy.
60
  """
61
- tried = set() # Track servers already tried to avoid repeated retries
62
 
63
  # Loop until all available servers have been tried without success
64
  while len(tried) < len(auth):
65
  # Get server setup info assigned for this session, including endpoint, token, and error code
66
  setup = get_host(session_id)
67
- server = setup["jarvis"] # Server identifier
68
- host = setup["endpoint"] # API endpoint URL
69
- token = setup["token"] # Authorization token
70
- error = setup["error"] # HTTP error code triggering retry
71
- tried.add(server) # Mark this server as tried
72
 
73
- # Format current date/time string for system instructions
74
  date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z")
75
 
76
- # Combine mode instructions, usage restrictions, and date into system instructions string
77
  instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"
78
 
79
- # Copy conversation history to avoid mutating original
80
  messages = history.copy()
81
- # Insert system instructions as first message
 
82
  messages.insert(0, {"role": "system", "content": instructions})
83
 
84
- # Prepare user message dict, include files if provided
85
  msg = {"role": "user", "content": user_message}
86
  if files:
87
  msg["files"] = files
88
- messages.append(msg) # Append user message to conversation
89
 
90
- # Prepare HTTP headers with authorization and randomized client IP
91
  headers = {
92
- "Authorization": f"Bearer {token}", # Bearer token for API access
93
- "Content-Type": "application/json", # JSON content type
94
- "X-Forwarded-For": generate_ip() # Random IP to simulate different client origins
95
  }
96
 
97
- # Prepare JSON payload with model parameters and conversation messages
98
  payload = {
99
  "model": model,
100
  "messages": messages,
101
- "stream": True,
102
  "temperature": temperature,
103
  "top_k": top_k,
104
  "min_p": min_p,
@@ -107,25 +113,25 @@ async def jarvis(
107
  }
108
 
109
  # Initialize accumulators and flags for streamed response parts
110
- reasoning = "" # Accumulate reasoning text
111
- reasoning_check = None # Flag to detect presence of reasoning in response
112
  reasoning_done = False # Flag marking reasoning completion
113
- content = "" # Accumulate main content text
114
 
115
  try:
116
- # Create async HTTP client with no timeout for long streaming
117
- async with httpx.AsyncClient(timeout=None) as client:
118
- # Open async streaming POST request to Jarvis server
119
  async with client.stream("POST", host, headers=headers, json=payload) as response:
120
- # Iterate asynchronously over each line of streaming response
121
  async for chunk in response.aiter_lines():
122
- # Skip lines not starting with "data:"
123
  if not chunk.strip().startswith("data:"):
124
  continue
125
  try:
126
- # Parse JSON data after "data:" prefix
127
  data = json.loads(chunk[5:])
128
- # Extract incremental delta message from first choice
129
  choice = data["choices"][0]["delta"]
130
 
131
  # On first delta received, detect if 'reasoning' field is present and non-empty
@@ -133,7 +139,7 @@ async def jarvis(
133
  # Initialize reasoning_check to empty string if reasoning exists and is non-empty, else None
134
  reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
135
 
136
- # If reasoning is present and mode is not '/no_think' and reasoning not done
137
  if (
138
  reasoning_check == "" # Reasoning detected in response
139
  and mode != "/no_think" # Mode allows reasoning output
@@ -144,7 +150,7 @@ async def jarvis(
144
  reasoning += choice["reasoning"] # Append incremental reasoning text
145
  # Yield reasoning wrapped in styled HTML block with details expanded
146
  yield styles(reasoning=reasoning, content="", expanded=True)
147
- continue # Continue streaming reasoning increments
148
 
149
  # When reasoning ends and content starts, mark reasoning done, yield empty string, then content
150
  if (
@@ -155,9 +161,9 @@ async def jarvis(
155
  and choice["content"] # Content is not empty
156
  ):
157
  reasoning_done = True # Mark reasoning phase complete
158
- yield "" # Yield empty string to signal end of reasoning block
159
  content += choice["content"] # Start accumulating content text
160
- yield content # Yield first part of content
161
  continue # Continue streaming content increments
162
 
163
  # If no reasoning present or reasoning done, accumulate content and yield incrementally
@@ -167,22 +173,98 @@ async def jarvis(
167
  and choice["content"] # Content is not empty
168
  ):
169
  content += choice["content"] # Append incremental content text
170
- yield content # Yield updated content string
171
  except Exception:
172
  # Ignore exceptions during JSON parsing or key access and continue streaming
173
  continue
174
  return # Exit function after successful streaming completion
 
175
  except httpx.HTTPStatusError as e:
176
  # If server returns specific error code indicating busy, retry with another server
177
  if e.response.status_code == error:
178
- continue # Try next available server
 
179
  else:
180
- # For other HTTP errors, mark this server as busy
181
  mark(server)
182
  except Exception:
183
- # For other exceptions (network errors, timeouts), mark server as busy
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
184
  mark(server)
185
 
186
- # If all servers tried and none succeeded, yield busy message
187
  yield "The server is currently busy. Please wait a moment or try again later."
188
  return # End of function
 
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
+ import json # Module to parse and handle JSON data
7
+ import uuid # Module to generate unique identifiers (UUIDs)
8
+ from typing import List, Dict, Any # Type hinting for list, dict, and generic types
9
+ from datetime import datetime # To get and format current date and time
10
+ from config import * # Import all configuration variables, including 'auth' and 'restrictions'
11
+ from src.utils.session_mapping import get_host # Function to get server info by session ID
12
+ from src.utils.ip_generator import generate_ip # Function to generate random IP addresses for headers
13
+ from src.utils.helper import mark # Function to mark a server as busy/unavailable
14
+ from src.ui.reasoning import styles # Function to apply CSS styling to reasoning output
15
+ import asyncio # Asyncio for asynchronous programming
16
+ import httpx # Async HTTP client supporting HTTP/2 and streaming
17
+ import aiohttp # Async HTTP client for alternative HTTP requests and streaming
18
 
19
  async def jarvis(
20
  session_id: str, # Unique session identifier to maintain consistent server assignment
21
+ model: str, # AI model name specifying which model to use
22
  history: List[Dict[str, str]], # List of previous conversation messages with roles and content
23
  user_message: str, # Latest user input message to send to the AI model
24
  mode: str, # Mode string to guide AI behavior, e.g., '/think' or '/no_think'
 
37
  the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed
38
  first inside a styled HTML block, followed by the main content streamed normally.
39
 
40
+ The implementation uses both httpx (with HTTP/2 support) and aiohttp to ensure compatibility and robustness
41
+ in streaming responses.
42
+
43
  Args:
44
  session_id (str): Identifier for the user session to maintain consistent server assignment.
45
  model (str): Name of the AI model to use for generating the response.
 
63
  If the server returns a specific error code indicating it is busy, it retries with another server.
64
  If all servers are busy or fail, it yields a message indicating the server is busy.
65
  """
66
+ tried = set() # Set to track servers already tried to avoid repeated retries
67
 
68
  # Loop until all available servers have been tried without success
69
  while len(tried) < len(auth):
70
  # Get server setup info assigned for this session, including endpoint, token, and error code
71
  setup = get_host(session_id)
72
+ server = setup["jarvis"] # Server identifier string
73
+ host = setup["endpoint"] # API endpoint URL string
74
+ token = setup["token"] # Authorization token string
75
+ error = setup["error"] # HTTP error code integer which triggers retry
76
+ tried.add(server) # Mark this server as tried to prevent retrying immediately
77
 
78
+ # Format current date/time string for system instructions, e.g., "Tuesday, June 24, 2025, 08:13 PM "
79
  date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z")
80
 
81
+ # Combine mode instructions, usage restrictions, and date into a single system instructions string
82
  instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"
83
 
84
+ # Copy conversation history to avoid mutating the original list outside this function
85
  messages = history.copy()
86
+
87
+ # Insert system instructions as the first message in the conversation history
88
  messages.insert(0, {"role": "system", "content": instructions})
89
 
90
+ # Prepare user message dictionary, include files if provided
91
  msg = {"role": "user", "content": user_message}
92
  if files:
93
  msg["files"] = files
94
+ messages.append(msg) # Append user message to the conversation messages list
95
 
96
+ # Prepare HTTP headers with authorization and randomized client IP for X-Forwarded-For
97
  headers = {
98
+ "Authorization": f"Bearer {token}", # Bearer token for API access authentication
99
+ "Content-Type": "application/json", # Content type set to JSON for request body
100
+ "X-Forwarded-For": generate_ip(), # Random IP to simulate different client origins for load balancing or logging
101
  }
102
 
103
+ # Prepare JSON payload with model parameters and conversation messages to send in POST request
104
  payload = {
105
  "model": model,
106
  "messages": messages,
107
+ "stream": True, # Enable streaming response
108
  "temperature": temperature,
109
  "top_k": top_k,
110
  "min_p": min_p,
 
113
  }
114
 
115
  # Initialize accumulators and flags for streamed response parts
116
+ reasoning = "" # String accumulator for reasoning text from the AI
117
+ reasoning_check = None # Flag to detect presence of reasoning in response; None means not checked yet
118
  reasoning_done = False # Flag marking reasoning completion
119
+ content = "" # String accumulator for main content text from the AI
120
 
121
  try:
122
+ # Use httpx AsyncClient with no timeout to allow long streaming connections
123
+ async with httpx.AsyncClient(timeout=None, http2=True) as client:
124
+ # Open async streaming POST request to Jarvis server endpoint with headers and JSON payload
125
  async with client.stream("POST", host, headers=headers, json=payload) as response:
126
+ # Iterate asynchronously over each line of streaming response as it arrives
127
  async for chunk in response.aiter_lines():
128
+ # Skip lines that do not start with "data:" prefix as per server-sent events (SSE) format
129
  if not chunk.strip().startswith("data:"):
130
  continue
131
  try:
132
+ # Parse JSON data after "data:" prefix which contains incremental response delta
133
  data = json.loads(chunk[5:])
134
+ # Extract incremental delta message from first choice in response
135
  choice = data["choices"][0]["delta"]
136
 
137
  # On first delta received, detect if 'reasoning' field is present and non-empty
 
139
  # Initialize reasoning_check to empty string if reasoning exists and is non-empty, else None
140
  reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
141
 
142
+ # If reasoning is present and mode is not '/no_think' and reasoning not done yet
143
  if (
144
  reasoning_check == "" # Reasoning detected in response
145
  and mode != "/no_think" # Mode allows reasoning output
 
150
  reasoning += choice["reasoning"] # Append incremental reasoning text
151
  # Yield reasoning wrapped in styled HTML block with details expanded
152
  yield styles(reasoning=reasoning, content="", expanded=True)
153
+ continue # Continue streaming reasoning increments without processing content yet
154
 
155
  # When reasoning ends and content starts, mark reasoning done, yield empty string, then content
156
  if (
 
161
  and choice["content"] # Content is not empty
162
  ):
163
  reasoning_done = True # Mark reasoning phase complete
164
+ yield "" # Yield empty string to signal end of reasoning block to the consumer
165
  content += choice["content"] # Start accumulating content text
166
+ yield content # Yield first part of content to the consumer
167
  continue # Continue streaming content increments
168
 
169
  # If no reasoning present or reasoning done, accumulate content and yield incrementally
 
173
  and choice["content"] # Content is not empty
174
  ):
175
  content += choice["content"] # Append incremental content text
176
+ yield content # Yield updated content string to the consumer
177
  except Exception:
178
  # Ignore exceptions during JSON parsing or key access and continue streaming
179
  continue
180
  return # Exit function after successful streaming completion
181
+
182
  except httpx.HTTPStatusError as e:
183
  # If server returns specific error code indicating busy, retry with another server
184
  if e.response.status_code == error:
185
+ # Continue to next iteration to try a different server
186
+ continue
187
  else:
188
+ # For other HTTP errors, mark this server as busy/unavailable
189
  mark(server)
190
  except Exception:
191
+ # For other exceptions (network errors, timeouts), mark server as busy/unavailable
192
+ mark(server)
193
+
194
+ # If httpx fails or server is busy, fallback to aiohttp for robustness and compatibility
195
+ try:
196
+ # Create aiohttp client session with no timeout for streaming
197
+ async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
198
+ # Open async streaming POST request to Jarvis server endpoint with headers and JSON payload
199
+ async with session.post(host, headers=headers, json=payload) as resp:
200
+ # Raise for status to catch HTTP errors
201
+ resp.raise_for_status()
202
+ # Iterate asynchronously over each line of streaming response as it arrives
203
+ async for line_bytes in resp.content:
204
+ # Decode bytes to string and strip whitespace
205
+ line = line_bytes.decode("utf-8").strip()
206
+ # Skip lines that do not start with "data:" prefix as per SSE format
207
+ if not line.startswith("data:"):
208
+ continue
209
+ try:
210
+ # Parse JSON data after "data:" prefix which contains incremental response delta
211
+ data = json.loads(line[5:])
212
+ # Extract incremental delta message from first choice in response
213
+ choice = data["choices"][0]["delta"]
214
+
215
+ # On first delta received, detect if 'reasoning' field is present and non-empty
216
+ if reasoning_check is None:
217
+ reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
218
+
219
+ # If reasoning is present and mode is not '/no_think' and reasoning not done yet
220
+ if (
221
+ reasoning_check == ""
222
+ and mode != "/no_think"
223
+ and not reasoning_done
224
+ and "reasoning" in choice
225
+ and choice["reasoning"]
226
+ ):
227
+ reasoning += choice["reasoning"]
228
+ yield styles(reasoning=reasoning, content="", expanded=True)
229
+ continue
230
+
231
+ # When reasoning ends and content starts, mark reasoning done, yield empty string, then content
232
+ if (
233
+ reasoning_check == ""
234
+ and mode != "/no_think"
235
+ and not reasoning_done
236
+ and "content" in choice
237
+ and choice["content"]
238
+ ):
239
+ reasoning_done = True
240
+ yield ""
241
+ content += choice["content"]
242
+ yield content
243
+ continue
244
+
245
+ # If no reasoning present or reasoning done, accumulate content and yield incrementally
246
+ if (
247
+ (reasoning_check is None or reasoning_done or mode == "/no_think")
248
+ and "content" in choice
249
+ and choice["content"]
250
+ ):
251
+ content += choice["content"]
252
+ yield content
253
+ except Exception:
254
+ # Ignore exceptions during JSON parsing or key access and continue streaming
255
+ continue
256
+ return # Exit function after successful streaming completion with aiohttp
257
+
258
+ except aiohttp.ClientResponseError as e:
259
+ # If server returns specific error code indicating busy, retry with another server
260
+ if e.status == error:
261
+ continue # Try next available server
262
+ else:
263
+ mark(server) # Mark server as busy/unavailable for other HTTP errors
264
+ except Exception:
265
+ # For other exceptions (network errors, timeouts), mark server as busy/unavailable
266
  mark(server)
267
 
268
+ # If all servers tried and none succeeded, yield a message indicating server busy status
269
  yield "The server is currently busy. Please wait a moment or try again later."
270
  return # End of function
src/tools/audio.py CHANGED
@@ -3,66 +3,126 @@
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
- import asyncio # Import asyncio to enable asynchronous waiting and retries
7
- import httpx # Import the httpx library to perform asynchronous HTTP requests efficiently
8
- from urllib.parse import quote # Import the quote function to safely encode strings for use in URLs
 
9
  from src.utils.ip_generator import generate_ip # Import a custom utility function to generate random IP addresses
10
- from config import auth # Import authentication configuration or credentials from the config module
11
- from src.utils.tools import initialize_tools # Import a utility function to initialize and retrieve tool endpoints or resources
12
 
13
  # Define a class named AudioGeneration to encapsulate functionalities related to generating audio content
14
  class AudioGeneration:
15
  # This class provides methods to create audio files based on text instructions and voice parameters
16
 
17
- @staticmethod # Decorator indicating that the following method does not depend on instance state and can be called on the class itself
18
- # Define an asynchronous method to create audio from a text instruction, optionally specifying a voice style
 
 
 
 
19
  async def create_audio(generate_audio_instruction: str, voice: str = "echo") -> str:
20
  """
21
- Generate an audio file URL by sending a request to an audio generation service.
22
- This method will keep retrying until a successful response with status code 200 and audio content is received.
23
 
24
  Args:
25
- generate_audio_instruction (str): The textual instruction or content to convert into audio.
26
  voice (str, optional): The voice style or effect to apply on the generated audio. Defaults to "echo".
27
 
28
  Returns:
29
- str: The URL to the generated audio file if successful.
30
 
31
  Raises:
32
- Exception: If the audio generation continuously fails after retries (optional, currently infinite retry).
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
33
  """
34
- # Encode the text instruction to make it safe for inclusion in a URL path segment
 
 
 
 
 
 
 
 
 
 
35
  generate_audio_instruct = quote(generate_audio_instruction)
36
 
37
- # Initialize tools and retrieve the audio generation service endpoint from the returned tuple
38
  _, _, audio_tool = initialize_tools()
39
 
40
- # Construct the full URL by appending the encoded instruction to the audio tool's base URL
41
  url = f"{audio_tool}/{generate_audio_instruct}"
42
 
43
- # Define query parameters for the HTTP request specifying the model and voice to use for audio generation
44
  params = {
45
- "model": "openai-audio", # Specify the audio generation model to be used by the service
46
- "voice": voice # Specify the desired voice style or effect
47
  }
48
 
49
- # Create an asynchronous HTTP client with no timeout limit to perform the request
50
  async with httpx.AsyncClient(timeout=None) as client:
51
- # Enter an infinite loop to keep retrying the request until success criteria are met
52
  while True:
53
- # Define HTTP headers for the request, including random IP address to simulate different client origins
54
  headers = {
55
  "X-Forwarded-For": generate_ip() # Generate and set a random IP address for the request header
56
  }
57
 
58
- # Send a GET request to the audio generation service with specified URL, parameters, and headers
59
- resp = await client.get(url, params=params, headers=headers)
60
-
61
- # Check if the response status code indicates success and the content type is an audio MPEG stream
62
- if resp.status_code == 200 and 'audio/mpeg' in resp.headers.get('Content-Type', ''):
63
- # Return the final URL of the generated audio resource as a string
64
- return str(resp.url)
65
- else:
66
- # If the response is not successful, wait for a short delay before retrying to avoid hammering the server
67
- await asyncio.sleep(15) # Pause for 15 second before retrying the request
68
- # The loop will continue and try again without closing the connection prematurely
 
 
 
 
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
+ import asyncio # Import asyncio to enable asynchronous programming, including async/await syntax and event loop management
7
+ import httpx # Import httpx library to perform asynchronous HTTP requests with advanced features like connection pooling and timeout control
8
+ import aiohttp # Import aiohttp library to provide an alternative asynchronous HTTP client for flexible request handling
9
+ from urllib.parse import quote # Import quote function to safely encode strings for URL usage, escaping special characters
10
  from src.utils.ip_generator import generate_ip # Import a custom utility function to generate random IP addresses
11
+ from config import auth # Import authentication credentials or configuration from the config module (not used directly here but imported for completeness)
12
+ from src.utils.tools import initialize_tools # Import utility function to initialize and retrieve service endpoints or tool URLs
13
 
14
  # Define a class named AudioGeneration to encapsulate functionalities related to generating audio content
15
  class AudioGeneration:
16
  # This class provides methods to create audio files based on text instructions and voice parameters
17
 
18
+ """
19
+ This class provides methods to generate audio files from text instructions asynchronously.
20
+ It supports retrying requests until successful audio generation is confirmed.
21
+ """
22
+
23
+ @staticmethod # This method does not depend on class instance state and can be called directly on the class
24
  async def create_audio(generate_audio_instruction: str, voice: str = "echo") -> str:
25
  """
26
+ Asynchronously generate an audio file URL by sending a request to an audio generation service.
27
+ The method continuously retries until it receives a successful HTTP 200 response with audio content.
28
 
29
  Args:
30
+ generate_audio_instruction (str): The text instruction or content to convert into audio.
31
  voice (str, optional): The voice style or effect to apply on the generated audio. Defaults to "echo".
32
 
33
  Returns:
34
+ str: The URL of the generated audio file upon successful retrieval.
35
 
36
  Raises:
37
+ Exception: Currently, the method retries indefinitely and does not raise exceptions on failure.
38
+ """
39
+ # Encode the text instruction to make it safe for URL path inclusion by escaping special characters
40
+ generate_audio_instruct = quote(generate_audio_instruction)
41
+
42
+ # Initialize tools and extract the audio generation service endpoint (third element in the returned tuple)
43
+ _, _, audio_tool = initialize_tools()
44
+
45
+ # Construct the full URL by appending the encoded instruction to the base audio tool URL
46
+ url = f"{audio_tool}/{generate_audio_instruct}"
47
+
48
+ # Define query parameters specifying the audio generation model and voice effect
49
+ params = {
50
+ "model": "openai-audio", # Specify the model used by the audio generation service
51
+ "voice": voice # Specify the voice style or effect for the generated audio
52
+ }
53
+
54
+ # Create an aiohttp asynchronous HTTP client session with no timeout to allow long-running requests
55
+ async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
56
+ # Enter an infinite loop to retry the request until success criteria are met
57
+ while True:
58
+ # Generate a random IP address to spoof the client's origin in the request headers
59
+ headers = {
60
+ "X-Forwarded-For": generate_ip() # Set the X-Forwarded-For header to a random IP address
61
+ }
62
+
63
+ try:
64
+ # Perform an asynchronous GET request to the audio generation service with URL, parameters, and headers
65
+ async with session.get(url, params=params, headers=headers) as resp:
66
+ # Check if the response status code is 200 (OK) and content type indicates MPEG audio stream
67
+ content_type = resp.headers.get('Content-Type', '')
68
+ if resp.status == 200 and 'audio/mpeg' in content_type:
69
+ # Return the final URL of the generated audio resource as a string
70
+ return str(resp.url)
71
+ else:
72
+ # If the response is not successful or content type is unexpected, wait before retrying
73
+ await asyncio.sleep(15) # Pause for 15 seconds to avoid overwhelming the server
74
+ except aiohttp.ClientError:
75
+ # Catch network-related errors such as connection issues and wait before retrying
76
+ await asyncio.sleep(15) # Pause for 15 seconds before retrying after an exception
77
+
78
+ @staticmethod # Provide an alternative implementation using httpx for flexibility or fallback
79
+ async def create_audio_httpx(generate_audio_instruction: str, voice: str = "echo") -> str:
80
  """
81
+ Alternative asynchronous method to generate audio using httpx client.
82
+ This method also retries indefinitely until a successful response with audio content is received.
83
+
84
+ Args:
85
+ generate_audio_instruction (str): The text instruction to convert into audio.
86
+ voice (str, optional): Voice style or effect. Defaults to "echo".
87
+
88
+ Returns:
89
+ str: URL of the generated audio file.
90
+ """
91
+ # Encode instruction for safe URL usage
92
  generate_audio_instruct = quote(generate_audio_instruction)
93
 
94
+ # Initialize tools and get audio generation endpoint
95
  _, _, audio_tool = initialize_tools()
96
 
97
+ # Construct request URL
98
  url = f"{audio_tool}/{generate_audio_instruct}"
99
 
100
+ # Define query parameters for the request
101
  params = {
102
+ "model": "openai-audio",
103
+ "voice": voice
104
  }
105
 
106
+ # Create an asynchronous HTTP client with no timeout limit
107
  async with httpx.AsyncClient(timeout=None) as client:
108
+ # Retry loop until success
109
  while True:
110
+ # Define HTTP headers for the request, including random IP address to simulate different client origins
111
  headers = {
112
  "X-Forwarded-For": generate_ip() # Generate and set a random IP address for the request header
113
  }
114
 
115
+ try:
116
+ # Send GET request asynchronously
117
+ resp = await client.get(url, params=params, headers=headers)
118
+
119
+ # Check for successful response with audio content type
120
+ if resp.status_code == 200 and 'audio/mpeg' in resp.headers.get('Content-Type', ''):
121
+ # Return the URL of generated audio
122
+ return str(resp.url)
123
+ else:
124
+ # Wait before retrying on failure
125
+ await asyncio.sleep(15)
126
+ except httpx.RequestError:
127
+ # Handle network errors and wait before retrying
128
+ await asyncio.sleep(15)
src/tools/deep_search.py CHANGED
@@ -3,8 +3,9 @@
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
- import aiohttp # Import the aiohttp library to perform asynchronous HTTP requests
7
- import asyncio # Import asyncio library to handle asynchronous operations and implement delay mechanisms
 
8
  from src.utils.ip_generator import generate_ip # Import function to generate random IP addresses for request headers
9
 
10
  # Define the main SearchTools class that provides web searching and URL reading capabilities
@@ -12,124 +13,132 @@ class SearchTools:
12
  # This class provides methods to connect to the web
13
 
14
  """
15
- A comprehensive class providing tools to perform web searches and read content from URLs using various search engines
16
- and a reader API service. This class implements full asynchronous operations with robust retry mechanisms to ensure
17
- connections remain active even when encountering errors.
18
 
19
  Attributes:
20
- searxng_url (str): Base URL for the SearXNG search proxy service that handles Google and other search engines.
21
- baidu_url (str): Base URL for Baidu search engine for Chinese language searches.
22
  timeout (int): Timeout duration in seconds for HTTP requests to prevent indefinite hanging.
23
  reader_api (str): Base URL for the reader API service used to extract clean content from URLs.
24
 
25
  Methods:
26
- read_url(url): Asynchronously reads and returns the textual content of the specified URL using the reader API.
27
  search(query, engine): Asynchronously performs a web search with the given query on the specified search engine,
28
  returning the raw HTML response text.
29
  """
30
 
31
- # Constructor method to initialize the SearchTools instance with all necessary configuration values
32
  def __init__(self):
33
  """
34
- Initialize the SearchTools instance with predefined URLs and timeout settings.
35
- This method sets up all the base URLs and configuration parameters needed for web searching and content reading.
36
  """
37
- # Set the base URL for SearXNG search proxy service which provides access to multiple search engines
38
- self.searxng_url = "https://paulgo.io/search"
39
- # Set the base URL for Baidu search engine for handling Chinese language queries
40
- self.baidu_url = "https://www.baidu.com/s"
41
- # Set timeout duration to 30 seconds to balance between allowing slow responses and preventing infinite waits
42
- self.timeout = 30
43
- # Set the reader API endpoint that converts web pages into clean, readable text format
44
- self.reader_api = "https://r.jina.ai/"
45
-
46
- # Private helper method that implements the core retry logic for all HTTP requests
47
- async def _fetch_with_retry(self, session, method, url, **kwargs):
48
  """
49
- Helper method to perform HTTP requests with infinite retry until a valid response is obtained.
50
- This method ensures that connections never fail permanently and will keep trying until success.
51
 
52
  Args:
53
- session (aiohttp.ClientSession): The aiohttp session object to use for making HTTP requests.
54
- method (str): HTTP method to use for the request (e.g., 'get', 'post', 'put', 'delete').
55
- url (str): The complete URL to send the request to.
56
- **kwargs: Additional keyword arguments to pass to the aiohttp request method (headers, data, etc.).
57
 
58
  Returns:
59
- str: The response text content when a successful request is finally achieved.
60
  """
61
- # Create an infinite loop that will only break when a successful response is received
62
- while True:
63
- # Use a try-except block to catch any type of exception that might occur during the request
64
  try:
65
- # Make the actual HTTP request using the provided session, method, URL and additional arguments
66
  async with session.request(method, url, **kwargs) as response:
67
- # Check if the response status indicates success, raise exception if it's an error status
68
  response.raise_for_status()
69
- # Return the text content of the successful response
70
  return await response.text()
71
- # Catch any exception that occurs during the request process
72
- except Exception:
73
- # Retry on any exception without stopping the loop or raising the error
74
- # Wait for 5 second before attempting the next retry to avoid overwhelming the server
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
75
  await asyncio.sleep(5)
76
 
77
- # Public method to read and extract content from any given URL
78
  async def read_url(self, url: str) -> str:
79
  """
80
- Asynchronously read and retrieve the textual content of a given URL using the reader API with infinite retry.
81
- This method will keep trying until it successfully retrieves the content from the specified URL.
82
 
83
  Args:
84
- url (str): The complete URL of the webpage to read content from.
85
 
86
  Returns:
87
- str: The clean textual content extracted from the URL by the reader API service.
88
  """
89
- # Prepare the POST data payload containing the target URL for the reader API
90
  data = {"url": url}
91
- # Create an aiohttp client session with the configured timeout settings
92
  async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
93
- # Use the retry helper method to POST the URL to the reader API and get the content
94
- return await self._fetch_with_retry(session, 'post', self.reader_api, data=data)
95
 
96
- # Public method to perform web searches using different search engines
97
  async def search(self, query: str, engine: str = "google") -> str:
98
  """
99
- Asynchronously perform a web search for the given query using the specified search engine with infinite retry.
100
- This method will keep trying until it successfully retrieves search results from the chosen search engine.
101
 
102
  Args:
103
- query (str): The search query string containing the terms to search for.
104
- engine (str, optional): The search engine to use for the search. Supported values are "google" and "baidu".
105
- Defaults to "google" if not specified.
106
 
107
  Returns:
108
- str: The raw HTML content of the search results page from the specified search engine.
109
  """
110
- # Check if the user wants to use Baidu search engine for the query
111
  if engine == "baidu":
112
- # Construct the full URL by combining reader API, Baidu URL and the search query parameter
113
  url = f"{self.reader_api}{self.baidu_url}?wd={query}"
114
- # Set HTTP headers specific to Baidu search results extraction
115
  headers = {
116
- # Target the main content container where Baidu displays search results
117
- "X-Target-Selector": "#content_left",
118
  "X-Forwarded-For": generate_ip() # Random IP address to simulate different client origins
119
  }
120
- # Handle all other search engines (Google, Bing, etc.) through SearXNG proxy
121
  else:
122
- # Determine the search prefix based on the requested engine (Google or Bing)
123
  prefix = "!go" if engine == "google" else "!bi"
124
- # Construct the full URL by combining reader API, SearXNG URL, prefix and query
125
  url = f"{self.reader_api}{self.searxng_url}?q={prefix} {query}"
126
- # Set HTTP headers specific to SearXNG search results extraction
127
  headers = {
128
- # Target the URLs container where SearXNG displays search result links
129
- "X-Target-Selector": "#urls",
130
  "X-Forwarded-For": generate_ip() # Random IP address to simulate different client origins
131
  }
132
- # Create an aiohttp client session with the configured timeout settings
133
- async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
134
- # Use the retry helper method to GET the search results and return the HTML content
135
- return await self._fetch_with_retry(session, 'get', url, headers=headers)
 
 
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
+ import asyncio # Import asyncio for managing asynchronous operations and concurrency
7
+ import aiohttp # Import aiohttp to perform asynchronous HTTP requests efficiently
8
+ import httpx # Import httpx library for asynchronous HTTP requests as an alternative client
9
  from src.utils.ip_generator import generate_ip # Import function to generate random IP addresses for request headers
10
 
11
  # Define the main SearchTools class that provides web searching and URL reading capabilities
 
13
  # This class provides methods to connect to the web
14
 
15
  """
16
+ A robust asynchronous class that provides tools to perform web searches and read content from URLs.
17
+ This implementation uses both aiohttp and httpx libraries to demonstrate usage of multiple async HTTP clients.
18
+ It includes infinite retry mechanisms to ensure reliability when network errors or timeouts occur.
19
 
20
  Attributes:
21
+ searxng_url (str): Base URL for the SearXNG search proxy service for Google and Bing searches.
22
+ baidu_url (str): Base URL for Baidu search engine for Chinese language queries.
23
  timeout (int): Timeout duration in seconds for HTTP requests to prevent indefinite hanging.
24
  reader_api (str): Base URL for the reader API service used to extract clean content from URLs.
25
 
26
  Methods:
27
+ read_url(url): Asynchronously reads and returns textual content of the specified URL using the reader API.
28
  search(query, engine): Asynchronously performs a web search with the given query on the specified search engine,
29
  returning the raw HTML response text.
30
  """
31
 
 
32
  def __init__(self):
33
  """
34
+ Initialize the SearchTools instance with all necessary URLs and timeout settings.
35
+ This sets up the base URLs for search engines and reader API, along with a default timeout.
36
  """
37
+ self.searxng_url = "https://paulgo.io/search" # SearXNG proxy URL for Google/Bing searches
38
+ self.baidu_url = "https://www.baidu.com/s" # Baidu search engine base URL
39
+ self.timeout = 30 # Timeout for HTTP requests in seconds
40
+ self.reader_api = "https://r.jina.ai/" # Reader API endpoint to extract clean text from web pages
41
+
42
+ async def _fetch_with_retry_aiohttp(self, session: aiohttp.ClientSession, method: str, url: str, **kwargs) -> str:
 
 
 
 
 
43
  """
44
+ Internal helper method to perform HTTP requests using aiohttp with infinite retry until success.
 
45
 
46
  Args:
47
+ session (aiohttp.ClientSession): aiohttp session object for making requests.
48
+ method (str): HTTP method to use ('get', 'post', etc.).
49
+ url (str): The full URL to send the request to.
50
+ **kwargs: Additional parameters passed to session.request (headers, data, etc.).
51
 
52
  Returns:
53
+ str: The response text content upon successful request.
54
  """
55
+ while True: # Loop indefinitely until a successful response is received
 
 
56
  try:
57
+ # Perform the HTTP request asynchronously using aiohttp session
58
  async with session.request(method, url, **kwargs) as response:
59
+ # Raise exception if HTTP status indicates an error (4xx or 5xx)
60
  response.raise_for_status()
61
+ # Return the response body as text
62
  return await response.text()
63
+ except Exception as e:
64
+ # On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying
65
+ await asyncio.sleep(5)
66
+
67
+ async def _fetch_with_retry_httpx(self, client: httpx.AsyncClient, method: str, url: str, **kwargs) -> str:
68
+ """
69
+ Internal helper method to perform HTTP requests using httpx with infinite retry until success.
70
+
71
+ Args:
72
+ client (httpx.AsyncClient): httpx asynchronous client instance.
73
+ method (str): HTTP method to use ('get', 'post', etc.).
74
+ url (str): The full URL to send the request to.
75
+ **kwargs: Additional parameters passed to client.request (headers, data, etc.).
76
+
77
+ Returns:
78
+ str: The response text content upon successful request.
79
+ """
80
+ while True: # Loop indefinitely until a successful response is received
81
+ try:
82
+ # Perform the HTTP request asynchronously using httpx client
83
+ response = await client.request(method, url, **kwargs)
84
+ # Raise exception if HTTP status indicates an error (4xx or 5xx)
85
+ response.raise_for_status()
86
+ # Return the response body as text
87
+ return response.text
88
+ except Exception as e:
89
+ # On any exception (network error, timeout, HTTP error), wait 5 seconds before retrying
90
  await asyncio.sleep(5)
91
 
 
92
  async def read_url(self, url: str) -> str:
93
  """
94
+ Asynchronously read and extract textual content from the specified URL using the reader API.
95
+ This method uses aiohttp client with infinite retry to ensure reliable content retrieval.
96
 
97
  Args:
98
+ url (str): The full URL of the webpage to extract content from.
99
 
100
  Returns:
101
+ str: The clean textual content extracted by the reader API.
102
  """
103
+ # Prepare POST data payload containing the target URL for the reader API
104
  data = {"url": url}
105
+ # Create aiohttp client session with a total timeout configured
106
  async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=self.timeout)) as session:
107
+ # Use the internal retry helper to POST the URL to the reader API and return the extracted content
108
+ return await self._fetch_with_retry_aiohttp(session, 'post', self.reader_api, data=data)
109
 
 
110
  async def search(self, query: str, engine: str = "google") -> str:
111
  """
112
+ Asynchronously perform a web search for the given query using the specified search engine.
113
+ This method uses httpx client with infinite retry to reliably fetch search results.
114
 
115
  Args:
116
+ query (str): The search query string.
117
+ engine (str, optional): The search engine to use ("google" or "baidu"). Defaults to "google".
 
118
 
119
  Returns:
120
+ str: The raw HTML content of the search results page.
121
  """
 
122
  if engine == "baidu":
123
+ # Construct the full URL for Baidu search with the query parameter 'wd'
124
  url = f"{self.reader_api}{self.baidu_url}?wd={query}"
125
+ # Set HTTP headers to target Baidu's main content container and spoof IP address
126
  headers = {
127
+ "X-Target-Selector": "#content_left", # CSS selector for Baidu search results container
 
128
  "X-Forwarded-For": generate_ip() # Random IP address to simulate different client origins
129
  }
 
130
  else:
131
+ # Determine prefix for SearXNG proxy based on engine: '!go' for Google, '!bi' for Bing
132
  prefix = "!go" if engine == "google" else "!bi"
133
+ # Construct the full URL for SearXNG search proxy with query and prefix
134
  url = f"{self.reader_api}{self.searxng_url}?q={prefix} {query}"
135
+ # Set HTTP headers to target SearXNG search results container and spoof IP address
136
  headers = {
137
+ "X-Target-Selector": "#urls", # CSS selector for SearXNG search results container
 
138
  "X-Forwarded-For": generate_ip() # Random IP address to simulate different client origins
139
  }
140
+
141
+ # Create httpx asynchronous client with timeout configured
142
+ async with httpx.AsyncClient(timeout=self.timeout) as client:
143
+ # Use the internal retry helper to GET the search results and return the HTML content
144
+ return await self._fetch_with_retry_httpx(client, 'get', url, headers=headers)
src/tools/image.py CHANGED
@@ -3,114 +3,127 @@
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
- import httpx # Import httpx library for performing asynchronous HTTP requests efficiently
7
- from urllib.parse import quote # Import quote function to safely encode strings for use in URLs
8
- from typing import Optional # Import Optional type hint for parameters that can be None
 
 
9
  from src.utils.ip_generator import generate_ip # Import custom utility to generate random IP addresses for request headers
10
- from src.utils.tools import initialize_tools # Import utility function to initialize and retrieve tool endpoints
11
 
12
  # Define a class named ImageGeneration to encapsulate functionalities related to generating image content
13
  class ImageGeneration:
14
  # This class provides methods to create image files based on text instructions
15
 
16
  """
17
- A class to handle image generation requests to an external image generation service.
18
 
19
  Attributes:
20
- FORMATS (dict): A dictionary mapping image format names to their (width, height) dimensions.
21
 
22
  Methods:
23
- create_image: Asynchronously generates an image based on a textual instruction and parameters,
24
- returning the URL of the generated image.
25
  """
26
 
27
- # Image formats
28
  FORMATS = {
29
- "default": (1024, 1024), # Default square image size (width x height)
30
- "square": (1024, 1024), # Square image format with equal width and height
31
- "landscape": (1024, 768), # Landscape format with wider width than height
32
- "landscape_large": (1440, 1024), # Larger landscape format with increased resolution
33
- "portrait": (768, 1024), # Portrait format with taller height than width
34
- "portrait_large": (1024, 1440), # Larger portrait format with increased resolution
35
  }
36
 
37
- @staticmethod # Decorator indicating that the following method does not depend on instance state and can be called on the class itself
38
- # Define an asynchronous method to create image from a text instruction
39
  async def create_image(
40
- generate_image_instruction: str, # Text instruction describing the image to generate
41
- image_format: str = "default", # Desired image format key from FORMATS dictionary
42
- model: Optional[str] = "flux-realism", # Optional model name for image generation; defaults to 'flux-realism'
43
- seed: Optional[int] = None, # Optional seed value for randomization control in image generation
44
- nologo: bool = True, # Whether to generate image without logo watermark; defaults to True
45
- private: bool = True, # Whether the generated image should be private; defaults to True
46
- enhance: bool = True, # Whether to apply enhancement filters to the generated image; defaults to True
47
  ) -> str:
48
  """
49
- Asynchronously generate an image URL by sending a request to the image generation service.
50
- This method will keep retrying until a successful response with status code 200 is received.
51
 
52
  Args:
53
- generate_image_instruction (str): The textual instruction or description for the desired image.
54
- image_format (str, optional): The format key specifying image dimensions. Defaults to "default".
55
- model (Optional[str], optional): The image generation model to use. Defaults to "flux-realism".
56
- seed (Optional[int], optional): Seed for randomization to reproduce images. Defaults to None.
57
- nologo (bool, optional): Flag to exclude logo watermark. Defaults to True.
58
- private (bool, optional): Flag to mark image as private. Defaults to True.
59
- enhance (bool, optional): Flag to apply image enhancement. Defaults to True.
60
 
61
  Returns:
62
- str: The URL of the generated image if the request is successful.
63
 
64
  Raises:
65
- ValueError: If the specified image_format is not supported.
66
- Exception: If the image generation continuously fails (currently infinite retry).
67
  """
68
- # Validate that the requested image format exists in the FORMATS dictionary
69
  if image_format not in ImageGeneration.FORMATS:
70
  raise ValueError("Invalid image format.")
71
 
72
- # Retrieve width and height based on the requested image format
73
  width, height = ImageGeneration.FORMATS[image_format]
74
 
75
- # Initialize tools and retrieve the image generation service endpoint
76
  _, image_tool, _ = initialize_tools()
77
 
78
- # Encode the image instruction to safely include it in the URL path
79
  generate_image_instruct = quote(generate_image_instruction)
80
 
81
- # Construct the full URL for the image generation request by appending the encoded instruction
82
- url = f"{image_tool}{generate_image_instruct}" # Full endpoint URL for image generation
83
 
84
- # Prepare query parameters including image dimensions, model, and flags converted to string "true"/"false"
85
  params = {
86
- "width": width, # Image width parameter
87
- "height": height, # Image height parameter
88
- "model": model, # Model name for image generation
89
- "nologo": "true" if nologo else "false", # Flag to exclude logo watermark as string
90
- "private": "true" if private else "false", # Flag to mark image as private as string
91
- "enhance": "true" if enhance else "false" # Flag to apply enhancement as string
92
  }
93
 
94
- # Include seed parameter if provided to control randomness in image generation
95
  if seed is not None:
96
- params["seed"] = seed # Add seed to parameters to reproduce images
97
 
98
- # Prepare HTTP headers with a generated random IP to simulate different client origins
99
  headers = {
100
  "X-Forwarded-For": generate_ip() # Random IP address for request header to simulate client origin
101
  }
102
 
103
- # Create an asynchronous HTTP client with no timeout limit to perform the request
104
  async with httpx.AsyncClient(timeout=None) as client:
105
- # Keep retrying the request until a successful response with status 200 is received
106
  while True:
107
- # Send a GET request to the image generation service with URL, parameters, and headers
108
- resp = await client.get(url, params=params, headers=headers)
109
-
110
- # Check if the response status code indicates success
111
- if resp.status_code == 200:
112
- # Return the URL of the generated image as a string
113
- return str(resp.url)
114
- else:
115
- # Wait briefly before retrying to avoid overwhelming the server
116
- await asyncio.sleep(15) # Pause 15 second before retrying
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
3
  # SPDX-License-Identifier: Apache-2.0
4
  #
5
 
6
+ import asyncio # Import asyncio for asynchronous programming and managing event loops
7
+ import httpx # Import httpx for async HTTP requests with HTTP/1.1 and HTTP/2 support
8
+ import aiohttp # Import aiohttp for alternative async HTTP client capabilities
9
+ from urllib.parse import quote # Import quote to safely encode URL path components
10
+ from typing import Optional # Import Optional for type hinting parameters that may be None
11
  from src.utils.ip_generator import generate_ip # Import custom utility to generate random IP addresses for request headers
12
+ from src.utils.tools import initialize_tools # Import utility to initialize and get tool endpoints
13
 
14
  # Define a class named ImageGeneration to encapsulate functionalities related to generating image content
15
  class ImageGeneration:
16
  # This class provides methods to create image files based on text instructions
17
 
18
  """
19
+ Class to handle asynchronous image generation requests to an external service.
20
 
21
  Attributes:
22
+ FORMATS (dict): Maps image format names to (width, height) tuples.
23
 
24
  Methods:
25
+ create_image: Async method to generate an image URL from a text prompt,
26
+ retrying until successful, using httpx and aiohttp.
27
  """
28
 
29
+ # Supported image formats with their dimensions (width, height)
30
  FORMATS = {
31
+ "default": (1024, 1024),
32
+ "square": (1024, 1024),
33
+ "landscape": (1024, 768),
34
+ "landscape_large": (1440, 1024),
35
+ "portrait": (768, 1024),
36
+ "portrait_large": (1024, 1440),
37
  }
38
 
39
+ @staticmethod
 
40
  async def create_image(
41
+ generate_image_instruction: str, # Text description for the image to generate
42
+ image_format: str = "default", # Format key from FORMATS dict
43
+ model: Optional[str] = "flux-realism", # Model name for generation, default 'flux-realism'
44
+ seed: Optional[int] = None, # Optional seed for reproducible randomness
45
+ nologo: bool = True, # Whether to exclude logo watermark
46
+ private: bool = True, # Whether the image should be private
47
+ enhance: bool = True, # Whether to apply enhancement filters
48
  ) -> str:
49
  """
50
+ Asynchronously generate an image URL by sending requests to the image generation service.
51
+ Uses httpx for initial requests and aiohttp as fallback, retrying indefinitely until success.
52
 
53
  Args:
54
+ generate_image_instruction (str): Text prompt describing the desired image.
55
+ image_format (str): Key for image dimensions.
56
+ model (Optional[str]): Model to use for generation.
57
+ seed (Optional[int]): Seed for randomization control.
58
+ nologo (bool): Flag to exclude logo watermark.
59
+ private (bool): Flag to mark image as private.
60
+ enhance (bool): Flag to apply image enhancement.
61
 
62
  Returns:
63
+ str: URL of the generated image on success.
64
 
65
  Raises:
66
+ ValueError: If image_format is invalid.
 
67
  """
68
+ # Validate image format key
69
  if image_format not in ImageGeneration.FORMATS:
70
  raise ValueError("Invalid image format.")
71
 
72
+ # Extract width and height for the requested format
73
  width, height = ImageGeneration.FORMATS[image_format]
74
 
75
+ # Initialize tools and get image generation service endpoint URL
76
  _, image_tool, _ = initialize_tools()
77
 
78
+ # Encode instruction safely for URL path usage
79
  generate_image_instruct = quote(generate_image_instruction)
80
 
81
+ # Construct the full URL endpoint for image generation
82
+ url = f"{image_tool}{generate_image_instruct}"
83
 
84
+ # Prepare query parameters with image size, model, flags as strings
85
  params = {
86
+ "width": width,
87
+ "height": height,
88
+ "model": model,
89
+ "nologo": "true" if nologo else "false",
90
+ "private": "true" if private else "false",
91
+ "enhance": "true" if enhance else "false",
92
  }
93
 
94
+ # Add seed parameter if provided
95
  if seed is not None:
96
+ params["seed"] = seed
97
 
98
+ # Prepare headers
99
  headers = {
100
  "X-Forwarded-For": generate_ip() # Random IP address for request header to simulate client origin
101
  }
102
 
103
+ # Use httpx.AsyncClient with no timeout for initial requests
104
  async with httpx.AsyncClient(timeout=None) as client:
 
105
  while True:
106
+ try:
107
+ # Send GET request to the image generation endpoint
108
+ resp = await client.get(url, params=params, headers=headers)
109
+
110
+ # If response is successful, return the final URL
111
+ if resp.status_code == 200:
112
+ return str(resp.url)
113
+ except httpx.HTTPError:
114
+ # On httpx errors, fallback to aiohttp for robustness
115
+ pass
116
+
117
+ # Fallback retry with aiohttp client
118
+ async with aiohttp.ClientSession() as session:
119
+ try:
120
+ async with session.get(url, params=params, headers=headers) as resp:
121
+ if resp.status == 200:
122
+ # Return the final URL (aiohttp does not provide direct URL property)
123
+ return str(resp.url)
124
+ except aiohttp.ClientError:
125
+ # Ignore aiohttp errors and retry
126
+ pass
127
+
128
+ # Wait 15 seconds before retrying to avoid overwhelming the server
129
+ await asyncio.sleep(15)