|
""" |
|
This module contains the implementation of a Togaf reference architecture graph. |
|
The graph represents a workflow for managing a conversation between team members |
|
in the context of Togaf, a framework for enterprise architecture development. |
|
|
|
The graph is defined using the StateGraph class from the langgraph library. |
|
It consists of several nodes, each representing a specific task or action in the workflow. |
|
The nodes are connected by edges, which control the flow of logic through the program. |
|
|
|
The main entry point of the graph is the "ask_human" node, which prompts the user to provide |
|
a business requirement document/file name. The input is then passed to the "enter_graph" node, |
|
which initializes the state of the graph with the provided input. |
|
|
|
The graph then proceeds to the "query_grader" node, which evaluates the quality of the business query. |
|
Based on the evaluation, the graph branches to different nodes, such as "assess_query", "assess_asis", |
|
and "generate_tobe", each representing a different task in the Togaf workflow. |
|
|
|
The "togaf_supervisor" node acts as a router, determining the next role to act based on the conversation |
|
and instructions. It uses an LLM (Learned Language Model) model to make the decision. |
|
|
|
The graph continues to execute the tasks until it reaches the "return" node, which generates a response |
|
to be returned to the user. |
|
|
|
The graph is compiled and saved as a Togaf_reference_architecture_graph object, which can be executed |
|
to run the workflow. |
|
|
|
The module also includes helper functions and utility classes used by the graph, as well as import statements |
|
for required libraries and modules. |
|
""" |
|
|
|
"""Changelog: 20250609 |
|
- Refactored State classes to OverallState, InputState, OutputState |
|
- Task-1, Task-2, Task-3 State classes changed to TypedDicts |
|
- Review what's best content to provide Retrieve with requirements or intent |
|
#20250614 |
|
- ask_human input changed to business_query from user_feedback key |
|
- enter_graph: (line 358) business_query = state.get('business_query') needs to be reviewed when using w/ Gradio (build & MCP versions) |
|
- assess_asis: 'Recursion limit of 25 reached without hitting a stop condition' |
|
#20250615 |
|
- Recursion limit fixed w/ RemainingSteps, https://langchain-ai.github.io/langgraph/how-tos/graph-api/?h=remainingsteps |
|
""" |
|
|
|
|
|
from langchain_core.runnables import RunnableConfig |
|
from langchain_core.prompts.chat import ChatPromptTemplate, MessagesPlaceholder |
|
from langchain_core.prompts import ChatPromptTemplate |
|
from langchain_core.messages import ( |
|
AIMessage, |
|
HumanMessage, |
|
) |
|
from langchain_core.output_parsers import ( |
|
JsonOutputKeyToolsParser |
|
) |
|
from langgraph.graph import ( |
|
END, |
|
StateGraph, |
|
) |
|
from langgraph.types import Command, interrupt |
|
|
|
from langchain import hub |
|
|
|
import functools |
|
|
|
from typing import List, Union, Dict |
|
from typing_extensions import Literal |
|
|
|
from ea4all.src.ea4all_gra.configuration import AgentConfiguration |
|
from ea4all.src.ea4all_gra.state import OverallState, InputState, OutputState |
|
from ea4all.src.ea4all_gra.data import ( |
|
GradeBusinessQueryAnswer |
|
) |
|
|
|
from ea4all.src.shared.utils import ( |
|
get_llm_client, |
|
clean_and_load_json, |
|
extract_response_from_backticks, |
|
load_mock_content, |
|
) |
|
from ea4all.src.shared.prompts import LLAMA31_PROMPT_FORMAT |
|
|
|
from ea4all.src.ea4all_gra.togaf_task1.graph import task1_graph |
|
from ea4all.src.ea4all_gra.togaf_task2.graph import task2_graph |
|
from ea4all.src.ea4all_gra.togaf_task3.graph import task3_graph |
|
|
|
from ea4all.src.ea4all_gra.utils import ( |
|
AsyncInterruptHandler |
|
) |
|
|
|
|
|
async def _get_user_input(): |
|
|
|
interrupt_handler = AsyncInterruptHandler() |
|
result = await interrupt_handler.handle_interrupt() |
|
|
|
return {"user_feedback": result} |
|
|
|
async def togaf_ask_human(state: OverallState, config: RunnableConfig): |
|
|
|
configuration = AgentConfiguration.from_runnable_config(config) |
|
|
|
if "interrupt" in (AgentConfiguration.ea4all_ask_human, configuration.ea4all_ask_human): |
|
print("--- TOGAF Blueprint Team --- User input requested") |
|
response = interrupt( |
|
{ |
|
"task": state.get('business_query'), |
|
"content": "Please provide your business requirement in the form of document/file name or past the content:", |
|
"optional": False |
|
}, |
|
) |
|
|
|
print(f"--- TOGAF AGENTIC team --- got an answer and processing user input: {response}") |
|
|
|
business_query = load_mock_content(response['business_query']) |
|
else: |
|
business_query = state.get('business_query') |
|
|
|
return Command(update={ |
|
"business_query": business_query, |
|
} |
|
) |
|
|
|
|
|
def create_team_supervisor( |
|
state: OverallState, |
|
config:RunnableConfig): |
|
members = ["AssessBusinessQuery", "AssessLandscape", "GenerateToBe"] |
|
|
|
"""An LLM-based router.""" |
|
options = ["FINISH"] + members |
|
function_def = { |
|
"name": "route", |
|
"description": "Select the next role.", |
|
"parameters": { |
|
"title": "routeSchema", |
|
"type": "object", |
|
"properties": { |
|
"next": { |
|
"type": "string", |
|
"title": "Next", |
|
"anyOf": [ |
|
{"enum": options}, |
|
], |
|
}, |
|
}, |
|
"required": ["next"], |
|
}, |
|
} |
|
|
|
configuration = AgentConfiguration.from_runnable_config(config) |
|
model = get_llm_client( |
|
configuration.supervisor_model, |
|
api_base_url="", |
|
) |
|
|
|
system_prompt = " ".join([ |
|
'You are a supervisor tasked with managing a conversation between the', |
|
'following team members: {team_members}. Respond with the worker to act next in sequence.', |
|
'Each worker will perform a task and respond with their results and status.', |
|
'After last worker is finished,respond with FINISH.'] |
|
) |
|
|
|
prompt = ChatPromptTemplate.from_messages( |
|
[ |
|
("system", system_prompt), |
|
MessagesPlaceholder(variable_name="messages"), |
|
( |
|
"system", |
|
"Based on the above conversation and instructions who should act next." |
|
"Or should we FINISH?. Select one of: {options}.", |
|
), |
|
] |
|
).partial(options=str(options),team_members=", ".join(members)) |
|
|
|
return ( |
|
prompt |
|
| model.bind_tools(tools=[function_def], tool_choice="route") |
|
| JsonOutputKeyToolsParser(key_name='route', first_tool_only=True) |
|
) |
|
|
|
|
|
|
|
|
|
def task1_enter_chain(state:OverallState, members: List[str]) -> dict: |
|
results = { |
|
"messages": [AIMessage(content=str(state))], |
|
"team_members": ", ".join(members), |
|
"business_query": state.get('business_query'), |
|
"next": state.get('next'), |
|
} |
|
return results |
|
|
|
def task2_enter_chain(state:OverallState, members: List[str]): |
|
results = { |
|
"messages": [AIMessage(content=str(state))], |
|
"team_members": ", ".join(members), |
|
"business_query": state.get('business_query'), |
|
"intent": state.get('intent'), |
|
"stakeholder": state.get('stakeholder'), |
|
"biz_capability": state.get('biz_capability'), |
|
"requirement": state.get('requirement'), |
|
"userjourney": state.get('userjourney'), |
|
"next": state.get('next') |
|
} |
|
return results |
|
|
|
def task3_enter_chain(state:OverallState, members: List[str]): |
|
results = { |
|
"messages": [AIMessage(content=str(state))], |
|
"team_members": ", ".join(members), |
|
"business_query": state.get('business_query'), |
|
"intent": state.get('intent'), |
|
"stakeholder": state.get('stakeholder'), |
|
"biz_capability": state.get('biz_capability'), |
|
"requirement": state.get('requirement'), |
|
"userjourney": state.get('userjourney'), |
|
"landscape_asis": state.get('landscape_asis'), |
|
"identified_asis": state.get('identified_asis'), |
|
"landscape_gap": state.get('landscape_gap'), |
|
"next": state.get('next'), |
|
} |
|
return results |
|
|
|
def get_last_message(state: OverallState) -> dict: |
|
results = {} |
|
|
|
results['next'] = state.get('next') |
|
if state.get('business_query'): |
|
results['business_query'] = state.get('business_query') |
|
if state.get('principles'): |
|
results['principles'] = state.get('principles') |
|
if state.get('intent'): |
|
results['intent'] = state.get('intent') |
|
if state.get('stakeholder'): |
|
results['stakeholder'] = state.get('stakeholder') |
|
if state.get('biz_capability'): |
|
results['biz_capability'] = state.get('biz_capability') |
|
if state.get('requirement'): |
|
results['requirement'] = state.get('requirement') |
|
if state.get('userjourney'): |
|
results['userjourney'] = state.get('userjourney') |
|
if state.get('landscape_asis'): |
|
results['landscape_asis'] = state.get('landscape_asis') |
|
if state.get('identified_asis'): |
|
results['identified_asis'] = state.get('identified_asis') |
|
if state.get('landscape_gap'): |
|
results['landscape_gap'] = state.get('landscape_gap') |
|
if state.get('vision_target'): |
|
results['vision_target'] = state.get('vision_target') |
|
if state.get('architecture_runway'): |
|
results['architecture_runway'] = state.get('architecture_runway') |
|
|
|
return results |
|
|
|
def join_graph(state: OverallState) -> dict: |
|
results = {} |
|
|
|
results['next'] = state.get('next') |
|
if state.get('business_query'): |
|
results['business_query'] = state.get('business_query') |
|
if state.get('principles'): |
|
results['principles'] = state.get('principles') |
|
if state.get('intent'): |
|
results['intent'] = state.get('intent') |
|
if state.get('stakeholder'): |
|
results['stakeholder'] = state.get('stakeholder') |
|
if state.get('biz_capability'): |
|
results['biz_capability'] = state.get('biz_capability') |
|
if state.get('requirement'): |
|
results['requirement'] = state.get('requirement') |
|
if state.get('userjourney'): |
|
results['userjourney'] = state.get('userjourney') |
|
if state.get('landscape_asis'): |
|
results['landscape_asis'] = state.get('landscape_asis') |
|
if state.get('identified_asis'): |
|
results['identified_asis'] = state.get('identified_asis') |
|
if state.get('landscape_gap'): |
|
results['landscape_gap'] = state.get('identified_asis') |
|
if state.get('vision_target'): |
|
results['vision_target'] = state.get('vision_target') |
|
if state.get('architecture_runway'): |
|
results['architecture_runway'] = state.get('architecture_runway') |
|
|
|
return results |
|
|
|
|
|
async def business_query_grader(state:OverallState, config:RunnableConfig) -> Command[Literal["assess_query", "return"]]: |
|
print(f"--- TOGAF AGENTIC team --- safety/quality review of the user requirement: {state.get('business_query')}") |
|
business_query = state.get('business_query') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
grader_prompt = hub.pull('learn-it-all-do-it-all/ea4all_business_query_grader') |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
configuration = AgentConfiguration.from_runnable_config(config) |
|
model = get_llm_client(configuration.togaf_model, configuration.api_base_url) |
|
|
|
grader = grader_prompt | model |
|
|
|
response = await grader.ainvoke( |
|
{"business_query": state.get('business_query')} |
|
) |
|
|
|
binary_score = clean_and_load_json(extract_response_from_backticks(response.content))['binary_score'] |
|
|
|
messages = [ |
|
HumanMessage(content=state.get('business_query')), |
|
] |
|
|
|
if binary_score == "yes": |
|
return Command( |
|
|
|
update={"query_status": True, "messages": messages}, |
|
|
|
goto="assess_query", |
|
) |
|
else: |
|
return Command( |
|
|
|
update={"query_status": False}, |
|
|
|
goto="return", |
|
) |
|
|
|
def return_2user(state:OverallState): |
|
message = '{"binary_score":"no"}' |
|
|
|
return { |
|
"messages": [AIMessage(content=str(message), name="return")], |
|
"next": "end", |
|
} |
|
|
|
async def enter_graph(state:dict, config: RunnableConfig) -> dict: |
|
|
|
business_query = state['messages'][-1].content if not isinstance(state['messages'][-1].content, List) else state['messages'][-1].content[0]['text'] |
|
|
|
print(f"--- Entered TOGAF AGENTIC team to --- {business_query}") |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
return {"business_query": business_query} |
|
|
|
|
|
task1_business_query_chain = ( |
|
functools.partial(task1_enter_chain, members=list(task1_graph.nodes)) |
|
| task1_graph |
|
) |
|
|
|
task2_assess_asis_chain = ( |
|
functools.partial(task2_enter_chain, members=list(task2_graph.nodes)) |
|
| task2_graph |
|
) |
|
|
|
task3_vision_target_chain = ( |
|
functools.partial(task3_enter_chain, members=list(task3_graph.nodes)) |
|
| task3_graph |
|
) |
|
|
|
|
|
workflow = StateGraph(OverallState, input=InputState, output=OutputState, config_schema=AgentConfiguration) |
|
|
|
workflow.add_node("enter_graph", enter_graph) |
|
workflow.add_node("ask_human", togaf_ask_human) |
|
workflow.add_node("query_grader", business_query_grader) |
|
workflow.add_node("assess_query", get_last_message | task1_business_query_chain | join_graph) |
|
workflow.add_node("assess_asis", get_last_message | task2_assess_asis_chain | join_graph) |
|
workflow.add_node("generate_tobe", get_last_message | task3_vision_target_chain | join_graph) |
|
workflow.add_node("return", return_2user) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
workflow.add_edge("enter_graph", "ask_human") |
|
workflow.add_edge("ask_human", "query_grader") |
|
workflow.add_edge("assess_query", "assess_asis") |
|
workflow.add_edge("assess_asis", "generate_tobe") |
|
workflow.add_edge("generate_tobe", END) |
|
workflow.add_edge("return", END) |
|
|
|
workflow.set_entry_point("enter_graph") |
|
|
|
|
|
togaf_graph = workflow.compile() |
|
togaf_graph.name = "ea4all_architecture_blueprint" |
|
|