File size: 5,726 Bytes
e61d441
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#

import json  # Import the json module to handle JSON encoding and decoding of data
import time  # Import the time module to work with timestamps
import uuid  # Import the uuid module to generate unique identifiers

from typing import AsyncGenerator  # Import AsyncGenerator for typing asynchronous generator functions
from src.cores.sessions import session_store  # Import the session_store object to manage user sessions

def _sse(payload: dict) -> str:
    """Serialize *payload* as one Server-Sent Events ``data:`` frame."""
    return f"data: {json.dumps(payload)}\n\n"


async def event_generator(user_input: str, model: str, session_id: str) -> AsyncGenerator[str, None]:
    """
    Stream an AI response incrementally as Server-Sent Events (SSE).

    The session identified by ``session_id`` is looked up in ``session_store``;
    its ``"client"`` entry is used to submit ``user_input``. Each partial result
    is converted into an OpenAI-style ``chat.completion.chunk`` carrying only the
    newly added text. When the stream ends, the exchange is appended to the
    session's ``"history"`` list and the session's timestamp is refreshed.

    Parameters:
    - user_input: The input text provided by the user to the AI model.
    - model: The model identifier echoed back in every chunk.
    - session_id: Key of the session inside ``session_store``.

    Yields:
    - SSE frames (``"data: <json>\\n\\n"``). Normal frames are completion chunks
      ending with one chunk whose ``finish_reason`` is ``"stop"``. On failure a
      single frame with an ``"error"`` key is yielded instead and the stream ends.
    """

    # The stored timestamp is not needed here; only the session payload is used
    _last_update, session_data = session_store.get(session_id, (0, None))

    # Unknown session: report it to the client and stop
    if session_data is None:
        yield _sse({'error': 'Session not found'})
        return

    # Use .get() so a session without a "client" key is reported as an SSE error
    # instead of raising KeyError and aborting the response mid-stream
    client = session_data.get("client")
    if client is None:
        yield _sse({'error': 'AI client not available'})
        return

    try:
        # Submit the user's input via the client's submit method
        # The 'multi' parameter wraps the text, and 'api_name' specifies the API endpoint
        jarvis_response = client.submit(multi={"text": user_input}, api_name="/api")
    except Exception as e:
        # Submission failed: surface the reason to the client and stop
        yield _sse({'error': f'Failed to submit to AI: {str(e)}'})
        return

    # In OpenAI's streaming format every chunk of one completion shares the same
    # id and creation timestamp, so both are computed once per stream
    completion_id = f"chatcmpl-{uuid.uuid4().hex[:8]}"
    created = int(time.time())

    def make_chunk(delta: dict, finish_reason) -> dict:
        # Build one chat.completion.chunk for this stream
        return {
            "id": completion_id,
            "object": "chat.completion.chunk",
            "created": created,
            "model": model,
            "choices": [
                {
                    "index": 0,
                    "delta": delta,
                    "finish_reason": finish_reason,
                }
            ],
        }

    buffer = ""  # Full response text received so far

    try:
        # NOTE(review): this is a synchronous loop inside a coroutine; if the
        # client's iterator blocks while waiting for the model, the event loop
        # is blocked as well - confirm, and consider offloading to a thread.
        for partial in jarvis_response:
            # Extract the current text from the nested response structure
            text = partial[0][0][1]

            # Each partial carries the text so far: emit only the new suffix.
            # If the text no longer extends the buffer, resend it in full.
            delta = text[len(buffer):] if text.startswith(buffer) else text
            buffer = text

            # Nothing new to send for this partial
            if not delta:
                continue

            yield _sse(make_chunk({"content": delta}, None))

        # Record the completed exchange; setdefault tolerates a session that
        # was created without a history list
        session_data.setdefault("history", []).append({"input": user_input, "response": buffer})

        # Refresh the session's last-access time in the store
        session_store[session_id] = (time.time(), session_data)

        # Final chunk: empty delta with finish_reason 'stop' signals the end of the stream
        yield _sse(make_chunk({}, "stop"))

    except Exception as e:
        # Any failure while streaming is reported to the client as an error frame
        yield _sse({"error": {"message": f"Streaming error: {str(e)}"}})