#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Module to parse and handle JSON data
import uuid # Module to generate unique identifiers (UUIDs)
from typing import List, Dict, Any # Type hinting for list, dict, and generic types
from datetime import datetime # To get and format current date and time
from config import * # Import all configuration variables, including 'auth' and 'restrictions'
from src.utils.session_mapping import get_host # Function to get server info by session ID
from src.utils.ip_generator import generate_ip # Function to generate random IP addresses for headers
from src.utils.helper import mark # Function to mark a server as busy/unavailable
from src.ui.reasoning import styles # Function to apply CSS styling to reasoning output
import asyncio # Asyncio for asynchronous programming
import httpx # Async HTTP client supporting HTTP/2 and streaming
import aiohttp # Async HTTP client for alternative HTTP requests and streaming
async def jarvis(
session_id: str, # Unique session identifier to maintain consistent server assignment
model: str, # AI model name specifying which model to use
history: List[Dict[str, str]], # List of previous conversation messages with roles and content
user_message: str, # Latest user input message to send to the AI model
mode: str, # Mode string to guide AI behavior, e.g., '/think' or '/no_think'
files=None, # Optional files or attachments to include with the user message
temperature: float = 0.6, # Sampling temperature controlling randomness in token generation
top_k: int = 20, # Limit token selection to top_k probable tokens
min_p: float = 0.0, # Minimum probability threshold for token selection
top_p: float = 0.95, # Nucleus sampling cumulative probability threshold
repetition_penalty: float = 1.0, # Penalty factor to reduce token repetition
):
"""
Asynchronously send a chat request to a Jarvis AI server and handle streaming response incrementally.
This function manages server selection based on the session ID, retries requests on specific error codes,
and yields incremental parts of the AI-generated response as they arrive. It integrates CSS styling into
the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed
first inside a styled HTML block, followed by the main content streamed normally.
The implementation first attempts the request with httpx (with HTTP/2 support) and falls back to aiohttp
if that attempt fails, to ensure compatibility and robustness in streaming responses.
Args:
session_id (str): Identifier for the user session to maintain consistent server assignment.
model (str): Name of the AI model to use for generating the response.
history (List[Dict[str, str]]): List of previous messages in the conversation.
user_message (str): The current message from the user to send to the AI model.
mode (str): Contextual instructions to guide the AI model's response style.
files (optional): Additional files or attachments to include with the user message.
temperature (float): Controls randomness in token generation.
top_k (int): Limits token selection to top_k probable tokens.
min_p (float): Minimum probability threshold for token selection.
top_p (float): Nucleus sampling cumulative probability threshold.
repetition_penalty (float): Factor to reduce token repetition.
Yields:
str: Incremental strings of AI-generated response streamed from the server.
Reasoning is wrapped in a styled HTML details block and streamed incrementally only if mode is not '/no_think'.
After reasoning finishes, the main content is streamed normally.
Notes:
The function attempts to send the request to a server assigned for the session.
If the server returns a specific error code indicating it is busy, it retries with another server.
If all servers are busy or fail, it yields a message indicating the server is busy.
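Example:
    A hypothetical consumption pattern (the session ID and model name below are
    illustrative placeholders, not values defined by this module):

        async for part in jarvis(
            session_id="abc123",
            model="jarvis-large",
            history=[],
            user_message="Hello!",
            mode="/no_think",
        ):
            print(part)  # Each yield is the accumulated response so far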
"""
tried = set() # Set to track servers already tried to avoid repeated retries
# Loop until all available servers have been tried without success
while len(tried) < len(auth):
# Get server setup info assigned for this session, including endpoint, token, and error code
setup = get_host(session_id)
server = setup["jarvis"] # Server identifier string
host = setup["endpoint"] # API endpoint URL string
token = setup["token"] # Authorization token string
error = setup["error"] # HTTP error code integer which triggers retry
tried.add(server) # Mark this server as tried to prevent retrying immediately
# Format the current date/time for the system instructions, e.g., "Tuesday, June 24, 2025, 08:13 PM"
# Note: %Z renders as an empty string here because datetime.now() returns a timezone-naive datetime
date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z")
# Combine mode instructions, usage restrictions, and date into a single system instructions string
instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"
# Copy conversation history to avoid mutating the original list outside this function
messages = history.copy()
# Insert system instructions as the first message in the conversation history
messages.insert(0, {"role": "system", "content": instructions})
# Prepare user message dictionary, include files if provided
msg = {"role": "user", "content": user_message}
if files:
msg["files"] = files
messages.append(msg) # Append user message to the conversation messages list
# Prepare HTTP headers with authorization and randomized client IP for X-Forwarded-For
headers = {
"Authorization": f"Bearer {token}", # Bearer token for API access authentication
"Content-Type": "application/json", # Content type set to JSON for request body
"X-Forwarded-For": generate_ip(), # Random IP to simulate different client origins for load balancing or logging
}
# Prepare JSON payload with model parameters and conversation messages to send in POST request
payload = {
"model": model,
"messages": messages,
"stream": True, # Enable streaming response
"temperature": temperature,
"top_k": top_k,
"min_p": min_p,
"top_p": top_p,
"repetition_penalty": repetition_penalty,
}
# Initialize accumulators and flags for streamed response parts
reasoning = "" # String accumulator for reasoning text from the AI
reasoning_check = None # Flag to detect presence of reasoning in response; None means not checked yet
reasoning_done = False # Flag marking reasoning completion
content = "" # String accumulator for main content text from the AI
try:
# Use httpx AsyncClient with no timeout to allow long streaming connections
async with httpx.AsyncClient(timeout=None, http2=True) as client:
# Open async streaming POST request to Jarvis server endpoint with headers and JSON payload
async with client.stream("POST", host, headers=headers, json=payload) as response:
# Raise for HTTP error statuses so a busy-server code surfaces as httpx.HTTPStatusError below
response.raise_for_status()
# Iterate asynchronously over each line of streaming response as it arrives
async for chunk in response.aiter_lines():
# Normalize the line, then skip any that lack the "data:" prefix used by server-sent events (SSE)
chunk = chunk.strip()
if not chunk.startswith("data:"):
continue
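# A typical payload line follows the OpenAI-compatible SSE shape sketched
# below (an assumption about the server, not guaranteed by this module):
#   data: {"choices": [{"delta": {"reasoning": "...", "content": "..."}}]}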
try:
# Parse JSON data after "data:" prefix which contains incremental response delta
data = json.loads(chunk[5:])
# Extract incremental delta message from first choice in response
choice = data["choices"][0]["delta"]
# On first delta received, detect if 'reasoning' field is present and non-empty
if reasoning_check is None:
# Initialize reasoning_check to empty string if reasoning exists and is non-empty, else None
reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
# If reasoning is present and mode is not '/no_think' and reasoning not done yet
if (
reasoning_check == "" # Reasoning detected in response
and mode != "/no_think" # Mode allows reasoning output
and not reasoning_done # Reasoning phase not finished yet
and "reasoning" in choice # Current delta includes reasoning part
and choice["reasoning"] # Reasoning content is not empty
):
reasoning += choice["reasoning"] # Append incremental reasoning text
# Yield reasoning wrapped in styled HTML block with details expanded
yield styles(reasoning=reasoning, content="", expanded=True)
continue # Continue streaming reasoning increments without processing content yet
# When reasoning ends and content starts, mark reasoning done, yield empty string, then content
if (
reasoning_check == "" # Reasoning was detected previously
and mode != "/no_think" # Mode allows reasoning output
and not reasoning_done # Reasoning phase not finished yet
and "content" in choice # Current delta includes content part
and choice["content"] # Content is not empty
):
reasoning_done = True # Mark reasoning phase complete
yield "" # Yield empty string to signal end of reasoning block to the consumer
content += choice["content"] # Start accumulating content text
yield content # Yield first part of content to the consumer
continue # Continue streaming content increments
# If no reasoning present or reasoning done, accumulate content and yield incrementally
if (
(reasoning_check is None or reasoning_done or mode == "/no_think") # No reasoning or reasoning finished or mode disables reasoning
and "content" in choice # Current delta includes content part
and choice["content"] # Content is not empty
):
content += choice["content"] # Append incremental content text
yield content # Yield updated content string to the consumer
except Exception:
# Ignore exceptions during JSON parsing or key access and continue streaming
continue
return # Exit function after successful streaming completion
except httpx.HTTPStatusError as e:
# If server returns specific error code indicating busy, retry with another server
if e.response.status_code == error:
# Continue to next iteration to try a different server
continue
else:
# For other HTTP errors, mark this server as busy/unavailable
mark(server)
except Exception:
# For other exceptions (network errors, timeouts), mark server as busy/unavailable
mark(server)
# If the httpx attempt failed for any reason other than the retryable busy code, fall back to aiohttp
try:
# Create aiohttp client session with no timeout for streaming
async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
# Open async streaming POST request to Jarvis server endpoint with headers and JSON payload
async with session.post(host, headers=headers, json=payload) as resp:
# Raise for status to catch HTTP errors
resp.raise_for_status()
# Iterate asynchronously over each line of streaming response as it arrives
async for line_bytes in resp.content:
# Decode bytes to string and strip whitespace
line = line_bytes.decode("utf-8").strip()
# Skip lines that do not start with "data:" prefix as per SSE format
if not line.startswith("data:"):
continue
try:
# Parse JSON data after "data:" prefix which contains incremental response delta
data = json.loads(line[5:])
# Extract incremental delta message from first choice in response
choice = data["choices"][0]["delta"]
# On first delta received, detect if 'reasoning' field is present and non-empty
if reasoning_check is None:
reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
# If reasoning is present and mode is not '/no_think' and reasoning not done yet
if (
reasoning_check == ""
and mode != "/no_think"
and not reasoning_done
and "reasoning" in choice
and choice["reasoning"]
):
reasoning += choice["reasoning"]
yield styles(reasoning=reasoning, content="", expanded=True)
continue
# When reasoning ends and content starts, mark reasoning done, yield empty string, then content
if (
reasoning_check == ""
and mode != "/no_think"
and not reasoning_done
and "content" in choice
and choice["content"]
):
reasoning_done = True
yield ""
content += choice["content"]
yield content
continue
# If no reasoning present or reasoning done, accumulate content and yield incrementally
if (
(reasoning_check is None or reasoning_done or mode == "/no_think")
and "content" in choice
and choice["content"]
):
content += choice["content"]
yield content
except Exception:
# Ignore exceptions during JSON parsing or key access and continue streaming
continue
return # Exit function after successful streaming completion with aiohttp
except aiohttp.ClientResponseError as e:
# If server returns specific error code indicating busy, retry with another server
if e.status == error:
continue # Try next available server
else:
mark(server) # Mark server as busy/unavailable for other HTTP errors
except Exception:
# For other exceptions (network errors, timeouts), mark server as busy/unavailable
mark(server)
# If all servers tried and none succeeded, yield a message indicating server busy status
yield "The server is currently busy. Please wait a moment or try again later."
return # End of function
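# Minimal usage sketch (illustrative only): running it requires the 'config'
# module to supply 'auth' and 'restrictions', plus at least one reachable
# Jarvis server; the model name below is a hypothetical placeholder.
async def _demo():
    session_id = str(uuid.uuid4())  # Fresh session ID drives server assignment
    async for part in jarvis(
        session_id=session_id,
        model="jarvis-large",  # Hypothetical model name
        history=[],
        user_message="Hello!",
        mode="/no_think",
    ):
        print(part)

if __name__ == "__main__":
    asyncio.run(_demo())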