|
"""Main file for constructing the EA4ALL hierarchical graph""" |
|
|
|
""" |
|
EA4ALL Hierarchical Graph |
|
This module defines the main file for constructing the EA4ALL hierarchical graph. It contains functions and classes for creating and managing the graph structure. |
|
Functions: |
|
- make_supervisor_node: Creates a supervisor node for managing a conversation between architect workers. |
|
- call_landscape_agentic: Calls the landscape agentic graph. |
|
- call_diagram_agentic: Calls the diagram agentic graph. |
|
- call_togaf_agentic: Calls the togaf agentic graph. |
|
- websearch: Search for real-time data to answer user's question |
|
Classes: |
|
- Router: TypedDict representing the worker to route to next. |
|
Attributes: |
|
- model: The LLM client for the supervisor model. |
|
- super_builder: The StateGraph builder for constructing the graph. |
|
- super_graph: The compiled EA4ALL Agentic Workflow Graph. |
|
Note: This module depends on other modules and packages such as langchain_core, langgraph, shared, ea4all_apm, ea4all_vqa, and ea4all_gra. |
|
""" |
|
|
|
"""Changelog: |
|
- langchain_openai: 0.2.9 (0.3.6 issue with max_tokens for HF models)
|
#2025-06-03 |
|
- Refactored code to fix problems with linter and type checking (Standard mode) |
|
""" |
|
|
|
from langgraph.types import Command |
|
from langchain_core.messages import ( |
|
HumanMessage, |
|
AIMessage |
|
) |
|
from langchain_core.language_models.chat_models import BaseChatModel |
|
from langchain_core.runnables import RunnableConfig |
|
|
|
from langchain import hub |
|
|
|
from langgraph.graph import ( |
|
START, |
|
END, |
|
StateGraph, |
|
) |
|
from langgraph.checkpoint.memory import MemorySaver |
|
|
|
from typing_extensions import Literal, TypedDict |
|
import uuid |
|
|
|
from ea4all.src.shared.configuration import BaseConfiguration |
|
from ea4all.src.shared.utils import get_llm_client |
|
from ea4all.src.shared.state import State |
|
from ea4all.src.tools.tools import websearch |
|
|
|
from ea4all.src.ea4all_indexer.graph import indexer_graph |
|
from ea4all.src.ea4all_apm.graph import apm_graph |
|
from ea4all.src.ea4all_vqa.graph import diagram_graph |
|
from ea4all.src.ea4all_gra.graph import togaf_graph |
|
|
|
async def call_indexer_apm(state: State, config: RunnableConfig):
    """Run the APM indexer graph to (re)build the document index.

    Entry node of the workflow. The indexer's output is not used to
    update the workflow state, so nothing is returned.

    Args:
        state: Current workflow state (unused here).
        config: Runnable configuration forwarded to the indexer graph.
    """
    # Fix: the original bound the result to an unused local `response`;
    # the value is intentionally discarded, so don't bind it.
    await indexer_graph.ainvoke(input={"docs": []}, config=config)
|
|
|
def make_supervisor_node(model: BaseChatModel, members: list[str]):
    """Create an LLM-based supervisor node routing between architect teams.

    Args:
        model: Chat model used (with structured output) to pick the next worker.
        members: Worker node names the supervisor may route to. Kept for
            interface compatibility; note the routable set is currently fixed
            in the ``Router`` Literal below, not derived from this list.

    Returns:
        An async node function returning a ``Command`` that routes to one of
        the member teams or to ``__end__``.
    """
    # Fix: removed unused local `options = ["FINISH"] + members` — it was
    # never referenced (Router's Literal hard-codes the allowed values).
    system_prompt = hub.pull("ea4all_super_graph").template

    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""
        next: Literal["FINISH", "portfolio_team", "diagram_team", "blueprint_team", "websearch_team"]

    async def supervisor_node(state: State, config: RunnableConfig) -> Command[Literal["portfolio_team", "diagram_team", "blueprint_team", "websearch_team", '__end__']]:
        """An LLM-based router."""
        # Only the latest user message is shown to the supervisor.
        messages = [
            {"role": "system", "content": system_prompt},
        ] + [state["messages"][-1]]

        response = await model.with_structured_output(Router).ainvoke(messages, config=config)

        # Default to ending the run unless the model picked a valid team.
        _goto = "__end__"

        if isinstance(response, dict):
            _goto = response["next"]

        # Guard: "FINISH" or any unexpected value falls through to __end__.
        if _goto not in ["portfolio_team", "diagram_team", "blueprint_team", "websearch_team"]:
            _goto = "__end__"

        print(f"---Supervisor got a request--- Question: {state['messages'][-1].content} ==> Routing to {_goto}\n")

        return Command(
            goto=_goto
        )

    return supervisor_node
|
|
|
async def call_landscape_agentic(state: State, config: RunnableConfig) -> Command[Literal['__end__']]:
    """Delegate the latest user question to the landscape (APM) agentic graph.

    The APM graph's full response is stringified into a single AIMessage and
    appended to the conversation; the workflow then terminates.
    """
    question = state["messages"][-1].content
    result = await apm_graph.ainvoke({"question": question}, config=config)
    reply = AIMessage(content=str(result), name="landscape_agentic")
    return Command(update={"messages": [reply]}, goto="__end__")
|
|
|
async def call_diagram_agentic(state: State, config: RunnableConfig) -> Command[Literal['__end__']]:
    """Delegate the latest user question to the diagram (VQA) agentic graph.

    Builds the diagram graph's input from the most recent message (with an
    empty image placeholder), invokes it, and appends its final answer to the
    conversation before terminating.
    """
    inputs = {
        "messages": [{"role": "user", "content": state.get('messages')[-1].content}],
        "question": state['messages'][-1].content, "image": ""
    }

    response = await diagram_graph.ainvoke(
        input=inputs,
        config=config
    )

    return Command(
        update={
            "messages": [
                AIMessage(
                    # Fix: message was mislabeled "landscape_agentic"
                    # (copy-paste) — this node wraps the diagram graph.
                    content=response['messages'][-1].content, name="diagram_agentic"
                )
            ]
        },
        goto="__end__",
    )
|
|
|
async def call_togaf_agentic(state: State, config: RunnableConfig) -> Command[Literal["__end__"]]:
    """Delegate the latest user question to the TOGAF (blueprint) agentic graph.

    Invokes the TOGAF graph with the most recent message and appends its final
    answer to the conversation before terminating.
    """
    # Fix: the supervisor routes via Command.goto without writing 'next' into
    # state, so state['next'] could raise KeyError — use .get() instead.
    print(f"---TOGAF ROUTE team node ready to --- CALL_TOGAF_AGENTIC Routing to {state.get('next')} with User Question: {state['messages'][-1].content}")

    inputs = {"messages": [{"role": "user", "content": state.get('messages')[-1].content}]}

    response = await togaf_graph.ainvoke(
        input=inputs,
        config=config
    )

    return Command(
        update={
            "messages": [
                AIMessage(
                    content=response["messages"][-1].content, name="togaf_route"
                )
            ]
        },
        goto="__end__",
    )
|
|
|
|
|
async def call_generate_websearch(state: State, config: RunnableConfig) -> Command[Literal["__end__"]]:
    """Generate an answer from web-search results via the APM 'generate' node.

    Expects state['messages'][-1] to carry the search results (documents) and
    state['messages'][-2] the original user question — i.e. this node runs
    right after the websearch tool.
    """
    from ea4all.src.ea4all_apm.state import OverallState

    # Fix: `source` was only assigned inside the `if`, so a None config
    # caused a NameError below — default it first.
    source = 'unknown'
    if config is not None:
        source = config.get('metadata', {}).get('langgraph_node', 'unknown')

    state_dict = {
        "documents": state['messages'][-1].content,
        "web_search": "Yes",
        "question": state['messages'][-2].content,
        "source": source
    }

    apm_state = OverallState(**state_dict)
    generation = await apm_graph.nodes["generate"].ainvoke(apm_state, config)

    return Command(
        update={
            "messages": [
                AIMessage(
                    content=generation['generation'], name="generate_websearch"
                )
            ]
        },
        goto="__end__",
    )
|
|
|
async def blueprint_team(state: State) -> Command[Literal["togaf_route"]]:
    """Forward the request, state unchanged, to the TOGAF routing node."""
    print("---Blueprint team got a request--- Routing to TOGAF_ROUTE node")
    return Command(update=dict(state), goto="togaf_route")
|
|
|
async def diagram_team(state: State) -> Command[Literal["diagram_route"]]:
    """Forward the request, state unchanged, to the diagram routing node."""
    print("---Diagram team got a request--- Routing to DIAGRAM_ROUTE node")
    return Command(update=dict(state), goto="diagram_route")
|
|
|
async def super_graph_entry_point(state: State):
    """Build and run the EA4ALL super-graph for one invocation.

    Args:
        state: Initial workflow state; when None, a minimal default
            conversation is seeded.

    Returns:
        The final state produced by the graph run.

    Raises:
        Exception: re-raised after printing any graph execution error.
    """
    # Fresh thread id per invocation so runs never collide.
    thread_config = RunnableConfig({"configurable": {"thread_id": str(uuid.uuid4())}})

    if state is None:
        state = {
            "messages": [
                ("system", "You are a helpful assistant"),
                ("human", "Start the workflow")
            ]
        }

    graph = build_super_graph()

    try:
        # Fix: thread_config is already a RunnableConfig; the original
        # re-wrapped it (RunnableConfig(thread_config)) redundantly.
        result = await graph.ainvoke(state, config=thread_config)
        return result
    except Exception as e:
        print(f"Graph execution error: {e}")
        raise
|
|
|
|
|
def build_super_graph():
    """Assemble and compile the EA4ALL Agentic Workflow Graph.

    Wires the indexer entry path, the LLM supervisor, the four architect
    teams, and their specialised route nodes into one StateGraph.
    """
    model = get_llm_client(BaseConfiguration.supervisor_model, api_base_url="", streaming=BaseConfiguration.streaming)
    teams_supervisor_node = make_supervisor_node(model, ["portfolio_team", "diagram_team", "blueprint_team", "websearch_team"])

    builder = StateGraph(State, config_schema=BaseConfiguration)

    # Register every node of the hierarchical workflow (insertion order kept).
    node_table = {
        "apm_indexer": call_indexer_apm,
        "supervisor": teams_supervisor_node,
        "portfolio_team": call_landscape_agentic,
        "websearch_team": websearch,
        "diagram_team": diagram_team,
        "blueprint_team": blueprint_team,
        "generate_websearch": call_generate_websearch,
        "diagram_route": call_diagram_agentic,
        "togaf_route": call_togaf_agentic,
    }
    for node_name, node_fn in node_table.items():
        builder.add_node(node_name, node_fn)

    # Entry path: index the portfolio first, then let the supervisor route.
    builder.add_edge(START, "apm_indexer")
    builder.add_edge("apm_indexer", "supervisor")

    # Team hand-offs to their specialised follow-up nodes.
    builder.add_edge("websearch_team", "generate_websearch")
    builder.add_edge("blueprint_team", "togaf_route")
    builder.add_edge("diagram_team", "diagram_route")

    # Terminal nodes all end the run.
    for terminal in ("portfolio_team", "generate_websearch", "togaf_route", "diagram_route"):
        builder.add_edge(terminal, END)

    compiled = builder.compile()
    compiled.name = "EA4ALL Agentic Workflow Graph"

    return compiled
|
|
|
|
|
# Module-level compiled graph, built once at import time for the runtime to serve.
super_graph = build_super_graph()
|
|