#
# SPDX-FileCopyrightText: Hadad
# SPDX-License-Identifier: Apache-2.0
#

import json  # Handle JSON encoding and decoding of data
import time  # Work with timestamps
import uuid  # Generate unique identifiers
from typing import AsyncGenerator  # Type annotation for asynchronous generator functions

from src.cores.sessions import session_store  # Session store that manages user sessions


async def event_generator(user_input: str, model: str, session_id: str) -> AsyncGenerator[str, None]:
    """
    Asynchronous generator that streams AI-generated responses incrementally
    as Server-Sent Events (SSE).

    Parameters:
    - user_input: The input text provided by the user to the AI model.
    - model: The identifier of the AI model used to generate responses.
    - session_id: A unique string representing the current user session.

    Yields:
    - JSON-formatted strings representing incremental chunks of the AI
      response, formatted as Server-Sent Events for real-time streaming
      to the client.
    """
    # Retrieve the last update time and session data from the session store
    last_update, session_data = session_store.get(session_id, (0, None))

    # If no session data is found for the given session ID, yield an error and stop
    if session_data is None:
        yield f"data: {json.dumps({'error': 'Session not found'})}\n\n"
        return

    # Extract the AI client object from the session data
    client = session_data["client"]

    # If the client is missing from the session data, yield an error and stop
    if client is None:
        yield f"data: {json.dumps({'error': 'AI client not available'})}\n\n"
        return

    try:
        # Submit the user's input to the AI model via the client's submit method;
        # the 'multi' parameter wraps the text and 'api_name' selects the API endpoint
        jarvis_response = client.submit(multi={"text": user_input}, api_name="/api")
    except Exception as e:
        # If submission to the AI fails, yield an error with the exception details and stop
        yield f"data: {json.dumps({'error': f'Failed to submit to AI: {str(e)}'})}\n\n"
        return

    buffer = ""  # Accumulates the full AI response progressively

    try:
        # Iterate over the partial responses received from the AI client submission
        for partial in jarvis_response:
            # Extract the current partial text chunk from the nested response structure
            text = partial[0][0][1]

            # Determine the new delta by comparing against the buffer: if the new
            # text starts with the buffer, the delta is the appended suffix;
            # otherwise the entire text is treated as new
            if text.startswith(buffer):
                delta = text[len(buffer):]
            else:
                delta = text
            buffer = text  # Update the buffer with the latest full text from the AI

            # Skip yielding if the delta is empty (no new text)
            if delta == "":
                continue

            # Construct a chunk dictionary following OpenAI's streaming response format
            chunk = {
                "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",  # Unique chunk ID with a UUID suffix
                "object": "chat.completion.chunk",  # Object type for a chat completion chunk
                "created": int(time.time()),  # Chunk creation timestamp in Unix time
                "model": model,  # The AI model used for generating this chunk
                "choices": [
                    {
                        "index": 0,  # Index of the choice (0 for a single response)
                        "delta": {"content": delta},  # The incremental new text in this chunk
                        "finish_reason": None  # No finish reason yet; the stream is ongoing
                    }
                ]
            }

            # Yield the chunk as a Server-Sent Event: 'data:' prefix, double-newline suffix
            yield f"data: {json.dumps(chunk)}\n\n"

        # After all chunks have streamed, append the input-response pair to the session history
        session_data["history"].append({"input": user_input, "response": buffer})

        # Update the session store with the new last-access time and updated session data
        session_store[session_id] = (time.time(), session_data)

        # Prepare a final chunk that ends the stream with finish_reason set to 'stop'
        done_chunk = {
            "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",  # Unique ID for the final chunk
            "object": "chat.completion.chunk",  # Object type for consistency
            "created": int(time.time()),  # Timestamp of completion
            "model": model,  # Model identifier
            "choices": [
                {
                    "index": 0,
                    "delta": {},  # Empty delta indicating no new content
                    "finish_reason": "stop"  # Signal that the stream has finished
                }
            ]
        }

        # Yield the final completion chunk to signal the client that streaming is done
        yield f"data: {json.dumps(done_chunk)}\n\n"
    except Exception as e:
        # If any error occurs during streaming, yield an error chunk with the message
        error_chunk = {
            "error": {"message": f"Streaming error: {str(e)}"}
        }
        yield f"data: {json.dumps(error_chunk)}\n\n"
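
# ---------------------------------------------------------------------------
# Illustrative note (not emitted by this module): on the wire, each yielded
# value is one SSE frame, i.e. a single 'data:' line followed by a blank line.
# The id, timestamp, and model values below are hypothetical placeholders
# showing the shape of a content chunk and of the final stop chunk only.
#
#     data: {"id": "chatcmpl-1a2b3c4d", "object": "chat.completion.chunk", "created": 1700000000, "model": "jarvis", "choices": [{"index": 0, "delta": {"content": "Hello"}, "finish_reason": null}]}
#
#     data: {"id": "chatcmpl-5e6f7a8b", "object": "chat.completion.chunk", "created": 1700000001, "model": "jarvis", "choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}]}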
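
# ---------------------------------------------------------------------------
# Usage sketch (commented out; a minimal illustration, not part of this
# module): one way to wire event_generator into an SSE endpoint using
# FastAPI's StreamingResponse. The app object, route path, and query
# parameter names below are assumptions made for this sketch; only
# event_generator itself comes from this module.
#
#     from fastapi import FastAPI
#     from fastapi.responses import StreamingResponse
#
#     app = FastAPI()
#
#     @app.post("/v1/chat/stream")  # hypothetical route
#     async def stream_chat(user_input: str, model: str, session_id: str):
#         # StreamingResponse iterates the async generator and forwards each
#         # yielded "data: ...\n\n" string to the client as an SSE event
#         return StreamingResponse(
#             event_generator(user_input, model, session_id),
#             media_type="text/event-stream",
#         )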