File size: 16,686 Bytes
b5e7375
 
 
 
 
e96f98e
 
 
 
 
 
 
 
 
 
 
 
b5e7375
 
 
e96f98e
b5e7375
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e96f98e
 
 
b5e7375
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
e96f98e
b5e7375
 
 
 
 
e96f98e
 
 
 
 
b5e7375
e96f98e
b5e7375
 
e96f98e
b5e7375
 
e96f98e
b5e7375
e96f98e
 
b5e7375
 
e96f98e
b5e7375
 
 
e96f98e
b5e7375
e96f98e
b5e7375
e96f98e
 
 
b5e7375
 
e96f98e
b5e7375
 
 
e96f98e
b5e7375
 
 
 
 
 
 
 
e96f98e
 
b5e7375
e96f98e
b5e7375
 
e96f98e
 
 
b5e7375
e96f98e
b5e7375
e96f98e
b5e7375
 
 
e96f98e
b5e7375
e96f98e
b5e7375
 
 
 
 
 
 
e96f98e
b5e7375
 
 
 
 
 
 
 
 
 
e96f98e
b5e7375
 
 
 
 
 
 
 
 
 
e96f98e
b5e7375
e96f98e
b5e7375
 
 
 
 
 
 
 
 
e96f98e
b5e7375
 
 
 
e96f98e
b5e7375
 
 
e96f98e
 
b5e7375
e96f98e
b5e7375
 
e96f98e
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
b5e7375
 
e96f98e
b5e7375
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Module to parse and handle JSON data
import uuid  # Module to generate unique identifiers (UUIDs)
from typing import List, Dict, Any  # Type hinting for list, dict, and generic types
from datetime import datetime  # To get and format current date and time
from config import *  # Import all configuration variables, including 'auth' and 'restrictions'
from src.utils.session_mapping import get_host  # Function to get server info by session ID
from src.utils.ip_generator import generate_ip  # Function to generate random IP addresses for headers
from src.utils.helper import mark  # Function to mark a server as busy/unavailable
from src.ui.reasoning import styles  # Function to apply CSS styling to reasoning output
import asyncio  # Asyncio for asynchronous programming
import httpx  # Async HTTP client supporting HTTP/2 and streaming
import aiohttp  # Async HTTP client for alternative HTTP requests and streaming

class _StreamState:
    """Accumulate the reasoning/content deltas of ONE streaming attempt.

    A fresh instance is created for every HTTP attempt so that a stream which
    breaks half-way never leaks partial text or stale flags into the retry.
    """

    def __init__(self, mode: str):
        # Reasoning is only surfaced to the consumer when the mode allows it.
        self.show_reasoning = mode != "/no_think"
        self.reasoning = ""  # Reasoning text received so far
        self.content = ""  # Answer text received so far
        self.reasoning_seen = False  # True once any non-empty reasoning delta arrived
        self.reasoning_done = False  # True once content started after reasoning

    def feed(self, line: str) -> List[str]:
        """Consume one raw SSE line and return the strings to yield for it.

        Returns an empty list for lines that carry nothing usable (non-"data:"
        lines, non-JSON payloads, deltas without text).
        """
        line = line.strip()
        if not line.startswith("data:"):
            return []

        try:
            # Slice the *stripped* line so leading whitespace cannot shift the offset.
            delta = json.loads(line[5:])["choices"][0]["delta"]
        except (ValueError, KeyError, IndexError, TypeError):
            # Deliberate best-effort: payloads that are not JSON or do not have
            # the expected "choices"/"delta" shape are skipped, not fatal.
            return []
        if not isinstance(delta, dict):
            return []

        reasoning_part = delta.get("reasoning")
        content_part = delta.get("content")
        # Only text can be accumulated; anything else is treated as absent.
        if not isinstance(reasoning_part, str):
            reasoning_part = ""
        if not isinstance(content_part, str):
            content_part = ""

        if reasoning_part:
            self.reasoning_seen = True

        in_reasoning_phase = (
            self.reasoning_seen and self.show_reasoning and not self.reasoning_done
        )
        if not in_reasoning_phase:
            # No reasoning seen yet, reasoning hidden by mode, or reasoning finished:
            # plain cumulative content streaming.
            if not content_part:
                return []
            self.content += content_part
            return [self.content]

        if reasoning_part:
            # Still reasoning: re-render the whole reasoning block, expanded.
            # (Content carried by the same delta is ignored, as before.)
            self.reasoning += reasoning_part
            return [styles(reasoning=self.reasoning, content="", expanded=True)]

        if content_part:
            # First content after reasoning: emit "" to signal the end of the
            # reasoning block, then start streaming the cumulative content.
            self.reasoning_done = True
            self.content += content_part
            return ["", self.content]

        return []


async def jarvis(
    session_id: str,  # Unique session identifier to maintain consistent server assignment
    model: str,  # AI model name specifying which model to use
    history: List[Dict[str, str]],  # List of previous conversation messages with roles and content
    user_message: str,  # Latest user input message to send to the AI model
    mode: str,  # Mode string to guide AI behavior, e.g., '/think' or '/no_think'
    files=None,  # Optional files or attachments to include with the user message
    temperature: float = 0.6,  # Sampling temperature controlling randomness in token generation
    top_k: int = 20,  # Limit token selection to top_k probable tokens
    min_p: float = 0,  # Minimum probability threshold for token selection
    top_p: float = 0.95,  # Nucleus sampling cumulative probability threshold
    repetition_penalty: float = 1,  # Penalty factor to reduce token repetition
):
    """
    Stream a chat completion from a Jarvis server, yielding the response incrementally.

    For each attempt a server is obtained via ``get_host(session_id)``. The request is
    first sent with httpx (HTTP/2); if that fails for any reason other than the server's
    "busy" status code, the same request is retried once with aiohttp before moving on.

    Args:
        session_id (str): Identifier passed to ``get_host`` to pick the server.
        model (str): Name of the AI model to use for generating the response.
        history (List[Dict[str, str]]): Previous messages; not mutated. ``None`` is treated as empty.
        user_message (str): The current message from the user.
        mode (str): Placed at the top of the system instructions; the exact value
            '/no_think' additionally suppresses reasoning output.
        files (optional): Attached to the user message under the "files" key when truthy.
        temperature (float): Forwarded unchanged in the request payload.
        top_k (int): Forwarded unchanged in the request payload.
        min_p (float): Forwarded unchanged in the request payload.
        top_p (float): Forwarded unchanged in the request payload.
        repetition_penalty (float): Forwarded unchanged in the request payload.

    Yields:
        str: Cumulative output. While reasoning streams (and mode is not '/no_think'),
             each item is the full reasoning so far rendered by ``styles``. When the
             first content arrives an empty string is yielded once, and afterwards each
             item is the full content accumulated so far.
             If no attempt succeeds, a single "server busy" message is yielded.

    Notes:
        * A response whose status equals the server's configured ``error`` code skips
          the aiohttp fallback and moves straight to the next attempt.
        * Any other failure calls ``mark(server)`` (marks it busy/unavailable).
        * At most ``len(auth)`` attempts are made.
    """
    # --- Request parts that do not depend on the selected server (built once) ---------

    # astimezone() makes the datetime timezone-aware so that %Z renders a zone name;
    # on a naive datetime %Z is an empty string.
    date = datetime.now().astimezone().strftime("%A, %B %d, %Y, %I:%M %p %Z")

    # System instructions: mode directive, usage restrictions, and the current date.
    instructions = f"{mode}\n\n\n{restrictions}\n\n\nToday: {date}\n\n\n"

    user_entry = {"role": "user", "content": user_message}
    if files:
        user_entry["files"] = files

    # New list: the caller's history is never mutated.
    messages = [{"role": "system", "content": instructions}, *(history or []), user_entry]

    payload = {
        "model": model,
        "messages": messages,
        "stream": True,  # Enable streaming response
        "temperature": temperature,
        "top_k": top_k,
        "min_p": min_p,
        "top_p": top_p,
        "repetition_penalty": repetition_penalty,
    }

    # --- Attempt loop -----------------------------------------------------------------
    # NOTE(review): get_host is not visible from here. The attempt count is capped at
    # len(auth) so the loop terminates even if get_host keeps returning the same server
    # for this session — confirm this matches get_host's selection policy.
    for _ in range(len(auth)):
        setup = get_host(session_id)
        server = setup["jarvis"]  # Server identifier
        host = setup["endpoint"]  # API endpoint URL
        token = setup["token"]  # Authorization token
        error = setup["error"]  # HTTP status code that means "busy, try another server"

        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
            "X-Forwarded-For": generate_ip(),  # Fresh generated IP per attempt
        }

        # ---- Primary transport: httpx with HTTP/2 ------------------------------------
        state = _StreamState(mode)
        try:
            # timeout=None: streaming responses may stay open for a long time.
            async with httpx.AsyncClient(timeout=None, http2=True) as client:
                async with client.stream("POST", host, headers=headers, json=payload) as response:
                    # httpx does not raise on 4xx/5xx by itself; without this call the
                    # HTTPStatusError handler below could never run.
                    response.raise_for_status()
                    async for raw_line in response.aiter_lines():
                        for piece in state.feed(raw_line):
                            yield piece
            return  # Stream completed successfully
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code == error:
                continue  # Server reported busy: go to the next attempt
            mark(server)
        except Exception:
            # Network errors, protocol errors, etc.
            mark(server)

        # ---- Fallback transport: aiohttp ---------------------------------------------
        # Fresh state: whatever the failed httpx attempt streamed is discarded so the
        # new response is not appended to stale partial text.
        state = _StreamState(mode)
        try:
            async with aiohttp.ClientSession(timeout=aiohttp.ClientTimeout(total=None)) as session:
                async with session.post(host, headers=headers, json=payload) as resp:
                    resp.raise_for_status()
                    # Iterating resp.content yields the body line by line as bytes.
                    async for line_bytes in resp.content:
                        for piece in state.feed(line_bytes.decode("utf-8")):
                            yield piece
            return  # Stream completed successfully
        except aiohttp.ClientResponseError as exc:
            if exc.status == error:
                continue  # Server reported busy: go to the next attempt
            mark(server)
        except Exception:
            mark(server)

    # Every attempt failed (or no servers are configured).
    yield "The server is currently busy. Please wait a moment or try again later."
    return