Expert Day 156

LangGraph Deep Dive: StateGraph / Conditional Edges / Cycles / Checkpoint

Core LangGraph concepts: StateGraph state-merge semantics, Node/Edge, conditional edges, cycles, Checkpoint + Persistence, Interrupt (human-in-the-loop)

2026-10-04
Phase 3 - Agent Architecture and Multi-Agent (Day 149-162)

Date: 2026-10-04 | Direction: AI Systems Engineering / Agent | Phase: Phase 3 - Agent Architecture and Multi-Agent (Day 149-162) | Tags: #LangGraph #StateGraph #Checkpoint #HumanInTheLoop #LangChain


Today's Goals

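Type | Content
Learn | Core LangGraph concepts: StateGraph state-merge semantics, Node/Edge, conditional edges, cycles, Checkpoint + Persistence, Interrupt (human-in-the-loop)
Hands-on | Rewrite the bare ReAct agent from Day 150 in LangGraph; demonstrate checkpoint resume and interrupt
Output | lg_agent.py (about 500 lines) + a code-size/readability comparison with the bare version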

1. What LangGraph Is and Why

1.1 Origin

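LangGraph is an agent orchestration library released by LangChain Inc. in 2024. Background: LangChain's early AgentExecutor could only run a linear ReAct loop and struggled to express more complex agents. LangGraph models an agent as a directed graph (cycles allowed) over a shared state; at its core it is a stateful streaming computation model inspired by Pregel and Apache Beam.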

1.2 Core Abstractions

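Abstraction | Meaning
State | A TypedDict or Pydantic model; every node reads from and writes to it
Node | A function (state) → state_update; the update is merged into the state by the per-field reducers
Edge | A fixed connection between nodes
Conditional Edge | A function that decides the next node
Cycle | Loops are allowed in the graph (required for an agent loop)
Checkpointer | Persists state to SQLite/Postgres/Redis
Interrupt | Pauses before/after a given node (or inside a node via interrupt()) and waits for user input
Stream | Emits state updates in real time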
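To make the table concrete, here is a toy, dependency-free model of how these pieces interact in one run: a node returns a partial update, per-field reducers merge it, a fixed or conditional edge picks the next node, and a step limit stops runaway cycles. It is a simplified sketch for intuition only, not LangGraph's implementation; `run_graph`, the string messages, and the `END` sentinel are assumptions of this example.

```python
import operator

END = "__end__"  # sentinel meaning "stop"; the name is this sketch's own

def run_graph(state, nodes, edges, reducers, start, recursion_limit=25):
    """Toy run loop: one node per step, reducer-merged updates, fixed or conditional edges."""
    state = dict(state)
    current, steps = start, 0
    while current != END:
        if steps >= recursion_limit:
            raise RuntimeError(f"recursion_limit={recursion_limit} reached")
        update = nodes[current](state)
        for key, value in update.items():
            reducer = reducers.get(key)
            # fields with a reducer are merged; fields without one are overwritten
            state[key] = reducer(state[key], value) if reducer else value
        steps += 1
        nxt = edges[current]
        current = nxt(state) if callable(nxt) else nxt  # conditional vs fixed edge
    return state, steps

def agent(state):
    # stand-in for the LLM: ask for a tool until two tool results exist
    n_results = sum(m.startswith("tool:") for m in state["messages"])
    return {"messages": ["ai:final" if n_results >= 2 else "ai:tool_call"], "iter_count": 1}

def tools(state):
    return {"messages": ["tool:result"]}

def should_continue(state):
    return "tools" if state["messages"][-1] == "ai:tool_call" else END

final, steps = run_graph(
    {"messages": ["human:task"], "iter_count": 0},
    nodes={"agent": agent, "tools": tools},
    edges={"agent": should_continue, "tools": "agent"},  # tools -> agent is the cycle
    reducers={"messages": operator.add, "iter_count": operator.add},
    start="agent",
)
print(steps, final["iter_count"], final["messages"][-1])  # 5 3 ai:final
```

Three agent calls plus two tool calls take five steps, which is why a step limit counts node executions rather than agent turns.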

1.3 Trade-offs vs. the Raw SDK

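Dimension | Raw Anthropic SDK | LangGraph
Lines of code | 50-100 | 100-200
Learning curve | - | Medium (reducer semantics)
Persistence | Write it yourself | Built-in checkpointer
Resume / time travel | Write it yourself | Built-in
Human-in-the-loop | Write your own interrupt | Built-in interrupt()
Visualization | None | LangSmith / graph.get_graph().draw_mermaid()
Lock-in | None | Tied to the LangChain ecosystem
Debugging | Read the messages directly | LangSmith traces are better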

Rule of thumb: ① use the raw SDK at the POC stage; ② switch to LangGraph for production when you need persistence/HIL/observability; ③ complex multi-agent systems are also where LangGraph shines.


2. Architecture Diagram: LangGraph ReAct

┌──────────────────────────────────────────────────────────────────┐
│                      StateGraph                                  │
│                                                                  │
│   State {                                                        │
│     messages: Annotated[list[BaseMessage], add_messages]         │
│     iter_count: Annotated[int, add]                              │
│     pending_destructive: list[dict]                              │
│   }                                                              │
│                                                                  │
│       ┌───────────┐                                              │
│       │   START   │                                              │
│       └─────┬─────┘                                              │
│             │                                                    │
│             ▼                                                    │
│       ┌───────────┐                                              │
│       │   agent   │  ← LLM call (claude-opus-4-7)                │
│       │  (node)   │                                              │
│       └─────┬─────┘                                              │
│             │                                                    │
│       conditional edge                                           │
│             │                                                    │
│        ┌────┴─────┐                                              │
│        │          │                                              │
│   tool_use?    end_turn?                                         │
│        │          │                                              │
│        ▼          ▼                                              │
│   ┌─────────┐  ┌─────┐                                           │
│   │  tools  │  │ END │                                           │
│   │ (node)  │  └─────┘                                           │
│   └────┬────┘                                                    │
│        │                                                         │
│        └─────► back to agent (cycle)                             │
│                                                                  │
│   Checkpointer: SqliteSaver, persists state per thread_id        │
│   Interrupt: interrupt() inside "tools" for destructive calls    │
└──────────────────────────────────────────────────────────────────┘

3. Code: lg_agent.py

3.1 Install

pip install langgraph langgraph-checkpoint-sqlite langchain-anthropic langchain-core

3.2 Implementation

# lg_agent.py
"""
Day 156 - LangGraph version of the Day 150 ReAct agent.

Adds:
  - SqliteSaver checkpointer (resume across runs)
  - interrupt before destructive tools
  - streaming state updates

Run:
  python lg_agent.py "Analyze AAPL services revenue %"
"""
from __future__ import annotations
import json
import os
import sys
from typing import Annotated, Literal, TypedDict
from operator import add

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import (
    AIMessage, HumanMessage, ToolMessage, SystemMessage, BaseMessage,
)
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt, Command

# ====================================================================
# Tools (decorator style; langchain_core's @tool builds the schema from the signature + docstring)
# ====================================================================
@tool
def search_filings(ticker: str, form: str = "10-Q") -> str:
    """Search SEC filings for a US-listed company.

    Args:
        ticker: stock ticker, e.g. AAPL
        form: 10-K | 10-Q | 8-K
    """
    db = {"AAPL": [{"form": "10-Q", "date": "2026-08-01",
                    "url": "sec.gov/aapl-10q-2026q3"}]}
    return json.dumps(db.get(ticker.upper(), []))

@tool
def fetch_filing(url: str) -> str:
    """Fetch and summarize a specific filing by URL."""
    return json.dumps({
        "url": url,
        "summary": "Revenue $94.9B (+3% YoY). Services $24.2B. Net cash $48B.",
    })

@tool
def calculate(expression: str) -> str:
    """Evaluate a numeric Python expression. Example: '24.2 / 94.9 * 100'."""
    try:
        return str(eval(expression, {"__builtins__": {}}, {}))
    except Exception as e:
        return f"calc_error: {e}"

@tool
def send_email(to: str, subject: str, body: str) -> str:
    """*** DESTRUCTIVE *** Send an email. Requires human approval."""
    return json.dumps({"sent_to": to, "subject": subject})

ALL_TOOLS = [search_filings, fetch_filing, calculate, send_email]
DESTRUCTIVE = {"send_email"}

# ====================================================================
# State
# ====================================================================
class AgentState(TypedDict):
    messages: Annotated[list[BaseMessage], add_messages]
    iter_count: Annotated[int, add]
    pending_destructive: list[dict]   # tool calls awaiting approval

# ====================================================================
# LLM
# ====================================================================
llm = ChatAnthropic(model="claude-opus-4-7", max_tokens=2048).bind_tools(ALL_TOOLS)

SYSTEM = SystemMessage(content=(
    "You are a senior financial analyst. Use tools as needed. "
    "Stop when you have enough info to answer."
))

# ====================================================================
# Nodes
# ====================================================================
def agent_node(state: AgentState) -> dict:
    msgs = [SYSTEM] + state["messages"]
    resp = llm.invoke(msgs)
    return {"messages": [resp], "iter_count": 1}

def tools_node(state: AgentState) -> dict:
    """Execute tool calls. If any are destructive, interrupt for approval first.

    On resume, LangGraph re-runs this node from the top, so interrupt() is
    called BEFORE any tool executes; otherwise the safe tools would run twice.
    """
    last = state["messages"][-1]
    if not isinstance(last, AIMessage) or not last.tool_calls:
        return {}

    tool_map = {t.name: t for t in ALL_TOOLS}
    destructive = [tc for tc in last.tool_calls if tc["name"] in DESTRUCTIVE]

    approval = "approve"
    if destructive:
        # Pause for human approval; on resume, interrupt() returns the value
        # the client passed via Command(resume=...)
        approval = interrupt({
            "type": "destructive_tool_approval",
            "tool_calls": destructive,
            "context": "These tool calls are marked destructive. Approve?",
        })

    # Exactly one ToolMessage per tool_call, in the original order
    tool_msgs: list[ToolMessage] = []
    for tc in last.tool_calls:
        if tc["name"] in DESTRUCTIVE and approval != "approve":
            tool_msgs.append(ToolMessage(
                content=f"rejected_by_human: {approval}",
                tool_call_id=tc["id"],
                status="error",
            ))
            continue
        try:
            out = tool_map[tc["name"]].invoke(tc["args"])
            tool_msgs.append(ToolMessage(content=str(out), tool_call_id=tc["id"]))
        except Exception as e:
            tool_msgs.append(ToolMessage(
                content=f"tool_error: {e}",
                tool_call_id=tc["id"],
                status="error",
            ))
    return {"messages": tool_msgs}

# ====================================================================
# Conditional edge
# ====================================================================
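MAX_ITERS = 12

def hit_max_iters(state: AgentState) -> bool:
    # iter_count accumulates per thread (add reducer), so this caps the whole thread
    return state["iter_count"] >= MAX_ITERS

def should_continue(state: AgentState) -> Literal["tools", "end"]:
    if hit_max_iters(state):
        return "end"   # hard stop, independent of recursion_limit
    last = state["messages"][-1]
    if isinstance(last, AIMessage) and last.tool_calls:
        return "tools"
    return "end"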

# ====================================================================
# Build the graph
# ====================================================================
def build_graph(checkpointer):
    g = StateGraph(AgentState)
    g.add_node("agent", agent_node)
    g.add_node("tools", tools_node)

    g.add_edge(START, "agent")
    g.add_conditional_edges("agent", should_continue, {
        "tools": "tools",
        "end": END,
    })
    g.add_edge("tools", "agent")   # cycle back

    return g.compile(checkpointer=checkpointer)

# ====================================================================
# CLI
# ====================================================================
def main():
    task = sys.argv[1] if len(sys.argv) > 1 else \
        "Analyze AAPL's services revenue % of total from latest 10-Q."

    with SqliteSaver.from_conn_string("agent_checkpoints.db") as ckpt:
        graph = build_graph(ckpt)
        thread_id = "thread-1"
        config = {"configurable": {"thread_id": thread_id}}

        # Stream the run
        events = graph.stream(
            {"messages": [HumanMessage(content=task)],
             "iter_count": 0,
             "pending_destructive": []},
            config=config,
            stream_mode="updates",
        )
        for ev in events:
            for node, update in ev.items():
                print(f"\n--- node: {node} ---")
                # "__interrupt__" events carry a tuple of Interrupt objects, not a dict
                if isinstance(update, dict) and "messages" in update:
                    for m in update["messages"]:
                        print(f"  [{type(m).__name__}] {str(m.content)[:300]}")

        # While paused at an interrupt, simulate human approval and resume.
        # Loop, because the agent may request another destructive call later.
        while graph.get_state(config).next:
            print(f"\n*** Interrupted at: {graph.get_state(config).next} ***")
            print("(simulating human approval)")
            for ev in graph.stream(Command(resume="approve"), config=config,
                                   stream_mode="updates"):
                for node, update in ev.items():
                    print(f"  resume node: {node}")

        final_state = graph.get_state(config).values
        print("\n=== FINAL ===")
        for m in final_state["messages"]:
            if isinstance(m, AIMessage) and m.content and not m.tool_calls:
                print(m.content)
        print(f"\niters: {final_state['iter_count']}")

if __name__ == "__main__":
    main()

3.3 Key Design Points

(a) State reducer semantics

messages: Annotated[list[BaseMessage], add_messages]
iter_count: Annotated[int, add]
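  • add_messages is LangGraph's built-in message reducer: it appends new messages, and a new message whose ID matches an existing one replaces it (it does not validate tool_call_id pairing)
  • add is operator.add: plain addition
  • Fields without a reducer: the node's return value overwrites the old value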

This is the biggest beginner trap: returning {"messages": [...]} does not replace the messages list; it appends to it.
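The reducer lives in the type annotation itself. As a rough sketch of the mechanism (not LangGraph's code), the standard library can read the metadata out of `Annotated` with `typing.get_type_hints(..., include_extras=True)` and apply it per field. Here `operator.add` stands in for `add_messages`, and `reducers_of` / `apply_update` are hypothetical helpers.

```python
import operator
from typing import Annotated, TypedDict, get_type_hints

class State(TypedDict):
    messages: Annotated[list, operator.add]   # stand-in for add_messages
    iter_count: Annotated[int, operator.add]
    status: str                               # no reducer: last write wins

def reducers_of(schema):
    """Map each field to the callable stored in its Annotated metadata, or None."""
    hints = get_type_hints(schema, include_extras=True)
    return {key: (hint.__metadata__[-1] if hasattr(hint, "__metadata__") else None)
            for key, hint in hints.items()}

def apply_update(state, update, reducers):
    merged = dict(state)
    for key, value in update.items():
        reducer = reducers.get(key)
        merged[key] = reducer(merged[key], value) if reducer else value
    return merged

reducers = reducers_of(State)
s0 = {"messages": ["hi"], "iter_count": 0, "status": "new"}
s1 = apply_update(s0, {"messages": ["reply"], "iter_count": 1, "status": "running"}, reducers)
print(s1)  # {'messages': ['hi', 'reply'], 'iter_count': 1, 'status': 'running'}
```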

(b) Conditional edge

g.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})

Equivalent to: after agent finishes, call should_continue(state), look its return value up in the mapping, and go to that node.

(c) Cycle

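g.add_edge("tools", "agent") creates the loop. LangGraph guards against infinite loops with recursion_limit (default 25, configurable). The limit counts super-steps, not agent turns: here each agent or tools execution is one step, so 25 steps is roughly 12 agent↔tools round trips.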

(d) Checkpoint

with SqliteSaver.from_conn_string("agent_checkpoints.db") as ckpt:
    graph = build_graph(ckpt)
    config = {"configurable": {"thread_id": "thread-1"}}

thread_id is the conversation ID. Every stream / invoke call checkpoints the state to SQLite after each super-step; a later call with the same thread_id continues from the latest checkpoint.
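The thread_id scoping can be illustrated with a toy checkpointer built on the standard-library `sqlite3` module: one row per (thread_id, step), and "resume" means loading the newest row for that thread. This is a hypothetical sketch of the idea only; `ToyCheckpointer` and its table layout are inventions of this example, and SqliteSaver's real schema is richer (it tracks checkpoint IDs and parent checkpoints, among other things).

```python
import json
import sqlite3

class ToyCheckpointer:
    """Hypothetical minimal checkpointer: one JSON state row per (thread_id, step)."""

    def __init__(self, path=":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS ckpt ("
            " thread_id TEXT, step INTEGER, state TEXT,"
            " PRIMARY KEY (thread_id, step))"
        )

    def put(self, thread_id, step, state):
        self.conn.execute(
            "INSERT OR REPLACE INTO ckpt VALUES (?, ?, ?)",
            (thread_id, step, json.dumps(state)),
        )
        self.conn.commit()

    def latest(self, thread_id):
        """Return (step, state) of the newest checkpoint, or (None, None)."""
        row = self.conn.execute(
            "SELECT step, state FROM ckpt WHERE thread_id = ?"
            " ORDER BY step DESC LIMIT 1",
            (thread_id,),
        ).fetchone()
        return (row[0], json.loads(row[1])) if row else (None, None)

ck = ToyCheckpointer()
ck.put("thread-1", 0, {"messages": ["human:task"]})
ck.put("thread-1", 1, {"messages": ["human:task", "ai:tool_call"]})
ck.put("thread-2", 0, {"messages": ["human:other"]})
step, state = ck.latest("thread-1")
print(step, state["messages"][-1])  # 1 ai:tool_call
```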

(e) Interrupt

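interrupt({...}) pauses graph execution and surfaces the payload to the caller. The client resumes with Command(resume=...), and the resume value becomes the return value of interrupt(). This is the official human-in-the-loop mechanism. Note that on resume the node containing interrupt() re-runs from its beginning, so any side effects before the interrupt() call execute again.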
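The replay behavior is the part that surprises people. Below is a toy model in plain Python (hypothetical names such as `ToyInterrupt` and `make_interrupt`, not LangGraph's internals): the first attempt raises to pause, and the resumed attempt runs the whole node again, with `interrupt()` now returning the resume value. The side-effect list shows why code placed before `interrupt()` executes twice.

```python
class ToyInterrupt(Exception):
    """Raised to pause the 'graph'; carries the payload shown to the human."""
    def __init__(self, payload):
        super().__init__(payload)
        self.payload = payload

def make_interrupt(resume_values):
    """Build an interrupt() stand-in: return a queued resume value, or pause."""
    def interrupt(payload):
        if resume_values:
            return resume_values.pop(0)
        raise ToyInterrupt(payload)
    return interrupt

side_effects = []

def tools_node(interrupt):
    side_effects.append("prefix ran")  # anything before interrupt() repeats on resume
    approval = interrupt({"type": "destructive_tool_approval"})
    return f"approval={approval}"

resume_queue = []
try:
    tools_node(make_interrupt(resume_queue))        # first run pauses
except ToyInterrupt as pause:
    pending_payload = pause.payload                 # what the client would display

resume_queue.append("approve")                      # like Command(resume="approve")
result = tools_node(make_interrupt(resume_queue))   # node re-runs from the top
print(result, len(side_effects))                    # approval=approve 2
```

In a real node this argues for calling interrupt() before any non-idempotent work, or moving such work into its own node.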

(f) Stream

graph.stream(input, config, stream_mode="updates")

stream_mode options: values (full state after each step) / updates (per-step diff) / messages (LLM token stream) / custom / debug
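The difference between values and updates is easiest to see side by side. This toy generator (hypothetical, not LangGraph's streaming code; it also ignores details such as whether the initial input is emitted) replays the same two node updates in both modes:

```python
import operator

def toy_stream(initial, node_updates, reducers, mode):
    """Yield {node: update} in 'updates' mode, or the full merged state in 'values' mode."""
    state = dict(initial)
    for node, update in node_updates:
        for key, value in update.items():
            reducer = reducers.get(key)
            state[key] = reducer(state[key], value) if reducer else value
        if mode == "updates":
            yield {node: update}
        elif mode == "values":
            yield dict(state)
        else:
            raise ValueError(f"unsupported mode: {mode}")

run = [("agent", {"messages": ["ai:tool_call"]}), ("tools", {"messages": ["tool:result"]})]
initial = {"messages": ["human:task"]}
reducers = {"messages": operator.add}

for chunk in toy_stream(initial, run, reducers, "updates"):
    print(chunk)   # only what each node returned
for chunk in toy_stream(initial, run, reducers, "values"):
    print(chunk)   # the whole state, which grows every step
```

The growing values payload is also why pitfall 6 in section 7 recommends updates or messages in production.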


4. Comparison with the Bare Version

Aspect          Bare version (react.py, Day 150)    LangGraph version (lg_agent.py)
───────────────────────────────────────────────────────────────────────────────────
Lines of code   ~280                                ~350
LLM loop        Explicit loop                       Implicit (add_conditional_edges + cycle)
State mgmt      Hand-maintained messages list       TypedDict + reducers
Persistence     None                                SqliteSaver, one line to wire in
Resume          None                                Automatic per thread_id
HIL             None                                Built-in interrupt()
Visualization   None                                graph.get_graph().draw_mermaid()
Debugging       print + logs                        LangSmith traces
Lock-in         None                                langgraph + LangChain ecosystem

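When the agent is simple and needs no persistence/HIL, the bare version is still the better engineering choice. When you need something stateful, resumable, and visualizable, LangGraph clearly pays off.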


5. Finance Applications

5.1 Finance Scenarios That Suit LangGraph

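Scenario | Why LangGraph
Relationship-manager copilot | Long sessions need thread persistence (pick up where the last consultation with the same client left off)
Approval workflows (loans/compliance) | interrupt hands off to the approver; HIL is mandatory
Complex research agent | Many steps; needs time-travel debugging
Automated report generation | After a failure, resume from the checkpoint
Saving intermediate state in backtests | Recover intermediate state of expensive computations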

5.2 Worked Example: Loan Approval Agent

# Nodes
load_application -> extract_features -> risk_score -> [interrupt: human review if score > X] -> decide -> notify

Every step is checkpointed. The approver sees pending tasks in a web UI and clicks approve / reject / request_info, each mapping to a Command(resume=...). The LangGraph state DB directly becomes the backing store of the approval ticketing system.
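The risk gate in that pipeline is just a conditional-edge function. A minimal sketch, assuming a numeric `risk_score` field in the state and a hypothetical `REVIEW_THRESHOLD` standing in for the unspecified X; the node names follow the pipeline above, and the LangGraph wiring is shown only as a comment:

```python
from typing import Literal

REVIEW_THRESHOLD = 0.7  # hypothetical value for "X"; set by credit policy in practice
REVIEWER_ACTIONS = {"approve", "reject", "request_info"}

def route_after_risk_score(state: dict) -> Literal["human_review", "decide"]:
    """Conditional edge: high-risk applications go to a human reviewer first."""
    return "human_review" if state["risk_score"] > REVIEW_THRESHOLD else "decide"

def apply_reviewer_action(state: dict, action: str) -> dict:
    """Validate the value a reviewer sends back (e.g. via Command(resume=action))."""
    if action not in REVIEWER_ACTIONS:
        raise ValueError(f"unknown reviewer action: {action!r}")
    return {**state, "review": action}

# Wiring in LangGraph would look roughly like:
# g.add_conditional_edges("risk_score", route_after_risk_score,
#                         {"human_review": "human_review", "decide": "decide"})

print(route_after_risk_score({"risk_score": 0.9}))  # human_review
print(route_after_risk_score({"risk_score": 0.2}))  # decide
```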


6. Web3 Integration

On-chain actions must interrupt

Hard-interrupt before any tool that writes to the chain:

ONCHAIN = {"swap", "transfer"}

def tools_node(state):
    last = state["messages"][-1]
    onchain_calls = [tc for tc in last.tool_calls if tc["name"] in ONCHAIN]
    if onchain_calls:
        # simulate_calls / estimate / tool_messages_with_rejection are placeholder helpers
        approval = interrupt({
            "type": "onchain_action",
            "calls": onchain_calls,
            "simulation": simulate_calls(...),
            "estimated_gas": estimate(...),
        })
        if approval != "sign":
            return tool_messages_with_rejection(...)
        # else: send to wallet for signing

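The frontend receives the interrupt payload and opens the wallet so the user signs manually. For on-chain actions, the "interrupt + Command(resume)" model maps directly onto the "user reviews and signs" product interaction.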
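The gating logic itself can be kept as small pure functions that are easy to unit-test. A sketch under assumptions: tool calls are plain dicts with `name`/`args`/`id` like LangChain's `tool_calls` entries, the simulator and gas estimator are injected as callables (the snippet above leaves them elided), and all function names here are hypothetical:

```python
ONCHAIN_TOOLS = {"swap", "transfer"}  # the write-to-chain tools named in the snippet

def build_onchain_payload(tool_calls, simulate, estimate_gas):
    """Return the interrupt payload for on-chain calls, or None if there are none."""
    calls = [tc for tc in tool_calls if tc["name"] in ONCHAIN_TOOLS]
    if not calls:
        return None
    return {
        "type": "onchain_action",
        "calls": calls,
        "simulation": simulate(calls),
        "estimated_gas": estimate_gas(calls),
    }

def rejection_results(calls, reason):
    """One error result per tool call id, so every tool_use still gets a tool_result."""
    return [{"tool_call_id": tc["id"], "status": "error",
             "content": f"rejected_by_user: {reason}"} for tc in calls]

calls = [
    {"name": "calculate", "args": {"expression": "1+1"}, "id": "c1"},
    {"name": "swap", "args": {"from": "USDC", "to": "ETH", "amount": 100}, "id": "c2"},
]
payload = build_onchain_payload(calls, simulate=lambda cs: "ok",
                                estimate_gas=lambda cs: 21000 * len(cs))  # fake estimator
print(payload["calls"][0]["id"], payload["estimated_gas"])  # c2 21000
```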


7. Production Lessons and Pitfalls

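  1. A state field without a reducer gets overwritten. You return {"messages": [new]} expecting an append, and the whole list is replaced with [new]. Every list/dict field meant to accumulate needs a reducer (add_messages / add / a custom merger).

  2. Hitting the recursion limit. The default of 25 is too low for complex agents. Raise it with graph.invoke(..., {"recursion_limit": 100}), but pair it with a cost cap.

  3. Mismatched tool_call_id. The AIMessage has 5 tool_calls but only 4 ToolMessages come back → the Anthropic API returns an error. The prebuilt ToolNode handles this automatically; if you write your own tools node, match them strictly.

  4. Checkpoint DB bloat. Long threads + large state push the SQLite file to GB scale. Use Postgres + compressed columns + periodic archiving.

  5. Forgetting to resume after an interrupt. In production someone submits a task → interrupt → the client walks away → the state stays pending forever. You need a TTL + reminders + a fallback close.

  6. Wrong stream mode. values streams the entire state to the frontend and exposes too much content. Use updates or messages in production.

  7. add_messages details. add_messages assigns message IDs and replaces messages with matching IDs (RemoveMessage deletes by ID); it does not itself pair tool calls with results. If a custom merger gets ordering or IDs wrong, tool messages end up out of order and the LLM call fails. Unless you have a very specific need, don't replace add_messages.

  8. LangChain version fragmentation. langgraph / langchain-core / langchain-anthropic must be mutually compatible. Pin versions.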
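For pitfall 3, a cheap guard is to check pairing before every LLM call. A sketch over plain-dict messages (a hypothetical shape with `role`, `tool_calls`, `tool_call_id`; with LangChain objects you would read `AIMessage.tool_calls` and `ToolMessage.tool_call_id` instead):

```python
def missing_tool_results(messages):
    """Return ids of AI tool calls that have no matching tool result, in call order."""
    answered = {m["tool_call_id"] for m in messages if m["role"] == "tool"}
    missing = []
    for m in messages:
        if m["role"] == "ai":
            for tc in m.get("tool_calls", []):
                if tc["id"] not in answered:
                    missing.append(tc["id"])
    return missing

history = [
    {"role": "human", "content": "compare AAPL and MSFT"},
    {"role": "ai", "tool_calls": [{"id": "t1"}, {"id": "t2"}, {"id": "t3"}]},
    {"role": "tool", "tool_call_id": "t1", "content": "..."},
    {"role": "tool", "tool_call_id": "t2", "content": "..."},
]
print(missing_tool_results(history))  # ['t3']
```

Failing fast on a non-empty result gives a clear local error instead of an API rejection.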


8. Cost & Latency

LangGraph's own overhead is small (dict operations + reducers), but:

  • SQLite checkpoint writes: ~5-20 ms per node
  • LangSmith tracing (when enabled): +50-200 ms of network time per LLM call
  • Postgres checkpointer: +20-50 ms per node

LLM cost is the same as the bare version: LangGraph neither saves money nor burns more.
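To put those numbers in perspective, here is a back-of-the-envelope calculation with the ranges above. It assumes each agent↔tools round trip runs two nodes (one checkpoint write each) and one LLM call, a simplification that slightly overcounts the final agent-only turn; `overhead_ms` is a hypothetical helper:

```python
def overhead_ms(round_trips, per_node_ms, per_llm_call_ms=(0, 0)):
    """(low, high) added latency in ms for a run with the given number of round trips."""
    node_runs = 2 * round_trips   # agent + tools per round trip
    llm_calls = round_trips       # one LLM call per agent run
    low = node_runs * per_node_ms[0] + llm_calls * per_llm_call_ms[0]
    high = node_runs * per_node_ms[1] + llm_calls * per_llm_call_ms[1]
    return low, high

print(overhead_ms(12, (5, 20)))             # (120, 480): SQLite checkpoints only
print(overhead_ms(12, (5, 20), (50, 200)))  # (720, 2880): plus LangSmith tracing
```

Even the high end is modest next to a dozen LLM calls, which usually take on the order of seconds each.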


9. Quick Reference

LangGraph API table

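API | Purpose
StateGraph(SchemaClass) | Create a graph
g.add_node(name, fn) | Add a node
g.add_edge(a, b) | Direct edge
g.add_conditional_edges(a, fn, mapping) | Conditional branch
g.compile(checkpointer=...) | Build the executable graph
graph.invoke(input, config) | Run to completion synchronously
graph.stream(input, config, stream_mode) | Stream
graph.get_state(config) | Read the current state
graph.get_state_history(config) | Time travel (checkpoint history, newest first)
Command(resume=value) | Resume from an interrupt
interrupt(payload) | Pause and surface the payload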

Reducer cheatsheet

Usage | Effect
Annotated[list, add_messages] | Append messages; replace on matching message ID
Annotated[int, add] | Addition
Annotated[list, operator.add] | List concatenation
No annotation | Overwrite
Custom (a, b) -> c | Arbitrary merge

10. Interview Questions

Q1: Why does LangGraph's State use a TypedDict rather than a plain dict?

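A: The type annotations carry the reducers. Annotated[list, add_messages] is not just a type hint: LangGraph reads the reducer function out of the annotation via introspection and uses it to decide how a node's return value is merged into the state. That is why the schema must be a TypedDict (or a Pydantic model).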

Q2: How do you choose between the raw SDK and LangGraph?

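A: ① Simple / exploratory stage → raw SDK (you see the mechanics clearly); ② need checkpoint / resume / HIL / time travel → LangGraph; ③ the team already uses the LangChain ecosystem (LangSmith / LangServe) → LangGraph; ④ want to avoid ecosystem lock-in → raw SDK (or consider PydanticAI / an in-house framework). In production the two coexist: the simplest agents use the raw SDK, complex stateful agents use LangGraph.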

Q3: What problem does the add_messages reducer solve?

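A: ① It appends new messages automatically instead of overwriting the list; ② it assigns IDs to messages that lack one, and a new message carrying an existing ID replaces the old one (which is how you edit history, or delete it via RemoveMessage); ③ it coerces message-like inputs (e.g. dicts or (role, content) tuples) into message objects; ④ it keeps order stable. It does not guarantee that every tool_call has a ToolMessage; that stays the tools node's job (ToolNode does it). Hand-maintaining a message list easily goes wrong on these details; add_messages encodes the lessons learned.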

Q4: You add an interrupt before the tools node for human-in-the-loop. How do you turn that into a web app?

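A: ① Backend: LangGraph + a Postgres checkpointer; ② thread_id serves as the session ID; ③ stream updates to the frontend in updates mode; ④ when an __interrupt__ event arrives, the frontend shows a confirmation UI; ⑤ the user's click calls /resume?thread=xxx&value=approve; ⑥ the backend runs graph.stream(Command(resume="approve"), config) to continue. LangGraph Studio provides a ready-made UI for reference.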

Q5: A LangGraph agent has run 100 cycles without stopping. How do you debug it?

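A: ① Set recursion_limit + a cost cap; ② inspect the checkpoint history (graph.get_state_history) to find where the same tool is called over and over; ③ check the conditional-edge function (does it always return "tools"?); ④ check whether tool outputs give the LLM the wrong impression (making it think it must keep going); ⑤ add "stop if N consecutive calls bring no new information" to the system prompt.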
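Step ⑤ can also be enforced in code rather than only in the prompt. A minimal sketch, assuming tool calls are dicts with `name` and JSON-serializable `args` (as in `AIMessage.tool_calls`); `repeated_tail_calls` is a hypothetical helper that a conditional edge could consult before routing back to tools:

```python
import json

def repeated_tail_calls(tool_calls, k=3):
    """True if the last k tool calls are identical (same name and same args)."""
    if len(tool_calls) < k:
        return False
    signatures = {
        (tc["name"], json.dumps(tc["args"], sort_keys=True)) for tc in tool_calls[-k:]
    }
    return len(signatures) == 1

calls = [
    {"name": "search_filings", "args": {"ticker": "AAPL", "form": "10-Q"}},
    {"name": "search_filings", "args": {"form": "10-Q", "ticker": "AAPL"}},
    {"name": "search_filings", "args": {"ticker": "AAPL", "form": "10-Q"}},
]
print(repeated_tail_calls(calls))      # True: same call three times in a row
print(repeated_tail_calls(calls[:2]))  # False: fewer than k calls
```

When it returns True, should_continue can return "end" (or route to a summarizing node) instead of "tools".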


11. Deep Dive: Reducers in Practice

LangGraph reducers go well beyond add_messages. Common patterns:

11.1 Set accumulation (deduplicated)

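from typing import Annotated, TypedDict

def union(a, b):
    # order-preserving dedup; list(set(...)) would scramble the order
    return list(dict.fromkeys([*a, *b]))

class S(TypedDict):
    seen_tickers: Annotated[list[str], union]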

11.2 Dict deep merge

from typing import Annotated, TypedDict

def deep_merge(a, b):
    out = dict(a)
    for k, v in b.items():
        if k in out and isinstance(out[k], dict) and isinstance(v, dict):
            out[k] = deep_merge(out[k], v)   # recurse into nested dicts
        else:
            out[k] = v                       # otherwise the new value wins
    return out

class S(TypedDict):
    findings: Annotated[dict, deep_merge]

11.3 Last-write-wins (no reducer)

No annotation = overwrite. Suitable for single-writer fields.

11.4 Numeric max

import operator
from typing import Annotated, TypedDict

class S(TypedDict):
    max_score: Annotated[float, max]   # note: max is the builtin, not from operator
    cumulative_cost: Annotated[float, operator.add]

12. Time-Travel Debugging in Practice

LangGraph persists every super-step; graph.get_state_history(config) returns all checkpoints for the thread, newest first:

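history = list(graph.get_state_history(config))   # newest first
# Take history[3] (the 4th most recent checkpoint) and fork a branch from there
forked_config = {"configurable": {
    "thread_id": "thread-1",
    "checkpoint_id": history[3].config["configurable"]["checkpoint_id"],
}}
# Edit the state: update_state writes a NEW checkpoint and returns its config
new_config = graph.update_state(forked_config, {"messages": [...]})
# Passing None as input continues from that new checkpoint
graph.invoke(None, new_config)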
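Two details in that snippet are easy to get wrong: the history comes back newest first, and editing a past checkpoint creates a new branch instead of rewriting history. A toy model of that behavior (the `ToyHistory` class is hypothetical; LangGraph's real checkpoints carry far more data):

```python
import itertools

class ToyHistory:
    """Append-only checkpoints linked by parent ids; forks branch, never overwrite."""

    def __init__(self):
        self._ids = itertools.count()
        self.checkpoints = {}  # checkpoint id -> (parent id, state)
        self.head = None

    def _store(self, state, parent):
        cid = next(self._ids)
        self.checkpoints[cid] = (parent, dict(state))
        self.head = cid
        return cid

    def append(self, state):
        return self._store(state, parent=self.head)

    def history(self):
        """Walk parent links from the head: newest checkpoint first."""
        out, cid = [], self.head
        while cid is not None:
            parent, state = self.checkpoints[cid]
            out.append((cid, state))
            cid = parent
        return out

    def fork(self, cid, patch):
        """New checkpoint = old state + patch, parented on the chosen checkpoint."""
        _, state = self.checkpoints[cid]
        return self._store({**state, **patch}, parent=cid)

h = ToyHistory()
for step in range(4):
    h.append({"step": step})
target = h.history()[2][0]               # 3rd newest checkpoint -> id 1
h.fork(target, {"prompt": "B"})
print([cid for cid, _ in h.history()])   # [4, 1, 0]
```

Checkpoint 3 still exists after the fork; only the head moved to the new branch.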

Use cases:

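  • Reproducing bugs reported by customers
  • A/B testing different prompts on the same history path
  • "What if we had chosen B instead of A at that step?": support for product decisions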

13. Subgraphs (Nested Graphs)

LangGraph lets you use a graph as a node:

sub = build_research_subgraph()  # hypothetical helper; returns a compiled graph
parent = StateGraph(ParentState)
parent.add_node("research_phase", sub)   # use the compiled graph directly as a node

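Watch schema compatibility: the parent state must be able to project the fields the subgraph needs (the two communicate through their shared keys). This is a key technique for organizing complex agents (used in Day 162's multi-agent v1).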


14. Streaming Mode Details

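stream_mode | Content | Best for
values | Full state after each step | Demos / debugging
updates | Per-step diff | Production monitoring
messages | LLM token stream | Typewriter effect in the frontend
debug | Includes internal events | Deep debugging
custom | Custom business events | -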

Mixing modes: stream_mode=["updates", "messages"] gets both at once; each streamed item is then a (mode, chunk) tuple.


15. Extension Exercises

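  1. Add a cost-tracking node: after each agent_node, record tokens & cost into the state
  2. Add a retry node: retry a failing tool up to N times
  3. Go async: graph.ainvoke() + async langchain_anthropic
  4. Hook up LangSmith: set os.environ["LANGSMITH_TRACING"] = "true" (plus LANGSMITH_API_KEY) and traces appear immediately
  5. Export the graph diagram: write graph.get_graph().draw_mermaid() into the README
  6. Postgres checkpointer: switch to PostgresSaver (langgraph-checkpoint-postgres) for safer concurrency
  7. Write fork-and-replay: given a checkpoint, modify the state and rerun the remaining steps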

Coming Tomorrow

Day 157: CrewAI vs AutoGen vs LangGraph: the same task in three frameworks

  • CrewAI's role/task model
  • AutoGen's ConversableAgent / GroupChat
  • Implementing the same financial research task in all three frameworks
  • Comparing performance / code size / debugging experience