如何使用LangGraph
欢迎来到Agent的"大脑"设计工具——LangGraph!🧠🔄
LangGraph是什么?
LangGraph是LangChain生态系统中的一个强大扩展,专门用于构建具有LLMs的有状态、多角色应用程序的库,用于创建代理和多代理工作流。与其他LLM框架相比,它提供了以下核心优势:循环、可控性和持久性。
它的核心设计思想是通有向图来管理应用的工作流,状态储存工作流中所有信息,节点代表任务或决策点,边控制流程的走向,从而简化LLMs的集成过程。如果说LangChain是提供了构建Agent的各种组件,那么LangGraph就是让你能够精确控制这些组件如何协同工作的"神经系统"。
想象一下你是一位建筑师,LangChain提供了砖块、钢筋和混凝土,而LangGraph则是让你能够设计整栋大楼的蓝图!
为什么需要LangGraph?
普通的Agent通常是"问一答一"的简单流程,但真正强大的应用需要更复杂的思考过程:
- 多步骤推理:将复杂问题分解为多个步骤
- 状态管理:跟踪长时间运行任务的进度和状态
- 决策分支:根据不同情况选择不同行动路径
- 循环反思:在得到结果后进行评估并改进
LangGraph就像是给了AI一个"大脑皮层",让它能够按照你设计的模式进行思考!
安装与设置
仅需使用图工作流(StateGraph/MessageGraph)、节点控制流(条件分支、循环等)等核心功能时:
当需要集成 LLM 模型、工具调用或 LangChain 组件时:- langchain-core: LangChain 基础模块(最小依赖)
- langchain-openai: OpenAI 模型集成
LangGraph核心概念
1. 状态(State)
状态是LangGraph的核心,它存储Agent工作流中的所有信息,当需要在节点间共享数据时,则会用到state
:
from typing import Dict, List, Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
# 定义状态类型
class AgentState(TypedDict):
# 用户输入
input: str
# 思考过程
thoughts: List[str]
# 已执行的操作
actions: List[str]
# 最终输出
output: str
# 消息列表
messages: Annotated[List[str], add_messages]
什么是Annotated?
Annotated 是 typing 模块中引入的一种类型提示机制,用于为类型提供附加的元数据。它允许你在类型提示中添加额外的信息,比如验证规则、默认值或其他上下文信息,而不影响类型本身。 下面是一个简单示例:
什么是TypedDict?
TypedDict 是 typing 模块中的一个工具,用于定义具有特定字段和类型的字典。它允许开发者创建更明确的字典类型,使得代码更具可读性和可维护性,特别是在处理复杂数据结构时。
2. 节点(Nodes)
节点是工作流中的处理单元,通常是常规的python函数。每个节点接收状态,执行某些操作,然后返回更新后的状态:
from langchain_openai import ChatOpenAI
import os
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-XnbHbzBOmPYGHgL_8q1nHn9pF7SRIQO-3M0QhYcpYAmV3kxQJ7SiqbzfETE"
# 创建语言模型
llm = ChatOpenAI(model="gpt-4.1-nano")
# 定义思考节点
def think(state: AgentState) -> AgentState:
# 获取用户输入
user_input = state["input"]
# 让模型思考解决方案
response = llm.invoke(f"思考如何解决以下问题: {user_input}。给出详细的思考过程。")
# 更新状态
thoughts = state.get("thoughts", [])
thoughts.append(response.content)
# 添加消息
state["messages"].append(f"思考过程: {response.content}")
return {"thoughts": thoughts, **state}
# 定义行动节点
def act(state: AgentState) -> AgentState:
# 基于思考过程采取行动
thoughts = state.get("thoughts", [])
if thoughts:
latest_thought = thoughts[-1]
response = llm.invoke(f"基于以下思考,应该采取什么具体行动?\n{latest_thought}")
# 更新状态
actions = state.get("actions", [])
actions.append(response.content)
# 添加消息
state["messages"].append(f"采取的行动: {response.content}")
return {"actions": actions, **state}
return state
# 定义响应节点
def respond(state: AgentState) -> AgentState:
# 生成最终输出
actions = state.get("actions", [])
if actions:
final_action = actions[-1]
response = llm.invoke(f"基于以下行动,给出最终响应:\n{final_action}")
# 更新状态
state["output"] = response.content
# 添加消息
state["messages"].append(f"最终响应: {response.content}")
return state
3. 边(Edges)
边定义了节点之间的连接和转换条件,主要分为以下两种:
-
普通边
使用
add_edge
方法直接从节点a到节点b: -
条件边
使用
类似于节点,add_conditional_edges
方法选择性地路由到一个或多个边(或选择性地终止)。此方法接受节点的名称和一个“路由函数”,该函数将在该节点执行后被调用:routing_function
接受图形的当前state
并返回一个值。 默认情况下,返回值routing_function
用作要将状态发送到下一个节点的节点名称(或节点列表)。所有这些节点将在下一个超级步骤中并行运行。您可以选择提供一个字典,该字典将routing_function
的输出映射到下一个节点的名称。
在我们的示例中,定义如下路由函数:
def should_continue_thinking(state: AgentState) -> str:
"""根据当前状态决定下一步是继续思考还是行动"""
thoughts = state.get("thoughts", [])
# 如果思考次数小于3次,继续思考
if len(thoughts) < 3:
return "think"
# 否则开始行动
return "act"
4. 图(Graph)
图将所有节点、边和条件组合在一起,形成完整的工作流:
from langgraph.graph import StateGraph
# 创建状态图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("think", think)
workflow.add_node("act", act)
workflow.add_node("respond", respond)
# 添加边
workflow.add_conditional_edges(
"think",
should_continue_thinking,
{
"think":"think",
"act":"act"
}
)
workflow.add_edge("act", "respond")
# 设置起始节点
workflow.set_entry_point("think")
# 编译图
graph = workflow.compile()
可以通过get_graph方法以及draw_ascii或draw_png等“绘制”方法来可视化图,这些绘制方法各自需要额外的依赖项:
from IPython.display import Image, display
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
# 这需要一些额外的依赖项,并且是可选的
pass

🌟 小案例:智能客服路由系统
假设我们想创建一个Agent,它能根据用户问题类型选择不同的处理分支:
from typing import Dict, List, Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
import os
# 配置OpenAI
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-XnbHbzBOmPYGHgL_8q1nHn9pF7SRIQO-3M0QhYcpYAmV3kxQJ7SiqbzfETE"
# 定义状态类型
class AgentState(TypedDict):
input: str
category: Literal["account", "payment", "other"] # 问题分类
response: str
messages: Annotated[List[str], add_messages]
# 初始化模型
llm = ChatOpenAI(model="gpt-4.1-nano")
# 定义节点函数
def classify_input(state: AgentState) -> AgentState:
"""分类节点:识别问题类型"""
user_input = state["input"]
# 使用LLM进行分类
response = llm.invoke(
f"请分类以下用户问题(account/payment/other):\n{user_input}\n只需返回分类结果,不要解释。"
)
category = response.content.lower()
# 更新状态
state["category"] = "other" # 默认值
if "account" in category:
state["category"] = "account"
elif "payment" in category:
state["category"] = "payment"
state["messages"].append(f"分类结果: {state['category']}")
return state
def handle_account(state: AgentState) -> AgentState:
"""处理账户问题"""
response = llm.invoke(
f"你是一个账户专家,请处理以下账户问题:\n{state['input']}\n请用中文给出详细解决方案:"
)
state["response"] = response.content
state["messages"].append("进入账户问题处理流程")
return state
def handle_payment(state: AgentState) -> AgentState:
"""处理支付问题"""
response = llm.invoke(
f"你是一个支付专家,请处理以下支付问题:\n{state['input']}\n请用中文给出详细解决方案:"
)
state["response"] = response.content
state["messages"].append("进入支付问题处理流程")
return state
def handle_other(state: AgentState) -> AgentState:
"""处理其他问题"""
response = llm.invoke(
f"你是一个客服专员,请处理以下问题:\n{state['input']}\n请用中文给出详细回答:"
)
state["response"] = response.content
state["messages"].append("进入通用问题处理流程")
return state
def final_response(state: AgentState) -> AgentState:
"""最终响应节点"""
state["messages"].append(f"最终回复:{state['response']}")
return state
# 定义路由函数
def route_based_on_category(state: AgentState) -> str:
"""根据分类结果路由到不同分支"""
return state["category"]
# 构建流程图
workflow = StateGraph(AgentState)
# 添加节点
workflow.add_node("classify", classify_input)
workflow.add_node("account", handle_account)
workflow.add_node("payment", handle_payment)
workflow.add_node("other", handle_other)
workflow.add_node("respond", final_response)
# 设置条件边
workflow.add_conditional_edges(
"classify",
route_based_on_category,
{
"account": "account",
"payment": "payment",
"other": "other"
}
)
# 连接分支到最终节点
workflow.add_edge("account", "respond")
workflow.add_edge("payment", "respond")
workflow.add_edge("other", "respond")
# 设置入口点
workflow.set_entry_point("classify")
# 编译图
graph = workflow.compile()
测试不同输入
测试结果
输入:我的账号无法登录了
当前节点:classify
当前节点:account
当前节点:respond
最终回复:您好,关于您的账号无法登录的问题,以下是一些详细的排查和解决步骤,供您参考:
一、确认基本信息
1. 检查账号和密码是否正确:确保输入的账号信息无误,没有拼写错误或大小写问题。
2. 查看是否开启了Caps Lock或Num Lock键,避免输入错误。
3. 试着复制粘贴密码,以确保没有输入错误。
......#由于篇幅问题省略回答
输入:信用卡付款没有成功
当前节点:classify
当前节点:payment
当前节点:respond
最终回复:您好,关于您的信用卡付款未成功的问题,建议您按照以下步骤进行排查和解决:
1. 核实信用卡信息
- 确认卡号、有效期、CVV码等信息填写无误。
- 核对账单地址是否与提交的地址一致,确保信息准确。
......#由于篇幅问题省略回答
输入:你们什么时候放假?
当前节点:classify
当前节点:other
当前节点:respond
最终回复:您好,感谢您的咨询!关于我们的放假安排,具体的假期时间会根据国家法定节假日和公司安排而定。通常,我们会在春节、国庆节、劳动节等法定节假日期间放假,具体假期时间会提前通知。如果您需要了解某个特定时间段的放假安排,请提供具体日期或节假日名称,我们将为您查询或提供最新的假期信息。感谢您的理解与支持!

高级功能
1. 并行执行
有时需要同时执行多个操作,LangGraph支持并行节点:
# 定义两个并行节点
def search_web(state):
# 搜索网络信息
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"date:{date},搜索网络信息")
return {**state, "web_info": "从网络搜索到的信息"}
def check_database(state):
# 查询数据库
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"date:{date},查询数据库")
return {**state, "db_info": "从数据库获取的信息"}
graph_builder = StateGraph(State)
# 在图中配置并行执行
graph_builder.add_node("parallel_search", search_web)
graph_builder.add_node("parallel_db_check", check_database)
# 从分支节点开始并行执行
graph_builder.add_edge(START, "parallel_search")
graph_builder.add_edge(START, "parallel_db_check")
# 合并节点汇总结果
def merge_results(states):
return states
graph_builder.add_node("merge", merge_results)
graph_builder.add_edge("parallel_search", "merge")
graph_builder.add_edge("parallel_db_check", "merge")
graph = graph_builder.compile()

完整代码
from IPython.display import Image, display
from langgraph.graph import START,StateGraph
from datetime import datetime
import time
# 定义两个并行节点
def search_web(state):
# 搜索网络信息
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"date:{date},搜索网络信息")
time.sleep(1) #休眠1s
return {**state, "web_info": "从网络搜索到的信息"}
def check_database(state):
# 查询数据库
date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"date:{date},查询数据库")
return {**state, "db_info": "从数据库获取的信息"}
graph_builder = StateGraph(State)
# 在图中配置并行执行
graph_builder.add_node("parallel_search", search_web)
graph_builder.add_node("parallel_db_check", check_database)
# 从分支节点开始并行执行
graph_builder.add_edge(START, "parallel_search")
graph_builder.add_edge(START, "parallel_db_check")
# 合并节点汇总结果
def merge_results(states):
return states
graph_builder.add_node("merge", merge_results)
graph_builder.add_edge("parallel_search", "merge")
graph_builder.add_edge("parallel_db_check", "merge")
graph = graph_builder.compile()
# 初始化状态
initial_state = {}
result = graph.invoke(initial_state)
print(f"date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, graph 执行结束")
# 可视化图形
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
pass
输出结果
从输出结果中可以看到,在search_web节点中使用time.sleep(1)休眠1秒钟,LangGraph 会非常智能的等待所有并行节点执行完毕异步节点执行
当节点函数是个异步的函数,即使用async def定义的函数,此时添加节点和添加边的方法不变,但是调用工作流的方法需要变为异步的ainvoke
2. 重试策略
在实际应用中,重试策略是非常重要的,尤其是在调用外部服务(如API、数据库或LLM)时。这些服务可能会因为网络问题、服务不可用、超时或其他临时性问题而失败。通过实现重试策略,可以提高系统的可靠性和容错能力,确保在遇到临时性问题时能够自动恢复,而不是直接失败。
要配置重试策略,需要将retry
参数传递给add_node
。retry
参数接受名为元组对象的RetryPolicy
。下面,我们用默认参数实例化一个RetryPolicy
对象,并将其与节点相关联:
from langgraph.pregel import RetryPolicy
builder.add_node(
"node_name",
node_function,
retry=RetryPolicy(),
)
示例代码
import sqlite3
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage
db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")
def query_database(state: MessagesState):
query_result = db.run("SELECT * FROM Artist LIMIT 10;")
return {"messages": [AIMessage(content=query_result)]}
def call_model(state: MessagesState):
response = model.invoke(state["messages"])
return {"messages": [response]}
# 定义新的图
builder = StateGraph(MessagesState)
builder.add_node(
"query_database",
query_database,
retry=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)
graph = builder.compile()
# 可视化图形
try:
display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
pass

3. 人机交互
LangGraph 中的interrupt
功能通过在特定节点暂停图形执行,向人类展示信息,并使用人类的输入恢复图形,从而实现人机交互工作流程。关键组件包括:
interrupt
函数:在特定节点调用 interrupt 函数时,图形执行会暂停,并将指定的信息传递给人类。Command
对象:用于恢复图形执行,并将人类的输入传递回图形。- 检查点器(
Checkpointer
):用于保存图形的状态,以便在暂停后能够恢复执行。
from langgraph.types import interrupt, Command
def human_node(state: State):
value = interrupt(
{
"text_to_revise": state["some_text"] # 需要人类修订的文本
}
)
return {
"some_text": value # 将修订后的文本返回到图状态中
}
# 编译图形并传入检查点器
graph = graph_builder.compile(checkpointer=checkpointer)
# 运行图形,直到遇到中断点
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config) # 初始文本为 "original text"
print(result['__interrupt__'])
# 输出中断信息:
# > [
# > Interrupt(
# > value={'text_to_revise': 'original text'}, # 需要人类修订的文本
# > resumable=True, # 表示可以恢复执行
# > ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960'] # 中断节点的唯一标识
# > )
# > ]
# 使用人类输入恢复图形执行
print(graph.invoke(Command(resume="Edited text"), config=config))
# 输出修订后的结果:
# > {'some_text': 'Edited text'} # 人类修订后的文本

完整代码
from typing import TypedDict
import uuid
from IPython.display import display, Image
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command
# 定义状态类型
class State(TypedDict):
some_text: str # 图状态中需要处理的文本
# 定义人类交互节点
def human_node(state: State):
# 调用 interrupt 函数暂停图形执行,并向人类展示需要修订的文本
value = interrupt(
{
"text_to_revise": state["some_text"] # 传递当前文本内容
}
)
# 将人类修订后的文本返回到图状态中
return {
"some_text": value
}
# 构建图
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node) # 添加人类交互节点
graph_builder.add_edge(START, "human_node") # 从起点连接到人类交互节点
# 使用内存检查点器保存图状态
checkpointer = InMemorySaver()
# 编译图
graph = graph_builder.compile(checkpointer=checkpointer)
# 为图运行传递一个线程 ID
config = {"configurable": {"thread_id": uuid.uuid4()}}
# 运行图,直到遇到中断点
result = graph.invoke({"some_text": "original text"}, config=config)
print(result['__interrupt__'])
# 输出中断信息:
# > [
# > Interrupt(
# > value={'text_to_revise': 'original text'}, # 需要人类修订的文本内容
# > resumable=True, # 表示可以恢复执行
# > ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960'] # 中断节点的唯一标识
# > )
# > ]
# 使用人类输入恢复图执行
print(graph.invoke(Command(resume="Edited text"), config=config))
# 输出修订后的结果:
# > {'some_text': 'Edited text'} # 人类修订后的文本内容
输出结果
与LangChain集成
LangGraph与LangChain完美集成,可以在图中使用LangChain的各种组件:
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
# 创建工具
weather_tool = Tool(
name="天气查询",
func=lambda loc: f"{loc}今天晴朗,温度25度",
description="查询特定地点的天气情况"
)
# 创建LangChain Agent
llm = ChatOpenAI(model="gpt-4.1-nano")
agent = initialize_agent(
tools=[weather_tool],
llm=llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 在LangGraph节点中使用LangChain Agent
# 创建LangGraph节点
def agent_node(state: AgentState):
result = agent.invoke({"input": state["input"]})
return {"agent_output": result["output"]}
# 添加到图中
graph.add_node("agent_task", agent_node)
完整代码
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END
import os
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-XnbHbzBOmPYGHgL_8q1nHn9pF7SRIQO-3M0QhYcpYAmV3kxQJ7SiqbzfETE"
# 创建语言模型
llm = ChatOpenAI(model="gpt-4.1-nano")
# 创建工具
weather_tool = Tool(
name="天气查询",
func=lambda loc: f"{loc}今天晴朗,温度25度",
description="查询特定地点的天气情况"
)
agent = initialize_agent(
tools=[weather_tool],
llm=llm,
agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
verbose=True
)
# 定义状态结构
class AgentState(dict):
input: str
agent_output: str = None
# 创建LangGraph节点
def agent_node(state: AgentState):
result = agent.invoke({"input": state["input"]})
return {"agent_output": result["output"]}
# 初始化图
graph = StateGraph(AgentState)
graph.add_node("agent_task", agent_node)
graph.set_entry_point("agent_task")
graph.add_edge("agent_task", END)
app = graph.compile()
# 使用示例
result = app.invoke({"input": "北京天气怎么样?"})
print(result["agent_output"])
输出结果
最佳实践
- 状态设计:仔细规划状态结构,明确每个字段的用途
- 节点职责单一:每个节点只做一件事,保持逻辑清晰
- 错误处理:加入错误捕获和重试机制,提高鲁棒性
- 提示优化:花时间调整每个节点中的提示以获得最佳结果
- 可视化调试:使用tracing工具观察图的执行过程
- 逐步构建:先构建简单工作流,再逐步添加复杂功能
下一步
现在你已经掌握了LangGraph的基础,接下来可以:
- 构建更复杂的Agent思维流程,如ReAct、Reflection等
- 探索人类反馈在循环(HITL)的集成
- 学习持久化状态和长期运行工作流
- 尝试用LangGraph构建多Agent系统
祝你在LangGraph的世界里创造出智能、高效的AI系统!🚀