返回AI笔记
AI Day 52

AI Day 52: 实战(2):RAG系统实战 — 让AI回答你的文档问题

AI Day 52: 实战(2):RAG系统实战 — 让AI回答你的文档问题

2026-05-23

日期: 2026-05-23 | 阶段: 第五阶段 · 动手实战 (Day 51-60) | 主题: RAG System Implementation

学习路径 / Learning Path

AI/LLM 深度技术学习 60天计划
├── 第一阶段:模型基础 (Day 1-15) ✅
├── 第二阶段:工程实践 (Day 16-30) ✅
├── 第三阶段:金融零售AI应用 (Day 31-42) ✅
├── 第四阶段:面试冲刺 (Day 43-50) ✅
└── 第五阶段:动手实战 (Day 51-60)
    ├── Day 51: 本地大模型部署全流程 ✅
    ├── Day 52: RAG系统实战:从文档到问答 ← 你在这里
    ├── Day 53: RAG进阶:评估优化与生产化
    ├── Day 54: LoRA微调实战:训练你的专属模型
    ├── Day 55: Agent开发实战:构建工具调用Agent
    ├── Day 56: MCP Server开发:扩展AI能力边界
    ├── Day 57: 多模态应用:图文理解与文档分析
    ├── Day 58: AI应用全栈开发:前后端集成
    ├── Day 59: 性能调优与成本实战
    └── Day 60: 总结与作品集

核心概念 / Core Concepts

用自己的笔记构建 RAG / Build RAG with Your Own Notes

你在过去几个月积累了大量笔记:

架构120天计划笔记: ~251 篇 (docs/arch/)
Web3 90天学习笔记: ~90 篇 (docs/daily/)
AI 60天学习笔记:   ~52 篇 (docs/ai/)
产品分析文章:      ~10 篇 (docs/)
────────────────────────────────
总计:             ~400+ 篇 Markdown 文件

问题:
  这么多笔记,你真的每篇都记住了吗?
  想找"Day 34讲的信贷风控流程",你翻得到吗?
  想交叉对比"传统风控 vs DeFi风控",你记得在哪几篇里讲过吗?

解决方案:
  构建一个 RAG 系统,让 AI 帮你检索和回答!
  输入: 自然语言问题
  输出: 基于你的笔记的准确回答 + 来源引用

这不仅是一个练习项目——这是一个真正有用的工具:
  面试前: "总结一下我学过的DeFi协议分析框架"
  写文章: "我的笔记里关于Tokenomics有哪些核心观点?"
  复习时: "架构计划中支付系统设计的关键点是什么?"

为什么选择本地 RAG?/ Why Local RAG?

Day 5 学 RAG 架构时是理论
Day 19-21 学生产 RAG 时是方法论
今天是真正的实现

本地 RAG 的优势:
1. 隐私安全 — 你的笔记不会发送到任何外部服务
2. 免费无限 — 查询次数没有成本限制
3. 完全可控 — 可以精确调试每个环节
4. 学习价值 — 亲手实现比看文档学得深 10 倍

今天的目标:
  输入: 400+ 篇 Markdown 笔记
  输出: 一个可工作的 RAG 问答系统
  时间: 6 小时(从0到1)

知识点1:技术选型 / Technology Selection

向量数据库选择 / Vector Database

Day 6 学过向量数据库的原理和对比,今天做实际选择:

候选方案对比:
┌──────────────┬───────────┬───────────┬───────────┬───────────┐
│ 维度         │ ChromaDB  │ LanceDB   │ Qdrant    │ FAISS     │
├──────────────┼───────────┼───────────┼───────────┼───────────┤
│ 部署复杂度   │ pip即可   │ pip即可   │ 需容器    │ pip即可   │
│ 持久化       │ 本地文件  │ 本地文件  │ 需服务    │ 手动保存  │
│ 元数据过滤   │ ✅ 丰富   │ ✅ SQL式  │ ✅ 丰富   │ ❌ 不支持  │
│ 适合规模     │ 10K-1M    │ 1K-10M    │ 100K-100M │ 任意      │
│ API友好度    │ ★★★★★    │ ★★★★     │ ★★★★★    │ ★★       │
│ LangChain    │ ✅ 集成   │ ✅ 集成   │ ✅ 集成   │ ✅ 集成   │
│ 学习曲线     │ 极低      │ 低        │ 中等      │ 低        │
└──────────────┴───────────┴───────────┴───────────┴───────────┘

决定: ChromaDB ✓
原因:
- 400篇文档 → 约2000-5000 chunks → ChromaDB 绰绰有余
- pip install chromadb 一行安装
- 元数据过滤支持好(按日期/标签/阶段筛选)
- LangChain 集成最成熟
- 数据持久化到本地文件夹

Embedding 模型选择 / Embedding Model

候选方案对比:
┌──────────────────┬──────────┬──────────┬──────────┬──────────┐
│ 模型             │ 维度     │ 中文支持 │ 速度     │ 部署     │
├──────────────────┼──────────┼──────────┼──────────┼──────────┤
│ BGE-M3 (本地)    │ 1024     │ ★★★★★  │ 中等     │ 本地GPU  │
│ nomic-embed (本地)│ 768     │ ★★★     │ 快       │ Ollama   │
│ text-embedding-3 │ 3072     │ ★★★★   │ 快       │ 云端API  │
│ BGE-large-zh     │ 1024     │ ★★★★★  │ 中等     │ 本地GPU  │
│ mxbai-embed      │ 1024     │ ★★★     │ 快       │ Ollama   │
└──────────────────┴──────────┴──────────┴──────────┴──────────┘

决定: BGE-M3 (BAAI/bge-m3) ✓
原因:
- 笔记是中英混合的 → 需要多语言支持
- BGE-M3 = Multilingual, Multi-Functionality, Multi-Granularity
- 支持 Dense + Sparse + ColBERT 三种检索模式
- 中文理解能力最强
- 本地运行,隐私安全

备选: 如果显存不够同时跑 Embedding + LLM
- nomic-embed-text via Ollama(占显存少,通过 Ollama 自动管理)

LLM 选择 / LLM Selection

用于生成回答的 LLM:

方案A (推荐开始): 本地 Qwen2.5-7B via Ollama
  ✅ 免费无限调用
  ✅ 中文回答质量好
  ✅ Day 51 已经部署好了
  ⚠️ 复杂问题质量有限

方案B (质量优先): 云端 Claude API
  ✅ 回答质量最高
  ✅ 长上下文能力强
  ⚠️ 需要 API Key
  ⚠️ 有成本

方案C (混合): 检索用本地,复杂问题用云端
  ✅ 平衡质量和成本
  ✅ 最灵活

今天先用方案A(全本地),明天优化时可以切换到方案C

最终技术栈 / Final Tech Stack

技术栈确定:

文档处理: Python + 自定义 Markdown 解析器
分块策略: 递归文本分割(Day 19 学过的)
Embedding: BGE-M3 via sentence-transformers
向量存储: ChromaDB (本地持久化)
LLM生成:  Qwen2.5-7B via Ollama
框架:     LangChain (简化集成)
API:      FastAPI (可选,后续扩展)

安装命令:
pip install chromadb langchain langchain-community
pip install sentence-transformers FlagEmbedding
pip install openai  # 用于 Ollama 的 OpenAI 兼容调用
pip install tiktoken  # Token 计数

知识点2:文档处理 Pipeline / Document Processing Pipeline

Markdown 解析 / Markdown Parsing

"""
document_processor.py
处理 Markdown 笔记文件
"""

import os
import re
from pathlib import Path
from dataclasses import dataclass, field
from datetime import datetime

@dataclass
class Document:
    """表示一个处理后的文档"""
    content: str
    metadata: dict = field(default_factory=dict)
    source: str = ""

    @property
    def token_count(self) -> int:
        """粗略估算 token 数(中文约 1.5 字/token)"""
        cn_chars = len(re.findall(r'[\u4e00-\u9fff]', self.content))
        en_words = len(re.findall(r'[a-zA-Z]+', self.content))
        return int(cn_chars * 1.5 + en_words * 1.3)


class MarkdownProcessor:
    """Markdown 文档处理器"""

    def __init__(self, base_dir: str):
        self.base_dir = Path(base_dir)

    def scan_files(self, patterns: list[str] = None) -> list[Path]:
        """扫描所有 Markdown 文件"""
        if patterns is None:
            patterns = ["docs/**/*.md", "CLAUDE.md"]

        files = []
        for pattern in patterns:
            files.extend(self.base_dir.glob(pattern))

        # 去重并排序
        files = sorted(set(files))
        print(f"Found {len(files)} markdown files")
        return files

    def parse_file(self, file_path: Path) -> Document:
        """解析单个 Markdown 文件,提取内容和元数据"""
        content = file_path.read_text(encoding="utf-8")

        # 提取元数据
        metadata = self._extract_metadata(content, file_path)

        # 清洗内容
        cleaned = self._clean_content(content)

        return Document(
            content=cleaned,
            metadata=metadata,
            source=str(file_path.relative_to(self.base_dir)),
        )

    def _extract_metadata(self, content: str, file_path: Path) -> dict:
        """从文件名和内容中提取元数据"""
        metadata = {
            "file_name": file_path.name,
            "file_path": str(file_path.relative_to(self.base_dir)),
            "category": self._detect_category(file_path),
        }

        # 从文件名提取 Day 编号
        day_match = re.search(r'[Dd]ay\s*(\d+)', file_path.name)
        if day_match:
            metadata["day_number"] = int(day_match.group(1))

        # 从内容提取日期
        date_match = re.search(
            r'\*\*日期\*\*:\s*(\d{4}-\d{2}-\d{2})', content
        )
        if date_match:
            metadata["date"] = date_match.group(1)

        # 从标题提取主题
        title_match = re.search(r'^#\s+(.+)$', content, re.MULTILINE)
        if title_match:
            metadata["title"] = title_match.group(1).strip()

        # 检测阶段
        stage_match = re.search(
            r'第([一二三四五六]|[1-6])阶段', content
        )
        if stage_match:
            metadata["stage"] = stage_match.group(0)

        # 提取标签(从内容关键词)
        tags = self._extract_tags(content)
        if tags:
            metadata["tags"] = ",".join(tags)

        return metadata

    def _detect_category(self, file_path: Path) -> str:
        """根据文件路径检测分类"""
        path_str = str(file_path).lower()

        if "/ai/" in path_str or "\\ai\\" in path_str:
            return "ai_learning"
        elif "/arch/" in path_str or "\\arch\\" in path_str:
            return "architecture"
        elif "/daily/" in path_str or "\\daily\\" in path_str:
            return "web3_daily"
        elif "blog" in path_str:
            return "blog"
        else:
            return "general"

    def _extract_tags(self, content: str) -> list[str]:
        """从内容中提取关键标签"""
        tag_keywords = {
            "DeFi": ["defi", "lending", "dex", "amm", "liquidity"],
            "Security": ["security", "audit", "attack", "vulnerability", "安全"],
            "Tokenomics": ["tokenomics", "token", "vesting", "airdrop"],
            "Architecture": ["architecture", "架构", "系统设计", "system design"],
            "AI": ["llm", "transformer", "rag", "agent", "embedding"],
            "Governance": ["governance", "dao", "voting", "治理"],
            "Risk": ["risk", "风控", "风险", "compliance"],
            "Payment": ["payment", "支付", "settlement"],
        }

        content_lower = content.lower()
        found_tags = []
        for tag, keywords in tag_keywords.items():
            if any(kw in content_lower for kw in keywords):
                found_tags.append(tag)

        return found_tags[:5]  # 最多5个标签

    def _clean_content(self, content: str) -> str:
        """清洗 Markdown 内容"""
        # 移除学习路径树(重复内容,不需要索引)
        content = re.sub(
            r'```\n.*?学习路径.*?```',
            '[学习路径树已省略]',
            content,
            flags=re.DOTALL
        )

        # 保留代码块但标记(用于区分代码和文本)
        # 不要删除代码块——它们包含有价值的示例

        # 移除过长的 ASCII art / 表格分隔线
        content = re.sub(r'[─═┌┐└┘├┤┬┴┼]{10,}', '---', content)
        content = re.sub(r'[-=]{20,}', '---', content)

        # 压缩多余空行
        content = re.sub(r'\n{4,}', '\n\n\n', content)

        return content.strip()

    def process_all(self) -> list[Document]:
        """处理所有文档"""
        files = self.scan_files()
        documents = []

        for f in files:
            try:
                doc = self.parse_file(f)
                if len(doc.content) > 100:  # 过滤太短的文件
                    documents.append(doc)
            except Exception as e:
                print(f"Error processing {f}: {e}")

        print(f"Processed {len(documents)} documents")
        return documents

递归 Chunking 实现 / Recursive Chunking

"""
chunker.py
文档分块 — Day 19 学的理论,今天实现
"""

from dataclasses import dataclass
import re

@dataclass
class Chunk:
    """表示一个文档块"""
    content: str
    metadata: dict
    chunk_id: str
    source: str

    @property
    def display(self) -> str:
        return f"[{self.source}] {self.content[:100]}..."


class RecursiveChunker:
    """递归文本分割器

    Day 19 学到的最佳实践:
    1. 按语义边界分割(标题 > 段落 > 句子 > 字符)
    2. 保持适当的块大小(不太大也不太小)
    3. 添加重叠(overlap)避免信息断裂
    4. 保留元数据(来源、标题、位置)
    """

    def __init__(
        self,
        chunk_size: int = 512,
        chunk_overlap: int = 50,
        min_chunk_size: int = 100,
    ):
        self.chunk_size = chunk_size
        self.chunk_overlap = chunk_overlap
        self.min_chunk_size = min_chunk_size

        # Markdown 的分割层次(从大到小)
        self.separators = [
            "\n## ",      # 二级标题(最大的语义单元)
            "\n### ",     # 三级标题
            "\n#### ",    # 四级标题
            "\n```",      # 代码块边界
            "\n\n",       # 段落
            "\n",         # 行
            "。",          # 中文句号
            ". ",         # 英文句号
            ";",          # 中文分号
            "; ",         # 英文分号
            " ",          # 空格(最后手段)
        ]

    def chunk_document(self, doc) -> list[Chunk]:
        """将一个文档分成多个块"""
        chunks = []
        sections = self._split_by_headers(doc.content)

        for section_title, section_content in sections:
            if not section_content.strip():
                continue

            # 如果 section 够小,直接作为一个 chunk
            if len(section_content) <= self.chunk_size:
                if len(section_content) >= self.min_chunk_size:
                    chunk_meta = {**doc.metadata}
                    if section_title:
                        chunk_meta["section"] = section_title

                    chunks.append(Chunk(
                        content=self._format_chunk(
                            section_title, section_content
                        ),
                        metadata=chunk_meta,
                        chunk_id=f"{doc.source}#{len(chunks)}",
                        source=doc.source,
                    ))
            else:
                # section 太大,递归分割
                sub_chunks = self._recursive_split(
                    section_content,
                    self.separators
                )
                for i, sub in enumerate(sub_chunks):
                    if len(sub) >= self.min_chunk_size:
                        chunk_meta = {**doc.metadata}
                        if section_title:
                            chunk_meta["section"] = section_title
                        chunk_meta["sub_index"] = i

                        chunks.append(Chunk(
                            content=self._format_chunk(
                                section_title, sub
                            ),
                            metadata=chunk_meta,
                            chunk_id=f"{doc.source}#{len(chunks)}",
                            source=doc.source,
                        ))

        return chunks

    def _split_by_headers(
        self, content: str
    ) -> list[tuple[str, str]]:
        """按标题分割,保留标题作为元数据"""
        sections = []
        current_title = ""
        current_content = []

        for line in content.split("\n"):
            if re.match(r'^#{1,4}\s+', line):
                # 保存之前的 section
                if current_content:
                    sections.append(
                        (current_title, "\n".join(current_content))
                    )
                current_title = line.lstrip("#").strip()
                current_content = [line]
            else:
                current_content.append(line)

        # 保存最后一个 section
        if current_content:
            sections.append(
                (current_title, "\n".join(current_content))
            )

        return sections

    def _recursive_split(
        self, text: str, separators: list[str]
    ) -> list[str]:
        """递归分割文本"""
        if len(text) <= self.chunk_size:
            return [text]

        if not separators:
            # 没有分隔符了,硬切
            return self._hard_split(text)

        sep = separators[0]
        remaining_seps = separators[1:]

        parts = text.split(sep)

        if len(parts) == 1:
            # 这个分隔符没有分割效果,尝试下一个
            return self._recursive_split(text, remaining_seps)

        # 合并小块,分割大块
        chunks = []
        current = ""

        for i, part in enumerate(parts):
            # 加回分隔符(除了第一个)
            piece = (sep + part) if i > 0 else part

            if len(current) + len(piece) <= self.chunk_size:
                current += piece
            else:
                if current:
                    chunks.append(current)
                current = piece

        if current:
            chunks.append(current)

        # 对仍然太大的块继续递归
        result = []
        for chunk in chunks:
            if len(chunk) > self.chunk_size:
                result.extend(
                    self._recursive_split(chunk, remaining_seps)
                )
            else:
                result.append(chunk)

        # 添加重叠
        result = self._add_overlap(result)

        return result

    def _hard_split(self, text: str) -> list[str]:
        """硬切分(最后手段)"""
        chunks = []
        for i in range(0, len(text), self.chunk_size - self.chunk_overlap):
            chunk = text[i:i + self.chunk_size]
            if len(chunk) >= self.min_chunk_size:
                chunks.append(chunk)
        return chunks

    def _add_overlap(self, chunks: list[str]) -> list[str]:
        """添加块间重叠"""
        if len(chunks) <= 1 or self.chunk_overlap == 0:
            return chunks

        result = [chunks[0]]
        for i in range(1, len(chunks)):
            # 从上一个块的尾部取 overlap
            prev_tail = chunks[i - 1][-self.chunk_overlap:]
            result.append(prev_tail + chunks[i])

        return result

    def _format_chunk(self, title: str, content: str) -> str:
        """格式化 chunk,添加上下文信息"""
        if title:
            return f"[Section: {title}]\n{content}"
        return content


def chunk_all_documents(documents: list, **kwargs) -> list[Chunk]:
    """分块所有文档"""
    chunker = RecursiveChunker(**kwargs)
    all_chunks = []

    for doc in documents:
        chunks = chunker.chunk_document(doc)
        all_chunks.extend(chunks)

    print(f"Total chunks: {len(all_chunks)}")
    print(f"Avg chunk size: {sum(len(c.content) for c in all_chunks) / len(all_chunks):.0f} chars")

    return all_chunks

Chunk 质量检查 / Chunk Quality Check

"""
chunk_quality.py
检查分块质量
"""

def analyze_chunks(chunks: list) -> dict:
    """分析分块质量"""
    sizes = [len(c.content) for c in chunks]
    categories = {}

    for c in chunks:
        cat = c.metadata.get("category", "unknown")
        categories[cat] = categories.get(cat, 0) + 1

    report = {
        "total_chunks": len(chunks),
        "avg_size": sum(sizes) / len(sizes),
        "min_size": min(sizes),
        "max_size": max(sizes),
        "median_size": sorted(sizes)[len(sizes) // 2],
        "categories": categories,
        "size_distribution": {
            "<200": sum(1 for s in sizes if s < 200),
            "200-500": sum(1 for s in sizes if 200 <= s < 500),
            "500-800": sum(1 for s in sizes if 500 <= s < 800),
            "800-1200": sum(1 for s in sizes if 800 <= s < 1200),
            ">1200": sum(1 for s in sizes if s >= 1200),
        }
    }

    print("=== Chunk Quality Report ===")
    print(f"Total chunks: {report['total_chunks']}")
    print(f"Avg size: {report['avg_size']:.0f} chars")
    print(f"Min/Max: {report['min_size']}/{report['max_size']}")
    print(f"\nSize distribution:")
    for range_name, count in report["size_distribution"].items():
        bar = "█" * (count // 5)
        print(f"  {range_name:>10}: {count:>4} {bar}")
    print(f"\nCategories:")
    for cat, count in sorted(categories.items()):
        print(f"  {cat}: {count}")

    return report

# 质量标准:
# ✅ 平均大小 300-600 chars
# ✅ 没有 <100 chars 的碎片
# ✅ 没有 >2000 chars 的超大块
# ✅ 各类别分布合理

知识点3:索引构建 / Index Building

Embedding 生成 / Embedding Generation

"""
embedding_service.py
Embedding 生成服务
"""

import numpy as np
from typing import Union

class EmbeddingService:
    """Embedding 服务 — 支持本地和 Ollama 两种模式"""

    def __init__(self, mode: str = "local", model_name: str = None):
        """
        Args:
            mode: "local" (sentence-transformers) 或 "ollama"
            model_name: 模型名称
        """
        self.mode = mode

        if mode == "local":
            from sentence_transformers import SentenceTransformer
            self.model_name = model_name or "BAAI/bge-m3"
            print(f"Loading embedding model: {self.model_name}...")
            self.model = SentenceTransformer(self.model_name)
            self.dimension = self.model.get_sentence_embedding_dimension()
            print(f"Loaded! Dimension: {self.dimension}")

        elif mode == "ollama":
            import requests
            self.model_name = model_name or "nomic-embed-text"
            self.ollama_url = "http://localhost:11434/api/embeddings"
            # 获取维度
            test = self._ollama_embed("test")
            self.dimension = len(test)
            print(f"Using Ollama embedding: {self.model_name}, dim={self.dimension}")

    def embed(self, texts: Union[str, list[str]]) -> np.ndarray:
        """生成 Embedding"""
        if isinstance(texts, str):
            texts = [texts]

        if self.mode == "local":
            return self.model.encode(
                texts,
                normalize_embeddings=True,  # L2 归一化
                show_progress_bar=len(texts) > 10,
                batch_size=32,
            )
        elif self.mode == "ollama":
            embeddings = [self._ollama_embed(t) for t in texts]
            return np.array(embeddings)

    def _ollama_embed(self, text: str) -> list[float]:
        """通过 Ollama API 获取 embedding"""
        import requests
        resp = requests.post(self.ollama_url, json={
            "model": self.model_name,
            "prompt": text,
        })
        return resp.json()["embedding"]


# 使用示例
if __name__ == "__main__":
    # 方式1: 本地 BGE-M3(推荐,质量更好)
    emb_service = EmbeddingService(mode="local", model_name="BAAI/bge-m3")

    # 方式2: Ollama(如果显存紧张)
    # emb_service = EmbeddingService(mode="ollama")

    # 测试
    texts = [
        "DeFi借贷协议的清算机制如何运作",
        "How does liquidation work in DeFi lending",
        "智能合约安全审计的关键步骤",
    ]

    vectors = emb_service.embed(texts)
    print(f"Shape: {vectors.shape}")  # (3, 1024)

    # 计算相似度
    from numpy import dot
    sim_01 = dot(vectors[0], vectors[1])  # 中英同义,应该高
    sim_02 = dot(vectors[0], vectors[2])  # 不同主题,应该低
    print(f"中英同义相似度: {sim_01:.3f}")  # 预期 > 0.8
    print(f"不同主题相似度: {sim_02:.3f}")  # 预期 < 0.5

向量库初始化与索引 / Vector Store Setup

"""
vector_store.py
ChromaDB 向量存储管理
"""

import chromadb
from chromadb.config import Settings
import hashlib
import time

class VectorStore:
    """ChromaDB 向量存储"""

    def __init__(
        self,
        persist_dir: str = "./chroma_db",
        collection_name: str = "my_notes",
    ):
        self.client = chromadb.PersistentClient(
            path=persist_dir,
            settings=Settings(anonymized_telemetry=False),
        )

        self.collection = self.client.get_or_create_collection(
            name=collection_name,
            metadata={
                "hnsw:space": "cosine",  # 余弦相似度
                "hnsw:M": 16,            # HNSW 连接数
                "hnsw:construction_ef": 200,
            }
        )

        print(f"Collection '{collection_name}': {self.collection.count()} documents")

    def add_chunks(
        self,
        chunks: list,
        embeddings: list,
        batch_size: int = 100,
    ):
        """批量添加 chunks 到向量库"""
        total = len(chunks)
        added = 0

        for i in range(0, total, batch_size):
            batch_chunks = chunks[i:i + batch_size]
            batch_embeddings = embeddings[i:i + batch_size]

            ids = [c.chunk_id for c in batch_chunks]
            documents = [c.content for c in batch_chunks]
            metadatas = [c.metadata for c in batch_chunks]

            self.collection.add(
                ids=ids,
                embeddings=[e.tolist() for e in batch_embeddings],
                documents=documents,
                metadatas=metadatas,
            )

            added += len(batch_chunks)
            print(f"  Added {added}/{total} chunks...")

        print(f"Total documents in collection: {self.collection.count()}")

    def search(
        self,
        query_embedding: list,
        n_results: int = 5,
        where: dict = None,
    ) -> dict:
        """相似度搜索"""
        kwargs = {
            "query_embeddings": [query_embedding],
            "n_results": n_results,
            "include": ["documents", "metadatas", "distances"],
        }
        if where:
            kwargs["where"] = where

        return self.collection.query(**kwargs)

    def get_stats(self) -> dict:
        """获取存储统计"""
        count = self.collection.count()
        sample = self.collection.peek(limit=5)

        return {
            "total_documents": count,
            "sample_ids": sample["ids"][:3] if sample["ids"] else [],
        }


def build_index(
    documents: list,
    embedding_service,
    persist_dir: str = "./chroma_db",
) -> VectorStore:
    """完整的索引构建流程"""
    from chunker import chunk_all_documents, analyze_chunks

    print("=== Step 1: Chunking ===")
    chunks = chunk_all_documents(
        documents,
        chunk_size=512,
        chunk_overlap=50,
        min_chunk_size=100,
    )
    analyze_chunks(chunks)

    print("\n=== Step 2: Embedding ===")
    start = time.time()
    texts = [c.content for c in chunks]
    embeddings = embedding_service.embed(texts)
    elapsed = time.time() - start
    print(f"Embedded {len(texts)} chunks in {elapsed:.1f}s "
          f"({len(texts)/elapsed:.0f} chunks/s)")

    print("\n=== Step 3: Indexing ===")
    store = VectorStore(persist_dir=persist_dir)
    store.add_chunks(chunks, embeddings)

    print("\n=== Done! ===")
    print(store.get_stats())

    return store

增量更新策略 / Incremental Update

"""
incremental_update.py
增量更新 — 只处理新增/修改的文件
"""

import json
import hashlib
from pathlib import Path
from datetime import datetime

class IndexManager:
    """索引管理器 — 追踪文件变化,支持增量更新"""

    STATE_FILE = "index_state.json"

    def __init__(self, persist_dir: str = "./chroma_db"):
        self.persist_dir = Path(persist_dir)
        self.state_path = self.persist_dir / self.STATE_FILE
        self.state = self._load_state()

    def _load_state(self) -> dict:
        """加载索引状态"""
        if self.state_path.exists():
            return json.loads(self.state_path.read_text())
        return {"files": {}, "last_update": None}

    def _save_state(self):
        """保存索引状态"""
        self.state["last_update"] = datetime.now().isoformat()
        self.state_path.write_text(
            json.dumps(self.state, indent=2, ensure_ascii=False)
        )

    def _file_hash(self, file_path: Path) -> str:
        """计算文件哈希"""
        content = file_path.read_bytes()
        return hashlib.md5(content).hexdigest()

    def get_changed_files(self, files: list[Path]) -> dict:
        """识别新增/修改/删除的文件"""
        current_hashes = {
            str(f): self._file_hash(f) for f in files
        }

        old_hashes = self.state.get("files", {})

        new_files = [
            f for f in files
            if str(f) not in old_hashes
        ]
        modified_files = [
            f for f in files
            if str(f) in old_hashes
            and current_hashes[str(f)] != old_hashes[str(f)]
        ]
        deleted_files = [
            f for f in old_hashes.keys()
            if f not in current_hashes
        ]

        return {
            "new": new_files,
            "modified": modified_files,
            "deleted": deleted_files,
            "unchanged": len(files) - len(new_files) - len(modified_files),
        }

    def update_state(self, files: list[Path]):
        """更新文件状态"""
        self.state["files"] = {
            str(f): self._file_hash(f) for f in files
        }
        self._save_state()

# 使用流程:
# 1. 首次: build_index() 全量构建
# 2. 后续: incremental_update() 只处理变化的文件
# 3. 每次更新后保存状态

知识点4:检索与生成 / Retrieval and Generation

"""
retriever.py
检索模块
"""

import numpy as np

class Retriever:
    """RAG 检索器"""

    def __init__(self, vector_store, embedding_service):
        self.store = vector_store
        self.emb = embedding_service

    def search(
        self,
        query: str,
        n_results: int = 5,
        category: str = None,
        min_score: float = 0.3,
    ) -> list[dict]:
        """搜索相关文档块

        Args:
            query: 用户查询
            n_results: 返回结果数
            category: 可选分类过滤
            min_score: 最低相关性阈值 (cosine similarity)
        """
        # 生成查询向量
        query_embedding = self.emb.embed(query)[0].tolist()

        # 构建过滤条件
        where = None
        if category:
            where = {"category": category}

        # 向量搜索
        results = self.store.search(
            query_embedding=query_embedding,
            n_results=n_results,
            where=where,
        )

        # 格式化结果
        formatted = []
        for i in range(len(results["ids"][0])):
            # ChromaDB 返回的是距离,需要转换为相似度
            distance = results["distances"][0][i]
            similarity = 1 - distance  # cosine distance → similarity

            if similarity >= min_score:
                formatted.append({
                    "content": results["documents"][0][i],
                    "metadata": results["metadatas"][0][i],
                    "similarity": round(similarity, 3),
                    "id": results["ids"][0][i],
                })

        return formatted
"""
hybrid_retriever.py
混合检索:向量搜索 + 关键词搜索
"""

import re
from collections import Counter

class HybridRetriever:
    """混合检索器 — Day 20 学过的原理

    结合两种检索方式:
    1. Dense (向量搜索): 语义理解,找意思相近的内容
    2. Sparse (关键词搜索): 精确匹配,找包含关键词的内容

    为什么需要混合?
    - 用户问"什么是ERC-4337" → 关键词"ERC-4337"很重要
    - 用户问"如何改善钱包使用体验" → 语义理解更重要
    - 混合搜索两种情况都能处理好
    """

    def __init__(self, vector_store, embedding_service, all_chunks):
        self.vector_store = vector_store
        self.emb = embedding_service
        self.all_chunks = all_chunks

        # 构建关键词倒排索引
        self.keyword_index = self._build_keyword_index()

    def _build_keyword_index(self) -> dict:
        """构建简单的关键词倒排索引"""
        index = {}
        for chunk in self.all_chunks:
            # 提取关键词(简单的分词)
            words = re.findall(
                r'[a-zA-Z][\w-]*|[\u4e00-\u9fff]+',
                chunk.content.lower()
            )
            for word in set(words):
                if word not in index:
                    index[word] = []
                index[word].append(chunk.chunk_id)
        return index

    def _keyword_search(
        self, query: str, n_results: int = 10
    ) -> list[tuple[str, float]]:
        """关键词搜索,返回 (chunk_id, score) 列表"""
        # 提取查询关键词
        query_words = re.findall(
            r'[a-zA-Z][\w-]*|[\u4e00-\u9fff]+',
            query.lower()
        )

        # 计算每个 chunk 的关键词匹配分数
        scores = Counter()
        for word in query_words:
            if word in self.keyword_index:
                for chunk_id in self.keyword_index[word]:
                    scores[chunk_id] += 1

        # 归一化分数
        max_score = max(scores.values()) if scores else 1
        results = [
            (cid, score / max_score)
            for cid, score in scores.most_common(n_results)
        ]

        return results

    def search(
        self,
        query: str,
        n_results: int = 5,
        dense_weight: float = 0.7,
        sparse_weight: float = 0.3,
        category: str = None,
    ) -> list[dict]:
        """混合搜索

        Args:
            dense_weight: 向量搜索权重 (默认 0.7)
            sparse_weight: 关键词搜索权重 (默认 0.3)
        """
        # Dense search (向量)
        query_embedding = self.emb.embed(query)[0].tolist()
        where = {"category": category} if category else None

        dense_results = self.vector_store.search(
            query_embedding=query_embedding,
            n_results=n_results * 2,  # 多取一些用于融合
            where=where,
        )

        # Sparse search (关键词)
        sparse_results = self._keyword_search(query, n_results * 2)

        # 融合分数 (Reciprocal Rank Fusion 简化版)
        combined_scores = {}

        # Dense scores
        for i in range(len(dense_results["ids"][0])):
            cid = dense_results["ids"][0][i]
            distance = dense_results["distances"][0][i]
            score = (1 - distance) * dense_weight
            combined_scores[cid] = {
                "score": score,
                "content": dense_results["documents"][0][i],
                "metadata": dense_results["metadatas"][0][i],
                "dense_score": 1 - distance,
                "sparse_score": 0,
            }

        # Sparse scores
        for cid, sparse_score in sparse_results:
            if cid in combined_scores:
                combined_scores[cid]["score"] += sparse_score * sparse_weight
                combined_scores[cid]["sparse_score"] = sparse_score
            else:
                # 从 chunks 中找内容
                chunk = next(
                    (c for c in self.all_chunks if c.chunk_id == cid),
                    None
                )
                if chunk:
                    combined_scores[cid] = {
                        "score": sparse_score * sparse_weight,
                        "content": chunk.content,
                        "metadata": chunk.metadata,
                        "dense_score": 0,
                        "sparse_score": sparse_score,
                    }

        # 排序并返回 top N
        sorted_results = sorted(
            combined_scores.items(),
            key=lambda x: x[1]["score"],
            reverse=True,
        )[:n_results]

        return [
            {
                "id": cid,
                "content": info["content"],
                "metadata": info["metadata"],
                "combined_score": round(info["score"], 3),
                "dense_score": round(info["dense_score"], 3),
                "sparse_score": round(info["sparse_score"], 3),
            }
            for cid, info in sorted_results
        ]

Prompt 模板设计 / Prompt Template Design

"""
prompt_templates.py
RAG Prompt 模板 — Day 4 Prompt Engineering 的实际应用
"""

# === System Prompt ===
SYSTEM_PROMPT = """你是一个基于个人学习笔记的AI助手。你的知识来源于用户的笔记文档。

重要规则:
1. 只基于提供的上下文内容回答问题
2. 如果上下文中没有相关信息,明确说"在笔记中未找到相关内容"
3. 回答时引用来源文件,格式: [来源: 文件名]
4. 使用中文回答,关键术语保留英文
5. 保持简洁但完整

你擅长的领域(基于笔记内容):
- AI/LLM 技术深度学习
- 金融零售业务架构
- Web3/DeFi 协议分析
- 软件架构设计"""

# === RAG Prompt 模板 ===
RAG_PROMPT_TEMPLATE = """基于以下上下文内容回答用户的问题。

## 相关上下文

{context}

## 用户问题

{question}

## 要求
- 基于上下文内容回答,不要编造信息
- 引用来源: [来源: 文件名]
- 如果上下文不足以回答,说明你不确定的部分
- 优先使用中文,术语保留英文

## 回答"""

def build_context(search_results: list[dict], max_chars: int = 3000) -> str:
    """构建上下文字符串"""
    context_parts = []
    total_chars = 0

    for i, result in enumerate(search_results):
        source = result["metadata"].get("file_path", "unknown")
        title = result["metadata"].get("title", "")
        score = result.get("combined_score", result.get("similarity", 0))

        header = f"### 参考 {i+1} [来源: {source}] (相关度: {score:.2f})"
        if title:
            header += f"\n标题: {title}"

        content = result["content"]

        # 控制总长度
        entry = f"{header}\n{content}\n"
        if total_chars + len(entry) > max_chars:
            # 截断
            remaining = max_chars - total_chars
            if remaining > 200:
                entry = entry[:remaining] + "\n[...截断...]"
            else:
                break

        context_parts.append(entry)
        total_chars += len(entry)

    return "\n---\n".join(context_parts)


def build_rag_prompt(question: str, search_results: list[dict]) -> list[dict]:
    """构建完整的 RAG prompt(OpenAI 消息格式)"""
    context = build_context(search_results)

    user_message = RAG_PROMPT_TEMPLATE.format(
        context=context,
        question=question,
    )

    return [
        {"role": "system", "content": SYSTEM_PROMPT},
        {"role": "user", "content": user_message},
    ]

引用溯源 / Citation Tracking

"""
citation.py
引用溯源 — 让回答可验证
"""

import re

def extract_citations(response: str) -> list[str]:
    """从回答中提取引用的来源"""
    pattern = r'\[来源:\s*([^\]]+)\]'
    return re.findall(pattern, response)

def format_response_with_citations(
    response: str, search_results: list[dict]
) -> str:
    """格式化回答,附加引用信息"""
    citations = extract_citations(response)

    # 构建引用列表
    source_list = []
    for i, result in enumerate(search_results):
        source = result["metadata"].get("file_path", "unknown")
        title = result["metadata"].get("title", "")
        score = result.get("combined_score", result.get("similarity", 0))

        marker = "  *" if source in citations else "   "
        source_list.append(
            f"{marker} [{i+1}] {source}"
            f"{' - ' + title if title else ''}"
            f" (相关度: {score:.2f})"
        )

    footer = "\n\n---\n**引用来源:**\n" + "\n".join(source_list)
    footer += "\n\n* 标记 = 回答中引用的来源"

    return response + footer

知识点5:端到端测试 / End-to-End Testing

准备测试问题 / Test Questions

"""
test_rag.py
端到端测试 RAG 系统
"""

# 10个测试问题(覆盖不同类别和难度)
TEST_QUESTIONS = [
    # === 简单检索(答案在单个文档中) ===
    {
        "id": 1,
        "question": "Transformer的自注意力机制是如何工作的?",
        "expected_source": "day1-transformer",
        "difficulty": "easy",
    },
    {
        "id": 2,
        "question": "什么是Q4_K_M量化?它与FP16相比有什么优劣?",
        "expected_source": "day2-quantization",
        "difficulty": "easy",
    },

    # === 中等检索(需要综合多个文档) ===
    {
        "id": 3,
        "question": "RAG系统中的检索优化有哪些方法?",
        "expected_source": ["day5-rag", "day19-production-rag", "day20-production-rag"],
        "difficulty": "medium",
    },
    {
        "id": 4,
        "question": "金融风控中如何使用AI/ML技术?",
        "expected_source": ["day31-financial-ai", "day34-credit-ai"],
        "difficulty": "medium",
    },

    # === 跨领域(需要关联不同主题的知识) ===
    {
        "id": 5,
        "question": "对比传统金融风控与DeFi协议的风控机制有什么异同?",
        "expected_source": ["day31-financial-ai", "web3笔记"],
        "difficulty": "hard",
    },
    {
        "id": 6,
        "question": "如何设计一个可持续的代币激励机制?",
        "expected_source": ["web3笔记中的tokenomics相关"],
        "difficulty": "hard",
    },

    # === 精确检索(需要找到特定信息) ===
    {
        "id": 7,
        "question": "Agent开发中如何处理错误恢复和状态管理?",
        "expected_source": "day22-agent-state",
        "difficulty": "medium",
    },
    {
        "id": 8,
        "question": "LLM应用的成本优化有哪些策略?",
        "expected_source": "day26-llm-cost",
        "difficulty": "medium",
    },

    # === 开放式问题 ===
    {
        "id": 9,
        "question": "总结一下AI+Web3的主要产品机会有哪些?",
        "expected_source": ["day41-cefi-defi-ai", "day42-ai-fusion"],
        "difficulty": "hard",
    },
    {
        "id": 10,
        "question": "设计系统面试中,RAG系统架构需要考虑哪些关键点?",
        "expected_source": "day44-system-design-rag",
        "difficulty": "medium",
    },
]

运行测试 / Run Tests

def run_e2e_test(rag_pipeline, questions=TEST_QUESTIONS):
    """运行端到端测试"""
    results = []

    for q in questions:
        print(f"\n{'='*60}")
        print(f"Q{q['id']} [{q['difficulty']}]: {q['question']}")
        print(f"{'='*60}")

        # 检索
        search_results = rag_pipeline.retrieve(q["question"])

        # 生成
        answer = rag_pipeline.generate(q["question"], search_results)

        # 记录
        result = {
            "question_id": q["id"],
            "question": q["question"],
            "difficulty": q["difficulty"],
            "answer": answer,
            "sources_found": [
                r["metadata"].get("file_path") for r in search_results
            ],
            "expected_source": q["expected_source"],
            "top_score": search_results[0]["similarity"] if search_results else 0,
        }
        results.append(result)

        # 打印摘要
        print(f"\n回答 (前200字):\n{answer[:200]}...")
        print(f"\n检索到的来源:")
        for r in search_results[:3]:
            print(f"  - {r['metadata'].get('file_path')} "
                  f"(score: {r.get('similarity', 0):.3f})")

    return results

评估回答质量 / Quality Assessment

手动评估模板(用于 V1 基线):

对每个回答评分(1-5):

1. 相关性 (Relevance)
   5 = 完全回答了问题
   3 = 部分回答,有遗漏
   1 = 答非所问

2. 准确性 (Accuracy)
   5 = 所有信息都准确
   3 = 大部分准确,有小错误
   1 = 包含明显错误信息

3. 完整性 (Completeness)
   5 = 覆盖了所有相关要点
   3 = 覆盖了主要要点
   1 = 非常不完整

4. 引用质量 (Citation)
   5 = 所有引用都正确且有帮助
   3 = 有引用但不够准确
   1 = 没有引用或引用错误

评估记录表:
┌────┬──────┬──────┬──────┬──────┬──────┬─────────────────────┐
│ Q# │ 相关 │ 准确 │ 完整 │ 引用 │ 均分 │ 问题备注            │
├────┼──────┼──────┼──────┼──────┼──────┼─────────────────────┤
│ 1  │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │                     │
│ 2  │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │                     │
│ ...│      │      │      │      │      │                     │
│ 10 │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │                     │
├────┼──────┼──────┼──────┼──────┼──────┼─────────────────────┤
│AVG │ __/5 │ __/5 │ __/5 │ __/5 │ __/5 │ V1 Baseline         │
└────┴──────┴──────┴──────┴──────┴──────┴─────────────────────┘

发现问题 / Problem Discovery

V1 常见问题(预计会遇到):

问题1: 检索不准
  症状: 问"量化技术",检索到"金融量化交易"
  原因: 语义歧义,"量化"在不同领域含义不同
  方向: Day 53 用 Reranker 解决

问题2: 回答偏离
  症状: 检索到了相关内容,但回答跑偏
  原因: Prompt 模板不够好 / 上下文太多噪声
  方向: Day 53 优化 Prompt 和 Context 构建

问题3: 幻觉 (Hallucination)
  症状: 回答中包含笔记里没有的信息
  原因: LLM 用了自身知识而非上下文
  方向: Day 53 添加"如果不确定请说不知道"

问题4: 碎片化回答
  症状: 只引用了一个 chunk,遗漏了相关内容
  原因: 相关信息分散在多个 chunk 中
  方向: Day 53 增加检索数量 + Multi-Query

问题5: 速度慢
  症状: 从提问到回答需要 10+ 秒
  原因: Embedding 计算 + LLM 生成都需要时间
  方向: Day 53 添加缓存

记录这些问题!明天 Day 53 专门解决它们。

知识点6:代码架构 / Code Architecture

项目结构 / Project Structure

my-rag-system/
├── config.py              # 配置管理
├── document_processor.py  # 文档解析
├── chunker.py             # 文档分块
├── embedding_service.py   # Embedding 服务
├── vector_store.py        # 向量存储
├── retriever.py           # 检索模块
├── hybrid_retriever.py    # 混合检索
├── prompt_templates.py    # Prompt 模板
├── citation.py            # 引用溯源
├── rag_pipeline.py        # 主流程编排
├── test_rag.py            # 测试脚本
├── build_index.py         # 索引构建入口
├── query.py               # 交互式查询入口
├── chroma_db/             # 向量数据库存储
└── logs/                  # 日志

核心 Pipeline / Core Pipeline

"""
rag_pipeline.py
RAG 主流程 — 串联所有组件
"""

from openai import OpenAI
from document_processor import MarkdownProcessor
from chunker import chunk_all_documents
from embedding_service import EmbeddingService
from vector_store import VectorStore
from prompt_templates import build_rag_prompt
from citation import format_response_with_citations

class RAGPipeline:
    """RAG 完整流程"""

    def __init__(self, config: dict = None):
        self.config = config or self._default_config()

        # 初始化组件
        print("Initializing RAG Pipeline...")

        self.embedding = EmbeddingService(
            mode=self.config["embedding_mode"],
            model_name=self.config["embedding_model"],
        )

        self.store = VectorStore(
            persist_dir=self.config["persist_dir"],
            collection_name=self.config["collection_name"],
        )

        self.llm_client = OpenAI(
            base_url=self.config["llm_base_url"],
            api_key=self.config["llm_api_key"],
        )

        print("Pipeline ready!")

    def _default_config(self) -> dict:
        return {
            "embedding_mode": "local",
            "embedding_model": "BAAI/bge-m3",
            "persist_dir": "./chroma_db",
            "collection_name": "my_notes",
            "llm_base_url": "http://localhost:11434/v1",
            "llm_api_key": "ollama",
            "llm_model": "qwen2.5:7b",
            "n_results": 5,
            "docs_dir": "E:/code/momofinance/momoweb3",
        }

    def build_index(self):
        """构建索引(首次或重建)"""
        processor = MarkdownProcessor(self.config["docs_dir"])
        documents = processor.process_all()

        chunks = chunk_all_documents(
            documents, chunk_size=512, chunk_overlap=50
        )

        texts = [c.content for c in chunks]
        embeddings = self.embedding.embed(texts)

        self.store.add_chunks(chunks, embeddings)
        print(f"Index built: {len(chunks)} chunks")

    def retrieve(self, query: str, n_results: int = None) -> list[dict]:
        """检索相关文档"""
        n = n_results or self.config["n_results"]
        query_emb = self.embedding.embed(query)[0].tolist()

        results = self.store.search(
            query_embedding=query_emb,
            n_results=n,
        )

        formatted = []
        for i in range(len(results["ids"][0])):
            formatted.append({
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "similarity": round(1 - results["distances"][0][i], 3),
            })

        return formatted

    def generate(self, question: str, context_results: list[dict]) -> str:
        """基于检索结果生成回答"""
        messages = build_rag_prompt(question, context_results)

        response = self.llm_client.chat.completions.create(
            model=self.config["llm_model"],
            messages=messages,
            temperature=0.3,
            max_tokens=1000,
        )

        return response.choices[0].message.content

    def query(self, question: str) -> str:
        """完整的 RAG 查询流程"""
        # Step 1: 检索
        results = self.retrieve(question)

        # Step 2: 生成
        answer = self.generate(question, results)

        # Step 3: 添加引用
        answer_with_citations = format_response_with_citations(
            answer, results
        )

        return answer_with_citations

    def interactive(self):
        """交互式查询模式"""
        print("\n=== RAG 问答系统 ===")
        print("输入问题开始查询,输入 'quit' 退出\n")

        while True:
            question = input("你的问题: ").strip()
            if question.lower() in ("quit", "exit", "q"):
                break
            if not question:
                continue

            print("\n检索中...")
            answer = self.query(question)
            print(f"\n{answer}\n")
            print("-" * 60)


# === 入口脚本 ===
if __name__ == "__main__":
    import sys

    pipeline = RAGPipeline()

    if len(sys.argv) > 1 and sys.argv[1] == "build":
        pipeline.build_index()
    else:
        pipeline.interactive()

配置管理 / Configuration

"""
config.py
配置管理
"""

import os
from pathlib import Path

# 项目根目录
PROJECT_ROOT = Path("E:/code/momofinance/momoweb3")

# RAG 配置
RAG_CONFIG = {
    # 文档源
    "docs_dir": str(PROJECT_ROOT),
    "scan_patterns": ["docs/**/*.md", "CLAUDE.md"],

    # 分块参数
    "chunk_size": 512,
    "chunk_overlap": 50,
    "min_chunk_size": 100,

    # Embedding
    "embedding_mode": "local",      # "local" or "ollama"
    "embedding_model": "BAAI/bge-m3",

    # 向量存储
    "persist_dir": str(PROJECT_ROOT / "rag_data" / "chroma_db"),
    "collection_name": "my_notes",

    # LLM
    "llm_base_url": "http://localhost:11434/v1",
    "llm_api_key": "ollama",
    "llm_model": "qwen2.5:7b",

    # 检索
    "n_results": 5,
    "min_score": 0.3,

    # 生成
    "temperature": 0.3,
    "max_tokens": 1000,
}

# 可以通过环境变量覆盖
def get_config() -> dict:
    config = RAG_CONFIG.copy()

    if os.environ.get("RAG_LLM_MODEL"):
        config["llm_model"] = os.environ["RAG_LLM_MODEL"]

    if os.environ.get("RAG_EMBEDDING_MODE"):
        config["embedding_mode"] = os.environ["RAG_EMBEDDING_MODE"]

    return config

今日思考 / Today's Reflections

思考1:理论和实践的差距 / Gap Between Theory and Practice

Day 5 学 RAG 时写的笔记:
  "RAG = Retrieval + Augmented + Generation,
   先检索再生成,简单直观"

今天实际构建后的感受:
  "天哪,光是文档处理就花了2小时"
  "Chunk 大小选 512 还是 1024,影响比想象的大"
  "中英混合文本的 Embedding 质量不好验证"
  "OpenAI 兼容 API 真的省了大量适配工作"

最大的收获:
  理论学的是"架构图",实践做的是"砖头和水泥"
  Day 19-21 的生产 RAG 笔记,现在每一条都有了切身体会

思考2:自己的数据是最好的测试场景 / Your Own Data = Best Test Case

用公开的 benchmark 测试 RAG,数字好看但没有感觉

用自己的笔记构建 RAG:
- 你知道答案应该是什么 → 能准确评估质量
- 你知道信息在哪个文件 → 能验证检索准确性
- 你知道哪些问题有价值 → 测试更有意义
- 你发现的问题更真实 → 优化方向更明确

而且,这个 RAG 系统之后真的能用:
- 面试前快速复习知识点
- 写文章时查找相关笔记
- 交叉关联不同领域的知识

思考3:MVP 思维的重要性 / MVP Mindset

今天构建的是 V1 — 一个最小可用版本

V1 的特点:
✅ 能工作(问答基本可用)
✅ 可评估(知道哪里不好)
✅ 可迭代(明天优化)

V1 故意没做的:
❌ Reranker(明天加)
❌ Multi-Query(明天加)
❌ 缓存(明天加)
❌ Web UI(Day 58 做)

为什么不一次做完?
因为:
1. 先有基线才能衡量改进
2. 每个优化都需要对比效果
3. 过早优化是万恶之源

这也是 PM 的核心能力:
  知道什么先做、什么后做、什么不做

学习资源 / Resources

RAG 实战参考

分块策略

评估方法


明日预告 / Tomorrow's Preview

Day 53: RAG进阶 — 评估驱动的持续优化

今天发现的所有问题,明天一一解决:

1. 评估体系搭建
   - 构建 Golden QA 集
   - RAGAS 自动评估

2. 检索优化
   - Chunk Size 对比实验
   - 添加 Reranker (BGE-Reranker)
   - Hybrid Search 调权

3. 生成优化
   - Prompt 迭代 v1→v2→v3
   - 减少幻觉

4. 高级特性
   - Multi-Query 检索
   - 对话式 RAG

5. 性能优化
   - 缓存层
   - 异步处理

V1 → V2 → V3,每次改进都有数据支撑!

Day 52 完成! 从 400+ 篇笔记到一个可工作的 RAG 问答系统。 这是你的第一个完整 AI 应用——虽然还粗糙,但已经能用了。 明天开始评估和优化,让它真正好用起来!