|
from typing import List, Optional |
|
|
|
from pmcp.agents.agent_base import AgentBlueprint |
|
from langchain_core.tools import BaseTool |
|
from langchain_openai import ChatOpenAI |
|
from langchain_core.messages import HumanMessage |
|
|
|
from pmcp.models.state import PlanningState |
|
from loguru import logger |
|
|
|
|
|
class ExecutorAgent:
    """Dispatch the planner's steps one at a time.

    Each call reads the step index stored on the planning state, turns the
    matching plan step into a ``HumanMessage`` instruction for the agent named
    in that step, and returns a state update that advances the index by one.
    """

    def __init__(self, llm: ChatOpenAI, tools: Optional[List[BaseTool]] = None):
        """Build the underlying ``AgentBlueprint`` for the executor.

        Args:
            llm: Chat model handed to the ``AgentBlueprint``.
            tools: Accepted for interface compatibility; not used here.
        """
        # NOTE(review): `tools` is never forwarded to AgentBlueprint or stored.
        # Confirm whether the executor is meant to be tool-less before wiring
        # it through.
        self.agent = AgentBlueprint(
            agent_name="EXECUTOR_AGENT",
            description="The agent that executes all the steps for the planner",
            llm=llm,
        )

    def _advance_plan(self, state: PlanningState) -> dict:
        """Build the state update shared by the sync and async entry points.

        Args:
            state: Planning state; ``state.plan_step`` and ``state.plan.steps``
                are read.

        Returns:
            A dict with ``plan_step`` (the index read from the state plus
            one), ``messages`` (a single ``HumanMessage`` describing the step,
            or an empty list when no step is selected) and ``current_step``
            (the selected step, or ``None``).
        """
        step_index = state.plan_step
        steps = state.plan.steps

        current_step = None
        messages = []

        # The lower bound matters: a negative index would otherwise pass the
        # length check and silently pick a step from the end of the plan.
        if 0 <= step_index < len(steps):
            current_step = steps[step_index]
            messages = [
                HumanMessage(
                    content=f"The {current_step.agent} agent should perform the following action:\n{current_step.description}"
                )
            ]
            logger.info(f"The Executor is executing step: {current_step}")
        else:
            logger.info(
                f"The Executor has no step to execute at index {step_index}"
            )

        # The index is advanced even when no step was selected, so the update
        # has the same shape on every call.
        return {
            "plan_step": step_index + 1,
            "messages": messages,
            "current_step": current_step,
        }

    def call_executor_agent(self, state: PlanningState) -> dict:
        """Select the current plan step and return the resulting state update.

        Args:
            state: Planning state holding the plan and the current step index.

        Returns:
            A dict with the keys ``plan_step``, ``messages`` and
            ``current_step``; ``current_step`` is ``None`` and ``messages`` is
            empty when the index does not point at a step of the plan.
        """
        return self._advance_plan(state)

    async def acall_executor_agent(self, state: PlanningState) -> dict:
        """Async counterpart of ``call_executor_agent``.

        Nothing is awaited; the coroutine form exists so the method can be
        used where an awaitable is expected.

        Args:
            state: Planning state holding the plan and the current step index.

        Returns:
            The same dict that ``call_executor_agent`` returns for ``state``.
        """
        return self._advance_plan(state)
|
|