# # SPDX-FileCopyrightText: Hadad # SPDX-License-Identifier: Apache-2.0 # import json # Module to parse and handle JSON data import uuid # Module to generate unique identifiers (UUIDs) from typing import List, Dict, Any # Type hinting for list, dict, and generic types from datetime import datetime # To get and format current date and time from config import * # Import all configuration variables, including 'auth' and 'restrictions' from src.utils.session_mapping import get_host # Function to get server info by session ID from src.utils.ip_generator import generate_ip # Function to generate random IP addresses for headers from src.utils.helper import mark # Function to mark a server as busy/unavailable from src.ui.reasoning import styles # Function to apply CSS styling to reasoning output import asyncio # Asyncio for asynchronous programming import httpx # Async HTTP client supporting HTTP/2 and streaming import aiohttp # Async HTTP client for alternative HTTP requests and streaming async def jarvis( session_id: str, # Unique session identifier to maintain consistent server assignment model: str, # AI model name specifying which model to use history: List[Dict[str, str]], # List of previous conversation messages with roles and content user_message: str, # Latest user input message to send to the AI model mode: str, # Mode string to guide AI behavior, e.g., '/think' or '/no_think' files=None, # Optional files or attachments to include with the user message temperature: float = 0.6, # Sampling temperature controlling randomness in token generation top_k: int = 20, # Limit token selection to top_k probable tokens min_p: float = 0, # Minimum probability threshold for token selection top_p: float = 0.95, # Nucleus sampling cumulative probability threshold repetition_penalty: float = 1, # Penalty factor to reduce token repetition ): """ Asynchronously send a chat request to a Jarvis AI server and handle streaming response incrementally. This function manages server selection based on the session ID, retries requests on specific error codes, and yields incremental parts of the AI-generated response as they arrive. It integrates CSS styling into the reasoning output only if the mode is not '/no_think', preserving the behavior where reasoning is streamed first inside a styled HTML block, followed by the main content streamed normally. The implementation uses both httpx (with HTTP/2 support) and aiohttp to ensure compatibility and robustness in streaming responses. Args: session_id (str): Identifier for the user session to maintain consistent server assignment. model (str): Name of the AI model to use for generating the response. history (List[Dict[str, str]]): List of previous messages in the conversation. user_message (str): The current message from the user to send to the AI model. mode (str): Contextual instructions to guide the AI model's response style. files (optional): Additional files or attachments to include with the user message. temperature (float): Controls randomness in token generation. top_k (int): Limits token selection to top_k probable tokens. min_p (float): Minimum probability threshold for token selection. top_p (float): Nucleus sampling cumulative probability threshold. repetition_penalty (float): Factor to reduce token repetition. Yields: str: Incremental strings of AI-generated response streamed from the server. Reasoning is wrapped in a styled HTML details block and streamed incrementally only if mode is not '/no_think'. After reasoning finishes, the main content is streamed normally. Notes: The function attempts to send the request to a server assigned for the session. If the server returns a specific error code indicating it is busy, it retries with another server. If all servers are busy or fail, it yields a message indicating the server is busy. """ tried = set() # Set to track servers already tried to avoid repeated retries # Loop until all available servers have been tried without success while len(tried) < len(auth): # Get server setup info assigned for this session, including endpoint, token, and error code setup = get_host(session_id) server = setup["jarvis"] # Server identifier string host = setup["endpoint"] # API endpoint URL string token = setup["token"] # Authorization token string error = setup["error"] # HTTP error code integer which triggers retry tried.add(server) # Mark this server as tried to prevent retrying immediately # Format current date/time string for system instructions, e.g., "Tuesday, June 24, 2025, 08:13 PM " date = datetime.now().strftime("%A, %B %d, %Y, %I:%M %p %Z") # Combine mode instructions, usage restrictions, and date into a single system instructions string instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n" # Copy conversation history to avoid mutating the original list outside this function messages = history.copy() # Insert system instructions as the first message in the conversation history messages.insert(0, {"role": "system", "content": instructions}) # Prepare user message dictionary, include files if provided msg = {"role": "user", "content": user_message} if files: msg["files"] = files messages.append(msg) # Append user message to the conversation messages list # Prepare HTTP headers with authorization and randomized client IP for X-Forwarded-For headers = { "Authorization": f"Bearer {token}", # Bearer token for API access authentication "Content-Type": "application/json", # Content type set to JSON for request body "X-Forwarded-For": generate_ip(), # Random IP to simulate different client origins for load balancing or logging } # Prepare JSON payload with model parameters and conversation messages to send in POST request payload = { "model": model, "messages": messages, "stream": True, # Enable streaming response "temperature": temperature, "top_k": top_k, "min_p": min_p, "top_p": top_p, "repetition_penalty": repetition_penalty, } # Initialize accumulators and flags for streamed response parts reasoning = "" # String accumulator for reasoning text from the AI reasoning_check = None # Flag to detect presence of reasoning in response; None means not checked yet reasoning_done = False # Flag marking reasoning completion content = "" # String accumulator for main content text from the AI try: # Use httpx AsyncClient with no timeout to allow long streaming connections async with httpx.AsyncClient(timeout=None, http2=True) as client: # Open async streaming POST request to Jarvis server endpoint with headers and JSON payload async with client.stream("POST", host, headers=headers, json=payload) as response: # Iterate asynchronously over each line of streaming response as it arrives async for chunk in response.aiter_lines(): # Skip lines that do not start with "data:" prefix as per server-sent events (SSE) format if not chunk.strip().startswith("data:"): continue try: # Parse JSON data after "data:" prefix which contains incremental response delta data = json.loads(chunk[5:]) # Extract incremental delta message from first choice in response choice = data["choices"][0]["delta"] # On first delta received, detect if 'reasoning' field is present and non-empty if reasoning_check is None: # Initialize reasoning_check to empty string if reasoning exists and is non-empty, else None reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None # If reasoning is present and mode is not '/no_think' and reasoning not done yet if ( reasoning_check == "" # Reasoning detected in response and mode != "/no_think" # Mode allows reasoning output and not reasoning_done # Reasoning phase not finished yet and "reasoning" in choice # Current delta includes reasoning part and choice["reasoning"] # Reasoning content is not empty ): reasoning += choice["reasoning"] # Append incremental reasoning text # Yield reasoning wrapped in styled HTML block with details expanded yield styles(reasoning=reasoning, content="", expanded=True) continue # Continue streaming reasoning increments without processing content yet # When reasoning ends and content starts, mark reasoning done, yield empty string, then content if ( reasoning_check == "" # Reasoning was detected previously and mode != "/no_think" # Mode allows reasoning output and not reasoning_done # Reasoning phase not finished yet and "content" in choice # Current delta includes content part and choice["content"] # Content is not empty ): reasoning_done = True # Mark reasoning phase complete yield "" # Yield empty string to signal end of reasoning block to the consumer content += choice["content"] # Start accumulating content text yield content # Yield first part of content to the consumer continue # Continue streaming content increments # If no reasoning present or reasoning done, accumulate content and yield incrementally if ( (reasoning_check is None or reasoning_done or mode == "/no_think") # No reasoning or reasoning finished or mode disables reasoning and "content" in choice # Current delta includes content part and choice["content"] # Content is not empty ): content += choice["content"] # Append incremental content text yield content # Yield updated content string to the consumer except Exception: # Ignore exceptions during JSON parsing or key access and continue streaming continue return # Exit function after successful streaming completion except httpx.HTTPStatusError as e: # If server returns specific error code indicating busy, retry with another server if e.response.status_code == error: # Continue to next iteration to try a different server continue else: # For other HTTP errors, mark this server as busy/unavailable mark(server) except Exception: # For other exceptions (network errors, timeouts), mark server as busy/unavailable mark(server) # If httpx fails or server is busy, fallback to aiohttp for robustness and compatibility try: # Create aiohttp client session with no timeout for streaming async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session: # Open async streaming POST request to Jarvis server endpoint with headers and JSON payload async with session.post(host, headers=headers, json=payload) as resp: # Raise for status to catch HTTP errors resp.raise_for_status() # Iterate asynchronously over each line of streaming response as it arrives async for line_bytes in resp.content: # Decode bytes to string and strip whitespace line = line_bytes.decode("utf-8").strip() # Skip lines that do not start with "data:" prefix as per SSE format if not line.startswith("data:"): continue try: # Parse JSON data after "data:" prefix which contains incremental response delta data = json.loads(line[5:]) # Extract incremental delta message from first choice in response choice = data["choices"][0]["delta"] # On first delta received, detect if 'reasoning' field is present and non-empty if reasoning_check is None: reasoning_check = "" if ("reasoning" in choice and choice["reasoning"]) else None # If reasoning is present and mode is not '/no_think' and reasoning not done yet if ( reasoning_check == "" and mode != "/no_think" and not reasoning_done and "reasoning" in choice and choice["reasoning"] ): reasoning += choice["reasoning"] yield styles(reasoning=reasoning, content="", expanded=True) continue # When reasoning ends and content starts, mark reasoning done, yield empty string, then content if ( reasoning_check == "" and mode != "/no_think" and not reasoning_done and "content" in choice and choice["content"] ): reasoning_done = True yield "" content += choice["content"] yield content continue # If no reasoning present or reasoning done, accumulate content and yield incrementally if ( (reasoning_check is None or reasoning_done or mode == "/no_think") and "content" in choice and choice["content"] ): content += choice["content"] yield content except Exception: # Ignore exceptions during JSON parsing or key access and continue streaming continue return # Exit function after successful streaming completion with aiohttp except aiohttp.ClientResponseError as e: # If server returns specific error code indicating busy, retry with another server if e.status == error: continue # Try next available server else: mark(server) # Mark server as busy/unavailable for other HTTP errors except Exception: # For other exceptions (network errors, timeouts), mark server as busy/unavailable mark(server) # If all servers tried and none succeeded, yield a message indicating server busy status yield "The server is currently busy. Please wait a moment or try again later." return # End of function