返回 Expert 笔记
Expert Day 158

Memory 系统——Short-term / Long-term / Episodic / Semantic 四层 + Mem0

认知科学的 memory 分层(short-term, episodic, semantic, procedural)映射到 agent 系统;Mem0/LangMem/Letta 三个工业 lib 思路;vector store 长记忆

2026-10-06
Phase 3 - Agent架构与多Agent (Day 149-162)
MemoryVectorStoreEpisodicSemanticMem0LangMem

日期: 2026-10-06 方向: AI系统工程 / Agent 阶段: Phase 3 - Agent架构与多Agent (Day 149-162) 标签: #Memory #VectorStore #Episodic #Semantic #Mem0 #LangMem


今日目标

类型内容
学习认知科学的 memory 分层(short-term, episodic, semantic, procedural)映射到 agent 系统;Mem0/LangMem/Letta 三个工业 lib 思路;vector store 长记忆
实操实现 4 层 memory:working / episodic / semantic / procedural;用 Chroma + Anthropic 跑一个长会话连续性测试
产出memory.py(约 600 行)+ memory 调用示例

一、为什么 agent 需要"分层" memory

1.1 单纯的 message history 不够

最简单的 memory = 把全部对话历史塞 context。问题:

  • token 爆炸:100 轮后 200k token 全占满
  • 关键信息淹没:早期重要事实被淹在闲聊里
  • 跨会话遗忘:新开 session 全忘
  • 隐私污染:一次提到的敏感信息一直留在 context

1.2 认知科学分层

学界含义Agent 对应
Working memory几秒到几分钟的当下注意当前 messages(context window)
Short-term几分钟到几小时当前会话 summary
Episodic自传体记忆("上周我做了 X")历史会话 trace(带时间戳)
Semantic概念知识("巴黎是法国首都")用户偏好 / 实体属性 KB
Procedural技能(怎么骑车)学到的工作流 / SOPs

1.3 工业实现思路

思路
Mem0LLM 提取"facts" → vector store + graph,每轮自动更新
LangMem (LangChain)类似 Mem0,与 LangGraph 集成
Letta (前 MemGPT)OS 风格的 memory 分层 + 自管理函数(recall/archival memory)
ZepKnowledge graph + temporal reasoning(事实有效期)
OpenAI memory黑盒,不可控
自建纯 vector store + summary chain

二、架构图——4 层 memory

┌──────────────────────────────────────────────────────────────────┐
│                      MemoryManager                                │
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐    │
│   │ 1. Working Memory  (in-context messages, last K turns)  │    │
│   │    - bounded by token budget                            │    │
│   │    - oldest summarized into Short-term                  │    │
│   └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐    │
│   │ 2. Episodic Memory  (timestamped events, vector store)   │   │
│   │    "On 2026-09-12 user asked about AAPL margins"         │   │
│   │    - retrieve by query similarity + recency              │   │
│   └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐    │
│   │ 3. Semantic Memory  (facts, key-value + vector)           │   │
│   │    - user.risk_tolerance = "moderate"                     │   │
│   │    - portfolio.current_holdings = [...]                   │   │
│   │    - 通过 LLM extraction + dedup 维护                     │   │
│   └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│   ┌─────────────────────────────────────────────────────────┐    │
│   │ 4. Procedural Memory  (learned workflows)                 │   │
│   │    "When user asks for credit memo: do A, B, C"           │   │
│   │    - templates, prompts, tool call patterns               │   │
│   └─────────────────────────────────────────────────────────┘    │
│                                                                  │
│   API:                                                            │
│     - on_user_message(text) -> updates 1, schedules 2/3 update   │
│     - on_assistant_message(text)                                 │
│     - retrieve_for_query(text) -> dict of facts/episodes/skills  │
│     - flush_to_long_term()                                       │
└──────────────────────────────────────────────────────────────────┘

三、代码——memory.py

# memory.py
"""
Day 158 - 4-layer memory for agents.

Storage:
  - Working:   in-process list, bounded by tokens
  - Episodic:  Chroma collection "episodes"
  - Semantic:  Chroma collection "facts" + key-value dict
  - Procedural: dict of skill name -> recipe

Background tasks:
  - summarize_oldest_when_over_budget()
  - extract_facts_from_recent_turns()
"""
from __future__ import annotations
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any

import chromadb
from anthropic import Anthropic

# Optional: tiktoken approximation
def approx_tokens(text: str) -> int:
    return max(1, len(text) // 4)

# ====================================================================
# Working memory
# ====================================================================
@dataclass
class WorkingMemory:
    user_id: str
    messages: list[dict] = field(default_factory=list)
    summary_so_far: str = ""
    token_budget: int = 8000

    def append(self, role: str, content: str):
        self.messages.append({"role": role, "content": content,
                              "ts": time.time(), "id": str(uuid.uuid4())})

    def total_tokens(self) -> int:
        return approx_tokens(self.summary_so_far) + sum(
            approx_tokens(m["content"]) for m in self.messages
        )

    def render_for_llm(self) -> list[dict]:
        prefix = []
        if self.summary_so_far:
            prefix = [{
                "role": "user",
                "content": f"<conversation_summary>{self.summary_so_far}</conversation_summary>",
            }]
        return prefix + [{"role": m["role"], "content": m["content"]} for m in self.messages]

    def needs_summarization(self) -> bool:
        return self.total_tokens() > self.token_budget

# ====================================================================
# Episodic memory (vector store, by user_id)
# ====================================================================
class EpisodicMemory:
    def __init__(self, client: chromadb.PersistentClient):
        self.col = client.get_or_create_collection(
            "episodes",
            metadata={"hnsw:space": "cosine"},
        )

    def add(self, user_id: str, text: str, ts: float | None = None,
            extra_meta: dict | None = None):
        eid = f"ep_{uuid.uuid4().hex[:12]}"
        meta = {"user_id": user_id, "ts": ts or time.time()}
        if extra_meta:
            meta.update(extra_meta)
        self.col.add(ids=[eid], documents=[text], metadatas=[meta])

    def retrieve(self, user_id: str, query: str, k: int = 5) -> list[dict]:
        res = self.col.query(query_texts=[query], n_results=k,
                             where={"user_id": user_id})
        out = []
        for doc, meta, dist in zip(res["documents"][0], res["metadatas"][0],
                                   res["distances"][0]):
            out.append({"text": doc, "meta": meta, "score": 1 - dist})
        return out

# ====================================================================
# Semantic memory (facts: key-value + vector)
# ====================================================================
class SemanticMemory:
    def __init__(self, client: chromadb.PersistentClient):
        self.col = client.get_or_create_collection(
            "facts",
            metadata={"hnsw:space": "cosine"},
        )
        self.kv: dict[str, dict[str, Any]] = {}  # user_id -> {fact_name: value}

    def upsert_fact(self, user_id: str, key: str, value: Any, source: str = ""):
        self.kv.setdefault(user_id, {})[key] = value
        text = f"{key}: {json.dumps(value)}"
        fid = f"f_{user_id}_{key}"
        self.col.upsert(ids=[fid], documents=[text],
                        metadatas=[{"user_id": user_id, "key": key, "source": source}])

    def get(self, user_id: str, key: str, default=None):
        return self.kv.get(user_id, {}).get(key, default)

    def search(self, user_id: str, query: str, k: int = 5) -> list[dict]:
        res = self.col.query(query_texts=[query], n_results=k,
                             where={"user_id": user_id})
        return [{"text": d, "meta": m, "score": 1 - s}
                for d, m, s in zip(res["documents"][0], res["metadatas"][0], res["distances"][0])]

# ====================================================================
# Procedural memory (learned workflows / skills)
# ====================================================================
@dataclass
class Skill:
    name: str
    when_to_use: str
    steps: list[str]
    last_used: float = 0.0
    success_count: int = 0

class ProceduralMemory:
    def __init__(self):
        self.skills: dict[str, Skill] = {}

    def add(self, name: str, when_to_use: str, steps: list[str]):
        self.skills[name] = Skill(name=name, when_to_use=when_to_use, steps=steps)

    def find(self, query: str) -> list[Skill]:
        # Simple keyword match; production would use embeddings
        q = query.lower()
        return [s for s in self.skills.values()
                if any(kw in s.when_to_use.lower() for kw in q.split())]

    def mark_used(self, name: str, success: bool):
        if name in self.skills:
            self.skills[name].last_used = time.time()
            if success:
                self.skills[name].success_count += 1

# ====================================================================
# Memory Manager
# ====================================================================
SUMMARIZE_PROMPT = """\
You are summarizing a conversation history into a compact memo. Keep:
- The user's current intent
- Key facts established
- Decisions made
- Open questions
Drop chitchat. Maximum 200 words.
"""

EXTRACT_PROMPT = """\
Extract DURABLE facts from the recent conversation that should be remembered
for future sessions. Examples: user preferences, decisions, account info,
relationships, dates. Output strictly JSON list of:
  {{"key": "...", "value": "...", "confidence": 0.0-1.0}}

Conversation:
{convo}

JSON:
"""

class MemoryManager:
    def __init__(self, persist_dir: str = "./mem_db"):
        self.client = Anthropic()
        self.chroma = chromadb.PersistentClient(path=persist_dir)
        self.episodic = EpisodicMemory(self.chroma)
        self.semantic = SemanticMemory(self.chroma)
        self.procedural = ProceduralMemory()
        self.working: dict[str, WorkingMemory] = {}

    def get_working(self, user_id: str) -> WorkingMemory:
        return self.working.setdefault(user_id, WorkingMemory(user_id=user_id))

    def on_message(self, user_id: str, role: str, content: str):
        wm = self.get_working(user_id)
        wm.append(role, content)
        if wm.needs_summarization():
            self._summarize_oldest(wm)

    def _summarize_oldest(self, wm: WorkingMemory):
        # Take oldest half for summarization, keep newest half + summary
        if len(wm.messages) < 6:
            return
        half = len(wm.messages) // 2
        old, keep = wm.messages[:half], wm.messages[half:]
        convo = "\n".join(f"{m['role']}: {m['content']}" for m in old)
        resp = self.client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=600,
            system=SUMMARIZE_PROMPT,
            messages=[{"role": "user", "content": convo}],
        )
        new_summary = resp.content[0].text
        wm.summary_so_far = (
            (wm.summary_so_far + "\n" + new_summary)
            if wm.summary_so_far else new_summary
        )
        wm.messages = keep
        # Push old turns to episodic
        for m in old:
            self.episodic.add(wm.user_id,
                              text=f"{m['role']}: {m['content']}",
                              ts=m["ts"])

    def flush_facts(self, user_id: str):
        """Periodically (e.g. on session end) extract semantic facts."""
        wm = self.get_working(user_id)
        if not wm.messages:
            return
        convo = "\n".join(f"{m['role']}: {m['content']}" for m in wm.messages[-20:])
        resp = self.client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=800,
            messages=[{"role": "user", "content": EXTRACT_PROMPT.format(convo=convo)}],
        )
        try:
            facts = json.loads(resp.content[0].text.strip().removeprefix("```json").removeprefix("```").removesuffix("```"))
        except Exception:
            return
        for f in facts:
            if f.get("confidence", 0) >= 0.6:
                self.semantic.upsert_fact(user_id, f["key"], f["value"],
                                           source="auto_extract")

    def retrieve_for_query(self, user_id: str, query: str) -> dict:
        return {
            "episodes": self.episodic.retrieve(user_id, query, k=3),
            "facts": self.semantic.search(user_id, query, k=5),
            "skills": [s.name for s in self.procedural.find(query)[:3]],
            "working_summary": self.get_working(user_id).summary_so_far,
        }

    def build_context_prefix(self, user_id: str, query: str) -> str:
        bundle = self.retrieve_for_query(user_id, query)
        parts = []
        if bundle["facts"]:
            parts.append("<known_facts>")
            for f in bundle["facts"]:
                parts.append(f"- {f['text']}")
            parts.append("</known_facts>")
        if bundle["episodes"]:
            parts.append("<relevant_history>")
            for ep in bundle["episodes"]:
                ts = time.strftime("%Y-%m-%d", time.localtime(ep["meta"]["ts"]))
                parts.append(f"[{ts}] {ep['text'][:200]}")
            parts.append("</relevant_history>")
        if bundle["working_summary"]:
            parts.append(f"<conversation_so_far>\n{bundle['working_summary']}\n</conversation_so_far>")
        return "\n".join(parts)

# ====================================================================
# Smoke test
# ====================================================================
if __name__ == "__main__":
    mm = MemoryManager(persist_dir="./mem_test")
    uid = "alice"

    # Simulate a few sessions
    mm.on_message(uid, "user", "I'm a moderate-risk investor in my 40s.")
    mm.on_message(uid, "assistant", "Got it. I'll calibrate suggestions.")
    mm.on_message(uid, "user", "I just bought 100 shares of AAPL at 215.")
    mm.on_message(uid, "assistant", "Recorded.")
    mm.flush_facts(uid)

    # New session, retrieve
    bundle = mm.retrieve_for_query(uid, "what's my risk tolerance and recent trade?")
    print(json.dumps(bundle, indent=2, default=str))

四、金融领域应用

4.1 客户经理 copilot 的 memory 设计

内容
Working当前对话窗口(10 轮)
Episodic过去 6 个月内每次咨询的摘要(每条 200 词)
Semantic客户画像:风险偏好、流动性需求、家庭情况、税务居住地、合规标记
Procedural"当客户问 X 类问题,按 SOP-A 处理"

关键收益:客户每次开口不用再讲一遍背景;KYC 字段半结构化半向量化检索。

4.2 合规审查 agent 的 memory

  • Episodic:过去类似审查案例(哪些被否、为什么)
  • Semantic:监管规则库(条款 → 解释 → 适用场景)
  • Procedural:审查流程模板

RAG 仅返回相关,避免上下文塞 1000 条规则。


五、Web3 集成

链上身份与跨 dApp memory

传统 SaaS:memory keyed by user_id。Web3:keyed by wallet address。这带来好处:

  • 用户切换 client/dApp,memory 仍连续
  • "Sign-in with Ethereum" 验证后,memory bundle 可携带

隐私挑战:钱包地址公开,谁都可以查 memory?解:① memory 内容加密(用户钱包加密);② 链上只存指针/hash,链下存内容;③ ZK proof of "I am this wallet"。

Mem0 / Letta 的 onchain 实验

社区有一些项目(比如 OLAS、Virtuals)尝试把 agent memory 上链。当前现实:

  • Hot memory(高频 read/write)必须链下
  • Settlement memory(关键事实哈希、时间戳)可以上链,做"agent 行为的可审计 trail"
  • Cross-agent memory market:Ocean Protocol 类似数据市场,agent 互相买卖记忆

六、生产经验与陷阱

  1. Memory 污染 一次错误事实进了 semantic memory,之后每次 retrieve 都误导 LLM。需要:

    • confidence threshold(< 0.6 不入库)
    • 周期性 audit(用 evaluator LLM 检查 memory 一致性)
    • user-facing "forget X" 命令
  2. Retrieve 召回不准 embedding 模型不行 / chunk 太大 / 用户问题措辞陌生。措施:

    • hybrid search(dense + BM25)
    • rerank 模型
    • HyDE(先 LLM 生成"理想答案"再 embed)
  3. Episodic 太多 → noise 存了 1000 条历史,retrieve 出的"相关"未必有用。recency 加权 + 强相关性阈值。

  4. Semantic 自动 extraction 出错 LLM 抽出"用户家有狗"作为 fact 放进 KB,1 年后忘记狗死了。fact 有 expiration / staleness 标记 + 周期 refresh。

  5. 跨 user 泄露 忘记 where: user_id 过滤,A 的事实出现在 B 的 retrieval。所有 query 必须强制 user_id 过滤,做单元测试。

  6. GDPR / 删除权 用户要求 forget。需要级联删除 episodic + semantic + cache。设计时就要考虑 audit log 和删除接口。

  7. Cost 失控 每轮都跑 fact extraction 烧 token。改成按事件触发(session 结束 / 显著状态变化)或异步队列。


七、Cost & Latency

一次完整交互的 memory cost(估算)

频率单次 cost
Embedding(store + query)每轮 1-2 次~$0.0001/轮
Working memory summarize每 N 轮 1 次$0.005/次(haiku)
Fact extractionsession 结束$0.01/次(haiku)
Vector search每 query< 10ms(本地 Chroma)

对比 LLM 主调用 ($0.05-0.10),memory 开销 < 10%。但若每轮都做全套抽取/总结,就 25%+。

延迟

操作延迟
Vector search (Chroma local, 100k docs)10-30ms
Vector search (Pinecone hosted)50-200ms
Summarize (haiku 600 tokens)1-2s
Fact extract (haiku 800 tokens)1-2s

实时路径只做 retrieve(快),写入和 summarize 走异步 worker。


八、关键速查

Memory 层速查

何时写何时读存储
Working每条消息每次 LLM callin-mem list
Short-term summary容量超阈值时每次 LLM callsummary string
Episodic写入旧消息 / 重要事件与 query 相关时vector DB
Semantic抽取出 fact 时与 query 相关时vector + kv
Procedural学到/手工录入 skill任务匹配时dict / vector

工业 lib 选型

需求
与 LangGraph 集成LangMem
极简、自动 fact 抽取Mem0
完整 OS 风格Letta
时间敏感 / 事实有效期Zep
自定义 / 不锁定Chroma + own pipeline

九、面试题

Q1: 为什么不能把全部历史塞 LLM context?

A: ① 1M context 看似够大但 cost ∝ token,每轮重传 100k 烧钱;② attention dilution("中间被遗忘")现象——长 context 中段信息回忆质量下降;③ 隐私污染——一次提到的敏感信息持续存在;④ 跨 session 不能共享。分层 memory 把"啥都记 vs 啥都忘"的两极变成可控分级。

Q2: Episodic 与 Semantic 区别是什么?设计上怎么分?

A: Episodic 是"事件流"——带时间戳的 raw observations("用户在 9-10 问了 AAPL")。Semantic 是"事实"——脱时间的概念("用户的风险偏好 = moderate")。设计上:① episodic 直接存对话 chunk,按时间存;② semantic 由 LLM 从 episodic 抽取 + dedup 维护;③ retrieval 时 semantic 优先(密度高),episodic 补充 context。

Q3: 一个用户告诉 agent "我有一只叫 Buddy 的狗"。1 年后,agent 还应该记得吗?

A: 看场景。① 严格事实型 product(医疗、法律):保留 + 加 staleness 字段,retrieve 时显示"上次更新 1 年前",触发确认;② 闲聊型:可保留作为 personality cue;③ 监管/隐私:是否过 retention 期?是否用户撤销同意?需要 expiration policy。永远不要"无脑保留所有"。

Q4: 如何防 memory 中毒(错误事实持续误导)?

A: ① confidence threshold;② evaluator agent 周期性 audit;③ 显式 user override("forget X");④ 来源标记(source=auto_extract vs source=user_explicit);⑤ 多次冲突时优先最近 + 高 confidence;⑥ 关键 fact 由用户确认("我记下了 X,对吗?")。

Q5: Mem0 / LangMem / Letta 哪个适合 enterprise 部署?

A: 没有一个完美。考虑:① 数据驻留——必须支持 self-host / on-prem,Mem0 / Letta 支持;② 审计 / GDPR——需要 audit log 和删除接口,Letta 完整度高;③ 延迟——在线交易场景要 < 100ms,自建 + Chroma 最快;④ 维护——团队规模决定能不能 own 复杂栈。生产里常见:自建简化版(vector + 简单 fact extract)。


十、与 RAG 的关系

RAG(Retrieval-Augmented Generation)和 Agent Memory 是同一系列的概念吗?

不完全。区别:

维度RAGAgent Memory
目标在 prompt 中注入相关知识跨会话保持上下文
范围通常静态文档库动态、用户特定
更新文档更新触发每次交互可写
检索按 query 单次按 query + 时间 + 用户
保留永久有 expiration
隐私通常公共高度个人

实际系统:RAG + Memory 同时存在。RAG 提供"知识"(产品手册、监管文件),Memory 提供"用户上下文"(user.preference, recent.activity)。检索时两者并联。

后续 Day 163-176 会深入 RAG(hybrid / rerank / GraphRAG)。


十一、Memory 与 Long-Context 模型的取舍

随着 1M context 模型普及,是否还需要 memory 系统?

方案
All-in-context(每轮塞历史)简单、无依赖贵、慢、attention 衰减
Memory-based便宜、可扩展、隐私可控增加复杂度、检索可能漏
Hybrid(短期 in-context + 长期 memory)折中设计复杂

大多数生产系统采用 hybrid:

  • 当前会话 last K turns 留 in-context
  • 旧会话 + 跨会话事实走 memory
  • 1M context 用于"single document large input",不是"无限聊天"

十二、扩展练习

  1. 替换 embedding 为 Cohere embed-v4——比对 retrieve 召回率
  2. 实现 fact 矛盾检测——新 fact 与旧 fact 冲突时让 LLM 决断
  3. 加 namespace——一个 user 下多个 projects(隔离不同业务上下文的 memory)
  4. 加 forget API——用户主动让 agent 删除某 fact
  5. 实现 procedural memory 学习——agent 完成任务后自动总结成 skill 模板
  6. 给 episodic 加 importance score——只保留高重要度的记忆,老低重要度记忆被 prune

明日预告

Day 159: 多 Agent 协作——Hierarchical / Network / Sequential 三种拓扑

  • 主管-下属、对等网络、流水线三种结构
  • 各自的通信开销和一致性问题
  • 实现 3 种 multi_agent.py