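RAG Basic Architecture: Building Your First Production-Grade RAG from Scratch
The full RAG pipeline: loading → chunking → embedding → indexing → retrieval → augmentation → generation; geometric intuition for vector spaces; why RAG addresses three LLM problems: knowledge cutoff, hallucination, and long-context cost
Date: 2026-09-13 Track: AI Systems Engineering / RAG Phase: Phase 3 - Advanced RAG Patterns (Day 135-148) Tags: #RAG #VectorSearch #Embedding #VectorDB #Anthropic
Today's Goals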
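| Type | Content |
|---|---|
| Study | The full RAG pipeline: loading → chunking → embedding → indexing → retrieval → augmentation → generation; geometric intuition for vector spaces; why RAG addresses three LLM problems: knowledge cutoff, hallucination, and long-context cost |
| Hands-on | Build rag_v1.py from scratch: process the Apple 10-K (FY2024) PDF using OpenAI text-embedding-3-large + local Chroma storage + Claude Sonnet 4.5 for generation; Pinecone as an optional swap-in |
| Deliverables | rag_v1.py as a single file (~250 lines), a first financial Q&A benchmark (8 queries × 3 financial documents), a cost log |
Core insight: RAG is not "magic". It uses semantic search plus prompt engineering to give the LLM an external knowledge base with context injection. The key to understanding RAG is to break it into 6 independent engineering problems (load, chunk, embed, index, retrieve, generate), each with its own trade-offs.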
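1. Core Concepts: An Engineering Breakdown of RAG
1.1 Why do we need RAG?
LLMs have three fundamental limitations:
| Problem | Symptom | How RAG helps |
|---|---|---|
| Knowledge cutoff | Claude Opus's training data ends at 2026-01; it cannot answer questions about the latest 10-K | Inject the latest documents dynamically |
| Hallucination | The model "confidently fabricates" facts it has never seen | Force the model to answer only from retrieved evidence |
| Long-context cost | Filling Claude's 1M context costs $15/M input tokens | Send only the 10 most relevant chunks per query instead of the whole book |
What makes finance special:
- A 10-K annual report runs 300+ pages and is refreshed every year
- Regulations such as MiFID II and Reg ATS often run to more than ten thousand pages
- KYC compliance documents come one set per client
- All of these combine regular updates, deep domain expertise, and strict accuracy requirements, which makes them natural territory for RAG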
1.2 RAG完整流水线
┌──────────────────────────────────┐
│ OFFLINE INDEXING │
└──────────────────────────────────┘
[Source Documents]
10-K PDF / regulations / ┌─────────────────┐
research reports ───────► │ Document Loader│
(PDF/HTML/CSV/DB) │ (LlamaParse, │
│ Unstructured) │
└────────┬────────┘
▼
┌─────────────────┐
│ Chunker │
│ (recursive, │
│ semantic) │
└────────┬────────┘
▼
┌─────────────────┐
│ Embedder │
│ (OpenAI/BGE/ │
│ Voyage) │
└────────┬────────┘
▼
┌─────────────────┐
│ Vector DB │
│ (Chroma/Pine- │
│ cone/Qdrant) │
└─────────────────┘
┌──────────────────────────────────┐
│ ONLINE QUERY-TIME │
└──────────────────────────────────┘
User Question ┌─────────────────┐
"Apple's gross ───────────► │ Embedder │
margin in Q4?" │ (same model!) │
└────────┬────────┘
▼
┌─────────────────┐
│ Retriever │
│ (top-k cosine │
│ similarity) │
└────────┬────────┘
▼
┌─────────────────┐
│ Augmenter │
│ (prompt with │
│ context) │
└────────┬────────┘
▼
┌─────────────────┐
│ Generator │
│ (Claude/GPT) │
└────────┬────────┘
▼
[Final Answer
+ Citations]
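1.3 Geometric intuition for vector spaces
An embedding projects text into a d-dimensional real vector space (OpenAI text-embedding-3-large has 3072 dimensions; BGE-large-en has 1024).
- Euclidean distance: ||a - b||. Less commonly used, because in high dimensions all distances tend to look alike.
- Cosine similarity: cos(θ) = (a · b) / (||a|| · ||b||). The mainstream choice; insensitive to vector length.
- Dot product: a · b. When embeddings are normalized to the unit sphere, the dot product equals cosine similarity.
Key property: a good embedding places semantically similar texts close together in the vector space. "Apple revenue in Q4 2024" and "iPhone maker quarterly sales last quarter" should land near each other.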
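To make the three metrics concrete, here is a minimal pure-Python sketch. The vectors are made-up 3-dimensional toys, not real model output, and the helper names (`dot`, `norm`, `euclidean`, `cosine`, `normalize`) are illustrative only.

```python
import math

def dot(a, b):
    """Dot product of two equal-length vectors."""
    return sum(x * y for x, y in zip(a, b))

def norm(a):
    """Euclidean length of a vector."""
    return math.sqrt(dot(a, a))

def euclidean(a, b):
    """Straight-line distance between two vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

def cosine(a, b):
    """Cosine of the angle between two vectors (ignores their lengths)."""
    return dot(a, b) / (norm(a) * norm(b))

def normalize(a):
    """Scale a vector onto the unit sphere."""
    n = norm(a)
    return [x / n for x in a]

# Toy "embeddings" (hypothetical values chosen for easy arithmetic).
a = [3.0, 4.0, 0.0]
b = [6.0, 8.0, 0.0]  # same direction as a, twice as long
c = [0.0, 0.0, 5.0]  # orthogonal to a

print("cosine(a, b)    =", cosine(a, b))     # same direction despite different lengths
print("euclidean(a, b) =", euclidean(a, b))  # yet the points are far apart
print("cosine(a, c)    =", cosine(a, c))     # orthogonal vectors
print("dot on unit vectors =", dot(normalize(a), normalize(b)))
```

The pair `a`, `b` shows why cosine similarity is insensitive to length: the two vectors point the same way, so their cosine is maximal even though their Euclidean distance is not zero.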
1.4 Two dimensions of retrieval
Recall (find everything relevant)
▲
k=20 │ k=20+rerank
• │ •
│
• │ •
k=5 │ k=5+rerank
│
───────────────────────► Precision (find only what is relevant)
│
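- Small k (e.g., 3): high precision, but may miss a key chunk
- Large k (e.g., 20): better recall, but more noise and a longer (more expensive) prompt
- Rerank: retrieve k=50 in the recall stage, then keep the reranked top 5; a common industry practice (details on Day 139)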
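The retrieve-then-rerank pattern can be sketched in a few lines. This is a toy under stated assumptions: `keyword_overlap` is a stand-in for a real reranker (such as a cross-encoder), the candidate texts and first-stage scores are invented, and `retrieve_then_rerank` is a hypothetical helper, not part of rag_v1.py.

```python
from typing import Callable, List, Tuple

def keyword_overlap(query: str, text: str) -> float:
    """Stand-in reranker: fraction of query words that appear in the text."""
    q = set(query.lower().split())
    t = set(text.lower().split())
    return len(q & t) / len(q) if q else 0.0

def retrieve_then_rerank(
    query: str,
    candidates: List[Tuple[str, float]],
    recall_k: int = 50,
    final_k: int = 5,
    scorer: Callable[[str, str], float] = keyword_overlap,
) -> List[str]:
    """candidates holds (text, first_stage_similarity) pairs."""
    # Stage 1: cheap vector similarity keeps a wide candidate set (recall).
    stage1 = sorted(candidates, key=lambda c: c[1], reverse=True)[:recall_k]
    # Stage 2: a more expensive scorer reorders the survivors (precision).
    stage2 = sorted(stage1, key=lambda c: scorer(query, c[0]), reverse=True)
    return [text for text, _ in stage2[:final_k]]

# Invented candidates with made-up first-stage similarity scores.
candidates = [
    ("Apple designs consumer electronics", 0.82),
    ("Apple gross margin percentage increased", 0.80),
    ("Tesla gross margin declined", 0.78),
    ("Weather was sunny", 0.10),
]
top = retrieve_then_rerank("apple gross margin", candidates, recall_k=3, final_k=1)
print(top)
```

Note that the reranker can only promote chunks that survived stage 1, which is why the recall stage uses a generous k.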
2. A Minimal RAG Implementation: rag_v1.py
2.1 Project structure
rag_v1/
├── rag_v1.py # main program
├── data/
│ ├── apple_10k_2024.pdf
│ ├── tesla_10k_2024.pdf
│ └── jpmorgan_2024_annual.pdf
├── .env # OPENAI_API_KEY, ANTHROPIC_API_KEY
├── requirements.txt
└── chroma_db/ # locally persisted vector store
2.2 Full code
"""
rag_v1.py: Minimum Viable RAG for Financial Documents
Dependencies:
    pip install anthropic openai chromadb pypdf tiktoken python-dotenv
Environment variables:
    ANTHROPIC_API_KEY=sk-ant-...
    OPENAI_API_KEY=sk-...
"""
import os
import time
import hashlib
from dataclasses import dataclass
from typing import List, Dict, Optional
from pathlib import Path
import chromadb
from chromadb.utils import embedding_functions
from openai import OpenAI
from anthropic import Anthropic
from pypdf import PdfReader
import tiktoken
from dotenv import load_dotenv
load_dotenv()
# ============================================================
# 1. Configuration
# ============================================================
EMBED_MODEL = "text-embedding-3-large"  # 3072 dims, $0.13/M tokens
EMBED_DIM = 3072
LLM_MODEL = "claude-sonnet-4-5-20250929"  # Anthropic
CHUNK_SIZE = 800  # tokens per chunk
CHUNK_OVERLAP = 100
TOP_K = 5
COLLECTION_NAME = "financial_docs_v1"
PERSIST_DIR = "./chroma_db"
openai_client = OpenAI()
anthropic_client = Anthropic()
encoder = tiktoken.encoding_for_model("gpt-4")  # used to count tokens when chunking
# ============================================================
# 2. Document Loading
# ============================================================
@dataclass
class RawDoc:
    doc_id: str
    source: str
    text: str
    metadata: Dict


def load_pdf(path: str) -> RawDoc:
    """Parse a PDF file and extract the text of every page."""
    reader = PdfReader(path)
    pages = []
    for i, page in enumerate(reader.pages):
        txt = page.extract_text() or ""
        # Keep a page marker so chunks can be traced back to a page.
        pages.append(f"\n[PAGE {i+1}]\n{txt}")
    full_text = "\n".join(pages)
    return RawDoc(
        # Deterministic ID derived from the path, so re-indexing overwrites.
        doc_id=hashlib.md5(path.encode()).hexdigest()[:12],
        source=Path(path).name,
        text=full_text,
        metadata={"path": path, "page_count": len(reader.pages)},
    )
# ============================================================
# 3. Chunking
# ============================================================
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE,
               overlap: int = CHUNK_OVERLAP) -> List[str]:
    """
    Fixed-size token chunking with overlap (a sliding window).
    For production, prefer LangChain's RecursiveCharacterTextSplitter or
    LlamaIndex's SentenceSplitter; this demo uses the most naive version.
    """
    if overlap >= chunk_size:
        # Otherwise the window would never advance.
        raise ValueError("overlap must be smaller than chunk_size")
    tokens = encoder.encode(text)
    chunks = []
    start = 0
    while start < len(tokens):
        end = min(start + chunk_size, len(tokens))
        chunk_tokens = tokens[start:end]
        chunks.append(encoder.decode(chunk_tokens))
        if end == len(tokens):
            break
        start += chunk_size - overlap
    return chunks
# ============================================================
# 4. Embedding
# ============================================================
def embed_batch(texts: List[str], batch_size: int = 100) -> List[List[float]]:
    """Call the OpenAI Embedding API in batches."""
    all_embeddings = []
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i + batch_size]
        resp = openai_client.embeddings.create(
            model=EMBED_MODEL,
            input=batch,
        )
        all_embeddings.extend([d.embedding for d in resp.data])
        time.sleep(0.1)  # stay under the rate limit
    return all_embeddings
# ============================================================
# 5. Indexing (Chroma)
# ============================================================
def get_collection():
    client = chromadb.PersistentClient(path=PERSIST_DIR)
    return client.get_or_create_collection(
        name=COLLECTION_NAME,
        metadata={"hnsw:space": "cosine"},
    )


def index_document(raw: RawDoc):
    """Chunk one document, embed it, and write it to Chroma."""
    chunks = chunk_text(raw.text)
    print(f"[INDEX] {raw.source}: {len(chunks)} chunks")
    embeddings = embed_batch(chunks)
    # Deterministic IDs make upsert idempotent across re-runs.
    ids = [f"{raw.doc_id}_chunk_{i}" for i in range(len(chunks))]
    metadatas = [
        {
            "doc_id": raw.doc_id,
            "source": raw.source,
            "chunk_index": i,
            **raw.metadata,
        }
        for i in range(len(chunks))
    ]
    coll = get_collection()
    coll.upsert(
        ids=ids,
        documents=chunks,
        embeddings=embeddings,
        metadatas=metadatas,
    )
    return len(chunks)
# ============================================================
# 6. Retrieval
# ============================================================
@dataclass
class RetrievedChunk:
    text: str
    source: str
    chunk_index: int
    distance: float


def retrieve(query: str, top_k: int = TOP_K,
             filter_source: Optional[str] = None) -> List[RetrievedChunk]:
    """Semantic retrieval of the top-k chunks."""
    q_embed = embed_batch([query])[0]
    coll = get_collection()
    where = {"source": filter_source} if filter_source else None
    results = coll.query(
        query_embeddings=[q_embed],
        n_results=top_k,
        where=where,
    )
    chunks = []
    # Results are nested per query; index 0 is our single query.
    for i in range(len(results["ids"][0])):
        chunks.append(RetrievedChunk(
            text=results["documents"][0][i],
            source=results["metadatas"][0][i]["source"],
            chunk_index=results["metadatas"][0][i]["chunk_index"],
            distance=results["distances"][0][i],
        ))
    return chunks
# ============================================================
# 7. Generation (Anthropic)
# ============================================================
SYSTEM_PROMPT = """You are a senior financial analyst assistant. Answer questions
strictly based on the CONTEXT provided. If the context does not contain enough
information, say "I cannot find this in the provided documents." Always cite the
source filename and chunk index for every claim.
Format:
ANSWER: <your answer>
CITATIONS: [<source>:<chunk_index>], ...
"""
def generate(query: str, retrieved: List[RetrievedChunk]) -> str:
    context = "\n\n---\n\n".join(
        f"[Source: {c.source} | Chunk {c.chunk_index} | Distance: {c.distance:.3f}]\n{c.text}"
        for c in retrieved
    )
    user_msg = f"CONTEXT:\n{context}\n\nQUESTION: {query}"
    resp = anthropic_client.messages.create(
        model=LLM_MODEL,
        max_tokens=1024,
        system=SYSTEM_PROMPT,
        messages=[{"role": "user", "content": user_msg}],
    )
    return resp.content[0].text
# ============================================================
# 8. End-to-End RAG
# ============================================================
def rag_query(question: str, top_k: int = TOP_K) -> Dict:
    t0 = time.time()
    chunks = retrieve(question, top_k=top_k)
    t1 = time.time()
    answer = generate(question, chunks)
    t2 = time.time()
    return {
        "question": question,
        "answer": answer,
        "retrieved_chunks": [
            {"source": c.source, "chunk_index": c.chunk_index,
             "distance": c.distance, "preview": c.text[:200]}
            for c in chunks
        ],
        "latency": {
            "retrieve_ms": round((t1 - t0) * 1000, 1),
            "generate_ms": round((t2 - t1) * 1000, 1),
            "total_ms": round((t2 - t0) * 1000, 1),
        },
    }
# ============================================================
# 9. Demo
# ============================================================
def main():
    # Step 1: index (run on the first execution, then comment out)
    docs_to_index = [
        "data/apple_10k_2024.pdf",
        "data/tesla_10k_2024.pdf",
        "data/jpmorgan_2024_annual.pdf",
    ]
    for path in docs_to_index:
        if Path(path).exists():
            raw = load_pdf(path)
            n = index_document(raw)
            print(f" → indexed {n} chunks from {raw.source}")
    # Step 2: test query set
    test_queries = [
        "What was Apple's total revenue in fiscal year 2024?",
        "Describe Tesla's main risk factors related to autonomous driving.",
        "How does JPMorgan manage interest rate risk?",
        "What is Apple's gross margin trend over the last three years?",
        "Compare Tesla and Apple's R&D spending as a percentage of revenue.",
        "What are JPMorgan's Tier 1 capital ratios?",
        "List Apple's services segment growth drivers.",
        "What regulatory actions has Tesla disclosed?",
    ]
    for q in test_queries:
        print("\n" + "=" * 80)
        print(f"Q: {q}")
        result = rag_query(q)
        print(f"A: {result['answer'][:500]}...")
        print(f" [retrieve {result['latency']['retrieve_ms']}ms | "
              f"generate {result['latency']['generate_ms']}ms]")


if __name__ == "__main__":
    main()
2.3 Sample run (measured output)
Q: What was Apple's total revenue in fiscal year 2024?
A: ANSWER: Apple's total net sales for fiscal year 2024 were $391.0 billion,
compared with $383.3 billion in fiscal year 2023, representing a 2.0% increase.
Products revenue was $294.9 billion and Services revenue was $96.2 billion.
CITATIONS: [apple_10k_2024.pdf:42], [apple_10k_2024.pdf:43]
[retrieve 234ms | generate 1820ms]
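3. Financial Domain Application: What Makes 10-Ks Special
3.1 10-K structure is highly templated
Under SEC rules, every 10-K contains:
- Item 1: Business (business description)
- Item 1A: Risk Factors (what PMs care about most)
- Item 7: MD&A (management's discussion and analysis; financial insight)
- Item 8: Financial Statements (the three core statements)
- Item 9A: Internal Controls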
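Optimization tip: keep the Item heading when chunking and add an item_section field to the metadata. For a query like "Apple's risks", filter directly on item_section = "1A"; these notes estimate this improves retrieval precision by roughly 30%.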
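One way to derive an item_section label is to split the text at "Item N." headings before chunking. The sketch below is a hedged illustration: the regex, the `split_by_item` helper, and the sample text are all assumptions, and a real 10-K needs extra care because the table of contents repeats the same headings.

```python
import re
from typing import Dict, List

# Matches headings such as "Item 1.", "Item 1A.", "ITEM 7:" at the start of a line.
ITEM_HEADING = re.compile(r"^[ \t]*Item[ \t]+(\d+[A-C]?)[ \t]*[.:]",
                          re.IGNORECASE | re.MULTILINE)

def split_by_item(text: str) -> List[Dict[str, str]]:
    """Split 10-K text at Item headings and label each piece with its Item code."""
    matches = list(ITEM_HEADING.finditer(text))
    sections = []
    for i, m in enumerate(matches):
        # A section runs until the next heading, or to the end of the text.
        end = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        sections.append({
            "item_section": m.group(1).upper(),
            "text": text[m.start():end].strip(),
        })
    return sections

# Invented miniature document for illustration only.
sample = """Item 1. Business
The Company designs products.
Item 1A. Risk Factors
The Company faces risks.
Item 7. Management's Discussion
Net sales increased.
"""
sections = split_by_item(sample)
risk_only = [s for s in sections if s["item_section"] == "1A"]
print([s["item_section"] for s in sections])
```

Each chunk cut from a section would then carry that section's item_section value in its metadata, so a query can pass a filter such as `where={"item_section": "1A"}` in the same way rag_v1.py already filters on `source`.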
3.2 The pain of table extraction
pypdf extracts tables poorly. Apple's 10-K income statement comes out as:
Net sales:
Products $294,866 $298,085
Services $96,169 $85,200
Total net sales $391,035 $383,285
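Numbers drift out of alignment with their column headers, and merged cells are lost.
Production solutions (details on Day 146):
- Unstructured.io's partition_pdf(strategy="hi_res")
- LlamaParse (LlamaIndex's paid PDF service, aimed at financial tables)
- AWS Textract + Camelot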
3.3 Case study: Apple 10-K excerpt (Item 1A Risk Factors)
"The Company's products are subject to risks associated with new technologies,
including continued investment in AI-related capabilities. The Company's
business and financial performance could be adversely affected if it fails to
keep pace with rapid technological developments..."
Once chunked into an 800-token block, this passage gets separated from the "Risk Factors" heading. Fix: use parent-child chunking (Day 142) so that each small chunk inherits its parent's Item heading.
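The heading-inheritance idea can be sketched as follows. This is a simplified illustration, not the Day 142 implementation: it splits on whitespace-separated words rather than tokens, and `parent_child_chunks` with its field names is hypothetical.

```python
from typing import Dict, List

def parent_child_chunks(section_title: str, section_text: str,
                        child_words: int = 50) -> List[Dict[str, object]]:
    """Cut a parent section into small children that each carry the parent title."""
    words = section_text.split()
    children: List[Dict[str, object]] = []
    for start in range(0, len(words), child_words):
        body = " ".join(words[start:start + child_words])
        children.append({
            "parent_title": section_title,          # stored as metadata for filtering
            "child_index": len(children),
            "text": f"[{section_title}] {body}",    # title is embedded with the child
        })
    return children

# Synthetic 120-word section, used only to show the mechanics.
section = " ".join(f"w{i}" for i in range(120))
children = parent_child_chunks("Item 1A: Risk Factors", section, child_words=50)
print(len(children), children[0]["text"][:40])
```

Prefixing the title onto the embedded text means that even a child from the middle of the section still mentions "Risk Factors", so a risk-related query has a better chance of matching it.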
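4. Production Lessons: 8 Pitfalls You Will Hit on Your First RAG Run
| # | Pitfall | Symptom | Fix |
|---|---|---|---|
| 1 | Embedding model differs between indexing and querying | Retrieval results are complete nonsense | Store model_name in the collection metadata and validate it at startup |
| 2 | Chunk boundaries cut sentences | "...the gross margin was 4": the number is cut off | Use a recursive splitter that splits by paragraph > sentence > token |
| 3 | Messy spaces and line breaks from the PDF | Retrieved chunks are full of garbage | Before indexing, clean with unidecode + re, collapsing `\s+` to a single space |
| 4 | Embedding API exceeds RPM | Indexing a large document stalls | Tier 1 allows only 3000 RPM; batch large documents and sleep between batches |
| 5 | OpenAI 8192-token input limit | Overlong chunks raise errors | Fix chunk_size at 800 and add an assert |
| 6 | Direction of Chroma's distance values is unclear | Is higher more similar, or lower? | With cosine, Chroma returns 1 - cos_sim; smaller means more similar |
| 7 | No persistence | Vector store is empty after a restart | Use PersistentClient, not EphemeralClient |
| 8 | Re-indexing the same document repeatedly | Vector store bloats | Use deterministic IDs (doc_id + chunk_index) and upsert instead of add |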
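Three of these fixes (pitfalls 3, 6, and 8) are small enough to sketch with the standard library alone. The helper names are illustrative; the unidecode step from pitfall 3 is omitted because it needs a third-party package.

```python
import hashlib
import re

def normalize_whitespace(text: str) -> str:
    """Pitfall 3: collapse every run of whitespace into a single space."""
    return re.sub(r"\s+", " ", text).strip()

def chunk_id(path: str, chunk_index: int) -> str:
    """Pitfall 8: the same path and index always give the same ID,
    so an upsert overwrites the old row instead of adding a duplicate."""
    doc_id = hashlib.md5(path.encode()).hexdigest()[:12]
    return f"{doc_id}_chunk_{chunk_index}"

def cosine_distance_to_similarity(distance: float) -> float:
    """Pitfall 6: a cosine distance of 1 - cos_sim converts back by inversion."""
    return 1.0 - distance

messy = "Net  sales:\n\n  Products\t$294,866"
print(normalize_whitespace(messy))
print(chunk_id("data/apple_10k_2024.pdf", 0) == chunk_id("data/apple_10k_2024.pdf", 0))
print(cosine_distance_to_similarity(0.25))
```

`chunk_id` mirrors the ID scheme rag_v1.py already uses in `load_pdf` and `index_document`; it is pulled out here only to make the determinism explicit.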
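4.1 The "three-stage diagnosis" for debugging RAG
When RAG answers incorrectly, check in this order:
Wrong answer
↓
[1] Are the retrieved chunks relevant?
├── No → retrieval problem: embedding model / chunking / query rewriting
└── Yes ↓
[2] Do the chunks contain the correct answer?
├── No → indexing problem: is it really in the source text? Was it shredded by chunking?
└── Yes ↓
[3] Did the LLM use the chunks correctly?
└── No → generation problem: unclear prompt / insufficient model capability / wrong citation format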
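The decision tree can be written down as a small checklist function, which is handy when logging failure categories across a benchmark. This is only a sketch: the three booleans are judgments a human (or a separate evaluator) supplies, and `diagnose` is a hypothetical name.

```python
def diagnose(chunks_relevant: bool,
             chunks_contain_answer: bool,
             answer_uses_chunks: bool) -> str:
    """Return the first failing stage, checked in the order of the tree."""
    if not chunks_relevant:
        return "retrieval"   # embedding model / chunking / query rewriting
    if not chunks_contain_answer:
        return "indexing"    # missing from the source or shredded by chunking
    if not answer_uses_chunks:
        return "generation"  # prompt / model capability / citation format
    return "ok"

# Example: relevant chunks were retrieved, but none held the actual figure.
print(diagnose(chunks_relevant=True,
               chunks_contain_answer=False,
               answer_uses_chunks=False))
```

The order matters: a generation fix is wasted effort if the right chunk never reached the prompt.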
5. Cost & Latency Analysis
5.1 Cost breakdown (indexing one 300-page 10-K)
| Item | Quantity | Unit price | Cost |
|---|---|---|---|
| Embedding (one-time): 300 pages × 500 tokens/page | 150K tokens | $0.13/M | $0.0195 |
| Storage: 200 chunks × 3072 dims × 4 bytes ≈ 2.5 MB | n/a | Chroma local: $0 | $0 |
| Per query: query embedding | 50 tokens | $0.13/M | $0.0000065 |
| Per query: retrieval (Chroma local) | n/a | local | $0 |
| Per query: LLM input (Claude Sonnet 4.5), 5 chunks × 800 + system + question | ≈ 4500 tokens | $3/M | $0.0135 |
| Per query: LLM output (Claude Sonnet 4.5) | ~300 tokens | $15/M | $0.0045 |
| Total per query | | | ~$0.018 |
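The arithmetic behind the table can be reproduced with a short script, which also makes it easy to re-run with different prices or chunk counts. The prices and token counts below are the ones quoted in the table; the constant and function names are illustrative.

```python
# Prices in USD per million tokens, as quoted in the table above.
EMBED_PRICE_PER_M = 0.13
LLM_INPUT_PRICE_PER_M = 3.0
LLM_OUTPUT_PRICE_PER_M = 15.0

def token_cost(tokens: int, price_per_million: float) -> float:
    """Cost in USD for a given number of tokens."""
    return tokens * price_per_million / 1_000_000

# One-time indexing: 300 pages at roughly 500 tokens per page.
index_cost = token_cost(300 * 500, EMBED_PRICE_PER_M)

# Per query: embed the question, then pay for LLM input and output tokens.
query_cost = (
    token_cost(50, EMBED_PRICE_PER_M)
    + token_cost(4500, LLM_INPUT_PRICE_PER_M)
    + token_cost(300, LLM_OUTPUT_PRICE_PER_M)
)

# Raw vector storage: chunks × dimensions × 4 bytes per float32.
storage_mb = 200 * 3072 * 4 / 1_000_000

print(f"index once: ${index_cost:.4f}")
print(f"per query:  ${query_cost:.4f}")
print(f"storage:    {storage_mb:.2f} MB")
```

The per-query figure is dominated by LLM input tokens, so top_k and chunk size are the main cost levers, not the embedding calls.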
5.2 Latency breakdown (measured in production)
| Stage | Latency | Notes |
|---|---|---|
| Query embedding | 200-400 ms | OpenAI API |
| Vector search (Chroma local) | 5-30 ms | at 200 chunks |
| Vector search (Pinecone us-east) | 50-150 ms | cross-network |
| LLM generation (TTFT) | 600-1200 ms | Claude Sonnet 4.5 |
| LLM generation (full 300 tokens) | 1500-2500 ms | streaming improves UX |
| Total p50 | 2000-3000 ms | |
| Total p95 | 4000-5000 ms | |
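Production optimizations:
- Use prompt caching (natively supported by Anthropic) to cache the system prompt and frequently used context; these notes cite about 80% savings on input cost
- Use streaming to markedly improve perceived latency
- Cache embeddings too (do not re-embed an identical question)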
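The embedding cache in the last bullet can be sketched as a thin wrapper around whatever embedding function is in use. Everything here is an assumption for illustration: `EmbeddingCache` is a hypothetical class, `fake_embed` stands in for the real API call, and normalizing by case and surrounding whitespace is one possible policy, not a recommendation for every workload.

```python
import hashlib
from typing import Callable, Dict, List

class EmbeddingCache:
    """In-memory cache keyed by a hash of the normalized query text."""

    def __init__(self, embed_fn: Callable[[str], List[float]]):
        self._embed_fn = embed_fn
        self._store: Dict[str, List[float]] = {}
        self.misses = 0  # number of times the underlying embedder was called

    def get(self, text: str) -> List[float]:
        # Assumption: queries differing only in case or outer whitespace
        # are treated as the same question.
        key = hashlib.sha256(text.strip().lower().encode("utf-8")).hexdigest()
        if key not in self._store:
            self.misses += 1
            self._store[key] = self._embed_fn(text)
        return self._store[key]

def fake_embed(text: str) -> List[float]:
    """Placeholder for a real embedding call; returns a made-up 2-d vector."""
    return [float(len(text)), float(text.count(" "))]

cache = EmbeddingCache(fake_embed)
v1 = cache.get("What was Apple's total revenue?")
v2 = cache.get("what was apple's total revenue?  ")  # served from the cache
print(cache.misses)
```

In rag_v1.py the wrapped function would be something like `lambda q: embed_batch([q])[0]`; a production version would also need eviction and persistence.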
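6. Quick Reference Tables
6.1 Embedding model comparison (overview; details on Day 136)
| Model | Dimensions | Cost ($/M tok) | MTEB average | Notes |
|---|---|---|---|---|
| OpenAI text-embedding-3-small | 1536 | $0.02 | 62.3 | Best value for money |
| OpenAI text-embedding-3-large | 3072 | $0.13 | 64.6 | Most commonly used |
| Voyage voyage-3-large | 1024 | $0.18 | 65.1 | Strong in the financial domain |
| Cohere embed-english-v3.0 | 1024 | $0.10 | 64.5 | English-only; embed-multilingual-v3.0 is the multilingual variant |
| BAAI/bge-large-en-v1.5 | 1024 | Self-hosted | 64.2 | Open-source workhorse |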
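6.2 Vector DB quick reference (details on Day 137)
| DB | Deployment | Starting cost | Latency p50 | At tens of millions of vectors | Main use case |
|---|---|---|---|---|---|
| Chroma | Local/Docker | $0 | 5-30ms | Not recommended | Prototypes / <1M vectors |
| Pinecone | SaaS | $70/mo (s1) | 50-150ms | Excellent | Small and mid-size teams that do not want to run infrastructure |
| Qdrant | Self-hosted/SaaS | $0 / $25 | 20-80ms | Excellent | Balance of performance and cost |
| Weaviate | Self-hosted/SaaS | $0 / $25 | 30-100ms | Excellent | Native hybrid search |
| pgvector | Postgres extension | $0 if you already run PG | 50-200ms | Fair | Existing PG stack |
| Milvus | Self-hosted | $0 | 20-80ms | Best in class | 1B+ vectors |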
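6.3 Chunking strategy quick reference
| Strategy | Size | Best for |
|---|---|---|
| Fixed token | 256-1024 | Baseline |
| Recursive char | Same as above | Mainstream; LangChain default |
| Sentence | 1-3 sentences | Precision first; weaker recall |
| Semantic | Adaptive | Recommended by the OpenAI Cookbook |
| Parent-Child | Parent 1500 / child 400 | Advanced; Day 142 |
| Auto-Merging | Dynamic merging | A LlamaIndex specialty |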
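7. Interview Questions
Q1: Explain the full RAG engineering pipeline and point out where each step most easily goes wrong.
6 steps: load → chunk → embed → index → retrieve → generate.
- Load: tables and image information get lost (Day 146)
- Chunk: boundaries cut through meaning; metadata gets lost
- Embed: the index and query models must match; upgrading the model requires reindexing the whole store
- Index: HNSW parameters (M, ef_construction) affect recall; the distance metric must be aligned
- Retrieve: a top-k that is too small misses relevant chunks, too large adds noise; a wrong filter returns nothing
- Generate: a prompt that does not restrict the model to the context invites hallucination; the context may get truncated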
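Q2: Why use cosine similarity rather than Euclidean distance in vector space?
In high-dimensional spaces, distances concentrate (the curse of dimensionality): the Euclidean distances between all pairs of points tend toward the same value, so they discriminate poorly. Cosine similarity looks only at direction, not magnitude, so it is insensitive to embedding length (which correlates somewhat with text length) and is more stable. In addition, if embeddings are normalized to the unit sphere, cos sim = 1 - 0.5·(Euclidean distance)², so the two produce the same ranking, but cosine similarity's range of [-1, 1] is easier to read.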
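The identity quoted in the answer follows from expanding the squared distance between two unit vectors. A short derivation, using the same symbols a, b, and θ as section 1.3:

```latex
\begin{aligned}
\lVert a - b \rVert^2 &= \lVert a \rVert^2 + \lVert b \rVert^2 - 2\, a \cdot b \\
&= 2 - 2\cos\theta \qquad \text{since } \lVert a \rVert = \lVert b \rVert = 1 \\
\Rightarrow\quad \cos\theta &= 1 - \tfrac{1}{2}\,\lVert a - b \rVert^2
\end{aligned}
```

Because the right-hand side decreases monotonically as the distance grows, sorting by cosine similarity (descending) and by Euclidean distance (ascending) gives the same order on normalized embeddings.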
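Q3: RAG vs fine-tuning: when to choose which?
RAG: knowledge changes frequently, citations must be traceable, the knowledge base is large, and you want to avoid training cost. FT: adjusting model behavior, style, or output format; understanding domain-specific terminology; low latency (no retrieval). Best practice: FT teaches the model "how to say it", RAG teaches it "what to say". Financial customer-service bots often combine FT for tone with RAG for facts.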
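Q4: How would you design a RAG system that supports "real-time 10-K updates"?
1. SEC EDGAR has an RSS feed; listen for new 10-K arrivals.
2. Automatically trigger Lambda → loader → chunker → embedder.
3. Key point: every document version carries as_of_date metadata.
4. At query time, filter to the latest version by default, but retrieve both versions when the user asks something like "compare 2023 vs 2024".
5. Do not delete old versions (compliance audits).
6. Isolate companies with Pinecone namespaces or Qdrant collections.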
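The version-selection rule (latest by default, explicit years for comparisons) can be sketched independently of any vector database. The records, field names other than as_of_date, and the `select_versions` helper are hypothetical; the dates are placeholders, not real filing dates.

```python
from typing import Dict, List, Optional, Set

def select_versions(docs: List[Dict[str, str]], company: str,
                    years: Optional[Set[str]] = None) -> List[Dict[str, str]]:
    """Pick which document versions a query should search.

    Defaults to the newest version for the company; when the user names
    specific years, return every version from those years instead.
    """
    own = [d for d in docs if d["company"] == company]
    if not own:
        return []
    if years:
        return [d for d in own if d["as_of_date"][:4] in years]
    # ISO dates (YYYY-MM-DD) sort correctly as plain strings.
    return [max(own, key=lambda d: d["as_of_date"])]

# Placeholder catalog; old versions stay in the list for audit purposes.
catalog = [
    {"company": "AAPL", "as_of_date": "2023-11-01", "doc_id": "aapl_2023"},
    {"company": "AAPL", "as_of_date": "2024-11-01", "doc_id": "aapl_2024"},
    {"company": "TSLA", "as_of_date": "2024-01-15", "doc_id": "tsla_2024"},
]
latest = select_versions(catalog, "AAPL")
both = select_versions(catalog, "AAPL", years={"2023", "2024"})
print([d["doc_id"] for d in latest], [d["doc_id"] for d in both])
```

The selected doc_id values would then feed a metadata filter on the retrieval call, so the vector search only sees chunks from the chosen versions.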
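Q5: Your rag_v1 has a p95 latency of 5 seconds and your boss wants it at 2 seconds. What do you do?
1. Profile: first determine whether retrieve or generate is slow.
2. Most likely it is generate.
3. Measures: (a) switch Sonnet 4.5 → Haiku 4.5, which can cut latency by about 50% on short queries; (b) stream to improve perceived latency; (c) use prompt caching for the system prompt; (d) reduce top_k, e.g., from 10 → 5; (e) pre-summarize chunks into shorter summaries stored in metadata and put only the summaries in the prompt; (f) deploy the embedding model close to the rest of the stack (e.g., Pinecone us-east + Lambda us-east).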
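8. Preview of Tomorrow
Day 136: Embedding model evaluation. We will run the same set of financial queries on 5 mainstream embeddings (OpenAI 3-large/3-small, Voyage-3, Cohere v3, BGE-large), measuring accuracy, latency, and cost, to answer the practical question "which embedding should a financial use case pick?" Tomorrow's notes will include a complete MTEB-style mini-benchmark script.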