多 Agent 协作——Hierarchical / Network / Sequential 三种拓扑
多 agent 拓扑:Sequential(流水线)/ Hierarchical(主管-下属)/ Network(对等图);通信开销与一致性
日期: 2026-10-07 方向: AI系统工程 / Agent 阶段: Phase 3 - Agent架构与多Agent (Day 149-162) 标签: #MultiAgent #Hierarchical #Network #Sequential #Topology
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | 多 agent 拓扑:Sequential(流水线)/ Hierarchical(主管-下属)/ Network(对等图);通信开销与一致性 |
| 实操 | 用 LangGraph 实现三种拓扑;同任务对比 cost / latency / 输出质量 |
| 产出 | multi_agent.py(约 600 行) |
一、三种拓扑的本质区别
1.1 Sequential(流水线)
A ─► B ─► C ─► D
- 每个 agent 串行处理,下游看上游结果
- 类比:装配线 / SOP
- 优点:简单、可审计、可缓存中间步
- 缺点:错误累积、不能并行
1.2 Hierarchical(主管-下属)
Manager
/ | \
A B C
- Manager LLM 决定派给哪个 worker、何时聚合
- 类比:经理 + 团队
- 优点:动态分工、扩展容易
- 缺点:manager 是瓶颈/单点;prompt 设计难
1.3 Network(对等图)
A ─── B
\ /
\ /
C
- 任何 agent 可以发消息给任何 agent
- 类比:研究小组讨论
- 优点:思路碰撞、灵活
- 缺点:成本高、易死锁、调试难
1.4 复合拓扑
实际生产里常组合:
Hierarchical Manager
/ \
Sequential Network (sub-team)
pipeline / | \
A → B → C D E F
二、架构图
2.1 Sequential
Researcher ─► Writer ─► Reviewer ─► Final
tools LLM LLM
2.2 Hierarchical
┌─────────────────────┐
│ Manager (opus) │
│ │
│ - decides next agent│
│ - aggregates output │
└────┬─┬─┬───────────┘
│ │ │
┌────────┘ │ └────────┐
▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐
│Research│ │ Fundam │ │ Risk │
│ Agent │ │ Agent │ │ Agent │
└────────┘ └────────┘ └────────┘
2.3 Network
┌────────┐
│ Bull │◄─────────┐
│ Agent │ │
└───┬────┘ │
│ │
▼ │
┌────────┐ ┌────┴────┐
│ Bear │────►│Moderator│
│ Agent │ │ Agent │
└───┬────┘ └─────────┘
│
▼
┌────────┐
│ Quant │
│ Agent │
└────────┘
三、代码——multi_agent.py
# multi_agent.py
"""
Day 159 - Three multi-agent topologies on LangGraph.
Same task: "Decide whether to long AAPL, with rationale."
- sequential: Researcher -> Writer -> Reviewer
- hierarchical: Manager + (Research, Fundamentals, Risk) workers
- network: Bull <-> Bear, both <-> Quant <-> Moderator
"""
from __future__ import annotations
import json
import sys
from typing import Annotated, Literal, TypedDict
from operator import add
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
opus = ChatAnthropic(model="claude-opus-4-7", max_tokens=2048)
sonnet = ChatAnthropic(model="claude-sonnet-4-6", max_tokens=1024)
haiku = ChatAnthropic(model="claude-haiku-4-5", max_tokens=600)
# ====================================================================
# Topology 1: Sequential
# ====================================================================
class SeqState(TypedDict):
ticker: str
research: str
draft: str
final: str
def seq_research(state: SeqState):
r = sonnet.invoke([
SystemMessage(content="You are a researcher. Output 5 bullet facts."),
HumanMessage(content=f"Research {state['ticker']}: latest revenue, margin, debt, CEO, news."),
])
return {"research": r.content}
def seq_write(state: SeqState):
r = opus.invoke([
SystemMessage(content="You are a memo writer."),
HumanMessage(content=f"Notes:\n{state['research']}\n\nWrite a 200-word long/short memo for {state['ticker']}."),
])
return {"draft": r.content}
def seq_review(state: SeqState):
r = sonnet.invoke([
SystemMessage(content="You are a reviewer. If the memo is solid, return it verbatim. Else fix it."),
HumanMessage(content=state["draft"]),
])
return {"final": r.content}
def build_sequential():
g = StateGraph(SeqState)
g.add_node("research", seq_research)
g.add_node("write", seq_write)
g.add_node("review", seq_review)
g.add_edge(START, "research")
g.add_edge("research", "write")
g.add_edge("write", "review")
g.add_edge("review", END)
return g.compile()
# ====================================================================
# Topology 2: Hierarchical
# ====================================================================
class HierState(TypedDict):
ticker: str
research_findings: dict # worker_name -> output
next_worker: str
rounds: Annotated[int, add]
final: str
WORKERS = ["fundamentals", "risk", "macro"]
def manager_node(state: HierState):
"""Decide next worker or finalize."""
findings = state.get("research_findings", {})
completed = list(findings.keys())
if state.get("rounds", 0) >= 4 or set(completed) == set(WORKERS):
# Aggregate
agg = "\n\n".join(f"### {k}\n{v}" for k, v in findings.items())
r = opus.invoke([
SystemMessage(content="You are a portfolio manager synthesizing analyst outputs."),
HumanMessage(content=f"Synthesize and decide long/short {state['ticker']}.\n\n{agg}"),
])
return {"final": r.content, "next_worker": "DONE", "rounds": 1}
# Pick next worker
pending = [w for w in WORKERS if w not in completed]
r = opus.invoke([
SystemMessage(content="Choose ONE worker name from this list (just the name): " + ",".join(pending)),
HumanMessage(content=f"Findings so far: {list(completed)}. What's most useful next?"),
])
chosen = r.content.strip().lower()
if chosen not in pending:
chosen = pending[0]
return {"next_worker": chosen, "rounds": 1}
def fundamentals_worker(state: HierState):
r = sonnet.invoke([
SystemMessage(content="You are a fundamentals analyst. Output 5 bullets on financials."),
HumanMessage(content=f"Analyze {state['ticker']} fundamentals."),
])
findings = dict(state.get("research_findings", {}))
findings["fundamentals"] = r.content
return {"research_findings": findings, "next_worker": "manager"}
def risk_worker(state: HierState):
r = sonnet.invoke([
SystemMessage(content="You are a risk analyst. Output 5 risks."),
HumanMessage(content=f"Risks for {state['ticker']}."),
])
findings = dict(state.get("research_findings", {}))
findings["risk"] = r.content
return {"research_findings": findings, "next_worker": "manager"}
def macro_worker(state: HierState):
r = sonnet.invoke([
SystemMessage(content="You are a macro analyst."),
HumanMessage(content=f"Macro outlook for {state['ticker']}'s sector."),
])
findings = dict(state.get("research_findings", {}))
findings["macro"] = r.content
return {"research_findings": findings, "next_worker": "manager"}
def hier_route(state: HierState) -> Literal["fundamentals","risk","macro","end"]:
nx = state.get("next_worker")
if nx in WORKERS:
return nx
return "end"
def build_hierarchical():
g = StateGraph(HierState)
g.add_node("manager", manager_node)
g.add_node("fundamentals", fundamentals_worker)
g.add_node("risk", risk_worker)
g.add_node("macro", macro_worker)
g.add_edge(START, "manager")
g.add_conditional_edges("manager", hier_route, {
"fundamentals": "fundamentals",
"risk": "risk",
"macro": "macro",
"end": END,
})
g.add_edge("fundamentals", "manager")
g.add_edge("risk", "manager")
g.add_edge("macro", "manager")
return g.compile()
# ====================================================================
# Topology 3: Network (debate)
# ====================================================================
class NetState(TypedDict):
ticker: str
bull_args: list[str]
bear_args: list[str]
quant_signal: str
rounds: Annotated[int, add]
verdict: str
next_speaker: str
BULL_SYS = "You are a bull. Argue why to LONG. Use 3 concrete points. Address bear's last argument if any."
BEAR_SYS = "You are a bear. Argue why to SHORT. Use 3 concrete points. Address bull's last argument if any."
QUANT_SYS = "You are a quant. Provide signal: 'POSITIVE'/'NEUTRAL'/'NEGATIVE' and 1-line statistical reason."
MOD_SYS = "You are the moderator. After enough debate (3+ rounds), declare LONG/SHORT/NEUTRAL with 2-line reasoning."
def bull_node(state: NetState):
last_bear = state["bear_args"][-1] if state["bear_args"] else ""
r = opus.invoke([
SystemMessage(content=BULL_SYS),
HumanMessage(content=f"Ticker: {state['ticker']}. Last bear: {last_bear[:500]}"),
])
return {"bull_args": [r.content], "next_speaker": "bear", "rounds": 1}
def bear_node(state: NetState):
last_bull = state["bull_args"][-1] if state["bull_args"] else ""
r = opus.invoke([
SystemMessage(content=BEAR_SYS),
HumanMessage(content=f"Ticker: {state['ticker']}. Last bull: {last_bull[:500]}"),
])
return {"bear_args": [r.content], "next_speaker": "quant", "rounds": 1}
def quant_node(state: NetState):
r = sonnet.invoke([
SystemMessage(content=QUANT_SYS),
HumanMessage(content=f"Bull: {state['bull_args'][-1] if state['bull_args'] else ''}\nBear: {state['bear_args'][-1] if state['bear_args'] else ''}"),
])
return {"quant_signal": r.content, "next_speaker": "moderator"}
def moderator_node(state: NetState):
if state["rounds"] < 3:
return {"next_speaker": "bull"}
r = opus.invoke([
SystemMessage(content=MOD_SYS),
HumanMessage(content=f"Bull args:\n{state['bull_args']}\nBear args:\n{state['bear_args']}\nQuant: {state['quant_signal']}"),
])
return {"verdict": r.content, "next_speaker": "DONE"}
def net_route(state: NetState) -> Literal["bull","bear","quant","moderator","end"]:
nx = state.get("next_speaker")
if nx == "DONE":
return "end"
return nx
def build_network():
# Custom: bull -> bear -> quant -> moderator -> (loop or end)
# Add reducers for list fields
class NetState2(TypedDict):
ticker: str
bull_args: Annotated[list[str], add]
bear_args: Annotated[list[str], add]
quant_signal: str
rounds: Annotated[int, add]
verdict: str
next_speaker: str
g = StateGraph(NetState2)
g.add_node("bull", bull_node)
g.add_node("bear", bear_node)
g.add_node("quant", quant_node)
g.add_node("moderator", moderator_node)
g.add_edge(START, "bull")
g.add_conditional_edges("bull", net_route, {"bear":"bear","quant":"quant","moderator":"moderator","end":END})
g.add_conditional_edges("bear", net_route, {"bull":"bull","quant":"quant","moderator":"moderator","end":END})
g.add_conditional_edges("quant", net_route, {"moderator":"moderator","end":END})
g.add_conditional_edges("moderator", net_route, {"bull":"bull","end":END})
return g.compile()
# ====================================================================
# CLI
# ====================================================================
def main():
mode = sys.argv[1] if len(sys.argv) > 1 else "sequential"
ticker = sys.argv[2] if len(sys.argv) > 2 else "AAPL"
if mode == "sequential":
out = build_sequential().invoke({"ticker": ticker})
print("=== Sequential FINAL ===")
print(out["final"])
elif mode == "hierarchical":
out = build_hierarchical().invoke(
{"ticker": ticker, "research_findings": {}, "rounds": 0},
{"recursion_limit": 30},
)
print("=== Hierarchical FINAL ===")
print(out["final"])
elif mode == "network":
out = build_network().invoke(
{"ticker": ticker, "bull_args": [], "bear_args": [],
"quant_signal": "", "rounds": 0, "verdict": "", "next_speaker": ""},
{"recursion_limit": 30},
)
print("=== Network VERDICT ===")
print(out["verdict"])
else:
print("usage: python multi_agent.py [sequential|hierarchical|network] [TICKER]")
if __name__ == "__main__":
main()
四、对比 — 同任务三种拓扑
任务:"决定是否做多 AAPL,给出理由"。
| 拓扑 | LLM call | 总 tokens | Cost | 延迟 | 输出风格 |
|---|---|---|---|---|---|
| Sequential | 3 | 4-6k | $0.03 | 12-18s | 整齐、单视角 |
| Hierarchical | 4-6 | 8-12k | $0.06-0.10 | 25-40s | 全面、有 manager 综合 |
| Network | 6-9 | 12-20k | $0.12-0.20 | 40-60s | 多视角、有论辩张力 |
观察:
- Sequential 适合"任务已知步骤"
- Hierarchical 适合"需要多角度但 manager 知道怎么调度"
- Network 适合"对抗性辩论 / 不同视角碰撞"——成本最高但有时唯一能产生 insight 的方式
五、金融领域应用
各拓扑典型应用
| 场景 | 拓扑 |
|---|---|
| 标准研报生产 | Sequential |
| 投资委员会自动化 | Hierarchical(CIO + 各分析师) |
| 衍生品策略论辩 | Network(多对策略 + 风控辩论) |
| 客户咨询单轮 | Sequential |
| 复杂尽调(IPO/M&A) | Hierarchical |
| 监管事件应急 | Hierarchical(指挥官 + 法务/合规/IT/PR) |
| 反欺诈调查 | Network(多源调查员对话) |
六、Web3 集成
Onchain 多 agent 协作
- Hierarchical:Manager agent(链下)调度多个特化 onchain agent(DeFi 操作 / NFT 操作 / 跨链)
- Network:DAO 治理模拟,多个 voter agent 辩论 + 投票
- Sequential:链上交易 pipeline(rebalance → simulate → sign → submit → verify)
拓扑选择对 onchain agent 的影响
| 拓扑 | 链上 fit |
|---|---|
| Sequential | 最适合(清晰 audit trail) |
| Hierarchical | 适合(manager 决定谁有权动哪资产) |
| Network | 不推荐(链上多对话 settlement 复杂) |
链上场景偏好"action 链路清晰可追溯",因此 Sequential / Hierarchical 占主导。Network 留给链下 reasoning,最终决定再交给一个 onchain agent。
七、生产经验与陷阱
-
Hierarchical manager 选错下属 Manager LLM 没看清 worker 的 capability,反复调同一 worker 或漏掉关键 worker。Mitigation:① worker description 写得像 tool description;② manager system prompt 给"如果不确定先调 fundamentals";③ 监控 worker 调用频次分布。
-
Network 死循环 bull/bear 互相反驳没完。强 max_rounds + moderator 强制收敛。
-
Sequential 错误累积 research 步骤错了 1 个数字,下游 writer/reviewer 全错。每步加 sanity check("金额是否合理")。
-
状态传递膨胀 Hierarchical 里 manager 把所有 worker 输出累积到 state,10 个 worker 后 state 超大。Manager 只需要 summary,详细写到 file。
-
并行 worker 状态合并冲突 两个 worker 同时改 state 字段,没标 reducer 一个被覆盖。所有 list/dict 标 reducer。
-
Cost 失控 Network 拓扑容易跑出 50+ LLM call。设 cost_cap,超即强停。
-
Worker 角色偏移 Risk worker 跑成"无所不评"的通用 LLM。强 system prompt + 限制 worker 工具集(risk worker 只能用 risk tools)。
八、Cost & Latency
不同拓扑的成本规模
| 拓扑 | LLM call 增长 | 适合任务规模 |
|---|---|---|
| Sequential | O(N),N=节点数 | 小-中 |
| Hierarchical | O(M·R),M=worker, R=rounds | 中-大 |
| Network | O(N²·R),最坏 | 仅高价值任务 |
优化杠杆
- Manager / moderator 用便宜模型(haiku 也行)
- Worker 输出 cap(每个 < 500 token)
- Cache common context(同 ticker 的研究 cache 一天)
- Parallel workers 真并发跑而不是顺序
九、关键速查
拓扑选择决策
任务步骤已知、固定? → Sequential
需要多个专业角色但 manager 决定调度? → Hierarchical
需要对抗 / 多视角碰撞? → Network
不确定? → Sequential 起步,复杂化时升级
多 agent 设计 checklist
- 每个 agent 角色 + 工具 + system prompt 清晰
- 终止条件(rounds / cost / verdict)
- state 字段都标了 reducer
- 有 audit trail(trace 每个 agent 输出)
- cost cap
- 并行节点不写共享字段(或用 reducer)
- manager 有清晰的"如何选 worker"指令
十、面试题
Q1: 三种拓扑的本质区别是什么?
A: 控制流模式。Sequential:决定写在代码里(DAG 静态);Hierarchical:manager LLM 动态决定(DAG 半动态);Network:每个 agent 自己决定下一个发给谁(DAG 全动态)。从"代码控制"到"LLM 控制"的频谱。LLM 控制越多,能力越强但可控性、成本、调试难度都恶化。
Q2: 给一个真实金融场景,多 agent 协作明显比单 agent 好。
A: 投资委员会模拟。单 agent 容易陷入 confirmation bias(自圆其说)。Network 拓扑里 Bull / Bear 互相挑刺,moderator 综合,输出鲁棒性显著高。线下数据:同一组投资 idea,单 agent 误判率 ~30%,多 agent debate 后误判 ~12%(来自论文 ChatEval / DebateGPT 趋势)。
Q3: Hierarchical 拓扑下 manager 选错 worker 怎么办?
A: ① 优化 worker description(同 tool description 标准);② Manager prompt 加"如果不确定先调 fundamentals";③ 引入 routing classifier(小模型预先分类);④ Manager 失败 N 次后 fallback to round-robin;⑤ 长期:训练专门的 router model。
Q4: Network 拓扑死锁如何破?
A: ① 硬 max_rounds + cost_cap;② Moderator/Judge agent 必须能强行收敛;③ 监控同一对 agent 反复对话相似度,超阈值停;④ 角色 system prompt 包含"在 N 轮内必须达成共识或宣告分歧";⑤ 引入 timeout(每个 agent 30s 内必须回答)。
Q5: 选 Sequential 而非更"高级"的拓扑,业务方质疑你不够创新,怎么回应?
A: 三个事实:① Sequential 在 80% 业务场景已足够(Anthropic 公开建议从最简单做起);② 复杂拓扑成本是 3-10x,没数据支撑就是浪费;③ 可演进——Sequential 起步,监控数据后判断升级到 Hierarchical 或加 critic agent。"最贵最复杂"不等于"最好",PM 要负责任地为公司省钱+保证可维护性。
十一、复合拓扑——真实生产里的样子
教科书把拓扑分三种,但真实大型 agent 系统往往是嵌套的:
Top-level Manager
/ \
Sub-Crew A Sub-Crew B
(Sequential) (Network)
/ | \ / | \
A1 A2 A3 B1 B2 B3
例如:
- Top:客户经理 master agent
- Sub-Crew A(Sequential):合规 → 风险 → 文档生成
- Sub-Crew B(Network):市场观点辩论(bull/bear/neutral)
设计原则:
- 每层节点数 ≤ 5——超过就拆 sub-crew
- 跨层通信收敛到 manager——禁止跨 crew 直连,否则不可审
- 每个 sub-crew 有独立 SLA——独立超时、独立 cost cap
- 嵌套 ≤ 3 层——再深就难以理解和监控
十二、状态合并的并发陷阱
Hierarchical 并行 worker 时的状态冲突
如果 manager 同时 dispatch 给多个 worker("并行扇出"),他们都返回 {"findings": new_dict}:
# WRONG: 没标 reducer
class Bad(TypedDict):
findings: dict # 后写覆盖前写
# RIGHT
def merge(a, b):
return {**a, **b}
class Good(TypedDict):
findings: Annotated[dict, merge]
Network 拓扑里的循环写入
bull/bear 都 append 到 args list:
class S(TypedDict):
bull_args: Annotated[list[str], operator.add]
bear_args: Annotated[list[str], operator.add]
教训:所有可能并行修改的字段都需要 reducer。否则数据丢失,难以发现。
十三、生产案例——多 agent 应急响应
某交易所安全事件应急响应(2025 真实案例的简化版):
Top: Incident Commander Agent
├── Forensics (Sequential): 拉日志 → 解析 → 定位 root cause
├── Communication (Sequential): 草拟用户公告 → 草拟监管报告 → 草拟内部 RCA
└── Mitigation (Network): tech 提方案 / risk 评估 / legal 审 → moderator 决定
数据
- 平均响应时间从 2-3h → 30 min
- LLM cost / 事件 ~$2-5
- 但关键决策仍人工签——agent 产文稿/方案,CEO/CTO 拍板
反思
- Sequential 部分(forensics)完全可信,自动化无碍
- Network 部分(mitigation)需要人工最终批准
- Communication 部分用 agent 草拟 + 人工 review = 提速 + 守底线
十四、扩展练习
- 加并行 fan-out——hierarchical 里 manager 同时 dispatch 3 worker,结果用 reducer 合并
- 改成 LLM-as-judge moderator——network 拓扑里 moderator 给定量评分而非定性裁决
- 加 timeout per worker——某 worker 卡住不阻塞全队
- 加 fallback worker——主 worker 失败用便宜 worker 兜底
- 可视化拓扑——
graph.get_graph().draw_mermaid()输出图供文档 - 嵌套 sub-crew——把 research_team 自己变成一个 hierarchical sub-graph
- 审计 trace——每个 agent 的 input/output 写到 audit log,模拟事后调查
明日预告
Day 160: Agent 评估——Trajectory eval / Step-by-step / End-to-end / AgentBench
- 评估 agent 比评估 LLM 难在哪
- 各种 metric:success rate / efficiency / cost / safety
- 用 AgentBench 风格设计 eval 套件