import ast
# Core libraries
from langchain_core.runnables import RunnableConfig
from langchain_core.messages import (
AIMessage,
)
from langchain_core.output_parsers import PydanticOutputParser
from langchain_core.prompts.chat import ChatPromptTemplate
from langchain import hub
from langgraph.graph import (
END,
StateGraph,
)
from ea4all.src.ea4all_gra.configuration import AgentConfiguration
from ea4all.src.ea4all_gra.data import (
CapabilityGap,
GradeAnswer,
GradeDocuments,
LandscapeAsIs,
)
from ea4all.src.shared.utils import (
get_llm_client,
extract_structured_output,
extract_topic_from_business_input,
set_max_new_tokens,
get_predicted_num_tokens_from_prompt,
)
from ea4all.src.shared.prompts import (
LLAMA31_CHAT_PROMPT_FORMAT,
LLAMA31_PROMPT_FORMAT,
)
from ea4all.src.shared import vectorstore
from ea4all.src.ea4all_gra.togaf_task2.state import Task2State
from ea4all.src.ea4all_apm.graph import get_retrieval_chain
# Retrieval grader: scores whether the retrieved IT landscape addresses the business query
def retrieval_grader(model):
# LLM with function call
structured_llm_grader = model.with_structured_output(GradeDocuments)
#Prompt
system = """You are an enterprise architect grader assessing the relevance of applications to a business query.
It does not need to be a stringent test. The objective is to filter out erroneous retrievals.
If the application contains any keyword or semantic meaning related to the business query, grade it as relevant.
Give a binary score 'yes' or 'no' to indicate whether the document is relevant to the question."""
grade_prompt = ChatPromptTemplate.from_messages(
[
("system", system),
("ai", "Retrieved applications: \n\n {landscape_asis} \n\n Business Query: {business_query}"),
]
)
grader = grade_prompt | structured_llm_grader
return grader
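# Usage sketch (illustrative; this grader is defined here but not wired into the
# graph below). Invoking the chain returns a structured GradeDocuments object:
#   grader = retrieval_grader(model)
#   grade = grader.invoke({
#       "landscape_asis": "Application: CRM-One; Capability: Customer Management;",
#       "business_query": "Which applications support customer onboarding?",
#   })
# The example values are hypothetical; the yes/no grade is carried by whatever
# field GradeDocuments defines for it.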
# Business capability needs vs landscape as-is gap analysis
def gap_grader(model):
gap_prompt = hub.pull("learn-it-all-do-it-all/ea4all_togaf_capability_gap")
# Set up a parser:
parser = PydanticOutputParser(pydantic_object=CapabilityGap)
gap_prompt = gap_prompt.partial(
format_instructions=parser.get_format_instructions(),
)
capability_gap_grader = gap_prompt | model | parser
return capability_gap_grader
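# Usage sketch (illustrative): the chain parses the LLM output into a CapabilityGap
# object via PydanticOutputParser rather than returning raw text:
#   grader = gap_grader(model)
#   gap = grader.invoke({"application": "...", "capability": "..."})
# The input keys are assumed from the hub prompt; "application" and "capability"
# mirror the values built in grade_landscape_asis_v_capability_gap below.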
## Question Re-writer
def question_rewriter(model):
# Rewriter prompt
rewrite_prompt = hub.pull("learn-it-all-do-it-all/ea4all_question_rewriter")
rewrite_prompt = rewrite_prompt.partial(ai_output=LLAMA31_CHAT_PROMPT_FORMAT)
rewriter = rewrite_prompt | model
return rewriter
##Answer grader: scores whether the RAG + LLM answer addresses the business query
def answer_grader():
# Prompt
answer_prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_answer_grade')
# Set up a parser:
parser = PydanticOutputParser(pydantic_object=GradeAnswer)
answer_prompt = answer_prompt.partial(
format_instructions=parser.get_format_instructions(),
ai_output = LLAMA31_PROMPT_FORMAT
)
return answer_prompt
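# Note: unlike the other factories, answer_grader returns only the prompt; callers
# compose it with a model, as grade_generation_v_documents_and_question does below:
#   grader_chain = answer_grader() | model
#   score = grader_chain.invoke({"business_query": ..., "application": ...})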
## Hallucination grader: scores whether there is any hallucination between the RAG context and the LLM answer
def hallucination_grader(asis, identified):
# Prompt": REVISED TO WORK WIHT LLAMA-3 - issue w/ multi-word app
#changes on prompting e.g. role, rules and restrictions, explicit instructions, change from word to application(s)
#changed to one-by-one assessment using single text search
grader_false = []
for d in identified:
if d.lower() not in asis.lower():
grader_false.append(d)
return grader_false
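# Example (illustrative values): the check is a case-insensitive substring search,
# so an application is flagged only when its full name is absent from the as-is text:
#   hallucination_grader("Application: CRM-One; Application: HR Portal;",
#                        ["CRM-One", "Billing Hub"])
#   # -> ["Billing Hub"] is returned as not grounded in the as-is landscape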
##Action-1 RAG retrieval - Assess-AsIs-Landscape
async def retrieve(state:Task2State, config: RunnableConfig):
"""
Retrieve applications
Args:
state (dict): The current graph state
Returns:
state (dict): New key added to state, applications, that contains retrieved identified applications
"""
configuration = AgentConfiguration.from_runnable_config(config)
print("---RETRIEVE---")
business_query = state['business_query']
if not state.get('landscape_asis'):
intent=""
if state['messages']:
intent = ','.join(ast.literal_eval(str(state['messages'][-1].content))['intent']).lower().replace("'", "")
business_query=f"""What existent applications can be re-used {intent}?"""
# Retrieval
rag_input = 5
with vectorstore.make_retriever(config) as _retriever:
retriever = _retriever
retrieval = await get_retrieval_chain(rag_input,"ea4all_agent",business_query,retriever, config)
landscape_asis = await retrieval.ainvoke(
{"standalone_question": business_query},
config={"recursion_limit":configuration.ea4all_recursion_limit})
## Join each Document's page_content into a single string
content = ';'.join(asis.page_content.strip() for asis in landscape_asis)
name = state['next']
return {
"messages": [AIMessage(content=content, name=name)],
"landscape_asis": landscape_asis,
"business_query": business_query
}
##Action-2 Grade retrieval against the business query, filter out irrelevant applications
def grade_landscape_asis(state:Task2State, config: RunnableConfig):
"""
Determines whether an application is relevant to address a business query.
Args:
state (dict): The current graph state
Returns:
state (dict): Updates landscape_asis key with only filtered relevant applications
"""
print("---CHECK DOCUMENT RELEVANCE TO BUSINESS QUERY---")
business_query = state.get('business_query')
landscape_asis = state.get('landscape_asis')
# Format each retrieved document's relevant metadata
filtered_docs = []
if landscape_asis is not None:
for d in landscape_asis:
##Pick relevant Metadata
application = d.metadata['source']
capability = d.metadata['capability']
description = d.metadata['description']
business_fit = d.metadata['business fit']
roadmap = d.metadata['roadmap']
asis = f"Application:{application}; Capability:{capability}; Description:{description};Business fit: {business_fit}; Roadmap: {roadmap};"
filtered_docs.append(asis)
return {
#"messages": [AIMessage(content=str(filtered_docs), name=name)],
"business_query": business_query,
"landscape_asis": landscape_asis,
"identified_asis": filtered_docs
}
##Action-3 Are there relevant applications? If yes, generate; otherwise transform_query
def decide_to_generate(state:Task2State, config: RunnableConfig):
"""
Determines whether to generate an answer, or re-generate a question.
Args:
state (dict): The current graph state
Returns:
str: Binary decision for next node to call
"""
print("---ASSESS GRADED APPLICATIONS---")
filtered_applications = state['identified_asis']
if not filtered_applications:
# All applications were filtered out by the relevance check,
# so re-generate the query
print(
"---DECISION: ALL APPLICATIONS ARE NOT RELEVANT TO BUSINESS QUERY, TRANSFORM QUERY---"
)
return "transform_query"
else:
# We have relevant documents, so generate answer
print("---DECISION: GENERATE---")
return "generate"
##Action-4a Generate if relevant applications found
def generate(state:Task2State, config: RunnableConfig):
"""
Generate answer
Args:
state (dict): The current graph state
Returns:
state (dict): New key, identified_asis, added to state with the LLM generation
"""
configuration = AgentConfiguration.from_runnable_config(config)
model = get_llm_client(configuration.togaf_model, configuration.api_base_url)
print("---GENERATE---")
landscape_asis = state['landscape_asis']
values = {
"business_query": state['business_query'],
"applications": state['identified_asis']
}
parser = PydanticOutputParser(pydantic_object=LandscapeAsIs)
hub_prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_landscape_business_query')
hub_prompt = hub_prompt.partial(
format_instructions=parser.get_format_instructions(),
)
model.max_tokens = set_max_new_tokens(get_predicted_num_tokens_from_prompt(model,hub_prompt,values))
task_2_generate = hub_prompt | model | parser
generated_asis = task_2_generate.invoke(input=values, config={"recursion_limit":configuration.recursion_limit})
name = state['next']
return {
"messages": [AIMessage(content=str(generated_asis.identified_asis), name=name)],
"landscape_asis": landscape_asis,
"business_query": state['business_query'],
"identified_asis": generated_asis.identified_asis
}
##Action-4b Re-write query otherwise
def transform_query(state:Task2State, config: RunnableConfig):
"""
Transform the query to produce a better question.
Args:
state (dict): The current graph state
Returns:
state (dict): Updates business_query key with a re-phrased question
"""
# Get any user-provided configs - LLM model in use
configuration = AgentConfiguration.from_runnable_config(config)
model = get_llm_client(configuration.togaf_model, configuration.api_base_url)
print("---TRANSFORM QUERY---")
business_query = state['business_query']
# Re-write question
response = question_rewriter(model).invoke(
{"user_question": business_query, "target": "vectorstore"},
)
generated_question = extract_topic_from_business_input(response.content)
better_question = generated_question['rephrased']
if better_question is None: better_question = business_query
name = state['next']
return {
"messages": [AIMessage(content=better_question, name=name)],
"business_query": better_question
}
##Action-5 Grade final response
def grade_generation_v_documents_and_question(state:Task2State, config: RunnableConfig):
"""
Determines whether the generation is grounded in the landscape_asis and answers the business query.
Args:
state (dict): The current graph state
Returns:
str: Decision for next node to call
"""
# Get any user-provided configs - LLM model in use
configuration = AgentConfiguration.from_runnable_config(config)
model = get_llm_client(configuration.togaf_model, configuration.api_base_url)
if state.get('remaining_steps') <= 2:
return "no match"
print("---CHECK HALLUCINATIONS---")
business_query = state['business_query']
landscape_asis = state['landscape_asis']
identified_asis = state['identified_asis']
generated_asis = [item.application for item in identified_asis] if identified_asis else []
score = hallucination_grader(str(landscape_asis),generated_asis)
if len(score)==0:
print("---DECISION: IDENTIFIED APPLICATION(s) IS GROUNDED IN LANDSCAPE ASIS---")
# Check question-answering
print("---GRADE GENERATION vs QUESTION---")
values = {"business_query": business_query, "application": identified_asis}
prompt = answer_grader()
model.max_tokens = set_max_new_tokens(get_predicted_num_tokens_from_prompt(model,prompt,values))
grader_chain = prompt | model
score = grader_chain.invoke(values)
extracted_answer = extract_structured_output(score.content)
if extracted_answer is not None: ##REVIEW PROMPT W/ LLAMA3.1-70B
grade = extracted_answer['binary_score']
else:
grade = "no"
if grade == "yes":
print("---DECISION: APPLICATION ADDRESSES BUSINESS QUERY---")
return "useful"
else:
print("---DECISION: APPLICATION DOES NOT ADDRESS BUSINESS QUERY---")
return "not useful"
else:
print("---DECISION: IDENTIFIED ASIS IS NOT GROUNDED IN LANDSCAPE ASIS, RE-TRY---")
print(f"---HALLUCINATIONS: {score}---")
return "not supported"
##Action-6 Analyse gap between current state and the desired future state - identified capabilities
def grade_landscape_asis_v_capability_gap(state:Task2State, config: RunnableConfig):
"""
Analyse any gap between the existing applications and the identified business capabilities needed to address the business query.
Args:
state (dict): The current graph state
Returns:
state (dict): Updates landscape_gap key with capability gap status
"""
# Get any user-provided configs - LLM model in use
configuration = AgentConfiguration.from_runnable_config(config)
model = get_llm_client(configuration.togaf_model, configuration.api_base_url)
print("---CHECK SUPPORT IDENTIFIED APP TO BUSINESS CAPABILITY---")
parser = PydanticOutputParser(pydantic_object=CapabilityGap)
hub_prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_capability_gap')
hub_prompt = hub_prompt.partial(
format_instructions=parser.get_format_instructions(),
)
task_2_landscape_gap = hub_prompt | model | parser
# Inline equivalent of the gap_grader() chain defined above
if state['identified_asis']:
content = ';'.join(str(app) for app in state['identified_asis'])
else:
content = "No applications identified"
if state['biz_capability']:
capability = ', '.join(ast.literal_eval(str(state['biz_capability']))).replace("'", ", ")
else:
capability = "No business capabilities identified"
values = {
"application": content,
"capability": capability
}
model.max_tokens = set_max_new_tokens(get_predicted_num_tokens_from_prompt(model,hub_prompt,values))
extracted_gap = task_2_landscape_gap.invoke(input=values, config={"recursion_limit":configuration.recursion_limit})
for item in extracted_gap.capability_status:
print(f"---CAPABILITY: {item.capability} SUPPORT: {item.support}---")
return {
"messages": [AIMessage(content=str(state['messages']), name=state['next'])],
"landscape_gap": extracted_gap #landscape_gap.content
}
##TASK-2 Graph
task2_builder = StateGraph(Task2State)
# Define the nodes
task2_builder.add_node("assess_landscape", retrieve) # retrieve
task2_builder.add_node("grade_landscape_asis", grade_landscape_asis) # grade documents
task2_builder.add_node("generate", generate) # generate
task2_builder.add_node("transform_query", transform_query) # transform_query
task2_builder.add_node("grade_landscape_gap", grade_landscape_asis_v_capability_gap) #analyse asis gap
# Build graph
task2_builder.set_entry_point("assess_landscape")
task2_builder.add_edge("assess_landscape", "grade_landscape_asis")
task2_builder.add_conditional_edges(
"grade_landscape_asis",
decide_to_generate,
{
"transform_query": "transform_query",
"generate": "generate",
},
)
task2_builder.add_edge("transform_query", "assess_landscape")
task2_builder.add_conditional_edges(
"generate",
grade_generation_v_documents_and_question,
{
"not supported": "generate",
"useful": "grade_landscape_gap",
"not useful": "transform_query",
"no match": "grade_landscape_gap"
},
)
task2_builder.add_edge("grade_landscape_gap", END)
# Compile
task2_graph = task2_builder.compile()
task2_graph.name = "togaf_assess_current_landscape"
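# Invocation sketch (hypothetical inputs; the exact required keys are defined by
# Task2State). The graph must be invoked asynchronously because the
# assess_landscape node awaits the retrieval chain:
#   result = await task2_graph.ainvoke(
#       {"business_query": "Which applications support customer onboarding?",
#        "biz_capability": "['Customer Onboarding']",
#        "messages": [], "next": "assess_landscape"},
#       config={"recursion_limit": 25},
#   )
#   result["landscape_gap"]  # CapabilityGap with per-capability support status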