LangGraph 深度——StateGraph / Conditional Edges / Cycles / Checkpoint
LangGraph 核心概念:StateGraph 状态合并语义、Node/Edge、Conditional edges、Cycles、Checkpoint+Persistence、Interrupt(human-in-the-loop)
日期: 2026-10-04 方向: AI系统工程 / Agent 阶段: Phase 3 - Agent架构与多Agent (Day 149-162) 标签: #LangGraph #StateGraph #Checkpoint #HumanInTheLoop #LangChain
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | LangGraph 核心概念:StateGraph 状态合并语义、Node/Edge、Conditional edges、Cycles、Checkpoint+Persistence、Interrupt(human-in-the-loop) |
| 实操 | 把 Day 150 的裸 ReAct 重写成 LangGraph 版本;演示 checkpoint resume 和 interrupt |
| 产出 | lg_agent.py(约 500 行)+ 与裸版本的代码量/可读性对比 |
一、LangGraph 是什么 / 为什么
1.1 来源
LangGraph 是 LangChain Inc 2024 年推出的 agent 编排库。背景:LangChain 早期的 AgentExecutor 只能跑线性 ReAct,难以表达复杂 agent。LangGraph 把 agent 抽象成有向图(带 cycles)+ 共享状态,本质是受 Pregel / Apache Flink 启发的 stateful streaming computation。
1.2 核心抽象
| 抽象 | 含义 |
|---|---|
| State | 一个 TypedDict 或 Pydantic model,所有 node 读写它 |
| Node | 一个函数 (state) → state_update,state_update 会被 reducer merge 进 state |
| Edge | node 间的连接 |
| Conditional Edge | 一个函数决定 next node |
| Cycle | 允许图里有环(agent loop 必需) |
| Checkpointer | 持久化 state 到 SQLite/Postgres/Redis |
| Interrupt | 在指定 node 前/后暂停,等用户输入 |
| Stream | 实时输出 state updates |
1.3 与裸 SDK 的取舍
| 维度 | 裸 Anthropic SDK | LangGraph |
|---|---|---|
| 行数 | 50-100 | 100-200 |
| 学习曲线 | 平 | 中(reducer 语义) |
| Persistence | 自己写 | 内置 checkpointer |
| Resume / time travel | 自己写 | 内置 |
| Human-in-loop | 自己写 interrupt | 内置 interrupt() |
| 可视化 | 无 | LangSmith / draw_graph |
| 锁定 | 无 | 与 LangChain 生态绑定 |
| Debug | 直接看 messages | LangSmith trace 较好 |
经验法则:① POC 阶段用裸 SDK;② 生产化(要 persistence/HIL/observability)切 LangGraph;③ 复杂多 agent 也是 LangGraph 优势区。
二、架构图——LangGraph ReAct
┌──────────────────────────────────────────────────────────────────┐
│ StateGraph │
│ │
│ State { │
│ messages: Annotated[list[Msg], add_messages] │
│ iter_count: int │
│ last_tool_error: str │
│ } │
│ │
│ ┌───────────┐ │
│ │ START │ │
│ └─────┬─────┘ │
│ │ │
│ ▼ │
│ ┌───────────┐ │
│ │ agent │ ← LLM call (claude-opus-4-7) │
│ │ (node) │ │
│ └─────┬─────┘ │
│ │ │
│ conditional edge │
│ │ │
│ ┌────┴─────┐ │
│ │ │ │
│ tool_use? end_turn? │
│ │ │ │
│ ▼ ▼ │
│ ┌─────────┐ ┌─────┐ │
│ │ tools │ │ END │ │
│ │ (node) │ └─────┘ │
│ └────┬────┘ │
│ │ │
│ └─────► back to agent (cycle) │
│ │
│ Checkpointer: SqliteSaver, persists state per thread_id │
│ Interrupt: before "tools" node if destructive │
└──────────────────────────────────────────────────────────────────┘
三、代码——lg_agent.py
3.1 安装
pip install langgraph langchain-anthropic langchain-core
3.2 实现
# lg_agent.py
"""
Day 156 - LangGraph version of the Day 150 ReAct agent.
Adds:
- SqliteSaver checkpointer (resume across runs)
- interrupt before destructive tools
- streaming state updates
Run:
python lg_agent.py "Analyze AAPL services revenue %"
"""
from __future__ import annotations
import json
import os
import sys
from typing import Annotated, Literal, TypedDict
from operator import add
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import (
AIMessage, HumanMessage, ToolMessage, SystemMessage, BaseMessage,
)
from langchain_core.tools import tool
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langgraph.checkpoint.sqlite import SqliteSaver
from langgraph.prebuilt import ToolNode
from langgraph.types import interrupt, Command
# ====================================================================
# Tools (decorator style; LangGraph auto-generates schema from typing)
# ====================================================================
@tool
def search_filings(ticker: str, form: str = "10-Q") -> str:
"""Search SEC filings for a US-listed company.
Args:
ticker: stock ticker, e.g. AAPL
form: 10-K | 10-Q | 8-K
"""
db = {"AAPL": [{"form": "10-Q", "date": "2026-08-01",
"url": "sec.gov/aapl-10q-2026q3"}]}
return json.dumps(db.get(ticker.upper(), []))
@tool
def fetch_filing(url: str) -> str:
"""Fetch and summarize a specific filing by URL."""
return json.dumps({
"url": url,
"summary": "Revenue $94.9B (+3% YoY). Services $24.2B. Net cash $48B.",
})
@tool
def calculate(expression: str) -> str:
"""Evaluate a numeric Python expression. Example: '24.2 / 94.9 * 100'."""
try:
return str(eval(expression, {"__builtins__": {}}, {}))
except Exception as e:
return f"calc_error: {e}"
@tool
def send_email(to: str, subject: str, body: str) -> str:
"""*** DESTRUCTIVE *** Send an email. Requires human approval."""
return json.dumps({"sent_to": to, "subject": subject})
ALL_TOOLS = [search_filings, fetch_filing, calculate, send_email]
DESTRUCTIVE = {"send_email"}
# ====================================================================
# State
# ====================================================================
class AgentState(TypedDict):
messages: Annotated[list[BaseMessage], add_messages]
iter_count: Annotated[int, add]
pending_destructive: list[dict] # tool calls awaiting approval
# ====================================================================
# LLM
# ====================================================================
llm = ChatAnthropic(model="claude-opus-4-7", max_tokens=2048).bind_tools(ALL_TOOLS)
SYSTEM = SystemMessage(content=(
"You are a senior financial analyst. Use tools as needed. "
"Stop when you have enough info to answer."
))
# ====================================================================
# Nodes
# ====================================================================
def agent_node(state: AgentState) -> dict:
msgs = [SYSTEM] + state["messages"]
resp = llm.invoke(msgs)
return {"messages": [resp], "iter_count": 1}
def tools_node(state: AgentState) -> dict:
"""Execute tool calls. If any are destructive, interrupt for approval."""
last = state["messages"][-1]
if not isinstance(last, AIMessage) or not last.tool_calls:
return {}
# Split destructive vs safe
destructive = [tc for tc in last.tool_calls if tc["name"] in DESTRUCTIVE]
safe = [tc for tc in last.tool_calls if tc["name"] not in DESTRUCTIVE]
# Run safe immediately
tool_msgs: list[ToolMessage] = []
tool_map = {t.name: t for t in ALL_TOOLS}
for tc in safe:
try:
out = tool_map[tc["name"]].invoke(tc["args"])
tool_msgs.append(ToolMessage(content=str(out), tool_call_id=tc["id"]))
except Exception as e:
tool_msgs.append(ToolMessage(
content=f"tool_error: {e}",
tool_call_id=tc["id"],
status="error",
))
if destructive:
# Pause for human approval
approval = interrupt({
"type": "destructive_tool_approval",
"tool_calls": destructive,
"context": "These tool calls are marked destructive. Approve?",
})
# When resumed, `approval` is whatever the human sent
if approval == "approve":
for tc in destructive:
out = tool_map[tc["name"]].invoke(tc["args"])
tool_msgs.append(ToolMessage(content=str(out), tool_call_id=tc["id"]))
else:
for tc in destructive:
tool_msgs.append(ToolMessage(
content=f"rejected_by_human: {approval}",
tool_call_id=tc["id"],
status="error",
))
return {"messages": tool_msgs}
# ====================================================================
# Conditional edge
# ====================================================================
def should_continue(state: AgentState) -> Literal["tools", "end"]:
last = state["messages"][-1]
if isinstance(last, AIMessage) and last.tool_calls:
return "tools"
return "end"
def hit_max_iters(state: AgentState) -> bool:
return state["iter_count"] >= 12
# ====================================================================
# Build the graph
# ====================================================================
def build_graph(checkpointer):
g = StateGraph(AgentState)
g.add_node("agent", agent_node)
g.add_node("tools", tools_node)
g.add_edge(START, "agent")
g.add_conditional_edges("agent", should_continue, {
"tools": "tools",
"end": END,
})
g.add_edge("tools", "agent") # cycle back
return g.compile(checkpointer=checkpointer)
# ====================================================================
# CLI
# ====================================================================
def main():
task = sys.argv[1] if len(sys.argv) > 1 else \
"Analyze AAPL's services revenue % of total from latest 10-Q."
with SqliteSaver.from_conn_string("agent_checkpoints.db") as ckpt:
graph = build_graph(ckpt)
thread_id = "thread-1"
config = {"configurable": {"thread_id": thread_id}}
# Stream the run
events = graph.stream(
{"messages": [HumanMessage(content=task)],
"iter_count": 0,
"pending_destructive": []},
config=config,
stream_mode="updates",
)
for ev in events:
for node, update in ev.items():
print(f"\n--- node: {node} ---")
if "messages" in update:
for m in update["messages"]:
print(f" [{type(m).__name__}] {str(m.content)[:300]}")
# If interrupted, simulate human approval and resume
snap = graph.get_state(config)
if snap.next:
print(f"\n*** Interrupted at: {snap.next} ***")
print("(simulating human approval)")
for ev in graph.stream(Command(resume="approve"), config=config,
stream_mode="updates"):
for node, update in ev.items():
print(f" resume node: {node}")
final_state = graph.get_state(config).values
print("\n=== FINAL ===")
for m in final_state["messages"]:
if isinstance(m, AIMessage) and m.content and not m.tool_calls:
print(m.content)
print(f"\niters: {final_state['iter_count']}")
if __name__ == "__main__":
main()
3.3 关键设计点解读
(a) State 的 reducer 语义
messages: Annotated[list[BaseMessage], add_messages]
iter_count: Annotated[int, add]
add_messages是 LangGraph 内置 reducer:append 新消息(处理 tool_call_id 匹配)add是 operator.add:纯加法- 没标 reducer 的字段:node 返回值覆盖
这是初学者最大坑:返回
{"messages": [...]}不会替换 messages 列表,而是 append。
(b) Conditional edge
g.add_conditional_edges("agent", should_continue, {"tools": "tools", "end": END})
等价于:跑完 agent 后调用 should_continue(state),根据返回值决定下一 node。
(c) Cycle
g.add_edge("tools", "agent") 制造环。LangGraph 用 recursion_limit(默认 25)防死循环,可调。
(d) Checkpoint
with SqliteSaver.from_conn_string("agent_checkpoints.db") as ckpt:
graph = build_graph(ckpt)
config = {"configurable": {"thread_id": "thread-1"}}
thread_id 是会话 id。每次 stream / invoke 自动 checkpoint state 到 SQLite。下次同 thread_id 调用会从 checkpoint 恢复。
(e) Interrupt
interrupt({...}) 暂停图执行并把 payload 抛出。client 通过 Command(resume=...) 恢复,resume value 成为 interrupt() 的返回值。这是 human-in-the-loop 的官方机制。
(f) Stream
graph.stream(input, config, stream_mode="updates")
stream_mode 选项:values(每步全 state)/updates(每步 diff)/messages(仅消息流)/debug。
四、与裸版本的对比
裸版本 (react.py from Day 150) LangGraph 版本 (lg_agent.py)
─────────────────────────────────────────────────────────────────────
代码行数: ~280 ~350
LLM 调用: 显式循环 隐藏在 add_conditional_edges
状态管理: 手动维护 messages list TypedDict + reducer
Persistence: 无 SqliteSaver 一行接入
Resume: 无 thread_id 自动 resume
HIL: 无 interrupt() 内置
可视化: 无 graph.get_graph().draw_mermaid()
Debug: print + log LangSmith trace
锁定: 0 langgraph + langchain ecosystem
当 agent 简单且不需要 persistence/HIL,裸版本仍然是更好的工程选择。当需要 stateful、resumable、可视化,LangGraph 收益明显。
五、金融领域应用
5.1 LangGraph 适合的金融场景
| 场景 | 为什么需要 LangGraph |
|---|---|
| 客户经理 copilot 长会话 | 需要 thread persistence(同客户多次咨询接续) |
| 审批流(贷款/合规) | interrupt 接审批人;HIL 是必需 |
| 复杂研究 agent | 多步、需要 time-travel debug |
| 自动化报表生成 | checkpoint 失败可 resume |
| 回测中保存中间状态 | 大计算的中间状态恢复 |
5.2 实战示例:贷款审批 agent
# 节点
load_application -> extract_features -> risk_score -> [interrupt: human review if score > X] -> decide -> notify
每步 checkpoint。审批人在 web UI 上看到 pending tasks,点击 approve / reject / request_info,对应 Command(resume=...)。LangGraph state DB 直接成为审批工单系统的 backing store。
六、Web3 集成
Onchain action 必须 interrupt
任何写链 tool 之前 hard interrupt:
def tools_node(state):
if any(tc["name"] in {"swap","transfer"} for tc in last.tool_calls):
approval = interrupt({
"type": "onchain_action",
"calls": [tc for tc in last.tool_calls if tc["name"] in {"swap","transfer"}],
"simulation": simulate_calls(...),
"estimated_gas": estimate(...),
})
if approval != "sign":
return tool_messages_with_rejection(...)
# else: send to wallet for signing
Frontend 接到 interrupt payload,弹钱包让用户手签。链上动作的"interrupt + Command(resume)"模型正好对应"用户审签"的产品交互。
七、生产经验与陷阱
-
State 字段没标 reducer,被覆盖 返回
{"messages": [new]}期望 append,结果整个 list 被换成 [new]。所有 list/dict 字段都要标 reducer(add_messages/add/ 自定义 merger)。 -
Recursion limit 撞墙 默认 25,复杂 agent 超过。
graph.invoke(..., {"recursion_limit": 100})但同时要有 cost cap。 -
Tool call_id 不匹配 AIMessage 里有 5 个 tool_calls,ToolNode 只回 4 个 ToolMessage → Anthropic API 报错。
ToolNode(prebuilt)会自动处理;自己写要严格匹配。 -
Checkpoint DB 膨胀 长 thread + 大 state,sqlite 文件 GB 级。Postgres + 压缩列 + 定期归档。
-
Interrupt 后忘记恢复 生产里有人提交 task → interrupt → 客户离开 → state 永远 pending。需要 TTL + 提醒 + 兜底关闭。
-
Stream 模式选错 用
values把整个 state 流式发到前端,content 暴露过多。生产用updates或messages。 -
add_messages 处理 tool_call 的细节 add_messages 内置去重、tool_call_id 匹配。如果你自定义 merger 搞错,tool message 顺序乱,LLM 报错。除非有非常具体需求,不要替换 add_messages。
-
LangChain 版本碎片
langgraph/langchain-core/langchain-anthropic三个包要兼容。pin 版本。
八、Cost & Latency
LangGraph 本身 overhead 很小(dict 操作 + reducer),但:
- Checkpoint 写入 SQLite 每节点 ~5-20ms
- LangSmith tracing(开启时)每 LLM call +50-200ms 网络
- Postgres checkpointer 每节点 +20-50ms
LLM cost 与裸版本相同。LangGraph 不省钱也不烧更多钱。
九、关键速查
LangGraph API 表
| API | 用途 |
|---|---|
StateGraph(SchemaClass) | 创建图 |
g.add_node(name, fn) | 加节点 |
g.add_edge(a, b) | 直连 |
g.add_conditional_edges(a, fn, mapping) | 条件分支 |
g.compile(checkpointer=...) | 生成可执行图 |
graph.invoke(input, config) | 同步跑完 |
graph.stream(input, config, stream_mode) | 流式 |
graph.get_state(config) | 读取当前 state |
graph.get_state_history(config) | time-travel |
Command(resume=value) | 从 interrupt 恢复 |
interrupt(payload) | 暂停并抛 payload |
Reducer cheatsheet
| 用法 | 效果 |
|---|---|
Annotated[list, add_messages] | message append + tool_call 处理 |
Annotated[int, add] | 加法 |
Annotated[list, operator.add] | list 拼接 |
| 不标 | 覆盖 |
自定义 (a, b) -> c | 任意合并 |
十、面试题
Q1: LangGraph 的 State 用 TypedDict 而不是普通 dict,为什么?
A: 类型注解承载 reducer。
Annotated[list, add_messages]不仅是 type hint,LangGraph 通过反射拿到 reducer 函数,决定 node 返回值如何 merge 进 state。这是为什么必须用 TypedDict(或 Pydantic)。
Q2: 裸 SDK 和 LangGraph 怎么选?
A: ① 简单 / 探索阶段 → 裸(看清楚原理);② 需要 checkpoint / resume / HIL / time-travel → LangGraph;③ 团队已经用 LangChain 生态(LangSmith / LangServe)→ LangGraph;④ 不想锁定生态 → 裸(或考虑 PydanticAI / 自家框架)。生产里两者并存:最简单 agent 用裸,复杂 stateful agent 用 LangGraph。
Q3: add_messages reducer 解决什么问题?
A: ① 自动 append 新消息;② 处理 tool_call_id 匹配(确保每个 tool_call 都有 tool_message);③ 处理消息 ID 去重;④ 维护正确顺序。手动维护 message list 容易在 tool_call 匹配上出 bug,add_messages 是经验沉淀。
Q4: 在 tools 节点前加 interrupt 实现 human-in-the-loop,如何 web 化?
A: ① 后端用 LangGraph + Postgres checkpointer;② thread_id 是 session id;③ stream 模式发送 update 给前端;④ 收到
__interrupt__事件时前端弹 confirm UI;⑤ 用户点击后调/resume?thread=xxx&value=approve;⑥ 后端graph.stream(Command(resume="approve"), config)续跑。LangGraph Studio 有现成的 UI 可参考。
Q5: 一个 LangGraph agent 跑了 100 轮 cycles 没停,怎么 debug?
A: ① 设
recursion_limit+ cost cap;② 看 checkpoint history(graph.get_state_history),在哪里反复调同一 tool;③ 检查 conditional edge 函数(是否永远返回 "tools");④ 看 tool 返回是否给 LLM 错误印象(误以为还要继续);⑤ system prompt 加"如果连续 N 次没新信息就停止"。
十一、深入:Reducer 实战
LangGraph 的 reducer 不仅是 add_messages。常用模式:
11.1 Set 累积(去重)
def union(a, b):
return list(set(a) | set(b))
class S(TypedDict):
seen_tickers: Annotated[list[str], union]
11.2 Dict 深合并
def deep_merge(a, b):
out = dict(a)
for k, v in b.items():
if k in out and isinstance(out[k], dict) and isinstance(v, dict):
out[k] = deep_merge(out[k], v)
else:
out[k] = v
return out
class S(TypedDict):
findings: Annotated[dict, deep_merge]
11.3 Last-write-wins (no reducer)
不标 = 覆盖。适合 single-writer 字段。
11.4 Numeric max
import operator
class S(TypedDict):
max_score: Annotated[float, max] # 注意 max 不是 operator
cumulative_cost: Annotated[float, operator.add]
十二、Time-travel debug 实战
LangGraph 把每个 super-step 持久化,graph.get_state_history(config) 返回所有 checkpoint:
history = list(graph.get_state_history(config))
# 找到第 3 个 checkpoint,从那里 fork 一个分支重跑
forked_config = {"configurable": {
"thread_id": "thread-1",
"checkpoint_id": history[3].config["configurable"]["checkpoint_id"],
}}
# 修改 state 重跑
graph.update_state(forked_config, {"messages": [...]})
graph.invoke(None, forked_config)
适合:
- 复现客户报告的 bug
- A/B 测试不同 prompt 在同一历史路径上
- "如果在那一步选 B 而不是 A 会怎样"——产品决策辅助
十三、Subgraphs(嵌套图)
LangGraph 支持把图当节点:
sub = build_research_subgraph() # itself a compiled graph
parent = StateGraph(ParentState)
parent.add_node("research_phase", sub) # 直接当 node 用
注意 schema 兼容(parent state 必须能投影出 sub 需要的字段)。这是组织复杂 agent 的关键技巧(Day 162 的 multi-agent v1 用到)。
十四、Streaming 模式细节
| stream_mode | 内容 | 适合场景 |
|---|---|---|
values | 每步完整 state | demo / debug |
updates | 每步 diff | 生产监控 |
messages | LLM token 流 | 前端打字机效果 |
debug | 含内部事件 | 极致 debug |
custom | 自定义 | 业务事件 |
混用:stream_mode=["updates", "messages"] 同时拿两种。
十五、扩展练习
- 加 cost tracking node——每个 agent_node 后记录 token & cost 到 state
- 加 retry node——某 tool 失败时重试 N 次
- 改成 async——
graph.ainvoke()+langchain_anthropicasync - 接 LangSmith——
os.environ["LANGSMITH_TRACING"] = "true"立刻有 trace - 画图导出——
graph.get_graph().draw_mermaid()写到 README - Postgres checkpointer——切到
PostgresSaver,并发更安全 - 写 fork-and-replay——给定某 checkpoint,修改 state,重跑后续
明日预告
Day 157: CrewAI vs AutoGen vs LangGraph——同任务三框架对比
- CrewAI 的 role/task 模型
- AutoGen 的 ConversableAgent / GroupChat
- 用三个框架实现同一个金融研究任务
- 性能 / 代码量 / 调试体验对比