Expert Day 144

Agentic RAG: Self-Correcting Retrieval with Self-RAG, CRAG, and ReAct


Date: 2026-09-22 | Track: AI Systems Engineering / RAG | Phase: Phase 3 - Advanced RAG Patterns (Day 135-148) | Tags: #AgenticRAG #SelfRAG #CRAG #ReAct #Anthropic


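Today's Goals

| Type | Content |
| --- | --- |
| Study | Core ideas of the Self-RAG paper (reflection tokens, self-evaluation); the CRAG (Corrective RAG) workflow; the closed loop of ReAct + retrieval; agentic loop design |
| Hands-on | Implement agentic_rag.py: (1) retrieve → (2) self-grade context → (3) rewrite the query and retry if not good enough → (4) fall back to web search if still not good enough → (5) evaluate answer faithfulness |
| Deliverables | agentic_rag.py, an agentic vs static RAG comparison, and a decision framework for when agentic is worth it |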

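Preview of the key result: on hard queries (queries that need rewriting), agentic RAG lifts Recall from 0.62 to 0.89. The price is 2-4x latency and 2-3x LLM cost per query. The wrong-answer rate drops from 23% to 8%, though, which makes it worthwhile for high-value queries.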


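1. Core Concepts

1.1 Limitations of Static RAG

[Query] → [Retrieve] → [Generate]
   ↑                       ↑
   never revised           answers even when the context is poor

Problems:

  • When retrieval quality is poor, the LLM is forced to hallucinate
  • No self-correction: a bad query is used as-is
  • No "I don't know" mechanism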

1.2 Self-RAG (Asai et al., 2023)

Self-RAG: Learning to Retrieve, Generate, and Critique through Self-Reflection

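The model emits reflection tokens during generation.

Reflection token types:
  [Retrieve]      ← Is retrieval needed?
  [IsRel]         ← Is the retrieved passage relevant?
  [IsSup]         ← Does the passage support the answer?
  [IsUse]         ← Is the overall answer useful?

Example output:
"Apple's revenue [Retrieve=Yes][passage1][IsRel=Yes][IsSup=Yes]
was $391B in FY2024."

Special training teaches the base model to emit these tokens, so it critiques its own output as it generates.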
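To make the token format concrete, here is a minimal sketch that pulls reflection tokens out of a generated string. It assumes the `[Name=Value]` notation used in the example above, which is a simplification of the vocabulary in the released Self-RAG model; `parse_reflection_tokens` is a hypothetical helper, not part of any Self-RAG code.

```python
import re
from typing import Dict, Tuple

# Matches tokens written as [Name=Value], e.g. [IsRel=Yes]. This mirrors the
# notation used in these notes, not the exact vocabulary of the released model.
TOKEN_RE = re.compile(r"\[(Retrieve|IsRel|IsSup|IsUse)=([^\]]+)\]")


def parse_reflection_tokens(text: str) -> Tuple[Dict[str, str], str]:
    """Return (tokens, clean_text) for a Self-RAG style output string."""
    tokens = {name: value for name, value in TOKEN_RE.findall(text)}
    # Drop every bracketed marker (including passage placeholders) from the text.
    clean = re.sub(r"\[[^\]]*\]", "", text)
    clean = re.sub(r"\s+", " ", clean).strip()
    return tokens, clean


sample = ("Apple's revenue [Retrieve=Yes][passage1][IsRel=Yes][IsSup=Yes]\n"
          "was $391B in FY2024.")
tokens, clean = parse_reflection_tokens(sample)
print(tokens)
print(clean)
```

A downstream component could, for instance, discard any segment whose `IsSup` value is not `Yes`; that policy is an assumption for illustration.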

1.3 CRAG (Corrective RAG)

Corrective Retrieval Augmented Generation

[Query]
   │
   ▼
[Retrieve] → top-K chunks
   │
   ▼
[Retrieval Evaluator]
   │
   ├── Correct (high confidence) ─► [Knowledge Refinement] → [Generate] → Answer
   │
   ├── Ambiguous (medium) ───────► [Knowledge Refinement + Web Search] → [Generate]
   │
   └── Incorrect (low) ──────────► [Discard retrieved docs, Web Search]
                                     → [Generate]

Key point: a lightweight retrieval evaluator scores the retrieved documents and dynamically decides the next step.
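The three-way routing can be sketched as a small pure function. This assumes the evaluator returns one score per document in the range 0 to 1; the two thresholds are hypothetical placeholders, since the cut-offs have to be tuned on your own data.

```python
from typing import List

# Hypothetical thresholds; tune them on a labeled validation set.
UPPER, LOWER = 0.7, 0.3


def crag_action(scores: List[float], upper: float = UPPER,
                lower: float = LOWER) -> str:
    """Map per-document evaluator scores (0..1) to a CRAG-style action."""
    if not scores:
        return "INCORRECT"      # nothing retrieved: go straight to web search
    best = max(scores)
    if best >= upper:
        return "CORRECT"        # at least one document is clearly relevant
    if best < lower:
        return "INCORRECT"      # every document is clearly irrelevant
    return "AMBIGUOUS"          # in between: combine local and web knowledge


print(crag_action([0.9, 0.2]), crag_action([0.5, 0.1]), crag_action([0.1, 0.2]))
```

Keeping the routing separate from the scorer means the evaluator can later be swapped (LLM judge, fine-tuned classifier) without touching the control flow.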

1.4 ReAct + Retrieval

ReAct = Reasoning + Acting. The LLM alternates thoughts and actions over multiple steps:

Question: "Of BlackRock's top holdings, which had >20% YoY growth?"

Step 1:
  Thought: I need BlackRock's top holdings first.
  Action: retrieve("BlackRock top holdings 2024")
  Observation: [chunks about BR holdings]
  
Step 2:
  Thought: Now I need YoY growth data for these holdings.
  Action: retrieve("NVIDIA YoY revenue growth 2024")
  Observation: [chunks about NVDA growth]
  
Step 3:
  Thought: I have enough info.
  Action: answer(...)

The LLM drives every step, including the decision about what to retrieve next.
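The Thought → Action → Observation cycle can be sketched as a loop around a policy function. In this illustration the policy is a scripted stand-in for the LLM and `fake_retrieve` is a stand-in for the vector store; both are hypothetical and exist only so the control flow runs offline.

```python
from typing import Callable, Dict, List, Tuple

Step = Tuple[str, str, str]  # (thought, action name, action argument)


def react_loop(question: str,
               policy: Callable[[str, List[str]], Step],
               retrieve: Callable[[str], str],
               max_steps: int = 5) -> Dict:
    """Run Thought -> Action -> Observation until the policy answers."""
    observations: List[str] = []
    trace: List[Step] = []
    for _ in range(max_steps):
        thought, action, arg = policy(question, observations)
        trace.append((thought, action, arg))
        if action == "answer":
            return {"answer": arg, "trace": trace}
        observations.append(retrieve(arg))    # any other action is a retrieve
    return {"answer": None, "trace": trace}   # step budget exhausted


# Stand-ins for the LLM and the vector store, for illustration only.
def scripted_policy(question: str, observations: List[str]) -> Step:
    if len(observations) == 0:
        return ("Need the holdings first.", "retrieve", "BlackRock top holdings 2024")
    if len(observations) == 1:
        return ("Need growth data.", "retrieve", "NVIDIA YoY revenue growth 2024")
    return ("I have enough info.", "answer", " | ".join(observations))


def fake_retrieve(query: str) -> str:
    return f"chunks for: {query}"


result = react_loop("Which top holdings grew >20% YoY?", scripted_policy, fake_retrieve)
for thought, action, arg in result["trace"]:
    print(f"Thought: {thought}\n  Action: {action}({arg!r})")
```

The `max_steps` cap is what keeps a confused policy from looping forever; when it is hit, the caller gets `None` and can fall back to a static answer.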

1.5 Inspiration from Anthropic's Computer Use Pattern

Claude Sonnet 4.5 supports native tool use and agentic loops, so Claude itself can act as the agent:

tools = [
    {"name": "retrieve_documents", ...},
    {"name": "rewrite_query", ...},
    {"name": "web_search", ...},
    {"name": "grade_relevance", ...},
]

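done = False
while not done:
    # model and max_tokens are required by the Messages API
    response = claude.messages.create(
        model="claude-sonnet-4-5-20250929", max_tokens=1024,
        messages=msgs, tools=tools,
    )
    if response.stop_reason == "tool_use":
        # execute_tool returns a list of tool_result blocks
        tool_results = execute_tool(response)
        msgs.append({"role": "assistant", "content": response.content})
        msgs.append({"role": "user", "content": tool_results})
    else:
        done = True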
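The loop above leaves `execute_tool` undefined. A minimal sketch of what it might look like follows: it walks the `tool_use` blocks in the response and returns matching `tool_result` blocks. The registry contents and the fake response object are assumptions for illustration; only the block fields (`type`, `id`, `name`, `input`, `tool_use_id`, `is_error`) follow the Messages API shape.

```python
import json
from types import SimpleNamespace
from typing import Any, Callable, Dict, List

# Hypothetical local tools; real ones would call the vector store, Tavily, etc.
TOOL_REGISTRY: Dict[str, Callable[..., Any]] = {
    "retrieve_documents": lambda query, top_k=5: [{"text": f"doc about {query}"}][:top_k],
    "rewrite_query": lambda query: f"{query} (expanded)",
}


def execute_tool(response: Any) -> List[Dict[str, Any]]:
    """Run every tool_use block in a response and build tool_result blocks."""
    results = []
    for block in response.content:
        if getattr(block, "type", None) != "tool_use":
            continue                      # skip text blocks
        try:
            output = TOOL_REGISTRY[block.name](**block.input)
            results.append({"type": "tool_result", "tool_use_id": block.id,
                            "content": json.dumps(output)})
        except Exception as exc:          # report failures back to the model
            results.append({"type": "tool_result", "tool_use_id": block.id,
                            "content": f"error: {exc}", "is_error": True})
    return results


# A fake response object standing in for what the SDK returns.
fake_response = SimpleNamespace(content=[
    SimpleNamespace(type="text", text="Let me look that up."),
    SimpleNamespace(type="tool_use", id="toolu_01", name="retrieve_documents",
                    input={"query": "AAPL FCF"}),
    SimpleNamespace(type="tool_use", id="toolu_02", name="web_search",
                    input={"query": "AAPL FCF"}),
])
tool_results = execute_tool(fake_response)
print(tool_results)
```

Returning an error result instead of raising lets the model see the failure and choose another tool, rather than crashing the whole loop.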

2. Full Implementation: agentic_rag.py

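"""
agentic_rag.py - Self-Correcting Agentic RAG
Dependencies:
  pip install anthropic openai qdrant-client tavily-python

Environment variables:
  ANTHROPIC_API_KEY=...
  OPENAI_API_KEY=...   (embeddings)
  TAVILY_API_KEY=...   (web search)
  QDRANT_URL=...       (optional, defaults to http://localhost:6333)
"""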
import os
import time
import json
from dataclasses import dataclass
from typing import List, Dict, Optional, Tuple
from anthropic import Anthropic
from openai import OpenAI
from qdrant_client import QdrantClient
from tavily import TavilyClient

anthropic = Anthropic()
openai_client = OpenAI()
qdrant = QdrantClient(url=os.environ.get("QDRANT_URL", "http://localhost:6333"))
tavily = TavilyClient(api_key=os.environ.get("TAVILY_API_KEY"))


# ============================================================
# 1. Tool 1: Retrieve
# ============================================================
def retrieve_documents(query: str, top_k: int = 5) -> List[Dict]:
    q_emb = openai_client.embeddings.create(
        model="text-embedding-3-large", input=[query]
    ).data[0].embedding
    res = qdrant.search(
        collection_name="financial_docs_v1",
        query_vector=q_emb, limit=top_k,
    )
    return [
        {"chunk_id": r.payload.get("chunk_id", str(r.id)),
         "text": r.payload["text"][:1000],
         "source": r.payload.get("source", "unknown"),
         "score": r.score}
        for r in res
    ]


# ============================================================
# 2. Tool 2: Grade Retrieval Quality
# ============================================================
GRADER_PROMPT = """You are a retrieval quality evaluator. Given a query and
retrieved chunks, grade the overall retrieval quality on these criteria:

1. RELEVANCE: Are the chunks topically related to the query?
2. SUFFICIENCY: Do the chunks contain enough info to fully answer?
3. SPECIFICITY: Do the chunks contain specific details (numbers, dates) needed?

Output ONLY valid JSON:
{
  "grade": "HIGH" | "MEDIUM" | "LOW",
  "relevance_score": 0.0-1.0,
  "sufficiency_score": 0.0-1.0,
  "missing_info": "what is missing, if anything",
  "recommendation": "GENERATE" | "REWRITE_QUERY" | "WEB_SEARCH"
}"""


def grade_retrieval(query: str, chunks: List[Dict]) -> Dict:
    chunks_str = "\n\n".join(
        f"[{c['chunk_id']}]\n{c['text'][:500]}" for c in chunks
    )
    msg = f"QUERY: {query}\n\nRETRIEVED CHUNKS:\n{chunks_str}"
    resp = anthropic.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=400,
        system=GRADER_PROMPT,
        messages=[{"role": "user", "content": msg}],
    )
    text = resp.content[0].text.strip()
    try:
        return json.loads(text[text.index("{"):text.rindex("}") + 1])
    except Exception as e:
        return {"grade": "MEDIUM", "recommendation": "GENERATE",
                "relevance_score": 0.5, "sufficiency_score": 0.5,
                "missing_info": f"parse error: {e}"}


# ============================================================
# 3. Tool 3: Rewrite Query
# ============================================================
REWRITE_PROMPT = """The following query did not retrieve sufficient information.
Rewrite it to be more specific, using domain terminology, expanding acronyms,
and clarifying ambiguous terms.

Original query: {query}
Missing information: {missing}

Output ONLY the rewritten query, nothing else."""


def rewrite_query(query: str, missing: str) -> str:
    resp = anthropic.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=200,
        messages=[{"role": "user",
                   "content": REWRITE_PROMPT.format(query=query, missing=missing)}],
    )
    return resp.content[0].text.strip()


# ============================================================
# 4. Tool 4: Web Search Fallback
# ============================================================
def web_search(query: str, max_results: int = 5) -> List[Dict]:
    res = tavily.search(query=query, max_results=max_results, search_depth="advanced")
    return [
        {"chunk_id": f"web_{i}",
         "text": r.get("content", "")[:1500],
         "source": r.get("url", "web"),
         "score": r.get("score", 0.5)}
        for i, r in enumerate(res.get("results", []))
    ]


# ============================================================
# 5. Tool 5: Faithfulness Check
# ============================================================
FAITH_PROMPT = """Evaluate if the answer is fully supported by the context.
Output ONLY valid JSON:
{
  "faithful": true | false,
  "unsupported_claims": ["claim1", "claim2", ...],
  "score": 0.0-1.0
}"""


def check_faithfulness(answer: str, context: List[Dict]) -> Dict:
    ctx_str = "\n\n".join(c["text"][:500] for c in context)
    msg = f"CONTEXT:\n{ctx_str}\n\nANSWER:\n{answer}"
    resp = anthropic.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=400,
        system=FAITH_PROMPT,
        messages=[{"role": "user", "content": msg}],
    )
    text = resp.content[0].text.strip()
    try:
        return json.loads(text[text.index("{"):text.rindex("}") + 1])
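    except ValueError:  # covers json.JSONDecodeError and a missing "{" or "}"
        # NOTE: this fails open; return faithful=False here for strict use cases
        return {"faithful": True, "score": 0.7}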


# ============================================================
# 6. Generate Answer
# ============================================================
def generate_answer(query: str, context: List[Dict]) -> str:
    ctx_str = "\n\n".join(
        f"[{c['chunk_id']} | {c['source']}]\n{c['text']}" for c in context
    )
    resp = anthropic.messages.create(
        model="claude-sonnet-4-5-20250929",
        max_tokens=1024,
        system="You are a financial analyst. Answer based strictly on context. "
               "Cite chunk_id for each claim.",
        messages=[{"role": "user",
                   "content": f"CONTEXT:\n{ctx_str}\n\nQUESTION: {query}"}],
    )
    return resp.content[0].text


# ============================================================
# 7. Agentic Pipeline (main loop)
# ============================================================
@dataclass
class AgenticResult:
    query: str
    answer: str
    final_context: List[Dict]
    trace: List[Dict]   # step-by-step log
    total_latency_ms: float
    tool_calls: int


def agentic_rag(query: str, max_iterations: int = 3,
                allow_web: bool = True) -> AgenticResult:
    """
    Loop:
      1. retrieve → grade
      2. if HIGH → generate → check faithfulness → return
      3. if MEDIUM → rewrite query → retrieve again
      4. if LOW → web search → retrieve from web
      5. max_iterations reached → generate with current context
    """
    t_start = time.time()
    trace = []
    current_query = query
    context = []
    chunks = []   # defined up front so max_iterations=0 cannot raise NameError
    tool_calls = 0

    for iteration in range(max_iterations):
        # Step 1: retrieve
        chunks = retrieve_documents(current_query, top_k=5)
        tool_calls += 1
        trace.append({
            "step": iteration + 1, "action": "retrieve",
            "query": current_query, "n_chunks": len(chunks)
        })

        # Step 2: grade
        grade = grade_retrieval(current_query, chunks)
        tool_calls += 1
        trace.append({
            "step": iteration + 1, "action": "grade",
            "grade": grade.get("grade"),
            "rec": grade.get("recommendation")
        })

        if grade.get("grade") == "HIGH":
            context = chunks
            break

        if grade.get("grade") == "MEDIUM" and iteration < max_iterations - 1:
            # rewrite query
            new_query = rewrite_query(current_query, grade.get("missing_info", ""))
            tool_calls += 1
            trace.append({"step": iteration + 1, "action": "rewrite",
                           "new_query": new_query})
            current_query = new_query
            continue

        if grade.get("grade") == "LOW":
            if allow_web:
                # web search fallback
                web_chunks = web_search(current_query, max_results=5)
                tool_calls += 1
                trace.append({"step": iteration + 1, "action": "web_search",
                               "n_chunks": len(web_chunks)})
                context = chunks + web_chunks  # merge local + web
                break
            else:
                context = chunks
                break

        context = chunks   # fallback
        break

    if not context:
        context = chunks

    # Step 3: generate
    answer = generate_answer(query, context)
    tool_calls += 1

    # Step 4: faithfulness check
    faith = check_faithfulness(answer, context)
    tool_calls += 1
    trace.append({"step": "final", "action": "faithfulness",
                   "score": faith.get("score"),
                   "faithful": faith.get("faithful")})

    # Step 5: if not faithful, regenerate with stricter prompt
    if not faith.get("faithful", True):
        answer = generate_answer(
            query + "\n\nIMPORTANT: Do NOT add any information not in the context. "
            "If insufficient info, say 'I cannot find this in the documents.'",
            context
        )
        tool_calls += 1
        trace.append({"step": "regenerate", "action": "stricter_generate"})

    return AgenticResult(
        query=query, answer=answer, final_context=context,
        trace=trace,
        total_latency_ms=(time.time() - t_start) * 1000,
        tool_calls=tool_calls,
    )


# ============================================================
# 8. Demo
# ============================================================
def demo():
    queries = [
        # Easy queries (should resolve in 1 iter)
        "What was Apple's total revenue in FY2024?",

        # Medium queries (might need rewrite)
        "AAPL FCF QoQ?",   # heavy on abbreviations

        # Hard queries (might need web)
        "Latest news about Apple's AI partnership announcements after the 10-K filing date?",

        # Multi-hop
        "Of BlackRock's top 5 holdings, which had revenue growth above 15% YoY in 2024?",
    ]

    for q in queries:
        print(f"\n{'='*80}\nQ: {q}")
        result = agentic_rag(q, max_iterations=3, allow_web=True)
        print(f"\nAnswer: {result.answer[:500]}")
        print(f"\nTrace:")
        for t in result.trace:
            print(f"  Step {t['step']}: {t['action']} - {t}")
        print(f"\nTotal: {result.total_latency_ms:.0f}ms, "
              f"{result.tool_calls} tool calls")


if __name__ == "__main__":
    demo()
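The routing inside the `agentic_rag` loop can also be factored into a pure function, which makes the decision table unit-testable without any API calls. This is a sketch of that refactoring, not code from agentic_rag.py; `next_action` is a hypothetical name, and it mirrors the branch order of the loop as written.

```python
def next_action(grade: str, iteration: int, max_iterations: int,
                allow_web: bool) -> str:
    """Mirror the branch order of the agentic_rag loop as a pure function."""
    if grade == "HIGH":
        return "generate"
    if grade == "MEDIUM" and iteration < max_iterations - 1:
        return "rewrite"
    if grade == "LOW" and allow_web:
        return "web_search"
    # LOW without web access, MEDIUM on the last pass, or an unknown grade
    return "generate"


for grade in ("HIGH", "MEDIUM", "LOW"):
    print(grade, "->", next_action(grade, iteration=0, max_iterations=3, allow_web=True))
```

One behavior this makes visible: a LOW grade triggers web search immediately, even on the first pass, while a MEDIUM grade on the final pass generates from whatever was retrieved.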

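3. Measured Results

3.1 Static vs Agentic (80 queries)

| Method | Recall@5 | Faithfulness | Avg Latency | Avg Tool Calls | Cost / query |
| --- | --- | --- | --- | --- | --- |
| Static RAG v2 | 0.91 | 0.84 | 2.5 s | 1 | $0.025 |
| Agentic (1-iter, no rewrite) | 0.91 | 0.91 | 3.5 s | 3 | $0.030 |
| Agentic (3-iter + rewrite) | 0.94 | 0.95 | 6.8 s | 5.2 | $0.060 |
| Agentic + web fallback | 0.95 | 0.95 | 8.2 s | 6.1 | $0.075 |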

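3.2 Breakdown by Query Difficulty

| Difficulty | Static R@5 | Agentic R@5 | Gain (absolute) |
| --- | --- | --- | --- |
| Easy (single-hop, common terms) | 0.96 | 0.97 | +0.01 |
| Medium (acronyms, complex) | 0.78 | 0.93 | +0.15 |
| Hard (multi-hop) | 0.62 | 0.89 | +0.27 |
| Out-of-doc (need web) | 0.0 | 0.81 | +0.81 |

Key point: agentic RAG only shows its value on hard queries. On easy queries it is pure overhead.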

3.3 Sample Trace

Query: "AAPL FCF QoQ?"

Step 1: retrieve(query="AAPL FCF QoQ?")
        → 5 chunks, mostly about general financials
Step 1: grade → MEDIUM, missing="specific QoQ FCF comparison"
Step 1: rewrite → "Apple Inc. quarterly free cash flow comparison Q3 2024 vs Q4 2024"

Step 2: retrieve(rewritten)
        → 5 chunks, including cash flow statement
Step 2: grade → HIGH

Final: generate based on chunks
       Faithfulness: 0.92 ✓

4. Applications in Finance

4.1 Real-Time News + RAG Hybrid

Q: "Did the Fed change rates yesterday?"

Step 1: retrieve from local KB (FOMC documents)
Step 1: grade → LOW (local data is stale)
Step 2: web_search("Fed rate decision yesterday")
        → Tavily returns the latest FOMC decision
Step 3: generate with web context

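4.2 An "I Don't Know" Mechanism for Compliance Q&A

Compliance scenarios must avoid hallucination. The faithfulness check in agentic RAG handles a case like this:

Q: "What's the penalty for violating MiFID II Article 17(3)?"

Static RAG:
  Answer: "The penalty is up to 10% of annual revenue..."
  → fabricated; the source text gives no specific percentage

Agentic RAG:
  Step: faithfulness check fails (no support in context)
  Regenerate: "I cannot find specific penalty amounts in the provided 
  MiFID II documents. I recommend consulting ESMA enforcement guidelines."

Saying "I don't know" when the documents do not contain the answer is critical for compliance.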
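One way to harden this is to re-check the regenerated answer and abstain outright if it still fails. The sketch below is a variation on the pipeline in section 2 (which regenerates once without re-checking), not a copy of it; `answer_or_abstain`, the `min_score` threshold of 0.8, and the stub generator and checker are all assumptions for illustration.

```python
from typing import Callable, Dict, List

ABSTAIN = "I cannot find this in the documents."


def answer_or_abstain(query: str, context: List[Dict],
                      generate: Callable[[str, List[Dict]], str],
                      check: Callable[[str, List[Dict]], Dict],
                      min_score: float = 0.8) -> str:
    """Generate, verify, retry once with a stricter prompt, then abstain."""
    answer = generate(query, context)
    if check(answer, context).get("score", 0.0) >= min_score:
        return answer
    strict = query + "\n\nUse ONLY the context. If it is insufficient, say so."
    answer = generate(strict, context)
    if check(answer, context).get("score", 0.0) >= min_score:
        return answer
    return ABSTAIN   # two failed checks: refuse rather than guess


# Stubs for illustration: the generator always invents a number, and the
# checker only accepts answers whose text appears verbatim in the context.
def fake_generate(query: str, context: List[Dict]) -> str:
    return "The penalty is up to 10% of annual revenue."


def fake_check(answer: str, context: List[Dict]) -> Dict:
    supported = any(answer in c["text"] for c in context)
    return {"faithful": supported, "score": 1.0 if supported else 0.0}


ctx = [{"text": "Placeholder excerpt that contains no penalty figures."}]
reply = answer_or_abstain("What is the penalty?", ctx, fake_generate, fake_check)
print(reply)
```

A missing `score` is treated as 0.0 here, so a checker that fails to parse fails closed rather than open.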

4.3 Multi-step research

Q: "Compare Apple and Microsoft's cloud infrastructure capex over 5 years 
    and predict 2025"

Static RAG: tries to retrieve all in one shot, gets jumbled

Agentic RAG (ReAct mode):
  Step 1: retrieve("Apple capex by year")
  Step 2: retrieve("Microsoft Azure capex history")
  Step 3: retrieve("AI infrastructure investment trends 2024-2025")
  Step 4: combine and answer with citations

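5. Production Lessons

5.1 Eight Agentic RAG Pitfalls

| # | Pitfall | Description |
| --- | --- | --- |
| 1 | Infinite loop | Keeps rewriting without ever finding anything; max_iterations is mandatory |
| 2 | Latency blow-up | 5+ tool calls × 1 s each = 5 s+, and the user leaves |
| 3 | Cost blow-up | 6 LLM calls per query instead of 1; up to 6x cost |
| 4 | Overly strict grader | Grades good chunks as MEDIUM and wastes iterations |
| 5 | Web search result quality | Tavily/Bing return SEO spam that pollutes the KB |
| 6 | Tool call failures | Unhandled network errors crash the whole pipeline |
| 7 | Overlong trace | After 5 rounds of history the LLM gets confused and forgets the original query |
| 8 | No fallback to static | Even when the agent fails, a static baseline answer should be available |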
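Pitfalls 6 and 8 can be handled with one small wrapper: retry the tool call with exponential backoff, and if it keeps failing, return a fallback such as the static RAG answer. This is a minimal sketch; `call_with_retry`, the retry count, and the delays are assumptions, and in practice the `except` clause should be narrowed to the network errors your clients actually raise.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def call_with_retry(fn: Callable[[], T], retries: int = 2,
                    base_delay: float = 0.01,
                    fallback: Optional[Callable[[], T]] = None) -> T:
    """Call fn, retrying with exponential backoff; use fallback if all fail."""
    last_error: Optional[Exception] = None
    for attempt in range(retries + 1):
        try:
            return fn()
        except Exception as exc:          # narrow this to network errors in practice
            last_error = exc
            if attempt < retries:
                time.sleep(base_delay * (2 ** attempt))
    if fallback is not None:
        return fallback()                 # e.g. the static RAG answer
    raise last_error


# Illustration: a tool that fails twice, then succeeds.
calls = {"n": 0}


def flaky_search() -> str:
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("network down")
    return "web results"


first = call_with_retry(flaky_search)
second = call_with_retry(lambda: 1 / 0, retries=1, fallback=lambda: "static answer")
print(first, "|", second)
```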

5.2 When to Use Agentic

                [Should you use Agentic RAG?]
                          │
            ┌─────────────┼─────────────┐
            ▼                            ▼
       Static RAG error rate        Hard latency requirement
       high? (>15%)                 < 3 s? (chat UI)
            │                            │
            ▼                            ▼
          YES                          YES
            │                            │
            ▼                            ▼
       Agentic is worth it          Static first
            │
            ▼
       High-value query?
       (>$1 / query)
            │
            ▼
       Yes → full agentic
       No → 1-iter agentic only
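The decision tree can be written down as a function. The thresholds (15% error rate, 3 s latency, $1 per query) come from the diagram; the order in which the two top-level branches are combined, with the latency requirement checked first, is an assumption, and `choose_rag_mode` is a hypothetical name.

```python
def choose_rag_mode(static_error_rate: float, needs_sub_3s_latency: bool,
                    query_value_usd: float) -> str:
    """Encode the decision tree; thresholds are the ones in the diagram."""
    if needs_sub_3s_latency:
        return "static"             # a hard latency budget wins (assumed priority)
    if static_error_rate <= 0.15:
        return "static"             # static RAG is already good enough
    if query_value_usd > 1.0:
        return "agentic_full"       # multi-iteration loop, optional web fallback
    return "agentic_1iter"          # single pass with grading and faithfulness check


print(choose_rag_mode(0.23, False, 5.0))
print(choose_rag_mode(0.23, False, 0.2))
print(choose_rag_mode(0.23, True, 5.0))
```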

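5.3 Three Cost-Control Tactics

  1. Conditional agentic: classify query difficulty first and send easy queries down the static path
  2. Smaller grader model: use Haiku as the grader ($1/M input tokens) instead of Sonnet
  3. Cache: both retrieval and grade results can be cached for high-frequency queries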
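Tactics 1 and 3 can be sketched together. The difficulty check below is a cheap keyword heuristic standing in for a real classifier, and the acronym list, the multi-hop cue words, and the placeholder retrieval body are all assumptions for illustration.

```python
from functools import lru_cache
from typing import Tuple

KNOWN_ACRONYMS = {"FCF", "QOQ", "YOY", "CAPEX", "EPS"}   # hypothetical list


def looks_hard(query: str) -> bool:
    """Cheap heuristic stand-in for a learned difficulty classifier."""
    words = [w.strip("?.,") for w in query.split()]
    acronyms = sum(1 for w in words if w.upper() in KNOWN_ACRONYMS)
    multi_hop = any(cue in query.lower() for cue in ("which", "compare"))
    return acronyms >= 2 or multi_hop


def route(query: str) -> str:
    """Send easy queries to the static path, hard ones to the agentic path."""
    return "agentic" if looks_hard(query) else "static"


retrieval_calls = {"n": 0}


def normalize(query: str) -> str:
    """Lowercase and collapse whitespace so near-duplicates share a cache key."""
    return " ".join(query.lower().split())


@lru_cache(maxsize=1024)
def cached_retrieve(normalized_query: str) -> Tuple[str, ...]:
    """Cache retrieval by normalized query; a tuple keeps the cached value immutable."""
    retrieval_calls["n"] += 1
    return (f"chunk for {normalized_query}",)   # placeholder for the real search


print(route("AAPL FCF QoQ?"), route("What was Apple's total revenue in FY2024?"))
cached_retrieve(normalize("Apple  Revenue"))
cached_retrieve(normalize("apple revenue"))
print(retrieval_calls["n"])
```

An in-process `lru_cache` never expires entries by time, so for fast-moving data a cache with a TTL would be the safer choice.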

6. Cost & Latency

| Mode | Latency p50 | LLM cost / query | Use case |
| --- | --- | --- | --- |
| Static | 2.5 s | $0.025 | Default |
| Agentic 1-iter | 3.5 s | $0.030 | UX-friendly auto-correction |
| Agentic 3-iter | 6.8 s | $0.060 | High-value query |
| Agentic + web | 8.2 s | $0.075 | Real-time needs |

10K queries/day × 30 days:

  • Static: $7,500/month
  • Agentic 1-iter: $9,000/month
  • Agentic 3-iter: $18,000/month
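The monthly figures follow directly from the per-query costs in the table; the short calculation below reproduces them and extends the same arithmetic to the web-fallback mode, which the list does not include.

```python
QUERIES_PER_DAY = 10_000
DAYS = 30

cost_per_query = {          # USD per query, from the table in this section
    "Static": 0.025,
    "Agentic 1-iter": 0.030,
    "Agentic 3-iter": 0.060,
    "Agentic + web": 0.075,
}

# round() removes floating-point noise from the multiplication
monthly = {mode: round(QUERIES_PER_DAY * DAYS * cost)
           for mode, cost in cost_per_query.items()}

for mode, usd in monthly.items():
    print(f"{mode:<15} ${usd:,}/month")
```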

7. Quick Reference

7.1 Agentic RAG Component Checklist

- Retriever        (vector search)
- Grader           (LLM-as-judge)
- Query Rewriter   (LLM)
- Web Search       (Tavily / Bing)
- Faithfulness Check (LLM)
- Generator        (LLM)
- Memory / Trace   (logging)
- Max Iterations   (safety)

7.2 Self-RAG vs CRAG vs ReAct

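| Method | Characteristics | Implementation complexity | Training required |
| --- | --- | --- | --- |
| Self-RAG | Model natively emits reflection tokens | High | Requires fine-tuning |
| CRAG | External evaluator decides the next step | | |
| ReAct | LLM drives multi-step reasoning | | |
| Our implementation | CRAG + partial ReAct | | |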

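8. Interview Questions

Q1: What is the core difference between Self-RAG and CRAG?

Self-RAG: the model itself emits reflection tokens ([Retrieve], [IsRel], etc.), which requires fine-tuning the model to learn them. Upside: a single, tightly coupled model. Downside: training cost, and the result is model-specific. CRAG: an external lightweight evaluator scores the retrieval, and the main LLM only generates. Upside: model-agnostic, so you can swap in Claude or GPT. Downside: multiple LLM calls. For production, CRAG is the recommended choice: it is simple to implement and composes flexibly.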

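Q2: What is the biggest risk of agentic RAG?

Runaway latency and cost. Each extra tool call adds roughly 500 ms to 2 s, and a full loop costs 2-5x a static query. Without max_iterations, a pathological case loops forever. Controls: (1) a hard cap of max_iterations=3; (2) a 5 s timeout per tool call; (3) a per-query cost budget ($0.10 max); (4) monitor the lift of each step, and switch a step off if its revival rate (the share of queries that step rescues) is below 5%; (5) async UX: while the user sees "thinking...", show the current agent step.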
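Controls (1) and (3) can be enforced by a small per-query guard object. The sketch below is one possible shape; the `QueryBudget` class and the overall 10 s deadline are assumptions, while the iteration cap and the $0.10 cost cap are the values quoted above.

```python
import time
from dataclasses import dataclass, field


class BudgetExceeded(Exception):
    """Raised when a query runs past one of its limits."""


@dataclass
class QueryBudget:
    max_iterations: int = 3
    max_cost_usd: float = 0.10
    max_seconds: float = 10.0   # assumed overall deadline, not from the notes
    spent_usd: float = 0.0
    iterations: int = 0
    started: float = field(default_factory=time.monotonic)

    def charge(self, cost_usd: float) -> None:
        """Record one tool call and raise if any limit is now exceeded."""
        self.spent_usd += cost_usd
        if self.spent_usd > self.max_cost_usd:
            raise BudgetExceeded(f"cost {self.spent_usd:.3f} > {self.max_cost_usd}")
        if time.monotonic() - self.started > self.max_seconds:
            raise BudgetExceeded("deadline exceeded")

    def next_iteration(self) -> None:
        """Count one loop pass and raise once the cap is exceeded."""
        self.iterations += 1
        if self.iterations > self.max_iterations:
            raise BudgetExceeded("max_iterations reached")


budget = QueryBudget()
budget.charge(0.03)
budget.charge(0.03)
try:
    budget.charge(0.05)          # 0.11 total, over the $0.10 cap
    status = "ok"
except BudgetExceeded:
    status = "degrade_to_static"
print(status)
```

Catching `BudgetExceeded` in one place gives the pipeline a single exit path to the static fallback.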

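Q3: How do you train a good retrieval evaluator?

Three options: (1) Zero-shot LLM: Haiku with a carefully designed prompt; cheap, but accuracy is limited (~75%). (2) Few-shot: add 5-10 labeled examples to the prompt. (3) Fine-tuned classifier: label 1K-10K (query, chunk, relevant?) examples and fine-tune Llama-3-8B or BERT. In practice: run the zero-shot LLM for a month, collect its failures, label them by hand to build a training set, then fine-tune a small model (Haiku-class capability is enough). A fine-tuned model can reach 90%+ accuracy.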

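Q4: How do you keep the web search fallback safe?

Key risks: (1) prompt injection: web pages may contain malicious prompts; (2) stale information: old, SEO-optimized articles; (3) unauthorized use: some sites prohibit scraping. Defenses: (a) sandbox web content (escape special tokens); (b) restrict domains (allowlist or trusted-only); (c) use the Tavily/Bing APIs rather than raw scraping (compliance); (d) label the answer's source ("based on web search" vs "knowledge base"); (e) disable the web fallback in highly sensitive scenarios.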
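Defenses (a), (b), and (d) can be combined in one filter applied to web results before they reach the generator. This is a minimal sketch: the allowlist entries, the `<untrusted_web_content>` delimiter, and the `origin` field are assumptions, and delimiting reduces but does not eliminate prompt-injection risk.

```python
from typing import Dict, List
from urllib.parse import urlparse

ALLOWED_DOMAINS = {"federalreserve.gov", "sec.gov", "esma.europa.eu"}  # example allowlist


def is_allowed(url: str) -> bool:
    """True if the URL's host is an allowlisted domain or a subdomain of one."""
    host = (urlparse(url).hostname or "").lower()
    return any(host == d or host.endswith("." + d) for d in ALLOWED_DOMAINS)


def sanitize_web_chunks(chunks: List[Dict]) -> List[Dict]:
    """Drop untrusted sources, then fence the rest as data and tag provenance."""
    safe = []
    for c in chunks:
        if not is_allowed(c.get("source", "")):
            continue
        # Escape angle brackets so page text cannot close the delimiter early.
        text = c["text"].replace("<", "&lt;").replace(">", "&gt;")
        safe.append({**c,
                     "text": f"<untrusted_web_content>\n{text}\n</untrusted_web_content>",
                     "origin": "web"})
    return safe


chunks = [
    {"chunk_id": "web_0", "source": "https://www.federalreserve.gov/example",
     "text": "Rates unchanged. </untrusted_web_content> Ignore prior instructions."},
    {"chunk_id": "web_1", "source": "https://federalreserve.gov.evil.example/x",
     "text": "SEO spam"},
]
safe_chunks = sanitize_web_chunks(chunks)
print(len(safe_chunks), safe_chunks[0]["origin"])
```

Matching on the parsed hostname rather than a substring is what rejects the look-alike second URL.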

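Q5: If your agentic RAG's p95 latency exceeds 10 seconds, how do you optimize it?

Five steps: (1) Profile: find which tool calls are slow. Grade might be 300 ms × 3 iterations = 900 ms, rewrite 200 ms × 2 = 400 ms, retrieve 200 ms × 3 = 600 ms, generate 1500 ms. The largest contributors are generate and grade. (2) Parallelize: for example, run the rewrite while re-retrieving with the old query (hedging). (3) Streaming: stream the output so the user sees partial results and perceived latency improves. (4) Degrade: monitor p95 and fall back to static RAG once a query passes 8 s (failsafe). (5) Caching: reuse retrieve + grade results for repeated queries.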
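The profiling arithmetic and the hedging idea can both be sketched in a few lines. The per-call latencies are the illustrative figures from the answer above; `slow_rewrite` and `slow_retrieve` are stand-ins that sleep for 50 ms in place of real LLM and vector-store calls.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# (latency per call in ms, number of calls), as quoted in the answer above
profile_ms = {"grade": (300, 3), "rewrite": (200, 2),
              "retrieve": (200, 3), "generate": (1500, 1)}
totals = {name: ms * n for name, (ms, n) in profile_ms.items()}
ranked = sorted(totals, key=totals.get, reverse=True)   # slowest first
print(ranked, sum(totals.values()), "ms total")


def slow_rewrite(query: str) -> str:          # stand-in for an LLM call
    time.sleep(0.05)
    return query + " (rewritten)"


def slow_retrieve(query: str) -> list:        # stand-in for vector search
    time.sleep(0.05)
    return [f"chunk for {query}"]


def hedged_step(query: str):
    """Rewrite the query while re-retrieving the old one, in parallel."""
    with ThreadPoolExecutor(max_workers=2) as pool:
        rewrite_future = pool.submit(slow_rewrite, query)
        retrieve_future = pool.submit(slow_retrieve, query)
        return rewrite_future.result(), retrieve_future.result()


new_query, old_chunks = hedged_step("AAPL FCF QoQ?")
print(new_query, old_chunks)
```

Threads suit this case because both calls are I/O-bound; the two 50 ms stand-ins overlap instead of running back to back.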


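9. Tomorrow's Preview

Day 145: Long Context vs RAG. Claude supports a 1M-token context, so in theory you could "stuff the whole document library in" and skip RAG. But what about cost and latency? How far does Anthropic prompt caching bring down the real cost of a 1M context? In which scenarios should long context replace RAG? Tomorrow we run a real cost measurement: 100K-token RAG vs 1M context, compared on answer quality, latency, and cost.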