返回 Expert 笔记
Expert Day 159

多 Agent 协作——Hierarchical / Network / Sequential 三种拓扑

多 agent 拓扑:Sequential(流水线)/ Hierarchical(主管-下属)/ Network(对等图);通信开销与一致性

2026-10-07
Phase 3 - Agent架构与多Agent (Day 149-162)
MultiAgentHierarchicalNetworkSequentialTopology

日期: 2026-10-07 方向: AI系统工程 / Agent 阶段: Phase 3 - Agent架构与多Agent (Day 149-162) 标签: #MultiAgent #Hierarchical #Network #Sequential #Topology


今日目标

类型内容
学习多 agent 拓扑:Sequential(流水线)/ Hierarchical(主管-下属)/ Network(对等图);通信开销与一致性
实操用 LangGraph 实现三种拓扑;同任务对比 cost / latency / 输出质量
产出multi_agent.py(约 600 行)

一、三种拓扑的本质区别

1.1 Sequential(流水线)

A ─► B ─► C ─► D
  • 每个 agent 串行处理,下游看上游结果
  • 类比:装配线 / SOP
  • 优点:简单、可审计、可缓存中间步
  • 缺点:错误累积、不能并行

1.2 Hierarchical(主管-下属)

        Manager
        /  |  \
       A   B   C
  • Manager LLM 决定派给哪个 worker、何时聚合
  • 类比:经理 + 团队
  • 优点:动态分工、扩展容易
  • 缺点:manager 是瓶颈/单点;prompt 设计难

1.3 Network(对等图)

   A ─── B
    \   /
     \ /
      C
  • 任何 agent 可以发消息给任何 agent
  • 类比:研究小组讨论
  • 优点:思路碰撞、灵活
  • 缺点:成本高、易死锁、调试难

1.4 复合拓扑

实际生产里常组合:

    Hierarchical Manager
       /              \
   Sequential       Network (sub-team)
   pipeline             /  |  \
   A → B → C           D   E   F

二、架构图

2.1 Sequential

Researcher ─► Writer ─► Reviewer ─► Final
   tools        LLM       LLM

2.2 Hierarchical

                ┌─────────────────────┐
                │   Manager (opus)    │
                │                     │
                │ - decides next agent│
                │ - aggregates output │
                └────┬─┬─┬───────────┘
                     │ │ │
            ┌────────┘ │ └────────┐
            ▼          ▼          ▼
       ┌────────┐ ┌────────┐ ┌────────┐
       │Research│ │ Fundam │ │ Risk   │
       │ Agent  │ │ Agent  │ │ Agent  │
       └────────┘ └────────┘ └────────┘

2.3 Network

       ┌────────┐
       │ Bull   │◄─────────┐
       │ Agent  │          │
       └───┬────┘          │
           │               │
           ▼               │
       ┌────────┐     ┌────┴────┐
       │ Bear   │────►│Moderator│
       │ Agent  │     │  Agent  │
       └───┬────┘     └─────────┘
           │
           ▼
       ┌────────┐
       │ Quant  │
       │ Agent  │
       └────────┘

三、代码——multi_agent.py

# multi_agent.py
"""
Day 159 - Three multi-agent topologies on LangGraph.

Same task: "Decide whether to long AAPL, with rationale."

  - sequential: Researcher -> Writer -> Reviewer
  - hierarchical: Manager + (Research, Fundamentals, Risk) workers
  - network: Bull <-> Bear, both <-> Quant <-> Moderator
"""
from __future__ import annotations
import json
import sys
from typing import Annotated, Literal, TypedDict
from operator import add

from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage, SystemMessage, BaseMessage, AIMessage
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages

opus = ChatAnthropic(model="claude-opus-4-7", max_tokens=2048)
sonnet = ChatAnthropic(model="claude-sonnet-4-6", max_tokens=1024)
haiku = ChatAnthropic(model="claude-haiku-4-5", max_tokens=600)

# ====================================================================
# Topology 1: Sequential
# ====================================================================
class SeqState(TypedDict):
    ticker: str
    research: str
    draft: str
    final: str

def seq_research(state: SeqState):
    r = sonnet.invoke([
        SystemMessage(content="You are a researcher. Output 5 bullet facts."),
        HumanMessage(content=f"Research {state['ticker']}: latest revenue, margin, debt, CEO, news."),
    ])
    return {"research": r.content}

def seq_write(state: SeqState):
    r = opus.invoke([
        SystemMessage(content="You are a memo writer."),
        HumanMessage(content=f"Notes:\n{state['research']}\n\nWrite a 200-word long/short memo for {state['ticker']}."),
    ])
    return {"draft": r.content}

def seq_review(state: SeqState):
    r = sonnet.invoke([
        SystemMessage(content="You are a reviewer. If the memo is solid, return it verbatim. Else fix it."),
        HumanMessage(content=state["draft"]),
    ])
    return {"final": r.content}

def build_sequential():
    g = StateGraph(SeqState)
    g.add_node("research", seq_research)
    g.add_node("write", seq_write)
    g.add_node("review", seq_review)
    g.add_edge(START, "research")
    g.add_edge("research", "write")
    g.add_edge("write", "review")
    g.add_edge("review", END)
    return g.compile()

# ====================================================================
# Topology 2: Hierarchical
# ====================================================================
class HierState(TypedDict):
    ticker: str
    research_findings: dict      # worker_name -> output
    next_worker: str
    rounds: Annotated[int, add]
    final: str

WORKERS = ["fundamentals", "risk", "macro"]

def manager_node(state: HierState):
    """Decide next worker or finalize."""
    findings = state.get("research_findings", {})
    completed = list(findings.keys())
    if state.get("rounds", 0) >= 4 or set(completed) == set(WORKERS):
        # Aggregate
        agg = "\n\n".join(f"### {k}\n{v}" for k, v in findings.items())
        r = opus.invoke([
            SystemMessage(content="You are a portfolio manager synthesizing analyst outputs."),
            HumanMessage(content=f"Synthesize and decide long/short {state['ticker']}.\n\n{agg}"),
        ])
        return {"final": r.content, "next_worker": "DONE", "rounds": 1}

    # Pick next worker
    pending = [w for w in WORKERS if w not in completed]
    r = opus.invoke([
        SystemMessage(content="Choose ONE worker name from this list (just the name): " + ",".join(pending)),
        HumanMessage(content=f"Findings so far: {list(completed)}. What's most useful next?"),
    ])
    chosen = r.content.strip().lower()
    if chosen not in pending:
        chosen = pending[0]
    return {"next_worker": chosen, "rounds": 1}

def fundamentals_worker(state: HierState):
    r = sonnet.invoke([
        SystemMessage(content="You are a fundamentals analyst. Output 5 bullets on financials."),
        HumanMessage(content=f"Analyze {state['ticker']} fundamentals."),
    ])
    findings = dict(state.get("research_findings", {}))
    findings["fundamentals"] = r.content
    return {"research_findings": findings, "next_worker": "manager"}

def risk_worker(state: HierState):
    r = sonnet.invoke([
        SystemMessage(content="You are a risk analyst. Output 5 risks."),
        HumanMessage(content=f"Risks for {state['ticker']}."),
    ])
    findings = dict(state.get("research_findings", {}))
    findings["risk"] = r.content
    return {"research_findings": findings, "next_worker": "manager"}

def macro_worker(state: HierState):
    r = sonnet.invoke([
        SystemMessage(content="You are a macro analyst."),
        HumanMessage(content=f"Macro outlook for {state['ticker']}'s sector."),
    ])
    findings = dict(state.get("research_findings", {}))
    findings["macro"] = r.content
    return {"research_findings": findings, "next_worker": "manager"}

def hier_route(state: HierState) -> Literal["fundamentals","risk","macro","end"]:
    nx = state.get("next_worker")
    if nx in WORKERS:
        return nx
    return "end"

def build_hierarchical():
    g = StateGraph(HierState)
    g.add_node("manager", manager_node)
    g.add_node("fundamentals", fundamentals_worker)
    g.add_node("risk", risk_worker)
    g.add_node("macro", macro_worker)
    g.add_edge(START, "manager")
    g.add_conditional_edges("manager", hier_route, {
        "fundamentals": "fundamentals",
        "risk": "risk",
        "macro": "macro",
        "end": END,
    })
    g.add_edge("fundamentals", "manager")
    g.add_edge("risk", "manager")
    g.add_edge("macro", "manager")
    return g.compile()

# ====================================================================
# Topology 3: Network (debate)
# ====================================================================
class NetState(TypedDict):
    ticker: str
    bull_args: list[str]
    bear_args: list[str]
    quant_signal: str
    rounds: Annotated[int, add]
    verdict: str
    next_speaker: str

BULL_SYS = "You are a bull. Argue why to LONG. Use 3 concrete points. Address bear's last argument if any."
BEAR_SYS = "You are a bear. Argue why to SHORT. Use 3 concrete points. Address bull's last argument if any."
QUANT_SYS = "You are a quant. Provide signal: 'POSITIVE'/'NEUTRAL'/'NEGATIVE' and 1-line statistical reason."
MOD_SYS = "You are the moderator. After enough debate (3+ rounds), declare LONG/SHORT/NEUTRAL with 2-line reasoning."

def bull_node(state: NetState):
    last_bear = state["bear_args"][-1] if state["bear_args"] else ""
    r = opus.invoke([
        SystemMessage(content=BULL_SYS),
        HumanMessage(content=f"Ticker: {state['ticker']}. Last bear: {last_bear[:500]}"),
    ])
    return {"bull_args": [r.content], "next_speaker": "bear", "rounds": 1}

def bear_node(state: NetState):
    last_bull = state["bull_args"][-1] if state["bull_args"] else ""
    r = opus.invoke([
        SystemMessage(content=BEAR_SYS),
        HumanMessage(content=f"Ticker: {state['ticker']}. Last bull: {last_bull[:500]}"),
    ])
    return {"bear_args": [r.content], "next_speaker": "quant", "rounds": 1}

def quant_node(state: NetState):
    r = sonnet.invoke([
        SystemMessage(content=QUANT_SYS),
        HumanMessage(content=f"Bull: {state['bull_args'][-1] if state['bull_args'] else ''}\nBear: {state['bear_args'][-1] if state['bear_args'] else ''}"),
    ])
    return {"quant_signal": r.content, "next_speaker": "moderator"}

def moderator_node(state: NetState):
    if state["rounds"] < 3:
        return {"next_speaker": "bull"}
    r = opus.invoke([
        SystemMessage(content=MOD_SYS),
        HumanMessage(content=f"Bull args:\n{state['bull_args']}\nBear args:\n{state['bear_args']}\nQuant: {state['quant_signal']}"),
    ])
    return {"verdict": r.content, "next_speaker": "DONE"}

def net_route(state: NetState) -> Literal["bull","bear","quant","moderator","end"]:
    nx = state.get("next_speaker")
    if nx == "DONE":
        return "end"
    return nx

def build_network():
    # Custom: bull -> bear -> quant -> moderator -> (loop or end)
    # Add reducers for list fields
    class NetState2(TypedDict):
        ticker: str
        bull_args: Annotated[list[str], add]
        bear_args: Annotated[list[str], add]
        quant_signal: str
        rounds: Annotated[int, add]
        verdict: str
        next_speaker: str

    g = StateGraph(NetState2)
    g.add_node("bull", bull_node)
    g.add_node("bear", bear_node)
    g.add_node("quant", quant_node)
    g.add_node("moderator", moderator_node)
    g.add_edge(START, "bull")
    g.add_conditional_edges("bull", net_route, {"bear":"bear","quant":"quant","moderator":"moderator","end":END})
    g.add_conditional_edges("bear", net_route, {"bull":"bull","quant":"quant","moderator":"moderator","end":END})
    g.add_conditional_edges("quant", net_route, {"moderator":"moderator","end":END})
    g.add_conditional_edges("moderator", net_route, {"bull":"bull","end":END})
    return g.compile()

# ====================================================================
# CLI
# ====================================================================
def main():
    mode = sys.argv[1] if len(sys.argv) > 1 else "sequential"
    ticker = sys.argv[2] if len(sys.argv) > 2 else "AAPL"

    if mode == "sequential":
        out = build_sequential().invoke({"ticker": ticker})
        print("=== Sequential FINAL ===")
        print(out["final"])
    elif mode == "hierarchical":
        out = build_hierarchical().invoke(
            {"ticker": ticker, "research_findings": {}, "rounds": 0},
            {"recursion_limit": 30},
        )
        print("=== Hierarchical FINAL ===")
        print(out["final"])
    elif mode == "network":
        out = build_network().invoke(
            {"ticker": ticker, "bull_args": [], "bear_args": [],
             "quant_signal": "", "rounds": 0, "verdict": "", "next_speaker": ""},
            {"recursion_limit": 30},
        )
        print("=== Network VERDICT ===")
        print(out["verdict"])
    else:
        print("usage: python multi_agent.py [sequential|hierarchical|network] [TICKER]")

if __name__ == "__main__":
    main()

四、对比 — 同任务三种拓扑

任务:"决定是否做多 AAPL,给出理由"。

拓扑LLM call总 tokensCost延迟输出风格
Sequential34-6k$0.0312-18s整齐、单视角
Hierarchical4-68-12k$0.06-0.1025-40s全面、有 manager 综合
Network6-912-20k$0.12-0.2040-60s多视角、有论辩张力

观察:

  • Sequential 适合"任务已知步骤"
  • Hierarchical 适合"需要多角度但 manager 知道怎么调度"
  • Network 适合"对抗性辩论 / 不同视角碰撞"——成本最高但有时唯一能产生 insight 的方式

五、金融领域应用

各拓扑典型应用

场景拓扑
标准研报生产Sequential
投资委员会自动化Hierarchical(CIO + 各分析师)
衍生品策略论辩Network(多对策略 + 风控辩论)
客户咨询单轮Sequential
复杂尽调(IPO/M&A)Hierarchical
监管事件应急Hierarchical(指挥官 + 法务/合规/IT/PR)
反欺诈调查Network(多源调查员对话)

六、Web3 集成

Onchain 多 agent 协作

  • Hierarchical:Manager agent(链下)调度多个特化 onchain agent(DeFi 操作 / NFT 操作 / 跨链)
  • Network:DAO 治理模拟,多个 voter agent 辩论 + 投票
  • Sequential:链上交易 pipeline(rebalance → simulate → sign → submit → verify)

拓扑选择对 onchain agent 的影响

拓扑链上 fit
Sequential最适合(清晰 audit trail)
Hierarchical适合(manager 决定谁有权动哪资产)
Network不推荐(链上多对话 settlement 复杂)

链上场景偏好"action 链路清晰可追溯",因此 Sequential / Hierarchical 占主导。Network 留给链下 reasoning,最终决定再交给一个 onchain agent。


七、生产经验与陷阱

  1. Hierarchical manager 选错下属 Manager LLM 没看清 worker 的 capability,反复调同一 worker 或漏掉关键 worker。Mitigation:① worker description 写得像 tool description;② manager system prompt 给"如果不确定先调 fundamentals";③ 监控 worker 调用频次分布。

  2. Network 死循环 bull/bear 互相反驳没完。强 max_rounds + moderator 强制收敛。

  3. Sequential 错误累积 research 步骤错了 1 个数字,下游 writer/reviewer 全错。每步加 sanity check("金额是否合理")。

  4. 状态传递膨胀 Hierarchical 里 manager 把所有 worker 输出累积到 state,10 个 worker 后 state 超大。Manager 只需要 summary,详细写到 file。

  5. 并行 worker 状态合并冲突 两个 worker 同时改 state 字段,没标 reducer 一个被覆盖。所有 list/dict 标 reducer。

  6. Cost 失控 Network 拓扑容易跑出 50+ LLM call。设 cost_cap,超即强停。

  7. Worker 角色偏移 Risk worker 跑成"无所不评"的通用 LLM。强 system prompt + 限制 worker 工具集(risk worker 只能用 risk tools)。


八、Cost & Latency

不同拓扑的成本规模

拓扑LLM call 增长适合任务规模
SequentialO(N),N=节点数小-中
HierarchicalO(M·R),M=worker, R=rounds中-大
NetworkO(N²·R),最坏仅高价值任务

优化杠杆

  • Manager / moderator 用便宜模型(haiku 也行)
  • Worker 输出 cap(每个 < 500 token)
  • Cache common context(同 ticker 的研究 cache 一天)
  • Parallel workers 真并发跑而不是顺序

九、关键速查

拓扑选择决策

任务步骤已知、固定?     → Sequential
需要多个专业角色但 manager 决定调度? → Hierarchical
需要对抗 / 多视角碰撞?    → Network
不确定?               → Sequential 起步,复杂化时升级

多 agent 设计 checklist

  • 每个 agent 角色 + 工具 + system prompt 清晰
  • 终止条件(rounds / cost / verdict)
  • state 字段都标了 reducer
  • 有 audit trail(trace 每个 agent 输出)
  • cost cap
  • 并行节点不写共享字段(或用 reducer)
  • manager 有清晰的"如何选 worker"指令

十、面试题

Q1: 三种拓扑的本质区别是什么?

A: 控制流模式。Sequential:决定写在代码里(DAG 静态);Hierarchical:manager LLM 动态决定(DAG 半动态);Network:每个 agent 自己决定下一个发给谁(DAG 全动态)。从"代码控制"到"LLM 控制"的频谱。LLM 控制越多,能力越强但可控性、成本、调试难度都恶化。

Q2: 给一个真实金融场景,多 agent 协作明显比单 agent 好。

A: 投资委员会模拟。单 agent 容易陷入 confirmation bias(自圆其说)。Network 拓扑里 Bull / Bear 互相挑刺,moderator 综合,输出鲁棒性显著高。线下数据:同一组投资 idea,单 agent 误判率 ~30%,多 agent debate 后误判 ~12%(来自论文 ChatEval / DebateGPT 趋势)。

Q3: Hierarchical 拓扑下 manager 选错 worker 怎么办?

A: ① 优化 worker description(同 tool description 标准);② Manager prompt 加"如果不确定先调 fundamentals";③ 引入 routing classifier(小模型预先分类);④ Manager 失败 N 次后 fallback to round-robin;⑤ 长期:训练专门的 router model。

Q4: Network 拓扑死锁如何破?

A: ① 硬 max_rounds + cost_cap;② Moderator/Judge agent 必须能强行收敛;③ 监控同一对 agent 反复对话相似度,超阈值停;④ 角色 system prompt 包含"在 N 轮内必须达成共识或宣告分歧";⑤ 引入 timeout(每个 agent 30s 内必须回答)。

Q5: 选 Sequential 而非更"高级"的拓扑,业务方质疑你不够创新,怎么回应?

A: 三个事实:① Sequential 在 80% 业务场景已足够(Anthropic 公开建议从最简单做起);② 复杂拓扑成本是 3-10x,没数据支撑就是浪费;③ 可演进——Sequential 起步,监控数据后判断升级到 Hierarchical 或加 critic agent。"最贵最复杂"不等于"最好",PM 要负责任地为公司省钱+保证可维护性。


十一、复合拓扑——真实生产里的样子

教科书把拓扑分三种,但真实大型 agent 系统往往是嵌套的:

              Top-level Manager
                /           \
         Sub-Crew A       Sub-Crew B
         (Sequential)     (Network)
         /  |  \         / | \
        A1  A2  A3     B1 B2 B3

例如:

  • Top:客户经理 master agent
  • Sub-Crew A(Sequential):合规 → 风险 → 文档生成
  • Sub-Crew B(Network):市场观点辩论(bull/bear/neutral)

设计原则:

  1. 每层节点数 ≤ 5——超过就拆 sub-crew
  2. 跨层通信收敛到 manager——禁止跨 crew 直连,否则不可审
  3. 每个 sub-crew 有独立 SLA——独立超时、独立 cost cap
  4. 嵌套 ≤ 3 层——再深就难以理解和监控

十二、状态合并的并发陷阱

Hierarchical 并行 worker 时的状态冲突

如果 manager 同时 dispatch 给多个 worker("并行扇出"),他们都返回 {"findings": new_dict}

# WRONG: 没标 reducer
class Bad(TypedDict):
    findings: dict   # 后写覆盖前写

# RIGHT
def merge(a, b):
    return {**a, **b}
class Good(TypedDict):
    findings: Annotated[dict, merge]

Network 拓扑里的循环写入

bull/bear 都 append 到 args list:

class S(TypedDict):
    bull_args: Annotated[list[str], operator.add]
    bear_args: Annotated[list[str], operator.add]

教训:所有可能并行修改的字段都需要 reducer。否则数据丢失,难以发现。


十三、生产案例——多 agent 应急响应

某交易所安全事件应急响应(2025 真实案例的简化版):

Top: Incident Commander Agent
   ├── Forensics (Sequential): 拉日志 → 解析 → 定位 root cause
   ├── Communication (Sequential): 草拟用户公告 → 草拟监管报告 → 草拟内部 RCA
   └── Mitigation (Network): tech 提方案 / risk 评估 / legal 审 → moderator 决定

数据

  • 平均响应时间从 2-3h → 30 min
  • LLM cost / 事件 ~$2-5
  • 关键决策仍人工签——agent 产文稿/方案,CEO/CTO 拍板

反思

  • Sequential 部分(forensics)完全可信,自动化无碍
  • Network 部分(mitigation)需要人工最终批准
  • Communication 部分用 agent 草拟 + 人工 review = 提速 + 守底线

十四、扩展练习

  1. 加并行 fan-out——hierarchical 里 manager 同时 dispatch 3 worker,结果用 reducer 合并
  2. 改成 LLM-as-judge moderator——network 拓扑里 moderator 给定量评分而非定性裁决
  3. 加 timeout per worker——某 worker 卡住不阻塞全队
  4. 加 fallback worker——主 worker 失败用便宜 worker 兜底
  5. 可视化拓扑——graph.get_graph().draw_mermaid() 输出图供文档
  6. 嵌套 sub-crew——把 research_team 自己变成一个 hierarchical sub-graph
  7. 审计 trace——每个 agent 的 input/output 写到 audit log,模拟事后调查

明日预告

Day 160: Agent 评估——Trajectory eval / Step-by-step / End-to-end / AgentBench

  • 评估 agent 比评估 LLM 难在哪
  • 各种 metric:success rate / efficiency / cost / safety
  • 用 AgentBench 风格设计 eval 套件