from typing import Literal, Annotated
from typing_extensions import TypedDict
import ast
import json
import tempfile
import os
from langchain_core.runnables import RunnableLambda, RunnableConfig
from langgraph.graph import END
from langgraph.types import Command
from langgraph.prebuilt import InjectedState
from langchain_community.utilities import BingSearchAPIWrapper
from langchain_community.tools.bing_search.tool import BingSearchResults
from langchain_community.document_loaders import JSONLoader
from langchain.agents import tool
from ea4all.src.shared.configuration import (
BaseConfiguration
)
from ea4all.src.shared.state import (
State
)
from ea4all.src.shared.utils import (
get_llm_client,
format_docs,
)
def make_supervisor_node(config: RunnableConfig, members: list[str]) -> RunnableLambda:
    """Build an LLM-based supervisor node that routes work between the given members."""
    options = ["FINISH"] + members
    system_prompt = (
        "You are a supervisor tasked with managing a conversation between the"
        f" following workers: {members}. Given the following user request,"
        " respond with the worker to act next. Each worker will perform a"
        " task and respond with their results and status. When finished,"
        " respond with FINISH."
    )

    configuration = BaseConfiguration.from_runnable_config(config)
    model = get_llm_client(
        configuration.supervisor_model,
        api_base_url="",
    )

    class Router(TypedDict):
        """Worker to route to next. If no workers needed, route to FINISH."""
        next: Literal[*options]  # unpacking inside Literal requires Python 3.11+

    def supervisor_node(state: State) -> Command[Literal[*members, "__end__"]]:
        """An LLM-based router: ask the model which worker should act next."""
        messages = [
            {"role": "system", "content": system_prompt},
        ] + state["messages"]
        response = model.with_structured_output(Router).invoke(messages)
        goto = response["next"]
        if goto == "FINISH":
            goto = END

        return Command(goto=goto, update={"next": goto})

    return RunnableLambda(supervisor_node)
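
# Illustrative usage sketch (not part of this module): wiring the supervisor into
# a langgraph StateGraph. The worker names and the researcher_node / writer_node
# callables below are hypothetical placeholders, as is the `config` mapping. No
# conditional edges are needed because supervisor_node returns a Command(goto=...).
#
#     from langgraph.graph import StateGraph, START
#
#     builder = StateGraph(State)
#     builder.add_node("supervisor", make_supervisor_node(config, ["researcher", "writer"]))
#     builder.add_node("researcher", researcher_node)
#     builder.add_node("writer", writer_node)
#     builder.add_edge(START, "supervisor")
#     graph = builder.compile()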
async def websearch(state: dict[str, dict | str]) -> dict[str, dict[str, str]]:
    """
    Web search based on the re-phrased question.

    Args:
        state (dict): The current graph state.

    Returns:
        dict: Updates the messages key with the formatted web results.
    """
    # API wrapper: credentials and endpoint are read from the environment
    bing_subscription_key = os.environ.get("BING_SUBSCRIPTION_KEY", "")
    bing_search_url = os.environ.get("BING_SEARCH_URL", "https://api.bing.microsoft.com/v7.0/search")

    search = BingSearchAPIWrapper(
        bing_subscription_key=bing_subscription_key,
        bing_search_url=bing_search_url,
    )

    # Prefer the latest message; fall back to an explicit question key
    # (state is a dict, so index into it rather than using getattr)
    question = state["messages"][-1].content if state.get("messages") else state["question"]

    # Bing search tool returning the top results
    web_results = BingSearchResults(
        api_wrapper=search,
        handle_tool_error=True,
        num_results=5,
    )
    result = await web_results.ainvoke({"query": question})

    # The tool returns a Python-literal string (a list of dicts); parse it with
    # ast.literal_eval rather than a fragile single-to-double-quote replacement
    result_json = ast.literal_eval(result)

    # Round-trip through a temporary file so JSONLoader can ingest the results
    with tempfile.NamedTemporaryFile(mode='w', delete=False) as temp_file:
        json.dump(result_json, temp_file)
        temp_file.flush()

    # Load the JSON data back, one document per search result
    loader = JSONLoader(file_path=temp_file.name, jq_schema=".[]", text_content=False)
    docs = loader.load()
    os.unlink(temp_file.name)  # remove the temporary file once loaded

    return {"messages": {"role": "assistant", "content": format_docs(docs)}}