Langgraph应用,执行流程由线转图

张开发
2026/5/23 0:03:42 15 分钟阅读
Langgraph应用,执行流程由线转图
langgraph从chain转到langgraph从数学的角度上来讲执行流从线性流程转到了流程图。langgraph的组成主要有三部分Langgraph节点边状态节点一个节点就是一个执行单元相当于一次函数的调用。可以是一次模型的调用一次搜索一次加密等等边边能够连接一个个节点它决定了下一个应该去到哪个节点执行状态实现数据共享是实现AI短期记忆的灵魂乍一看好像有些云里雾里的我打个比方玩家状态在玩一个大富翁每个节点和边组成地图玩家初始资金数据有1000块钱玩家每走一格可能会发生一些事件比如说后退一步被小偷偷300块钱买房子等等这些事件相当于节点走的方向相当于边最后玩家成功走到了终点END你可以得知玩家状态最后还有多少钱有多少资产。下面用一个示例来演示。改造RAG知识库示例这个是之前文章中构建知识库的相关代码本次用langgraph改造流程import os import operator from typing import TypedDict, Annotated from langchain_community.chat_models.tongyi import ChatTongyi from langchain_community.embeddings import DashScopeEmbeddings from langchain_community.vectorstores import FAISS from langchain_core.messages import ToolMessage from langchain_core.tools import tool from langchain_core.messages import HumanMessage, ToolMessage, BaseMessage from langchain_core.documents import Document from langchain_text_splitters import RecursiveCharacterTextSplitter from langgraph.graph import StateGraph, END from langgraph.prebuilt.tool_node import ToolNode os.environ[DASHSCOPE_API_KEY] apikey tool def calculator(expression: str) - str: 计算数学表达式。需要精确计算时使用。 参数: expression: 数学算式如 2 2 或 500 * 0.8。 返回: str: 计算结果如 4.0 或 400.0。 print(f[️ 工具调用] 计算器正在计算: {expression}) try: return str(eval(expression)) except Exception as e: return f计算错误: {e} tool def rag_search(query: str) - str: 从数据库中搜索与查询公司内部相关的文档包括公司计划名代号截止日期等详细信息。 参数: query (str): 要搜索的查询字符串。 返回: str: 与查询相关的文档内容。 # 从RAG数据库中检索文档 raw_text 【公司内部机密代号“深蓝计划”】 1. 项目目标开发一款能听懂猫语的翻译器。 2. 核心技术基于Transformer的“喵声波”分析算法。 3. 团队暗号如果有人问“今天天气怎么样”必须回答“我想吃鱼”。 4. 截止日期2026年12月31日。 5. 经费预算仅剩50元人民币主要用于购买猫条。 RAG_PATH faiss_index docs [Document(page_contentraw_text)] text_splitter RecursiveCharacterTextSplitter(chunk_size25, chunk_overlap5) split_docs text_splitter.split_documents(docs) embeddings DashScopeEmbeddings(modeltext-embedding-v1) if os.path.exists(RAG_PATH): print(公司内部数据库已存在) ragdb FAISS.load_local(RAG_PATH, embeddings, allow_dangerous_deserializationTrue) else: print(创建公司内部数据库) ragdb FAISS.from_documents(split_docs, embeddings) ragdb.save_local(RAG_PATH) return \n\n.join(doc.page_content for doc in ragdb.similarity_search(query, k2)) #构造agent流程图 def Init_Agent(): #初始化模型 tool_maps{ rag_search: rag_search, calculator: calculator } llm ChatTongyi(model_nameqwen-plus) tool_llm llm.bind_tools(toolslist(tool_maps.values())) #创建state class TaskState(TypedDict): messages: Annotated[list[BaseMessage], operator.add] #创建node def agent_node(state: TaskState): 节点思考 (Think) 接收当前状态调用 LLM返回新消息 messages state[messages] response tool_llm.invoke(messages) return {messages: [response]} #定义边 def condition_tools(state: TaskState): 节点工具 (Tool) 接收当前状态调用工具返回新消息 messages state[messages][-1] if messages.tool_calls: return tool_node else: return END #添加边 workflow StateGraph(TaskState) workflow.add_node(agent_node, agent_node) workflow.add_node(tool_node, ToolNode(tool_maps.values())) workflow.add_conditional_edges(agent_node, condition_tools, { tool_node: tool_node, END: END }) workflow.add_edge(tool_node, agent_node) workflow.set_entry_point(agent_node) return workflow.compile() if __name__ __main__: app Init_Agent() input 公司的经费预算是多少如果预算预算提高46%后多少 for event in app.stream({messages: [HumanMessage(contentinput)]}): for key, value in event.items(): print(f\n[{key}]) print(value[messages][-1].content)代码解释本次代码中重点讲langgraph的构建对于其他的细节请看前面文章。代码流程如下初始化工具集-定义状态定义条件边节点-构建节点-连接边-构建图-运行图初始化工具集这个前面文章有就不废话了。定义状态定义条件边节点状态#创建state class TaskState(TypedDict): messages: Annotated[list[BaseMessage], operator.add]状态是TypedDict的子类字典。上面的BaseMessage是ToolMessage,AIMessage,HumanMessage等的父类这个list主要用于存放每个节点的历史消息短期记忆Annotated[..., operator.add]表示追加将节点返回的消息追加到后面而不是覆盖。格式如下可以创建多个自定义字段class StateName(TypedDict): fieldName: fieldType条件边def condition_tools(state: TaskState): 节点工具 (Tool) 接收当前状态调用工具返回新消息 messages state[messages][-1] if messages.tool_calls: return tool_node else: return END返回值END和tool_node表示定义的节点名称END默认是结束节点格式如下def EdgeName(state: StateClass) return NextNode节点tool def calculator(expression: str) - str: ...... tool def rag_search(query: str) - str: ...... def agent_node(state: TaskState): ......节点可以是工具函数也可以是普通函数普通函数需要用state传入构建节点workflow StateGraph(TaskState) workflow.add_node(agent_node, agent_node) workflow.add_node(tool_node, ToolNode(tool_maps.values()))StateGraph(TaskState)初始化图将刚刚创建的状态传入add_node方法是创建节点tool_node节点名称自定义用于标识节点agent_node创建的节点函数ToolNode是langchain提供的创建工具节点的函数帮我们完成了调用工具集更新状态的全过程不用这个需要我们自己手动创建工具循环节点比较麻烦参考之前文章连接边workflow.add_conditional_edges(agent_node, condition_tools, { tool_node: tool_node, END: END }) workflow.add_edge(tool_node, agent_node)add_conditional_edges创建条件边方法分支根据返回内容决定节点走向add_edge固定走向如上tool_node-agent_node构建图workflow.set_entry_point(agent_node) workflow.compile()set_entry_point确定图的入口compile构建图运行图if __name__ __main__: app Init_Agent() input 公司的经费预算是多少如果预算预算提高46%后多少 for event in app.stream({messages: [HumanMessage(contentinput)]}): for key, value in event.items(): print(f\n[{key}]) print(value[messages][-1].content)运行跟之前运行普通模型一样stream方法会返回每个节点中的状态上面定义的类invoke方法直接返回最终状态

更多文章