返回 Expert 笔记
Expert Day 135

RAG基础架构——从零搭建第一个生产级RAG

RAG完整流水线:loading → chunking → embedding → indexing → retrieval → augmentation → generation;向量空间的几何直觉;为什么RAG解决了LLM的"知识截断+幻觉+长上下文成本"三大问题

2026-09-13
Phase 3 - RAG高级模式 (Day 135-148)
RAG向量检索EmbeddingVectorDBAnthropic

日期: 2026-09-13 方向: AI系统工程 / RAG 阶段: Phase 3 - RAG高级模式 (Day 135-148) 标签: #RAG #向量检索 #Embedding #VectorDB #Anthropic


今日目标

类型内容
学习RAG完整流水线:loading → chunking → embedding → indexing → retrieval → augmentation → generation;向量空间的几何直觉;为什么RAG解决了LLM的"知识截断+幻觉+长上下文成本"三大问题
实操从零搭建rag_v1.py:处理Apple 10-K (FY2024) PDF,使用OpenAI text-embedding-3-large + Chroma本地存储 + Claude Sonnet 4.5生成;提供Pinecone可选切换
产出rag_v1.py 单文件 (~250行)、第一组金融问答benchmark(8个query × 3个金融文档)、cost记录表

核心洞察:RAG不是"魔法"。它是用 "语义搜索 + Prompt工程" 给LLM做了一层"外置知识库 + 上下文注入"。理解RAG的关键是把它拆成6个独立的工程问题,每一步都有trade-off。


一、核心概念:RAG的工程拆解

1.1 为什么需要RAG?

LLM 有三个本质局限:

问题表现RAG解
知识截断Claude Opus训练数据截至2026-01;最新10-K问不出来用最新文档动态注入
幻觉模型对没见过的事实会"自信地编造"强制模型只基于检索到的证据回答
长上下文成本Claude 1M context送进去要$15/M input token每次只送10个最相关chunk而非整本书

金融场景的特殊性

  • 10-K年报300+页、每年更新一次
  • 监管法规如MiFID II、Reg ATS动辄上万页
  • 客户合规KYC文档每个客户一份
  • 这些都是 "高更新频率 + 高领域专业性 + 强精度要求",是RAG的天然主场

1.2 RAG完整流水线

                    ┌──────────────────────────────────┐
                    │       OFFLINE INDEXING            │
                    └──────────────────────────────────┘

  [Source Documents]
  10-K PDF / 法规 /              ┌─────────────────┐
  research reports     ───────►  │  Document Loader│
  (PDF/HTML/CSV/DB)              │  (LlamaParse,   │
                                 │   Unstructured) │
                                 └────────┬────────┘
                                          ▼
                                 ┌─────────────────┐
                                 │     Chunker     │
                                 │  (recursive,    │
                                 │   semantic)     │
                                 └────────┬────────┘
                                          ▼
                                 ┌─────────────────┐
                                 │   Embedder      │
                                 │  (OpenAI/BGE/   │
                                 │   Voyage)       │
                                 └────────┬────────┘
                                          ▼
                                 ┌─────────────────┐
                                 │   Vector DB     │
                                 │  (Chroma/Pine-  │
                                 │   cone/Qdrant)  │
                                 └─────────────────┘

                    ┌──────────────────────────────────┐
                    │        ONLINE QUERY-TIME          │
                    └──────────────────────────────────┘

  User Question                  ┌─────────────────┐
  "Apple's gross    ───────────► │    Embedder     │
  margin in Q4?"                 │ (same model!)   │
                                 └────────┬────────┘
                                          ▼
                                 ┌─────────────────┐
                                 │   Retriever     │
                                 │  (top-k cosine  │
                                 │   similarity)   │
                                 └────────┬────────┘
                                          ▼
                                 ┌─────────────────┐
                                 │   Augmenter     │
                                 │ (prompt with    │
                                 │  context)       │
                                 └────────┬────────┘
                                          ▼
                                 ┌─────────────────┐
                                 │  Generator      │
                                 │ (Claude/GPT)    │
                                 └────────┬────────┘
                                          ▼
                                    [Final Answer
                                     + Citations]

1.3 向量空间的几何直觉

Embedding = 把文本投射到 d 维实数空间(OpenAI text-embedding-3-large 是 3072 维,BGE-large-en 是 1024 维)。

  • 欧几里得距离||a - b|| ——不常用,因为维度高时所有距离都"接近"
  • 余弦相似度cos(θ) = (a · b) / (||a|| · ||b||) ——主流选择,对模长不敏感
  • 点积a · b ——当embedding已normalized到单位球面,点积 = 余弦相似度

关键性质:好的embedding使语义相似的文本在向量空间中距离近。 "Apple revenue in Q4 2024" 和 "iPhone maker quarterly sales last quarter" 应该在向量空间中接近。

1.4 检索的两个维度

              Recall (找全)
                 ▲
     k=20        │        k=20+rerank
       •         │           •
                 │
       •         │           •
     k=5         │         k=5+rerank
                 │
   ───────────────────────► Precision (找准)
                 │
  • k小(如3):精度高但可能错过关键chunk
  • k大(如20):召回好但噪音多,prompt也变长(贵)
  • rerank:召回阶段k=50,rerank保留top-5,是工业最佳实践(Day 139详解)

二、最小RAG实现:rag_v1.py

2.1 项目结构

rag_v1/
├── rag_v1.py              # 主程序
├── data/
│   ├── apple_10k_2024.pdf
│   ├── tesla_10k_2024.pdf
│   └── jpmorgan_2024_annual.pdf
├── .env                   # OPENAI_API_KEY, ANTHROPIC_API_KEY
├── requirements.txt
└── chroma_db/             # 本地持久化向量库

2.2 完整代码

"""
rag_v1.py — Minimum Viable RAG for Financial Documents
依赖:
  pip install anthropic openai chromadb pypdf tiktoken python-dotenv

环境变量:
  ANTHROPIC_API_KEY=sk-ant-...
  OPENAI_API_KEY=sk-...
"""
import os
import time
import hashlib
from dataclasses import dataclass
from typing import List, Dict, Optional
from pathlib import Path

import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
from anthropic import Anthropic
from pypdf import PdfReader
import tiktoken
from dotenv import load_dotenv

load_dotenv()

# ============================================================
# 1. 配置
# ============================================================
EMBED_MODEL = "text-embedding-3-large"   # 3072维, $0.13/M tokens
EMBED_DIM = 3072
LLM_MODEL = "claude-sonnet-4-5-20250929"  # Anthropic
CHUNK_SIZE = 800              # tokens per chunk
CHUNK_OVERLAP = 100
TOP_K = 5
COLLECTION_NAME = "financial_docs_v1"
PERSIST_DIR = "./chroma_db"

openai_client = OpenAI()
anthropic_client = Anthropic()
encoder = tiktoken.encoding_for_model("gpt-4")  # 用于chunking计数


# ============================================================
# 2. Document Loading
# ============================================================
@dataclass
class RawDoc:
    doc_id: str
    source: str
    text: str
    metadata: Dict


def load_pdf(path: str) -> RawDoc:
    """解析PDF文件,提取所有页面文字。"""
    reader = PdfReader(path)
    pages = []
    for i, page in enumerate(reader.pages):
        txt = page.extract_text() or ""
        pages.append(f"\n[PAGE {i+1}]\n{txt}")
    full_text = "\n".join(pages)

    return RawDoc(
        doc_id=hashlib.md5(path.encode()).hexdigest()[:12],
        source=Path(path).name,
        text=full_text,
        metadata={"path": path, "page_count": len(reader.pages)}
    )


# ============================================================
# 3. Chunking
# ============================================================
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE,
               overlap: int = CHUNK_OVERLAP) -> List[str]:
    """
    Recursive token-based chunking with overlap.
    生产环境推荐LangChain RecursiveCharacterTextSplitter或
    LlamaIndex SentenceSplitter,本demo用最朴素版本。
    """
    tokens = encoder.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunks.append(encoder.decode(chunk_tokens))
        if end == len(tokens):
            break
        start += chunk_size - overlap
    return chunks


# ============================================================
# 4. Embedding
# ============================================================
def embed_batch(texts: List[str], batch_size: int = 100) -> List[List[float]]:
    """批量调用OpenAI Embedding API。"""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        resp = openai_client.embeddings.create(
            model=EMBED_MODEL,
            input=batch,
        )
        all_embeddings.extend([d.embedding for d in resp.data])
        time.sleep(0.1)  # 避免rate limit
    return all_embeddings


# ============================================================
# 5. Indexing (Chroma)
# ============================================================
def get_collection():
    client = chromadb.PersistentClient(path=PERSIST_DIR)
    return client.get_or_create_collection(
        name=COLLECTION_NAME,
        metadata={"hnsw:space": "cosine"}
    )


def index_document(raw: RawDoc):
    """将一个文档chunk + embed + 写入Chroma。"""
    chunks = chunk_text(raw.text)
    print(f"[INDEX] {raw.source}: {len(chunks)} chunks")

    embeddings = embed_batch(chunks)
    ids = [f"{raw.doc_id}_chunk_{i}" for i in range(len(chunks))]
    metadatas = [
        {
            "doc_id": raw.doc_id,
            "source": raw.source,
            "chunk_index": i,
            **raw.metadata,
        }
        for i in range(len(chunks))
    ]

    coll = get_collection()
    coll.upsert(
        ids=ids,
        documents=chunks,
        embeddings=embeddings,
        metadatas=metadatas,
    )
    return len(chunks)


# ============================================================
# 6. Retrieval
# ============================================================
@dataclass
class RetrievedChunk:
    text: str
    source: str
    chunk_index: int
    distance: float


def retrieve(query: str, top_k: int = TOP_K,
             filter_source: Optional[str] = None) -> List[RetrievedChunk]:
    """语义检索top-k chunks。"""
    q_embed = embed_batch([query])[0]
    coll = get_collection()

    where = {"source": filter_source} if filter_source else None
    results = coll.query(
        query_embeddings=[q_embed],
        n_results=top_k,
        where=where,
    )

    chunks = []
    for i in range(len(results["ids"][0])):
        chunks.append(RetrievedChunk(
            text=results["documents"][0][i],
            source=results["metadatas"][0][i]["source"],
            chunk_index=results["metadatas"][0][i]["chunk_index"],
            distance=results["distances"][0][i],
        ))
    return chunks


# ============================================================
# 7. Generation (Anthropic)
# ============================================================
SYSTEM_PROMPT = """You are a senior financial analyst assistant. Answer questions
strictly based on the CONTEXT provided. If the context does not contain enough
information, say "I cannot find this in the provided documents." Always cite the
source filename and chunk index for every claim.

Format:
ANSWER: <your answer>
CITATIONS: [<source>:<chunk_index>], ...
"""


def generate(query: str, retrieved: List[RetrievedChunk]) -> str:
    context = "\n\n---\n\n".join(
        f"[Source: {c.source} | Chunk {c.chunk_index} | Distance: {c.distance:.3f}]\n{c.text}"
        for c in retrieved
    )
    user_msg = f"CONTEXT:\n{context}\n\nQUESTION: {query}"

    resp = anthropic_client.messages.create(
        model=LLM_MODEL,
        max_tokens=1024,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": user_msg}],
    )
    return resp.content[0].text


# ============================================================
# 8. End-to-End RAG
# ============================================================
def rag_query(question: str, top_k: int = TOP_K) -> Dict:
    t0 = time.time()
    chunks = retrieve(question, top_k=top_k)
    t1 = time.time()
    answer = generate(question, chunks)
    t2 = time.time()

    return {
        "question": question,
        "answer": answer,
        "retrieved_chunks": [
            {"source": c.source, "chunk_index": c.chunk_index,
             "distance": c.distance, "preview": c.text[:200]}
            for c in chunks
        ],
        "latency": {
            "retrieve_ms": round((t1 - t0) * 1000, 1),
            "generate_ms": round((t2 - t1) * 1000, 1),
            "total_ms": round((t2 - t0) * 1000, 1),
        }
    }


# ============================================================
# 9. Demo
# ============================================================
def main():
    # Step 1: 索引(首次运行执行,之后注释)
    docs_to_index = [
        "data/apple_10k_2024.pdf",
        "data/tesla_10k_2024.pdf",
        "data/jpmorgan_2024_annual.pdf",
    ]
    for path in docs_to_index:
        if Path(path).exists():
            raw = load_pdf(path)
            n = index_document(raw)
            print(f"  → indexed {n} chunks from {raw.source}")

    # Step 2: 测试query集
    test_queries = [
        "What was Apple's total revenue in fiscal year 2024?",
        "Describe Tesla's main risk factors related to autonomous driving.",
        "How does JPMorgan manage interest rate risk?",
        "What is Apple's gross margin trend over the last three years?",
        "Compare Tesla and Apple's R&D spending as a percentage of revenue.",
        "What are JPMorgan's Tier 1 capital ratios?",
        "List Apple's services segment growth drivers.",
        "What regulatory actions has Tesla disclosed?",
    ]

    for q in test_queries:
        print("\n" + "=" * 80)
        print(f"Q: {q}")
        result = rag_query(q)
        print(f"A: {result['answer'][:500]}...")
        print(f"   [retrieve {result['latency']['retrieve_ms']}ms | "
              f"generate {result['latency']['generate_ms']}ms]")


if __name__ == "__main__":
    main()

2.3 跑通效果(实测样例)

Q: What was Apple's total revenue in fiscal year 2024?
A: ANSWER: Apple's total net sales for fiscal year 2024 were $391.0 billion,
   compared with $383.3 billion in fiscal year 2023, representing a 2.0% increase.
   Products revenue was $294.9 billion and Services revenue was $96.2 billion.
CITATIONS: [apple_10k_2024.pdf:42], [apple_10k_2024.pdf:43]
   [retrieve 234ms | generate 1820ms]

三、金融领域应用:10-K特殊性

3.1 10-K结构是高度模板化的

每个10-K都按SEC规定包含:

  • Item 1: Business(业务描述)
  • Item 1A: Risk Factors(风险因素,PM最关心)
  • Item 7: MD&A(管理层讨论分析,财务洞察)
  • Item 8: Financial Statements(三大报表)
  • Item 9A: Internal Controls

优化技巧:在chunking时保留Item标题,metadata里加item_section字段。Query "Apple's risks" 时直接filter item_section = "1A",召回精度提升 ~30%。

3.2 表格抽取的痛

PyPDF对表格的提取效果差。Apple 10-K的损益表会变成:

Net sales:                                                            
   Products $294,866 $298,085                                            
   Services $96,169 $85,200                                              
Total net sales $391,035 $383,285

数字和列名错位、合并单元格丢失。

生产解决方案(Day 146详解)

  • Unstructured.iopartition_pdf(strategy="hi_res")
  • LlamaParse(LlamaIndex的付费PDF服务,专门处理金融表格)
  • AWS Textract + Camelot

3.3 案例:Apple 10-K 节选(Item 1A Risk Factors)

"The Company's products are subject to risks associated with new technologies,
including continued investment in AI-related capabilities. The Company's
business and financial performance could be adversely affected if it fails to
keep pace with rapid technological developments..."

这一段被chunk为800-token block后会和"Risk Factors"标题分开。 修复:用 parent-child chunking(Day 142),让小chunk继承parent的Item标题。


四、生产经验:第一次跑RAG必踩的8个坑

#表现修复
1同一个embedding model在index和query时不一致检索结果完全乱把model_name存到collection metadata里,启动时校验
2chunk边界切断句子"...the gross margin was 4" — 数字断了用RecursiveSplitter,按段落>句子>token层级切
3PDF空格/换行混乱召回的chunk全是乱码索引前用unidecode + re清洗\s+
4embedding API超RPM索引大文档卡死tier-1只有3000 RPM;大文档分批+sleep
5OpenAI 8192 token input limit超长chunk报错chunk_size固定800,加assert
6Chroma本地距离值意义反了越高越相似?还是越低?Chroma用cosine时返回的是 1 - cos_sim,越小越相似
7没有persist重启后向量库空了必须用PersistentClient不是EphemeralClient
8同一文档反复re-index向量库膨胀用确定性ID(doc_id+chunk_index),upsert而不是add

4.1 调试RAG的"三段诊断法"

当RAG回答错误时,按顺序排查:

错误回答
   ↓
[1] 看检索到的chunks是否相关?
    ├── 否 → 检索问题:embedding model/chunking/query rewriting
    └── 是 ↓
[2] 看chunks里是否包含正确答案?
    ├── 否 → 索引问题:原文是否真的有?是否被chunking切碎?
    └── 是 ↓
[3] 看LLM是否正确利用了chunks?
    └── 是生成问题:prompt不清晰/模型能力不足/citation格式错

五、Cost & Latency分析

5.1 成本拆解(以索引一份300页10-K为例)

项目数量单价成本
Embedding (one-time)
300页 × 500 tokens/page = 150K tokens150K$0.13/M$0.0195
Storage
200 chunks × 3072 dim × 4 bytes = 2.5 MBChroma本地: 0$0
Per-query
Query embedding50 tokens$0.13/M$0.0000065
Retrieval (Chroma local)本地$0
LLM generation (Claude Sonnet 4.5)
- Input: 5 chunks × 800 + system + question ≈ 4500 tokens4500$3/M$0.0135
- Output: ~300 tokens300$15/M$0.0045
Total per query~$0.018

5.2 延迟分解(生产环境实测)

阶段延迟说明
Query embedding200-400 msOpenAI API
Vector search (Chroma local)5-30 ms200 chunks时
Vector search (Pinecone us-east)50-150 ms跨网络
LLM generation (TTFT)600-1200 msClaude Sonnet 4.5
LLM generation (full 300 tokens)1500-2500 msstreaming可改善UX
Total p502000-3000 ms
Total p954000-5000 ms

生产优化

  • prompt caching(Anthropic原生支持)把system + 高频context缓存,节省80% input成本
  • streaming显著改善perceived latency
  • 把embedding也cache(同样问题不重复embed)

六、关键速查表

6.1 Embedding Model对比(速览,详细Day 136)

Model维度成本 ($/M tok)MTEB 平均备注
OpenAI text-embedding-3-small1536$0.0262.3性价比首选
OpenAI text-embedding-3-large3072$0.1364.6最常用
Voyage voyage-3-large1024$0.1865.1金融领域强
Cohere embed-english-v3.01024$0.1064.5多语言强
BAAI/bge-large-en-v1.51024自部署64.2开源主力

6.2 Vector DB速查(详细Day 137)

DB部署起步成本latency p50千万级主要场景
Chroma本地/Docker$05-30ms不推荐原型 / <1M vectors
PineconeSaaS$70/mo (s1)50-150ms优秀中小团队,不想运维
Qdrant自部署/SaaS$0 / $2520-80ms优秀性能 + 成本平衡
Weaviate自部署/SaaS$0 / $2530-100ms优秀hybrid search原生
pgvectorPostgres扩展已有PG的话$050-200ms一般已有PG栈
Milvus自部署$020-80ms业界最强10亿+vectors

6.3 Chunking策略速查

策略大小适用
Fixed token256-1024baseline
Recursive char同上主流,LangChain默认
Sentence1-3句精度优先,召回差
Semantic自适应OpenAI Cookbook推荐
Parent-Child父1500/子400高级,Day 142
Auto-Merging动态合并LlamaIndex特色

七、面试题

Q1: 解释RAG的整个工程流水线,并指出每一步最容易出错的地方。

6步:load → chunk → embed → index → retrieve → generate。

  • Load:表格、图片信息丢失(Day 146)
  • Chunk:边界切断语义、metadata丢失
  • Embed:index/query模型必须一致;模型升级时整库需要reindex
  • Index:HNSW参数(M, ef_construction)影响召回,distance metric要对齐
  • Retrieve:top-k太小漏召回,太大噪音多;filter写错全空
  • Generate:prompt不强制只用context会幻觉;context被截断

Q2: 为什么向量空间用余弦相似度而不是欧几里得距离?

高维空间中"距离集中"现象(curse of dimensionality):所有点对的欧氏距离趋于相同,区分度差。余弦相似度只看方向不看模长,对embedding的长度(与文本长度某种关联)不敏感,更稳定。同时如果embedding已normalize到单位球面,cos sim = 1 - 0.5·(欧氏距离)²,二者等价但cos sim数值更直观([-1, 1])。

Q3: RAG vs Fine-tuning,何时选哪个?

RAG:知识更新频繁、需要citation溯源、知识库大、不想训练成本。FT:模型行为/风格/输出格式调整、领域专用术语理解、低延迟(无检索)。最佳实践:FT教模型"怎么说",RAG教模型"说什么"。金融客服bot常FT语气+RAG事实。

Q4: 如何设计一个支持"实时10-K更新"的RAG系统?

  1. SEC EDGAR有RSS feed,监听新10-K到达;2. 自动triggers Lambda → loader → chunker → embedder;3. 关键:每个文档版本带as_of_date metadata;4. Query时默认filter最新版本,但允许用户问"compare 2023 vs 2024"时检索两版;5. 旧版本不删除(合规审计);6. 用Pinecone namespace或Qdrant collection隔离公司。

Q5: 你的rag_v1延迟p95是5秒,老板要求降到2秒,你怎么做?

  1. profile:先确定是retrieve还是generate慢;2. 大概率是generate;3. 措施:(a) 用Sonnet 4.5 → Haiku 4.5短query可降50%;(b) streaming改感知延迟;(c) prompt caching缓存system prompt;(d) 减小top_k从10→5;(e) 把chunks预先summarize成更短的摘要存metadata,prompt里只放summary;(f) 部署embedding model就近(如Pinecone us-east + Lambda us-east)。

八、明日预告

Day 136: Embedding模型评估——我们要在5个主流embedding(OpenAI 3-large/3-small、Voyage-3、Cohere v3、BGE-large)上跑同一组金融query,测准确率、latency、成本,回答"金融场景应该用哪个embedding"的实际问题。明天会包含一个完整的MTEB-style mini-benchmark脚本。