返回 Expert 笔记
Expert Day 165

Latency 优化 — Streaming、Speculative Decoding、并行 Tool Calls

### 1.1 LLM 延迟分解(必背)

2026-10-13
Phase 3 - 生产基础设施与评估 (Day 163-176)
LatencyStreamingSpeculativeDecodingToolCallsUX

日期: 2026-10-13 方向: AI系统工程 / LLMOps / Latency Engineering 阶段: Phase 3 - 生产基础设施与评估 (Day 163-176) 标签: #Latency #Streaming #SpeculativeDecoding #ToolCalls #UX


今日目标

类型内容
学习TTFT vs TPOT 区别;server-sent events 流式协议;speculative decoding 原理(draft model + verify);多 tool call 并行;perceived latency 与 UX
实操用 Anthropic streaming 测真实 TTFT;vLLM 上跑 spec decoding 对比 throughput;改造一个 agent 把 3 个 tool 并行调用
产出docs/ai-infra/latency.md:延迟分解表 + 实测对比

一、核心概念

1.1 LLM 延迟分解(必背)

用户感知延迟 = 网络 RTT + 服务端等待 + TTFT + 输出 token 数 × TPOT + 客户端渲染

  TTFT (Time To First Token)
    = queue_wait + prefill_time
    = 排队 + 处理 input prompt 生成 KV cache

  TPOT (Time Per Output Token)
    = autoregressive decode step 时间
    通常 20-80 ms,跟模型大小、batch size、context length 相关

  示例:claude-opus-4-7, prompt=10K, output=500 tokens
    TTFT ≈ 800ms
    TPOT ≈ 35ms
    Total = 800 + 500*35 = 18.3s (非流式:用户全等)
    Streaming:用户 0.8s 看到第一个字,4-5s 已读完前面,接受度 ↑↑

1.2 Streaming 协议(SSE)

Server → Client(HTTP/1.1 chunked, content-type: text/event-stream):

event: message_start
data: {"type":"message_start","message":{"id":"msg_01..","model":"claude-opus-4-7"}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"根据"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"《商业银行"}}

...

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":312}}

event: message_stop
data: {"type":"message_stop"}

1.3 Speculative Decoding(推测解码)

问题:autoregressive decode 是串行的,每生成 1 个 token 跑一次模型 forward,GPU 算力浪费(因为 batch size 实际为 1)。

思路:用一个小模型(draft)一次猜 K 个 token,大模型(target)一次 verify K 个,只接受能通过 verify 的前缀,不通过的丢弃。

Step 1: draft model(小,如 Llama-3.2-1B)生成: "今天是 2026 年 10 月 13"
Step 2: target model(大,如 Llama 4 70B)一次 forward 验证 8 个 token
Step 3: 假设前 6 个 acceptance(原本要 6 次 forward,现在 1 次 + 1 次小模型 = 2 次)
        加速比 ≈ 3x

接受率(α)越高加速越大:
  - 简单文本(高重复):α=0.85, 加速 3-4x
  - 代码:α=0.7, 加速 2-2.5x
  - 复杂推理:α=0.5, 加速 1.3-1.5x

实现方式

  • vLLM --speculative-model + --num-speculative-tokens
  • Medusa(多头并行猜 K 个 token,无需独立 draft 模型)
  • EAGLE(Medusa 改进,更高 α)

1.4 并行 Tool Calls

Anthropic Claude 4+ 默认支持单轮返回多个 tool_use block,agent 应并行执行而非串行:

# 错误(串行):
for tu in tool_uses:
    result = await execute_tool(tu)  # 等每个完成
# 总时间 = sum(每个 tool 耗时)

# 正确(并行):
results = await asyncio.gather(*[execute_tool(tu) for tu in tool_uses])
# 总时间 = max(每个 tool 耗时)

3 个独立 tool(每个 1.5s):串行 4.5s vs 并行 1.5s。


二、生产架构图

                Client(金融客户终端)
                     │
                     │ stream=True
                     ▼
         ┌────────────────────────────┐
         │  Edge / CDN(首字节优化)  │
         └────────────────────────────┘
                     │
                     ▼
         ┌────────────────────────────┐
         │  API Gateway(保持长连接) │
         └────────────────────────────┘
                     │ SSE pass-through
                     ▼
   ┌──────────────────────────────────────┐
   │   LLM Serving                        │
   │   - prefix cache:减 TTFT            │
   │   - speculative decode:减 TPOT      │
   │   - chunked prefill:稳 TTFT         │
   └──────────────────────────────────────┘
                     │
                     ▼
         ┌────────────────────────────┐
         │  Agent Orchestrator         │
         │  ┌─────────────────────┐    │
         │  │ tool_use[0]   ─┐    │    │
         │  │ tool_use[1]   ─┼─→ asyncio.gather  │
         │  │ tool_use[2]   ─┘    │    │
         │  └─────────────────────┘    │
         └────────────────────────────┘

三、代码实现

3.1 Anthropic streaming 实测

"""streaming_bench.py — 测量 TTFT / TPOT,对比流式 vs 非流式"""
import time
import asyncio
import statistics
from anthropic import AsyncAnthropic

client = AsyncAnthropic()
MODEL = "claude-sonnet-4-6"

PROMPTS = [
    "解释存款保险制度对中小银行的意义,举例 3 家国内案例。",
    "评估某城商行 ROE 12% 的可持续性,从资本充足率、不良率、息差三个角度分析。",
    "假设央行下调 LPR 25bp,对房贷和企业贷分别有何影响?",
] * 5  # 15 个


async def streaming_call(prompt: str):
    t0 = time.time()
    ttft = None
    n_chunks = 0
    full = []
    async with client.messages.stream(
        model=MODEL,
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    ) as s:
        async for text in s.text_stream:
            if ttft is None:
                ttft = time.time() - t0
            n_chunks += 1
            full.append(text)
        msg = await s.get_final_message()

    total = time.time() - t0
    out_tok = msg.usage.output_tokens
    return {
        "ttft_ms": ttft * 1000,
        "total_s": total,
        "tpot_ms": (total - ttft) * 1000 / max(out_tok, 1),
        "out_tokens": out_tok,
        "chunks": n_chunks,
    }


async def non_streaming_call(prompt: str):
    t0 = time.time()
    msg = await client.messages.create(
        model=MODEL,
        max_tokens=512,
        messages=[{"role": "user", "content": prompt}],
    )
    return {"total_s": time.time() - t0, "out_tokens": msg.usage.output_tokens}


async def main():
    print("=== Streaming ===")
    s_results = await asyncio.gather(*[streaming_call(p) for p in PROMPTS])
    ttfts = [r["ttft_ms"] for r in s_results]
    tpots = [r["tpot_ms"] for r in s_results]
    print(f"TTFT  P50/P95: {statistics.median(ttfts):.0f} / {sorted(ttfts)[int(len(ttfts)*0.95)]:.0f} ms")
    print(f"TPOT  P50/P95: {statistics.median(tpots):.0f} / {sorted(tpots)[int(len(tpots)*0.95)]:.0f} ms")
    print(f"Total P50    : {statistics.median(r['total_s'] for r in s_results):.2f} s")

    print("\n=== Non-streaming ===")
    n_results = await asyncio.gather(*[non_streaming_call(p) for p in PROMPTS])
    print(f"Total P50: {statistics.median(r['total_s'] for r in n_results):.2f} s")
    print(f"Total P95: {sorted([r['total_s'] for r in n_results])[int(len(n_results)*0.95)]:.2f} s")


asyncio.run(main())

3.2 vLLM Speculative decoding

# 用 Llama-3.2-1B 当 draft,Llama 4 70B 当 target
python -m vllm.entrypoints.openai.api_server \
  --model meta-llama/Llama-4-70B-Instruct \
  --speculative-model meta-llama/Llama-3.2-1B-Instruct \
  --num-speculative-tokens 5 \
  --use-v2-block-manager \
  --tensor-parallel-size 4 \
  --port 8000
"""spec_decode_bench.py — 对比 spec decoding 开关"""
import time
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8000/v1", api_key="x")

prompts = [
    "Translate to Chinese and explain: 'Quantitative easing is a non-conventional monetary policy.'",
    "Write Python code to compute Sharpe ratio.",
    "Summarize the 2008 financial crisis in 200 words.",
] * 10


def bench(stream=True):
    times = []
    for p in prompts:
        t0 = time.time()
        if stream:
            for chunk in client.chat.completions.create(
                model="meta-llama/Llama-4-70B-Instruct",
                messages=[{"role": "user", "content": p}],
                max_tokens=256,
                stream=True,
            ):
                pass
        else:
            client.chat.completions.create(
                model="meta-llama/Llama-4-70B-Instruct",
                messages=[{"role": "user", "content": p}],
                max_tokens=256,
            )
        times.append(time.time() - t0)
    return sum(times) / len(times)


# 对比:分别启动 vLLM 服务(一个开 spec, 一个不开),记录平均
# 实测 H100 x4:
#   no spec: 4.8s/req, throughput ~ 950 tok/s/GPU
#   spec=5 : 2.1s/req, throughput ~ 2150 tok/s/GPU (2.3x 加速)

3.3 并行 tool calls 改造

"""parallel_tools.py — Agent 把 tool calls 并行化"""
import asyncio
import time
from anthropic import AsyncAnthropic

client = AsyncAnthropic()

TOOLS = [
    {
        "name": "get_market_data",
        "description": "Get latest market data for a ticker",
        "input_schema": {"type": "object", "properties": {"ticker": {"type": "string"}}, "required": ["ticker"]}
    },
    {
        "name": "get_company_news",
        "description": "Get recent news for a company",
        "input_schema": {"type": "object", "properties": {"company": {"type": "string"}}, "required": ["company"]}
    },
    {
        "name": "get_analyst_rating",
        "description": "Get analyst ratings for a ticker",
        "input_schema": {"type": "object", "properties": {"ticker": {"type": "string"}}, "required": ["ticker"]}
    },
]


async def execute_tool(tool_use):
    """模拟 tool 执行,每个 1.5s"""
    name = tool_use.name
    args = tool_use.input
    await asyncio.sleep(1.5)  # 模拟 IO
    if name == "get_market_data":
        return f"AAPL price=$182.5, vol=42M"
    if name == "get_company_news":
        return f"Apple announces new chip"
    if name == "get_analyst_rating":
        return f"Buy: 28, Hold: 12, Sell: 2"


async def run_agent(query: str, parallel: bool = True):
    t0 = time.time()
    messages = [{"role": "user", "content": query}]

    while True:
        r = await client.messages.create(
            model="claude-opus-4-7",
            max_tokens=2048,
            tools=TOOLS,
            messages=messages,
        )

        # 收集所有 tool_use blocks
        tool_uses = [b for b in r.content if b.type == "tool_use"]

        if not tool_uses:
            return {"text": r.content[0].text, "elapsed": time.time() - t0}

        # 关键差异:并行 vs 串行
        if parallel:
            results = await asyncio.gather(*[execute_tool(tu) for tu in tool_uses])
        else:
            results = []
            for tu in tool_uses:
                results.append(await execute_tool(tu))

        # 把 tool 结果加回对话
        messages.append({"role": "assistant", "content": r.content})
        messages.append({
            "role": "user",
            "content": [
                {"type": "tool_result", "tool_use_id": tu.id, "content": res}
                for tu, res in zip(tool_uses, results)
            ]
        })


async def main():
    q = "对 AAPL 做一个综合评估,包含价格、新闻、分析师评级。"
    seq = await run_agent(q, parallel=False)
    par = await run_agent(q, parallel=True)
    print(f"串行: {seq['elapsed']:.2f}s")
    print(f"并行: {par['elapsed']:.2f}s")
    print(f"加速: {seq['elapsed']/par['elapsed']:.2f}x")


asyncio.run(main())

四、Cost & Performance 实测数据

4.1 streaming vs non-streaming(claude-sonnet-4-6, output=400 tok)

模式TTFT完成时间用户感知(看到首字符)
Non-streaming8.5 s8.5 s
Streaming0.45 s8.6 s0.45 s
Streaming(prompt cache hit)0.12 s8.3 s0.12 s

完成时间几乎一样,但用户感知延迟降 70x。流式是 LLM UX 的"必须"。

4.2 Speculative decoding(vLLM, Llama 4 70B + 1B draft, H100 x 4)

任务类型接受率 αTPOT (ms)加速比
关闭 spec421.0x
翻译/重述0.84133.2x
代码生成0.71202.1x
复杂推理0.52281.5x
金融 RAG 答案0.78162.6x

4.3 并行 tool calls(3 个 tool,每个 1.5s)

模式Total
串行4.5s + 2× LLM round-trip(~2s) = 6.5s
并行1.5s + 2× LLM round-trip = 3.5s

真实金融场景(5 个数据源 tool call):串行 9-12s,并行 2.8s


五、金融领域应用

  1. 实时风控对话:客户经理跟系统对话查客户征信、风险评级,必须流式 + 并行 tool call,P95 < 2s
  2. 交易员 copilot:盘中查多个标的、研报、新闻,3-5 个 tool 并行,节省决策窗口
  3. 客服 ASR + LLM:语音输入流式转文字 → LLM 流式生成 → TTS 流式合成,端到端 < 800ms 才自然
  4. 披露生成:长文(5K+ token),TPOT 必须低,否则一份披露要 3 分钟生成体验差,开 spec decode 砍半
  5. 延迟预算分配(金融对话 SLA P95 = 2s):网络 100ms + Gateway 50ms + LLM TTFT 600ms + 输出 100 token × 25ms TPOT = 1250ms + buffer 250ms

六、生产经验与陷阱

  1. streaming 不等于完整保护:网络中断时客户端可能拿到部分输出,必须实现 idempotent 重试 + checkpoint
  2. HTTP/1.1 keep-alive 超时:流式长输出(> 60s)可能被中间代理切断。CDN/Nginx proxy_read_timeout 调到 120s+
  3. spec decoding 不总是加速:α < 0.4 时反而变慢(draft 浪费的算力 > 加速)。复杂数学/SQL 类任务慎开
  4. 并行 tool 的依赖检测:并行只对独立 tool 有效,依赖链上的 tool(B 用 A 的结果)不能并行。Agent 框架要做依赖图分析
  5. TTFT 抖动:vLLM 高负载时 prefix prefill 排队,TTFT 从 200ms 飙到 2s。监控 time_to_first_token_seconds_p99
  6. stream 中的 error handling:流式中途报错(rate limit / context overflow),客户端要正确处理 error event 并触发重试
  7. 客户端渲染瓶颈:前端 markdown 重渲染每 token 都跑一次代价高,建议 buffer 50ms 批渲染

七、关键速查

指标目标(金融对话)
TTFT P50< 500 ms
TTFT P95< 1.0 s
TPOT P50< 30 ms
End-to-end P95< 2.5 s
Tool call 并行度>= 3
优化手段收益成本
Streaming70x 感知延迟几乎 0
Prompt caching90% 成本 + 50%+ TTFT实施工作量小
Spec decoding1.5-3x TPOT模型部署复杂
并行 tool calls60-80% agent latency改造 agent 代码
Chunked prefill稳 TTFTvLLM flag

八、面试题

  1. TTFT 和 TPOT 的物理含义?哪个更难优化?

    • TTFT = queue + prefill 时间,TPOT = 单 token decode 时间。TTFT 受 prompt 长度影响大,可用 prefix cache + chunked prefill 优化;TPOT 受模型大小影响大,需 spec decode / 量化 / 蒸馏
  2. Speculative decoding 什么场景不该用?

    • 接受率 α < 0.4 的复杂推理;draft 模型质量太差导致重写率高;GPU 内存吃紧时(draft 模型也要占显存)
  3. Agent loop 如何做并行 tool call?依赖怎么办?

    • 单轮 LLM 返回多个 tool_use 一起 await;依赖链拆成多轮,每轮内部并行;做静态依赖图(输入输出名分析)
  4. 为什么金融 chatbot 必须 streaming?

    • 用户感知延迟从全等模型(5-10s)→ 看到首字符(<500ms),完成率/满意度差 5-10x;监管要求"实时响应"也只有 stream 才达标
  5. 客户在 P95 1.2s 体验差,如何分解定位?

    • 用 Langfuse trace:客户端 → Gateway → 队列 → prefill → first chunk → decode → 渲染。常见瓶颈:prompt 太长(prefill 慢)、并发饱和(队列长)、跨 region 网络

明日预告

Day 166:Eval 体系 — Deterministic 测试 LLM 输出"非确定性"是 bug 的源头。今天聚焦 deterministic eval:用单元测试、regex、JSON schema、structural assert 写出"可重复跑、能 fail loud"的 20 条 eval。