返回架构笔记
Arch Day 224

Arch Day 224: Agent编排模式 — ReAct/Plan-and-Execute/Reflection深度

Arch Day 224: Agent编排模式 — ReAct/Plan-and-Execute/Reflection深度

2026-04-01
第九阶段 - AI Agent深度
AgentPatternReActPlanExecuteReflectionToolUse状态管理

日期: 2026-04-01 (Day 224) 阶段: 第九阶段 - AI Agent深度 标签: #AgentPattern #ReAct #PlanExecute #Reflection #ToolUse #状态管理


目录

  1. 核心概念
  2. ReAct Pattern深度剖析
  3. Plan-and-Execute模式
  4. Reflection/Self-Critique模式
  5. Tool Use模式与API设计
  6. State Management架构
  7. Routing Patterns
  8. 对比分析总表
  9. 架构设计实操
  10. 与Web3/DeFi的关联
  11. 面试题准备
  12. 明日预告

核心概念

Agent编排的本质问题

AI Agent的核心挑战不是"能不能调用工具",而是如何决定调用什么工具、以什么顺序、在什么条件下。这就是编排(Orchestration)问题。

2025-2026年,随着Claude、GPT-4o、Gemini等模型的tool use能力成熟,Agent编排从学术概念变成了工程实践。几种主流模式逐渐形成共识:

模式核心思想代表实现
ReActThought→Action→Observation循环LangChain Agent, Claude tool_use
Plan-and-Execute先规划再执行,失败时修正计划Claude Code, BabyAGI
Reflection自我审查和迭代改进Reflexion, Constitutional AI
Routing根据任务复杂度分发到不同模型/流程Anthropic Model Router, OpenAI Swarm

为什么需要不同的编排模式?

简单任务 → "帮我查一下ETH价格"
  → ReAct就够了:Thought(需要查价格) → Action(调用API) → Observation(返回$3,500)

复杂任务 → "分析过去30天的DeFi协议TVL变化,找出增长最快的3个,并解释原因"
  → 需要Plan-and-Execute:
    Step 1: 获取DeFi协议列表
    Step 2: 查询每个协议的TVL历史数据
    Step 3: 计算增长率并排序
    Step 4: 分析增长原因
    Step 5: 生成报告

极复杂任务 → "设计一个跨链DEX聚合器的完整架构"
  → 需要Reflection:生成初稿 → 自我审查 → 发现遗漏 → 迭代改进

ReAct Pattern深度剖析

1. 基本原理

ReAct (Reasoning + Acting) 由Yao等人在2022年提出,核心思想是将推理(Reasoning)和行动(Acting)交织在一起,形成 Thought → Action → Observation 的循环:

用户输入: "ETH现在多少钱?比昨天涨了还是跌了?"

[Thought 1] 用户想知道ETH当前价格和价格变化。我需要先获取当前价格。
[Action 1] call_tool(get_eth_price, {})
[Observation 1] {"price": 3547.82, "currency": "USD"}

[Thought 2] 当前价格是$3,547.82。现在需要获取昨天的价格来比较。
[Action 2] call_tool(get_eth_price_history, {"date": "2026-03-31"})
[Observation 2] {"price": 3412.15, "currency": "USD"}

[Thought 3] 昨天价格$3,412.15,今天$3,547.82,涨了$135.67 (3.98%)。可以回答了。
[Final Answer] ETH当前价格$3,547.82,比昨天上涨了$135.67(+3.98%)。

2. Claude的tool_use作为ReAct实现

Claude的tool_use API本质上是ReAct模式的工程实现。2025-2026年Claude的实现已经非常成熟:

# Claude tool_use API示例
import anthropic

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_eth_price",
        "description": "获取ETH当前价格",
        "input_schema": {
            "type": "object",
            "properties": {},
            "required": []
        }
    },
    {
        "name": "execute_swap",
        "description": "在DEX上执行代币交换",
        "input_schema": {
            "type": "object",
            "properties": {
                "from_token": {"type": "string"},
                "to_token": {"type": "string"},
                "amount": {"type": "number"}
            },
            "required": ["from_token", "to_token", "amount"]
        }
    }
]

response = client.messages.create(
    model="claude-sonnet-4-20250514",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "帮我把1 ETH换成USDC"}
    ]
)

# Claude会返回tool_use类型的content block
# {
#   "type": "tool_use",
#   "id": "toolu_xxx",
#   "name": "get_eth_price",       ← Claude决定先查价格(Thought隐含在决策中)
#   "input": {}
# }

关键设计点:Claude的tool_use与传统ReAct的区别在于,Thought过程可以是隐式的(模型内部推理),也可以通过extended thinking显式暴露。2025年推出的extended thinking功能让开发者能看到Claude的推理链条。

3. ReAct的优势场景

ReAct在以下场景表现最优:

场景原因示例
单步或少步工具调用推理简单,不需要长期规划查价格、查余额、发送交易
信息检索类任务搜索→获取→总结的线性流程"Aave V4的TVL是多少?"
明确的工具边界工具功能清晰,输入输出明确调用API、读写数据库
实时交互用户需要快速响应聊天机器人、客服Agent

4. ReAct的失败模式

ReAct在复杂任务中容易出现的问题:

问题一:循环陷阱(Loop Trap)

[Thought] 我需要查找这个地址的交易历史
[Action] search_transactions(address="0xabc...")
[Observation] Error: rate limit exceeded
[Thought] 出错了,我再试一次
[Action] search_transactions(address="0xabc...")     ← 重复相同的失败操作
[Observation] Error: rate limit exceeded
[Thought] 再试...                                     ← 无限循环

问题二:短视决策(Myopic Planning)

任务: "对比Uniswap和Curve的gas效率"

[Thought] 我先查Uniswap的swap gas
[Action] get_gas_data(protocol="uniswap")
[Observation] avg_gas: 150,000

[Thought] 好的,现在查Curve
[Action] get_gas_data(protocol="curve")
[Observation] avg_gas: 250,000

[Thought] Uniswap gas更低,所以Uniswap更高效。        ← 过于简化,没有考虑
                                                         交易量、滑点、多池对比等

问题三:Token成本膨胀

ReAct模式下,每一轮都需要将完整的历史(包括所有Observation)传回模型。对于多步任务:

Round 1: prompt_tokens = 500,  completion_tokens = 100
Round 2: prompt_tokens = 1200, completion_tokens = 100   ← 包含了Round 1的Observation
Round 3: prompt_tokens = 2500, completion_tokens = 100   ← 累积
Round 4: prompt_tokens = 4200, completion_tokens = 100
...

10轮交互后,prompt_tokens可能超过20,000
按Claude Sonnet $3/M input计算:仅输入就~$0.06/次任务
如果是高频Agent,成本会快速失控

5. Token成本优化策略

# 策略1: Observation压缩
def compress_observation(raw_observation: str, max_tokens: int = 500) -> str:
    """将冗长的API返回压缩为核心信息"""
    if len(raw_observation) > max_tokens * 4:  # 粗略估计
        # 用小模型做摘要
        summary = haiku_summarize(raw_observation)
        return summary
    return raw_observation

# 策略2: 滑动窗口 — 只保留最近N轮
def sliding_window_messages(messages: list, window_size: int = 5) -> list:
    """保留system message + 最近N轮对话"""
    system_msgs = [m for m in messages if m["role"] == "system"]
    recent_msgs = messages[-window_size * 2:]  # user+assistant各算一轮
    return system_msgs + recent_msgs

# 策略3: 重要信息提取
def extract_key_facts(observations: list[str]) -> str:
    """从所有observation中提取关键事实,作为上下文摘要"""
    facts = []
    for obs in observations:
        # 提取数值、状态、结论等关键信息
        key_info = extract_structured_info(obs)
        facts.append(key_info)
    return "\n".join(facts)

Plan-and-Execute模式

1. 核心思想

Plan-and-Execute模式将Agent拆分为两个独立角色:

  • Planner(规划器):分析任务,生成执行计划(步骤列表)
  • Executor(执行器):按计划逐步执行,返回每步结果
  • Re-Planner(重规划器):根据执行结果修正计划
┌─────────────────────────────────────────────────┐
│                  用户任务                         │
│  "分析Top 10 DeFi协议的安全风险"                   │
└──────────────────────┬──────────────────────────┘
                       │
                       ▼
              ┌────────────────┐
              │   Planner      │
              │  (规划阶段)     │
              └───────┬────────┘
                      │ 生成计划
                      ▼
    ┌──────────────────────────────────────┐
    │  Plan:                               │
    │  1. 获取DeFi协议TVL排名              │
    │  2. 对每个协议查询审计报告            │
    │  3. 查询历史安全事件                  │
    │  4. 分析合约风险指标                  │
    │  5. 生成综合风险评分                  │
    │  6. 撰写分析报告                     │
    └──────────────┬───────────────────────┘
                   │
                   ▼
           ┌───────────────┐
           │   Executor     │     ← 逐步执行
           │   Step 1...    │
           └───────┬───────┘
                   │ 结果
                   ▼
           ┌───────────────┐
           │  Re-Planner   │     ← 评估结果,是否需要修正计划
           │  (继续/修正)   │
           └───────┬───────┘
                   │
                   ▼
           ┌───────────────┐
           │   Executor     │
           │   Step 2...    │
           └───────────────┘

2. Claude Code作为Plan-and-Execute的典型实现

Claude Code(Anthropic官方CLI工具)在2025年发布后,成为Plan-and-Execute模式的标杆实现。其工作流程:

用户: "给这个项目添加用户认证功能"

[Planning Phase — Claude Code内部]
Claude思考:
  1. 我需要先了解项目结构 → 读取文件
  2. 确定使用的技术栈 → 检查package.json
  3. 设计认证方案 → 生成计划
  4. 逐步实现 → 创建/修改文件
  5. 验证 → 运行测试

[Execution Phase]
Step 1: Read package.json → 发现是Next.js项目
Step 2: Read app/layout.tsx → 理解现有结构
Step 3: 决定使用NextAuth.js
Step 4: 创建 src/lib/auth.ts
Step 5: 修改 app/api/auth/[...nextauth]/route.ts
Step 6: 添加 middleware.ts
Step 7: 运行 npm run build → 发现类型错误

[Re-Planning Phase]
Build失败了,需要修正:
Step 7.1: 读取错误信息
Step 7.2: 修复类型定义
Step 7.3: 重新build → 成功

[Complete]

Claude Code的关键设计选择

设计点实现方式为什么
计划不是固定的每步执行后可重新评估代码修改的结果不可完全预测
工具粒度适中Read/Edit/Bash/Grep等太细则步骤太多,太粗则不灵活
上下文保持全部对话历史代码修改需要前后一致性
失败恢复读取错误→分析→修正编程中错误是常态
并行工具调用独立的读取操作可并行减少轮次,降低延迟

3. Plan-and-Execute的优势

对比ReAct的核心优势

ReAct: 走一步看一步(贪心策略)
  ├── 优点:响应快,简单任务效率高
  └── 缺点:容易迷失方向,重复操作

Plan-and-Execute: 先看地图再出发(全局策略)
  ├── 优点:方向明确,步骤清晰,可追踪进度
  └── 缺点:计划生成有开销,计划可能不准确

具体优势分析

  1. 任务分解的透明性 — 用户和开发者能看到完整计划,便于理解和调试
  2. 进度可追踪 — 清楚知道"已完成3/6步"
  3. 失败可恢复 — 某步失败不需要从头开始,只需修正计划
  4. 资源可预估 — 根据步骤数估算Token消耗和时间

4. 计划修正(Re-Planning)策略

计划修正是Plan-and-Execute模式的核心难题。2025-2026年的实践总结出几种策略:

# 策略1: 全量重规划 — 每步执行后用完整上下文重新生成计划
def full_replan(original_plan, completed_steps, current_result):
    prompt = f"""
    原始计划: {original_plan}
    已完成步骤及结果: {completed_steps}
    当前步骤结果: {current_result}

    请根据以上信息,重新生成剩余步骤的计划。
    如果当前步骤失败,请在计划中加入修复步骤。
    """
    new_plan = llm.generate(prompt)
    return new_plan

# 策略2: 增量修正 — 只修改受影响的后续步骤
def incremental_replan(plan, failed_step_index, error_info):
    affected_steps = plan[failed_step_index:]
    prompt = f"""
    步骤{failed_step_index}执行失败: {error_info}
    受影响的后续步骤: {affected_steps}

    请只修改受影响的步骤,保持其他步骤不变。
    """
    revised_steps = llm.generate(prompt)
    plan[failed_step_index:] = revised_steps
    return plan

# 策略3: 条件分支 — 预先定义失败时的替代路径
def conditional_plan():
    return {
        "step_1": {
            "action": "fetch_data_from_api_a",
            "on_success": "step_2",
            "on_failure": "step_1_fallback"
        },
        "step_1_fallback": {
            "action": "fetch_data_from_api_b",  # 备用数据源
            "on_success": "step_2",
            "on_failure": "abort_with_error"
        },
        "step_2": { ... }
    }

5. 实现Plan-and-Execute的架构

from dataclasses import dataclass
from enum import Enum
from typing import Optional

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"

@dataclass
class PlanStep:
    id: int
    description: str
    tool: str
    tool_input: dict
    status: StepStatus = StepStatus.PENDING
    result: Optional[str] = None
    error: Optional[str] = None
    dependencies: list[int] = None  # 依赖的步骤ID

@dataclass
class ExecutionPlan:
    goal: str
    steps: list[PlanStep]
    current_step: int = 0
    revision_count: int = 0
    max_revisions: int = 3

class PlanAndExecuteAgent:
    def __init__(self, planner_model, executor_model):
        self.planner = planner_model    # 可以用更强的模型做规划
        self.executor = executor_model   # 用更快/便宜的模型做执行

    def create_plan(self, user_goal: str) -> ExecutionPlan:
        """第一阶段:生成执行计划"""
        planning_prompt = f"""
        用户目标: {user_goal}

        请生成一个详细的执行计划,包含:
        1. 具体步骤列表
        2. 每步需要的工具
        3. 步骤间的依赖关系

        输出JSON格式。
        """
        plan_json = self.planner.generate(planning_prompt)
        return self._parse_plan(plan_json)

    def execute_plan(self, plan: ExecutionPlan) -> str:
        """第二阶段:逐步执行"""
        while plan.current_step < len(plan.steps):
            step = plan.steps[plan.current_step]

            # 检查依赖是否满足
            if not self._dependencies_met(plan, step):
                step.status = StepStatus.SKIPPED
                plan.current_step += 1
                continue

            # 执行当前步骤
            step.status = StepStatus.RUNNING
            try:
                result = self._execute_step(step)
                step.result = result
                step.status = StepStatus.COMPLETED
            except Exception as e:
                step.error = str(e)
                step.status = StepStatus.FAILED

                # 触发重规划
                if plan.revision_count < plan.max_revisions:
                    plan = self._revise_plan(plan, step)
                    plan.revision_count += 1
                else:
                    raise RuntimeError(f"计划修正次数超限: {e}")

            plan.current_step += 1

        return self._synthesize_results(plan)

    def _revise_plan(self, plan: ExecutionPlan, failed_step: PlanStep) -> ExecutionPlan:
        """第三阶段:修正计划"""
        revision_prompt = f"""
        原始目标: {plan.goal}
        执行进度: {self._format_progress(plan)}
        失败步骤: {failed_step.description}
        错误信息: {failed_step.error}

        请修正剩余计划。
        """
        revised_plan = self.planner.generate(revision_prompt)
        return self._merge_plans(plan, revised_plan)

Reflection/Self-Critique模式

1. 核心思想

Reflection模式的核心是让Agent审查自己的输出,发现问题并迭代改进。这不同于ReAct的"向外行动",而是"向内审视"。

┌──────────┐    生成    ┌──────────┐    审查     ┌──────────┐
│  Task     │──────────→│ Draft    │──────────→ │ Critique │
│  (任务)   │           │ (初稿)   │            │ (批评)   │
└──────────┘           └──────────┘            └─────┬────┘
                            ▲                        │
                            │     修改建议            │
                            └────────────────────────┘
                            (循环直到质量满足要求)

2. Reflexion框架(2023-2025)

Reflexion由Shinn等人提出,将Reflection形式化为三个组件:

Actor:     负责执行任务,生成输出
Evaluator: 评估输出质量(可以是LLM、代码测试、人类)
Reflector: 分析失败原因,生成改进建议(存入memory)

工作流程:
  Round 1: Actor生成代码 → Evaluator运行测试(3/10通过) → Reflector分析失败原因
  Round 2: Actor读取Reflector建议 + 修改代码 → Evaluator(7/10通过) → 继续
  Round 3: Actor再次修改 → Evaluator(10/10通过) → 完成

3. Constitutional AI作为Reflection的制度化

Anthropic的Constitutional AI (CAI)可以看作Reflection模式的制度化实现:

普通Reflection:
  Agent: 我生成了一个回答
  Agent(反思): 这个回答有什么问题吗?→ 发现了偏见 → 修正

Constitutional AI:
  Agent: 我生成了一个回答
  Constitution(原则集):
    - 这个回答是否有害?
    - 是否尊重用户隐私?
    - 是否准确?
    - 是否有偏见?
  Agent: 根据这些原则审查 → 发现违反原则3 → 修正

CAI的价值在于:把"自我反思"从随意的自由评估,变成了基于明确原则的系统性审查

4. Claude的Extended Thinking作为Reflection

2025年Claude推出的extended thinking功能,本质上是在生成最终回答前的"内部Reflection":

用户: "帮我设计一个DeFi借贷协议的清算机制"

[Extended Thinking — 用户可见的推理过程]
让我思考清算机制的关键要素...

首先考虑触发条件:
- 健康因子(Health Factor)低于1.0时触发
- 但是否需要缓冲区?如果HF=0.99就触发,可能导致不必要的清算
- Aave用1.0作为阈值,Compound用collateral factor
- 我认为应该建议使用连续阈值而非单一阈值

等等,我还需要考虑清算人的激励...
- 清算奖励太高 → 鼓励恶意清算
- 清算奖励太低 → 没人愿意执行清算
- Aave的做法是5%清算奖励,这个比例是否合理?

还有一个关键问题:在市场剧烈波动时...
- 预言机价格可能延迟
- Gas费可能飙升
- 需要考虑级联清算(Cascade Liquidation)风险

让我重新组织一下思路...

[最终输出]
基于以上分析,我建议以下清算机制设计...

Extended Thinking的特点

  • 推理过程对用户透明(可选)
  • 消耗额外的thinking tokens,但不计入output token计费(Anthropic 2025年的定价策略)
  • 允许模型在输出前"改变主意"
  • 本质上是一种内置的Reflection loop

5. 实现Reflection的模式

class ReflectionAgent:
    def __init__(self, generator_model, critic_model, max_iterations=3):
        self.generator = generator_model
        self.critic = critic_model
        self.max_iterations = max_iterations

    def run(self, task: str, quality_criteria: list[str]) -> str:
        draft = self.generate_initial(task)

        for i in range(self.max_iterations):
            # 批评阶段
            critique = self.critique(draft, task, quality_criteria)

            # 检查是否满足所有标准
            if critique["passed"]:
                return draft

            # 改进阶段
            draft = self.improve(draft, critique["feedback"], task)

        return draft  # 达到最大迭代次数,返回最后版本

    def generate_initial(self, task: str) -> str:
        return self.generator.generate(f"请完成以下任务:\n{task}")

    def critique(self, draft: str, task: str, criteria: list[str]) -> dict:
        criteria_text = "\n".join(f"- {c}" for c in criteria)
        prompt = f"""
        任务: {task}
        当前输出:
        {draft}

        请根据以下标准评估输出质量:
        {criteria_text}

        对每个标准给出通过/不通过的判断和具体反馈。
        如果所有标准都通过,设passed=true。
        """
        return self.critic.generate_json(prompt)

    def improve(self, draft: str, feedback: str, task: str) -> str:
        prompt = f"""
        原始任务: {task}
        当前版本: {draft}
        改进建议: {feedback}

        请根据建议改进输出,保持已经良好的部分不变。
        """
        return self.generator.generate(prompt)

6. Reflection的成本与收益

成本分析(假设用Claude Sonnet):
  初次生成:  ~1,000 output tokens
  每轮Critique: ~500 output tokens
  每轮改进:  ~1,000 output tokens

  1次Reflection: 1,000 + 500 + 1,000 = 2,500 tokens
  3次Reflection: 1,000 + 3 × (500 + 1,000) = 5,500 tokens

收益:
  - 代码正确率提升 20-40%(Reflexion论文数据)
  - 复杂文档的完整性提升显著
  - 减少人工review的时间

ROI计算:
  如果Reflection能减少1次人工干预(假设30分钟,工程师$100/h)
  Token成本: ~$0.02
  节省: $50
  ROI: 2500x

Tool Use模式与API设计

1. Claude的Tool Use API设计哲学

Claude的tool_use API在2025-2026年经过多次迭代,形成了成熟的设计:

# 核心概念:tools数组定义可用工具
tools = [
    {
        "name": "get_defi_tvl",
        "description": "获取DeFi协议的TVL数据。支持按链和时间范围查询。",
        "input_schema": {
            "type": "object",
            "properties": {
                "protocol": {
                    "type": "string",
                    "description": "协议名称,如 'aave', 'uniswap', 'compound'"
                },
                "chain": {
                    "type": "string",
                    "enum": ["ethereum", "arbitrum", "optimism", "base"],
                    "description": "区块链网络"
                },
                "timeframe": {
                    "type": "string",
                    "enum": ["24h", "7d", "30d", "90d"],
                    "description": "时间范围"
                }
            },
            "required": ["protocol"]
        }
    }
]

# tool_choice控制模型行为
response = client.messages.create(
    model="claude-sonnet-4-20250514",
    tools=tools,
    tool_choice={"type": "auto"},      # auto: 模型自己决定是否调用工具
    # tool_choice={"type": "any"},      # any: 必须调用至少一个工具
    # tool_choice={"type": "tool", "name": "get_defi_tvl"},  # 强制调用特定工具
    messages=[...]
)

2. 并行工具调用(Parallel Tool Calls)

2025年的Claude支持在单次响应中返回多个工具调用请求,大幅减少交互轮次:

# 用户: "帮我对比Aave和Compound在Ethereum和Arbitrum上的TVL"

# Claude的响应可能包含4个并行的tool_use blocks:
response.content = [
    {
        "type": "text",
        "text": "我来同时查询这些数据。"
    },
    {
        "type": "tool_use",
        "id": "toolu_1",
        "name": "get_defi_tvl",
        "input": {"protocol": "aave", "chain": "ethereum"}
    },
    {
        "type": "tool_use",
        "id": "toolu_2",
        "name": "get_defi_tvl",
        "input": {"protocol": "aave", "chain": "arbitrum"}
    },
    {
        "type": "tool_use",
        "id": "toolu_3",
        "name": "get_defi_tvl",
        "input": {"protocol": "compound", "chain": "ethereum"}
    },
    {
        "type": "tool_use",
        "id": "toolu_4",
        "name": "get_defi_tvl",
        "input": {"protocol": "compound", "chain": "arbitrum"}
    }
]

# 开发者并行执行所有工具调用,然后一次性返回所有结果
tool_results = [
    {"type": "tool_result", "tool_use_id": "toolu_1", "content": '{"tvl": 12500000000}'},
    {"type": "tool_result", "tool_use_id": "toolu_2", "content": '{"tvl": 3200000000}'},
    {"type": "tool_result", "tool_use_id": "toolu_3", "content": '{"tvl": 2800000000}'},
    {"type": "tool_result", "tool_use_id": "toolu_4", "content": '{"tvl": 450000000}'},
]

并行调用的架构意义

  • 减少LLM调用次数 → 降低延迟
  • 独立的数据获取可以并行 → 提高效率
  • 需要开发者端支持并发执行 → 架构需要异步处理能力

3. 顺序依赖处理

当工具调用之间存在依赖关系时,模型需要分多轮处理:

Round 1:
  [tool_use] get_wallet_balance(address="0xabc")
  → 返回: {"eth": 5.2, "usdc": 10000}

Round 2: (依赖Round 1的结果)
  [tool_use] calculate_optimal_swap(
    from_token="eth",
    from_amount=2.0,        ← 基于余额决定交换数量
    to_token="usdc"
  )
  → 返回: {"route": "ETH→WETH→USDC", "expected_output": 7095.40}

Round 3: (依赖Round 2的结果)
  [tool_use] execute_swap(
    route="ETH→WETH→USDC",
    amount=2.0,
    min_output=6956.49      ← 7095.40 * 0.98 (2% slippage)
  )

4. 错误处理与重试模式

class ToolExecutor:
    """健壮的工具执行器,包含错误处理和重试逻辑"""

    def __init__(self, max_retries=3, retry_delay=1.0):
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def execute_tool(self, tool_name: str, tool_input: dict) -> dict:
        for attempt in range(self.max_retries):
            try:
                result = self._call_tool(tool_name, tool_input)
                return {"status": "success", "data": result}
            except RateLimitError:
                # 速率限制 → 等待后重试
                time.sleep(self.retry_delay * (2 ** attempt))  # 指数退避
                continue
            except ToolNotFoundError as e:
                # 工具不存在 → 直接返回错误,不重试
                return {"status": "error", "error": f"Tool not found: {e}", "retryable": False}
            except TimeoutError:
                # 超时 → 重试
                continue
            except ValidationError as e:
                # 输入验证失败 → 返回错误让Agent修正输入
                return {
                    "status": "error",
                    "error": f"Invalid input: {e}",
                    "retryable": False,
                    "suggestion": "Please check the input parameters"
                }

        return {"status": "error", "error": "Max retries exceeded", "retryable": False}

    def format_error_for_agent(self, error_result: dict) -> str:
        """将错误格式化为Agent可理解的消息"""
        if error_result["status"] == "error":
            msg = f"工具调用失败: {error_result['error']}"
            if error_result.get("suggestion"):
                msg += f"\n建议: {error_result['suggestion']}"
            return msg
        return ""

5. Tool Description的工程实践

工具描述(Tool Description)的质量直接影响Agent选择和使用工具的准确性:

# 差的工具描述
bad_tool = {
    "name": "swap",
    "description": "做交换",       # 太模糊
    "input_schema": {
        "type": "object",
        "properties": {
            "a": {"type": "string"},  # 参数名无意义
            "b": {"type": "string"},
            "c": {"type": "number"}
        }
    }
}

# 好的工具描述
good_tool = {
    "name": "execute_dex_swap",
    "description": """在去中心化交易所执行代币交换。

    注意事项:
    - 交换前会自动检查余额是否充足
    - 滑点保护默认为1%,可自定义
    - 支持Uniswap V3, Curve, Balancer路由
    - 交换金额必须大于$10(防止gas费超过交易价值)

    常见错误:
    - INSUFFICIENT_BALANCE: 余额不足
    - SLIPPAGE_EXCEEDED: 实际滑点超过设定值
    - POOL_NOT_FOUND: 交易对不存在""",
    "input_schema": {
        "type": "object",
        "properties": {
            "from_token": {
                "type": "string",
                "description": "源代币地址或符号(如 'ETH', 'USDC', '0xa0b8...')"
            },
            "to_token": {
                "type": "string",
                "description": "目标代币地址或符号"
            },
            "amount": {
                "type": "number",
                "description": "交换数量(以源代币为单位,如交换2.5 ETH则填2.5)"
            },
            "max_slippage_percent": {
                "type": "number",
                "description": "最大可接受滑点百分比,默认1.0",
                "default": 1.0
            }
        },
        "required": ["from_token", "to_token", "amount"]
    }
}

State Management架构

1. Agent状态的三层模型

Agent需要管理三个层次的状态:

┌─────────────────────────────────────────────┐
│  Layer 3: Long-term Memory (持久记忆)        │
│  ├── 用户偏好和历史                           │
│  ├── 学到的知识                               │
│  └── 存储: Vector DB / 文件系统               │
│  保留时间: 跨会话持久                          │
├─────────────────────────────────────────────┤
│  Layer 2: Working Memory (工作记忆)           │
│  ├── 当前任务的中间结果                        │
│  ├── 已收集的数据和事实                        │
│  └── 存储: 结构化变量 / scratchpad             │
│  保留时间: 当前任务                            │
├─────────────────────────────────────────────┤
│  Layer 1: Conversation History (对话历史)     │
│  ├── 用户消息和Agent回复                       │
│  ├── 工具调用和结果                            │
│  └── 存储: messages数组                       │
│  保留时间: 当前会话                            │
└─────────────────────────────────────────────┘

2. Context Window限制的挑战

2025-2026年主流模型的context window:

模型Context Window有效利用范围
Claude Opus 4200K tokens (标准), 1M (beta)100K-500K可靠处理
Claude Sonnet 4200K tokens80K-150K最佳性能
Claude Haiku 3.5200K tokens全范围高效
GPT-4o128K tokens80K以内最佳
Gemini 2.5 Pro1M tokens500K以内可靠

即使有1M context,也不能无限塞入

  • Token成本与context length线性增长
  • 注意力在超长上下文中会"稀释"(lost in the middle问题在2025年有改善但未完全解决)
  • 延迟随context增长(首token延迟增加)

3. 应对Context限制的策略

策略一:Summarization(摘要压缩)

class ConversationSummarizer:
    """当对话历史过长时,压缩早期对话为摘要"""

    def __init__(self, max_tokens=100000, summary_threshold=80000):
        self.max_tokens = max_tokens
        self.summary_threshold = summary_threshold

    def manage_context(self, messages: list) -> list:
        current_tokens = self.count_tokens(messages)

        if current_tokens < self.summary_threshold:
            return messages  # 未超限,不处理

        # 找到需要压缩的部分(保留最近的消息)
        keep_recent = self._find_cutoff(messages, target_tokens=50000)
        old_messages = messages[:keep_recent]
        recent_messages = messages[keep_recent:]

        # 将旧消息压缩为摘要
        summary = self._summarize(old_messages)

        # 构建新的消息列表
        return [
            {"role": "user", "content": f"[对话历史摘要]\n{summary}"},
            {"role": "assistant", "content": "我已了解之前的对话内容,请继续。"},
            *recent_messages
        ]

    def _summarize(self, messages: list) -> str:
        """用小模型快速摘要"""
        conversation_text = self._format_messages(messages)
        summary = haiku_client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=2000,
            messages=[{
                "role": "user",
                "content": f"请将以下对话摘要为关键信息和结论,保留所有重要数据和决策:\n{conversation_text}"
            }]
        )
        return summary.content[0].text

策略二:Sliding Window(滑动窗口)

class SlidingWindowManager:
    """只保留最近N轮对话"""

    def __init__(self, window_size=10):
        self.window_size = window_size
        self.full_history = []  # 完整历史存储(用于RAG检索)

    def add_message(self, message: dict):
        self.full_history.append(message)

    def get_context(self) -> list:
        """返回滑动窗口内的消息"""
        # 始终保留system message
        system_msgs = [m for m in self.full_history if m.get("role") == "system"]

        # 最近N轮(每轮=1个user + 1个assistant)
        recent = self.full_history[-self.window_size * 2:]

        return system_msgs + recent

策略三:RAG over Conversation(对话检索)

class ConversationRAG:
    """将对话历史索引化,按需检索相关片段"""

    def __init__(self, vector_db):
        self.vector_db = vector_db
        self.conversation_id = str(uuid4())

    def store_exchange(self, user_msg: str, assistant_msg: str,
                       tool_calls: list = None):
        """将每轮对话存入向量数据库"""
        content = f"User: {user_msg}\nAssistant: {assistant_msg}"
        if tool_calls:
            content += f"\nTools used: {json.dumps(tool_calls)}"

        self.vector_db.upsert(
            id=f"{self.conversation_id}_{len(self.vector_db)}",
            text=content,
            metadata={
                "conversation_id": self.conversation_id,
                "timestamp": datetime.now().isoformat()
            }
        )

    def retrieve_relevant(self, current_query: str, top_k: int = 5) -> str:
        """检索与当前问题相关的历史对话"""
        results = self.vector_db.query(
            text=current_query,
            filter={"conversation_id": self.conversation_id},
            top_k=top_k
        )

        context = "相关历史对话:\n"
        for r in results:
            context += f"---\n{r.text}\n"
        return context

4. Claude Code的状态管理实践

Claude Code(2025-2026年版本)的状态管理非常值得学习:

Claude Code的状态管理策略:

1. 对话历史: 完整保留在messages数组中
   - 包括所有文件读取结果、命令输出
   - 当接近context limit时,会提示用户开启新会话

2. 工作记忆: 通过CLAUDE.md文件实现
   - 项目级别的上下文(技术栈、编码规范等)
   - 用户级别的偏好(在~/.claude/目录下)
   - 每次会话自动加载

3. 长期记忆: 通过memory文件系统
   - 用户可以要求Claude Code"记住"某些信息
   - 存储在项目的.claude/memory/目录
   - 跨会话持久化

4. 文件系统作为外部状态:
   - 代码文件本身就是持久状态
   - 每次修改都有git作为版本控制
   - 可以随时读取文件恢复上下文

Routing Patterns

1. Model Routing(模型路由)

根据任务复杂度选择不同的模型,是2025-2026年Agent架构的重要优化手段:

class ModelRouter:
    """根据任务复杂度路由到不同模型"""

    # 2025-2026年Anthropic模型矩阵
    MODELS = {
        "fast": "claude-3-5-haiku-20241022",    # 快速、便宜
        "balanced": "claude-sonnet-4-20250514",  # 平衡
        "powerful": "claude-opus-4-20250514",    # 最强推理
    }

    # 成本参考 (per 1M tokens, 2025年价格)
    COSTS = {
        "fast":      {"input": 0.25, "output": 1.25},
        "balanced":  {"input": 3.0,  "output": 15.0},
        "powerful":  {"input": 15.0, "output": 75.0},
    }

    def route(self, task: str, context: dict) -> str:
        complexity = self.assess_complexity(task, context)

        if complexity == "simple":
            return self.MODELS["fast"]
        elif complexity == "moderate":
            return self.MODELS["balanced"]
        else:
            return self.MODELS["powerful"]

    def assess_complexity(self, task: str, context: dict) -> str:
        """评估任务复杂度"""
        # 简单任务的信号
        simple_signals = [
            len(task) < 100,                          # 短指令
            "查询" in task or "获取" in task,           # 简单检索
            context.get("tool_count", 0) <= 1,         # 只需一个工具
        ]

        # 复杂任务的信号
        complex_signals = [
            "设计" in task or "分析" in task,           # 创造性/分析性
            "对比" in task and "方案" in task,           # 多维度比较
            context.get("requires_reasoning", False),   # 需要推理
            len(task) > 500,                           # 长指令
        ]

        simple_score = sum(simple_signals)
        complex_score = sum(complex_signals)

        if complex_score >= 2:
            return "complex"
        elif simple_score >= 2:
            return "simple"
        else:
            return "moderate"

2. Cascade Pattern(级联模式)

先用便宜的模型尝试,不确定时升级到更强的模型:

class CascadeRouter:
    """级联模式:先尝试小模型,失败时升级"""

    async def process(self, task: str) -> str:
        # Level 1: 尝试Haiku
        haiku_result = await self.try_with_model(
            "claude-3-5-haiku-20241022", task
        )

        if haiku_result.confidence > 0.9:
            return haiku_result.text

        # Level 2: 升级到Sonnet
        sonnet_result = await self.try_with_model(
            "claude-sonnet-4-20250514", task,
            context=f"之前的尝试(置信度不足): {haiku_result.text}"
        )

        if sonnet_result.confidence > 0.8:
            return sonnet_result.text

        # Level 3: 使用Opus
        opus_result = await self.try_with_model(
            "claude-opus-4-20250514", task,
            context=f"需要深度推理。之前的尝试: {sonnet_result.text}",
            use_extended_thinking=True
        )

        return opus_result.text

    async def try_with_model(self, model: str, task: str,
                              context: str = None,
                              use_extended_thinking: bool = False) -> Result:
        messages = [{"role": "user", "content": task}]
        if context:
            messages[0]["content"] = f"{context}\n\n{task}"

        kwargs = {"model": model, "messages": messages, "max_tokens": 4096}
        if use_extended_thinking:
            kwargs["thinking"] = {"type": "enabled", "budget_tokens": 10000}

        response = await client.messages.create(**kwargs)

        # 评估置信度(可以通过多种方式)
        confidence = self.evaluate_confidence(response)

        return Result(text=response.content[0].text, confidence=confidence)

3. Prompt Routing(提示路由)

根据任务类型选择不同的system prompt和工具集:

class PromptRouter:
    """根据任务类型路由到专门的prompt和工具集"""

    ROUTES = {
        "defi_analysis": {
            "system_prompt": """你是一个DeFi协议分析专家。
            你擅长分析TVL、交易量、用户行为、代币经济学。
            你总是用数据支撑观点。""",
            "tools": ["get_defi_tvl", "get_token_price", "get_protocol_stats"],
            "model": "claude-sonnet-4-20250514"
        },
        "security_audit": {
            "system_prompt": """你是一个智能合约安全审计专家。
            你擅长识别漏洞、分析攻击向量、提出修复建议。
            你遵循OWASP Smart Contract Top 10。""",
            "tools": ["read_contract", "check_vulnerability", "get_audit_reports"],
            "model": "claude-opus-4-20250514"  # 安全需要最强推理
        },
        "trading_bot": {
            "system_prompt": """你是一个交易执行Agent。
            你负责获取价格、计算最优路由、执行交易。
            你必须严格遵守风控规则。""",
            "tools": ["get_price", "calculate_route", "execute_swap", "check_balance"],
            "model": "claude-3-5-haiku-20241022"  # 交易需要速度
        }
    }

    def route(self, user_message: str) -> dict:
        """分类用户意图并路由"""
        # 用小模型快速分类
        classification = haiku_classify(user_message, list(self.ROUTES.keys()))
        return self.ROUTES[classification]

4. 成本优化的路由实践

实际案例:DeFi监控Agent(日均处理10,000个请求)

优化前(全部使用Sonnet):
  10,000 × 平均 2,000 input tokens × $3/M = $60/天
  10,000 × 平均 500 output tokens × $15/M = $75/天
  总计: $135/天 = $4,050/月

优化后(Model Routing):
  7,000 简单查询用Haiku:
    7,000 × 2,000 × $0.25/M + 7,000 × 500 × $1.25/M = $7.88/天
  2,500 中等任务用Sonnet:
    2,500 × 3,000 × $3/M + 2,500 × 800 × $15/M = $52.50/天
  500 复杂分析用Opus:
    500 × 5,000 × $15/M + 500 × 2,000 × $75/M = $112.50/天
  总计: $172.88/天 ≈ $5,187/月

等等,Opus把成本拉高了?需要再优化:
  - 减少Opus使用比例到2%(200个)
  - 对Opus的输入做压缩

再优化后:
  Haiku: $7.88 + Sonnet: $57.00 + Opus(200): $45.00 = $109.88/天 = $3,296/月

节省: ($4,050 - $3,296) / $4,050 = 18.6%
且质量更高(复杂任务用了更强的模型)

对比分析总表

编排模式对比

维度ReActPlan-and-ExecuteReflectionRouting
核心思想推理-行动循环先规划后执行自我批评迭代任务分发
适用复杂度低-中中-高中-高全范围
延迟低(单步快)中(规划有开销)高(多轮迭代)取决于路由目标
Token成本中(历史累积)高(规划+执行)高(多轮生成)低(优化后)
可预测性低(涌现行为)高(计划透明)
容错能力低(循环陷阱)高(重规划)中(迭代修正)高(级联兜底)
实现复杂度
调试难度低(步骤清晰)高(迭代过程复杂)
代表实现LangChain ReActClaude CodeReflexion/CAIAnthropic Router

选择决策树

你的任务是什么类型?
│
├─ 简单工具调用(1-3步)
│  └─ → ReAct
│
├─ 复杂多步任务(4+步)
│  ├─ 步骤间有明确依赖
│  │  └─ → Plan-and-Execute
│  └─ 步骤间独立
│     └─ → ReAct + 并行工具调用
│
├─ 需要高质量输出(写作/设计/审计)
│  └─ → Reflection
│
├─ 需要处理大量异构请求
│  └─ → Routing (+ 内部用ReAct/Plan-Execute)
│
└─ 生产级Agent系统
   └─ → Routing + Plan-and-Execute + Reflection (组合使用)

组合使用的最佳实践

在生产环境中,这些模式通常组合使用而非单独使用:

生产级DeFi Agent架构:

用户请求 → [Router]
              │
              ├─ 简单查询 → [Haiku + ReAct]
              │               └─ 查价格/余额/Gas
              │
              ├─ 交易执行 → [Sonnet + Plan-and-Execute]
              │               ├─ Step 1: 检查余额
              │               ├─ Step 2: 获取最优路由
              │               ├─ Step 3: 模拟交易
              │               ├─ Step 4: 执行交易
              │               └─ Step 5: 确认结果
              │
              └─ 策略设计 → [Opus + Plan-and-Execute + Reflection]
                              ├─ Plan: 分析市场 → 设计策略 → 回测
                              ├─ Execute: 逐步执行
                              └─ Reflect: 审查策略是否合理

架构设计实操

实操一:设计一个DeFi交易Agent的编排架构

需求:用户可以用自然语言下达交易指令,Agent负责解析、验证、执行。

架构设计:

┌─────────────────────────────────────────────────────────┐
│                    DeFi Trading Agent                    │
├─────────────────────────────────────────────────────────┤
│                                                         │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │ Intent   │───→│ Risk         │───→│ Execution    │  │
│  │ Parser   │    │ Validator    │    │ Engine       │  │
│  │ (Sonnet) │    │ (Rule-based) │    │ (Haiku+Tool) │  │
│  └──────────┘    └──────────────┘    └──────────────┘  │
│       │                │                    │           │
│       ▼                ▼                    ▼           │
│  ┌──────────┐    ┌──────────────┐    ┌──────────────┐  │
│  │ Context  │    │ Risk Rules   │    │ DEX Router   │  │
│  │ Manager  │    │ Engine       │    │ (1inch/CoW)  │  │
│  └──────────┘    └──────────────┘    └──────────────┘  │
│                                                         │
│  ┌─────────────────────────────────────────────────┐   │
│  │              State Manager                       │   │
│  │  ├── Conversation History                        │   │
│  │  ├── Portfolio State                             │   │
│  │  ├── Pending Transactions                        │   │
│  │  └── Risk Limits                                 │   │
│  └─────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────┘

编排模式选择

class DeFiTradingAgent:
    """DeFi交易Agent — Plan-and-Execute + Risk Gate"""

    def __init__(self):
        self.intent_parser = IntentParser(model="claude-sonnet-4-20250514")
        self.risk_validator = RiskValidator()
        self.executor = TradeExecutor(model="claude-3-5-haiku-20241022")
        self.state = StateManager()

    async def process_command(self, user_input: str) -> str:
        # Phase 1: 解析意图 (Sonnet做复杂理解)
        intent = await self.intent_parser.parse(user_input)
        # intent = {"action": "swap", "from": "ETH", "to": "USDC",
        #           "amount": 2.0, "urgency": "normal"}

        # Phase 2: 生成执行计划
        plan = self.create_plan(intent)

        # Phase 3: 风险验证 (规则引擎,不用LLM)
        risk_check = self.risk_validator.validate(plan, self.state)
        if not risk_check.passed:
            return f"交易被风控拦截: {risk_check.reason}"

        # Phase 4: 逐步执行 (Haiku做简单工具调用)
        result = await self.executor.execute_plan(plan)

        # Phase 5: 更新状态
        self.state.update(result)

        return self.format_result(result)

    def create_plan(self, intent: dict) -> ExecutionPlan:
        """根据意图生成执行计划"""
        steps = []

        if intent["action"] == "swap":
            steps = [
                PlanStep("check_balance", {"token": intent["from"]}),
                PlanStep("get_price", {"token": intent["from"]}),
                PlanStep("find_best_route", {
                    "from": intent["from"],
                    "to": intent["to"],
                    "amount": intent["amount"]
                }),
                PlanStep("simulate_transaction", {}),  # 依赖前一步结果
                PlanStep("request_user_confirmation", {}),
                PlanStep("execute_transaction", {}),
                PlanStep("verify_result", {}),
            ]

        return ExecutionPlan(steps=steps)

实操二:设计Agent的记忆系统

class AgentMemorySystem:
    """
    三层记忆系统设计

    Layer 1: 即时记忆(对话历史)— 当前会话
    Layer 2: 工作记忆(任务状态)— 当前任务
    Layer 3: 长期记忆(知识库)  — 跨会话
    """

    def __init__(self, vector_db, max_context_tokens=150000):
        self.conversation_history = []          # Layer 1
        self.working_memory = {}                # Layer 2
        self.long_term_db = vector_db           # Layer 3
        self.max_context_tokens = max_context_tokens

    def build_context(self, current_query: str) -> list:
        """构建发送给LLM的完整上下文"""
        context_parts = []

        # 1. System prompt (固定)
        context_parts.append({
            "role": "system",
            "content": self._build_system_prompt()
        })

        # 2. 从长期记忆检索相关信息
        relevant_memories = self.long_term_db.query(
            current_query, top_k=5
        )
        if relevant_memories:
            context_parts.append({
                "role": "user",
                "content": f"[相关历史记忆]\n{self._format_memories(relevant_memories)}"
            })
            context_parts.append({
                "role": "assistant",
                "content": "我已了解相关的历史背景。"
            })

        # 3. 工作记忆摘要
        if self.working_memory:
            context_parts.append({
                "role": "user",
                "content": f"[当前任务状态]\n{json.dumps(self.working_memory, indent=2)}"
            })
            context_parts.append({
                "role": "assistant",
                "content": "我已了解当前任务进度。"
            })

        # 4. 对话历史(可能需要压缩)
        history = self._maybe_compress_history()
        context_parts.extend(history)

        return context_parts

    def _maybe_compress_history(self) -> list:
        """如果对话历史太长,进行压缩"""
        total_tokens = sum(
            self._estimate_tokens(m["content"])
            for m in self.conversation_history
        )

        if total_tokens > self.max_context_tokens * 0.6:
            # 压缩前半部分
            midpoint = len(self.conversation_history) // 2
            old_part = self.conversation_history[:midpoint]
            recent_part = self.conversation_history[midpoint:]

            summary = self._summarize_messages(old_part)

            return [
                {"role": "user", "content": f"[早期对话摘要]\n{summary}"},
                {"role": "assistant", "content": "我已了解早期对话的要点。"},
                *recent_part
            ]

        return self.conversation_history

    def save_to_long_term(self, key: str, content: str, metadata: dict = None):
        """显式存储到长期记忆"""
        self.long_term_db.upsert(
            id=key,
            text=content,
            metadata=metadata or {}
        )

    def update_working_memory(self, key: str, value):
        """更新工作记忆"""
        self.working_memory[key] = {
            "value": value,
            "updated_at": datetime.now().isoformat()
        }

与Web3/DeFi的关联

1. DeFi Agent的编排挑战

DeFi Agent与普通Agent相比,面临独特的编排挑战:

挑战原因编排解决方案
交易不可逆链上交易确认后无法撤回Plan-and-Execute + 确认环节
价格波动计划执行期间价格可能变化ReAct(快速响应)+ 滑点保护
Gas费不确定Gas价格随网络拥堵变化动态Gas估算 + 重试机制
MEV风险三明治攻击、抢跑私有交易池 + 交易模拟
多链操作跨链交易的复杂性Plan-and-Execute + 状态跟踪

2. 实际场景:DeFi价格监控+自动交易Agent

class DeFiMonitorAgent:
    """
    监控价格 → 发现机会 → 评估风险 → 执行交易 → 报告结果

    编排模式:事件驱动 + Plan-and-Execute + Reflection
    """

    def __init__(self):
        self.price_monitor = PriceMonitor()     # 事件源
        self.analyzer = MarketAnalyzer()         # Sonnet分析
        self.executor = TradeExecutor()          # Haiku执行
        self.risk_engine = RiskEngine()          # 规则引擎
        self.reflector = StrategyReflector()     # Opus反思

    async def monitor_loop(self):
        """主监控循环 — 事件驱动"""
        async for price_event in self.price_monitor.stream():
            # 快速过滤(规则引擎,不用LLM)
            if not self.risk_engine.should_analyze(price_event):
                continue

            # Phase 1: 分析机会 (Sonnet)
            opportunity = await self.analyzer.analyze(price_event)
            if not opportunity.is_profitable:
                continue

            # Phase 2: 生成交易计划 (Plan-and-Execute)
            plan = await self.create_trade_plan(opportunity)

            # Phase 3: 风险检查 (规则引擎 + LLM双重检查)
            risk_ok = self.risk_engine.validate(plan)
            if not risk_ok:
                await self.log("风险检查未通过", plan)
                continue

            # Phase 4: 执行 (Haiku + 工具调用)
            result = await self.executor.execute(plan)

            # Phase 5: 反思 (Opus,低频执行)
            if self.should_reflect():
                reflection = await self.reflector.review_recent_trades()
                await self.apply_reflection(reflection)

    async def create_trade_plan(self, opportunity) -> TradePlan:
        """Plan-and-Execute: 生成交易计划"""
        return TradePlan(
            steps=[
                # Step 1: 确认当前价格(可能已经变化)
                TradeStep("verify_price", {
                    "token": opportunity.token,
                    "expected_price": opportunity.price,
                    "max_deviation": 0.5  # 允许0.5%偏差
                }),
                # Step 2: 检查余额
                TradeStep("check_balance", {
                    "token": opportunity.base_token,
                    "min_required": opportunity.trade_amount
                }),
                # Step 3: 模拟交易
                TradeStep("simulate_swap", {
                    "from": opportunity.base_token,
                    "to": opportunity.token,
                    "amount": opportunity.trade_amount,
                }),
                # Step 4: 执行(带MEV保护)
                TradeStep("execute_swap", {
                    "use_private_pool": True,  # Flashbots保护
                    "max_slippage": 1.0,
                    "deadline_seconds": 120,
                }),
                # Step 5: 验证结果
                TradeStep("verify_execution", {}),
            ],
            risk_limits={
                "max_trade_value_usd": 10000,
                "max_gas_usd": 50,
                "min_profit_after_gas_usd": 20,
            }
        )

    def should_reflect(self) -> bool:
        """控制反思频率(避免过度消耗Opus token)"""
        # 每100笔交易或每天反思一次
        return (self.trade_count % 100 == 0 or
                self.hours_since_last_reflection() > 24)

3. Web3 Agent的安全编排原则

Web3 Agent安全编排的7条原则:

1. 永远先模拟再执行
   ├── 使用Tenderly/Alchemy Simulate API
   └── 模拟失败 → 绝不执行

2. 金额分级授权
   ├── < $100: 自动执行
   ├── $100 - $10,000: 需要确认
   └── > $10,000: 需要多签

3. 最小权限原则
   ├── 只授权必要的token allowance
   ├── 使用permit2而非无限approve
   └── Session key设置过期时间

4. 失败时保守处理
   ├── 不确定时 → 不执行
   ├── 网络异常时 → 等待而非重试
   └── 价格异常时 → 暂停策略

5. 审计轨迹
   ├── 记录所有决策过程
   ├── 记录所有工具调用和返回
   └── 可追溯每笔交易的决策原因

6. 熔断机制
   ├── 单日亏损超限 → 停止交易
   ├── 连续失败N次 → 暂停并告警
   └── 异常模式检测 → 人工介入

7. 链上验证优先
   ├── 不信任链下数据
   ├── 关键数据从链上直接读取
   └── 交易结果通过区块确认验证

面试题准备

Q1: ReAct vs Plan-and-Execute如何选择?

简短回答 (30秒)

ReAct适合简单、少步骤的工具调用场景,因为它响应快且实现简单。Plan-and-Execute适合复杂、多步骤的任务,因为它有全局规划能力和失败恢复机制。实践中通常组合使用:用Router分流,简单任务走ReAct,复杂任务走Plan-and-Execute。

详细回答 (2分钟)

两种模式的核心区别在于是否有显式的规划阶段

ReAct是"走一步看一步"的贪心策略:

  • 优势:延迟低,实现简单,适合实时交互
  • 劣势:容易陷入循环,缺乏全局视角,Token成本随轮次线性增长
  • 适用场景:信息检索、简单查询、1-3步工具调用

Plan-and-Execute是"先看地图再出发"的全局策略:

  • 优势:步骤透明,进度可追踪,失败可恢复,支持复杂依赖关系
  • 劣势:规划本身有额外的Token和延迟开销,计划可能不准确
  • 适用场景:多步骤分析、代码修改(Claude Code就是这个模式)、需要协调多个资源的任务

在生产环境中,我会用Routing + 组合模式:用一个轻量级的Router对请求分类,简单请求走ReAct(用Haiku),复杂请求走Plan-and-Execute(用Sonnet/Opus)。这样既控制了成本,又保证了质量。

以DeFi Agent为例:查价格用ReAct,执行多步交易策略用Plan-and-Execute。

追问准备

  • Q: Plan-and-Execute中如果计划一开始就是错的怎么办? → A: 设计Re-Planning机制。每步执行后评估结果是否符合预期,偏差超过阈值时触发重规划。设置最大重规划次数防止无限循环。Claude Code的做法是在遇到build错误时自动读取错误信息并修正。

  • Q: 如何评估Agent的编排模式是否合适? → A: 看三个指标:任务完成率(准确性)、平均延迟(速度)、Token成本(效率)。如果完成率低可能需要更强的模式(ReAct→Plan-Execute),如果成本高可能需要Router优化。


Q2: 如何设计Agent的状态管理?

简短回答 (30秒)

Agent状态管理分三层:对话历史(当前会话的messages数组)、工作记忆(当前任务的中间状态,结构化存储)、长期记忆(跨会话的知识,用Vector DB存储)。核心挑战是context window限制,通过摘要压缩、滑动窗口、RAG检索三种策略应对。

详细回答 (2分钟)

状态管理是Agent架构中最容易被低估但最影响实际表现的部分。我把它分为三层:

第一层:对话历史。这是最直接的状态,就是messages数组。挑战在于随着对话进行,历史越来越长。应对策略包括:

  • 滑动窗口:只保留最近N轮
  • 摘要压缩:用小模型(Haiku)将旧对话压缩为要点
  • RAG检索:将历史索引到Vector DB,按需检索

第二层:工作记忆。当前任务的中间结果,比如"已经查询到了ETH价格$3,500"。这层用结构化的key-value存储,不占用太多context。Claude Code的CLAUDE.md机制就是一种工作记忆。

第三层:长期记忆。跨会话持久化的知识,比如用户偏好、历史交易记录。用Vector DB存储,每次新会话时根据当前query检索最相关的信息注入context。

实际设计时要注意几个权衡:

  • 记忆太多 → context膨胀,成本高,注意力稀释
  • 记忆太少 → Agent"忘记"关键信息,重复提问
  • 关键是选择性记忆:只存储和检索真正重要的信息

在Web3场景中,状态管理还有个特殊挑战:链上状态是实时变化的。一个持仓信息在5分钟前可能还是对的,现在可能因为价格波动已经不准确了。所以需要给记忆加上时间戳和"新鲜度"概念。

追问准备

  • Q: 如何解决"Lost in the Middle"问题? → A: 将关键信息放在context的开头和结尾,避免中间位置。2025年的模型在这方面有很大改善,但仍然建议重要信息不要埋在长context的中间。另外可以用structured format(JSON/XML)帮助模型定位关键信息。

  • Q: Vector DB选型有什么建议? → A: 轻量级场景用Chroma或Qdrant(开源,本地部署),生产环境用Pinecone或Weaviate(托管服务,可靠性高)。2025-2026年的趋势是在PostgreSQL上用pgvector扩展,对已有PostgreSQL基础设施的团队来说是最低成本选择。


Q3: 如何为Web3场景选择合适的Agent编排模式?

简短回答 (30秒)

Web3 Agent的关键约束是交易不可逆和金融安全性要求。所以核心原则是"先模拟再执行",一定要有Plan-and-Execute中的确认环节。价格查询等简单操作用ReAct(快速),交易执行用Plan-and-Execute(安全),策略优化用Reflection(质量)。

详细回答 (2分钟)

Web3场景对Agent编排有三个特殊要求:

第一,安全性优先于速度。传统Web2 Agent出错了可以回滚,但链上交易是不可逆的。所以编排中必须有明确的"模拟→确认→执行"流程,这天然适合Plan-and-Execute模式。

第二,实时性要求高。DeFi市场价格秒级变化,套利窗口可能只有几个区块(十几秒)。这意味着读取和分析环节要用ReAct+Haiku(最快),但执行环节不能省略安全检查。

第三,多链多协议的复杂性。一个跨链交易可能涉及:检查源链余额→发起跨链转账→等待确认→在目标链执行swap。这是典型的Plan-and-Execute场景,且需要处理异步等待和超时。

我的推荐架构是分层编排

  • 入口层:Router(分类用户意图)
  • 查询层:ReAct + Haiku(快速工具调用)
  • 执行层:Plan-and-Execute + Sonnet(安全的多步执行)
  • 策略层:Reflection + Opus(定期审查交易策略)
  • 安全层:规则引擎(不用LLM,确定性的风控规则)

追问准备

  • Q: 如果Agent在交易执行到一半时遇到网络问题怎么办? → A: 这是Plan-and-Execute的Re-Planning场景。设计"交易状态机"(pending→simulated→submitted→confirmed/failed),每个状态有对应的恢复策略。submitted但未confirmed时需要监控链上状态,而不是盲目重试(可能导致double spend)。

Q4: 如何控制Agent的Token成本?

简短回答 (30秒)

三管齐下:Model Routing将简单任务路由到便宜模型(Haiku),Observation压缩减少context膨胀,以及合理的缓存策略避免重复查询。实践中可以做到降低40-60%的成本而不牺牲质量。

详细回答 (2分钟)

Token成本优化是Agent从原型到生产的关键环节。主要策略有五个:

  1. Model Routing:70%的请求是简单的,用Haiku(成本是Opus的1/60)处理。通过快速分类器判断复杂度,只在必要时使用大模型。

  2. Observation压缩:工具返回的结果往往很冗长(比如完整的API response),在传回模型前先提取关键信息。可以用正则或小模型做压缩。

  3. 缓存:相同或相似查询的结果缓存起来。价格数据可以缓存30秒,协议元数据可以缓存24小时。用semantic cache(基于embedding相似度)处理措辞不同但含义相同的查询。

  4. 提前终止:如果Agent在3轮后仍未取得进展,果断终止而不是继续消耗token。设置max_iterations和cost_budget两个上限。

  5. Batch Processing:将多个独立请求合并处理,利用并行tool calls减少交互轮次。


知识点总结

关键要点

1. ReAct是最基础的Agent模式,适合简单工具调用
   → Claude的tool_use就是ReAct的工程实现

2. Plan-and-Execute是复杂任务的首选
   → Claude Code是最成功的实现之一
   → Re-Planning能力是核心差异化

3. Reflection提高输出质量但成本高
   → Constitutional AI将反思制度化
   → Extended Thinking是内置的Reflection

4. 生产环境中组合使用 + Routing优化
   → 没有银弹,根据场景选择
   → 成本优化是从原型到生产的关键

5. Web3场景有特殊要求
   → 交易不可逆 → 必须有确认环节
   → 实时性 → 查询快、执行稳
   → 安全性 → 规则引擎 + LLM双重检查

术语表

术语英文定义
编排Orchestration协调Agent的推理、工具调用、状态管理的整体架构
推理-行动循环ReAct LoopThought→Action→Observation的迭代模式
规划-执行Plan-and-Execute先生成计划、再逐步执行、失败时重规划的模式
反思ReflectionAgent审查自身输出并迭代改进
模型路由Model Routing根据任务复杂度选择不同模型
级联Cascade从小模型开始,不够时升级到大模型
滑动窗口Sliding Window只保留最近N轮对话的上下文管理策略
工作记忆Working Memory当前任务的临时状态存储
工具描述Tool Description告诉LLM工具功能和使用方法的结构化说明
扩展思维Extended ThinkingClaude在输出前的显式推理过程

参考资源

论文

  • Yao et al. "ReAct: Synergizing Reasoning and Acting in Language Models" (2022, ICLR 2023)
  • Shinn et al. "Reflexion: Language Agents with Verbal Reinforcement Learning" (2023, NeurIPS 2023)
  • Wang et al. "Plan-and-Solve Prompting" (2023, ACL)
  • Bai et al. "Constitutional AI: Harmlessness from AI Feedback" (2022, Anthropic)

工程实践

2025-2026行业趋势

  • Anthropic "Building effective agents" 博文 (2024.12): 定义了5种Agent设计模式
  • Claude Code成为Agent编排的标杆实现 (2025)
  • Model Context Protocol (MCP) 标准化Agent工具接入 (2024-2025)
  • Multi-Agent框架涌现: CrewAI, AutoGen, LangGraph (2025-2026)

明日预告

Day 225: Multi-Agent系统设计

核心内容

  • Multi-Agent vs Single Agent的trade-off
  • Agent间通信协议设计(直接调用/消息队列/黑板模式)
  • 角色分工模式(Manager-Worker/Peer-to-Peer/Hierarchical)
  • CrewAI/AutoGen/LangGraph框架对比
  • Multi-Agent在Web3中的应用:DAO自动治理Agent集群
  • 面试题:如何设计一个Multi-Agent系统来管理DeFi投资组合?

今日收获: 深入理解了ReAct/Plan-and-Execute/Reflection三大Agent编排模式的原理、实现和适用场景。核心认知是:没有万能的编排模式,生产环境中需要通过Routing组合使用,且Web3场景的不可逆性要求格外重视安全编排。Claude Code作为Plan-and-Execute的标杆实现,值得深入学习其设计哲学。