# avfranco's picture
# ea4all-mcp-lgs-sync-UAT-passed
# 4a6af9d
"""
This module contains the implementation of a Togaf reference architecture graph.
The graph represents a workflow for managing a conversation between team members
in the context of Togaf, a framework for enterprise architecture development.
The graph is defined using the StateGraph class from the langgraph library.
It consists of several nodes, each representing a specific task or action in the workflow.
The nodes are connected by edges, which control the flow of logic through the program.
The main entry point of the graph is the "ask_human" node, which prompts the user to provide
a business requirement document/file name. The input is then passed to the "enter_graph" node,
which initializes the state of the graph with the provided input.
The graph then proceeds to the "query_grader" node, which evaluates the quality of the business query.
Based on the evaluation, the graph branches to different nodes, such as "assess_query", "assess_asis",
and "generate_tobe", each representing a different task in the Togaf workflow.
The "togaf_supervisor" node acts as a router, determining the next role to act based on the conversation
and instructions. It uses an LLM (Large Language Model) to make the decision.
The graph continues to execute the tasks until it reaches the "return" node, which generates a response
to be returned to the user.
The graph is compiled and saved as a Togaf_reference_architecture_graph object, which can be executed
to run the workflow.
The module also includes helper functions and utility classes used by the graph, as well as import statements
for required libraries and modules.
"""
"""Changelog: 20250609
- Refactored State classes to OverallState, InputState, OutputState
- Task-1, Task-2, Task-3 State classes changed to TypedDicts
- Review what's best content to provide Retrieve with requirements or intent
#20250614
- ask_human input changed to business_query from user_feedback key
- enter_graph: (line 358) business_query = state.get('business_query') needs to be reviewed when using w/ Gradio (build & MCP versions)
- assess_asis: 'Recursion limit of 25 reached without hitting a stop condition'
#20250615
- Recursion limit fixed w/ RemainingSteps, https://langchain-ai.github.io/langgraph/how-tos/graph-api/?h=remainingsteps
"""
#core libraries
from langchain_core.runnables import RunnableConfig
from langchain_core.prompts.chat import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.messages import (
AIMessage,
HumanMessage,
)
from langchain_core.output_parsers import (
JsonOutputKeyToolsParser
)
from langgraph.graph import (
END,
StateGraph,
)
from langgraph.types import Command, interrupt
from langchain import hub
import functools
from typing import List, Union, Dict
from typing_extensions import Literal
from ea4all.src.ea4all_gra.configuration import AgentConfiguration
from ea4all.src.ea4all_gra.state import OverallState, InputState, OutputState
from ea4all.src.ea4all_gra.data import (
GradeBusinessQueryAnswer
)
from ea4all.src.shared.utils import (
get_llm_client,
clean_and_load_json,
extract_response_from_backticks,
load_mock_content,
)
from ea4all.src.shared.prompts import LLAMA31_PROMPT_FORMAT
from ea4all.src.ea4all_gra.togaf_task1.graph import task1_graph
from ea4all.src.ea4all_gra.togaf_task2.graph import task2_graph
from ea4all.src.ea4all_gra.togaf_task3.graph import task3_graph
from ea4all.src.ea4all_gra.utils import (
AsyncInterruptHandler
)
#CAPTURE business requirement asking for USER input & call togad_agentic workflow
async def _get_user_input():
    """Await a user's answer through the async interrupt handler.

    Returns:
        dict with the raw answer under the ``user_feedback`` key.
    """
    handler = AsyncInterruptHandler()
    user_response = await handler.handle_interrupt()
    return {"user_feedback": user_response}
async def togaf_ask_human(state: OverallState, config: RunnableConfig):
    """Collect the business requirement from the user.

    When the agent is configured for interactive input ("interrupt"), pause the
    graph and ask the user for a business requirement document/file name or
    pasted content, then resolve it through ``load_mock_content``. Otherwise,
    fall back to the ``business_query`` already present in the state.

    Args:
        state: Current graph state; may already carry ``business_query``.
        config: Runnable configuration carrying the agent settings.

    Returns:
        Command updating ``business_query`` in the graph state.
    """
    # Check user_input method
    configuration = AgentConfiguration.from_runnable_config(config)
    # NOTE(review): this tests membership against BOTH the class-level default
    # and the runtime value, so either can enable interactive input — confirm
    # this is intentional rather than a leftover.
    if "interrupt" in (AgentConfiguration.ea4all_ask_human, configuration.ea4all_ask_human):
        print("--- TOGAF Blueprint Team --- User input requested")
        response = interrupt(
            {
                "task": state.get('business_query'),
                # Fixed typo in the user-facing prompt: "past" -> "paste".
                "content": "Please provide your business requirement in the form of document/file name or paste the content:",
                "optional": False
            },
        )
        print(f"--- TOGAF AGENTIC team --- got an answer and processing user input: {response}")
        business_query = load_mock_content(response['business_query'])
    else:
        business_query = state.get('business_query')

    return Command(update={
        "business_query": business_query,
        }
    )
#DEFINE Helper functions
def create_team_supervisor(
    state: OverallState,
    config:RunnableConfig):
    """An LLM-based router.

    Builds a runnable chain that asks the supervisor LLM which team member
    should act next (or FINISH), constrained by a tool schema to exactly one
    of the allowed options.

    Note: this docstring previously sat *after* the first statement, making it
    a no-op string expression rather than a docstring; moved to the top.

    Args:
        state: Current graph state (not used directly; kept for node signature).
        config: Runnable configuration carrying the supervisor model name.

    Returns:
        A runnable chain producing ``{"next": <member-or-FINISH>}``.
    """
    members = ["AssessBusinessQuery", "AssessLandscape", "GenerateToBe"] #NEEDS REFACTORING
    options = ["FINISH"] + members
    # Tool/function schema: forces the model's answer to one allowed option.
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "type": "string",
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                },
            },
            "required": ["next"],
        },
    }
    configuration = AgentConfiguration.from_runnable_config(config)
    model = get_llm_client(
        configuration.supervisor_model,
        api_base_url="",
    )
    system_prompt = " ".join([
        'You are a supervisor tasked with managing a conversation between the',
        'following team members: {team_members}. Respond with the worker to act next in sequence.',
        'Each worker will perform a task and respond with their results and status.',
        'After last worker is finished,respond with FINISH.']
    )
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Based on the above conversation and instructions who should act next."
                "Or should we FINISH?. Select one of: {options}.",
            ),
        ]
    ).partial(options=str(options),team_members=", ".join(members))
    # Chain: prompt -> model bound to the "route" tool -> parsed {"next": ...}.
    return (
        prompt
        | model.bind_tools(tools=[function_def], tool_choice="route")
        | JsonOutputKeyToolsParser(key_name='route', first_tool_only=True)
    )
# The following functions interoperate between the top level graph state
# and the state of the sub-graph
# this makes it so that the states of each graph don't get intermixed
def task1_enter_chain(state:OverallState, members: List[str]) -> dict:
    """Map the top-level graph state into the Task-1 sub-graph input state."""
    state_snapshot = AIMessage(content=str(state))
    return {
        "messages": [state_snapshot],
        "team_members": ", ".join(members),
        "business_query": state.get('business_query'),
        "next": state.get('next'),
    }
def task2_enter_chain(state:OverallState, members: List[str]):
    """Map the top-level graph state into the Task-2 (as-is assessment) input state."""
    carried_keys = (
        'business_query', 'intent', 'stakeholder',
        'biz_capability', 'requirement', 'userjourney', 'next',
    )
    payload = {key: state.get(key) for key in carried_keys}
    payload["messages"] = [AIMessage(content=str(state))]
    payload["team_members"] = ", ".join(members)
    return payload
def task3_enter_chain(state:OverallState, members: List[str]):
    """Map the top-level graph state into the Task-3 (to-be vision) input state."""
    carried_keys = (
        'business_query', 'intent', 'stakeholder', 'biz_capability',
        'requirement', 'userjourney', 'landscape_asis',
        'identified_asis', 'landscape_gap', 'next',
    )
    payload = {key: state.get(key) for key in carried_keys}
    payload["messages"] = [AIMessage(content=str(state))]
    payload["team_members"] = ", ".join(members)
    return payload
def get_last_message(state: OverallState) -> dict:
    """Project the sub-graph-relevant keys out of the full state.

    ``next`` is always forwarded; every other key is forwarded only when it
    currently holds a truthy value.
    """
    optional_keys = (
        'business_query', 'principles', 'intent', 'stakeholder',
        'biz_capability', 'requirement', 'userjourney', 'landscape_asis',
        'identified_asis', 'landscape_gap', 'vision_target',
        'architecture_runway',
    )
    projected = {'next': state.get('next')}
    for key in optional_keys:
        value = state.get(key)
        if value:
            projected[key] = value
    return projected
def join_graph(state: OverallState) -> dict:
    """Merge a sub-graph's result back into the top-level graph state.

    ``next`` is always forwarded; every other key only when it holds a truthy
    value.

    Bug fix: ``landscape_gap`` was previously populated from the
    ``identified_asis`` key by copy-paste mistake (compare ``get_last_message``,
    which forwards it correctly).
    """
    forwarded_keys = (
        'business_query', 'principles', 'intent', 'stakeholder',
        'biz_capability', 'requirement', 'userjourney', 'landscape_asis',
        'identified_asis', 'landscape_gap', 'vision_target',
        'architecture_runway',
    )
    results = {'next': state.get('next')}
    for key in forwarded_keys:
        value = state.get(key)
        if value:
            results[key] = value
    return results
##Refactored to use Command instead of conditional_edge
async def business_query_grader(state:OverallState, config:RunnableConfig) -> Command[Literal["assess_query", "return"]]:
    """Gate the workflow on a safety/quality review of the business query.

    Pulls the grader prompt from the LangChain hub, runs it against the
    configured LLM, and routes to ``assess_query`` on a "yes" verdict or to
    ``return`` otherwise.
    """
    business_query = state.get('business_query')
    print(f"--- TOGAF AGENTIC team --- safety/quality review of the user requirement: {business_query}")

    # Grader prompt + user-configured LLM.
    grader_prompt = hub.pull('learn-it-all-do-it-all/ea4all_business_query_grader')
    configuration = AgentConfiguration.from_runnable_config(config)
    llm = get_llm_client(configuration.togaf_model, configuration.api_base_url)

    verdict = await (grader_prompt | llm).ainvoke({"business_query": business_query})
    binary_score = clean_and_load_json(extract_response_from_backticks(verdict.content))['binary_score']

    if binary_score == "yes":
        # Accepted: record the query as a conversation message and proceed.
        return Command(
            update={
                "query_status": True,
                "messages": [HumanMessage(content=business_query)],
            },
            goto="assess_query",
        )
    # Rejected: short-circuit back to the user.
    return Command(update={"query_status": False}, goto="return")
def return_2user(state:OverallState):
    """Emit the rejection verdict back to the caller and signal the end."""
    verdict = '{"binary_score":"no"}'
    return {
        "messages": [AIMessage(content=str(verdict), name="return")],
        "next": "end",
    }
async def enter_graph(state:dict, config: RunnableConfig) -> dict:
    """Seed the graph state with the business query from the latest message.

    The last message's ``content`` may be a plain string or a list of content
    parts; in the latter case the first part's ``text`` field is used.
    """
    last_content = state['messages'][-1].content
    if isinstance(last_content, List):
        business_query = last_content[0]['text']
    else:
        business_query = last_content
    print(f"--- Entered TOGAF AGENTIC team to --- {business_query}")
    return {"business_query": business_query}
## TOGAF Orchestrator Graph
# Each chain adapts the top-level OverallState into a sub-graph's own input
# state (via the task*_enter_chain mappers) before invoking that sub-graph.
# Task 1: assess the business query.
task1_business_query_chain = (
    functools.partial(task1_enter_chain, members=list(task1_graph.nodes))
    | task1_graph
)
# Task 2: assess the as-is landscape.
task2_assess_asis_chain = (
    functools.partial(task2_enter_chain, members=list(task2_graph.nodes))
    | task2_graph
)
# Task 3: generate the to-be vision/target.
task3_vision_target_chain = (
    functools.partial(task3_enter_chain, members=list(task3_graph.nodes))
    | task3_graph
)
# Define the graph.
# Top-level TOGAF orchestrator: takes InputState, emits OutputState, and
# exposes AgentConfiguration as the runtime configuration schema.
workflow = StateGraph(OverallState, input=InputState, output=OutputState, config_schema=AgentConfiguration)
# First add the nodes, which will do the work
workflow.add_node("enter_graph", enter_graph)
workflow.add_node("ask_human", togaf_ask_human)
workflow.add_node("query_grader", business_query_grader)
# Each task node pipes: project state -> run sub-graph chain -> merge results.
workflow.add_node("assess_query", get_last_message | task1_business_query_chain | join_graph)
workflow.add_node("assess_asis", get_last_message | task2_assess_asis_chain | join_graph)
workflow.add_node("generate_tobe", get_last_message | task3_vision_target_chain | join_graph)
workflow.add_node("return", return_2user)
# Define the graph connections, which controls how the logic
# propagates through the program
#workflow.add_conditional_edges(
# "togaf_supervisor",
# lambda x: x["next"],
# {
# "AssessBusinessQuery": "assess_query",
# "AssessLandscape": "assess_asis",
# "GenerateToBe": "generate_tobe",
# "FINISH": END,
# },
#)
workflow.add_edge("enter_graph", "ask_human")
workflow.add_edge("ask_human", "query_grader")
# query_grader routes via Command to "assess_query" or "return"; the linear
# edges below carry the happy path through the three TOGAF tasks.
workflow.add_edge("assess_query", "assess_asis")
workflow.add_edge("assess_asis", "generate_tobe")
workflow.add_edge("generate_tobe", END)
workflow.add_edge("return", END)
workflow.set_entry_point("enter_graph")
#memory = MemorySaver()
togaf_graph = workflow.compile() #checkpointer=memory)
togaf_graph.name = "ea4all_architecture_blueprint"