跳转至

如何使用LangGraph

欢迎来到Agent的"大脑"设计工具——LangGraph!🧠🔄

LangGraph是什么?

LangGraph是LangChain生态系统中的一个强大扩展,专门用于构建具有LLMs的有状态、多角色应用程序的库,用于创建代理和多代理工作流。与其他LLM框架相比,它提供了以下核心优势:循环、可控性和持久性。

它的核心设计思想是通有向图来管理应用的工作流,状态储存工作流中所有信息,节点代表任务或决策点,边控制流程的走向,从而简化LLMs的集成过程。如果说LangChain是提供了构建Agent的各种组件,那么LangGraph就是让你能够精确控制这些组件如何协同工作的"神经系统"。

想象一下你是一位建筑师,LangChain提供了砖块、钢筋和混凝土,而LangGraph则是让你能够设计整栋大楼的蓝图!

为什么需要LangGraph?

普通的Agent通常是"问一答一"的简单流程,但真正强大的应用需要更复杂的思考过程:

  • 多步骤推理:将复杂问题分解为多个步骤
  • 状态管理:跟踪长时间运行任务的进度和状态
  • 决策分支:根据不同情况选择不同行动路径
  • 循环反思:在得到结果后进行评估并改进

LangGraph就像是给了AI一个"大脑皮层",让它能够按照你设计的模式进行思考!

安装与设置

仅需使用图工作流(StateGraph/MessageGraph)、节点控制流(条件分支、循环等)等核心功能时:

pip install langgraph
当需要集成 LLM 模型、工具调用或 LangChain 组件时:
pip install langchain langchain-openai

  • langchain-core: LangChain 基础模块(最小依赖)
  • langchain-openai: OpenAI 模型集成

LangGraph核心概念

1. 状态(State)

状态是LangGraph的核心,它存储Agent工作流中的所有信息,当需要在节点间共享数据时,则会用到state

from typing import Dict, List, Annotated
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages

# 定义状态类型
class AgentState(TypedDict):
    # 用户输入
    input: str
    # 思考过程
    thoughts: List[str]
    # 已执行的操作
    actions: List[str]
    # 最终输出
    output: str
    # 消息列表
    messages: Annotated[List[str], add_messages]
什么是Annotated?

Annotated 是 typing 模块中引入的一种类型提示机制,用于为类型提供附加的元数据。它允许你在类型提示中添加额外的信息,比如验证规则、默认值或其他上下文信息,而不影响类型本身。 下面是一个简单示例:

from typing import Annotated

# 定义一个带有附加信息的类型
Age = Annotated[int, "年龄,必须是正整数"]

def set_age(age: Age) -> None:
    if age <= 0:
        raise ValueError("年龄必须是正整数")
    print(f"年龄设置为: {age}")

# 使用示例
set_age(25)  # 正常使用
set_age(-5)  # 会引发 ValueError

什么是TypedDict?

TypedDict 是 typing 模块中的一个工具,用于定义具有特定字段和类型的字典。它允许开发者创建更明确的字典类型,使得代码更具可读性和可维护性,特别是在处理复杂数据结构时。

2. 节点(Nodes)

节点是工作流中的处理单元,通常是常规的python函数。每个节点接收状态,执行某些操作,然后返回更新后的状态:

from langchain_openai import ChatOpenAI

import os
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-XnbHbzBOmPYGHgL_8q1nHn9pF7SRIQO-3M0QhYcpYAmV3kxQJ7SiqbzfETE"

# 创建语言模型
llm = ChatOpenAI(model="gpt-4.1-nano")

# 定义思考节点
def think(state: AgentState) -> AgentState:
    # 获取用户输入
    user_input = state["input"]

    # 让模型思考解决方案
    response = llm.invoke(f"思考如何解决以下问题: {user_input}。给出详细的思考过程。")

    # 更新状态
    thoughts = state.get("thoughts", [])
    thoughts.append(response.content)

    # 添加消息
    state["messages"].append(f"思考过程: {response.content}")

    return {"thoughts": thoughts, **state}

# 定义行动节点
def act(state: AgentState) -> AgentState:
    # 基于思考过程采取行动
    thoughts = state.get("thoughts", [])
    if thoughts:
        latest_thought = thoughts[-1]
        response = llm.invoke(f"基于以下思考,应该采取什么具体行动?\n{latest_thought}")

        # 更新状态
        actions = state.get("actions", [])
        actions.append(response.content)

        # 添加消息
        state["messages"].append(f"采取的行动: {response.content}")

        return {"actions": actions, **state}

    return state

# 定义响应节点
def respond(state: AgentState) -> AgentState:
    # 生成最终输出
    actions = state.get("actions", [])
    if actions:
        final_action = actions[-1]
        response = llm.invoke(f"基于以下行动,给出最终响应:\n{final_action}")

        # 更新状态
        state["output"] = response.content

        # 添加消息
        state["messages"].append(f"最终响应: {response.content}")

    return state

3. 边(Edges)

边定义了节点之间的连接和转换条件,主要分为以下两种:

  • 普通边

    使用add_edge方法直接从节点a到节点b:

    graph.add_edge("node_a", "node_b")
    

  • 条件边

    使用add_conditional_edges方法选择性地路由到一个或多个边(或选择性地终止)。此方法接受节点的名称和一个“路由函数”,该函数将在该节点执行后被调用:

    graph.add_conditional_edges("node_a", routing_function)
    
    类似于节点,routing_function接受图形的当前state并返回一个值。 默认情况下,返回值routing_function用作要将状态发送到下一个节点的节点名称(或节点列表)。所有这些节点将在下一个超级步骤中并行运行。您可以选择提供一个字典,该字典将routing_function的输出映射到下一个节点的名称。
    graph.add_conditional_edges("node_a", routing_function, {True: "node_b", False:"node_c"})
    

在我们的示例中,定义如下路由函数:

def should_continue_thinking(state: AgentState) -> str:
    """根据当前状态决定下一步是继续思考还是行动"""
    thoughts = state.get("thoughts", [])

    # 如果思考次数小于3次,继续思考
    if len(thoughts) < 3:
        return "think"

    # 否则开始行动
    return "act"

4. 图(Graph)

图将所有节点、边和条件组合在一起,形成完整的工作流:

from langgraph.graph import StateGraph

# 创建状态图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("think", think)
workflow.add_node("act", act)
workflow.add_node("respond", respond)

# 添加边

workflow.add_conditional_edges(
    "think",
    should_continue_thinking,
    {
        "think":"think",
        "act":"act"
    }
)
workflow.add_edge("act", "respond")

# 设置起始节点
workflow.set_entry_point("think")

# 编译图
graph = workflow.compile()

可以通过get_graph方法以及draw_ascii或draw_png等“绘制”方法来可视化图,这些绘制方法各自需要额外的依赖项:

from IPython.display import Image, display

try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    # 这需要一些额外的依赖项,并且是可选的
    pass
graph

🌟 小案例:智能客服路由系统

假设我们想创建一个Agent,它能根据用户问题类型选择不同的处理分支:

from typing import Dict, List, Annotated, Literal
from typing_extensions import TypedDict
from langgraph.graph.message import add_messages
from langgraph.graph import StateGraph
from langchain_openai import ChatOpenAI
import os

# 配置OpenAI
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-XnbHbzBOmPYGHgL_8q1nHn9pF7SRIQO-3M0QhYcpYAmV3kxQJ7SiqbzfETE"

# 定义状态类型
class AgentState(TypedDict):
    input: str
    category: Literal["account", "payment", "other"]  # 问题分类
    response: str
    messages: Annotated[List[str], add_messages]

# 初始化模型
llm = ChatOpenAI(model="gpt-4.1-nano")

# 定义节点函数
def classify_input(state: AgentState) -> AgentState:
    """分类节点:识别问题类型"""
    user_input = state["input"]

    # 使用LLM进行分类
    response = llm.invoke(
        f"请分类以下用户问题(account/payment/other):\n{user_input}\n只需返回分类结果,不要解释。"
    )
    category = response.content.lower()

    # 更新状态
    state["category"] = "other"  # 默认值
    if "account" in category:
        state["category"] = "account"
    elif "payment" in category:
        state["category"] = "payment"

    state["messages"].append(f"分类结果: {state['category']}")
    return state

def handle_account(state: AgentState) -> AgentState:
    """处理账户问题"""
    response = llm.invoke(
        f"你是一个账户专家,请处理以下账户问题:\n{state['input']}\n请用中文给出详细解决方案:"
    )
    state["response"] = response.content
    state["messages"].append("进入账户问题处理流程")
    return state

def handle_payment(state: AgentState) -> AgentState:
    """处理支付问题"""
    response = llm.invoke(
        f"你是一个支付专家,请处理以下支付问题:\n{state['input']}\n请用中文给出详细解决方案:"
    )
    state["response"] = response.content
    state["messages"].append("进入支付问题处理流程")
    return state

def handle_other(state: AgentState) -> AgentState:
    """处理其他问题"""
    response = llm.invoke(
        f"你是一个客服专员,请处理以下问题:\n{state['input']}\n请用中文给出详细回答:"
    )
    state["response"] = response.content
    state["messages"].append("进入通用问题处理流程")
    return state

def final_response(state: AgentState) -> AgentState:
    """最终响应节点"""
    state["messages"].append(f"最终回复:{state['response']}")
    return state

# 定义路由函数
def route_based_on_category(state: AgentState) -> str:
    """根据分类结果路由到不同分支"""
    return state["category"]

# 构建流程图
workflow = StateGraph(AgentState)

# 添加节点
workflow.add_node("classify", classify_input)
workflow.add_node("account", handle_account)
workflow.add_node("payment", handle_payment)
workflow.add_node("other", handle_other)
workflow.add_node("respond", final_response)

# 设置条件边
workflow.add_conditional_edges(
    "classify",
    route_based_on_category,
    {
        "account": "account",
        "payment": "payment",
        "other": "other"
    }
)

# 连接分支到最终节点
workflow.add_edge("account", "respond")
workflow.add_edge("payment", "respond")
workflow.add_edge("other", "respond")

# 设置入口点
workflow.set_entry_point("classify")

# 编译图
graph = workflow.compile()
测试不同输入
test_cases = [
    "我的账号无法登录了",
    "信用卡付款没有成功",
    "你们什么时候放假?"
]

for query in test_cases:
    print(f"输入:{query}")
    for step in graph.stream({"input": query, "messages": []}):
        if "__end__" not in step:
            print(f"当前节点:{list(step.keys())[0]}")
    print(f"最终回复:{step['respond']['response']}\n")
测试结果
输入我的账号无法登录了
当前节点classify
当前节点account
当前节点respond
最终回复您好关于您的账号无法登录的问题以下是一些详细的排查和解决步骤供您参考

确认基本信息
1. 检查账号和密码是否正确确保输入的账号信息无误没有拼写错误或大小写问题
2. 查看是否开启了Caps Lock或Num Lock键避免输入错误
3. 试着复制粘贴密码以确保没有输入错误
......#由于篇幅问题省略回答

输入信用卡付款没有成功
当前节点classify
当前节点payment
当前节点respond
最终回复您好关于您的信用卡付款未成功的问题建议您按照以下步骤进行排查和解决

1. 核实信用卡信息
- 确认卡号有效期CVV码等信息填写无误
- 核对账单地址是否与提交的地址一致确保信息准确
......#由于篇幅问题省略回答

输入你们什么时候放假
当前节点classify
当前节点other
当前节点respond
最终回复您好感谢您的咨询关于我们的放假安排具体的假期时间会根据国家法定节假日和公司安排而定通常我们会在春节国庆节劳动节等法定节假日期间放假具体假期时间会提前通知如果您需要了解某个特定时间段的放假安排请提供具体日期或节假日名称我们将为您查询或提供最新的假期信息感谢您的理解与支持

aicsgraph

高级功能

1. 并行执行

有时需要同时执行多个操作,LangGraph支持并行节点:

# 定义两个并行节点
def search_web(state):
    # 搜索网络信息
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"date:{date},搜索网络信息")
    return {**state, "web_info": "从网络搜索到的信息"}

def check_database(state):
    # 查询数据库
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"date:{date},查询数据库")
    return {**state, "db_info": "从数据库获取的信息"}

graph_builder = StateGraph(State)

# 在图中配置并行执行
graph_builder.add_node("parallel_search", search_web)
graph_builder.add_node("parallel_db_check", check_database)

# 从分支节点开始并行执行
graph_builder.add_edge(START, "parallel_search")
graph_builder.add_edge(START, "parallel_db_check")

# 合并节点汇总结果
def merge_results(states):
    return states

graph_builder.add_node("merge", merge_results)
graph_builder.add_edge("parallel_search", "merge")
graph_builder.add_edge("parallel_db_check", "merge")

graph = graph_builder.compile()
graph1

完整代码
from IPython.display import Image, display
from langgraph.graph import START,StateGraph
from datetime import datetime
import time

# 定义两个并行节点
def search_web(state):
    # 搜索网络信息
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"date:{date},搜索网络信息")
    time.sleep(1) #休眠1s
    return {**state, "web_info": "从网络搜索到的信息"}

def check_database(state):
    # 查询数据库
    date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    print(f"date:{date},查询数据库")
    return {**state, "db_info": "从数据库获取的信息"}

graph_builder = StateGraph(State)
# 在图中配置并行执行
graph_builder.add_node("parallel_search", search_web)
graph_builder.add_node("parallel_db_check", check_database)

# 从分支节点开始并行执行
graph_builder.add_edge(START, "parallel_search")
graph_builder.add_edge(START, "parallel_db_check")

# 合并节点汇总结果
def merge_results(states):
    return states

graph_builder.add_node("merge", merge_results)
graph_builder.add_edge("parallel_search", "merge")
graph_builder.add_edge("parallel_db_check", "merge")

graph = graph_builder.compile()

# 初始化状态
initial_state = {}
result = graph.invoke(initial_state)                      
print(f"date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}, graph 执行结束")

# 可视化图形
try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    pass
输出结果

date:2025-05-28 03:11:40查询数据库
date:2025-05-28 03:11:40搜索网络信息
date: 2025-05-28 03:11:41 graph 执行结束
从输出结果中可以看到,在search_web节点中使用time.sleep(1)休眠1秒钟,LangGraph 会非常智能的等待所有并行节点执行完毕

异步节点执行

当节点函数是个异步的函数,即使用async def定义的函数,此时添加节点和添加边的方法不变,但是调用工作流的方法需要变为异步的ainvoke

2. 重试策略

在实际应用中,重试策略是非常重要的,尤其是在调用外部服务(如API、数据库或LLM)时。这些服务可能会因为网络问题、服务不可用、超时或其他临时性问题而失败。通过实现重试策略,可以提高系统的可靠性和容错能力,确保在遇到临时性问题时能够自动恢复,而不是直接失败。

要配置重试策略,需要将retry参数传递给add_noderetry参数接受名为元组对象的RetryPolicy。下面,我们用默认参数实例化一个RetryPolicy对象,并将其与节点相关联:

from langgraph.pregel import RetryPolicy

builder.add_node(
    "node_name",
    node_function,
    retry=RetryPolicy(),
)
示例代码

import sqlite3
from langchain.chat_models import init_chat_model
from langgraph.graph import END, MessagesState, StateGraph, START
from langgraph.pregel import RetryPolicy
from langchain_community.utilities import SQLDatabase
from langchain_core.messages import AIMessage

db = SQLDatabase.from_uri("sqlite:///:memory:")
model = init_chat_model("anthropic:claude-3-5-haiku-latest")

def query_database(state: MessagesState):
    query_result = db.run("SELECT * FROM Artist LIMIT 10;")
    return {"messages": [AIMessage(content=query_result)]}

def call_model(state: MessagesState):
    response = model.invoke(state["messages"])
    return {"messages": [response]}

# 定义新的图
builder = StateGraph(MessagesState)
builder.add_node(
    "query_database",
    query_database,
    retry=RetryPolicy(retry_on=sqlite3.OperationalError),
)
builder.add_node("model", call_model, retry=RetryPolicy(max_attempts=5))
builder.add_edge(START, "model")
builder.add_edge("model", "query_database")
builder.add_edge("query_database", END)

graph = builder.compile()
# 可视化图形
try:
    display(Image(graph.get_graph().draw_mermaid_png()))
except Exception:
    pass
graph2

3. 人机交互

LangGraph 中的interrupt功能通过在特定节点暂停图形执行,向人类展示信息,并使用人类的输入恢复图形,从而实现人机交互工作流程。关键组件包括:

  • interrupt函数:在特定节点调用 interrupt 函数时,图形执行会暂停,并将指定的信息传递给人类。
  • Command对象:用于恢复图形执行,并将人类的输入传递回图形。
  • 检查点器(Checkpointer):用于保存图形的状态,以便在暂停后能够恢复执行。
from langgraph.types import interrupt, Command

def human_node(state: State):
    value = interrupt( 
        {
            "text_to_revise": state["some_text"]  # 需要人类修订的文本
        }
    )
    return {
        "some_text": value  # 将修订后的文本返回到图状态中
    }

# 编译图形并传入检查点器
graph = graph_builder.compile(checkpointer=checkpointer) 

# 运行图形,直到遇到中断点
config = {"configurable": {"thread_id": "some_id"}}
result = graph.invoke({"some_text": "original text"}, config=config)  # 初始文本为 "original text"
print(result['__interrupt__']) 
# 输出中断信息:
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},  # 需要人类修订的文本
# >       resumable=True,  # 表示可以恢复执行
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']  # 中断节点的唯一标识
# >    )
# > ] 

# 使用人类输入恢复图形执行
print(graph.invoke(Command(resume="Edited text"), config=config)) 
# 输出修订后的结果:
# > {'some_text': 'Edited text'}  # 人类修订后的文本

graph3

完整代码
from typing import TypedDict
import uuid
from IPython.display import display, Image
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.constants import START
from langgraph.graph import StateGraph
from langgraph.types import interrupt, Command

# 定义状态类型
class State(TypedDict):
    some_text: str  # 图状态中需要处理的文本

# 定义人类交互节点
def human_node(state: State):
    # 调用 interrupt 函数暂停图形执行,并向人类展示需要修订的文本
    value = interrupt( 
        {
            "text_to_revise": state["some_text"]  # 传递当前文本内容
        }
    )
    # 将人类修订后的文本返回到图状态中
    return {
        "some_text": value 
    }

# 构建图
graph_builder = StateGraph(State)
graph_builder.add_node("human_node", human_node)  # 添加人类交互节点
graph_builder.add_edge(START, "human_node")  # 从起点连接到人类交互节点

# 使用内存检查点器保存图状态
checkpointer = InMemorySaver() 

# 编译图
graph = graph_builder.compile(checkpointer=checkpointer)

# 为图运行传递一个线程 ID
config = {"configurable": {"thread_id": uuid.uuid4()}}

# 运行图,直到遇到中断点
result = graph.invoke({"some_text": "original text"}, config=config) 

print(result['__interrupt__']) 
# 输出中断信息:
# > [
# >    Interrupt(
# >       value={'text_to_revise': 'original text'},  # 需要人类修订的文本内容
# >       resumable=True,  # 表示可以恢复执行
# >       ns=['human_node:6ce9e64f-edef-fe5d-f7dc-511fa9526960']  # 中断节点的唯一标识
# >    )
# > ] 

# 使用人类输入恢复图执行
print(graph.invoke(Command(resume="Edited text"), config=config)) 
# 输出修订后的结果:
# > {'some_text': 'Edited text'}  # 人类修订后的文本内容
输出结果
[Interrupt(value={'text_to_revise': 'original text'}, resumable=True, ns=['human_node:a08775a1-fc79-cebc-5444-df5e905de026'])]
{'some_text': 'Edited text'}

与LangChain集成

LangGraph与LangChain完美集成,可以在图中使用LangChain的各种组件:

from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI

# 创建工具
weather_tool = Tool(
    name="天气查询",
    func=lambda loc: f"{loc}今天晴朗,温度25度",
    description="查询特定地点的天气情况"
)

# 创建LangChain Agent
llm = ChatOpenAI(model="gpt-4.1-nano")
agent = initialize_agent(
    tools=[weather_tool],
    llm=llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 在LangGraph节点中使用LangChain Agent
# 创建LangGraph节点
def agent_node(state: AgentState):
    result = agent.invoke({"input": state["input"]})
    return {"agent_output": result["output"]}

# 添加到图中
graph.add_node("agent_task", agent_node)
完整代码
from langchain.tools import Tool
from langchain.agents import initialize_agent, AgentType
from langchain_openai import ChatOpenAI
from langgraph.graph import StateGraph, END

import os
os.environ["OPENAI_BASE_URL"] = "https://oneapi.handbook.cool/v1"
os.environ["OPENAI_API_KEY"] = "sk-XnbHbzBOmPYGHgL_8q1nHn9pF7SRIQO-3M0QhYcpYAmV3kxQJ7SiqbzfETE"

# 创建语言模型
llm = ChatOpenAI(model="gpt-4.1-nano")

# 创建工具
weather_tool = Tool(
    name="天气查询",
    func=lambda loc: f"{loc}今天晴朗,温度25度",
    description="查询特定地点的天气情况"
)

agent = initialize_agent(
    tools=[weather_tool],
    llm=llm,
    agent=AgentType.STRUCTURED_CHAT_ZERO_SHOT_REACT_DESCRIPTION,
    verbose=True
)

# 定义状态结构
class AgentState(dict):
    input: str
    agent_output: str = None

# 创建LangGraph节点
def agent_node(state: AgentState):
    result = agent.invoke({"input": state["input"]})
    return {"agent_output": result["output"]}

# 初始化图
graph = StateGraph(AgentState)
graph.add_node("agent_task", agent_node)
graph.set_entry_point("agent_task")
graph.add_edge("agent_task", END)
app = graph.compile()

# 使用示例
result = app.invoke({"input": "北京天气怎么样?"})
print(result["agent_output"])
输出结果
> Entering new AgentExecutor chain...
{
"action": "天气查询",
"action_input": {
    "tool_input": "北京"
}
}

> Finished chain.
{
"action": "天气查询",
"action_input": {
    "tool_input": "北京"
}
}

最佳实践

  1. 状态设计:仔细规划状态结构,明确每个字段的用途
  2. 节点职责单一:每个节点只做一件事,保持逻辑清晰
  3. 错误处理:加入错误捕获和重试机制,提高鲁棒性
  4. 提示优化:花时间调整每个节点中的提示以获得最佳结果
  5. 可视化调试:使用tracing工具观察图的执行过程
  6. 逐步构建:先构建简单工作流,再逐步添加复杂功能

下一步

现在你已经掌握了LangGraph的基础,接下来可以:

  • 构建更复杂的Agent思维流程,如ReAct、Reflection等
  • 探索人类反馈在循环(HITL)的集成
  • 学习持久化状态和长期运行工作流
  • 尝试用LangGraph构建多Agent系统

祝你在LangGraph的世界里创造出智能、高效的AI系统!🚀