Expert Day 170
LLMOps 工具链 — Langfuse / Helicone / LangSmith
### 1.1 为什么需要 LLM 专用 observability
2026-10-18
Phase 3 - 生产基础设施与评估 (Day 163-176)LangfuseHeliconeLangSmithObservabilityTracing
日期: 2026-10-18 方向: AI系统工程 / LLMOps / Observability 阶段: Phase 3 - 生产基础设施与评估 (Day 163-176) 标签: #Langfuse #Helicone #LangSmith #Observability #Tracing
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | LLM observability 三件套(trace/score/dataset);Langfuse vs Helicone vs LangSmith 选型;OpenTelemetry GenAI semconv |
| 实操 | Self-host Langfuse;接入 Anthropic SDK;落地 trace + score + experiments |
| 产出 | docs/ai-infra/observability.md:完整接入指南、deployment compose、查询示例 |
一、核心概念
1.1 为什么需要 LLM 专用 observability
传统 APM(DataDog/NewRelic)只能看延迟和错误率,但 LLM 应用还需要:
| 维度 | 传统 APM | LLM Observability |
|---|---|---|
| Trace 嵌套 | ✓ HTTP 调用 | ✓ + LLM call / tool call / embedding / retrieval 多层 |
| Token 成本 | ✗ | ✓ 按 model/user/feature 切片 |
| Prompt + Output | ✗ | ✓ 全文存储 + diff |
| Score(质量) | ✗ | ✓ 接入 LLM judge / human / det eval |
| User feedback | ✗ | ✓ thumbs up/down 关联 trace |
| Dataset / Eval | ✗ | ✓ 可从 production 一键转 dataset |
| Prompt 版本 | ✗ | ✓ 版本管理 + AB |
1.2 三家对比
| 项目 | 开源 | Self-host | 强项 | 选型建议 |
|---|---|---|---|---|
| Langfuse | ✅ | ✅(Docker 一键) | 全功能、社区活跃、SDK 多语言 | 首选,金融自托管必选 |
| Helicone | ✅ | ✅ | 简单、proxy 模式不改代码 | 轻量场景 |
| LangSmith | ❌(云) | ❌ | 与 LangChain 深度集成 | LangChain 重度用户 |
| OpenLLMetry | ✅ | ✅ | 标准 OpenTelemetry,可对接任意 backend | 已有 OTel 基建 |
| Phoenix (Arize) | ✅ | ✅ | 评测 + 监控合一 | 评测重度场景 |
1.3 OpenTelemetry GenAI Semantic Conventions(趋势)
OTel 在 2025 推出 GenAI semconv,所有厂商正在对齐:
Span name: chat <model>
Attributes:
gen_ai.system: "anthropic"
gen_ai.request.model: "claude-opus-4-7"
gen_ai.response.model: "claude-opus-4-7"
gen_ai.usage.input_tokens: 1200
gen_ai.usage.output_tokens: 350
gen_ai.usage.cached_input_tokens: 1100
gen_ai.request.temperature: 0.0
Events:
gen_ai.user.message
gen_ai.assistant.message
未来可在任何兼容后端(Tempo、Jaeger、Datadog)查 GenAI trace。
1.4 Trace / Score / Dataset 三件套
┌────────────────────────────────────────────┐
│ Trace(一次请求的完整执行) │
│ ├─ LLM call (claude-opus, 1.2s) │
│ ├─ Tool call: search_kb (0.3s) │
│ ├─ LLM call (claude-opus, 0.8s) │
│ └─ Final output │
└────────────────────────────────────────────┘
│
scored by
┌───────┴────────┐
LLM judge human thumbs-up det eval
(auto) (1% sample) (every call)
│
┌───────┴────────┐
│ promoted to │
│ Dataset │
│ (golden v1.2) │
└────────────────┘
│
┌───────┴────────┐
│ Experiment │
│ run new prompt │
│ on dataset │
└────────────────┘
二、生产架构图
App (你的金融 AI 服务)
├── @observe decorator (Langfuse SDK)
│
├──→ Anthropic API
│
└──→ Langfuse SDK (async batched)
│
▼
Langfuse Server (self-host)
├── Web UI
├── PostgreSQL (metadata)
├── ClickHouse (events)
└── S3 (prompts/outputs blob)
│
├──→ Slack / PagerDuty (alerts)
├──→ ClickHouse SQL (analytics)
└──→ S3 export (audit / compliance)
三、代码实现
3.1 Self-host Langfuse(Docker Compose)
# docker-compose.yml
version: '3.9'
services:
langfuse-server:
image: langfuse/langfuse:3
depends_on:
- postgres
- clickhouse
ports:
- 3000:3000
environment:
DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres
CLICKHOUSE_URL: http://clickhouse:8123
CLICKHOUSE_USER: default
CLICKHOUSE_PASSWORD: clickhouse
NEXTAUTH_URL: http://localhost:3000
NEXTAUTH_SECRET: change-me
SALT: change-me
ENCRYPTION_KEY: 0000000000000000000000000000000000000000000000000000000000000000
TELEMETRY_ENABLED: false
LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: true
# 金融场景:禁用产品分析、禁外发
LANGFUSE_DISABLE_USAGE_TRACKING: true
postgres:
image: postgres:16
environment:
POSTGRES_PASSWORD: postgres
volumes:
- pgdata:/var/lib/postgresql/data
clickhouse:
image: clickhouse/clickhouse-server:24
environment:
CLICKHOUSE_PASSWORD: clickhouse
volumes:
- chdata:/var/lib/clickhouse
volumes:
pgdata:
chdata:
3.2 Anthropic SDK 接入
"""langfuse_anthropic.py — 完整接入示例"""
import os
from langfuse import Langfuse, observe
from langfuse.anthropic import Anthropic # 官方 wrapper(auto-trace)
# Langfuse 自托管 endpoint
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_HOST"] = "https://langfuse.internal.bank.com"
lf = Langfuse()
client = Anthropic() # ← 这个 wrapper 自动埋 trace
# ─── 简单 LLM 调用:自动 trace ───
def simple_call(question: str):
r = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
messages=[{"role": "user", "content": question}],
# Langfuse 元数据
name="customer_support_query", # span 名
metadata={"user_id": "u_12345", "feature": "chat"},
tags=["prod_v3", "customer_support"],
)
return r.content[0].text
# ─── 复杂多步骤:用 @observe 串成 trace ───
@observe(name="kyc_pipeline")
def kyc_pipeline(doc: str, user_id: str):
lf.update_current_trace(user_id=user_id, session_id=f"sess_{user_id}_kyc")
# step 1: 抽取
extracted = extract_fields(doc)
# step 2: 验证
validation = validate_fields(extracted)
# step 3: 风险评级
risk = assess_risk(extracted)
return {"extracted": extracted, "valid": validation, "risk_level": risk}
@observe(name="extract_fields")
def extract_fields(doc: str):
r = client.messages.create(
model="claude-haiku-4-5", # 简单任务用便宜模型
max_tokens=256,
messages=[{"role": "user", "content": f"抽取 KYC: {doc}"}],
metadata={"step": "extract"},
)
return r.content[0].text
@observe(name="validate_fields")
def validate_fields(fields):
# 业务逻辑也可以是 span
return all(k in fields for k in ["name", "id_number"])
@observe(as_type="generation", name="risk_assessment")
def assess_risk(profile):
r = client.messages.create(
model="claude-opus-4-7", # 关键决策用 opus
max_tokens=128,
messages=[{"role": "user", "content": f"评级: {profile}"}],
metadata={"step": "risk"},
)
# 给这次 generation 打分
lf.score_current_trace(
name="risk_decision_quality",
value=0.85,
comment="auto-scored by det eval",
)
return r.content[0].text
3.3 接入 user feedback
"""user_feedback.py"""
from fastapi import FastAPI, HTTPException
app = FastAPI()
@app.post("/api/feedback")
async def feedback(trace_id: str, rating: int, comment: str | None = None):
"""前端用户点 thumbs up/down 时调"""
if rating not in (1, -1, 0):
raise HTTPException(400, "rating must be -1/0/1")
lf.score(
trace_id=trace_id,
name="user_thumbs",
value=rating,
comment=comment,
)
return {"ok": True}
3.4 接入 deterministic eval 分数
"""push_eval_scores.py — 把 Day 169 的 eval pipeline 结果推到 Langfuse"""
def push_eval_to_langfuse(eval_result: dict, trace_ids: dict):
"""eval_result: 来自 eval pipeline; trace_ids: case_id -> trace_id 映射"""
for case in eval_result["cases"]:
tid = trace_ids.get(case["id"])
if not tid:
continue
lf.score(
trace_id=tid,
name="det_eval_pass",
value=1.0 if case["passed"] else 0.0,
comment="; ".join(case.get("failures", [])) or "ok",
)
if case.get("judge"):
lf.score(
trace_id=tid,
name="judge_winner",
value=1.0 if case["judge"]["winner"] == "A" else 0.0,
)
3.5 从生产 trace 创建 dataset
"""promote_to_dataset.py — 把生产高质量 trace 提为 golden case"""
from langfuse import Langfuse
lf = Langfuse()
# 找到 user thumbs up 的高质量 trace
traces = lf.get_traces(
tags=["customer_support"],
from_timestamp="2026-10-01",
to_timestamp="2026-10-15",
)
dataset_name = "customer_support_golden_v2"
lf.create_dataset(name=dataset_name)
for t in traces:
score = lf.get_score(trace_id=t.id, name="user_thumbs")
if score and score.value == 1: # thumbs up
lf.create_dataset_item(
dataset_name=dataset_name,
input=t.input,
expected_output=t.output,
metadata={"source_trace_id": t.id},
)
3.6 Run experiment(在 dataset 上跑新 prompt)
"""run_experiment.py"""
from langfuse import Langfuse
lf = Langfuse()
ds = lf.get_dataset(name="customer_support_golden_v2")
NEW_SYSTEM_PROMPT = "你是金融助手 v4 ..."
for item in ds.items:
# 在 dataset item 上下文里跑 → Langfuse 自动归集为 experiment run
with item.run(run_name="prod_v4_test_2026_10_18") as run:
out = client.messages.create(
model="claude-sonnet-4-6",
max_tokens=512,
system=NEW_SYSTEM_PROMPT,
messages=item.input,
metadata={"experiment": "prod_v4"},
)
run.update(output=out.content[0].text)
# 可立刻接 LLM judge 给分
3.7 SQL 查询(ClickHouse 后端)
-- 按 user 分组的 cost 切片
SELECT
metadata['user_id'] AS user_id,
sum(cost_details['total_cost']) AS total_cost_usd,
count() AS request_count,
avg(JSONExtractFloat(metadata, 'latency_ms')) AS avg_latency
FROM observations
WHERE timestamp >= now() - INTERVAL 7 DAY
AND type = 'GENERATION'
GROUP BY user_id
ORDER BY total_cost_usd DESC
LIMIT 20;
-- Cache hit rate
SELECT
toStartOfHour(timestamp) AS hour,
sum(cached_input_tokens) / sum(input_tokens) AS cache_hit_rate
FROM observations
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour;
-- 一天里每小时的 thumbs-down trace
SELECT t.id, t.name, t.input, s.value
FROM traces t
JOIN scores s ON t.id = s.trace_id
WHERE s.name = 'user_thumbs' AND s.value = -1
AND t.timestamp >= now() - INTERVAL 1 DAY;
四、Cost & Performance 实测数据
4.1 Self-host vs SaaS
| 选项 | 月成本 | 数据合规 |
|---|---|---|
| Langfuse Cloud (Pro) | $99/月起 + $29/100K events | 数据出域 |
| Helicone Cloud | $99/月起 | 数据出域 |
| LangSmith Cloud | $39/月起 + 用量 | 数据出域 |
| Self-host Langfuse(VM 4core 16G + ClickHouse) | $80/月(云主机) | 数据不出域 |
4.2 SDK 开销
- 同步 SDK:每 LLM call 增加 ~5-10ms(latency 增加 < 1%)
- 异步 batched:几乎 0 开销,每 1s flush 一批
4.3 存储成本
- 每条 trace(含 prompt+output 全文)≈ 5KB
- 1M 条/月 ≈ 5GB
- ClickHouse 压缩后 ≈ 1.5GB → S3 归档 = ~$0.04/月
五、金融领域应用
- 审计追溯:监管要求"每个 AI 决策可追溯"。Langfuse trace + immutable S3 export 一站式
- 合规归因:客户投诉某次决策不公,trace 完整重放:用了哪个 prompt 版本、哪个模型、KB 哪个版本
- 成本归因:按业务部门切 cost(metadata 加
dept_id),按月出账单 - 质量趋势:LLM judge + user feedback 双指标,每天图表,跌破阈值自动告警
- A/B 实验:prompt 改动用 Langfuse experiment,量化效果再放量
- PII 红线:Langfuse 支持 redaction hook,prompt/output 写入前用正则脱敏
- 数据驻留:Self-host 部署在内网/私有云,金融监管必备
六、生产经验与陷阱
- prompt 全文写日志的合规风险:含客户 PII 的 prompt 不能进 Langfuse。必须实现
mask_datahook(Langfuse SDK 支持),脱敏后再发送 - flush 时间窗导致丢数据:进程崩溃时未 flush 的 trace 丢失。生产用 sync flush + retry 或队列持久化
- ClickHouse 不调优会撑爆磁盘:默认保留 30 天,金融场景可能要 7 年。设 TTL 分层,热数据 90 天 ClickHouse,冷数据归档 S3 Glacier
- scoring 命名不规范:
thumbs_up、user-thumbs、feedback_pos各种叫法。建立 score taxonomy 表 - trace 嵌套过深:5 层以上 UI 难看。复杂 agent 把次要细节放 metadata 而非新建 span
- 同时启用多个 instrumentation:例如 Langfuse 和 OpenLLMetry 都 patch SDK 会冲突,选一个
- production 误用 Langfuse cloud:金融数据出域是大事故。架构评审强制核查 endpoint 是否私有
- Token cost 计算错:不同模型不同价格,不同 cache 状态不同价格,必须用 Langfuse 的 model_price 配置同步更新
七、关键速查
| 操作 | 工具 |
|---|---|
| 装 SDK | pip install langfuse anthropic |
| Decorator | @observe(name="...") |
| 加 metadata | lf.update_current_trace(user_id=..., metadata=...) |
| 加 score | lf.score(trace_id=..., name=..., value=0.85) |
| 创建 dataset | lf.create_dataset(name=...) |
| Run experiment | with item.run(run_name=...) as run: |
八、面试题
-
金融 AI 必须 self-host observability 工具,原因?
- 数据驻留合规、PII 不出域、监管审计要求 7 年保留、私有化部署一致性、成本可控
-
如何按业务部门切 LLM 成本?
- metadata 必含
dept_id/feature_name;ClickHouse SQL group by 即可;月度自动账单到部门
- metadata 必含
-
生产 trace 怎么转 golden dataset?
- 用户 thumbs up 高质量 trace + det eval pass 的 → SME 抽审 → 脱敏 → create_dataset_item
-
trace + score + dataset 三件套的关系?
- trace 是事实记录;score 是质量标注(auto/human);dataset 是策划过的金牌输入输出对,用于 experiment
-
Langfuse vs LangSmith 怎么选?
- 金融/数据合规:Langfuse 自托管;LangChain 重度用户 + 不在意数据出域:LangSmith;要 OpenTelemetry 标准:OpenLLMetry → Langfuse
明日预告
Day 171:Versioning 与 CI — Prompt 版本化、PromptOps、CI 集成 prompt 是软件资产,必须版本化。今天把 prompt registry、CI eval、灰度发布流程串起来。