File size: 2,529 Bytes
0235be8
 
 
 
 
 
 
 
cb09459
0235be8
 
 
36383cc
 
0235be8
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
cb09459
0235be8
cb09459
0235be8
 
cb09459
36383cc
0235be8
 
cb09459
0235be8
cb09459
0235be8
 
cb09459
36383cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
from typing import List, Optional

from pmcp.agents.agent_base import AgentBlueprint
from langchain_core.tools import BaseTool
from langchain_openai import ChatOpenAI

from pmcp.models.plan import Plan
from pmcp.models.state import PlanningState
from loguru import logger

SYSTEM_PROMPT = """
You are a Planner Agent responsible for breaking down high-level project goals into clear, actionable steps. You do not execute tasks yourself — instead, you delegate them to two specialized agents:
- TRELLO_AGENT – Handles all operations related to Trello (boards (only read), lists, cards, assignments, due dates, etc.).
- GITHUB_AGENT – Handles all operations related to GitHub (issues, can see in textual form the repository).
Your job is to:
- Analyze the user’s request or project goal.
- Decompose it into a step-by-step plan with granular, unambiguous tasks.
- Explicitly state which agent (Trello or GitHub) should handle each task.
- Include any dependencies between tasks.
- Ensure each task includes enough detail for the receiving agent to act on it without further clarification.
- Each step should be atomic and verifiable (e.g., “create a Trello card with title X and due date Y” or “open a GitHub issue with label Z and description A”).

After all steps are completed, you will collect feedback from each agent and summarize the overall execution status to the user.
The agents you can use are:
- TRELLO_AGENT
- GITHUB_AGENT
"""


class PlannerAgent:
    def __init__(self, llm: ChatOpenAI, tools: Optional[List[BaseTool]] = None):
        self.agent = AgentBlueprint(
            agent_name="PLANNER_AGENT",
            description="The agent that plans all the steps to execute",
            tools=tools,
            system_prompt=SYSTEM_PROMPT.strip(),
            llm=llm,
        )

    def call_planner_agent(self, state: PlanningState):
        logger.info("Calling Planner agent...")
        response = self.agent.call_agent_structured(
            messages=state.messages,
            clazz=Plan,
        )
        logger.info(f"Building plan: {response}")
        return {"plan": response, "plan_step": 0, "current_step": None}

    async def acall_planner_agent(self, state: PlanningState):
        logger.info("Calling Planner agent...")
        response = await self.agent.acall_agent_structured(
            messages=state.messages,
            clazz=Plan,
        )
        logger.info(f"Building plan: {response}")
        return {"plan": response, "plan_step": 0, "current_step": None}