#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import json # Import the json module to handle JSON encoding and decoding of data
import time # Import the time module to work with timestamps
import uuid # Import the uuid module to generate unique identifiers
from typing import AsyncGenerator # Import AsyncGenerator for typing asynchronous generator functions
from src.cores.sessions import session_store # Import the session_store object to manage user sessions
async def event_generator(user_input: str, model: str, session_id: str) -> AsyncGenerator[str, None]:
"""
Asynchronous generator function that streams AI-generated responses incrementally as Server-Sent Events (SSE).
Parameters:
- user_input: The input text provided by the user to the AI model.
- model: The identifier of the AI model to be used for generating responses.
- session_id: A unique string representing the current user session.
Yields:
- JSON-formatted strings representing incremental chunks of the AI response,
formatted as Server-Sent Events for real-time streaming to the client.
"""
# Retrieve the last update time and session data from the session store using the session ID
last_update, session_data = session_store.get(session_id, (0, None))
# If no session data is found for the given session ID, yield an error message and stop
if session_data is None:
yield f"data: {json.dumps({'error': 'Session not found'})}\n\n"
return
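    # session_data is expected to be a dict holding at least a "client" entry
    # (the AI client used below) and a "history" list of input/response pairs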
    # Extract the AI client object from the session data, tolerating a missing key
    # (a plain ["client"] lookup would raise KeyError and bypass the guard below)
    client = session_data.get("client")
# If the client is missing in the session data, yield an error message and stop
if client is None:
yield f"data: {json.dumps({'error': 'AI client not available'})}\n\n"
return
try:
# Submit the user's input text to the AI model via the client's submit method
# The 'multi' parameter wraps the text, and 'api_name' specifies the API endpoint
jarvis_response = client.submit(multi={"text": user_input}, api_name="/api")
except Exception as e:
# If submission to the AI fails, yield an error message with the exception details and stop
yield f"data: {json.dumps({'error': f'Failed to submit to AI: {str(e)}'})}\n\n"
return
buffer = "" # Initialize an empty string buffer to accumulate the full AI response progressively
try:
# Iterate over the partial responses received from the AI client submission
for partial in jarvis_response:
# Extract the current partial text chunk from the nested response structure
text = partial[0][0][1]
# Determine the new delta text by comparing with the buffer
# If the new text starts with the buffer, delta is the new appended text; otherwise, delta is the entire text
if text.startswith(buffer):
delta = text[len(buffer):]
else:
delta = text
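            # Example: buffer "Hel" and text "Hello" produce delta "lo"; when the
            # new text does not extend the buffer (the AI rewrote earlier output),
            # the whole text is emitted as the delta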
buffer = text # Update the buffer with the latest full text from the AI
# Skip yielding if the delta is empty (no new text)
if delta == "":
continue
# Construct a chunk dictionary following OpenAI's streaming response format
chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}", # Unique chunk ID with a UUID suffix
"object": "chat.completion.chunk", # Object type indicating a chunk of chat completion
"created": int(time.time()), # Timestamp of chunk creation in Unix time
"model": model, # The AI model used for generating this chunk
"choices": [
{
"index": 0, # Index of the choice in the response (usually 0 for single response)
"delta": {"content": delta}, # The incremental new text content in this chunk
"finish_reason": None # No finish reason yet, stream is ongoing
}
]
}
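            # On the wire, one such event looks like this (values illustrative):
            #   data: {"id": "chatcmpl-1a2b3c4d", "object": "chat.completion.chunk",
            #   "created": 1700000000, "model": "...", "choices": [{"index": 0,
            #   "delta": {"content": "Hello"}, "finish_reason": null}]}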
# Yield the chunk as a Server-Sent Event formatted string with 'data:' prefix and double newline suffix
yield f"data: {json.dumps(chunk)}\n\n"
# After all chunks have been streamed, append the full input-response pair to the session history
session_data["history"].append({"input": user_input, "response": buffer})
# Update the session store with the new last access time and updated session data
session_store[session_id] = (time.time(), session_data)
# Prepare a final chunk indicating the end of the stream with finish_reason set to 'stop'
done_chunk = {
"id": f"chatcmpl-{uuid.uuid4().hex[:8]}", # Unique ID for the final chunk
"object": "chat.completion.chunk", # Object type for consistency
"created": int(time.time()), # Timestamp of completion
"model": model, # Model identifier
"choices": [
{
"index": 0,
"delta": {}, # Empty delta indicating no new content
"finish_reason": "stop" # Signal that the stream has finished
}
]
}
# Yield the final completion chunk to signal the client that streaming is done
yield f"data: {json.dumps(done_chunk)}\n\n"
except Exception as e:
# If any error occurs during streaming, yield an error chunk with the exception message
error_chunk = {
"error": {"message": f"Streaming error: {str(e)}"}
}
yield f"data: {json.dumps(error_chunk)}\n\n"