Expert Day 170

LLMOps Toolchain: Langfuse / Helicone / LangSmith

Date: 2026-10-18
Track: AI Systems Engineering / LLMOps / Observability
Phase: Phase 3 - Production Infrastructure and Evaluation (Day 163-176)
Tags: #Langfuse #Helicone #LangSmith #Observability #Tracing


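Today's goals

| Type | Content |
|---|---|
| Study | The three building blocks of LLM observability (trace / score / dataset); choosing between Langfuse, Helicone, and LangSmith; OpenTelemetry GenAI semconv |
| Hands-on | Self-host Langfuse; integrate the Anthropic SDK; get traces, scores, and experiments working |
| Deliverable | docs/ai-infra/observability.md: a complete integration guide, deployment compose file, and query examples |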

1. Core concepts

1.1 Why LLM-specific observability is needed

Traditional APM (DataDog/NewRelic) only shows latency and error rates, but an LLM application also needs:

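| Dimension | Traditional APM | LLM observability |
|---|---|---|
| Trace nesting | ✓ HTTP calls | ✓ plus nested LLM calls / tool calls / embedding / retrieval |
| Token cost | ✗ | ✓ sliced by model / user / feature |
| Prompt + output | ✗ | ✓ full-text storage + diff |
| Score (quality) | ✗ | ✓ LLM judge / human / deterministic eval |
| User feedback | ✗ | ✓ thumbs up/down linked to the trace |
| Dataset / eval | ✗ | ✓ production traces can be promoted to a dataset in one step |
| Prompt versions | ✗ | ✓ version management + A/B testing |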
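
To make the "token cost sliced by model / user / feature" row concrete, here is a minimal sketch in plain Python. The price table, the model names, and the `Generation` record are hypothetical placeholders rather than Langfuse data structures; a real setup would read usage from the observability backend.

```python
from collections import defaultdict
from dataclasses import dataclass

# Hypothetical prices in USD per million tokens; replace with your real rate card.
PRICE_PER_MTOK = {
    "model-small": {"input": 1.0, "output": 5.0},
    "model-large": {"input": 15.0, "output": 75.0},
}


@dataclass
class Generation:
    model: str
    user_id: str
    feature: str
    input_tokens: int
    output_tokens: int


def cost_usd(g: Generation) -> float:
    """Cost of one LLM call under the price table above."""
    price = PRICE_PER_MTOK[g.model]
    return (g.input_tokens * price["input"] + g.output_tokens * price["output"]) / 1_000_000


def cost_by(generations, key: str) -> dict[str, float]:
    """Sum cost grouped by one attribute: 'model', 'user_id', or 'feature'."""
    totals = defaultdict(float)
    for g in generations:
        totals[getattr(g, key)] += cost_usd(g)
    return dict(totals)
```

Calling `cost_by(generations, "feature")` gives the per-feature slice; the same records answer the per-user and per-model questions without a second pass over raw logs.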

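1.2 Tool comparison

| Project | Open source | Self-host | Strengths | Recommendation |
|---|---|---|---|---|
| Langfuse | ✅ | Docker, one command | Full-featured, active community, multi-language SDKs | First choice; required for self-hosted financial deployments |
| Helicone | | | Simple; proxy mode needs no code changes | Lightweight use cases |
| LangSmith | ❌ | Cloud | Deep LangChain integration | Heavy LangChain users |
| OpenLLMetry | | | Standard OpenTelemetry; works with any backend | Teams with existing OTel infrastructure |
| Phoenix (Arize) | | | Evaluation and monitoring in one tool | Evaluation-heavy use cases |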

1.3 OpenTelemetry GenAI Semantic Conventions (the trend)

OTel has introduced GenAI semantic conventions (the spec is still evolving), and vendors are converging on them:

Span name: chat <model>
Attributes:
  gen_ai.system: "anthropic"
  gen_ai.request.model: "claude-opus-4-7"
  gen_ai.response.model: "claude-opus-4-7"
  gen_ai.usage.input_tokens: 1200
  gen_ai.usage.output_tokens: 350
  gen_ai.usage.cached_input_tokens: 1100
  gen_ai.request.temperature: 0.0
Events:
  gen_ai.user.message
  gen_ai.assistant.message

Once backends adopt these conventions, GenAI traces can be queried in any compatible backend (Tempo, Jaeger, Datadog).
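
As a rough illustration of the convention, the helper below builds the span name and attribute dictionary for one chat call. It is a sketch in plain Python that uses only the attribute names listed in these notes; `genai_span` is a hypothetical helper, and the exact attribute set should be checked against the semconv version your backend supports.

```python
def genai_span(system, request_model, response_model,
               input_tokens, output_tokens, temperature=None):
    """Return (span_name, attributes) for one chat completion call."""
    attributes = {
        "gen_ai.system": system,
        "gen_ai.request.model": request_model,
        "gen_ai.response.model": response_model,
        "gen_ai.usage.input_tokens": input_tokens,
        "gen_ai.usage.output_tokens": output_tokens,
    }
    # Optional request parameters are only recorded when they were set
    if temperature is not None:
        attributes["gen_ai.request.temperature"] = temperature
    return f"chat {request_model}", attributes
```

With the OpenTelemetry Python API, the result could be passed along as `tracer.start_as_current_span(name, attributes=attributes)`.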

1.4 The trace / score / dataset trio

┌────────────────────────────────────────────┐
│ Trace (full execution of one request)      │
│   ├─ LLM call (claude-opus, 1.2s)          │
│   ├─ Tool call: search_kb (0.3s)           │
│   ├─ LLM call (claude-opus, 0.8s)          │
│   └─ Final output                          │
└────────────────────────────────────────────┘
                    │
            scored by
            ┌───────┴────────┐
        LLM judge       human thumbs-up    det eval
        (auto)          (1% sample)       (every call)
                            │
                    ┌───────┴────────┐
                    │ promoted to    │
                    │ Dataset        │
                    │  (golden v1.2) │
                    └────────────────┘
                            │
                    ┌───────┴────────┐
                    │   Experiment   │
                    │ run new prompt │
                    │ on dataset     │
                    └────────────────┘
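
The flow in the diagram can be sketched with three small record types and one promotion rule. This is an illustration of the relationship only: the class names and the `promote` function are hypothetical and are not the Langfuse object model.

```python
from dataclasses import dataclass, field


@dataclass
class Score:
    name: str      # e.g. "user_thumbs", "det_eval_pass"
    value: float


@dataclass
class Trace:
    id: str
    input: str
    output: str
    scores: list[Score] = field(default_factory=list)


@dataclass
class DatasetItem:
    input: str
    expected_output: str
    source_trace_id: str


def promote(traces, required: dict[str, float]) -> list[DatasetItem]:
    """Keep traces whose scores meet every (score name -> minimum value) rule."""
    items = []
    for t in traces:
        by_name = {s.name: s.value for s in t.scores}
        # A missing score counts as failing the rule
        if all(by_name.get(n, float("-inf")) >= minimum for n, minimum in required.items()):
            items.append(DatasetItem(t.input, t.output, t.id))
    return items
```

An experiment then replays each `DatasetItem.input` through the new prompt and compares the result with `expected_output`.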

2. Production architecture

   App (your financial AI service)
   ├── @observe decorator (Langfuse SDK)
   │
   ├──→ Anthropic API
   │
   └──→ Langfuse SDK (async batched)
              │
              ▼
         Langfuse Server (self-host)
         ├── Web UI
         ├── PostgreSQL (metadata)
         ├── ClickHouse (events)
         └── S3 (prompts/outputs blob)
              │
              ├──→ Slack / PagerDuty (alerts)
              ├──→ ClickHouse SQL (analytics)
              └──→ S3 export (audit / compliance)

3. Code

3.1 Self-hosting Langfuse (Docker Compose)

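# docker-compose.yml
# Minimal sketch: a full Langfuse v3 deployment typically also runs a worker
# container, Redis, and S3-compatible blob storage (see the official compose file).
version: '3.9'
services:
  langfuse-server:
    image: langfuse/langfuse:3
    depends_on:
      - postgres
      - clickhouse
    ports:
      - "3000:3000"
    environment:
      DATABASE_URL: postgresql://postgres:postgres@postgres:5432/postgres
      CLICKHOUSE_URL: http://clickhouse:8123
      CLICKHOUSE_USER: default
      CLICKHOUSE_PASSWORD: clickhouse
      NEXTAUTH_URL: http://localhost:3000
      NEXTAUTH_SECRET: change-me
      SALT: change-me
      # Placeholder only: replace with a random 64-hex-character key
      ENCRYPTION_KEY: "0000000000000000000000000000000000000000000000000000000000000000"
      # Booleans are quoted so that Compose reads them as strings
      TELEMETRY_ENABLED: "false"
      LANGFUSE_ENABLE_EXPERIMENTAL_FEATURES: "true"
      # Financial deployments: disable product analytics and outbound reporting
      LANGFUSE_DISABLE_USAGE_TRACKING: "true"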

  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: postgres
    volumes:
      - pgdata:/var/lib/postgresql/data

  clickhouse:
    image: clickhouse/clickhouse-server:24
    environment:
      CLICKHOUSE_PASSWORD: clickhouse
    volumes:
      - chdata:/var/lib/clickhouse

volumes:
  pgdata:
  chdata:
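
The compose file uses placeholder values for NEXTAUTH_SECRET, SALT, and ENCRYPTION_KEY. One way to produce real values is Python's standard `secrets` module, as in the sketch below. It assumes ENCRYPTION_KEY is expected as 64 hex characters (matching the length of the placeholder) and that the other two are free-form random strings; `generate_langfuse_secrets` is a hypothetical helper name.

```python
import secrets


def generate_langfuse_secrets() -> dict[str, str]:
    """Return fresh random values for the three secret settings."""
    return {
        "NEXTAUTH_SECRET": secrets.token_urlsafe(32),
        "SALT": secrets.token_urlsafe(32),
        "ENCRYPTION_KEY": secrets.token_hex(32),  # 32 random bytes = 64 hex characters
    }


if __name__ == "__main__":
    # Print in KEY=value form, ready to paste into an .env file
    for key, value in generate_langfuse_secrets().items():
        print(f"{key}={value}")
```

Keeping the generated values in an `.env` file or a secret manager, rather than in the compose file itself, keeps them out of version control.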

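3.2 Anthropic SDK integration

"""langfuse_anthropic.py: end-to-end integration example (assumes Langfuse Python SDK v3)."""
import os

from anthropic import Anthropic
from langfuse import Langfuse, observe

# Self-hosted Langfuse endpoint
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-..."
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-..."
os.environ["LANGFUSE_HOST"] = "https://langfuse.internal.bank.com"

lf = Langfuse()
# Plain Anthropic client. Langfuse ships a drop-in wrapper for OpenAI; I am not
# aware of an equivalent langfuse.anthropic module, so each call below is
# recorded explicitly via @observe(as_type="generation").
client = Anthropic()


def record_usage(model: str, response) -> None:
    """Attach the model name and token usage to the current generation."""
    lf.update_current_generation(
        model=model,
        usage_details={
            "input": response.usage.input_tokens,
            "output": response.usage.output_tokens,
        },
    )


# --- Simple LLM call: one generation, one trace ---
@observe(as_type="generation", name="customer_support_query")
def simple_call(question: str) -> str:
    # Langfuse metadata belongs on the trace; messages.create() rejects
    # Langfuse-only kwargs such as name= or tags=.
    lf.update_current_trace(
        metadata={"user_id": "u_12345", "feature": "chat"},
        tags=["prod_v3", "customer_support"],
    )
    model = "claude-sonnet-4-6"
    r = client.messages.create(
        model=model,
        max_tokens=512,
        messages=[{"role": "user", "content": question}],
    )
    record_usage(model, r)
    return r.content[0].text


# --- Multi-step pipeline: @observe nests the steps into one trace ---
@observe(name="kyc_pipeline")
def kyc_pipeline(doc: str, user_id: str):
    lf.update_current_trace(user_id=user_id, session_id=f"sess_{user_id}_kyc")

    # step 1: extract
    extracted = extract_fields(doc)

    # step 2: validate
    validation = validate_fields(extracted)

    # step 3: risk rating
    risk = assess_risk(extracted)

    return {"extracted": extracted, "valid": validation, "risk_level": risk}


@observe(as_type="generation", name="extract_fields")
def extract_fields(doc: str) -> str:
    model = "claude-haiku-4-5"  # cheap model for a simple task
    lf.update_current_generation(metadata={"step": "extract"})
    r = client.messages.create(
        model=model,
        max_tokens=256,
        messages=[{"role": "user", "content": f"Extract KYC fields: {doc}"}],
    )
    record_usage(model, r)
    return r.content[0].text


@observe(name="validate_fields")
def validate_fields(fields: str) -> bool:
    # Business logic can be a span too. `fields` is raw model text here,
    # so this is a substring check, not a parsed-schema check.
    return all(k in fields for k in ["name", "id_number"])


@observe(as_type="generation", name="risk_assessment")
def assess_risk(profile: str) -> str:
    model = "claude-opus-4-7"  # opus for the critical decision
    lf.update_current_generation(metadata={"step": "risk"})
    r = client.messages.create(
        model=model,
        max_tokens=128,
        messages=[{"role": "user", "content": f"Rate the risk: {profile}"}],
    )
    record_usage(model, r)
    # Score the enclosing trace; 0.85 is a placeholder for a real eval result
    lf.score_current_trace(
        name="risk_decision_quality",
        value=0.85,
        comment="auto-scored by det eval",
    )
    return r.content[0].text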

3.3 Wiring in user feedback

"""user_feedback.py"""
from fastapi import FastAPI, HTTPException
from langfuse import Langfuse

app = FastAPI()
lf = Langfuse()

@app.post("/api/feedback")
async def feedback(trace_id: str, rating: int, comment: str | None = None):
    """Called when a user clicks thumbs up/down in the frontend."""
    if rating not in (1, -1, 0):
        raise HTTPException(400, "rating must be -1/0/1")
    # create_score is the SDK v3 name; SDK v2 exposes the same call as lf.score
    lf.create_score(
        trace_id=trace_id,
        name="user_thumbs",
        value=rating,
        comment=comment,
    )
    return {"ok": True}

3.4 Pushing deterministic eval scores

"""push_eval_scores.py: push the Day 169 eval pipeline results to Langfuse"""
from langfuse import Langfuse

lf = Langfuse()


def push_eval_to_langfuse(eval_result: dict, trace_ids: dict) -> None:
    """eval_result: output of the eval pipeline; trace_ids: case_id -> trace_id mapping"""
    for case in eval_result["cases"]:
        tid = trace_ids.get(case["id"])
        if not tid:
            continue
        # create_score is the SDK v3 name (lf.score in SDK v2)
        lf.create_score(
            trace_id=tid,
            name="det_eval_pass",
            value=1.0 if case["passed"] else 0.0,
            comment="; ".join(case.get("failures", [])) or "ok",
        )
        if case.get("judge"):
            lf.create_score(
                trace_id=tid,
                name="judge_winner",
                value=1.0 if case["judge"]["winner"] == "A" else 0.0,
            )

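3.5 Creating a dataset from production traces

"""promote_to_dataset.py: promote high-quality production traces to golden cases"""
from datetime import datetime, timezone

from langfuse import Langfuse

lf = Langfuse()

# Candidate traces. Assumes SDK v3, where the REST client is exposed as lf.api;
# only the first page is fetched here, so add pagination for real use.
traces = lf.api.trace.list(
    tags=["customer_support"],
    from_timestamp=datetime(2026, 10, 1, tzinfo=timezone.utc),
    to_timestamp=datetime(2026, 10, 15, tzinfo=timezone.utc),
).data

dataset_name = "customer_support_golden_v2"
lf.create_dataset(name=dataset_name)

for t in traces:
    # The list endpoint returns score IDs only; fetch the full trace for score values
    full = lf.api.trace.get(t.id)
    thumbs_up = any(s.name == "user_thumbs" and s.value == 1 for s in full.scores)
    if thumbs_up:
        lf.create_dataset_item(
            dataset_name=dataset_name,
            input=full.input,
            expected_output=full.output,
            metadata={"source_trace_id": t.id},
        )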

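3.6 Running an experiment (a new prompt against the dataset)

"""run_experiment.py"""
from anthropic import Anthropic
from langfuse import Langfuse

lf = Langfuse()
client = Anthropic()
ds = lf.get_dataset(name="customer_support_golden_v2")

NEW_SYSTEM_PROMPT = "You are financial assistant v4 ..."

for item in ds.items:
    # Running inside the dataset item's context lets Langfuse group the
    # resulting traces into one experiment run
    with item.run(
        run_name="prod_v4_test_2026_10_18",
        run_metadata={"experiment": "prod_v4"},
    ) as run:
        out = client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=512,
            system=NEW_SYSTEM_PROMPT,
            messages=item.input,  # assumes the item input is already a messages list
        )
        run.update(output=out.content[0].text)
        # An LLM judge can score the run right here

lf.flush()  # send buffered events before the script exits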
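
Once two runs on the same dataset have per-item scores, deciding whether the new prompt wins is a small aggregation. The sketch below assumes the scores have already been exported as plain dictionaries (item id to score); `compare_runs` and its promotion rule are hypothetical, not a Langfuse feature.

```python
from statistics import mean


def compare_runs(baseline: dict[str, float], candidate: dict[str, float],
                 min_gain: float = 0.0) -> dict:
    """Compare two experiment runs on the dataset items they both scored."""
    shared = sorted(baseline.keys() & candidate.keys())
    if not shared:
        raise ValueError("the two runs share no dataset items")
    base_avg = mean(baseline[i] for i in shared)
    cand_avg = mean(candidate[i] for i in shared)
    # Items where the candidate scored strictly worse than the baseline
    regressions = [i for i in shared if candidate[i] < baseline[i]]
    return {
        "baseline": base_avg,
        "candidate": cand_avg,
        "delta": cand_avg - base_avg,
        "regressions": regressions,
        "promote": cand_avg - base_avg > min_gain,
    }
```

Listing regressions separately matters because an improved average can hide individual golden cases that got worse, which is usually what a reviewer wants to read first.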

3.7 SQL queries (ClickHouse backend)

-- Table and column names below follow these notes; verify them against the
-- ClickHouse schema of your Langfuse version before running.

-- Cost sliced by user
SELECT
    metadata['user_id'] AS user_id,
    sum(cost_details['total_cost']) AS total_cost_usd,
    count() AS request_count,
    avg(toFloat64OrNull(metadata['latency_ms'])) AS avg_latency
FROM observations
WHERE timestamp >= now() - INTERVAL 7 DAY
  AND type = 'GENERATION'
GROUP BY user_id
ORDER BY total_cost_usd DESC
LIMIT 20;

-- Cache hit rate per hour
SELECT
    toStartOfHour(timestamp) AS hour,
    sum(cached_input_tokens) / sum(input_tokens) AS cache_hit_rate
FROM observations
WHERE timestamp >= now() - INTERVAL 24 HOUR
GROUP BY hour
ORDER BY hour;

-- Thumbs-down traces from the last day
SELECT t.id, t.name, t.input, s.value
FROM traces t
JOIN scores s ON t.id = s.trace_id
WHERE s.name = 'user_thumbs' AND s.value = -1
  AND t.timestamp >= now() - INTERVAL 1 DAY;

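4. Cost and performance: measured data

4.1 Self-host vs SaaS

| Option | Monthly cost | Data compliance |
|---|---|---|
| Langfuse Cloud (Pro) | From $99/month + $29 per 100K events | Data leaves your domain |
| Helicone Cloud | From $99/month | Data leaves your domain |
| LangSmith Cloud | From $39/month + usage | Data leaves your domain |
| Self-hosted Langfuse (4-core / 16 GB VM + ClickHouse) | $80/month (cloud VM) | Data stays in your domain |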

4.2 SDK overhead

  • Synchronous SDK: adds ~5-10 ms per LLM call (< 1% added latency)
  • Asynchronous batched: near-zero overhead; one batch is flushed every 1 s
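
The reason the batched mode costs almost nothing on the request path is that the caller only puts an event on an in-memory queue, while a background thread does the network I/O. The class below is a minimal sketch of that pattern using only the standard library. It is not Langfuse's implementation, and `BatchingExporter` and its `send` callback are hypothetical names.

```python
import queue
import threading


class BatchingExporter:
    """Buffer events in memory and ship them in batches off the request path."""

    def __init__(self, send, batch_size=100, interval_s=1.0):
        self._send = send                  # callable that receives a list of events
        self._batch_size = batch_size
        self._interval_s = interval_s
        self._queue = queue.Queue()
        self._lock = threading.Lock()      # serializes flushes across threads
        self._stop = threading.Event()
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def enqueue(self, event):
        """Called on the request path: a queue put, no network I/O."""
        self._queue.put(event)

    def _drain(self):
        batch = []
        while len(batch) < self._batch_size:
            try:
                batch.append(self._queue.get_nowait())
            except queue.Empty:
                break
        return batch

    def flush(self):
        """Send everything currently buffered, one batch at a time."""
        with self._lock:
            while True:
                batch = self._drain()
                if not batch:
                    return
                self._send(batch)

    def _run(self):
        # Event.wait returns False on timeout and True once shutdown() sets the flag
        while not self._stop.wait(self._interval_s):
            self.flush()

    def shutdown(self):
        """Stop the worker and flush what is left; call this before process exit."""
        self._stop.set()
        self._worker.join()
        self.flush()
```

The trade-off is visible in the design: anything still in the queue when the process dies is lost, which is why short-lived jobs should call `shutdown()` (or the SDK's own flush) before exiting.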

4.3 Storage cost

  • Each trace (including full prompt + output text) ≈ 5 KB
  • 1M traces/month ≈ 5 GB
  • ≈ 1.5 GB after ClickHouse compression → archived to S3 ≈ $0.04/month
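
The arithmetic behind these figures can be written out as a small function. The compression ratio of 0.3 is simply 1.5 GB / 5 GB from the bullets above; the price of $0.023 per GB-month is an assumed object-storage list price, and KB/GB are treated as decimal units. Adjust all three to your own environment.

```python
def monthly_storage(traces_per_month: int,
                    bytes_per_trace: int = 5_000,       # ~5 KB per trace
                    compression_ratio: float = 0.3,     # 1.5 GB / 5 GB
                    usd_per_gb_month: float = 0.023,    # assumed archive price
                    retained_months: int = 1) -> dict:
    """Estimate raw size, compressed size, and archive cost for retained data."""
    raw_gb = traces_per_month * bytes_per_trace / 1e9
    compressed_gb = raw_gb * compression_ratio
    retained_gb = compressed_gb * retained_months
    return {
        "raw_gb_per_month": raw_gb,
        "compressed_gb_per_month": compressed_gb,
        "retained_gb": retained_gb,
        "archive_usd_per_month": retained_gb * usd_per_gb_month,
    }
```

With the defaults, `monthly_storage(1_000_000)` gives 5 GB raw, 1.5 GB compressed, and about $0.035 per month, in line with the ~$0.04 above. Setting `retained_months=84` models a 7-year retention window at the same monthly volume.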

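5. Applications in finance

  1. Audit trail: regulators require that "every AI decision is traceable". Langfuse traces plus immutable S3 export cover this in one place
  2. Compliance attribution: when a customer complains that a decision was unfair, replay the full trace: which prompt version, which model, and which KB version were used
  3. Cost attribution: slice cost by business unit (add dept_id to metadata) and bill monthly
  4. Quality trends: track LLM judge and user feedback as twin metrics, chart them daily, and alert automatically when they drop below a threshold
  5. A/B experiments: run prompt changes as Langfuse experiments and quantify the effect before ramping up traffic
  6. PII red line: Langfuse supports a masking hook; redact prompt/output with regexes before they are written
  7. Data residency: self-host on an internal network or private cloud; a must under financial regulation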
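
For the PII item, a regex-based redaction function might look like the sketch below. The three patterns (email, 18-character resident ID number, 11-digit mainland mobile number) are illustrative assumptions and far from a complete PII policy; the function names are hypothetical.

```python
import re

# (pattern, replacement token); the ID pattern runs before the phone pattern
PATTERNS = [
    (re.compile(r"[\w.+-]+@[\w-]+(?:\.[\w-]+)+"), "[EMAIL]"),
    (re.compile(r"(?<!\d)\d{17}[\dXx](?!\d)"), "[ID_NUMBER]"),
    (re.compile(r"(?<!\d)1[3-9]\d{9}(?!\d)"), "[PHONE]"),
]


def mask_text(text: str) -> str:
    """Replace every pattern match in a string with its token."""
    for pattern, token in PATTERNS:
        text = pattern.sub(token, text)
    return text


def mask(data, **kwargs):
    """Recursively mask strings inside dicts and lists; other types pass through."""
    if isinstance(data, str):
        return mask_text(data)
    if isinstance(data, dict):
        return {key: mask(value) for key, value in data.items()}
    if isinstance(data, (list, tuple)):
        return [mask(value) for value in data]
    return data
```

The `mask(data, **kwargs)` shape is chosen so the function can be handed to an SDK-level masking hook; check the exact parameter name and callback signature for your Langfuse SDK version before relying on it.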

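6. Production lessons and pitfalls

  1. Compliance risk of logging full prompts: prompts containing customer PII must not reach Langfuse unredacted. Implement a masking hook (the Langfuse SDK accepts a mask callback) and redact before sending
  2. Data loss in the flush window: traces not yet flushed are lost when the process crashes. In production, use synchronous flush with retry, or persist events to a queue
  3. Untuned ClickHouse fills the disk: a 30-day default retention does not fit financial use cases that may need 7 years. Set tiered TTLs: 90 days of hot data in ClickHouse, cold data archived to S3 Glacier
  4. Inconsistent score naming: thumbs_up, user-thumbs, feedback_pos, and other variants. Maintain a score taxonomy table
  5. Over-deep trace nesting: beyond 5 levels the UI becomes hard to read. For complex agents, put secondary details in metadata instead of creating new spans
  6. Multiple instrumentations enabled at once: for example, Langfuse and OpenLLMetry both patching the SDK will conflict. Pick one
  7. Using Langfuse Cloud in production by mistake: financial data leaving the domain is a major incident. Architecture review must verify that the endpoint is private
  8. Wrong token cost calculation: prices differ by model and by cache state, so keep Langfuse's model price configuration in sync with current pricing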
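
For the score-naming pitfall, a taxonomy table can be enforced in code before any score is sent. The sketch below maps the variants mentioned above onto one canonical name and validates the value range. The table contents and `normalize_score` are hypothetical; extend them to match your own scores.

```python
import re

# Canonical score name -> accepted aliases and allowed value range
SCORE_TAXONOMY = {
    "user_thumbs": {"aliases": {"thumbs_up", "user-thumbs", "feedback_pos"}, "min": -1, "max": 1},
    "det_eval_pass": {"aliases": set(), "min": 0, "max": 1},
    "judge_winner": {"aliases": set(), "min": 0, "max": 1},
}


def _key(name: str) -> str:
    """Lower-case and collapse punctuation so 'User-Thumbs' matches 'user_thumbs'."""
    return re.sub(r"[^a-z0-9]+", "_", name.strip().lower()).strip("_")


ALIASES = {}
for canonical, spec in SCORE_TAXONOMY.items():
    ALIASES[_key(canonical)] = canonical
    for alias in spec["aliases"]:
        ALIASES[_key(alias)] = canonical


def normalize_score(name: str, value: float) -> tuple[str, float]:
    """Return (canonical_name, value) or raise if the score is not in the taxonomy."""
    canonical = ALIASES.get(_key(name))
    if canonical is None:
        raise KeyError(f"unknown score name: {name!r}")
    spec = SCORE_TAXONOMY[canonical]
    if not spec["min"] <= value <= spec["max"]:
        raise ValueError(f"{canonical} must be within [{spec['min']}, {spec['max']}]")
    return canonical, float(value)
```

Routing every score write through one function like this keeps dashboards and SQL filters stable, because a new spelling fails loudly instead of silently creating a new score series.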

7. Quick reference

| Task | Tool / call |
|---|---|
| Install the SDK | pip install langfuse anthropic |
| Decorator | @observe(name="...") |
| Add metadata | lf.update_current_trace(user_id=..., metadata=...) |
| Add a score | lf.create_score(trace_id=..., name=..., value=0.85) (lf.score in SDK v2) |
| Create a dataset | lf.create_dataset(name=...) |
| Run an experiment | with item.run(run_name=...) as run: |

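8. Interview questions

  1. Why must financial AI self-host its observability tooling?

    • Data residency compliance, PII staying in-domain, 7-year retention for regulatory audits, consistency with other on-premises deployments, and controllable cost
  2. How do you slice LLM cost by business unit?

    • Require dept_id / feature_name in metadata; a GROUP BY in ClickHouse SQL does the rest; generate monthly bills per department automatically
  3. How do production traces become a golden dataset?

    • High-quality traces with a user thumbs-up and a passing det eval → SME spot review → redaction → create_dataset_item
  4. How do trace, score, and dataset relate?

    • A trace is the factual record; a score is a quality annotation (auto/human); a dataset is a curated set of golden input/output pairs used for experiments
  5. Langfuse or LangSmith?

    • Finance / data compliance: self-hosted Langfuse; heavy LangChain users who accept data leaving the domain: LangSmith; need the OpenTelemetry standard: OpenLLMetry → Langfuse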

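Coming up tomorrow

Day 171: Versioning and CI (prompt versioning, PromptOps, CI integration). Prompts are software assets and must be versioned. That session ties together the prompt registry, CI eval, and the canary release workflow.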