Memory 系统——Short-term / Long-term / Episodic / Semantic 四层 + Mem0
认知科学的 memory 分层(short-term, episodic, semantic, procedural)映射到 agent 系统;Mem0/LangMem/Letta 三个工业 lib 思路;vector store 长记忆
日期: 2026-10-06 方向: AI系统工程 / Agent 阶段: Phase 3 - Agent架构与多Agent (Day 149-162) 标签: #Memory #VectorStore #Episodic #Semantic #Mem0 #LangMem
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | 认知科学的 memory 分层(short-term, episodic, semantic, procedural)映射到 agent 系统;Mem0/LangMem/Letta 三个工业 lib 思路;vector store 长记忆 |
| 实操 | 实现 4 层 memory:working / episodic / semantic / procedural;用 Chroma + Anthropic 跑一个长会话连续性测试 |
| 产出 | memory.py(约 600 行)+ memory 调用示例 |
一、为什么 agent 需要"分层" memory
1.1 单纯的 message history 不够
最简单的 memory = 把全部对话历史塞 context。问题:
- token 爆炸:100 轮后 200k token 全占满
- 关键信息淹没:早期重要事实被淹在闲聊里
- 跨会话遗忘:新开 session 全忘
- 隐私污染:一次提到的敏感信息一直留在 context
1.2 认知科学分层
| 层 | 学界含义 | Agent 对应 |
|---|---|---|
| Working memory | 几秒到几分钟的当下注意 | 当前 messages(context window) |
| Short-term | 几分钟到几小时 | 当前会话 summary |
| Episodic | 自传体记忆("上周我做了 X") | 历史会话 trace(带时间戳) |
| Semantic | 概念知识("巴黎是法国首都") | 用户偏好 / 实体属性 KB |
| Procedural | 技能(怎么骑车) | 学到的工作流 / SOPs |
1.3 工业实现思路
| 库 | 思路 |
|---|---|
| Mem0 | LLM 提取"facts" → vector store + graph,每轮自动更新 |
| LangMem (LangChain) | 类似 Mem0,与 LangGraph 集成 |
| Letta (前 MemGPT) | OS 风格的 memory 分层 + 自管理函数(recall/archival memory) |
| Zep | Knowledge graph + temporal reasoning(事实有效期) |
| OpenAI memory | 黑盒,不可控 |
| 自建 | 纯 vector store + summary chain |
二、架构图——4 层 memory
┌──────────────────────────────────────────────────────────────────┐
│ MemoryManager │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 1. Working Memory (in-context messages, last K turns) │ │
│ │ - bounded by token budget │ │
│ │ - oldest summarized into Short-term │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 2. Episodic Memory (timestamped events, vector store) │ │
│ │ "On 2026-09-12 user asked about AAPL margins" │ │
│ │ - retrieve by query similarity + recency │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 3. Semantic Memory (facts, key-value + vector) │ │
│ │ - user.risk_tolerance = "moderate" │ │
│ │ - portfolio.current_holdings = [...] │ │
│ │ - 通过 LLM extraction + dedup 维护 │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ ┌─────────────────────────────────────────────────────────┐ │
│ │ 4. Procedural Memory (learned workflows) │ │
│ │ "When user asks for credit memo: do A, B, C" │ │
│ │ - templates, prompts, tool call patterns │ │
│ └─────────────────────────────────────────────────────────┘ │
│ │
│ API: │
│ - on_user_message(text) -> updates 1, schedules 2/3 update │
│ - on_assistant_message(text) │
│ - retrieve_for_query(text) -> dict of facts/episodes/skills │
│ - flush_to_long_term() │
└──────────────────────────────────────────────────────────────────┘
三、代码——memory.py
# memory.py
"""
Day 158 - 4-layer memory for agents.
Storage:
- Working: in-process list, bounded by tokens
- Episodic: Chroma collection "episodes"
- Semantic: Chroma collection "facts" + key-value dict
- Procedural: dict of skill name -> recipe
Background tasks:
- summarize_oldest_when_over_budget()
- extract_facts_from_recent_turns()
"""
from __future__ import annotations
import json
import time
import uuid
from dataclasses import dataclass, field
from typing import Any
import chromadb
from anthropic import Anthropic
# Optional: tiktoken approximation
def approx_tokens(text: str) -> int:
return max(1, len(text) // 4)
# ====================================================================
# Working memory
# ====================================================================
@dataclass
class WorkingMemory:
user_id: str
messages: list[dict] = field(default_factory=list)
summary_so_far: str = ""
token_budget: int = 8000
def append(self, role: str, content: str):
self.messages.append({"role": role, "content": content,
"ts": time.time(), "id": str(uuid.uuid4())})
def total_tokens(self) -> int:
return approx_tokens(self.summary_so_far) + sum(
approx_tokens(m["content"]) for m in self.messages
)
def render_for_llm(self) -> list[dict]:
prefix = []
if self.summary_so_far:
prefix = [{
"role": "user",
"content": f"<conversation_summary>{self.summary_so_far}</conversation_summary>",
}]
return prefix + [{"role": m["role"], "content": m["content"]} for m in self.messages]
def needs_summarization(self) -> bool:
return self.total_tokens() > self.token_budget
# ====================================================================
# Episodic memory (vector store, by user_id)
# ====================================================================
class EpisodicMemory:
def __init__(self, client: chromadb.PersistentClient):
self.col = client.get_or_create_collection(
"episodes",
metadata={"hnsw:space": "cosine"},
)
def add(self, user_id: str, text: str, ts: float | None = None,
extra_meta: dict | None = None):
eid = f"ep_{uuid.uuid4().hex[:12]}"
meta = {"user_id": user_id, "ts": ts or time.time()}
if extra_meta:
meta.update(extra_meta)
self.col.add(ids=[eid], documents=[text], metadatas=[meta])
def retrieve(self, user_id: str, query: str, k: int = 5) -> list[dict]:
res = self.col.query(query_texts=[query], n_results=k,
where={"user_id": user_id})
out = []
for doc, meta, dist in zip(res["documents"][0], res["metadatas"][0],
res["distances"][0]):
out.append({"text": doc, "meta": meta, "score": 1 - dist})
return out
# ====================================================================
# Semantic memory (facts: key-value + vector)
# ====================================================================
class SemanticMemory:
def __init__(self, client: chromadb.PersistentClient):
self.col = client.get_or_create_collection(
"facts",
metadata={"hnsw:space": "cosine"},
)
self.kv: dict[str, dict[str, Any]] = {} # user_id -> {fact_name: value}
def upsert_fact(self, user_id: str, key: str, value: Any, source: str = ""):
self.kv.setdefault(user_id, {})[key] = value
text = f"{key}: {json.dumps(value)}"
fid = f"f_{user_id}_{key}"
self.col.upsert(ids=[fid], documents=[text],
metadatas=[{"user_id": user_id, "key": key, "source": source}])
def get(self, user_id: str, key: str, default=None):
return self.kv.get(user_id, {}).get(key, default)
def search(self, user_id: str, query: str, k: int = 5) -> list[dict]:
res = self.col.query(query_texts=[query], n_results=k,
where={"user_id": user_id})
return [{"text": d, "meta": m, "score": 1 - s}
for d, m, s in zip(res["documents"][0], res["metadatas"][0], res["distances"][0])]
# ====================================================================
# Procedural memory (learned workflows / skills)
# ====================================================================
@dataclass
class Skill:
name: str
when_to_use: str
steps: list[str]
last_used: float = 0.0
success_count: int = 0
class ProceduralMemory:
def __init__(self):
self.skills: dict[str, Skill] = {}
def add(self, name: str, when_to_use: str, steps: list[str]):
self.skills[name] = Skill(name=name, when_to_use=when_to_use, steps=steps)
def find(self, query: str) -> list[Skill]:
# Simple keyword match; production would use embeddings
q = query.lower()
return [s for s in self.skills.values()
if any(kw in s.when_to_use.lower() for kw in q.split())]
def mark_used(self, name: str, success: bool):
if name in self.skills:
self.skills[name].last_used = time.time()
if success:
self.skills[name].success_count += 1
# ====================================================================
# Memory Manager
# ====================================================================
SUMMARIZE_PROMPT = """\
You are summarizing a conversation history into a compact memo. Keep:
- The user's current intent
- Key facts established
- Decisions made
- Open questions
Drop chitchat. Maximum 200 words.
"""
EXTRACT_PROMPT = """\
Extract DURABLE facts from the recent conversation that should be remembered
for future sessions. Examples: user preferences, decisions, account info,
relationships, dates. Output strictly JSON list of:
{{"key": "...", "value": "...", "confidence": 0.0-1.0}}
Conversation:
{convo}
JSON:
"""
class MemoryManager:
def __init__(self, persist_dir: str = "./mem_db"):
self.client = Anthropic()
self.chroma = chromadb.PersistentClient(path=persist_dir)
self.episodic = EpisodicMemory(self.chroma)
self.semantic = SemanticMemory(self.chroma)
self.procedural = ProceduralMemory()
self.working: dict[str, WorkingMemory] = {}
def get_working(self, user_id: str) -> WorkingMemory:
return self.working.setdefault(user_id, WorkingMemory(user_id=user_id))
def on_message(self, user_id: str, role: str, content: str):
wm = self.get_working(user_id)
wm.append(role, content)
if wm.needs_summarization():
self._summarize_oldest(wm)
def _summarize_oldest(self, wm: WorkingMemory):
# Take oldest half for summarization, keep newest half + summary
if len(wm.messages) < 6:
return
half = len(wm.messages) // 2
old, keep = wm.messages[:half], wm.messages[half:]
convo = "\n".join(f"{m['role']}: {m['content']}" for m in old)
resp = self.client.messages.create(
model="claude-haiku-4-5",
max_tokens=600,
system=SUMMARIZE_PROMPT,
messages=[{"role": "user", "content": convo}],
)
new_summary = resp.content[0].text
wm.summary_so_far = (
(wm.summary_so_far + "\n" + new_summary)
if wm.summary_so_far else new_summary
)
wm.messages = keep
# Push old turns to episodic
for m in old:
self.episodic.add(wm.user_id,
text=f"{m['role']}: {m['content']}",
ts=m["ts"])
def flush_facts(self, user_id: str):
"""Periodically (e.g. on session end) extract semantic facts."""
wm = self.get_working(user_id)
if not wm.messages:
return
convo = "\n".join(f"{m['role']}: {m['content']}" for m in wm.messages[-20:])
resp = self.client.messages.create(
model="claude-haiku-4-5",
max_tokens=800,
messages=[{"role": "user", "content": EXTRACT_PROMPT.format(convo=convo)}],
)
try:
facts = json.loads(resp.content[0].text.strip().removeprefix("```json").removeprefix("```").removesuffix("```"))
except Exception:
return
for f in facts:
if f.get("confidence", 0) >= 0.6:
self.semantic.upsert_fact(user_id, f["key"], f["value"],
source="auto_extract")
def retrieve_for_query(self, user_id: str, query: str) -> dict:
return {
"episodes": self.episodic.retrieve(user_id, query, k=3),
"facts": self.semantic.search(user_id, query, k=5),
"skills": [s.name for s in self.procedural.find(query)[:3]],
"working_summary": self.get_working(user_id).summary_so_far,
}
def build_context_prefix(self, user_id: str, query: str) -> str:
bundle = self.retrieve_for_query(user_id, query)
parts = []
if bundle["facts"]:
parts.append("<known_facts>")
for f in bundle["facts"]:
parts.append(f"- {f['text']}")
parts.append("</known_facts>")
if bundle["episodes"]:
parts.append("<relevant_history>")
for ep in bundle["episodes"]:
ts = time.strftime("%Y-%m-%d", time.localtime(ep["meta"]["ts"]))
parts.append(f"[{ts}] {ep['text'][:200]}")
parts.append("</relevant_history>")
if bundle["working_summary"]:
parts.append(f"<conversation_so_far>\n{bundle['working_summary']}\n</conversation_so_far>")
return "\n".join(parts)
# ====================================================================
# Smoke test
# ====================================================================
if __name__ == "__main__":
mm = MemoryManager(persist_dir="./mem_test")
uid = "alice"
# Simulate a few sessions
mm.on_message(uid, "user", "I'm a moderate-risk investor in my 40s.")
mm.on_message(uid, "assistant", "Got it. I'll calibrate suggestions.")
mm.on_message(uid, "user", "I just bought 100 shares of AAPL at 215.")
mm.on_message(uid, "assistant", "Recorded.")
mm.flush_facts(uid)
# New session, retrieve
bundle = mm.retrieve_for_query(uid, "what's my risk tolerance and recent trade?")
print(json.dumps(bundle, indent=2, default=str))
四、金融领域应用
4.1 客户经理 copilot 的 memory 设计
| 层 | 内容 |
|---|---|
| Working | 当前对话窗口(10 轮) |
| Episodic | 过去 6 个月内每次咨询的摘要(每条 200 词) |
| Semantic | 客户画像:风险偏好、流动性需求、家庭情况、税务居住地、合规标记 |
| Procedural | "当客户问 X 类问题,按 SOP-A 处理" |
关键收益:客户每次开口不用再讲一遍背景;KYC 字段半结构化半向量化检索。
4.2 合规审查 agent 的 memory
- Episodic:过去类似审查案例(哪些被否、为什么)
- Semantic:监管规则库(条款 → 解释 → 适用场景)
- Procedural:审查流程模板
RAG 仅返回相关,避免上下文塞 1000 条规则。
五、Web3 集成
链上身份与跨 dApp memory
传统 SaaS:memory keyed by user_id。Web3:keyed by wallet address。这带来好处:
- 用户切换 client/dApp,memory 仍连续
- "Sign-in with Ethereum" 验证后,memory bundle 可携带
隐私挑战:钱包地址公开,谁都可以查 memory?解:① memory 内容加密(用户钱包加密);② 链上只存指针/hash,链下存内容;③ ZK proof of "I am this wallet"。
Mem0 / Letta 的 onchain 实验
社区有一些项目(比如 OLAS、Virtuals)尝试把 agent memory 上链。当前现实:
- Hot memory(高频 read/write)必须链下
- Settlement memory(关键事实哈希、时间戳)可以上链,做"agent 行为的可审计 trail"
- Cross-agent memory market:Ocean Protocol 类似数据市场,agent 互相买卖记忆
六、生产经验与陷阱
-
Memory 污染 一次错误事实进了 semantic memory,之后每次 retrieve 都误导 LLM。需要:
- confidence threshold(< 0.6 不入库)
- 周期性 audit(用 evaluator LLM 检查 memory 一致性)
- user-facing "forget X" 命令
-
Retrieve 召回不准 embedding 模型不行 / chunk 太大 / 用户问题措辞陌生。措施:
- hybrid search(dense + BM25)
- rerank 模型
- HyDE(先 LLM 生成"理想答案"再 embed)
-
Episodic 太多 → noise 存了 1000 条历史,retrieve 出的"相关"未必有用。recency 加权 + 强相关性阈值。
-
Semantic 自动 extraction 出错 LLM 抽出"用户家有狗"作为 fact 放进 KB,1 年后忘记狗死了。fact 有 expiration / staleness 标记 + 周期 refresh。
-
跨 user 泄露 忘记
where: user_id过滤,A 的事实出现在 B 的 retrieval。所有 query 必须强制 user_id 过滤,做单元测试。 -
GDPR / 删除权 用户要求 forget。需要级联删除 episodic + semantic + cache。设计时就要考虑 audit log 和删除接口。
-
Cost 失控 每轮都跑 fact extraction 烧 token。改成按事件触发(session 结束 / 显著状态变化)或异步队列。
七、Cost & Latency
一次完整交互的 memory cost(估算)
| 项 | 频率 | 单次 cost |
|---|---|---|
| Embedding(store + query) | 每轮 1-2 次 | ~$0.0001/轮 |
| Working memory summarize | 每 N 轮 1 次 | $0.005/次(haiku) |
| Fact extraction | session 结束 | $0.01/次(haiku) |
| Vector search | 每 query | < 10ms(本地 Chroma) |
对比 LLM 主调用 ($0.05-0.10),memory 开销 < 10%。但若每轮都做全套抽取/总结,就 25%+。
延迟
| 操作 | 延迟 |
|---|---|
| Vector search (Chroma local, 100k docs) | 10-30ms |
| Vector search (Pinecone hosted) | 50-200ms |
| Summarize (haiku 600 tokens) | 1-2s |
| Fact extract (haiku 800 tokens) | 1-2s |
实时路径只做 retrieve(快),写入和 summarize 走异步 worker。
八、关键速查
Memory 层速查
| 层 | 何时写 | 何时读 | 存储 |
|---|---|---|---|
| Working | 每条消息 | 每次 LLM call | in-mem list |
| Short-term summary | 容量超阈值时 | 每次 LLM call | summary string |
| Episodic | 写入旧消息 / 重要事件 | 与 query 相关时 | vector DB |
| Semantic | 抽取出 fact 时 | 与 query 相关时 | vector + kv |
| Procedural | 学到/手工录入 skill | 任务匹配时 | dict / vector |
工业 lib 选型
| 需求 | 选 |
|---|---|
| 与 LangGraph 集成 | LangMem |
| 极简、自动 fact 抽取 | Mem0 |
| 完整 OS 风格 | Letta |
| 时间敏感 / 事实有效期 | Zep |
| 自定义 / 不锁定 | Chroma + own pipeline |
九、面试题
Q1: 为什么不能把全部历史塞 LLM context?
A: ① 1M context 看似够大但 cost ∝ token,每轮重传 100k 烧钱;② attention dilution("中间被遗忘")现象——长 context 中段信息回忆质量下降;③ 隐私污染——一次提到的敏感信息持续存在;④ 跨 session 不能共享。分层 memory 把"啥都记 vs 啥都忘"的两极变成可控分级。
Q2: Episodic 与 Semantic 区别是什么?设计上怎么分?
A: Episodic 是"事件流"——带时间戳的 raw observations("用户在 9-10 问了 AAPL")。Semantic 是"事实"——脱时间的概念("用户的风险偏好 = moderate")。设计上:① episodic 直接存对话 chunk,按时间存;② semantic 由 LLM 从 episodic 抽取 + dedup 维护;③ retrieval 时 semantic 优先(密度高),episodic 补充 context。
Q3: 一个用户告诉 agent "我有一只叫 Buddy 的狗"。1 年后,agent 还应该记得吗?
A: 看场景。① 严格事实型 product(医疗、法律):保留 + 加 staleness 字段,retrieve 时显示"上次更新 1 年前",触发确认;② 闲聊型:可保留作为 personality cue;③ 监管/隐私:是否过 retention 期?是否用户撤销同意?需要 expiration policy。永远不要"无脑保留所有"。
Q4: 如何防 memory 中毒(错误事实持续误导)?
A: ① confidence threshold;② evaluator agent 周期性 audit;③ 显式 user override("forget X");④ 来源标记(
source=auto_extractvssource=user_explicit);⑤ 多次冲突时优先最近 + 高 confidence;⑥ 关键 fact 由用户确认("我记下了 X,对吗?")。
Q5: Mem0 / LangMem / Letta 哪个适合 enterprise 部署?
A: 没有一个完美。考虑:① 数据驻留——必须支持 self-host / on-prem,Mem0 / Letta 支持;② 审计 / GDPR——需要 audit log 和删除接口,Letta 完整度高;③ 延迟——在线交易场景要 < 100ms,自建 + Chroma 最快;④ 维护——团队规模决定能不能 own 复杂栈。生产里常见:自建简化版(vector + 简单 fact extract)。
十、与 RAG 的关系
RAG(Retrieval-Augmented Generation)和 Agent Memory 是同一系列的概念吗?
不完全。区别:
| 维度 | RAG | Agent Memory |
|---|---|---|
| 目标 | 在 prompt 中注入相关知识 | 跨会话保持上下文 |
| 范围 | 通常静态文档库 | 动态、用户特定 |
| 更新 | 文档更新触发 | 每次交互可写 |
| 检索 | 按 query 单次 | 按 query + 时间 + 用户 |
| 保留 | 永久 | 有 expiration |
| 隐私 | 通常公共 | 高度个人 |
实际系统:RAG + Memory 同时存在。RAG 提供"知识"(产品手册、监管文件),Memory 提供"用户上下文"(user.preference, recent.activity)。检索时两者并联。
后续 Day 163-176 会深入 RAG(hybrid / rerank / GraphRAG)。
十一、Memory 与 Long-Context 模型的取舍
随着 1M context 模型普及,是否还需要 memory 系统?
| 方案 | 优 | 劣 |
|---|---|---|
| All-in-context(每轮塞历史) | 简单、无依赖 | 贵、慢、attention 衰减 |
| Memory-based | 便宜、可扩展、隐私可控 | 增加复杂度、检索可能漏 |
| Hybrid(短期 in-context + 长期 memory) | 折中 | 设计复杂 |
大多数生产系统采用 hybrid:
- 当前会话 last K turns 留 in-context
- 旧会话 + 跨会话事实走 memory
- 1M context 用于"single document large input",不是"无限聊天"
十二、扩展练习
- 替换 embedding 为 Cohere embed-v4——比对 retrieve 召回率
- 实现 fact 矛盾检测——新 fact 与旧 fact 冲突时让 LLM 决断
- 加 namespace——一个 user 下多个 projects(隔离不同业务上下文的 memory)
- 加 forget API——用户主动让 agent 删除某 fact
- 实现 procedural memory 学习——agent 完成任务后自动总结成 skill 模板
- 给 episodic 加 importance score——只保留高重要度的记忆,老低重要度记忆被 prune
明日预告
Day 159: 多 Agent 协作——Hierarchical / Network / Sequential 三种拓扑
- 主管-下属、对等网络、流水线三种结构
- 各自的通信开销和一致性问题
- 实现 3 种 multi_agent.py