AI Day 52: Hands-On (2): Building a RAG System That Answers Questions About Your Documents
2026-05-23
Date: 2026-05-23 | Stage: Stage 5 · Hands-On Practice (Day 51-60) | Topic: RAG System Implementation
学习路径 / Learning Path
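AI/LLM Deep-Dive: 60-Day Plan
├── Stage 1: Model Fundamentals (Day 1-15) ✅
├── Stage 2: Engineering Practice (Day 16-30) ✅
├── Stage 3: AI Applications in Finance and Retail (Day 31-42) ✅
├── Stage 4: Interview Sprint (Day 43-50) ✅
└── Stage 5: Hands-On Practice (Day 51-60)
    ├── Day 51: Local LLM Deployment, End to End ✅
    ├── Day 52: RAG System in Practice: From Documents to Q&A ← you are here
    ├── Day 53: Advanced RAG: Evaluation, Optimization, Productionization
    ├── Day 54: LoRA Fine-Tuning in Practice: Train Your Own Model
    ├── Day 55: Agent Development in Practice: Build a Tool-Calling Agent
    ├── Day 56: MCP Server Development: Extending What AI Can Do
    ├── Day 57: Multimodal Applications: Image-Text Understanding and Document Analysis
    ├── Day 58: Full-Stack AI Application Development: Frontend-Backend Integration
    ├── Day 59: Performance Tuning and Cost in Practice
    └── Day 60: Summary and Portfolio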
核心概念 / Core Concepts
用自己的笔记构建 RAG / Build RAG with Your Own Notes
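Over the past few months you have accumulated a large set of notes:
Architecture 120-day plan notes: ~251 files (docs/arch/)
Web3 90-day study notes: ~90 files (docs/daily/)
AI 60-day study notes: ~52 files (docs/ai/)
Product analysis articles: ~10 files (docs/)
────────────────────────────────
Total: 400+ Markdown files
The problem:
With this many notes, do you really remember every one of them?
Could you find "the credit risk-control process from Day 34" when you need it?
To compare "traditional risk control vs. DeFi risk control", do you remember which notes cover it?
The solution:
Build a RAG system and let AI retrieve and answer for you.
Input: a natural-language question
Output: an answer grounded in your notes, with source citations
This is more than a practice project; it is a genuinely useful tool:
Before interviews: "Summarize the DeFi protocol analysis framework I studied"
When writing: "What are the core points about Tokenomics in my notes?"
When reviewing: "What are the key points of payment system design in the architecture plan?"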
为什么选择本地 RAG?/ Why Local RAG?
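Day 5 covered RAG architecture as theory
Day 19-21 covered production RAG as methodology
Today is the actual implementation
Advantages of local RAG:
1. Privacy: your notes are never sent to an external service
2. No per-query cost: query volume is limited only by your own hardware
3. Full control: every stage can be inspected and debugged
4. Learning value: implementing it yourself teaches more than reading documentation
Today's goal:
Input: 400+ Markdown notes
Output: a working RAG question-answering system
Time: 6 hours (from zero to a first working version)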
知识点1:技术选型 / Technology Selection
向量数据库选择 / Vector Database
Day 6 学过向量数据库的原理和对比,今天做实际选择:
候选方案对比:
┌──────────────┬───────────┬───────────┬───────────┬───────────┐
│ 维度 │ ChromaDB │ LanceDB │ Qdrant │ FAISS │
├──────────────┼───────────┼───────────┼───────────┼───────────┤
│ 部署复杂度 │ pip即可 │ pip即可 │ 需容器 │ pip即可 │
│ 持久化 │ 本地文件 │ 本地文件 │ 需服务 │ 手动保存 │
│ 元数据过滤 │ ✅ 丰富 │ ✅ SQL式 │ ✅ 丰富 │ ❌ 不支持 │
│ 适合规模 │ 10K-1M │ 1K-10M │ 100K-100M │ 任意 │
│ API友好度 │ ★★★★★ │ ★★★★ │ ★★★★★ │ ★★ │
│ LangChain │ ✅ 集成 │ ✅ 集成 │ ✅ 集成 │ ✅ 集成 │
│ 学习曲线 │ 极低 │ 低 │ 中等 │ 低 │
└──────────────┴───────────┴───────────┴───────────┴───────────┘
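Decision: ChromaDB ✓
Reasons:
- 400 documents → roughly 2000-5000 chunks, well within what ChromaDB handles comfortably
- Installed with a single command: pip install chromadb
- Good metadata filtering (by date, tag, or stage)
- Well-supported LangChain integration
- Persists data to a local folder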
Embedding 模型选择 / Embedding Model
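Candidates compared:
┌────────────────────────┬───────────┬─────────┬────────┬────────────┐
│ Model                  │ Dimension │ Chinese │ Speed  │ Deployment │
├────────────────────────┼───────────┼─────────┼────────┼────────────┤
│ BGE-M3 (local)         │ 1024      │ ★★★★★   │ Medium │ Local GPU  │
│ nomic-embed (local)    │ 768       │ ★★★     │ Fast   │ Ollama     │
│ text-embedding-3-large │ 3072      │ ★★★★    │ Fast   │ Cloud API  │
│ BGE-large-zh           │ 1024      │ ★★★★★   │ Medium │ Local GPU  │
│ mxbai-embed            │ 1024      │ ★★★     │ Fast   │ Ollama     │
└────────────────────────┴───────────┴─────────┴────────┴────────────┘
Decision: BGE-M3 (BAAI/bge-m3) ✓
Reasons:
- The notes mix Chinese and English, so multilingual support is required
- BGE-M3 = Multilingual, Multi-Functionality, Multi-Granularity
- Supports three retrieval modes: Dense + Sparse + ColBERT
- Rated highest for Chinese among the candidates above (tied with BGE-large-zh)
- Runs locally, so the notes stay private
Fallback: if there is not enough VRAM to run the embedding model and the LLM together
- nomic-embed-text via Ollama (smaller VRAM footprint, managed automatically by Ollama)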
LLM 选择 / LLM Selection
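LLM used to generate answers:
Option A (recommended starting point): local Qwen2.5-7B via Ollama
✅ Free, unlimited calls
✅ Good answer quality in Chinese
✅ Already deployed on Day 51
⚠️ Limited quality on complex questions
Option B (quality first): cloud Claude API
✅ Highest answer quality of the three options
✅ Strong long-context handling
⚠️ Requires an API key
⚠️ Costs money per call
Option C (hybrid): local retrieval, cloud generation for complex questions
✅ Balances quality and cost
✅ Most flexible
Today uses Option A (fully local); tomorrow's optimization pass can switch to Option C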
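The routing idea behind Option C could look roughly like the sketch below. It is only an illustration under stated assumptions: the function name `choose_backend`, the hint keywords, and the 80-character cutoff are all hypothetical and would need tuning against real questions.

```python
# Hypothetical heuristic router for Option C: short factual questions stay
# local, long or comparative/summarizing questions go to the cloud model.
COMPLEX_HINTS = ("对比", "比较", "总结", "compare", "summarize")


def choose_backend(question: str, max_local_chars: int = 80) -> str:
    """Return "local" or "cloud" for a question (assumed heuristic)."""
    q = question.lower()
    if len(q) > max_local_chars or any(hint in q for hint in COMPLEX_HINTS):
        return "cloud"
    return "local"


if __name__ == "__main__":
    print(choose_backend("什么是Q4_K_M量化?"))
    print(choose_backend("对比传统金融风控与DeFi协议的风控机制"))
```

Keep in mind that routing a question to the cloud also sends the retrieved note excerpts there, which trades away part of the privacy benefit of the local setup.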
最终技术栈 / Final Tech Stack
技术栈确定:
文档处理: Python + 自定义 Markdown 解析器
分块策略: 递归文本分割(Day 19 学过的)
Embedding: BGE-M3 via sentence-transformers
向量存储: ChromaDB (本地持久化)
LLM生成: Qwen2.5-7B via Ollama
框架: LangChain (简化集成)
API: FastAPI (可选,后续扩展)
安装命令:
pip install chromadb langchain langchain-community
pip install sentence-transformers FlagEmbedding
pip install openai # 用于 Ollama 的 OpenAI 兼容调用
pip install tiktoken # Token 计数
知识点2:文档处理 Pipeline / Document Processing Pipeline
Markdown 解析 / Markdown Parsing
"""
document_processor.py
处理 Markdown 笔记文件
"""
import os
import re
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime
@dataclass
class Document:
    """A processed document."""
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""

    @property
    def token_count(self) -> int:
        """Rough token estimate, assuming ~1.5 tokens per Chinese character
        and ~1.3 tokens per English word (a heuristic, not a tokenizer count)."""
        cn_chars = len(re.findall(r'[\u4e00-\u9fff]', self.content))
        en_words = len(re.findall(r'[a-zA-Z]+', self.content))
        return int(cn_chars * 1.5 + en_words * 1.3)
class MarkdownProcessor:
"""Markdown 文档处理器"""
def __init__(self, base_dir: str):
self.base_dir = Path(base_dir)
def scan_files(self, patterns: list[str] = None) -> list[Path]:
"""扫描所有 Markdown 文件"""
if patterns is None:
patterns = ["docs/**/*.md", "CLAUDE.md"]
files = []
for pattern in patterns:
files.extend(self.base_dir.glob(pattern))
# 去重并排序
files = sorted(set(files))
print(f"Found {len(files)} markdown files")
return files
def parse_file(self, file_path: Path) -> Document:
"""解析单个 Markdown 文件,提取内容和元数据"""
content = file_path.read_text(encoding="utf-8")
# 提取元数据
metadata = self._extract_metadata(content, file_path)
# 清洗内容
cleaned = self._clean_content(content)
return Document(
content=cleaned,
metadata=metadata,
source=str(file_path.relative_to(self.base_dir)),
)
def _extract_metadata(self, content: str, file_path: Path) -> dict:
"""从文件名和内容中提取元数据"""
metadata = {
"file_name": file_path.name,
"file_path": str(file_path.relative_to(self.base_dir)),
"category": self._detect_category(file_path),
}
# 从文件名提取 Day 编号
day_match = re.search(r'[Dd]ay\s*(\d+)', file_path.name)
if day_match:
metadata["day_number"] = int(day_match.group(1))
# 从内容提取日期
date_match = re.search(
r'\*\*日期\*\*:\s*(\d{4}-\d{2}-\d{2})', content
)
if date_match:
metadata["date"] = date_match.group(1)
# 从标题提取主题
title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
if title_match:
metadata["title"] = title_match.group(1).strip()
# 检测阶段
stage_match = re.search(
r'第([一二三四五六]|[1-6])阶段', content
)
if stage_match:
metadata["stage"] = stage_match.group(0)
# 提取标签(从内容关键词)
tags = self._extract_tags(content)
if tags:
metadata["tags"] = ",".join(tags)
return metadata
def _detect_category(self, file_path: Path) -> str:
"""根据文件路径检测分类"""
path_str = str(file_path).lower()
if "/ai/" in path_str or "\\ai\\" in path_str:
return "ai_learning"
elif "/arch/" in path_str or "\\arch\\" in path_str:
return "architecture"
elif "/daily/" in path_str or "\\daily\\" in path_str:
return "web3_daily"
elif "blog" in path_str:
return "blog"
else:
return "general"
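    def _extract_tags(self, content: str) -> list[str]:
        """Extract up to five topic tags by keyword matching."""
        tag_keywords = {
            "DeFi": ["defi", "lending", "dex", "amm", "liquidity"],
            "Security": ["security", "audit", "attack", "vulnerability", "安全"],
            "Tokenomics": ["tokenomics", "token", "vesting", "airdrop"],
            "Architecture": ["architecture", "架构", "系统设计", "system design"],
            "AI": ["llm", "transformer", "rag", "agent", "embedding"],
            "Governance": ["governance", "dao", "voting", "治理"],
            "Risk": ["risk", "风控", "风险", "compliance"],
            "Payment": ["payment", "支付", "settlement"],
        }
        content_lower = content.lower()

        def has_keyword(kw: str) -> bool:
            # ASCII keywords must match as whole words, so "rag" does not hit
            # "storage" and "dex" does not hit "index". Chinese keywords have
            # no word boundaries, so a substring test is used for them.
            if kw.isascii():
                pattern = rf'(?<![a-z0-9]){re.escape(kw)}(?![a-z0-9])'
                return re.search(pattern, content_lower) is not None
            return kw in content_lower

        found_tags = [
            tag for tag, keywords in tag_keywords.items()
            if any(has_keyword(kw) for kw in keywords)
        ]
        return found_tags[:5]  # at most 5 tags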
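    def _clean_content(self, content: str) -> str:
        """Clean Markdown content before chunking."""
        # Drop fenced blocks that hold the learning-path tree (repeated in
        # every note, not worth indexing). Fences are matched as open/close
        # pairs so the pattern cannot run across unrelated code blocks.
        def drop_tree(match: re.Match) -> str:
            if '学习路径' in match.group(1):
                return '[学习路径树已省略]'
            return match.group(0)

        content = re.sub(
            r'^```[^\n]*\n(.*?)^```[ \t]*$',
            drop_tree,
            content,
            flags=re.DOTALL | re.MULTILINE,
        )
        # Other code blocks are kept: they contain valuable examples
        # Collapse long box-drawing runs and separator lines
        content = re.sub(r'[─═┌┐└┘├┤┬┴┼]{10,}', '---', content)
        content = re.sub(r'[-=]{20,}', '---', content)
        # Collapse runs of blank lines
        content = re.sub(r'\n{4,}', '\n\n\n', content)
        return content.strip()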
def process_all(self) -> list[Document]:
"""处理所有文档"""
files = self.scan_files()
documents = []
for f in files:
try:
doc = self.parse_file(f)
if len(doc.content) > 100: # 过滤太短的文件
documents.append(doc)
except Exception as e:
print(f"Error processing {f}: {e}")
print(f"Processed {len(documents)} documents")
return documents
递归 Chunking 实现 / Recursive Chunking
"""
chunker.py
文档分块 — Day 19 学的理论,今天实现
"""
from dataclasses import dataclass
import re
@dataclass
class Chunk:
"""表示一个文档块"""
content: str
metadata: dict
chunk_id: str
source: str
@property
def display(self) -> str:
return f"[{self.source}] {self.content[:100]}..."
class RecursiveChunker:
"""递归文本分割器
Day 19 学到的最佳实践:
1. 按语义边界分割(标题 > 段落 > 句子 > 字符)
2. 保持适当的块大小(不太大也不太小)
3. 添加重叠(overlap)避免信息断裂
4. 保留元数据(来源、标题、位置)
"""
def __init__(
self,
chunk_size: int = 512,
chunk_overlap: int = 50,
min_chunk_size: int = 100,
):
self.chunk_size = chunk_size
self.chunk_overlap = chunk_overlap
self.min_chunk_size = min_chunk_size
# Markdown 的分割层次(从大到小)
self.separators = [
"\n## ", # 二级标题(最大的语义单元)
"\n### ", # 三级标题
"\n#### ", # 四级标题
"\n```", # 代码块边界
"\n\n", # 段落
"\n", # 行
"。", # 中文句号
". ", # 英文句号
";", # 中文分号
"; ", # 英文分号
" ", # 空格(最后手段)
]
def chunk_document(self, doc) -> list[Chunk]:
"""将一个文档分成多个块"""
chunks = []
sections = self._split_by_headers(doc.content)
for section_title, section_content in sections:
if not section_content.strip():
continue
# 如果 section 够小,直接作为一个 chunk
if len(section_content) <= self.chunk_size:
if len(section_content) >= self.min_chunk_size:
chunk_meta = {**doc.metadata}
if section_title:
chunk_meta["section"] = section_title
chunks.append(Chunk(
content=self._format_chunk(
section_title, section_content
),
metadata=chunk_meta,
chunk_id=f"{doc.source}#{len(chunks)}",
source=doc.source,
))
else:
# section 太大,递归分割
sub_chunks = self._recursive_split(
section_content,
self.separators
)
for i, sub in enumerate(sub_chunks):
if len(sub) >= self.min_chunk_size:
chunk_meta = {**doc.metadata}
if section_title:
chunk_meta["section"] = section_title
chunk_meta["sub_index"] = i
chunks.append(Chunk(
content=self._format_chunk(
section_title, sub
),
metadata=chunk_meta,
chunk_id=f"{doc.source}#{len(chunks)}",
source=doc.source,
))
return chunks
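    def _split_by_headers(
        self, content: str
    ) -> list[tuple[str, str]]:
        """Split on Markdown headings (levels 1-4), keeping each heading as the section title."""
        sections = []
        current_title = ""
        current_content = []
        in_code_block = False
        for line in content.split("\n"):
            # Track fenced code blocks so that "# comment" lines inside code
            # are not mistaken for headings.
            if line.lstrip().startswith("```"):
                in_code_block = not in_code_block
            if not in_code_block and re.match(r'^#{1,4}\s+', line):
                # Flush the previous section
                if current_content:
                    sections.append(
                        (current_title, "\n".join(current_content))
                    )
                current_title = line.lstrip("#").strip()
                current_content = [line]
            else:
                current_content.append(line)
        # Flush the last section
        if current_content:
            sections.append(
                (current_title, "\n".join(current_content))
            )
        return sections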
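    def _recursive_split(
        self, text: str, separators: list[str], add_overlap: bool = True
    ) -> list[str]:
        """Recursively split text on progressively finer separators.

        Overlap is added once, by the outermost call; nested calls pass
        add_overlap=False so overlaps do not stack on top of each other.
        With overlap, a chunk can exceed chunk_size by up to chunk_overlap
        characters.
        """
        if len(text) <= self.chunk_size:
            return [text]
        if not separators:
            # No separators left: fall back to fixed-size windows
            return self._hard_split(text)
        sep = separators[0]
        remaining_seps = separators[1:]
        parts = text.split(sep)
        if len(parts) == 1:
            # This separator does not occur in the text; try the next one
            return self._recursive_split(text, remaining_seps, add_overlap)
        # Greedily merge adjacent parts up to chunk_size
        chunks = []
        current = ""
        for i, part in enumerate(parts):
            # Re-attach the separator (except before the first part)
            piece = (sep + part) if i > 0 else part
            if len(current) + len(piece) <= self.chunk_size:
                current += piece
            else:
                if current:
                    chunks.append(current)
                current = piece
        if current:
            chunks.append(current)
        # Recurse into chunks that are still too large
        result = []
        for chunk in chunks:
            if len(chunk) > self.chunk_size:
                result.extend(
                    self._recursive_split(
                        chunk, remaining_seps, add_overlap=False
                    )
                )
            else:
                result.append(chunk)
        if add_overlap:
            result = self._add_overlap(result)
        return result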
def _hard_split(self, text: str) -> list[str]:
"""硬切分(最后手段)"""
chunks = []
for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
chunk = text[i:i + self.chunk_size]
if len(chunk) >= self.min_chunk_size:
chunks.append(chunk)
return chunks
def _add_overlap(self, chunks: list[str]) -> list[str]:
"""添加块间重叠"""
if len(chunks) <= 1 or self.chunk_overlap == 0:
return chunks
result = [chunks[0]]
for i in range(1, len(chunks)):
# 从上一个块的尾部取 overlap
prev_tail = chunks[i - 1][-self.chunk_overlap:]
result.append(prev_tail + chunks[i])
return result
def _format_chunk(self, title: str, content: str) -> str:
"""格式化 chunk,添加上下文信息"""
if title:
return f"[Section: {title}]\n{content}"
return content
def chunk_all_documents(documents: list, **kwargs) -> list[Chunk]:
    """Chunk every document and print summary statistics."""
    chunker = RecursiveChunker(**kwargs)
    all_chunks = []
    for doc in documents:
        all_chunks.extend(chunker.chunk_document(doc))
    print(f"Total chunks: {len(all_chunks)}")
    if all_chunks:  # avoid ZeroDivisionError when nothing was chunked
        avg = sum(len(c.content) for c in all_chunks) / len(all_chunks)
        print(f"Avg chunk size: {avg:.0f} chars")
    return all_chunks
Chunk 质量检查 / Chunk Quality Check
"""
chunk_quality.py
检查分块质量
"""
def analyze_chunks(chunks: list) -> dict:
"""分析分块质量"""
sizes = [len(c.content) for c in chunks]
categories = {}
for c in chunks:
cat = c.metadata.get("category", "unknown")
categories[cat] = categories.get(cat, 0) + 1
report = {
"total_chunks": len(chunks),
"avg_size": sum(sizes) / len(sizes),
"min_size": min(sizes),
"max_size": max(sizes),
"median_size": sorted(sizes)[len(sizes) // 2],
"categories": categories,
"size_distribution": {
"<200": sum(1 for s in sizes if s < 200),
"200-500": sum(1 for s in sizes if 200 <= s < 500),
"500-800": sum(1 for s in sizes if 500 <= s < 800),
"800-1200": sum(1 for s in sizes if 800 <= s < 1200),
">1200": sum(1 for s in sizes if s >= 1200),
}
}
print("=== Chunk Quality Report ===")
print(f"Total chunks: {report['total_chunks']}")
print(f"Avg size: {report['avg_size']:.0f} chars")
print(f"Min/Max: {report['min_size']}/{report['max_size']}")
print(f"\nSize distribution:")
for range_name, count in report["size_distribution"].items():
bar = "█" * (count // 5)
print(f" {range_name:>10}: {count:>4} {bar}")
print(f"\nCategories:")
for cat, count in sorted(categories.items()):
print(f" {cat}: {count}")
return report
# 质量标准:
# ✅ 平均大小 300-600 chars
# ✅ 没有 <100 chars 的碎片
# ✅ 没有 >2000 chars 的超大块
# ✅ 各类别分布合理
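The quality criteria listed above can be turned into an automatic gate. The following is a minimal sketch, not part of the original scripts: the function name `check_chunk_quality` is hypothetical, and the thresholds are simply the ones from the checklist (average 300-600 chars, nothing under 100, nothing over 2000).

```python
def check_chunk_quality(
    sizes: list[int],
    min_avg: int = 300,
    max_avg: int = 600,
    min_size: int = 100,
    max_size: int = 2000,
) -> list[str]:
    """Return the list of violated criteria; an empty list means all pass."""
    if not sizes:
        return ["no chunks"]
    problems = []
    avg = sum(sizes) / len(sizes)
    if not (min_avg <= avg <= max_avg):
        problems.append(f"average size {avg:.0f} outside {min_avg}-{max_avg}")
    too_small = sum(1 for s in sizes if s < min_size)
    if too_small:
        problems.append(f"{too_small} chunks under {min_size} chars")
    too_large = sum(1 for s in sizes if s > max_size)
    if too_large:
        problems.append(f"{too_large} chunks over {max_size} chars")
    return problems


if __name__ == "__main__":
    print(check_chunk_quality([400, 500, 350]))
    print(check_chunk_quality([50, 3000]))
```

It would be called with `[len(c.content) for c in chunks]`; the category-distribution criterion is left to manual inspection because "reasonable" has no fixed threshold.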
知识点3:索引构建 / Index Building
Embedding 生成 / Embedding Generation
"""
embedding_service.py
Embedding 生成服务
"""
import numpy as np
from typing import Union
class EmbeddingService:
"""Embedding 服务 — 支持本地和 Ollama 两种模式"""
def __init__(self, mode: str = "local", model_name: str = None):
"""
Args:
mode: "local" (sentence-transformers) 或 "ollama"
model_name: 模型名称
"""
self.mode = mode
if mode == "local":
from sentence_transformers import SentenceTransformer
self.model_name = model_name or "BAAI/bge-m3"
print(f"Loading embedding model: {self.model_name}...")
self.model = SentenceTransformer(self.model_name)
self.dimension = self.model.get_sentence_embedding_dimension()
print(f"Loaded! Dimension: {self.dimension}")
elif mode == "ollama":
import requests
self.model_name = model_name or "nomic-embed-text"
self.ollama_url = "http://localhost:11434/api/embeddings"
# 获取维度
test = self._ollama_embed("test")
self.dimension = len(test)
print(f"Using Ollama embedding: {self.model_name}, dim={self.dimension}")
def embed(self, texts: Union[str, list[str]]) -> np.ndarray:
"""生成 Embedding"""
if isinstance(texts, str):
texts = [texts]
if self.mode == "local":
return self.model.encode(
texts,
normalize_embeddings=True, # L2 归一化
show_progress_bar=len(texts) > 10,
batch_size=32,
)
elif self.mode == "ollama":
embeddings = [self._ollama_embed(t) for t in texts]
return np.array(embeddings)
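    def _ollama_embed(self, text: str) -> list[float]:
        """Fetch one embedding from the Ollama embeddings endpoint."""
        import requests
        resp = requests.post(
            self.ollama_url,
            json={"model": self.model_name, "prompt": text},
            timeout=60,
        )
        resp.raise_for_status()  # surface HTTP errors instead of a KeyError below
        return resp.json()["embedding"]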
# 使用示例
if __name__ == "__main__":
# 方式1: 本地 BGE-M3(推荐,质量更好)
emb_service = EmbeddingService(mode="local", model_name="BAAI/bge-m3")
# 方式2: Ollama(如果显存紧张)
# emb_service = EmbeddingService(mode="ollama")
# 测试
texts = [
"DeFi借贷协议的清算机制如何运作",
"How does liquidation work in DeFi lending",
"智能合约安全审计的关键步骤",
]
vectors = emb_service.embed(texts)
print(f"Shape: {vectors.shape}") # (3, 1024)
# 计算相似度
from numpy import dot
sim_01 = dot(vectors[0], vectors[1]) # 中英同义,应该高
sim_02 = dot(vectors[0], vectors[2]) # 不同主题,应该低
print(f"中英同义相似度: {sim_01:.3f}") # 预期 > 0.8
print(f"不同主题相似度: {sim_02:.3f}") # 预期 < 0.5
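The usage example relies on a plain dot product as the similarity measure. That works because `normalize_embeddings=True` makes every vector unit length, and for unit vectors the dot product equals cosine similarity. A small dependency-free sketch of that identity (the helper names are illustrative only):

```python
import math


def dot(a: list[float], b: list[float]) -> float:
    return sum(x * y for x, y in zip(a, b))


def l2_normalize(v: list[float]) -> list[float]:
    """Scale a vector to unit length."""
    norm = math.sqrt(dot(v, v))
    return [x / norm for x in v]


def cosine_similarity(a: list[float], b: list[float]) -> float:
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))


if __name__ == "__main__":
    a, b = [3.0, 4.0], [4.0, 3.0]
    # Both lines print the same value: 24 / (5 * 5) = 0.96
    print(cosine_similarity(a, b))
    print(dot(l2_normalize(a), l2_normalize(b)))
```

Note that the Ollama code path above does not normalize its vectors, so the dot-product shortcut should only be used with the local mode, or after normalizing explicitly.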
向量库初始化与索引 / Vector Store Setup
"""
vector_store.py
ChromaDB 向量存储管理
"""
import chromadb
from chromadb.config import Settings
import hashlib
import time
class VectorStore:
"""ChromaDB 向量存储"""
def __init__(
self,
persist_dir: str = "./chroma_db",
collection_name: str = "my_notes",
):
self.client = chromadb.PersistentClient(
path=persist_dir,
settings=Settings(anonymized_telemetry=False),
)
self.collection = self.client.get_or_create_collection(
name=collection_name,
metadata={
"hnsw:space": "cosine", # 余弦相似度
"hnsw:M": 16, # HNSW 连接数
"hnsw:construction_ef": 200,
}
)
print(f"Collection '{collection_name}': {self.collection.count()} documents")
    def add_chunks(
        self,
        chunks: list,
        embeddings: list,
        batch_size: int = 100,
    ):
        """Add chunks to the collection in batches.

        Uses upsert rather than add, so re-running the build with the same
        chunk IDs overwrites existing entries instead of colliding with them.
        """
        total = len(chunks)
        added = 0
        for i in range(0, total, batch_size):
            batch_chunks = chunks[i:i + batch_size]
            batch_embeddings = embeddings[i:i + batch_size]
            ids = [c.chunk_id for c in batch_chunks]
            documents = [c.content for c in batch_chunks]
            metadatas = [c.metadata for c in batch_chunks]
            self.collection.upsert(
                ids=ids,
                embeddings=[e.tolist() for e in batch_embeddings],
                documents=documents,
                metadatas=metadatas,
            )
            added += len(batch_chunks)
            print(f"  Added {added}/{total} chunks...")
        print(f"Total documents in collection: {self.collection.count()}")
def search(
self,
query_embedding: list,
n_results: int = 5,
where: dict = None,
) -> dict:
"""相似度搜索"""
kwargs = {
"query_embeddings": [query_embedding],
"n_results": n_results,
"include": ["documents", "metadatas", "distances"],
}
if where:
kwargs["where"] = where
return self.collection.query(**kwargs)
def get_stats(self) -> dict:
"""获取存储统计"""
count = self.collection.count()
sample = self.collection.peek(limit=5)
return {
"total_documents": count,
"sample_ids": sample["ids"][:3] if sample["ids"] else [],
}
def build_index(
documents: list,
embedding_service,
persist_dir: str = "./chroma_db",
) -> VectorStore:
"""完整的索引构建流程"""
from chunker import chunk_all_documents
from chunk_quality import analyze_chunks  # analyze_chunks lives in chunk_quality.py, not chunker.py
print("=== Step 1: Chunking ===")
chunks = chunk_all_documents(
documents,
chunk_size=512,
chunk_overlap=50,
min_chunk_size=100,
)
analyze_chunks(chunks)
print("\n=== Step 2: Embedding ===")
start = time.time()
texts = [c.content for c in chunks]
embeddings = embedding_service.embed(texts)
elapsed = time.time() - start
print(f"Embedded {len(texts)} chunks in {elapsed:.1f}s "
f"({len(texts)/elapsed:.0f} chunks/s)")
print("\n=== Step 3: Indexing ===")
store = VectorStore(persist_dir=persist_dir)
store.add_chunks(chunks, embeddings)
print("\n=== Done! ===")
print(store.get_stats())
return store
增量更新策略 / Incremental Update
"""
incremental_update.py
增量更新 — 只处理新增/修改的文件
"""
import json
import hashlib
from pathlib import Path
from datetime import datetime
class IndexManager:
"""索引管理器 — 追踪文件变化,支持增量更新"""
STATE_FILE = "index_state.json"
def __init__(self, persist_dir: str = "./chroma_db"):
self.persist_dir = Path(persist_dir)
self.state_path = self.persist_dir / self.STATE_FILE
self.state = self._load_state()
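    def _load_state(self) -> dict:
        """Load the saved index state, or an empty state on first run."""
        if self.state_path.exists():
            return json.loads(self.state_path.read_text(encoding="utf-8"))
        return {"files": {}, "last_update": None}

    def _save_state(self):
        """Persist the index state as UTF-8 JSON."""
        self.state["last_update"] = datetime.now().isoformat()
        # The directory may not exist yet if the state is saved before the
        # vector store has been created.
        self.persist_dir.mkdir(parents=True, exist_ok=True)
        # ensure_ascii=False keeps non-ASCII paths readable, so the file
        # must be written as UTF-8 explicitly (the Windows default differs).
        self.state_path.write_text(
            json.dumps(self.state, indent=2, ensure_ascii=False),
            encoding="utf-8",
        )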
def _file_hash(self, file_path: Path) -> str:
"""计算文件哈希"""
content = file_path.read_bytes()
return hashlib.md5(content).hexdigest()
def get_changed_files(self, files: list[Path]) -> dict:
"""识别新增/修改/删除的文件"""
current_hashes = {
str(f): self._file_hash(f) for f in files
}
old_hashes = self.state.get("files", {})
new_files = [
f for f in files
if str(f) not in old_hashes
]
modified_files = [
f for f in files
if str(f) in old_hashes
and current_hashes[str(f)] != old_hashes[str(f)]
]
deleted_files = [
f for f in old_hashes.keys()
if f not in current_hashes
]
return {
"new": new_files,
"modified": modified_files,
"deleted": deleted_files,
"unchanged": len(files) - len(new_files) - len(modified_files),
}
def update_state(self, files: list[Path]):
"""更新文件状态"""
self.state["files"] = {
str(f): self._file_hash(f) for f in files
}
self._save_state()
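# Workflow:
# 1. First run: build_index() builds the full index
# 2. Later runs: an incremental_update() step (not defined in this listing) processes only changed files
# 3. Save the state after every update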
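The workflow mentions an `incremental_update()` step without showing it. The planning half of such a step might look like the sketch below, which works purely on the hash dictionaries that `IndexManager` stores. The function name `plan_incremental_update` and the returned keys are hypothetical.

```python
def plan_incremental_update(
    old_hashes: dict[str, str],
    new_hashes: dict[str, str],
) -> dict[str, list[str]]:
    """Decide which files need their chunks deleted and which need re-indexing.

    A modified file appears in both lists: its old chunks are removed first
    (the new version may produce fewer chunks), then it is indexed again.
    """
    reindex = sorted(
        path for path, digest in new_hashes.items()
        if old_hashes.get(path) != digest
    )
    delete_chunks_of = sorted(
        path for path, digest in old_hashes.items()
        if new_hashes.get(path) != digest
    )
    return {"delete_chunks_of": delete_chunks_of, "reindex": reindex}


if __name__ == "__main__":
    old = {"a.md": "1", "b.md": "2", "c.md": "3"}
    new = {"a.md": "1", "b.md": "9", "d.md": "4"}
    print(plan_incremental_update(old, new))
```

Applying the plan is left out on purpose. One plausible approach is to delete by metadata, for example `collection.delete(where={"file_path": path})` in ChromaDB, and then chunk, embed and add only the files in `reindex`. This assumes the path keys in the state file are mapped to the relative `file_path` values stored in chunk metadata.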
知识点4:检索与生成 / Retrieval and Generation
相似度搜索 / Similarity Search
"""
retriever.py
检索模块
"""
import numpy as np
class Retriever:
"""RAG 检索器"""
def __init__(self, vector_store, embedding_service):
self.store = vector_store
self.emb = embedding_service
def search(
self,
query: str,
n_results: int = 5,
category: str = None,
min_score: float = 0.3,
) -> list[dict]:
"""搜索相关文档块
Args:
query: 用户查询
n_results: 返回结果数
category: 可选分类过滤
min_score: 最低相关性阈值 (cosine similarity)
"""
# 生成查询向量
query_embedding = self.emb.embed(query)[0].tolist()
# 构建过滤条件
where = None
if category:
where = {"category": category}
# 向量搜索
results = self.store.search(
query_embedding=query_embedding,
n_results=n_results,
where=where,
)
# 格式化结果
formatted = []
for i in range(len(results["ids"][0])):
# ChromaDB 返回的是距离,需要转换为相似度
distance = results["distances"][0][i]
similarity = 1 - distance # cosine distance → similarity
if similarity >= min_score:
formatted.append({
"content": results["documents"][0][i],
"metadata": results["metadatas"][0][i],
"similarity": round(similarity, 3),
"id": results["ids"][0][i],
})
return formatted
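The retriever only filters on `category`, but the metadata extracted earlier also contains `day_number`, so richer filters are possible. Below is a sketch of a filter builder using ChromaDB's `$and`, `$gte` and `$lte` operators. The function name `build_where` and its parameters are hypothetical; the result would be passed as the `where` argument of `VectorStore.search`.

```python
from typing import Optional


def build_where(
    category: Optional[str] = None,
    min_day: Optional[int] = None,
    max_day: Optional[int] = None,
) -> Optional[dict]:
    """Build a metadata filter dict, or None when no filter is requested."""
    conditions = []
    if category:
        conditions.append({"category": category})
    if min_day is not None:
        conditions.append({"day_number": {"$gte": min_day}})
    if max_day is not None:
        conditions.append({"day_number": {"$lte": max_day}})
    if not conditions:
        return None
    if len(conditions) == 1:
        # "$and" needs at least two clauses, so a single condition is
        # returned as-is.
        return conditions[0]
    return {"$and": conditions}


if __name__ == "__main__":
    print(build_where(category="ai_learning", min_day=16, max_day=30))
```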
Hybrid Search 实现 / Hybrid Search
"""
hybrid_retriever.py
混合检索:向量搜索 + 关键词搜索
"""
import re
from collections import Counter
class HybridRetriever:
"""混合检索器 — Day 20 学过的原理
结合两种检索方式:
1. Dense (向量搜索): 语义理解,找意思相近的内容
2. Sparse (关键词搜索): 精确匹配,找包含关键词的内容
为什么需要混合?
- 用户问"什么是ERC-4337" → 关键词"ERC-4337"很重要
- 用户问"如何改善钱包使用体验" → 语义理解更重要
- 混合搜索两种情况都能处理好
"""
def __init__(self, vector_store, embedding_service, all_chunks):
self.vector_store = vector_store
self.emb = embedding_service
self.all_chunks = all_chunks
# 构建关键词倒排索引
self.keyword_index = self._build_keyword_index()
def _build_keyword_index(self) -> dict:
"""构建简单的关键词倒排索引"""
index = {}
for chunk in self.all_chunks:
# 提取关键词(简单的分词)
words = re.findall(
r'[a-zA-Z][\w-]*|[\u4e00-\u9fff]+',
chunk.content.lower()
)
for word in set(words):
if word not in index:
index[word] = []
index[word].append(chunk.chunk_id)
return index
def _keyword_search(
self, query: str, n_results: int = 10
) -> list[tuple[str, float]]:
"""关键词搜索,返回 (chunk_id, score) 列表"""
# 提取查询关键词
query_words = re.findall(
r'[a-zA-Z][\w-]*|[\u4e00-\u9fff]+',
query.lower()
)
# 计算每个 chunk 的关键词匹配分数
scores = Counter()
for word in query_words:
if word in self.keyword_index:
for chunk_id in self.keyword_index[word]:
scores[chunk_id] += 1
# 归一化分数
max_score = max(scores.values()) if scores else 1
results = [
(cid, score / max_score)
for cid, score in scores.most_common(n_results)
]
return results
    def search(
        self,
        query: str,
        n_results: int = 5,
        dense_weight: float = 0.7,
        sparse_weight: float = 0.3,
        category: str = None,
    ) -> list[dict]:
        """Hybrid search.

        Args:
            dense_weight: weight of the vector-search score (default 0.7)
            sparse_weight: weight of the keyword-search score (default 0.3)
        """
        # Dense search (vectors)
        query_embedding = self.emb.embed(query)[0].tolist()
        where = {"category": category} if category else None
        dense_results = self.vector_store.search(
            query_embedding=query_embedding,
            n_results=n_results * 2,  # over-fetch so fusion has candidates
            where=where,
        )
        # Sparse search (keywords)
        sparse_results = self._keyword_search(query, n_results * 2)
        # Fuse with a weighted sum of scores (score-based fusion, not
        # rank-based Reciprocal Rank Fusion)
        combined_scores = {}
        # Dense scores
        for i in range(len(dense_results["ids"][0])):
            cid = dense_results["ids"][0][i]
            distance = dense_results["distances"][0][i]
            score = (1 - distance) * dense_weight
            combined_scores[cid] = {
                "score": score,
                "content": dense_results["documents"][0][i],
                "metadata": dense_results["metadatas"][0][i],
                "dense_score": 1 - distance,
                "sparse_score": 0,
            }
        # Sparse scores
        for cid, sparse_score in sparse_results:
            if cid in combined_scores:
                combined_scores[cid]["score"] += sparse_score * sparse_weight
                combined_scores[cid]["sparse_score"] = sparse_score
                continue
            # Keyword-only hit: look the chunk up in memory
            chunk = next(
                (c for c in self.all_chunks if c.chunk_id == cid),
                None
            )
            if chunk is None:
                continue
            # Apply the same category filter as the dense search; otherwise
            # keyword-only hits would bypass it.
            if category and chunk.metadata.get("category") != category:
                continue
            combined_scores[cid] = {
                "score": sparse_score * sparse_weight,
                "content": chunk.content,
                "metadata": chunk.metadata,
                "dense_score": 0,
                "sparse_score": sparse_score,
            }
        # Sort and return the top N
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1]["score"],
            reverse=True,
        )[:n_results]
        return [
            {
                "id": cid,
                "content": info["content"],
                "metadata": info["metadata"],
                "combined_score": round(info["score"], 3),
                "dense_score": round(info["dense_score"], 3),
                "sparse_score": round(info["sparse_score"], 3),
            }
            for cid, info in sorted_results
        ]
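The `search` method above fuses results by adding weighted scores. Reciprocal Rank Fusion (RRF) is a different technique: it ignores the raw scores and uses only each result's rank, which sidesteps the problem that cosine similarities and keyword counts are on different scales. A minimal sketch for comparison, where the function name is illustrative and `k = 60` is the constant commonly used with RRF:

```python
def reciprocal_rank_fusion(
    rankings: list[list[str]], k: int = 60
) -> list[tuple[str, float]]:
    """Fuse several ranked ID lists; each hit contributes 1 / (k + rank)."""
    scores: dict[str, float] = {}
    for ranking in rankings:
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] = scores.get(doc_id, 0.0) + 1.0 / (k + rank)
    return sorted(scores.items(), key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    dense_ids = ["a", "b", "c"]   # ranked by vector similarity
    sparse_ids = ["b", "d"]       # ranked by keyword score
    for doc_id, score in reciprocal_rank_fusion([dense_ids, sparse_ids]):
        print(doc_id, round(score, 4))
```

In this example `b` comes first because it appears in both lists, even though it is not the top dense hit.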
Prompt 模板设计 / Prompt Template Design
"""
prompt_templates.py
RAG Prompt 模板 — Day 4 Prompt Engineering 的实际应用
"""
# === System Prompt ===
SYSTEM_PROMPT = """你是一个基于个人学习笔记的AI助手。你的知识来源于用户的笔记文档。
重要规则:
1. 只基于提供的上下文内容回答问题
2. 如果上下文中没有相关信息,明确说"在笔记中未找到相关内容"
3. 回答时引用来源文件,格式: [来源: 文件名]
4. 使用中文回答,关键术语保留英文
5. 保持简洁但完整
你擅长的领域(基于笔记内容):
- AI/LLM 技术深度学习
- 金融零售业务架构
- Web3/DeFi 协议分析
- 软件架构设计"""
# === RAG Prompt 模板 ===
RAG_PROMPT_TEMPLATE = """基于以下上下文内容回答用户的问题。
## 相关上下文
{context}
## 用户问题
{question}
## 要求
- 基于上下文内容回答,不要编造信息
- 引用来源: [来源: 文件名]
- 如果上下文不足以回答,说明你不确定的部分
- 优先使用中文,术语保留英文
## 回答"""
def build_context(search_results: list[dict], max_chars: int = 3000) -> str:
"""构建上下文字符串"""
context_parts = []
total_chars = 0
for i, result in enumerate(search_results):
source = result["metadata"].get("file_path", "unknown")
title = result["metadata"].get("title", "")
score = result.get("combined_score", result.get("similarity", 0))
header = f"### 参考 {i+1} [来源: {source}] (相关度: {score:.2f})"
if title:
header += f"\n标题: {title}"
content = result["content"]
# 控制总长度
entry = f"{header}\n{content}\n"
if total_chars + len(entry) > max_chars:
# 截断
remaining = max_chars - total_chars
if remaining > 200:
entry = entry[:remaining] + "\n[...截断...]"
else:
break
context_parts.append(entry)
total_chars += len(entry)
return "\n---\n".join(context_parts)
def build_rag_prompt(question: str, search_results: list[dict]) -> list[dict]:
"""构建完整的 RAG prompt(OpenAI 消息格式)"""
context = build_context(search_results)
user_message = RAG_PROMPT_TEMPLATE.format(
context=context,
question=question,
)
return [
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": user_message},
]
引用溯源 / Citation Tracking
"""
citation.py
引用溯源 — 让回答可验证
"""
import re
def extract_citations(response: str) -> list[str]:
"""从回答中提取引用的来源"""
pattern = r'\[来源:\s*([^\]]+)\]'
return re.findall(pattern, response)
def format_response_with_citations(
    response: str, search_results: list[dict]
) -> str:
    """Append a source list to the answer, marking the sources it cites."""
    citations = [c.strip() for c in extract_citations(response)]
    # Build the source list
    source_list = []
    for i, result in enumerate(search_results):
        source = result["metadata"].get("file_path", "unknown")
        file_name = result["metadata"].get("file_name", "")
        title = result["metadata"].get("title", "")
        score = result.get("combined_score", result.get("similarity", 0))
        # The prompt asks for "[来源: 文件名]", so the model may cite either
        # the full path or just the file name; accept both.
        cited = any(c in (source, file_name) for c in citations)
        marker = " *" if cited else "  "
        source_list.append(
            f"{marker} [{i+1}] {source}"
            f"{' - ' + title if title else ''}"
            f" (相关度: {score:.2f})"
        )
    footer = "\n\n---\n**引用来源:**\n" + "\n".join(source_list)
    footer += "\n\n* 标记 = 回答中引用的来源"
    return response + footer
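Marking cited sources is one half of citation tracking; the other half is catching citations that point at files which were never retrieved, a common sign of hallucination. A small sketch of such a check, reusing the same `[来源: ...]` pattern (the function name `find_unsupported_citations` is hypothetical):

```python
import re

CITATION_PATTERN = re.compile(r'\[来源:\s*([^\]]+)\]')


def find_unsupported_citations(
    response: str, retrieved_paths: list[str]
) -> list[str]:
    """Return cited sources that match no retrieved file path or file name."""
    known = set(retrieved_paths)
    # Also accept bare file names, since the model may drop the directory
    known.update(
        p.replace("\\", "/").rsplit("/", 1)[-1] for p in retrieved_paths
    )
    unsupported = []
    for cited in CITATION_PATTERN.findall(response):
        cited = cited.strip()
        if cited not in known and cited not in unsupported:
            unsupported.append(cited)
    return unsupported


if __name__ == "__main__":
    answer = "清算由健康因子触发 [来源: docs/daily/day12.md]。另见 [来源: day99.md]"
    retrieved = ["docs/daily/day12.md", "docs/ai/day05.md"]
    print(find_unsupported_citations(answer, retrieved))
```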
知识点5:端到端测试 / End-to-End Testing
准备测试问题 / Test Questions
"""
test_rag.py
端到端测试 RAG 系统
"""
# 10个测试问题(覆盖不同类别和难度)
TEST_QUESTIONS = [
# === 简单检索(答案在单个文档中) ===
{
"id": 1,
"question": "Transformer的自注意力机制是如何工作的?",
"expected_source": "day1-transformer",
"difficulty": "easy",
},
{
"id": 2,
"question": "什么是Q4_K_M量化?它与FP16相比有什么优劣?",
"expected_source": "day2-quantization",
"difficulty": "easy",
},
# === 中等检索(需要综合多个文档) ===
{
"id": 3,
"question": "RAG系统中的检索优化有哪些方法?",
"expected_source": ["day5-rag", "day19-production-rag", "day20-production-rag"],
"difficulty": "medium",
},
{
"id": 4,
"question": "金融风控中如何使用AI/ML技术?",
"expected_source": ["day31-financial-ai", "day34-credit-ai"],
"difficulty": "medium",
},
# === 跨领域(需要关联不同主题的知识) ===
{
"id": 5,
"question": "对比传统金融风控与DeFi协议的风控机制有什么异同?",
"expected_source": ["day31-financial-ai", "web3笔记"],
"difficulty": "hard",
},
{
"id": 6,
"question": "如何设计一个可持续的代币激励机制?",
"expected_source": ["web3笔记中的tokenomics相关"],
"difficulty": "hard",
},
# === 精确检索(需要找到特定信息) ===
{
"id": 7,
"question": "Agent开发中如何处理错误恢复和状态管理?",
"expected_source": "day22-agent-state",
"difficulty": "medium",
},
{
"id": 8,
"question": "LLM应用的成本优化有哪些策略?",
"expected_source": "day26-llm-cost",
"difficulty": "medium",
},
# === 开放式问题 ===
{
"id": 9,
"question": "总结一下AI+Web3的主要产品机会有哪些?",
"expected_source": ["day41-cefi-defi-ai", "day42-ai-fusion"],
"difficulty": "hard",
},
{
"id": 10,
"question": "设计系统面试中,RAG系统架构需要考虑哪些关键点?",
"expected_source": "day44-system-design-rag",
"difficulty": "medium",
},
]
运行测试 / Run Tests
def run_e2e_test(rag_pipeline, questions=TEST_QUESTIONS):
"""运行端到端测试"""
results = []
for q in questions:
print(f"\n{'='*60}")
print(f"Q{q['id']} [{q['difficulty']}]: {q['question']}")
print(f"{'='*60}")
# 检索
search_results = rag_pipeline.retrieve(q["question"])
# 生成
answer = rag_pipeline.generate(q["question"], search_results)
# 记录
result = {
"question_id": q["id"],
"question": q["question"],
"difficulty": q["difficulty"],
"answer": answer,
"sources_found": [
r["metadata"].get("file_path") for r in search_results
],
"expected_source": q["expected_source"],
"top_score": search_results[0]["similarity"] if search_results else 0,
}
results.append(result)
# 打印摘要
print(f"\n回答 (前200字):\n{answer[:200]}...")
print(f"\n检索到的来源:")
for r in search_results[:3]:
print(f" - {r['metadata'].get('file_path')} "
f"(score: {r.get('similarity', 0):.3f})")
return results
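The records returned by `run_e2e_test` contain both `expected_source` and `sources_found`, so a retrieval hit rate can be computed before any manual scoring. A minimal sketch follows; the helper names are hypothetical, and matching is a case-insensitive substring test, so descriptive placeholders such as "web3笔记" will never count as hits and would need to be replaced with real file-name fragments.

```python
def source_hit(expected, sources_found: list) -> bool:
    """True if any expected source fragment appears in any retrieved path."""
    expected_list = [expected] if isinstance(expected, str) else list(expected)
    return any(
        exp.lower() in (src or "").lower()
        for exp in expected_list
        for src in sources_found
    )


def hit_rate(results: list[dict]) -> float:
    """Fraction of questions for which an expected source was retrieved."""
    if not results:
        return 0.0
    hits = sum(
        source_hit(r["expected_source"], r["sources_found"]) for r in results
    )
    return hits / len(results)


if __name__ == "__main__":
    demo = [
        {"expected_source": "day1-transformer",
         "sources_found": ["docs/ai/day1-transformer.md"]},
        {"expected_source": ["day31-financial-ai", "day34-credit-ai"],
         "sources_found": ["docs/arch/day12-cache.md", None]},
    ]
    print(hit_rate(demo))
```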
评估回答质量 / Quality Assessment
手动评估模板(用于 V1 基线):
对每个回答评分(1-5):
1. 相关性 (Relevance)
5 = 完全回答了问题
3 = 部分回答,有遗漏
1 = 答非所问
2. 准确性 (Accuracy)
5 = 所有信息都准确
3 = 大部分准确,有小错误
1 = 包含明显错误信息
3. 完整性 (Completeness)
5 = 覆盖了所有相关要点
3 = 覆盖了主要要点
1 = 非常不完整
4. 引用质量 (Citation)
5 = 所有引用都正确且有帮助
3 = 有引用但不够准确
1 = 没有引用或引用错误
评估记录表:
┌────┬──────┬──────┬──────┬──────┬──────┬─────────────────────┐
│ Q# │ 相关 │ 准确 │ 完整 │ 引用 │ 均分 │ 问题备注 │
├────┼──────┼──────┼──────┼──────┼──────┼─────────────────────┤
│ 1 │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │ │
│ 2 │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │ │
│ ...│ │ │ │ │ │ │
│ 10 │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │ │
├────┼──────┼──────┼──────┼──────┼──────┼─────────────────────┤
│AVG │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │ V1 Baseline │
└────┴──────┴──────┴──────┴──────┴──────┴─────────────────────┘
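Once the sheet is filled in, the per-question and per-dimension averages are simple arithmetic. A small sketch, assuming the scores are recorded as dictionaries with the hypothetical keys shown in `DIMENSIONS`:

```python
from statistics import mean

DIMENSIONS = ("relevance", "accuracy", "completeness", "citation")


def summarize_scores(rows: list[dict]) -> dict:
    """Compute per-question, per-dimension and overall averages (1-5 scale)."""
    per_question = {r["id"]: mean(r[d] for d in DIMENSIONS) for r in rows}
    per_dimension = {d: mean(r[d] for r in rows) for d in DIMENSIONS}
    return {
        "per_question": per_question,
        "per_dimension": per_dimension,
        "overall": mean(per_question.values()),
    }


if __name__ == "__main__":
    rows = [
        {"id": 1, "relevance": 4, "accuracy": 5, "completeness": 3, "citation": 4},
        {"id": 2, "relevance": 2, "accuracy": 3, "completeness": 3, "citation": 4},
    ]
    print(summarize_scores(rows))
```

The `overall` value is the number to record as the V1 baseline and compare against after later optimizations.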
发现问题 / Problem Discovery
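Common V1 problems (expected):
Problem 1: Inaccurate retrieval
Symptom: asking about "量化技术" (model quantization) retrieves notes on "金融量化交易" (quantitative trading)
Cause: semantic ambiguity; "量化" means different things in different domains
Direction: Day 53 addresses this with a reranker
Problem 2: Answer drift
Symptom: relevant content is retrieved, but the answer wanders off topic
Cause: weak prompt template, or too much noise in the context
Direction: Day 53 improves the prompt and the context construction
Problem 3: Hallucination
Symptom: the answer contains information that is not in the notes
Cause: the LLM draws on its own knowledge instead of the context
Direction: Day 53 strengthens the "say you don't know when unsure" instruction
Problem 4: Fragmented answers
Symptom: only one chunk is cited and related content is missed
Cause: the relevant information is spread across several chunks
Direction: Day 53 increases the number of retrieved chunks and adds Multi-Query
Problem 5: Slow responses
Symptom: 10+ seconds from question to answer
Cause: both embedding computation and LLM generation take time
Direction: Day 53 adds caching
Write these problems down. Day 53 is dedicated to solving them.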
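As a preview of the caching direction for Problem 5, repeated queries can skip the embedding step entirely. This is only a sketch under assumptions: `make_cached_embedder` is a hypothetical helper that wraps any function mapping one string to a vector, and it caches exact string matches only.

```python
from functools import lru_cache
from typing import Callable, Sequence


def make_cached_embedder(
    embed_fn: Callable[[str], Sequence[float]], maxsize: int = 1024
) -> Callable[[str], tuple]:
    """Wrap an embedding function with an in-memory LRU cache."""

    @lru_cache(maxsize=maxsize)
    def cached(text: str) -> tuple:
        # Tuples are immutable, so cached vectors cannot be modified by callers
        return tuple(embed_fn(text))

    return cached


if __name__ == "__main__":
    calls = []

    def fake_embed(text: str) -> list[float]:
        # Stand-in for a real model call, used only for this demonstration
        calls.append(text)
        return [float(len(text))]

    embed = make_cached_embedder(fake_embed)
    embed("什么是RAG")
    embed("什么是RAG")
    print(len(calls))  # the underlying function ran once
```

This helps only when the same question text recurs. Generation time is usually the larger share of latency, so caching complete answers may matter more than caching embeddings.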
知识点6:代码架构 / Code Architecture
项目结构 / Project Structure
my-rag-system/
├── config.py # 配置管理
├── document_processor.py # 文档解析
├── chunker.py # 文档分块
├── embedding_service.py # Embedding 服务
├── vector_store.py # 向量存储
├── retriever.py # 检索模块
├── hybrid_retriever.py # 混合检索
├── prompt_templates.py # Prompt 模板
├── citation.py # 引用溯源
├── rag_pipeline.py # 主流程编排
├── test_rag.py # 测试脚本
├── build_index.py # 索引构建入口
├── query.py # 交互式查询入口
├── chroma_db/ # 向量数据库存储
└── logs/ # 日志
Core Pipeline
"""
rag_pipeline.py
Main RAG flow: wires all the components together
"""
from openai import OpenAI

from document_processor import MarkdownProcessor
from chunker import chunk_all_documents
from embedding_service import EmbeddingService
from vector_store import VectorStore
from prompt_templates import build_rag_prompt
from citation import format_response_with_citations


class RAGPipeline:
    """End-to-end RAG flow."""

    def __init__(self, config: dict | None = None):
        self.config = config or self._default_config()
        # Initialize the components
        print("Initializing RAG Pipeline...")
        self.embedding = EmbeddingService(
            mode=self.config["embedding_mode"],
            model_name=self.config["embedding_model"],
        )
        self.store = VectorStore(
            persist_dir=self.config["persist_dir"],
            collection_name=self.config["collection_name"],
        )
        # Ollama exposes an OpenAI-compatible endpoint, so the OpenAI client works as-is
        self.llm_client = OpenAI(
            base_url=self.config["llm_base_url"],
            api_key=self.config["llm_api_key"],
        )
        print("Pipeline ready!")

    def _default_config(self) -> dict:
        return {
            "embedding_mode": "local",
            "embedding_model": "BAAI/bge-m3",
            "persist_dir": "./chroma_db",
            "collection_name": "my_notes",
            "llm_base_url": "http://localhost:11434/v1",
            "llm_api_key": "ollama",
            "llm_model": "qwen2.5:7b",
            "n_results": 5,
            "docs_dir": "E:/code/momofinance/momoweb3",
        }

    def build_index(self):
        """Build the index (first run or full rebuild)."""
        processor = MarkdownProcessor(self.config["docs_dir"])
        documents = processor.process_all()
        chunks = chunk_all_documents(
            documents,
            chunk_size=self.config.get("chunk_size", 512),
            chunk_overlap=self.config.get("chunk_overlap", 50),
        )
        texts = [c.content for c in chunks]
        embeddings = self.embedding.embed(texts)
        self.store.add_chunks(chunks, embeddings)
        print(f"Index built: {len(chunks)} chunks")

    def retrieve(self, query: str, n_results: int | None = None) -> list[dict]:
        """Retrieve the chunks most relevant to the query."""
        n = n_results or self.config["n_results"]
        # Pass a one-element list, as build_index() does, then take the single vector
        query_emb = self.embedding.embed([query])[0].tolist()
        results = self.store.search(
            query_embedding=query_emb,
            n_results=n,
        )
        formatted = []
        for i in range(len(results["ids"][0])):
            formatted.append({
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                # 1 - distance is a similarity only if the collection uses cosine distance
                "similarity": round(1 - results["distances"][0][i], 3),
            })
        return formatted

    def generate(self, question: str, context_results: list[dict]) -> str:
        """Generate an answer from the retrieved chunks."""
        messages = build_rag_prompt(question, context_results)
        response = self.llm_client.chat.completions.create(
            model=self.config["llm_model"],
            messages=messages,
            temperature=self.config.get("temperature", 0.3),
            max_tokens=self.config.get("max_tokens", 1000),
        )
        return response.choices[0].message.content

    def query(self, question: str) -> str:
        """Run the full RAG query flow."""
        # Step 1: retrieve
        results = self.retrieve(question)
        # Step 2: generate
        answer = self.generate(question, results)
        # Step 3: attach citations
        answer_with_citations = format_response_with_citations(
            answer, results
        )
        return answer_with_citations

    def interactive(self):
        """Interactive query loop."""
        print("\n=== RAG Q&A System ===")
        print("Type a question to search, or 'quit' to exit\n")
        while True:
            question = input("Your question: ").strip()
            if question.lower() in ("quit", "exit", "q"):
                break
            if not question:
                continue
            print("\nRetrieving...")
            answer = self.query(question)
            print(f"\n{answer}\n")
            print("-" * 60)


# === Entry point ===
if __name__ == "__main__":
    import sys

    pipeline = RAGPipeline()
    if len(sys.argv) > 1 and sys.argv[1] == "build":
        pipeline.build_index()
    else:
        pipeline.interactive()
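The result-formatting step inside retrieve() can be exercised without a vector store or a model. The sketch below is a standalone version of that loop, under two assumptions: the search result uses the nested per-query lists that the pipeline indexes into, and the collection uses cosine distance, so that 1 - distance reads as a similarity. `format_results` and its `min_score` filter are illustrative names. The filter mirrors the `min_score` setting in config.py, which the V1 pipeline does not apply yet.

```python
def format_results(results: dict, min_score: float = 0.0) -> list[dict]:
    """Flatten a nested search result (single query) into a list of hits."""
    rows = zip(
        results["documents"][0],
        results["metadatas"][0],
        results["distances"][0],
    )
    formatted = []
    for content, metadata, distance in rows:
        similarity = round(1 - distance, 3)  # meaningful only for cosine distance
        if similarity >= min_score:
            formatted.append({
                "content": content,
                "metadata": metadata,
                "similarity": similarity,
            })
    return formatted


# Hand-written sample data, not real search output
sample = {
    "ids": [["c1", "c2", "c3"]],
    "documents": [["credit risk flow", "DeFi liquidation", "unrelated note"]],
    "metadatas": [[{"source": "a.md"}, {"source": "b.md"}, {"source": "c.md"}]],
    "distances": [[0.25, 0.5, 0.875]],
}
hits = format_results(sample, min_score=0.3)
```

With a 0.3 threshold the third chunk (similarity 0.125) is dropped before it can add noise to the prompt context.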
Configuration
"""
config.py
Configuration management
"""
import os
from pathlib import Path

# Project root
PROJECT_ROOT = Path("E:/code/momofinance/momoweb3")

# RAG settings
RAG_CONFIG = {
    # Document sources
    "docs_dir": str(PROJECT_ROOT),
    "scan_patterns": ["docs/**/*.md", "CLAUDE.md"],
    # Chunking parameters
    "chunk_size": 512,
    "chunk_overlap": 50,
    "min_chunk_size": 100,
    # Embedding
    "embedding_mode": "local",  # "local" or "ollama"
    "embedding_model": "BAAI/bge-m3",
    # Vector storage
    "persist_dir": str(PROJECT_ROOT / "rag_data" / "chroma_db"),
    "collection_name": "my_notes",
    # LLM
    "llm_base_url": "http://localhost:11434/v1",
    "llm_api_key": "ollama",
    "llm_model": "qwen2.5:7b",
    # Retrieval
    "n_results": 5,
    "min_score": 0.3,
    # Generation
    "temperature": 0.3,
    "max_tokens": 1000,
}


# Values can be overridden through environment variables
def get_config() -> dict:
    """Return a copy of RAG_CONFIG with environment overrides applied."""
    config = RAG_CONFIG.copy()
    if os.environ.get("RAG_LLM_MODEL"):
        config["llm_model"] = os.environ["RAG_LLM_MODEL"]
    if os.environ.get("RAG_EMBEDDING_MODE"):
        config["embedding_mode"] = os.environ["RAG_EMBEDDING_MODE"]
    return config
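If more settings need overrides, the two if-blocks in get_config() can be generalized into a small table. This is one possible sketch, not the notes' actual code: `apply_env_overrides` is a hypothetical helper, and `RAG_N_RESULTS` and `RAG_TEMPERATURE` are made-up variable names added to show type casting, since environment values always arrive as strings.

```python
import os

# Environment variable -> (config key, cast function).
# The first two match get_config(); the last two are hypothetical additions.
ENV_OVERRIDES = {
    "RAG_LLM_MODEL": ("llm_model", str),
    "RAG_EMBEDDING_MODE": ("embedding_mode", str),
    "RAG_N_RESULTS": ("n_results", int),
    "RAG_TEMPERATURE": ("temperature", float),
}


def apply_env_overrides(base: dict, environ=None) -> dict:
    """Return a copy of base with any set, non-empty overrides applied."""
    environ = os.environ if environ is None else environ
    config = dict(base)
    for var, (key, cast) in ENV_OVERRIDES.items():
        raw = environ.get(var)
        if raw:  # skip unset and empty values
            config[key] = cast(raw)
    return config


base = {"llm_model": "qwen2.5:7b", "n_results": 5, "temperature": 0.3}
overridden = apply_env_overrides(
    base, environ={"RAG_LLM_MODEL": "llama3", "RAG_N_RESULTS": "8"}
)
```

Passing `environ` explicitly keeps the function testable without touching the real process environment. The result can then be handed to the pipeline as `RAGPipeline(config=...)`, as long as it contains every key the constructor reads.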
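Today's Reflections
Reflection 1: The Gap Between Theory and Practice
What I wrote on Day 5 when studying RAG:
"RAG = Retrieval + Augmented + Generation,
retrieve first, then generate, simple and intuitive"
How it felt after actually building one today:
"Wow, document processing alone took 2 hours"
"Choosing a chunk size of 512 or 1024 matters more than I expected"
"Embedding quality on mixed Chinese-English text is hard to verify"
"The OpenAI-compatible API really did save a lot of adapter work"
The biggest takeaway:
Theory teaches the architecture diagram; practice is bricks and mortar
Every item in the Day 19-21 production RAG notes now comes with first-hand experience
Reflection 2: Your Own Data Is the Best Test Case
Testing RAG on a public benchmark gives nice numbers but no feel for the system
Building RAG on your own notes:
- You know what the answer should be → you can judge quality accurately
- You know which file holds the information → you can verify retrieval accuracy
- You know which questions matter → the tests are more meaningful
- The problems you find are more realistic → the optimization direction is clearer
And this RAG system is genuinely useful afterwards:
- Quickly reviewing topics before an interview
- Finding related notes while writing an article
- Cross-linking knowledge from different domains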
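Because you know which file should answer each question, retrieval accuracy can be checked with a few lines of code. The sketch below computes a simple hit rate. It assumes each retrieved result carries a `metadata` dict with a `source` path, which is an assumption about the chunk metadata. The file names and `fake_retrieve` are made up for illustration.

```python
from typing import Callable


def hit_rate(
    cases: list[tuple[str, str]],
    retrieve: Callable[[str], list[dict]],
    k: int = 5,
    source_key: str = "source",
) -> float:
    """Fraction of questions whose expected file shows up in the top-k results."""
    if not cases:
        return 0.0
    hits = 0
    for question, expected_file in cases:
        top_k = retrieve(question)[:k]
        sources = [r["metadata"].get(source_key, "") for r in top_k]
        if any(expected_file in s for s in sources):
            hits += 1
    return hits / len(cases)


def fake_retrieve(question: str) -> list[dict]:
    # Stand-in for the real retrieve(); always returns the same file
    return [{"metadata": {"source": "docs/ai/day34.md"}}]


cases = [
    ("What is the credit risk control flow?", "day34.md"),
    ("How does the RAG architecture work?", "day05.md"),
]
score = hit_rate(cases, fake_retrieve)
```

Swapping `fake_retrieve` for the pipeline's own retrieve method turns this into a quick baseline number to compare against after each change.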
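Reflection 3: Why the MVP Mindset Matters
What I built today is V1, a minimum viable version
What V1 has:
✅ It works (Q&A is basically usable)
✅ It can be evaluated (I know where it falls short)
✅ It can be iterated on (optimization starts tomorrow)
What V1 deliberately leaves out:
❌ Reranker (added tomorrow)
❌ Multi-Query (added tomorrow)
❌ Caching (added tomorrow)
❌ Web UI (built on Day 58)
Why not build everything at once?
Because:
1. You need a baseline before you can measure improvement
2. Every optimization needs a before/after comparison
3. Premature optimization is the root of all evil
This is also a core PM skill:
knowing what to do first, what to do later, and what not to do at all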
Resources
Hands-on RAG references
- LangChain RAG Tutorial: https://python.langchain.com/docs/tutorials/rag/
- ChromaDB documentation: https://docs.trychroma.com/
- BGE-M3 Paper: https://arxiv.org/abs/2402.03216
- Sentence Transformers: https://sbert.net/
Chunking strategies
- LangChain Text Splitters: https://python.langchain.com/docs/how_to/#text-splitters
- Chunking Strategies: https://www.pinecone.io/learn/chunking-strategies/
Evaluation methods
- RAGAS: https://docs.ragas.io/
- RAG Evaluation (TruLens): https://www.trulens.org/
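Tomorrow's Preview
Day 53: Advanced RAG, Evaluation-Driven Continuous Optimization
Every problem found today gets addressed tomorrow, one by one:
1. Building the evaluation setup
- Build a Golden QA set
- Automated evaluation with RAGAS
2. Retrieval optimization
- Chunk size comparison experiments
- Add a reranker (BGE-Reranker)
- Tune Hybrid Search weights
3. Generation optimization
- Prompt iteration v1→v2→v3
- Reduce hallucination
4. Advanced features
- Multi-Query retrieval
- Conversational RAG
5. Performance optimization
- Caching layer
- Asynchronous processing
V1 → V2 → V3, with data behind every improvement!
Day 52 complete! From 400+ notes to a working RAG Q&A system. This is your first complete AI application: still rough, but already usable. Tomorrow brings evaluation and optimization to make it genuinely good to use!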