|
import ast |
|
|
|
|
|
from langchain_core.runnables import RunnableConfig |
|
|
|
from langchain_core.messages import ( |
|
AIMessage, |
|
) |
|
from langchain_core.output_parsers import PydanticOutputParser |
|
from langchain_core.prompts.chat import ChatPromptTemplate |
|
|
|
from langchain import hub |
|
|
|
from langgraph.graph import ( |
|
END, |
|
StateGraph, |
|
) |
|
|
|
from ea4all.src.ea4all_gra.configuration import AgentConfiguration |
|
from ea4all.src.ea4all_gra.data import ( |
|
CapabilityGap, |
|
GradeAnswer, |
|
GradeDocuments, |
|
LandscapeAsIs, |
|
) |
|
|
|
from ea4all.src.shared.utils import ( |
|
get_llm_client, |
|
extract_structured_output, |
|
extract_topic_from_business_input, |
|
set_max_new_tokens, |
|
get_predicted_num_tokens_from_prompt, |
|
) |
|
|
|
from ea4all.src.shared.prompts import ( |
|
LLAMA31_CHAT_PROMPT_FORMAT, |
|
LLAMA31_PROMPT_FORMAT, |
|
) |
|
|
|
from ea4all.src.shared import vectorstore |
|
|
|
from ea4all.src.ea4all_gra.togaf_task2.state import Task2State |
|
|
|
from ea4all.src.ea4all_apm.graph import get_retrieval_chain |
|
|
|
|
|
def retrieval_grader(model):
    """Build a runnable that grades retrieved applications against a business query.

    Args:
        model: Chat model supporting ``with_structured_output``.

    Returns:
        Runnable (prompt | structured model) producing a ``GradeDocuments``
        with a binary 'yes'/'no' relevance score.
    """
    structured_llm_grader = model.with_structured_output(GradeDocuments)

    system = """You are an enterprise architect grader assessing relevance of applications to address a business query.
It does not need to be a stringent test. The objective is to filter out erroneous retrievals.
If the application contains any keyword or semantic meaning related to the business query, grade it as relevant.
Give a binary score 'yes' or 'no' score to indicate whether the document is relevant to the question."""

    # NOTE(review): the retrieved-applications turn is tagged with the "ai"
    # role rather than "human" — preserved as-is; confirm this is intentional.
    grade_prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system),
            ("ai", "Retrieved applications: \n\n {landscape_asis} \n\n Business Query: {business_query}"),
        ]
    )

    return grade_prompt | structured_llm_grader
|
|
|
|
|
def gap_grader(model):
    """Build a chain that assesses capability gaps for identified applications.

    Pulls the capability-gap prompt from the LangChain hub, injects the
    ``CapabilityGap`` format instructions, and pipes prompt -> model -> parser.

    Args:
        model: Chat model used to produce the gap assessment.

    Returns:
        Runnable yielding a parsed ``CapabilityGap``.
    """
    parser = PydanticOutputParser(pydantic_object=CapabilityGap)

    prompt = hub.pull("learn-it-all-do-it-all/ea4all_togaf_capability_gap").partial(
        format_instructions=parser.get_format_instructions(),
    )

    return prompt | model | parser
|
|
|
|
|
def question_rewriter(model):
    """Build a chain that rewrites a user question for better retrieval.

    Args:
        model: Chat model performing the rewrite.

    Returns:
        Runnable (hub prompt | model) producing the rephrased question.
    """
    prompt = hub.pull("learn-it-all-do-it-all/ea4all_question_rewriter").partial(
        ai_output=LLAMA31_CHAT_PROMPT_FORMAT,
    )

    return prompt | model
|
|
|
|
|
def answer_grader():
    """Pull and pre-fill the TOGAF answer-grading prompt.

    Note: unlike the other builders in this module, this returns the PROMPT
    only — callers pipe it into a model themselves (see
    ``grade_generation_v_documents_and_question``).

    Returns:
        ChatPromptTemplate pre-filled with ``GradeAnswer`` format instructions
        and the Llama 3.1 output format.
    """
    parser = PydanticOutputParser(pydantic_object=GradeAnswer)

    prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_answer_grade')
    prompt = prompt.partial(
        format_instructions=parser.get_format_instructions(),
        ai_output=LLAMA31_PROMPT_FORMAT,
    )

    return prompt
|
|
|
|
|
def hallucination_grader(asis, identified):
    """Return identified applications that are NOT grounded in the as-is landscape.

    Uses a case-insensitive substring check of each identified application
    name against the landscape text.

    Args:
        asis (str): Free-text description of the current (as-is) landscape.
        identified (iterable[str]): Application names proposed by the LLM.

    Returns:
        list: Names absent from ``asis``; an empty list means every
        identified application is grounded (no hallucination).
    """
    # Lowercase the haystack once instead of on every iteration.
    haystack = asis.lower()
    return [app for app in identified if app.lower() not in haystack]
|
|
|
|
|
async def retrieve(state:Task2State, config: RunnableConfig):
    """
    Retrieve applications from the landscape vectorstore for the business query.

    Args:
        state (dict): The current graph state
        config: Runnable configuration carrying the agent settings.

    Returns:
        state (dict): New key added to state, applications, that contains retrieved identified applications
    """
    configuration = AgentConfiguration.from_runnable_config(config)

    print("---RETRIEVE---")
    business_query = state['business_query']

    # First pass (no landscape yet): derive a retrieval query from the
    # supervisor's last message, which carries a dict-like payload with an
    # 'intent' list. TODO(review): confirm the payload always parses via
    # ast.literal_eval.
    if not state.get('landscape_asis'):
        intent = ""
        if state['messages']:
            intent = ','.join(ast.literal_eval(str(state['messages'][-1].content))['intent']).lower().replace("'", "")

        business_query = f"""What existent applications can be re-used {intent}?"""

    rag_input = 5
    # BUGFIX: the original bound the retriever inside the `with` block but
    # built and invoked the retrieval chain AFTER the context manager exited,
    # by which point the retriever may already be closed. Keep all usage
    # inside the `with` so the retriever stays alive for the whole call.
    with vectorstore.make_retriever(config) as retriever:
        retrieval = await get_retrieval_chain(rag_input, "ea4all_agent", business_query, retriever, config)

        landscape_asis = await retrieval.ainvoke(
            {"standalone_question": business_query},
            config={"recursion_limit": configuration.ea4all_recursion_limit})

    # Flatten the retrieved documents into a single ';'-separated message.
    content = ';'.join(asis.page_content.strip() for asis in landscape_asis)

    name = state['next']

    return {
        "messages": [AIMessage(content=content, name=name)],
        "landscape_asis": landscape_asis,
        "business_query": business_query
    }
|
|
|
|
|
def grade_landscape_asis(state:Task2State, config: RunnableConfig):
    """
    Determines whether an application is relevant to address a business query.

    Args:
        state (dict): The current graph state
        config: Runnable configuration (unused here).

    Returns:
        state (dict): Updates landscape_asis key with only filtered relevant applications
    """
    print("---CHECK DOCUMENT RELEVANCE TO BUSINESS QUERY---")
    business_query = state.get('business_query')
    landscape_asis = state.get('landscape_asis')

    # Render each retrieved document's metadata into a one-line summary.
    # NOTE(review): no actual grading/filtering happens here — every document
    # is kept; confirm whether the retrieval_grader chain was meant to be used.
    filtered_docs = []
    for doc in (landscape_asis or []):
        meta = doc.metadata
        filtered_docs.append(
            f"Application:{meta['source']}; Capability:{meta['capability']}; Description:{meta['description']};Business fit: {meta['business fit']}; Roadmap: {meta['roadmap']};"
        )

    return {
        "business_query": business_query,
        "landscape_asis": landscape_asis,
        "identified_asis": filtered_docs
    }
|
|
|
|
|
def decide_to_generate(state:Task2State, config: RunnableConfig):
    """
    Determines whether to generate an answer, or re-generate a question.

    Args:
        state (dict): The current graph state
        config: Runnable configuration (unused here).

    Returns:
        str: Binary decision for next node to call
    """
    print("---ASSESS GRADED APPLICATIONS---")
    filtered_applications = state['identified_asis']

    # Any surviving application is enough to proceed to generation.
    if filtered_applications:
        print("---DECISION: GENERATE---")
        return "generate"

    print(
        "---DECISION: ALL APPLICATIONS ARE NOT RELEVANT TO BUSINESS QUERY, TRANSFORM QUERY---"
    )
    return "transform_query"
|
|
|
|
|
def generate(state:Task2State, config: RunnableConfig):
    """
    Generate answer

    Args:
        state (dict): The current graph state
        config: Runnable configuration carrying the agent settings.

    Returns:
        state (dict): New key
        added to state, identified_asis, that contains LLM generation
    """
    configuration = AgentConfiguration.from_runnable_config(config)
    model = get_llm_client(configuration.togaf_model, configuration.api_base_url)

    print("---GENERATE---")
    landscape_asis = state['landscape_asis']

    inputs = {
        "business_query": state['business_query'],
        "applications": state['identified_asis'],
    }

    parser = PydanticOutputParser(pydantic_object=LandscapeAsIs)

    prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_landscape_business_query').partial(
        format_instructions=parser.get_format_instructions(),
    )

    # Size the completion window from the predicted prompt length.
    model.max_tokens = set_max_new_tokens(get_predicted_num_tokens_from_prompt(model, prompt, inputs))

    generated_asis = (prompt | model | parser).invoke(
        input=inputs,
        config={"recursion_limit": configuration.recursion_limit},
    )

    return {
        "messages": [AIMessage(content=str(generated_asis.identified_asis), name=state['next'])],
        "landscape_asis": landscape_asis,
        "business_query": state['business_query'],
        "identified_asis": generated_asis.identified_asis
    }
|
|
|
|
|
def transform_query(state:Task2State, config: RunnableConfig):
    """
    Transform the query to produce a better question.

    Args:
        state (dict): The current graph state
        config: Runnable configuration carrying the agent settings.

    Returns:
        state (dict): Updates question key with a re-phrased question
    """
    configuration = AgentConfiguration.from_runnable_config(config)
    model = get_llm_client(configuration.togaf_model, configuration.api_base_url)

    print("---TRANSFORM QUERY---")
    business_query = state['business_query']

    response = question_rewriter(model).invoke(
        {"user_question": business_query, "target": "vectorstore"},
    )

    # BUGFIX: the original subscripted the extractor's result unconditionally
    # (crashing when it returns None) and compared with `== None`. Guard the
    # lookup and fall back to the original query when no rephrasing is found.
    generated_question = extract_topic_from_business_input(response.content)
    better_question = generated_question.get('rephrased') if generated_question else None
    if better_question is None:
        better_question = business_query

    name = state['next']

    return {
        "messages": [AIMessage(content=better_question, name=name)],
        "business_query": better_question
    }
|
|
|
|
|
def grade_generation_v_documents_and_question(state:Task2State, config: RunnableConfig):
    """
    Determines whether the generation is grounded in the landscape_asis and answers business query.

    Args:
        state (dict): The current graph state
        config: Runnable configuration carrying the agent settings.

    Returns:
        str: Decision for next node to call — "useful", "not useful",
        "not supported", or "no match".
    """

    configuration = AgentConfiguration.from_runnable_config(config)
    model = get_llm_client(configuration.togaf_model, configuration.api_base_url)

    # Bail out near the graph's step budget to break the generate/re-try loop.
    # NOTE(review): assumes 'remaining_steps' is always present in state —
    # .get() yields None otherwise and the comparison would raise TypeError;
    # confirm langgraph always populates it.
    if state.get('remaining_steps') <= 2:
        return "no match"

    print("---CHECK HALLUCINATIONS---")
    business_query = state['business_query']
    landscape_asis = state['landscape_asis']
    identified_asis = state['identified_asis']
    # Application names proposed by the generation step (empty when nothing identified).
    generated_asis = [item.application for item in identified_asis] if identified_asis else []

    # Names returned here are NOT grounded in the retrieved landscape text.
    score = hallucination_grader(str(landscape_asis),generated_asis)

    if len(score)==0:
        print("---DECISION: IDENTIFIED APPLICATION(s) IS GROUNDED IN LANDSCAPE ASIS---")

        print("---GRADE GENERATION vs QUESTION---")

        values = {"business_query": business_query, "application": identified_asis}
        prompt = answer_grader()
        # Size the completion window from the predicted prompt length.
        model.max_tokens = set_max_new_tokens(get_predicted_num_tokens_from_prompt(model,prompt,values))
        grader_chain = prompt | model
        # 'score' is reused: now an AIMessage whose content embeds a binary grade.
        score = grader_chain.invoke(values)
        extracted_answer = extract_structured_output(score.content)
        if extracted_answer is not None:
            grade = extracted_answer['binary_score']
        else:
            # Unparseable grader output is treated as a failed answer.
            grade = "no"

        if grade == "yes":
            print("---DECISION: APPLICATION ADDRESSES BUSINESS QUERY---")
            return "useful"
        else:
            print("---DECISION: APPLICATION DOES NOT ADDRESS BUSINESS QUERY---")
            return "not useful"
    else:
        # At least one identified application is not present in the landscape.
        print("---DECISION: IDENTIFIED ASIS IS NOT GROUNDED IN LANDSCAPE ASIS, RE-TRY---")
        print(f"---HALLUCINATIONS: {score}---")
        return "not supported"
|
|
|
|
|
def grade_landscape_asis_v_capability_gap(state:Task2State, config: RunnableConfig):
    """
    Analyse any gap between existent applications and identified business capability to address the business query.

    Args:
        state (dict): The current graph state
        config: Runnable configuration carrying the agent settings.

    Returns:
        state (dict): Updates landscape_gap key with capability gap status
    """
    configuration = AgentConfiguration.from_runnable_config(config)
    model = get_llm_client(configuration.togaf_model, configuration.api_base_url)

    print("---CHECK SUPPORT IDENTIFIED APP TO BUSINESS CAPABILITY---")

    # Build the prompt -> model -> parser chain for the gap assessment.
    parser = PydanticOutputParser(pydantic_object=CapabilityGap)
    prompt = hub.pull('learn-it-all-do-it-all/ea4all_togaf_capability_gap').partial(
        format_instructions=parser.get_format_instructions(),
    )
    gap_chain = prompt | model | parser

    identified = state['identified_asis']
    apps_text = ';'.join(str(app) for app in identified) if identified else "No applications identified"

    if state['biz_capability']:
        # NOTE(review): biz_capability is expected to be a literal-parseable
        # list; the replace() strips stray quotes — confirm upstream format.
        caps_text = ', '.join(ast.literal_eval(str(state['biz_capability']))).replace("'", ", ")
    else:
        caps_text = "No business capabilities identified"

    inputs = {
        "application": apps_text,
        "capability": caps_text
    }

    # Size the completion window from the predicted prompt length.
    model.max_tokens = set_max_new_tokens(get_predicted_num_tokens_from_prompt(model, prompt, inputs))

    extracted_gap = gap_chain.invoke(input=inputs, config={"recursion_limit": configuration.recursion_limit})

    for item in extracted_gap.capability_status:
        print(f"---CAPABILITY: {item.capability} SUPPORT: {item.support}---")

    return {
        "messages": [AIMessage(content=str(state['messages']), name=state['next'])],
        "landscape_gap": extracted_gap
    }
|
|
|
|
|
# Wire the "assess current landscape" sub-graph:
# assess_landscape -> grade_landscape_asis -> (generate | transform_query),
# with generation re-tried until grounded/useful, then capability-gap analysis.
task2_builder = StateGraph(Task2State)

for _node_name, _node_fn in (
    ("assess_landscape", retrieve),
    ("grade_landscape_asis", grade_landscape_asis),
    ("generate", generate),
    ("transform_query", transform_query),
    ("grade_landscape_gap", grade_landscape_asis_v_capability_gap),
):
    task2_builder.add_node(_node_name, _node_fn)

task2_builder.set_entry_point("assess_landscape")

task2_builder.add_edge("assess_landscape", "grade_landscape_asis")
task2_builder.add_conditional_edges(
    "grade_landscape_asis",
    decide_to_generate,
    {
        "transform_query": "transform_query",
        "generate": "generate",
    },
)
task2_builder.add_edge("transform_query", "assess_landscape")
task2_builder.add_conditional_edges(
    "generate",
    grade_generation_v_documents_and_question,
    {
        "not supported": "generate",
        "useful": "grade_landscape_gap",
        "not useful": "transform_query",
        "no match": "grade_landscape_gap",
    },
)
task2_builder.add_edge("grade_landscape_gap", END)

task2_graph = task2_builder.compile()
task2_graph.name = "togaf_assess_current_landscape"
|
|