import functools
import operator
from typing import Annotated, Sequence, TypedDict

from langchain.agents import AgentExecutor, create_openai_tools_agent
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_core.messages import BaseMessage, HumanMessage
from langchain_core.output_parsers.openai_functions import JsonOutputFunctionsParser
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_experimental.tools import PythonREPLTool
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END


def create_agent(llm: ChatOpenAI, tools: list, system_prompt: str):
    # Each worker node will be given a name and some tools.
    prompt = ChatPromptTemplate.from_messages(
        [
            (
                "system",
                system_prompt,
            ),
            MessagesPlaceholder(variable_name="messages"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )
    agent = create_openai_tools_agent(llm, tools, prompt)
    executor = AgentExecutor(agent=agent, tools=tools)
    return executor
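
# Illustrative: the executor returned by create_agent is invoked with the
# graph state, e.g. executor.invoke({"messages": [...]}), and returns a dict
# whose "output" key holds the agent's final answer.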


def agent_node(state, agent, name):
    # Run the worker agent on the shared state and wrap its final output as a
    # named HumanMessage so the supervisor can see who produced it.
    result = agent.invoke(state)
    return {"messages": [HumanMessage(content=result["output"], name=name)]}


# The agent state is the input to each node in the graph.
class AgentState(TypedDict):
    # The annotation tells the graph that new messages will always
    # be appended to the current state.
    messages: Annotated[Sequence[BaseMessage], operator.add]
    # The 'next' field indicates where to route to next.
    next: str
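
# Illustrative note: because operator.add is the reducer, a node that returns
# {"messages": [new_msg]} appends to the existing message list instead of
# replacing it, so the conversation accumulates across supervisor and worker turns.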


def create_graph(topic, word_count):
    # NOTE: topic and word_count are accepted from the caller but are not
    # currently used when building the graph.
    tavily_tool = TavilySearchResults(max_results=5)
    python_repl_tool = PythonREPLTool()
    members = ["Researcher", "Coder"]
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        " following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )
    # Our team supervisor is an LLM node. It just picks the next agent to process
    # and decides when the work is completed.
    options = ["FINISH"] + members
    # Using OpenAI function calling makes output parsing easier for us.
    function_def = {
        "name": "route",
        "description": "Select the next role.",
        "parameters": {
            "title": "routeSchema",
            "type": "object",
            "properties": {
                "next": {
                    "title": "Next",
                    "anyOf": [
                        {"enum": options},
                    ],
                }
            },
            "required": ["next"],
        },
    }
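    # Illustrative: JsonOutputFunctionsParser (used below) converts the model's
    # forced "route" function call into a plain dict such as {"next": "Researcher"}
    # or {"next": "FINISH"}, which updates the "next" field of the graph state.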
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_prompt),
            MessagesPlaceholder(variable_name="messages"),
            (
                "system",
                "Given the conversation above, who should act next?"
                " Or should we FINISH? Select one of: {options}",
            ),
        ]
    ).partial(options=str(options), members=", ".join(members))
    llm = ChatOpenAI(model="gpt-4o")
    supervisor_chain = (
        prompt
        | llm.bind_functions(functions=[function_def], function_call="route")
        | JsonOutputFunctionsParser()
    )
    research_agent = create_agent(llm, [tavily_tool], "You are a web researcher.")
    research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")
    # NOTE: THIS PERFORMS ARBITRARY CODE EXECUTION. PROCEED WITH CAUTION.
    code_agent = create_agent(
        llm,
        [python_repl_tool],
        "You may generate safe python code to analyze data and generate charts using matplotlib.",
    )
    code_node = functools.partial(agent_node, agent=code_agent, name="Coder")
    workflow = StateGraph(AgentState)
    workflow.add_node("Researcher", research_node)
    workflow.add_node("Coder", code_node)
    workflow.add_node("supervisor", supervisor_chain)
    for member in members:
        # We want our workers to ALWAYS "report back" to the supervisor when done.
        workflow.add_edge(member, "supervisor")
    # The supervisor populates the "next" field in the graph state,
    # which routes to a worker node or finishes.
    conditional_map = {k: k for k in members}
    conditional_map["FINISH"] = END
    workflow.add_conditional_edges("supervisor", lambda x: x["next"], conditional_map)
    # Finally, add the entrypoint.
    workflow.set_entry_point("supervisor")
    return workflow.compile()


def run_multi_agent(topic, word_count):
    graph = create_graph(topic, word_count)
    result = graph.invoke({
        "messages": [
            HumanMessage(content="Code hello world and print it to the terminal")
        ]
    })
    return result
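

# A minimal usage sketch: it assumes OPENAI_API_KEY and TAVILY_API_KEY are set
# in the environment; the topic and word-count arguments below are illustrative
# placeholders (create_graph does not use them yet).
if __name__ == "__main__":
    final_state = run_multi_agent("hello world programs", 100)
    # The final state holds the full accumulated message history.
    for message in final_state["messages"]:
        print(f"{message.name or 'user'}: {message.content}")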