#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json  # Module to parse and handle JSON data
import uuid  # Module to generate unique identifiers (UUIDs)
from typing import List, Dict, Any  # Type hinting for list, dict, and generic types
from datetime import datetime  # To get and format current date and time
from config import *  # Import all configuration variables, including 'auth' and 'restrictions'
from src.utils.session_mapping import get_host  # Function to get server info by session ID
from src.utils.ip_generator import generate_ip  # Function to generate random IP addresses for headers
from src.utils.helper import mark  # Function to mark a server as busy/unavailable
from src.ui.reasoning import styles  # Function to apply CSS styling to reasoning output
import asyncio  # Asyncio for asynchronous programming
import httpx  # Async HTTP client supporting HTTP/2 and streaming
import aiohttp  # Async HTTP client for alternative HTTP requests and streaming


async def jarvis(
    session_id: str,  # Unique session identifier to maintain consistent server assignment
    model: str,  # AI model name specifying which model to use
    history: List[Dict[str, str]],  # List of previous conversation messages with roles and content
    user_message: str,  # Latest user input message to send to the AI model
    mode: str,  # Mode string to guide AI behavior, e.g. '/think' or '/no_think'
    files=None,  # Optional files or attachments to include with the user message
    temperature: float = 0.6,  # Sampling temperature controlling randomness in token generation
    top_k: int = 20,  # Limit token selection to the top_k most probable tokens
    min_p: float = 0.0,  # Minimum probability threshold for token selection
    top_p: float = 0.95,  # Nucleus sampling cumulative probability threshold
    repetition_penalty: float = 1.0,  # Penalty factor to reduce token repetition
):
""" | |
Asynchronously send a chat request to a Jarvis AI server and handle streaming response incrementally. | |
This function manages server selection based on the session ID, retries requests on specific error codes, | |
and yields incremental parts of the AI-generated response as they arrive. It integrates CSS styling into | |
the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed | |
first inside a styled HTML block, followed by the main content streamed normally. | |
The implementation uses both httpx (with HTTP/2 support) and aiohttp to ensure compatibility and robustness | |
in streaming responses. | |
Args: | |
session_id (str): Identifier for the user session to maintain consistent server assignment. | |
model (str): Name of the AI model to use for generating the response. | |
history (List[Dict[str, str]]): List of previous messages in the conversation. | |
user_message (str): The current message from the user to send to the AI model. | |
mode (str): Contextual instructions to guide the AI model's response style. | |
files (optional): Additional files or attachments to include with the user message. | |
temperature (float): Controls randomness in token generation. | |
top_k (int): Limits token selection to top_k probable tokens. | |
min_p (float): Minimum probability threshold for token selection. | |
top_p (float): Nucleus sampling cumulative probability threshold. | |
repetition_penalty (float): Factor to reduce token repetition. | |
Yields: | |
str: Incremental strings of AI-generated response streamed from the server. | |
Reasoning is wrapped in a styled HTML details block and streamed incrementally only if mode is not '/no_think'. | |
After reasoning finishes, the main content is streamed normally. | |
Notes: | |
The function attempts to send the request to a server assigned for the session. | |
If the server returns a specific error code indicating it is busy, it retries with another server. | |
If all servers are busy or fail, it yields a message indicating the server is busy. | |
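
    Example:
        A minimal consumption sketch (illustrative only; the model name is a
        hypothetical placeholder, and 'render' stands in for whatever the caller
        does with each part). The empty-string yield marks the boundary between
        the styled reasoning block and the main content:

            async for part in jarvis(
                session_id="demo-session",
                model="jarvis-v1",
                history=[],
                user_message="Hello!",
                mode="/think",
            ):
                render(part)  # e.g. replace the displayed message with 'part'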
""" | |
    tried = set()  # Set to track servers already tried to avoid repeated retries
    # Loop until all available servers have been tried without success
    while len(tried) < len(auth):
        # Get server setup info assigned for this session, including endpoint, token, and error code
        setup = get_host(session_id)
        server = setup["jarvis"]  # Server identifier string
        host = setup["endpoint"]  # API endpoint URL string
        token = setup["token"]  # Authorization token string
        error = setup["error"]  # HTTP error code integer which triggers retry
        tried.add(server)  # Mark this server as tried to prevent retrying it immediately
        # Format current date/time string for system instructions, e.g. "Tuesday, June 24, 2025, 08:13 PM"
        # (%Z renders as an empty string for naive local datetimes, so the result may carry a trailing space)
        date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z")
        # Combine mode instructions, usage restrictions, and date into a single system instructions string
        instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"
        # Copy conversation history to avoid mutating the original list outside this function
        messages = history.copy()
        # Insert system instructions as the first message in the conversation history
        messages.insert(0, {"role": "system", "content": instructions})
        # Prepare user message dictionary, including files if provided
        msg = {"role": "user", "content": user_message}
        if files:
            msg["files"] = files
        messages.append(msg)  # Append the user message to the conversation messages list
        # Prepare HTTP headers with authorization and a randomized client IP for X-Forwarded-For
        headers = {
            "Authorization": f"Bearer {token}",  # Bearer token for API access authentication
            "Content-Type": "application/json",  # Content type set to JSON for the request body
            "X-Forwarded-For": generate_ip(),  # Random IP to simulate different client origins for load balancing or logging
        }
        # Prepare JSON payload with model parameters and conversation messages to send in the POST request
        payload = {
            "model": model,
            "messages": messages,
            "stream": True,  # Enable streaming response
            "temperature": temperature,
            "top_k": top_k,
            "min_p": min_p,
            "top_p": top_p,
            "repetition_penalty": repetition_penalty,
        }
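        # For illustration, the serialized body is an OpenAI-style chat-completions
        # payload (values abbreviated; the exact server schema is assumed):
        # {"model": "jarvis-v1", "messages": [{"role": "system", "content": "..."},
        #  {"role": "user", "content": "Hello!"}], "stream": true, "temperature": 0.6, ...}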
        # Initialize accumulators and flags for streamed response parts
        reasoning = ""  # String accumulator for reasoning text from the AI
        reasoning_check = None  # Flag to detect presence of reasoning in the response; None means not checked yet
        reasoning_done = False  # Flag marking reasoning completion
        content = ""  # String accumulator for main content text from the AI
        try:
            # Use httpx AsyncClient with no timeout to allow long streaming connections
            async with httpx.AsyncClient(timeout=None, http2=True) as client:
                # Open async streaming POST request to the Jarvis server endpoint with headers and JSON payload
                async with client.stream("POST", host, headers=headers, json=payload) as response:
                    # Raise for status so the busy error code reaches the retry handler below;
                    # httpx does not raise HTTPStatusError on its own for streamed responses
                    response.raise_for_status()
                    # Iterate asynchronously over each line of the streaming response as it arrives
                    async for chunk in response.aiter_lines():
                        # Skip lines that do not start with the "data:" prefix per the server-sent events (SSE) format
                        if not chunk.strip().startswith("data:"):
                            continue
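                        # A typical SSE data line looks like (illustrative):
                        # data: {"choices": [{"delta": {"content": "Hello"}}]}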
                        try:
                            # Parse the JSON after the "data:" prefix (strip first so the
                            # 5-character offset is correct even with leading whitespace)
                            data = json.loads(chunk.strip()[5:])
                            # Extract the incremental delta message from the first choice in the response
                            choice = data["choices"][0]["delta"]
                            # On the first delta received, detect whether a 'reasoning' field is present and non-empty
                            if reasoning_check is None:
                                # Initialize reasoning_check to an empty string if reasoning exists and is non-empty, else None
                                reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
                            # If reasoning is present, mode is not '/no_think', and reasoning is not done yet
                            if (
                                reasoning_check == ""  # Reasoning detected in response
                                and mode != "/no_think"  # Mode allows reasoning output
                                and not reasoning_done  # Reasoning phase not finished yet
                                and "reasoning" in choice  # Current delta includes a reasoning part
                                and choice["reasoning"]  # Reasoning content is not empty
                            ):
                                reasoning += choice["reasoning"]  # Append incremental reasoning text
                                # Yield reasoning wrapped in a styled HTML block with details expanded
                                yield styles(reasoning=reasoning, content="", expanded=True)
                                continue  # Continue streaming reasoning increments without processing content yet
                            # When reasoning ends and content starts, mark reasoning done, yield an empty string, then content
                            if (
                                reasoning_check == ""  # Reasoning was detected previously
                                and mode != "/no_think"  # Mode allows reasoning output
                                and not reasoning_done  # Reasoning phase not finished yet
                                and "content" in choice  # Current delta includes a content part
                                and choice["content"]  # Content is not empty
                            ):
                                reasoning_done = True  # Mark reasoning phase complete
                                yield ""  # Yield an empty string to signal the end of the reasoning block to the consumer
                                content += choice["content"]  # Start accumulating content text
                                yield content  # Yield the first part of content to the consumer
                                continue  # Continue streaming content increments
                            # If no reasoning is present or reasoning is done, accumulate content and yield incrementally
                            if (
                                (reasoning_check is None or reasoning_done or mode == "/no_think")  # No reasoning, reasoning finished, or mode disables reasoning
                                and "content" in choice  # Current delta includes a content part
                                and choice["content"]  # Content is not empty
                            ):
                                content += choice["content"]  # Append incremental content text
                                yield content  # Yield the updated content string to the consumer
                        except Exception:
                            # Ignore malformed lines (e.g. a terminating "[DONE]" sentinel,
                            # if the server sends one) and continue streaming
                            continue
            return  # Exit the function after successful streaming completion
        except httpx.HTTPStatusError as e:
            # If the server returns the specific error code indicating it is busy, retry with another server
            if e.response.status_code == error:
                # Continue to the next iteration to try a different server
                continue
            else:
                # For other HTTP errors, mark this server as busy/unavailable
                mark(server)
        except Exception:
            # For other exceptions (network errors, timeouts), mark the server as busy/unavailable
            mark(server)
        # If httpx fails or the server is busy, fall back to aiohttp for robustness and compatibility
        try:
            # Create an aiohttp client session with no timeout for streaming
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
                # Open async streaming POST request to the Jarvis server endpoint with headers and JSON payload
                async with session.post(host, headers=headers, json=payload) as resp:
                    # Raise for status to catch HTTP errors
                    resp.raise_for_status()
                    # Iterate asynchronously over each line of the streaming response as it arrives
                    async for line_bytes in resp.content:
                        # Decode bytes to string and strip whitespace
                        line = line_bytes.decode("utf-8").strip()
                        # Skip lines that do not start with the "data:" prefix per the SSE format
                        if not line.startswith("data:"):
                            continue
                        try:
                            # Parse the JSON after the "data:" prefix, which contains the incremental response delta
                            data = json.loads(line[5:])
                            # Extract the incremental delta message from the first choice in the response
                            choice = data["choices"][0]["delta"]
                            # On the first delta received, detect whether a 'reasoning' field is present and non-empty
                            if reasoning_check is None:
                                reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None
                            # If reasoning is present, mode is not '/no_think', and reasoning is not done yet
                            if (
                                reasoning_check == ""
                                and mode != "/no_think"
                                and not reasoning_done
                                and "reasoning" in choice
                                and choice["reasoning"]
                            ):
                                reasoning += choice["reasoning"]
                                yield styles(reasoning=reasoning, content="", expanded=True)
                                continue
                            # When reasoning ends and content starts, mark reasoning done, yield an empty string, then content
                            if (
                                reasoning_check == ""
                                and mode != "/no_think"
                                and not reasoning_done
                                and "content" in choice
                                and choice["content"]
                            ):
                                reasoning_done = True
                                yield ""
                                content += choice["content"]
                                yield content
                                continue
                            # If no reasoning is present or reasoning is done, accumulate content and yield incrementally
                            if (
                                (reasoning_check is None or reasoning_done or mode == "/no_think")
                                and "content" in choice
                                and choice["content"]
                            ):
                                content += choice["content"]
                                yield content
                        except Exception:
                            # Ignore exceptions during JSON parsing or key access and continue streaming
                            continue
            return  # Exit the function after successful streaming completion with aiohttp
        except aiohttp.ClientResponseError as e:
            # If the server returns the specific error code indicating it is busy, retry with another server
            if e.status == error:
                continue  # Try the next available server
            else:
                mark(server)  # Mark the server as busy/unavailable for other HTTP errors
        except Exception:
            # For other exceptions (network errors, timeouts), mark the server as busy/unavailable
            mark(server)
    # If all servers have been tried and none succeeded, yield a message indicating the busy status
    yield "The server is currently busy. Please wait a moment or try again later."
    return  # End of function
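

# ---------------------------------------------------------------------------
# Minimal manual-test sketch (illustrative only): it assumes the imported
# 'config' module supplies valid 'auth' entries and that at least one server
# is reachable; the model name below is a hypothetical placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    async def _demo() -> None:
        # Consume the async generator and print each incremental part as it arrives
        async for part in jarvis(
            session_id=str(uuid.uuid4()),  # Fresh session ID for the demo
            model="jarvis-v1",  # Hypothetical model name
            history=[],
            user_message="Hello!",
            mode="/no_think",  # Disable the reasoning block for a simple test
        ):
            print(part)

    asyncio.run(_demo())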