# | |
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org> | |
# SPDX-License-Identifier: Apache-2.0 | |
# | |
import json # Import the json module to handle JSON encoding and decoding of data | |
import time # Import the time module to work with timestamps | |
import uuid # Import the uuid module to generate unique identifiers | |
from typing import AsyncGenerator # Import AsyncGenerator for typing asynchronous generator functions | |
from src.cores.sessions import session_store # Import the session_store object to manage user sessions | |
async def event_generator(user_input: str, model: str, session_id: str) -> AsyncGenerator[str, None]: | |
""" | |
Asynchronous generator function that streams AI-generated responses incrementally as Server-Sent Events (SSE). | |
Parameters: | |
- user_input: The input text provided by the user to the AI model. | |
- model: The identifier of the AI model to be used for generating responses. | |
- session_id: A unique string representing the current user session. | |
Yields: | |
- JSON-formatted strings representing incremental chunks of the AI response, | |
formatted as Server-Sent Events for real-time streaming to the client. | |
""" | |
# Retrieve the last update time and session data from the session store using the session ID | |
last_update, session_data = session_store.get(session_id, (0, None)) | |
# If no session data is found for the given session ID, yield an error message and stop | |
if session_data is None: | |
yield f"data: {json.dumps({'error': 'Session not found'})}\n\n" | |
return | |
# Extract the AI client object from the session data | |
client = session_data["client"] | |
# If the client is missing in the session data, yield an error message and stop | |
if client is None: | |
yield f"data: {json.dumps({'error': 'AI client not available'})}\n\n" | |
return | |
try: | |
# Submit the user's input text to the AI model via the client's submit method | |
# The 'multi' parameter wraps the text, and 'api_name' specifies the API endpoint | |
jarvis_response = client.submit(multi={"text": user_input}, api_name="/api") | |
except Exception as e: | |
# If submission to the AI fails, yield an error message with the exception details and stop | |
yield f"data: {json.dumps({'error': f'Failed to submit to AI: {str(e)}'})}\n\n" | |
return | |
buffer = "" # Initialize an empty string buffer to accumulate the full AI response progressively | |
try: | |
# Iterate over the partial responses received from the AI client submission | |
for partial in jarvis_response: | |
# Extract the current partial text chunk from the nested response structure | |
text = partial[0][0][1] | |
# Determine the new delta text by comparing with the buffer | |
# If the new text starts with the buffer, delta is the new appended text; otherwise, delta is the entire text | |
if text.startswith(buffer): | |
delta = text[len(buffer):] | |
else: | |
delta = text | |
buffer = text # Update the buffer with the latest full text from the AI | |
# Skip yielding if the delta is empty (no new text) | |
if delta == "": | |
continue | |
# Construct a chunk dictionary following OpenAI's streaming response format | |
chunk = { | |
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}", # Unique chunk ID with a UUID suffix | |
"object": "chat.completion.chunk", # Object type indicating a chunk of chat completion | |
"created": int(time.time()), # Timestamp of chunk creation in Unix time | |
"model": model, # The AI model used for generating this chunk | |
"choices": [ | |
{ | |
"index": 0, # Index of the choice in the response (usually 0 for single response) | |
"delta": {"content": delta}, # The incremental new text content in this chunk | |
"finish_reason": None # No finish reason yet, stream is ongoing | |
} | |
] | |
} | |
# Yield the chunk as a Server-Sent Event formatted string with 'data:' prefix and double newline suffix | |
yield f"data: {json.dumps(chunk)}\n\n" | |
# After all chunks have been streamed, append the full input-response pair to the session history | |
session_data["history"].append({"input": user_input, "response": buffer}) | |
# Update the session store with the new last access time and updated session data | |
session_store[session_id] = (time.time(), session_data) | |
# Prepare a final chunk indicating the end of the stream with finish_reason set to 'stop' | |
done_chunk = { | |
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}", # Unique ID for the final chunk | |
"object": "chat.completion.chunk", # Object type for consistency | |
"created": int(time.time()), # Timestamp of completion | |
"model": model, # Model identifier | |
"choices": [ | |
{ | |
"index": 0, | |
"delta": {}, # Empty delta indicating no new content | |
"finish_reason": "stop" # Signal that the stream has finished | |
} | |
] | |
} | |
# Yield the final completion chunk to signal the client that streaming is done | |
yield f"data: {json.dumps(done_chunk)}\n\n" | |
except Exception as e: | |
# If any error occurs during streaming, yield an error chunk with the exception message | |
error_chunk = { | |
"error": {"message": f"Streaming error: {str(e)}"} | |
} | |
yield f"data: {json.dumps(error_chunk)}\n\n" | |