返回AI笔记
AI Day 53

AI Day 53: 实战(3):RAG进阶 — 评估驱动的持续优化

AI Day 53: 实战(3):RAG进阶 — 评估驱动的持续优化

2026-05-24

日期: 2026-05-24 | 阶段: 第五阶段 · 动手实战 (Day 51-60) | 主题: RAG Evaluation & Optimization

学习路径 / Learning Path

AI/LLM 深度技术学习 60天计划
├── 第一阶段:模型基础 (Day 1-15) ✅
├── 第二阶段:工程实践 (Day 16-30) ✅
├── 第三阶段:金融零售AI应用 (Day 31-42) ✅
├── 第四阶段:面试冲刺 (Day 43-50) ✅
└── 第五阶段:动手实战 (Day 51-60)
    ├── Day 51: 本地大模型部署全流程 ✅
    ├── Day 52: RAG系统实战:从文档到问答 ✅
    ├── Day 53: RAG进阶:评估优化与生产化 ← 你在这里
    ├── Day 54: LoRA微调实战:训练你的专属模型
    ├── Day 55: Agent开发实战:构建工具调用Agent
    ├── Day 56: MCP Server开发:扩展AI能力边界
    ├── Day 57: 多模态应用:图文理解与文档分析
    ├── Day 58: AI应用全栈开发:前后端集成
    ├── Day 59: 性能调优与成本实战
    └── Day 60: 总结与作品集

核心概念 / Core Concepts

RAG V1 只是起点 / V1 is Just the Beginning

昨天 Day 52 构建的 V1 RAG 系统:

V1 现状:
├── ✅ 能工作:问问题能得到回答
├── ⚠️ 检索不准:有些问题找不到最相关的内容
├── ⚠️ 回答偏离:有时候答非所问
├── ⚠️ 存在幻觉:偶尔编造笔记里没有的信息
├── ⚠️ 速度不快:端到端约 8-15 秒
└── ⚠️ 引用不够:来源标注不够准确

这很正常!Day 21 学过:
  "第一版 RAG 的平均表现通常只有期望值的 50-60%"
  "关键不是第一版多好,而是能否系统化地迭代改进"

评估→诊断→优化 迭代循环 / Eval→Diagnose→Optimize Loop

真正的价值在迭代循环:

        ┌─────────────┐
        │   评估 Eval  │ ← 量化当前表现
        └──────┬──────┘
               ▼
        ┌─────────────┐
        │  诊断 Diagnose│ ← 找出最大瓶颈
        └──────┬──────┘
               ▼
        ┌─────────────┐
        │ 优化 Optimize│ ← 针对性改进
        └──────┬──────┘
               ▼
        ┌─────────────┐
        │ 验证 Verify  │ ← 确认改进有效
        └──────┬──────┘
               │
               └──────→ 回到评估,开始下一轮

今天的计划:跑 3 轮迭代
  V1 (昨天) → V2 (检索优化) → V3 (生成优化)
  每轮都有数据对比,不凭感觉做决策

这是 Day 21 "生产RAG评估与迭代" 理论的实践落地!

知识点1:评估体系搭建 / Evaluation Framework

Golden QA 集构建 / Building Golden QA Set

"""
golden_qa.py
从笔记中构建 Golden QA 测试集

Golden QA = 有标准答案的问答对
用于客观评估 RAG 系统质量
"""

# 构建方法:从自己的笔记中手动提取
# 标准:答案确实在笔记中存在,且你能验证正确性

GOLDEN_QA = [
    # === 1. AI基础知识 (Day 1-15) ===
    {
        "id": "ai_01",
        "question": "Transformer中的Multi-Head Attention有什么优势?",
        "ground_truth": "Multi-Head Attention允许模型同时关注不同位置的不同表示子空间信息。每个头学习不同的注意力模式,有的关注局部关系,有的关注全局关系,增加了模型的表达能力。",
        "source_file": "docs/ai/day1-transformer-llm-fundamentals.md",
        "category": "ai_basics",
    },
    {
        "id": "ai_02",
        "question": "GGUF量化格式中Q4_K_M代表什么?",
        "ground_truth": "Q4表示4-bit量化,K表示K-quant方法(按重要性分层量化),M表示Medium大小(平衡质量和压缩率)。它将模型权重从FP16压缩约4倍,显存占用大幅降低,质量损失相对可控。",
        "source_file": "docs/ai/day2-quantization-local-deployment.md",
        "category": "ai_basics",
    },
    {
        "id": "ai_03",
        "question": "LoRA微调的核心原理是什么?",
        "ground_truth": "LoRA通过在预训练模型的权重矩阵旁添加低秩分解矩阵(A和B)来实现微调。不修改原始权重,只训练这两个小矩阵。参数量通常只有原模型的0.1-1%,大幅降低了微调成本。",
        "source_file": "docs/ai/day7-finetuning-lora-qlora.md",
        "category": "ai_basics",
    },

    # === 2. RAG相关 (Day 5, 19-21) ===
    {
        "id": "rag_01",
        "question": "RAG系统中Chunking策略对检索质量有什么影响?",
        "ground_truth": "Chunk太大会引入噪声,降低检索精度;Chunk太小会丢失上下文,降低回答质量。最佳实践是使用递归分割,按语义边界切分,保持300-600 token大小,添加50-100 token重叠。",
        "source_file": "docs/ai/day19-production-rag-chunking-parsing.md",
        "category": "rag",
    },
    {
        "id": "rag_02",
        "question": "Hybrid Search相比纯向量搜索的优势是什么?",
        "ground_truth": "纯向量搜索擅长语义匹配但可能遗漏精确关键词;关键词搜索擅长精确匹配但缺乏语义理解。Hybrid Search结合两者,通过加权融合既保证语义相关性又不遗漏关键词匹配。",
        "source_file": "docs/ai/day20-production-rag-retrieval-rerank.md",
        "category": "rag",
    },

    # === 3. Agent相关 (Day 12, 22-25) ===
    {
        "id": "agent_01",
        "question": "Agent中的ReAct模式是如何工作的?",
        "ground_truth": "ReAct结合了Reasoning和Acting。Agent先进行推理(Thought),然后执行动作(Action),观察结果(Observation),再进行下一轮推理。这种交替过程让Agent能够分步解决复杂问题。",
        "source_file": "docs/ai/day12-agent-frameworks.md",
        "category": "agent",
    },
    {
        "id": "agent_02",
        "question": "Agent的错误恢复策略有哪些?",
        "ground_truth": "主要策略包括:重试机制(exponential backoff)、回退策略(fallback to simpler approach)、检查点恢复(从上次成功状态继续)、人机协作(遇到不确定时请求人类介入)。关键是设计状态管理使恢复成为可能。",
        "source_file": "docs/ai/day22-agent-state-error-recovery.md",
        "category": "agent",
    },

    # === 4. 金融AI (Day 31-35) ===
    {
        "id": "fin_01",
        "question": "AI在信贷风控中的应用有哪些关键环节?",
        "ground_truth": "信贷AI覆盖全链路:贷前(反欺诈识别、信用评估、额度定价)、贷中(交易监控、行为预警、动态额度)、贷后(催收策略、资产处置、不良预测)。核心是风险识别和动态决策。",
        "source_file": "docs/ai/day34-credit-ai-full-pipeline.md",
        "category": "financial_ai",
    },

    # === 5. 工程实践 (Day 16-30) ===
    {
        "id": "eng_01",
        "question": "LLM应用的成本优化有哪些主要策略?",
        "ground_truth": "核心策略包括:Prompt优化(减少token数)、缓存(语义缓存避免重复请求)、模型路由(简单任务用小模型)、批量处理(减少API调用次数)、量化部署(降低推理成本)。关键是建立成本可观测性。",
        "source_file": "docs/ai/day26-llm-cost-engineering.md",
        "category": "engineering",
    },
    {
        "id": "eng_02",
        "question": "LLM应用如何实现可观测性?",
        "ground_truth": "三大支柱:Traces(请求链路追踪,从用户输入到最终输出)、Metrics(延迟/成本/质量等关键指标)、Logs(Prompt/Response/中间步骤日志)。工具选择有LangSmith、Langfuse、Phoenix等。",
        "source_file": "docs/ai/day18-llm-observability-monitoring.md",
        "category": "engineering",
    },

    # === 6. 零售AI (Day 36-40) ===
    {
        "id": "retail_01",
        "question": "推荐系统中的冷启动问题如何解决?",
        "ground_truth": "冷启动分为用户冷启动和商品冷启动。用户冷启动可通过问卷、热门推荐、基于人口统计特征推荐。商品冷启动可通过商品属性(Content-based)、相似商品类比、运营策略人工推荐。LLM可以通过理解商品描述帮助解决商品冷启动。",
        "source_file": "docs/ai/day36-retail-ai-recommendation.md",
        "category": "retail_ai",
    },

    # === 7. 跨领域 ===
    {
        "id": "cross_01",
        "question": "CeFi和DeFi在风控架构上有什么核心差异?",
        "ground_truth": "CeFi风控基于用户身份(KYC/信用评分),集中式决策,有人工审核兜底。DeFi风控基于链上数据(抵押率/协议参数),智能合约自动执行(清算机制),没有人工介入。两者融合趋势是将传统风控模型与链上数据结合。",
        "source_file": "docs/ai/day41-cefi-defi-ai-fusion.md",
        "category": "cross_domain",
    },

    # === 8-10. 系统设计 (Day 43-46) ===
    {
        "id": "design_01",
        "question": "设计一个RAG系统时需要考虑哪些关键架构决策?",
        "ground_truth": "关键决策包括:1)Embedding模型选择(通用vs领域专用),2)向量数据库选型(规模/性能/功能),3)分块策略(大小/重叠/语义切分),4)检索策略(密集/稀疏/混合),5)Reranking(是否需要二次排序),6)缓存策略(语义缓存/结果缓存),7)评估体系。",
        "source_file": "docs/ai/day44-system-design-rag-system.md",
        "category": "system_design",
    },
    {
        "id": "design_02",
        "question": "多Agent系统的通信模式有哪几种?",
        "ground_truth": "主要模式包括:1)集中式(Manager Agent协调),2)去中心化(Agent间直接通信),3)黑板模式(共享数据空间),4)管道模式(顺序处理链),5)竞争模式(多Agent竞争提供最佳结果)。选择取决于任务复杂度和协作需求。",
        "source_file": "docs/ai/day24-multi-agent-systems.md",
        "category": "system_design",
    },

    # 继续添加到50个...
    # 这里展示前14个作为示例
    # 实际应该从每个知识领域各提取 5-7 个
]

def get_golden_qa(category: str = None) -> list[dict]:
    """获取 Golden QA 集,可按类别过滤"""
    if category:
        return [q for q in GOLDEN_QA if q["category"] == category]
    return GOLDEN_QA

def get_categories() -> list[str]:
    """获取所有类别"""
    return list(set(q["category"] for q in GOLDEN_QA))

RAGAS 四指标计算 / RAGAS Metrics

"""
ragas_eval.py
RAG 评估指标 — Day 21 学的 RAGAS 框架实战

RAGAS 四个核心指标:
1. Faithfulness (忠实度): 回答是否忠于检索到的上下文
2. Answer Relevancy (回答相关性): 回答是否与问题相关
3. Context Precision (上下文精度): 检索到的内容是否精确相关
4. Context Recall (上下文召回): 是否检索到了所有相关内容
"""

from openai import OpenAI
import json
import re

class RAGEvaluator:
    """RAG 评估器 — 使用 LLM 作为评判者"""

    def __init__(self, llm_client=None, model="qwen2.5:7b"):
        self.client = llm_client or OpenAI(
            base_url="http://localhost:11434/v1",
            api_key="ollama",
        )
        self.model = model

    def evaluate_faithfulness(
        self, answer: str, contexts: list[str]
    ) -> float:
        """评估回答是否忠于上下文(不编造信息)

        方法:
        1. 从回答中提取所有声明(claims)
        2. 检查每个声明是否能在上下文中找到支持
        3. 忠实度 = 有支持的声明数 / 总声明数
        """
        context_text = "\n---\n".join(contexts)

        # Step 1: 提取声明
        claims_prompt = f"""从以下回答中提取所有事实性声明,每行一个。
只提取事实声明,不包括观点或推测。

回答:
{answer}

请用JSON数组格式输出声明列表:
["声明1", "声明2", ...]"""

        claims_response = self._call_llm(claims_prompt)
        try:
            claims = json.loads(
                re.search(r'\[.*\]', claims_response, re.DOTALL).group()
            )
        except (json.JSONDecodeError, AttributeError):
            return 0.5  # 解析失败,给中间值

        if not claims:
            return 1.0  # 没有声明

        # Step 2: 验证每个声明
        supported = 0
        for claim in claims:
            verify_prompt = f"""判断以下声明是否能从给定的上下文中得到支持。

声明: {claim}

上下文:
{context_text}

只回答 "支持" 或 "不支持"。"""

            result = self._call_llm(verify_prompt)
            if "支持" in result and "不支持" not in result:
                supported += 1

        return round(supported / len(claims), 3)

    def evaluate_answer_relevancy(
        self, question: str, answer: str
    ) -> float:
        """评估回答与问题的相关性

        方法:从回答反向生成问题,计算与原问题的相似度
        """
        prompt = f"""基于以下回答,你觉得这个回答回答了用户的问题吗?

用户问题: {question}

系统回答: {answer}

请从1到5评分:
5 = 完全回答了问题
4 = 大部分回答了,有少量遗漏
3 = 部分回答了,有较多遗漏
2 = 只回答了一小部分
1 = 完全没有回答问题

只输出数字评分:"""

        result = self._call_llm(prompt)
        try:
            score = int(re.search(r'[1-5]', result).group())
            return round(score / 5.0, 3)
        except (AttributeError, ValueError):
            return 0.5

    def evaluate_context_precision(
        self, question: str, contexts: list[str], ground_truth: str
    ) -> float:
        """评估检索到的上下文精度

        方法:检查排名靠前的上下文是否确实相关
        """
        relevant_count = 0
        total = len(contexts)

        for i, ctx in enumerate(contexts):
            prompt = f"""判断以下上下文是否包含与问题相关的信息。

问题: {question}
参考答案: {ground_truth}

上下文:
{ctx}

只回答 "相关" 或 "不相关"。"""

            result = self._call_llm(prompt)
            if "相关" in result and "不相关" not in result:
                relevant_count += 1

        return round(relevant_count / max(total, 1), 3)

    def evaluate_context_recall(
        self, ground_truth: str, contexts: list[str]
    ) -> float:
        """评估上下文是否覆盖了标准答案的所有要点

        方法:
        1. 从标准答案提取关键要点
        2. 检查上下文中覆盖了多少要点
        """
        context_text = "\n---\n".join(contexts)

        prompt = f"""标准答案包含以下内容:
{ground_truth}

检索到的上下文包含:
{context_text}

标准答案中的关键信息点,有多少被上下文覆盖了?
请从1到5评分:
5 = 全部覆盖
4 = 大部分覆盖 (>80%)
3 = 约一半覆盖
2 = 少部分覆盖 (<30%)
1 = 几乎没有覆盖

只输出数字评分:"""

        result = self._call_llm(prompt)
        try:
            score = int(re.search(r'[1-5]', result).group())
            return round(score / 5.0, 3)
        except (AttributeError, ValueError):
            return 0.5

    def evaluate_full(
        self,
        question: str,
        answer: str,
        contexts: list[str],
        ground_truth: str,
    ) -> dict:
        """完整评估"""
        return {
            "faithfulness": self.evaluate_faithfulness(answer, contexts),
            "answer_relevancy": self.evaluate_answer_relevancy(
                question, answer
            ),
            "context_precision": self.evaluate_context_precision(
                question, contexts, ground_truth
            ),
            "context_recall": self.evaluate_context_recall(
                ground_truth, contexts
            ),
        }

    def _call_llm(self, prompt: str) -> str:
        """调用 LLM"""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=500,
        )
        return response.choices[0].message.content

基线 Benchmark / Baseline Benchmark

"""
benchmark.py
运行基线评估
"""

import json
import time
from golden_qa import GOLDEN_QA
from ragas_eval import RAGEvaluator

def run_benchmark(
    rag_pipeline,
    evaluator: RAGEvaluator,
    qa_set: list[dict] = None,
    version: str = "v1",
) -> dict:
    """运行完整基线测试"""
    qa_set = qa_set or GOLDEN_QA
    results = []

    print(f"\n{'='*60}")
    print(f"Running Benchmark: {version}")
    print(f"Questions: {len(qa_set)}")
    print(f"{'='*60}")

    for i, qa in enumerate(qa_set):
        print(f"\n[{i+1}/{len(qa_set)}] {qa['question'][:50]}...")

        start = time.time()

        # 检索
        search_results = rag_pipeline.retrieve(qa["question"])
        contexts = [r["content"] for r in search_results]

        # 生成
        answer = rag_pipeline.generate(qa["question"], search_results)

        elapsed = time.time() - start

        # 评估
        scores = evaluator.evaluate_full(
            question=qa["question"],
            answer=answer,
            contexts=contexts,
            ground_truth=qa["ground_truth"],
        )

        result = {
            "id": qa["id"],
            "question": qa["question"],
            "category": qa["category"],
            "answer": answer,
            "sources": [r.get("metadata", {}).get("file_path") for r in search_results],
            "expected_source": qa["source_file"],
            "scores": scores,
            "latency_s": round(elapsed, 2),
        }
        results.append(result)

        # 打印单题结果
        avg = sum(scores.values()) / len(scores)
        print(f"  Faith={scores['faithfulness']:.2f} "
              f"Relev={scores['answer_relevancy']:.2f} "
              f"Prec={scores['context_precision']:.2f} "
              f"Recall={scores['context_recall']:.2f} "
              f"Avg={avg:.2f} "
              f"Time={elapsed:.1f}s")

    # 汇总统计
    summary = compute_summary(results)
    summary["version"] = version

    # 保存结果
    output = {"summary": summary, "details": results}
    with open(f"benchmark_{version}.json", "w", encoding="utf-8") as f:
        json.dump(output, f, ensure_ascii=False, indent=2)

    print_summary(summary)
    return summary

def compute_summary(results: list[dict]) -> dict:
    """计算汇总统计"""
    all_scores = {
        "faithfulness": [],
        "answer_relevancy": [],
        "context_precision": [],
        "context_recall": [],
    }
    latencies = []

    for r in results:
        for metric, value in r["scores"].items():
            all_scores[metric].append(value)
        latencies.append(r["latency_s"])

    summary = {}
    for metric, values in all_scores.items():
        summary[f"{metric}_avg"] = round(sum(values) / len(values), 3)
        summary[f"{metric}_min"] = round(min(values), 3)

    summary["overall_avg"] = round(
        sum(summary[f"{m}_avg"] for m in all_scores) / len(all_scores), 3
    )
    summary["avg_latency"] = round(sum(latencies) / len(latencies), 2)
    summary["p95_latency"] = round(sorted(latencies)[int(len(latencies) * 0.95)], 2)
    summary["total_questions"] = len(results)

    return summary

def print_summary(summary: dict):
    """打印评估摘要"""
    print(f"\n{'='*60}")
    print(f"Benchmark Summary: {summary.get('version', 'N/A')}")
    print(f"{'='*60}")
    print(f"  Faithfulness:      {summary['faithfulness_avg']:.3f} (min: {summary['faithfulness_min']:.3f})")
    print(f"  Answer Relevancy:  {summary['answer_relevancy_avg']:.3f} (min: {summary['answer_relevancy_min']:.3f})")
    print(f"  Context Precision: {summary['context_precision_avg']:.3f} (min: {summary['context_precision_min']:.3f})")
    print(f"  Context Recall:    {summary['context_recall_avg']:.3f} (min: {summary['context_recall_min']:.3f})")
    print(f"  ─────────────────────────────")
    print(f"  Overall Average:   {summary['overall_avg']:.3f}")
    print(f"  Avg Latency:       {summary['avg_latency']:.1f}s")
    print(f"  P95 Latency:       {summary['p95_latency']:.1f}s")
    print(f"{'='*60}")

知识点2:检索优化 / Retrieval Optimization

问题诊断 / Problem Diagnosis

"""
retrieval_diagnosis.py
诊断检索问题:哪些问题检索不到相关内容?
"""

def diagnose_retrieval(rag_pipeline, qa_set):
    """诊断检索质量"""
    issues = {
        "miss_source": [],      # 没有检索到期望的来源文件
        "low_score": [],        # 最高相关度分数低
        "semantic_mismatch": [], # 语义不匹配(找到了不相关的内容)
    }

    for qa in qa_set:
        results = rag_pipeline.retrieve(qa["question"])

        # 检查是否找到了期望的来源
        found_sources = [
            r["metadata"].get("file_path", "") for r in results
        ]
        expected = qa["source_file"]

        if not any(expected in s for s in found_sources):
            issues["miss_source"].append({
                "question": qa["question"],
                "expected": expected,
                "found": found_sources[:3],
            })

        # 检查最高分数
        top_score = results[0]["similarity"] if results else 0
        if top_score < 0.5:
            issues["low_score"].append({
                "question": qa["question"],
                "top_score": top_score,
                "top_content": results[0]["content"][:100] if results else "",
            })

    # 打印诊断报告
    print("=== Retrieval Diagnosis ===")
    print(f"\nMissed Sources: {len(issues['miss_source'])}/{len(qa_set)}")
    for item in issues["miss_source"][:5]:
        print(f"  Q: {item['question'][:50]}...")
        print(f"  Expected: {item['expected']}")
        print(f"  Found: {item['found'][0] if item['found'] else 'none'}")

    print(f"\nLow Scores (<0.5): {len(issues['low_score'])}/{len(qa_set)}")
    for item in issues["low_score"][:5]:
        print(f"  Q: {item['question'][:50]}...")
        print(f"  Score: {item['top_score']:.3f}")

    return issues

Chunk Size 实验 / Chunk Size Experiment

"""
chunk_size_experiment.py
对比不同 Chunk Size 对检索质量的影响

Day 19 的理论:
  "Chunk Size 是 RAG 最重要的超参数之一"
  "太大→噪声多,太小→信息不完整"

今天实验验证!
"""

def run_chunk_size_experiment(
    documents: list,
    embedding_service,
    evaluator,
    qa_set: list,
    sizes: list[int] = [256, 512, 1024],
):
    """Chunk Size 对比实验"""
    from chunker import chunk_all_documents
    from vector_store import VectorStore

    results = {}

    for size in sizes:
        print(f"\n{'='*40}")
        print(f"Testing chunk_size = {size}")
        print(f"{'='*40}")

        # 1. 分块
        chunks = chunk_all_documents(
            documents,
            chunk_size=size,
            chunk_overlap=size // 10,  # 10% 重叠
        )
        print(f"  Chunks: {len(chunks)}")

        # 2. Embedding
        texts = [c.content for c in chunks]
        embeddings = embedding_service.embed(texts)

        # 3. 索引
        store = VectorStore(
            persist_dir=f"./chroma_experiment/size_{size}",
            collection_name=f"test_{size}",
        )
        store.add_chunks(chunks, embeddings)

        # 4. 测试检索
        precision_scores = []
        recall_scores = []

        for qa in qa_set:
            query_emb = embedding_service.embed(qa["question"])[0].tolist()
            search_results = store.search(
                query_embedding=query_emb, n_results=5
            )

            contexts = search_results["documents"][0]

            prec = evaluator.evaluate_context_precision(
                qa["question"], contexts, qa["ground_truth"]
            )
            rec = evaluator.evaluate_context_recall(
                qa["ground_truth"], contexts
            )
            precision_scores.append(prec)
            recall_scores.append(rec)

        results[size] = {
            "num_chunks": len(chunks),
            "avg_precision": round(
                sum(precision_scores) / len(precision_scores), 3
            ),
            "avg_recall": round(
                sum(recall_scores) / len(recall_scores), 3
            ),
        }

    # 打印对比
    print(f"\n{'='*60}")
    print(f"Chunk Size Experiment Results")
    print(f"{'='*60}")
    print(f"{'Size':<10} {'Chunks':<10} {'Precision':<12} {'Recall':<12}")
    print(f"{'-'*44}")
    for size, r in results.items():
        print(f"{size:<10} {r['num_chunks']:<10} "
              f"{r['avg_precision']:<12.3f} {r['avg_recall']:<12.3f}")

    return results

"""
预期结果:

Size   Chunks   Precision   Recall
256    ~8000    0.7xx       0.5xx    ← 精度高但召回低
512    ~4000    0.6xx       0.6xx    ← 平衡 ★
1024   ~2000    0.5xx       0.7xx    ← 召回高但精度低

结论:512 通常是最佳平衡点
但这取决于你的文档特征——如果笔记中每个知识点比较独立,
256 可能更好;如果知识点需要更多上下文,1024 可能更好。
"""

添加 Reranker / Adding Reranker

"""
reranker.py
BGE-Reranker 二次排序

Day 20 学过:
  "Embedding 检索是粗排,Reranker 是精排"
  "Reranker 看的是 Query-Document 对,比 Embedding 更精确"
  "代价是速度——Reranker 需要对每个候选 Document 做一次推理"
"""

class Reranker:
    """BGE-Reranker 重排序器"""

    def __init__(self, model_name: str = "BAAI/bge-reranker-v2-m3"):
        from FlagEmbedding import FlagReranker
        print(f"Loading reranker: {model_name}...")
        self.model = FlagReranker(model_name, use_fp16=True)
        print("Reranker loaded!")

    def rerank(
        self,
        query: str,
        documents: list[dict],
        top_n: int = 5,
    ) -> list[dict]:
        """对检索结果重排序

        Args:
            query: 用户查询
            documents: 初始检索结果列表
            top_n: 返回前N个
        """
        if not documents:
            return []

        # 构建 query-document 对
        pairs = [
            [query, doc["content"]] for doc in documents
        ]

        # 计算相关性分数
        scores = self.model.compute_score(pairs, normalize=True)

        # 如果只有一个文档,scores 是单个值
        if isinstance(scores, (int, float)):
            scores = [scores]

        # 按分数排序
        scored_docs = list(zip(documents, scores))
        scored_docs.sort(key=lambda x: x[1], reverse=True)

        # 返回 top_n
        result = []
        for doc, score in scored_docs[:top_n]:
            doc_copy = doc.copy()
            doc_copy["rerank_score"] = round(score, 3)
            result.append(doc_copy)

        return result


class EnhancedRetriever:
    """增强检索器 = 向量检索 + Reranker"""

    def __init__(self, vector_store, embedding_service, reranker=None):
        self.store = vector_store
        self.emb = embedding_service
        self.reranker = reranker

    def search(
        self,
        query: str,
        initial_n: int = 20,   # 初始检索数量(多取一些给Reranker)
        final_n: int = 5,      # 最终返回数量
        use_rerank: bool = True,
    ) -> list[dict]:
        """检索 + 重排序"""
        # Step 1: 向量检索(粗排)
        query_emb = self.emb.embed(query)[0].tolist()
        raw_results = self.store.search(
            query_embedding=query_emb,
            n_results=initial_n if use_rerank else final_n,
        )

        # 格式化
        documents = []
        for i in range(len(raw_results["ids"][0])):
            documents.append({
                "content": raw_results["documents"][0][i],
                "metadata": raw_results["metadatas"][0][i],
                "similarity": round(1 - raw_results["distances"][0][i], 3),
            })

        # Step 2: Reranker(精排)
        if use_rerank and self.reranker:
            documents = self.reranker.rerank(
                query=query,
                documents=documents,
                top_n=final_n,
            )
        else:
            documents = documents[:final_n]

        return documents

"""
Reranker 效果预期:

Without Reranker (V1):
  检索20个 → 取top 5 → 其中可能只有2-3个真正相关

With Reranker (V2):
  检索20个 → Rerank → 取top 5 → 4-5个都相关

代价:
  额外延迟 ~0.5-1秒(对20个文档做 Reranker 推理)
  额外显存 ~0.5GB(加载 Reranker 模型)

ROI 分析:
  质量提升 >> 延迟增加
  在 RAG 中,检索质量是最关键的瓶颈
  值得投入!
"""

Hybrid Search 调权 / Hybrid Search Weight Tuning

"""
hybrid_search_tuning.py
调整 Dense vs Sparse 权重
"""

def tune_hybrid_weights(
    hybrid_retriever,
    qa_set: list,
    evaluator,
    weight_configs: list[tuple] = None,
) -> dict:
    """实验不同的权重组合"""
    if weight_configs is None:
        weight_configs = [
            (1.0, 0.0, "Pure Dense"),       # 纯向量
            (0.9, 0.1, "Dense Heavy"),       # 向量为主
            (0.7, 0.3, "Balanced (default)"),# 默认平衡
            (0.5, 0.5, "Equal"),             # 等权
            (0.3, 0.7, "Sparse Heavy"),      # 关键词为主
            (0.0, 1.0, "Pure Sparse"),       # 纯关键词
        ]

    results = {}

    for dense_w, sparse_w, name in weight_configs:
        print(f"\nTesting: {name} (dense={dense_w}, sparse={sparse_w})")

        precision_scores = []

        for qa in qa_set:
            search_results = hybrid_retriever.search(
                query=qa["question"],
                n_results=5,
                dense_weight=dense_w,
                sparse_weight=sparse_w,
            )

            contexts = [r["content"] for r in search_results]
            prec = evaluator.evaluate_context_precision(
                qa["question"], contexts, qa["ground_truth"]
            )
            precision_scores.append(prec)

        avg_prec = sum(precision_scores) / len(precision_scores)
        results[name] = {
            "dense_weight": dense_w,
            "sparse_weight": sparse_w,
            "avg_precision": round(avg_prec, 3),
        }
        print(f"  Precision: {avg_prec:.3f}")

    # 打印对比
    print(f"\n{'='*50}")
    print(f"{'Config':<25} {'Precision':<12}")
    print(f"{'-'*37}")
    for name, r in sorted(
        results.items(), key=lambda x: x[1]["avg_precision"], reverse=True
    ):
        marker = " ★" if r["avg_precision"] == max(
            rr["avg_precision"] for rr in results.values()
        ) else ""
        print(f"{name:<25} {r['avg_precision']:<12.3f}{marker}")

    return results

"""
预期发现:

对于我们的中英混合笔记:
- Pure Dense 对语义问题好,但遗漏精确术语
- Pure Sparse 对术语查询好,但语义理解差
- 0.7/0.3 或 0.8/0.2 通常是最佳平衡

关键洞察:
  不同类型的问题,最优权重不同
  → 未来可以做 Query-Dependent 的动态权重调整
"""

知识点3:生成优化 / Generation Optimization

Prompt 迭代 / Prompt Iteration

"""
prompt_iteration.py
Prompt 模板 V1 → V2 → V3 逐步优化
"""

# === V1: 基础版(Day 52 用的) ===
PROMPT_V1 = """基于以下上下文内容回答用户的问题。

## 相关上下文
{context}

## 用户问题
{question}

## 要求
- 基于上下文内容回答
- 引用来源

## 回答"""


# === V2: 结构化 + 防幻觉 ===
PROMPT_V2 = """你是一个基于个人学习笔记的AI助手。
请严格基于提供的上下文内容回答问题。

## 重要规则
1. 只使用上下文中的信息回答
2. 如果上下文中没有足够信息,请明确说:"根据现有笔记,我无法完整回答这个问题"
3. 不要添加上下文中没有的信息
4. 回答中引用来源文件

## 相关上下文
{context}

## 用户问题
{question}

## 回答格式
1. 直接回答问题
2. 列出关键要点
3. 标注信息来源 [来源: 文件名]
4. 如有不确定之处,明确标注

请回答:"""


# === V3: 思考链 + 自我检查 ===
PROMPT_V3 = """你是一个基于个人学习笔记的AI助手。

## 回答策略
1. 先分析问题需要什么信息
2. 从上下文中找到相关信息
3. 组织成结构化的回答
4. 自我检查:回答中的每个要点是否有上下文支持

## 规则
- 严格基于上下文回答,不编造信息
- 不确定时说"不确定"而非猜测
- 用 [来源: 文件名] 标注引用
- 如果多个上下文有补充信息,综合整理

## 相关上下文
{context}

## 用户问题
{question}

## 请按以下结构回答
**回答**:[核心回答]

**详细说明**:
[展开说明,引用来源]

**补充**:[如果有相关但问题没直接问到的有用信息]

**来源**:[列出引用的文件]

**确信度**:[高/中/低,基于上下文信息的充分程度]"""

def compare_prompts(
    rag_pipeline,
    evaluator,
    qa_set: list,
    prompts: dict = None,
) -> dict:
    """对比不同 Prompt 版本的效果"""
    if prompts is None:
        prompts = {
            "v1": PROMPT_V1,
            "v2": PROMPT_V2,
            "v3": PROMPT_V3,
        }

    results = {}

    for version, template in prompts.items():
        print(f"\n--- Testing Prompt {version} ---")

        scores_list = []
        for qa in qa_set:
            # 检索(相同)
            search_results = rag_pipeline.retrieve(qa["question"])
            contexts = [r["content"] for r in search_results]

            # 用不同 Prompt 生成
            # (这里需要临时替换 pipeline 的 prompt 模板)
            answer = generate_with_template(
                rag_pipeline, qa["question"], search_results, template
            )

            # 评估
            scores = evaluator.evaluate_full(
                question=qa["question"],
                answer=answer,
                contexts=contexts,
                ground_truth=qa["ground_truth"],
            )
            scores_list.append(scores)

        # 汇总
        avg_scores = {}
        for metric in ["faithfulness", "answer_relevancy",
                       "context_precision", "context_recall"]:
            vals = [s[metric] for s in scores_list]
            avg_scores[metric] = round(sum(vals) / len(vals), 3)

        avg_scores["overall"] = round(
            sum(avg_scores.values()) / len(avg_scores), 3
        )
        results[version] = avg_scores

        print(f"  Overall: {avg_scores['overall']:.3f}")

    return results

def generate_with_template(pipeline, question, search_results, template):
    """使用指定模板生成回答"""
    from prompt_templates import build_context
    context = build_context(search_results)

    prompt = template.format(context=context, question=question)

    response = pipeline.llm_client.chat.completions.create(
        model=pipeline.config["llm_model"],
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        max_tokens=1000,
    )
    return response.choices[0].message.content

减少幻觉 / Reducing Hallucination

幻觉的三种类型及对策:

Type 1: 编造事实 (Fabrication)
  症状: 回答中包含笔记里完全没有的信息
  原因: LLM 使用了自身预训练知识
  对策:
    - Prompt 中强调"只使用上下文信息" (V2/V3已加)
    - 添加 "如果不确定请说不知道"
    - 降低 temperature (0.3 → 0.1)

Type 2: 张冠李戴 (Misattribution)
  症状: 把 A 文件的内容说成来自 B 文件
  原因: 多个上下文混在一起
  对策:
    - 在上下文中明确标注来源
    - 要求 LLM 在回答时标注引用来源
    - 减少一次性提供的上下文数量

Type 3: 过度推断 (Over-inference)
  症状: 在上下文基础上做了过多的推断
  原因: LLM 天然倾向于"补全"信息
  对策:
    - Prompt 中要求"不要推测"
    - 添加确信度标注
    - 让 LLM 区分"笔记中明确说的"和"可能的推断"

Temperature 实验 / Temperature Experiment

Temperature 对 RAG 回答质量的影响:

Temperature | 忠实度 | 创造性 | 适用场景
0.0         | 最高   | 最低   | 精确事实查询
0.1         | 很高   | 低     | 技术问答 ★推荐
0.3         | 高     | 适中   | 综合分析
0.5         | 中等   | 适中   | 开放讨论
0.7         | 较低   | 高     | 创意写作(不适合RAG)
1.0         | 低     | 最高   | 不推荐用于RAG

RAG 场景推荐: Temperature = 0.1
  因为我们要的是准确引用笔记内容,不是创造新内容

Day 4 的 Prompt Engineering 笔记中写过:
  "Temperature 不是越高越好或越低越好,
   取决于你要精确性还是多样性"

知识点4:高级特性 / Advanced Features

Multi-Query 检索 / Multi-Query Retrieval

"""
multi_query.py
Multi-Query 检索 — 一个问题,多个角度搜索

原理:
  用户问"如何设计代币激励机制?"

  单查询: 只搜索这一句 → 可能遗漏相关内容

  Multi-Query:
  → "代币激励机制设计方法"
  → "Token incentive mechanism"
  → "veToken经济模型"
  → 合并所有结果,去重后排序

  覆盖面更广,召回率更高
"""

class MultiQueryRetriever:
    """Multi-Query 检索器"""

    def __init__(self, retriever, llm_client, model="qwen2.5:7b"):
        self.retriever = retriever
        self.client = llm_client
        self.model = model

    def generate_queries(self, question: str, n: int = 3) -> list[str]:
        """用 LLM 生成多个搜索查询"""
        prompt = f"""请为以下问题生成 {n} 个不同角度的搜索查询,
用于在知识库中检索相关内容。

原始问题: {question}

要求:
1. 每个查询从不同角度表述
2. 包含中文和英文查询
3. 考虑问题可能涉及的相关概念

请用JSON数组格式输出:
["查询1", "查询2", "查询3"]"""

        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{"role": "user", "content": prompt}],
            temperature=0.5,
            max_tokens=300,
        )

        import json, re
        try:
            text = response.choices[0].message.content
            queries = json.loads(
                re.search(r'\[.*\]', text, re.DOTALL).group()
            )
            return [question] + queries[:n]  # 原始问题 + 扩展
        except (json.JSONDecodeError, AttributeError):
            return [question]  # 失败时返回原始问题

    def search(
        self, question: str, n_queries: int = 3, final_n: int = 5
    ) -> list[dict]:
        """Multi-Query 检索"""
        # 1. 生成多个查询
        queries = self.generate_queries(question, n_queries)
        print(f"  Generated queries: {queries}")

        # 2. 每个查询分别检索
        all_results = {}
        for query in queries:
            results = self.retriever.search(query, n_results=5)
            for r in results:
                doc_id = r.get("id", r["content"][:50])
                if doc_id not in all_results:
                    all_results[doc_id] = r
                    all_results[doc_id]["query_hits"] = 1
                else:
                    # 命中次数+1,提高排名
                    all_results[doc_id]["query_hits"] += 1
                    # 取较高的相似度
                    existing_score = all_results[doc_id].get("similarity", 0)
                    new_score = r.get("similarity", 0)
                    all_results[doc_id]["similarity"] = max(
                        existing_score, new_score
                    )

        # 3. 按综合分数排序
        sorted_results = sorted(
            all_results.values(),
            key=lambda x: (
                x.get("query_hits", 1),  # 首先按命中次数
                x.get("similarity", 0),   # 然后按相似度
            ),
            reverse=True,
        )

        return sorted_results[:final_n]

对话式 RAG / Conversational RAG

"""
conversational_rag.py
多轮对话 RAG — 记住上下文进行追问

用户: "RAG系统的核心组件有哪些?"
AI:   "RAG系统包含...Embedding...向量数据库..."
用户: "第二个组件具体怎么选型?"  ← 这里的"第二个"需要上下文
AI:   "向量数据库的选型考虑..."   ← 理解了"第二个"指向量数据库
"""

class ConversationalRAG:
    """对话式 RAG"""

    def __init__(self, rag_pipeline):
        self.pipeline = rag_pipeline
        self.history = []  # 对话历史
        self.max_history = 5  # 保留最近5轮

    def _rewrite_query(self, question: str) -> str:
        """基于对话历史重写查询

        将含有指代的问题转化为独立的查询
        例如: "第二个呢?" → "向量数据库如何选型?"
        """
        if not self.history:
            return question

        # 构建历史摘要
        history_text = ""
        for turn in self.history[-3:]:  # 最近3轮
            history_text += f"用户: {turn['question']}\n"
            history_text += f"AI: {turn['answer'][:200]}...\n\n"

        prompt = f"""基于以下对话历史,将用户的最新问题改写为一个独立的、
完整的搜索查询(不依赖上下文也能理解的)。

对话历史:
{history_text}

用户最新问题: {question}

改写后的独立查询(只输出查询本身):"""

        response = self.pipeline.llm_client.chat.completions.create(
            model=self.pipeline.config["llm_model"],
            messages=[{"role": "user", "content": prompt}],
            temperature=0.1,
            max_tokens=100,
        )

        rewritten = response.choices[0].message.content.strip()
        print(f"  Query rewrite: '{question}' → '{rewritten}'")
        return rewritten

    def query(self, question: str) -> str:
        """对话式查询"""
        # 1. 重写查询
        rewritten = self._rewrite_query(question)

        # 2. 检索
        results = self.pipeline.retrieve(rewritten)

        # 3. 生成(包含对话历史)
        answer = self._generate_with_history(question, results)

        # 4. 更新历史
        self.history.append({
            "question": question,
            "rewritten": rewritten,
            "answer": answer,
        })

        # 5. 裁剪历史
        if len(self.history) > self.max_history:
            self.history = self.history[-self.max_history:]

        return answer

    def _generate_with_history(
        self, question: str, search_results: list
    ) -> str:
        """带对话历史的生成"""
        from prompt_templates import build_context
        context = build_context(search_results)

        messages = [
            {"role": "system", "content": "你是基于学习笔记的AI助手。基于上下文回答问题。"},
        ]

        # 添加对话历史
        for turn in self.history[-3:]:
            messages.append({"role": "user", "content": turn["question"]})
            messages.append({"role": "assistant", "content": turn["answer"][:500]})

        # 当前问题
        messages.append({
            "role": "user",
            "content": f"上下文信息:\n{context}\n\n问题: {question}",
        })

        response = self.pipeline.llm_client.chat.completions.create(
            model=self.pipeline.config["llm_model"],
            messages=messages,
            temperature=0.3,
            max_tokens=1000,
        )

        return response.choices[0].message.content

    def reset(self):
        """清空对话历史"""
        self.history = []
        print("Conversation reset.")

Metadata 过滤 / Metadata Filtering

"""
metadata_filter.py
按元数据过滤检索结果

场景:
  "只在AI学习笔记中搜索" → category = "ai_learning"
  "Day 30 之前学的内容" → day_number <= 30
  "和RAG相关的笔记" → tags contains "RAG"
"""

class FilteredRetriever:
    """支持元数据过滤的检索器"""

    def __init__(self, vector_store, embedding_service):
        self.store = vector_store
        self.emb = embedding_service

    def search(
        self,
        query: str,
        n_results: int = 5,
        category: str = None,
        stage: str = None,
        tags: list[str] = None,
        day_range: tuple = None,
    ) -> list[dict]:
        """带过滤条件的检索"""

        # 构建 ChromaDB where 条件
        where_conditions = []

        if category:
            where_conditions.append({"category": category})

        if stage:
            where_conditions.append({"stage": stage})

        if tags:
            # ChromaDB 的 metadata 值是字符串
            # tags 存储为 "AI,RAG,Security" 格式
            for tag in tags:
                where_conditions.append({
                    "tags": {"$contains": tag}
                })

        if day_range:
            min_day, max_day = day_range
            where_conditions.append({
                "$and": [
                    {"day_number": {"$gte": min_day}},
                    {"day_number": {"$lte": max_day}},
                ]
            })

        # 组合条件
        where = None
        if len(where_conditions) == 1:
            where = where_conditions[0]
        elif len(where_conditions) > 1:
            where = {"$and": where_conditions}

        # 执行检索
        query_emb = self.emb.embed(query)[0].tolist()
        results = self.store.search(
            query_embedding=query_emb,
            n_results=n_results,
            where=where,
        )

        # 格式化
        formatted = []
        for i in range(len(results["ids"][0])):
            formatted.append({
                "content": results["documents"][0][i],
                "metadata": results["metadatas"][0][i],
                "similarity": round(1 - results["distances"][0][i], 3),
            })

        return formatted

# 使用示例:
#
# 只在AI笔记中搜索
# retriever.search("RAG优化方法", category="ai_learning")
#
# 只在第一阶段笔记中搜索
# retriever.search("Transformer", day_range=(1, 15))
#
# 按标签过滤
# retriever.search("风控", tags=["Risk", "AI"])

知识点5:性能优化 / Performance Optimization

缓存层添加 / Adding Cache Layer

"""
cache.py
语义缓存 — 相似的问题直接返回缓存结果

Day 26 "成本工程" 学过:
  "缓存是降低成本的第一招"
  "RAG 中很多问题是重复或高度相似的"
"""

import hashlib
import json
import time
from pathlib import Path
import numpy as np

class SemanticCache:
    """语义缓存 — 相似问题命中缓存"""

    def __init__(
        self,
        embedding_service,
        cache_dir: str = "./rag_cache",
        similarity_threshold: float = 0.95,
        ttl_seconds: int = 86400,  # 24小时过期
    ):
        self.emb = embedding_service
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)
        self.threshold = similarity_threshold
        self.ttl = ttl_seconds

        # 内存中维护查询向量 + 缓存键的映射
        self.query_vectors = []
        self.cache_keys = []

        self._load_index()

    def _load_index(self):
        """加载缓存索引"""
        index_file = self.cache_dir / "index.json"
        if index_file.exists():
            data = json.loads(index_file.read_text())
            self.cache_keys = data.get("keys", [])
            vectors_file = self.cache_dir / "vectors.npy"
            if vectors_file.exists():
                self.query_vectors = np.load(str(vectors_file)).tolist()

    def _save_index(self):
        """保存缓存索引"""
        index_file = self.cache_dir / "index.json"
        index_file.write_text(json.dumps({"keys": self.cache_keys}))
        if self.query_vectors:
            np.save(
                str(self.cache_dir / "vectors.npy"),
                np.array(self.query_vectors),
            )

    def get(self, query: str) -> dict | None:
        """查询缓存"""
        if not self.query_vectors:
            return None

        query_vec = self.emb.embed(query)[0]

        # 计算与所有缓存查询的相似度
        similarities = np.dot(
            np.array(self.query_vectors), query_vec
        )

        best_idx = np.argmax(similarities)
        best_sim = similarities[best_idx]

        if best_sim >= self.threshold:
            cache_key = self.cache_keys[best_idx]
            cache_file = self.cache_dir / f"{cache_key}.json"

            if cache_file.exists():
                data = json.loads(cache_file.read_text(encoding="utf-8"))

                # 检查TTL
                if time.time() - data.get("timestamp", 0) < self.ttl:
                    print(f"  Cache HIT (similarity: {best_sim:.3f})")
                    return data
                else:
                    # 过期了
                    cache_file.unlink()

        return None

    def put(self, query: str, response: str, search_results: list):
        """写入缓存"""
        query_vec = self.emb.embed(query)[0]
        cache_key = hashlib.md5(query.encode()).hexdigest()[:12]

        # 保存缓存数据
        data = {
            "query": query,
            "response": response,
            "sources": [r.get("metadata", {}).get("file_path") for r in search_results],
            "timestamp": time.time(),
        }
        cache_file = self.cache_dir / f"{cache_key}.json"
        cache_file.write_text(
            json.dumps(data, ensure_ascii=False, indent=2),
            encoding="utf-8",
        )

        # 更新索引
        self.query_vectors.append(query_vec.tolist())
        self.cache_keys.append(cache_key)
        self._save_index()

    def stats(self) -> dict:
        """缓存统计"""
        cache_files = list(self.cache_dir.glob("*.json"))
        return {
            "total_entries": len(self.cache_keys),
            "cache_files": len(cache_files) - 1,  # 减去 index.json
        }

批量 Embedding 加速 / Batch Embedding

"""
batch_optimization.py
批量处理优化
"""

import asyncio
import aiohttp
from concurrent.futures import ThreadPoolExecutor

class BatchEmbeddingService:
    """批量 Embedding 优化"""

    def __init__(self, base_service, batch_size: int = 64):
        self.service = base_service
        self.batch_size = batch_size

    def embed_large_batch(self, texts: list[str]) -> list:
        """处理大量文本的 Embedding"""
        all_embeddings = []
        total = len(texts)

        for i in range(0, total, self.batch_size):
            batch = texts[i:i + self.batch_size]
            embeddings = self.service.embed(batch)
            all_embeddings.extend(embeddings)

            progress = min(i + self.batch_size, total)
            print(f"  Embedded: {progress}/{total} "
                  f"({progress/total*100:.0f}%)")

        return all_embeddings

延迟分析与优化 / Latency Analysis

RAG 请求延迟分解:

典型请求 (V1, 未优化):
┌──────────────────────────┬──────────┐
│ 步骤                     │ 耗时     │
├──────────────────────────┼──────────┤
│ 1. Query Embedding       │ ~0.1s    │
│ 2. Vector Search         │ ~0.05s   │
│ 3. (Reranker)            │ ~0.8s    │ ← V2 新增
│ 4. Context Building      │ ~0.01s   │
│ 5. LLM Generation        │ ~5-10s   │ ← 最大瓶颈
│ 6. Post-processing       │ ~0.01s   │
├──────────────────────────┼──────────┤
│ 总计 (V1)                │ ~6-11s   │
│ 总计 (V2 with Reranker)  │ ~7-12s   │
│ 总计 (V3 with Cache HIT) │ ~0.2s    │ ← 缓存命中!
└──────────────────────────┴──────────┘

优化策略优先级:
1. 缓存 (Cache) → 重复问题 0.2s       ★★★★★
2. 流式输出 (Streaming) → 感知延迟降低  ★★★★
3. 并行检索 (Parallel) → 多路检索并发   ★★★
4. 更快的LLM → 换 Phi-4 (快但质量降低) ★★
5. GPU优化 → 确保 Embedding 在 GPU 上   ★★

关键认识:
  LLM 生成占 80%+ 的延迟
  检索环节已经很快了(<1s)
  缓存是对重复查询最有效的优化
  流式输出不减少总延迟,但大幅改善用户体验

知识点6:优化结果对比 / Optimization Results

V1 vs V2 vs V3 对比表 / Version Comparison

优化版本对比(模板,需填入实测数据):

┌────────────────────┬─────────┬─────────┬─────────┐
│ 指标               │ V1      │ V2      │ V3      │
│                    │ (基线)  │ (检索优化)│ (全优化) │
├────────────────────┼─────────┼─────────┼─────────┤
│ Faithfulness       │ __.___  │ __.___  │ __.___  │
│ Answer Relevancy   │ __.___  │ __.___  │ __.___  │
│ Context Precision  │ __.___  │ __.___  │ __.___  │
│ Context Recall     │ __.___  │ __.___  │ __.___  │
│ ──────────────     │         │         │         │
│ Overall Average    │ __.___  │ __.___  │ __.___  │
│ ──────────────     │         │         │         │
│ Avg Latency (s)    │ __._    │ __._    │ __._    │
│ P95 Latency (s)    │ __._    │ __._    │ __._    │
│ Cache Hit Rate     │ 0%      │ 0%      │ __%     │
└────────────────────┴─────────┴─────────┴─────────┘

V2 vs V1 变化:
├── Chunk Size: 512 → 最优值
├── 添加 Reranker: BGE-Reranker-v2-m3
├── Hybrid Search: Dense 0.7 + Sparse 0.3
└── 初始检索: 5 → 20 (给 Reranker 更多候选)

V3 vs V2 变化:
├── Prompt: V1 → V3 (结构化+防幻觉)
├── Temperature: 0.7 → 0.1
├── Multi-Query: 3 路并行查询
├── 添加语义缓存
└── 流式输出

关键改进总结 / Key Improvements Summary

三轮迭代的核心发现:

1. 检索质量是 RAG 的命脉
   V1→V2 通过 Reranker 提升检索质量
   效果: Context Precision 提升最显著
   教训: 检索不准,生成再好也没用

2. Prompt 工程对忠实度影响大
   V2→V3 通过 Prompt 优化减少幻觉
   效果: Faithfulness 提升明显
   教训: "不要编造" 这句话真的有效

3. 缓存投入产出比最高
   V3 添加缓存后重复查询极快
   效果: 延迟从 ~10s 降到 ~0.2s (命中时)
   教训: 最好的优化是不做计算

4. Multi-Query 提升召回率
   V3 多角度搜索覆盖更广
   效果: Context Recall 提升
   教训: 一个问题可以有多种搜索方式

每轮改进的核心策略:
  V1 → V2: 改善"找得准"(检索优化)
  V2 → V3: 改善"答得好"(生成优化)
  未来:     改善"答得快"(性能优化)

剩余问题 / Remaining Issues

V3 仍然存在的问题(未来可以继续优化):

1. 跨文档综合能力有限
   问"对比传统风控和DeFi风控"
   需要综合多个不同主题的笔记
   当前的 Chunk 检索天然倾向于同一文档
   → 需要 Document-level 的检索策略

2. 长答案质量衰减
   回答超过 500 字后质量明显下降
   7B 模型的生成稳定性有限
   → 考虑用 Claude API 处理复杂问题

3. 表格和代码的检索
   纯文本 Embedding 对表格和代码块的理解有限
   → 需要专门的代码/表格处理策略

4. 实时性
   新增笔记需要重新索引
   → 增量更新已实现但未自动化
   → 考虑文件监控自动触发更新

5. 评估自动化
   当前依赖 LLM-as-Judge(本地 7B 模型)
   评估本身可能不够准确
   → 考虑用 Claude API 做评估
   → 增加人工评估作为校准

这些问题不影响基本使用,
但如果要做成"生产级"工具,每个都需要解决。

今日思考 / Today's Reflections

思考1:评估的价值远超你的想象 / Evaluation is Worth More Than You Think

没有评估的优化 = 凭感觉改代码

今天的经历证明了这一点:
- 我以为 Reranker 会提升 10%,实测提升了 ___%
- 我以为 Prompt 优化效果有限,实测 Faithfulness 提升 ___%
- 我以为 Temperature 影响不大,实测幻觉减少了 ___%

Day 21 学评估方法时觉得"道理我都懂"
今天实操后才真正理解:
  评估不是 QA 工程师的事
  评估是 PM 和架构师最重要的技能之一
  因为没有评估,你不知道自己在做对还是做错

思考2:80/20 法则在 RAG 优化中的体现 / 80/20 Rule in RAG

三轮优化中的投入产出比:

最高 ROI 的优化:
1. 添加 Reranker — 改1行配置,效果显著
2. Prompt V2 "不要编造" — 加1句话,幻觉减半
3. 语义缓存 — 一次投入,重复查询永久受益

最低 ROI 的优化(如果做了的话):
1. 精调 Hybrid Search 权重 — 差异很小
2. Chunk 重叠率调优 — 影响可忽略
3. 更换 Embedding 模型 — BGE-M3 已经够好

PM 的决策力体现在:
  知道哪些优化值得做(高ROI)
  知道哪些优化可以跳过(低ROI)
  知道什么时候停止优化(够好就行)

思考3:从零到一容易,从一到好难 / 0→1 is Easy, 1→Good is Hard

Day 52: 6小时从零构建了 V1
Day 53: 6小时把 V1 优化到 V3

V1 能工作了 → 成就感 +100
V3 比 V1 好了 20-30% → 成就感 +50

但是:
  V1 = 50% 的代码量
  V2+V3 的评估+优化 = 50% 的代码量

  V1 解决了"有没有"的问题
  V2+V3 解决了"好不好"的问题

  真正的产品价值在"好不好"

这也是为什么面试经常问:
  "你的 RAG 系统准确率是多少?"
  "你是怎么评估和优化的?"
  "能量化你的改进效果吗?"

如果回答"我没有评估"→ 不专业
如果回答"V1 到 V3 整体提升 25%"→ 数据驱动的工程师

学习资源 / Resources

RAG 评估

检索优化

RAG 最佳实践


明日预告 / Tomorrow's Preview

Day 54: LoRA微调实战 — 训练你的专属模型

从 RAG 转向微调:
  RAG = 给模型"外挂"知识库(不改变模型本身)
  微调 = 直接改变模型的"大脑"

明天将实际操作:
1. 准备微调数据集(从笔记中提取 QA 对)
2. 配置 LoRA 参数(Day 7 学的理论)
3. 在 Qwen2.5-7B 上做 LoRA 微调
4. 对比微调前后的效果
5. 思考: RAG vs 微调,什么时候用哪个?

需要提前准备:
  pip install peft transformers datasets accelerate
  确保 GPU 显存 >= 8GB
  准备好笔记中的 QA 数据

Day 7 的理论 → Day 54 的实践
又一次 "理论→动手" 的验证!

Day 53 完成! 从 V1 到 V3,每一步优化都有数据支撑。 这不仅是技术实践,更是"数据驱动决策"思维的锻炼。 明天开始微调,探索另一种让 AI "更懂你" 的方式!