Spaces:
Running
Running
# Import necessary libraries | |
import os # Interacting with the operating system (reading/writing files) | |
import chromadb # High-performance vector database for storing/querying dense vectors | |
from dotenv import load_dotenv # Loading environment variables from a .env file | |
import json # Parsing and handling JSON data | |
# LangChain imports | |
from langchain_core.documents import Document # Document data structures | |
from langchain_core.runnables import RunnablePassthrough # LangChain core library for running pipelines | |
from langchain_core.output_parsers import StrOutputParser # String output parser | |
from langchain.prompts import ChatPromptTemplate # Template for chat prompts | |
from langchain.chains.query_constructor.base import AttributeInfo # Base classes for query construction | |
from langchain.retrievers.self_query.base import SelfQueryRetriever # Base classes for self-querying retrievers | |
from langchain.retrievers.document_compressors import LLMChainExtractor, CrossEncoderReranker # Document compressors | |
from langchain.retrievers import ContextualCompressionRetriever # Contextual compression retrievers | |
# LangChain community & experimental imports | |
from langchain_community.vectorstores import Chroma # Implementations of vector stores like Chroma | |
from langchain_community.document_loaders import PyPDFDirectoryLoader, PyPDFLoader # Document loaders for PDFs | |
from langchain_community.cross_encoders import HuggingFaceCrossEncoder # Cross-encoders from HuggingFace | |
from langchain_experimental.text_splitter import SemanticChunker # Experimental text splitting methods | |
from langchain.text_splitter import ( | |
CharacterTextSplitter, # Splitting text by characters | |
RecursiveCharacterTextSplitter # Recursive splitting of text by characters | |
) | |
from langchain_core.tools import tool | |
from langchain.agents import create_tool_calling_agent, AgentExecutor | |
from langchain_core.prompts import ChatPromptTemplate | |
# LangChain OpenAI imports | |
from langchain_openai import AzureOpenAIEmbeddings, AzureChatOpenAI # OpenAI embeddings and models | |
from langchain.embeddings.openai import OpenAIEmbeddings # OpenAI embeddings for text vectors | |
# LlamaParse & LlamaIndex imports | |
from llama_parse import LlamaParse # Document parsing library | |
from llama_index.core import Settings, SimpleDirectoryReader # Core functionalities of the LlamaIndex | |
# LangGraph import | |
from langgraph.graph import StateGraph, END, START # State graph for managing states in LangChain | |
# Pydantic import | |
from pydantic import BaseModel # Pydantic for data validation | |
# Typing imports | |
from typing import Dict, List, Tuple, Any, TypedDict # Python typing for function annotations | |
# Other utilities | |
import numpy as np # Numpy for numerical operations | |
from groq import Groq | |
from mem0 import MemoryClient | |
import streamlit as st | |
from datetime import datetime | |
#====================================SETUP=====================================#
# Credentials are supplied as environment variables (Hugging Face Spaces secrets).
# They are deliberately never printed: anything written to stdout ends up in the
# Space's logs, which would leak the keys.
from langchain_openai import ChatOpenAI  # chat model class instantiated below

api_key = os.environ["API_KEY"]
endpoint = os.environ["OPENAI_API_BASE"]
groq_api_key = os.environ["LLAMA_API_KEY"]  # passed to the Groq client used for Llama Guard
MEM0_api_key = os.environ["MEM0_API_KEY"]   # passed to the mem0 MemoryClient
my_api_key = os.environ.get("MY_API_KEY")   # not referenced by the code visible here, so optional

# OpenAI embedding function in Chroma's native format.
embedding_function = chromadb.utils.embedding_functions.OpenAIEmbeddingFunction(
    api_base=endpoint,
    api_key=api_key,
    model_name='text-embedding-ada-002'
)

# LangChain embedding model; this is the one handed to the Chroma vector store below.
embedding_model = OpenAIEmbeddings(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model='text-embedding-ada-002'
)

# Chat model shared by every node of the workflow. No temperature is set here,
# so the library default applies.
llm = ChatOpenAI(
    openai_api_base=endpoint,
    openai_api_key=api_key,
    model="gpt-4o",  # gpt-4o instead of gpt-4o-mini for better results
    streaming=False
)

# Register the models on the LlamaIndex settings object.
Settings.llm = llm
# NOTE(review): LlamaIndex documents `Settings.embed_model`; `embedding` looks like a
# plain extra attribute that LlamaIndex never reads - confirm before renaming.
Settings.embedding = embedding_model
#================================Creating Langgraph agent======================# | |
class AgentState(TypedDict):
    """State dictionary passed between the nodes of the workflow graph."""

    # --- query side ---------------------------------------------------------
    query: str                       # user query as received
    expanded_query: str              # LLM-produced expansion of the query
    query_feedback: str              # suggestions written by refine_query

    # --- retrieval and answer -----------------------------------------------
    context: List[Dict[str, Any]]    # retrieved documents as {"content", "metadata"} dicts
    response: str                    # generated answer
    feedback: str                    # suggestions written by refine_response

    # --- evaluation ---------------------------------------------------------
    groundedness_score: float        # score written by score_groundedness
    precision_score: float           # score written by check_precision
    groundedness_check: bool         # not written by any node visible in this file

    # --- loop control -------------------------------------------------------
    groundedness_loop_count: int     # number of groundedness scoring passes so far
    precision_loop_count: int        # number of precision scoring passes so far
    loop_max_iter: int               # iteration cap consulted by both routing functions
def expand_query(state):
    """
    Ask the LLM to rewrite the user query into several search queries.

    Args:
        state (Dict): Workflow state; reads 'query' and 'query_feedback'.
    Returns:
        Dict: The same state object with 'expanded_query' set to the LLM's text output.
    """
    print("---------Expanding Query---------")
    system_message = '''
You are a domain expert assisting in answering questions related to research papers.
Convert the user query into something that a nutritionist would understand. Use domain related words.
Return 3 related search queries based on the user's request seperated by newline.
Return only 3 versions of the question as a list.
Perform query expansion on the question received. If there are multiple common ways of phrasing a user question \
or common synonyms for key words in the question, make sure to return multiple versions \
of the query with the different phrasings.
If the query has multiple parts, split them into separate simpler queries. This is the only case where you can generate more than 3 queries.
If there are acronyms or words you are not familiar with, do not try to rephrase them.
Generate only a list of questions. Do not mention anything before or after the list.
Use the query feeback if provided to craft the search queries.
'''
    messages = [
        ("system", system_message),
        ("user", "Expand this query: {query} using the feedback: {query_feedback}"),
    ]
    # prompt -> chat model -> plain string
    expansion_chain = ChatPromptTemplate.from_messages(messages) | llm | StrOutputParser()

    prompt_inputs = {"query": state['query'], "query_feedback": state["query_feedback"]}
    expansion = expansion_chain.invoke(prompt_inputs)
    print("expanded_query", expansion)

    state["expanded_query"] = expansion
    return state
# Open the persisted Chroma collection, embedding queries with the LangChain model.
vector_store = Chroma(
    embedding_function=embedding_model,
    persist_directory="./nutritional_db2",
    collection_name="nutritional_hypotheticals",
)

# Similarity-search retriever over that collection, returning 3 documents per query.
retriever = vector_store.as_retriever(search_type="similarity", search_kwargs={"k": 3})
def retrieve_context(state):
    """
    Retrieve documents from the vector store for the current query.

    The expanded query is used when one is present, so that the
    refine_query -> expand_query loop actually changes what gets retrieved;
    the original query is the fallback when the expansion is missing or empty.

    Args:
        state (Dict): Workflow state; reads 'expanded_query' and 'query'.
    Returns:
        Dict: The same state object with 'context' set to a list of
        {"content": ..., "metadata": ...} dicts, one per retrieved document.
    """
    print("---------retrieve_context---------")
    query = state.get("expanded_query") or state["query"]

    docs = retriever.invoke(query)
    print("Retrieved documents:", docs)  # Debugging: raw document objects

    # Keep both the text and the metadata of every document.
    context = [
        {"content": doc.page_content, "metadata": doc.metadata}
        for doc in docs
    ]
    state["context"] = context
    print("Extracted context with metadata:", context)  # Debugging
    return state
def craft_response(state: Dict) -> Dict:
    """
    Generate an answer to the query from the retrieved context.

    Args:
        state (Dict): Workflow state; reads 'query', 'context' (list of dicts
            with a 'content' key) and 'feedback'.
    Returns:
        Dict: The same state object with 'response' set to the answer text.
    """
    print("---------craft_response---------")
    system_message = '''
Generates a response to a user query and context provided.
Parameters:
query (str): The user's query and expanded queries based on user's query.
context (str): The documents retrieved relevant to the queries.
Returns:
response (str): The response generated by the model.
The function performs the following steps:
1. Constructs a prompt containing system and user prompts.
2. Sends the prompt containing user queries with context provided to the GPT model to generate a response.
3. Displays the response.
The answer you provide must come from the user queries with context provided.
If feedback is provided, use it to craft the response.
If information provided is not enough to answer the query respons with 'I don't know the answer. Not in my records.'
'''
    response_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nContext: {context}\n\nfeedback: {feedback}")
    ])
    # StrOutputParser turns the chat message into plain text. Without it the
    # message object itself is stored in the state and later interpolated into
    # the scoring/refinement prompts instead of the answer text.
    chain = response_prompt | llm | StrOutputParser()
    response = chain.invoke({
        "query": state['query'],
        "context": "\n".join(doc["content"] for doc in state['context']),
        "feedback": state["feedback"]  # suggestions from refine_response, empty on the first pass
    })
    state['response'] = response
    print("intermediate response: ", response)
    return state
def score_groundedness(state: Dict) -> Dict:
    """
    Score how well the response is grounded in the retrieved context.

    The LLM's reply is parsed leniently: if it is not a bare number, the first
    number found in the text is used, and an unparseable reply counts as 0.0
    (which routes to refinement instead of crashing the workflow). The score is
    clamped to the [0, 1] range the prompt asks for.

    Args:
        state (Dict): Workflow state; reads 'context' and 'response', and
            increments 'groundedness_loop_count'.
    Returns:
        Dict: The same state object with 'groundedness_score' set.
    """
    import re  # local import: only needed for lenient score parsing

    print("---------check_groundedness---------")
    system_message = '''
You are tasked with rating AI generated answers to questions posed by users.
Please act as an impartial judge and evaluate the quality of the provided answer which attempts to answer the provided question based on a provided context.
In the input, the context is {context}, while the AI generated response is {response}.
Evaluation criteria:
The task is to judge the extent to which the metric is followed by the answer.
1 - The metric is not followed at all
2 - The metric is followed only to a limited extent
3 - The metric is followed to a good extent
4 - The metric is followed mostly
5 - The metric is followed completely
The answer should be derived only from the information presented in the context
Do not show any instructions for deriving your answer.
Output your result as a float number between 0 and 1 using the evaluation criteria.
The better the criteria, the cloase it is to 1 and the worse the criteria, the closer it is to 0.
'''
    groundedness_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Context: {context}\nResponse: {response}\n\nGroundedness score:")
    ])
    chain = groundedness_prompt | llm | StrOutputParser()
    raw_score = chain.invoke({
        "context": "\n".join(doc["content"] for doc in state['context']),
        "response": state['response']
    })

    try:
        groundedness_score = float(raw_score)
    except (TypeError, ValueError):
        # The model added text around the number (e.g. "Score: 0.8").
        found = re.search(r"\d*\.?\d+", str(raw_score))
        groundedness_score = float(found.group()) if found else 0.0
    groundedness_score = min(max(groundedness_score, 0.0), 1.0)

    print("groundedness_score: ", groundedness_score)
    state['groundedness_loop_count'] += 1
    print("#########Groundedness Incremented###########")
    state['groundedness_score'] = groundedness_score
    return state
def check_precision(state: Dict) -> Dict:
    """
    Score how precisely the response addresses the user's query.

    The LLM's reply is parsed leniently: if it is not a bare number, the first
    number found in the text is used, and an unparseable reply counts as 0.0
    (which routes to query refinement instead of crashing the workflow). The
    score is clamped to the [0, 1] range the prompt asks for.

    Args:
        state (Dict): Workflow state; reads 'query' and 'response', and
            increments 'precision_loop_count'.
    Returns:
        Dict: The same state object with 'precision_score' set.
    """
    import re  # local import: only needed for lenient score parsing

    print("---------check_precision---------")
    # NOTE(review): the system message talks about "context", but only the query
    # and the response are sent to the model below - confirm whether the
    # retrieved context should be included as well.
    system_message = '''
Given question, answer and context verify if the context was useful in arriving at the given answer.
Give verdict as "1" if useful and "0" if not useful.
Output your result as a float number between 0 and 1
Give verdict as a scaled numeric value of type float between 0 and 1, such that
0 or near 0 if it is least useful, 0.5 or near 0.5 if retry is warranted, and 1 or close to 1 is most useful.
Do not show any instructions for deriving your answer.
'''
    precision_prompt = ChatPromptTemplate.from_messages([
        ("system", system_message),
        ("user", "Query: {query}\nResponse: {response}\n\nPrecision score:")
    ])
    chain = precision_prompt | llm | StrOutputParser()
    raw_score = chain.invoke({
        "query": state['query'],
        "response": state['response']
    })

    try:
        precision_score = float(raw_score)
    except (TypeError, ValueError):
        # The model added text around the number (e.g. "Verdict: 0.5").
        found = re.search(r"\d*\.?\d+", str(raw_score))
        precision_score = float(found.group()) if found else 0.0
    precision_score = min(max(precision_score, 0.0), 1.0)

    state['precision_score'] = precision_score
    print("precision_score:", precision_score)
    state['precision_loop_count'] += 1
    print("#########Precision Incremented###########")
    return state
def refine_response(state: Dict) -> Dict:
    """
    Ask the LLM how the current response could be improved.

    Args:
        state (Dict): Workflow state; reads 'query' and 'response'.
    Returns:
        Dict: The same state object with 'feedback' set to the previous
        response followed by the LLM's suggestions.
    """
    print("---------refine_response---------")
    system_message = '''
Since the last response failded the groundedness test, and is deemed not satisfactory,
use the feedback in terms of the query, context and the last response
to identify potential gaps, ambiguities, or missing details, and
to suggest improvements to enhance accuracy and completeness of the response.
'''
    user_message = (
        "Query: {query}\nResponse: {response}\n\n"
        "What improvements can be made to enhance accuracy and completeness?"
    )
    critique_chain = (
        ChatPromptTemplate.from_messages([("system", system_message), ("user", user_message)])
        | llm
        | StrOutputParser()
    )

    previous = state['response']
    suggestions = critique_chain.invoke({'query': state['query'], 'response': previous})

    # The feedback carries both the rejected answer and the suggested changes.
    feedback = f"Previous Response: {previous}\nSuggestions: {suggestions}"
    print("feedback: ", feedback)
    print(f"State: {state}")

    state['feedback'] = feedback
    return state
def refine_query(state: Dict) -> Dict:
    """
    Ask the LLM how the expanded query could be improved for retrieval.

    Args:
        state (Dict): Workflow state; reads 'query', 'expanded_query' and
            'groundedness_loop_count' (the latter only for the debug print).
    Returns:
        Dict: The same state object with 'query_feedback' set to the previous
        expanded query followed by the LLM's suggestions. 'expanded_query'
        itself is left untouched.
    """
    print("---------refine_query---------")
    system_message = '''
Since the last response failded the precision test, and is deemed not satisfactory,
use the feedback in terms of the query, context and re-generate extended queries
to identify specific keywords, scope refinements, or missing details, and
to provides structured suggestions for improvement to enhance accuracy and completeness of the response.
'''
    user_message = (
        "Original Query: {query}\nExpanded Query: {expanded_query}\n\n"
        "What improvements can be made for a better search?"
    )
    suggestion_chain = (
        ChatPromptTemplate.from_messages([("system", system_message), ("user", user_message)])
        | llm
        | StrOutputParser()
    )

    previous_expansion = state['expanded_query']
    suggestions = suggestion_chain.invoke(
        {'query': state['query'], 'expanded_query': previous_expansion}
    )

    query_feedback = f"Previous Expanded Query: {previous_expansion}\nSuggestions: {suggestions}"
    print("query_feedback: ", query_feedback)
    print(f"Groundedness loop count: {state['groundedness_loop_count']}")

    state['query_feedback'] = query_feedback
    return state
def should_continue_groundedness(state):
    """
    Pick the next node after groundedness scoring.

    Returns "check_precision" when the score is at least 0.8,
    "max_iterations_reached" when the loop counter exceeds 'loop_max_iter',
    and "refine_response" otherwise.
    """
    print("---------should_continue_groundedness---------")
    print("groundedness loop count: ", state['groundedness_loop_count'])

    # Good enough: move on to the precision check.
    if state['groundedness_score'] >= 0.8:
        print("Moving to precision")
        return "check_precision"

    # Not good enough and out of attempts.
    attempts = state["groundedness_loop_count"]
    if attempts > state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Groundedness Score Threshold Not met. Refining Response-----------")
    return "refine_response"
def should_continue_precision(state: Dict) -> str:
    """
    Pick the next step after precision scoring.

    Returns "pass" when the score is strictly above 0.8,
    "max_iterations_reached" when the loop counter has reached 'loop_max_iter',
    and "refine_query" otherwise.
    """
    print("---------should_continue_precision---------")
    print("precision loop count: ", state["precision_loop_count"])

    # Precise enough: the workflow can finish.
    if state['precision_score'] > 0.8:
        return "pass"

    # Not precise enough and out of attempts.
    attempts = state["precision_loop_count"]
    if attempts >= state['loop_max_iter']:
        return "max_iterations_reached"

    print("---------Precision Score Threshold Not met. Refining Query-----------")  # Debugging
    return "refine_query"
def max_iterations_reached(state: Dict) -> Dict:
    """Overwrite the response with a fixed fallback message and return the same state."""
    print("---------max_iterations_reached---------")
    state['response'] = (
        "I'm unable to refine the response further. "
        "Please provide more context or clarify your question."
    )
    return state
from langgraph.graph import END, StateGraph, START | |
def create_workflow() -> StateGraph:
    """
    Build the (uncompiled) workflow graph for the AI nutrition agent.

    Flow: expand_query -> retrieve_context -> craft_response -> score_groundedness.
    From there should_continue_groundedness routes to check_precision,
    refine_response (which loops back to craft_response) or max_iterations_reached.
    After check_precision, should_continue_precision routes to END,
    refine_query (which loops back to expand_query) or max_iterations_reached.

    Returns:
        StateGraph: The graph with all nodes and edges added; callers compile it.
    """
    # The graph is parameterised by the state schema, not by the START sentinel.
    workflow = StateGraph(AgentState)

    # Processing nodes
    workflow.add_node("expand_query", expand_query)                      # Step 1: expand the user query
    workflow.add_node("retrieve_context", retrieve_context)              # Step 2: retrieve documents
    workflow.add_node("craft_response", craft_response)                  # Step 3: generate a response
    workflow.add_node("score_groundedness", score_groundedness)          # Step 4: score grounding
    workflow.add_node("refine_response", refine_response)                # Step 5: suggest response improvements
    workflow.add_node("check_precision", check_precision)                # Step 6: score precision
    workflow.add_node("refine_query", refine_query)                      # Step 7: suggest query improvements
    workflow.add_node("max_iterations_reached", max_iterations_reached)  # Step 8: fallback when a loop is exhausted

    # Main flow edges
    workflow.add_edge(START, "expand_query")
    workflow.add_edge("expand_query", "retrieve_context")
    workflow.add_edge("retrieve_context", "craft_response")
    workflow.add_edge("craft_response", "score_groundedness")

    # Routing after the groundedness score. Every target is a node *name*.
    workflow.add_conditional_edges(
        "score_groundedness",
        should_continue_groundedness,
        {
            "check_precision": "check_precision",
            "refine_response": "refine_response",
            "max_iterations_reached": "max_iterations_reached",
        }
    )
    workflow.add_edge("refine_response", "craft_response")  # regenerate using the feedback

    # Routing after the precision score.
    workflow.add_conditional_edges(
        "check_precision",
        should_continue_precision,
        {
            "pass": END,
            "refine_query": "refine_query",
            "max_iterations_reached": "max_iterations_reached",
        }
    )
    workflow.add_edge("refine_query", "expand_query")  # re-expand using the query feedback
    workflow.add_edge("max_iterations_reached", END)
    return workflow
#=========================== Defining the agentic rag tool ====================# | |
# Compile the graph once at import time; every tool call reuses it.
WORKFLOW_APP = create_workflow().compile()


# Registered as a LangChain tool: it is passed in the `tools` list of the agent
# and its executor, and the docstring below is what the tool exposes as its description.
@tool
def agentic_rag(query: str):
    """
    Runs the RAG-based agent workflow for a user query.
    Args:
    query (str): The current user query.
    Returns:
    Dict[str, Any]: The final workflow state, including the generated response.
    """
    # Fresh state for every run: empty texts, zero scores and zero loop counters.
    inputs = {
        "query": query,
        "expanded_query": "",
        "context": [],
        "response": "",
        "precision_score": 0.0,
        "groundedness_score": 0.0,
        "groundedness_loop_count": 0,
        "precision_loop_count": 0,
        "feedback": "",
        "query_feedback": "",
        "loop_max_iter": 3  # iteration cap consulted by both routing functions
    }
    output = WORKFLOW_APP.invoke(inputs)
    return output
#================================ Guardrails ===========================# | |
# Groq client used to call the Llama Guard model.
llama_guard_client = Groq(api_key=groq_api_key)


def filter_input_with_llama_guard(user_input, model="llama-guard-3-8b"):
    """
    Send the user's input to Llama Guard and return the model's reply.

    Parameters:
    - user_input: The input provided by the user.
    - model: The Llama Guard model to query (default is "llama-guard-3-8b").
    Returns:
    - The reply text with surrounding whitespace stripped, or None when the
      request raised an exception (the error is printed).
    """
    guard_messages = [{"role": "user", "content": user_input}]
    try:
        completion = llama_guard_client.chat.completions.create(
            messages=guard_messages,
            model=model,
        )
        verdict = completion.choices[0].message.content
        return verdict.strip()
    except Exception as e:
        print(f"Error with Llama Guard: {e}")
        return None
#============================= Adding Memory to the agent using mem0 ===============================# | |
class NutritionBot:
    """Chat agent that combines the agentic_rag tool with per-user memory stored in mem0."""

    def __init__(self):
        """
        Set up the memory client, the chat model, the tool list and the agent executor.
        """
        # Imported here because the file's import header does not bring ChatOpenAI into scope.
        from langchain_openai import ChatOpenAI

        # Memory client used to store and search customer interactions.
        self.memory = MemoryClient(api_key=MEM0_api_key)

        # Chat model for the agent; credentials come from the same environment
        # variables the module-level setup reads.
        self.client = ChatOpenAI(
            model="gpt-4o",
            openai_api_key=os.environ["API_KEY"],
            openai_api_base=os.environ["OPENAI_API_BASE"],
            temperature=0  # keep responses as repeatable as possible
        )

        # Tools available to the agent.
        tools = [agentic_rag]

        # System prompt that sets the behaviour of the chatbot.
        system_prompt = """You are a caring and knowledgeable Medical Support Agent, specializing in nutrition disorder-related guidance. Your goal is to provide accurate, empathetic, and tailored nutritional recommendations while ensuring a seamless customer experience.
Guidelines for Interaction:
Maintain a polite, professional, and reassuring tone.
Show genuine empathy for customer concerns and health challenges.
Reference past interactions to provide personalized and consistent advice.
Engage with the customer by asking about their food preferences, dietary restrictions, and lifestyle before offering recommendations.
Ensure consistent and accurate information across conversations.
If any detail is unclear or missing, proactively ask for clarification.
Always use the agentic_rag tool to retrieve up-to-date and evidence-based nutrition insights.
Keep track of ongoing issues and follow-ups to ensure continuity in support.
Your primary goal is to help customers make informed nutrition decisions that align with their health conditions and personal preferences.
"""

        prompt = ChatPromptTemplate.from_messages([
            ("system", system_prompt),             # system instructions
            ("human", "{input}"),                  # the user's message
            ("placeholder", "{agent_scratchpad}")  # intermediate tool-calling steps
        ])

        agent = create_tool_calling_agent(self.client, tools, prompt)
        self.agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

    def store_customer_interaction(self, user_id: str, message: str, response: str, metadata: Dict = None):
        """
        Store one question/answer exchange in memory.

        Args:
            user_id (str): Unique identifier for the customer.
            message (str): Customer's query or message.
            response (str): Chatbot's response.
            metadata (Dict, optional): Extra metadata; a timestamp is added to a
                copy, so the caller's dict is not modified.
        """
        # Work on a copy so the timestamp does not leak into the caller's dict.
        metadata = dict(metadata) if metadata else {}
        metadata["timestamp"] = datetime.now().isoformat()

        conversation = [
            {"role": "user", "content": message},
            {"role": "assistant", "content": response}
        ]

        self.memory.add(
            conversation,
            user_id=user_id,
            output_format="v1.1",
            metadata=metadata
        )

    def get_relevant_history(self, user_id: str, query: str) -> List[Dict]:
        """
        Search memory for entries relevant to the current query.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): The customer's current query.
        Returns:
            List[Dict]: Up to 5 matching memory entries.
        """
        results = self.memory.search(
            query=query,      # search for entries related to the query
            user_id=user_id,  # restrict the search to this user
            limit=5
        )
        # NOTE(review): some mem0 client versions/output formats appear to wrap
        # the hits as {"results": [...]}; unwrap so callers always get a list.
        if isinstance(results, dict):
            results = results.get("results", [])
        return results

    def handle_customer_query(self, user_id: str, query: str) -> str:
        """
        Answer a customer's query, using relevant stored memories as context.

        Args:
            user_id (str): Unique identifier for the customer.
            query (str): Customer's query.
        Returns:
            str: Chatbot's response.
        """
        relevant_history = self.get_relevant_history(user_id, query)

        # Each hit is a single memory text, so it is listed once rather than
        # being repeated under both a "Customer" and a "Support" label.
        memory_lines = "".join(f"- {memory['memory']}\n" for memory in relevant_history)
        context = "Previous relevant interactions:\n" + memory_lines

        print("Context: ", context)  # Debugging

        prompt = f"""
Context:
{context}
Current customer query: {query}
Provide a helpful response that takes into account any relevant past interactions.
"""

        response = self.agent_executor.invoke({"input": prompt})
        answer = response["output"]

        # Persist the exchange so later queries can reference it.
        self.store_customer_interaction(
            user_id=user_id,
            message=query,
            response=answer,
            metadata={"type": "support_query"}
        )
        return answer
#=====================User Interface using streamlit ===========================# | |
def nutrition_disorder_streamlit():
    """
    A Streamlit-based UI for the Nutrition Disorder Specialist Agent.

    Shows a login form until a name is entered, then a chat view. Each message
    is first sent to Llama Guard; only inputs with an allowed verdict are passed
    to the NutritionBot stored in the session.
    """
    # Guard verdicts (newlines collapsed to spaces) that are allowed through.
    # A flagged input comes back as "unsafe" followed by a category code, so the
    # previous entries "safe S6"/"safe S7" could never match.
    # NOTE(review): S6/S7 are taken to be the specialized-advice and privacy
    # categories of Llama Guard 3 - confirm against the model card before
    # widening or narrowing this set.
    allowed_statuses = {"safe", "unsafe S6", "unsafe S7"}

    def show_assistant_message(text):
        """Render an assistant message and record it in the chat history."""
        with st.chat_message("assistant"):
            st.write(text)
        st.session_state.chat_history.append({"role": "assistant", "content": text})

    st.title("Nutrition Disorder Specialist")
    st.write("Ask me anything about nutrition disorders, symptoms, causes, treatments, and more.")
    st.write("Type 'exit' to end the conversation.")

    # Initialize session state for chat history and user_id if they don't exist
    if 'chat_history' not in st.session_state:
        st.session_state.chat_history = []
    if 'user_id' not in st.session_state:
        st.session_state.user_id = None

    # Login form: only while no user is logged in
    if st.session_state.user_id is None:
        with st.form("login_form", clear_on_submit=True):
            user_id = st.text_input("Please enter your name to begin:")
            submit_button = st.form_submit_button("Login")
            if submit_button and user_id:
                st.session_state.user_id = user_id
                st.session_state.chat_history.append({
                    "role": "assistant",
                    "content": f"Welcome, {user_id}! How can I help you with nutrition disorders today?"
                })
                st.session_state.login_submitted = True  # flag that triggers the rerun below
        if st.session_state.get("login_submitted", False):
            st.session_state.pop("login_submitted")
            st.rerun()
        return

    # Display chat history
    for message in st.session_state.chat_history:
        with st.chat_message(message["role"]):
            st.write(message["content"])

    user_query = st.chat_input("Type your question here (or 'exit' to end)...")
    if not user_query:
        return

    if user_query.lower() == "exit":
        st.session_state.chat_history.append({"role": "user", "content": "exit"})
        with st.chat_message("user"):
            st.write("exit")
        show_assistant_message(
            "Goodbye! Feel free to return if you have more questions about nutrition disorders."
        )
        st.session_state.user_id = None  # log out; the next run shows the login form
        st.rerun()
        return

    st.session_state.chat_history.append({"role": "user", "content": user_query})
    with st.chat_message("user"):
        st.write(user_query)

    # Filter input using Llama Guard. The helper returns None when the guard
    # call itself failed, which must not be treated as a verdict.
    filtered_result = filter_input_with_llama_guard(user_query)
    if filtered_result is None:
        show_assistant_message(
            "Sorry, I could not run the safety check on your input. Please try again."
        )
        return
    filtered_result = filtered_result.replace("\n", " ").strip()  # normalize the verdict

    if filtered_result not in allowed_statuses:
        show_assistant_message(
            "I apologize, but I cannot process that input as it may be inappropriate. Please try again."
        )
        return

    try:
        # One bot per session, created lazily on the first allowed query.
        if 'chatbot' not in st.session_state:
            st.session_state.chatbot = NutritionBot()
        response = st.session_state.chatbot.handle_customer_query(
            st.session_state.user_id, user_query
        )
    except Exception as e:
        # UI boundary: report the failure in the chat instead of crashing the app.
        show_assistant_message(
            f"Sorry, I encountered an error while processing your query. Please try again. Error: {str(e)}"
        )
    else:
        show_assistant_message(response)


if __name__ == "__main__":
    nutrition_disorder_streamlit()