# api/src/routes/v1/chat_completions.py
# Last commit: "api: Restructured repo." by hadadrjt (e61d441)
#
# SPDX-FileCopyrightText: Hadad <hadad@linuxmail.org>
# SPDX-License-Identifier: Apache-2.0
#
import time # Import time module to handle timestamps and time-based operations
import uuid # Import uuid module to generate unique identifiers for responses
from fastapi import APIRouter, HTTPException # Import APIRouter to create route groups and HTTPException for error handling
from fastapi.responses import JSONResponse, StreamingResponse # Import response classes for JSON and streaming responses
from src.models.requests import OpenAIChatRequest # Import request model defining the expected structure of chat requests
from src.cores.sessions import get_or_create_session, session_store # Import session management functions and storage
from src.services.streaming import event_generator # Import generator function for streaming chat responses
from config import MODEL # Import default model configuration
# Router grouping the OpenAI-compatible chat-completion endpoints defined below;
# presumably mounted by the application under a /v1/chat prefix — TODO confirm against app setup
router = APIRouter()
def _parse_messages(messages):
    """
    Split an OpenAI-style message list into prior history and the current input.

    The last message must be from the user and becomes the current input; the
    preceding messages are consumed in (user, assistant) pairs to rebuild the
    conversation history. Pairs whose roles do not match the expected pattern
    are skipped rather than rejected.

    Args:
        messages: Non-empty list of dicts, each with "role" and "content" keys.

    Returns:
        Tuple of (history, current_input) where history is a list of
        {"input": ..., "response": ...} dicts.

    Raises:
        HTTPException: 400 if the message structure is invalid (missing keys,
            non-dict entries, or last message not from the user).
    """
    try:
        # The final message carries the prompt the client wants answered now.
        if messages[-1]["role"] != "user":
            raise ValueError("Last message must be from user")
        current_input = messages[-1]["content"]
        history = []
        prior = messages[:-1]  # Everything before the final message is history
        # Walk prior messages two at a time: user turn followed by assistant turn.
        for i in range(0, len(prior) - 1, 2):
            user_msg, assistant_msg = prior[i], prior[i + 1]
            # Skip pairs that don't match the expected user/assistant pattern
            # (e.g. a leading system message shifts the pairing).
            if user_msg["role"] != "user" or assistant_msg["role"] != "assistant":
                continue
            history.append({
                "input": user_msg["content"],
                "response": assistant_msg["content"]
            })
        return history, current_input
    # TypeError is included so a non-dict message entry yields a clean 400
    # instead of an unhandled 500.
    except (KeyError, TypeError, ValueError) as e:
        raise HTTPException(status_code=400, detail=f"Invalid message format: {str(e)}")


def _collect_final_text(jarvis_response):
    """
    Drain the AI client's partial-response iterator and return the final text.

    Each partial overwrites the previous one, so only the last (most complete)
    chunk is kept. Yields nothing -> returns "".

    Args:
        jarvis_response: Iterable of nested partial-response structures whose
            text lives at partial[0][0][1] — format dictated by the AI client.

    Raises:
        HTTPException: 500 if a partial does not match the expected nesting.
    """
    buffer = ""
    try:
        for partial in jarvis_response:
            # Extract text from the nested response structure; each chunk is
            # cumulative, so the last assignment wins.
            buffer = partial[0][0][1]
    # Guard the nested indexing so a malformed partial surfaces as a proper
    # HTTPException with a message, not a bare 500 traceback.
    except (IndexError, KeyError, TypeError) as e:
        raise HTTPException(status_code=500, detail=f"Malformed AI response: {str(e)}")
    return buffer


@router.post("/completions")
async def openai_chat_completions(req: OpenAIChatRequest):
    """
    Handle OpenAI-compatible chat completion requests.
    Supports streaming and non-streaming modes based on client request.
    Steps:
    - Validate the presence and structure of messages in the request
    - Extract conversation history and current user input from messages
    - Retrieve or create a session for managing conversation state
    - Update session history with prior conversation
    - If streaming is requested, return a streaming response
    - Otherwise, submit input to AI client and collect full response
    - Append new interaction to session history
    - Return response formatted according to OpenAI chat completion API
    Returns:
        JSONResponse or StreamingResponse with chat completion data and session ID
    Raises:
        HTTPException: 400 on empty/invalid messages, 503 when no AI client is
            available, 500 on submission failure or malformed AI output.
    """
    # Ensure messages list is not empty, else raise HTTP 400 error
    if not req.messages:
        raise HTTPException(status_code=400, detail="Messages cannot be empty")
    # Separate prior conversation history from the current user input
    history, current_input = _parse_messages(req.messages)
    # Determine model to use: requested model or default from config
    model = req.model or MODEL
    # Retrieve existing session or create a new one using session ID and model
    session_id = get_or_create_session(req.session_id, model)
    # The stored timestamp is not needed here; it is rewritten below
    _, session_data = session_store[session_id]
    # Replace session history with the history reconstructed from the request,
    # and stamp the session as freshly used
    session_data["history"] = history
    session_store[session_id] = (time.time(), session_data)
    client = session_data["client"]  # AI client instance bound to this session
    # Without a client there is nothing to answer with — signal service unavailable
    if client is None:
        raise HTTPException(status_code=503, detail="AI client not available")
    # Streaming mode: hand off to the SSE event generator and return immediately
    if req.stream:
        return StreamingResponse(
            event_generator(current_input, model, session_id),
            media_type="text/event-stream"
        )
    # Non-streaming mode: submit input to the AI client and collect the result
    try:
        jarvis_response = client.submit(multi={"text": current_input}, api_name="/api")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Failed to submit to AI: {str(e)}")
    buffer = _collect_final_text(jarvis_response)
    # Record the new interaction and persist the updated session
    session_data["history"].append({"input": current_input, "response": buffer})
    session_store[session_id] = (time.time(), session_data)
    # Construct response in OpenAI chat completion format with session ID extension
    response = {
        "id": f"chatcmpl-{uuid.uuid4().hex[:8]}",  # Unique response ID
        "object": "chat.completion",
        "created": int(time.time()),  # Timestamp of response creation
        "model": model,  # Model used for completion
        "choices": [
            {
                "index": 0,
                "message": {
                    "role": "assistant",
                    "content": buffer  # AI-generated response content
                },
                "finish_reason": "stop"
            }
        ],
        "session_id": session_id  # Custom field for session management
    }
    return JSONResponse(response)