from typing import Dict, Any, Optional from langgraph.graph import Graph from langgraph.prebuilt import ToolNode from langgraph.checkpoint.sqlite import SqliteSaver from ai_tools import AITools class CustomAgent: def __init__(self): self.tools = AITools() self.workflow = self._build_workflow() def _build_workflow(self): # 定义工具节点 tool_node = ToolNode([self._route_to_tool]) # 构建工作流 workflow = Graph() workflow.add_node("tools", tool_node) workflow.add_node("process", self._process_result) # 设置入口点 workflow.set_entry_point("tools") # 添加条件边 workflow.add_conditional_edges( "tools", self._decide_next_step, { "continue": "process", "end": END } ) workflow.add_edge("process", END) # 添加持久化 workflow.checkpointer = SqliteSaver.from_conn_string(":memory:") return workflow.compile() def _route_to_tool(self, state: Dict[str, Any]): """路由问题到适当的工具""" question = state.get("question", "") file_name = state.get("file_name", "") # 处理反转文本问题 if "rewsna" in question or "dnatsrednu" in question: return {"result": self.tools.reverse_text(question.split('"')[1])} # 处理蔬菜分类问题 if "grocery list" in question.lower() or "vegetables" in question.lower(): items = re.findall(r"[a-zA-Z]+(?=\W|\Z)", question) return {"result": ", ".join(self.tools.categorize_vegetables(items))} # 处理棋局问题 if "chess position" in question.lower() and file_name.endswith(".png"): return {"result": self.tools.analyze_chess_position(file_name)} # 处理音频文件问题 if file_name.endswith(".mp3"): transcript = self.tools.extract_audio_transcript(file_name) if "page numbers" in question.lower(): return {"result": transcript} else: return {"result": ", ".join(sorted(transcript.split(", ")))} # 处理表格运算问题 if "* on the set S" in question: table_data = {"operation": "*", "set": ["a", "b", "c", "d", "e"]} return {"result": self.tools.process_table_operation(table_data)} # 处理Python代码问题 if file_name.endswith(".py"): return {"result": self.tools.analyze_python_code(file_name)} # 处理Excel文件问题 if file_name.endswith(".xlsx"): return {"result": self.tools.process_excel_file(file_name)} return {"result": "I don't have a tool to answer this question."} def _process_result(self, state: Dict[str, Any]): """处理工具返回的结果""" result = state.get("result", "") return {"answer": result} def _decide_next_step(self, state: Dict[str, Any]): """决定下一步""" result = state.get("result", "") if result == "I don't have a tool to answer this question.": return "end" return "continue" def __call__(self, question: str, file_name: str = "") -> str: """执行Agent""" state = {"question": question, "file_name": file_name} for step in self.workflow.stream(state): if "__end__" in step: return step["__end__"]["answer"] return "No answer generated."