Latency 优化 — Streaming、Speculative Decoding、并行 Tool Calls
### 1.1 LLM 延迟分解(必背)
日期: 2026-10-13 方向: AI系统工程 / LLMOps / Latency Engineering 阶段: Phase 3 - 生产基础设施与评估 (Day 163-176) 标签: #Latency #Streaming #SpeculativeDecoding #ToolCalls #UX
今日目标
| 类型 | 内容 |
|---|---|
| 学习 | TTFT vs TPOT 区别;server-sent events 流式协议;speculative decoding 原理(draft model + verify);多 tool call 并行;perceived latency 与 UX |
| 实操 | 用 Anthropic streaming 测真实 TTFT;vLLM 上跑 spec decoding 对比 throughput;改造一个 agent 把 3 个 tool 并行调用 |
| 产出 | docs/ai-infra/latency.md:延迟分解表 + 实测对比 |
一、核心概念
1.1 LLM 延迟分解(必背)
用户感知延迟 = 网络 RTT + 服务端等待 + TTFT + 输出 token 数 × TPOT + 客户端渲染
TTFT (Time To First Token)
= queue_wait + prefill_time
= 排队 + 处理 input prompt 生成 KV cache
TPOT (Time Per Output Token)
= autoregressive decode step 时间
通常 20-80 ms,跟模型大小、batch size、context length 相关
示例:claude-opus-4-7, prompt=10K, output=500 tokens
TTFT ≈ 800ms
TPOT ≈ 35ms
Total = 800 + 500*35 = 18.3s (非流式:用户全等)
Streaming:用户 0.8s 看到第一个字,4-5s 已读完前面,接受度 ↑↑
1.2 Streaming 协议(SSE)
Server → Client(HTTP/1.1 chunked, content-type: text/event-stream):
event: message_start
data: {"type":"message_start","message":{"id":"msg_01..","model":"claude-opus-4-7"}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"根据"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"《商业银行"}}
...
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn","stop_sequence":null},"usage":{"output_tokens":312}}
event: message_stop
data: {"type":"message_stop"}
1.3 Speculative Decoding(推测解码)
问题:autoregressive decode 是串行的,每生成 1 个 token 跑一次模型 forward,GPU 算力浪费(因为 batch size 实际为 1)。
思路:用一个小模型(draft)一次猜 K 个 token,大模型(target)一次 verify K 个,只接受能通过 verify 的前缀,不通过的丢弃。
Step 1: draft model(小,如 Llama-3.2-1B)生成: "今天是 2026 年 10 月 13"
Step 2: target model(大,如 Llama 4 70B)一次 forward 验证 8 个 token
Step 3: 假设前 6 个 acceptance(原本要 6 次 forward,现在 1 次 + 1 次小模型 = 2 次)
加速比 ≈ 3x
接受率(α)越高加速越大:
- 简单文本(高重复):α=0.85, 加速 3-4x
- 代码:α=0.7, 加速 2-2.5x
- 复杂推理:α=0.5, 加速 1.3-1.5x
实现方式:
- vLLM
--speculative-model+--num-speculative-tokens - Medusa(多头并行猜 K 个 token,无需独立 draft 模型)
- EAGLE(Medusa 改进,更高 α)
1.4 并行 Tool Calls
Anthropic Claude 4+ 默认支持单轮返回多个 tool_use block,agent 应并行执行而非串行:
# 错误(串行):
for tu in tool_uses:
result = await execute_tool(tu) # 等每个完成
# 总时间 = sum(每个 tool 耗时)
# 正确(并行):
results = await asyncio.gather(*[execute_tool(tu) for tu in tool_uses])
# 总时间 = max(每个 tool 耗时)
3 个独立 tool(每个 1.5s):串行 4.5s vs 并行 1.5s。
二、生产架构图
Client(金融客户终端)
│
│ stream=True
▼
┌────────────────────────────┐
│ Edge / CDN(首字节优化) │
└────────────────────────────┘
│
▼
┌────────────────────────────┐
│ API Gateway(保持长连接) │
└────────────────────────────┘
│ SSE pass-through
▼
┌──────────────────────────────────────┐
│ LLM Serving │
│ - prefix cache:减 TTFT │
│ - speculative decode:减 TPOT │
│ - chunked prefill:稳 TTFT │
└──────────────────────────────────────┘
│
▼
┌────────────────────────────┐
│ Agent Orchestrator │
│ ┌─────────────────────┐ │
│ │ tool_use[0] ─┐ │ │
│ │ tool_use[1] ─┼─→ asyncio.gather │
│ │ tool_use[2] ─┘ │ │
│ └─────────────────────┘ │
└────────────────────────────┘
三、代码实现
3.1 Anthropic streaming 实测
"""streaming_bench.py — 测量 TTFT / TPOT,对比流式 vs 非流式"""
import time
import asyncio
import statistics
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
MODEL = "claude-sonnet-4-6"
PROMPTS = [
"解释存款保险制度对中小银行的意义,举例 3 家国内案例。",
"评估某城商行 ROE 12% 的可持续性,从资本充足率、不良率、息差三个角度分析。",
"假设央行下调 LPR 25bp,对房贷和企业贷分别有何影响?",
] * 5 # 15 个
async def streaming_call(prompt: str):
t0 = time.time()
ttft = None
n_chunks = 0
full = []
async with client.messages.stream(
model=MODEL,
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
) as s:
async for text in s.text_stream:
if ttft is None:
ttft = time.time() - t0
n_chunks += 1
full.append(text)
msg = await s.get_final_message()
total = time.time() - t0
out_tok = msg.usage.output_tokens
return {
"ttft_ms": ttft * 1000,
"total_s": total,
"tpot_ms": (total - ttft) * 1000 / max(out_tok, 1),
"out_tokens": out_tok,
"chunks": n_chunks,
}
async def non_streaming_call(prompt: str):
t0 = time.time()
msg = await client.messages.create(
model=MODEL,
max_tokens=512,
messages=[{"role": "user", "content": prompt}],
)
return {"total_s": time.time() - t0, "out_tokens": msg.usage.output_tokens}
async def main():
print("=== Streaming ===")
s_results = await asyncio.gather(*[streaming_call(p) for p in PROMPTS])
ttfts = [r["ttft_ms"] for r in s_results]
tpots = [r["tpot_ms"] for r in s_results]
print(f"TTFT P50/P95: {statistics.median(ttfts):.0f} / {sorted(ttfts)[int(len(ttfts)*0.95)]:.0f} ms")
print(f"TPOT P50/P95: {statistics.median(tpots):.0f} / {sorted(tpots)[int(len(tpots)*0.95)]:.0f} ms")
print(f"Total P50 : {statistics.median(r['total_s'] for r in s_results):.2f} s")
print("\n=== Non-streaming ===")
n_results = await asyncio.gather(*[non_streaming_call(p) for p in PROMPTS])
print(f"Total P50: {statistics.median(r['total_s'] for r in n_results):.2f} s")
print(f"Total P95: {sorted([r['total_s'] for r in n_results])[int(len(n_results)*0.95)]:.2f} s")
asyncio.run(main())
3.2 vLLM Speculative decoding
# 用 Llama-3.2-1B 当 draft,Llama 4 70B 当 target
python -m vllm.entrypoints.openai.api_server \
--model meta-llama/Llama-4-70B-Instruct \
--speculative-model meta-llama/Llama-3.2-1B-Instruct \
--num-speculative-tokens 5 \
--use-v2-block-manager \
--tensor-parallel-size 4 \
--port 8000
"""spec_decode_bench.py — 对比 spec decoding 开关"""
import time
from openai import OpenAI
client = OpenAI(base_url="http://localhost:8000/v1", api_key="x")
prompts = [
"Translate to Chinese and explain: 'Quantitative easing is a non-conventional monetary policy.'",
"Write Python code to compute Sharpe ratio.",
"Summarize the 2008 financial crisis in 200 words.",
] * 10
def bench(stream=True):
times = []
for p in prompts:
t0 = time.time()
if stream:
for chunk in client.chat.completions.create(
model="meta-llama/Llama-4-70B-Instruct",
messages=[{"role": "user", "content": p}],
max_tokens=256,
stream=True,
):
pass
else:
client.chat.completions.create(
model="meta-llama/Llama-4-70B-Instruct",
messages=[{"role": "user", "content": p}],
max_tokens=256,
)
times.append(time.time() - t0)
return sum(times) / len(times)
# 对比:分别启动 vLLM 服务(一个开 spec, 一个不开),记录平均
# 实测 H100 x4:
# no spec: 4.8s/req, throughput ~ 950 tok/s/GPU
# spec=5 : 2.1s/req, throughput ~ 2150 tok/s/GPU (2.3x 加速)
3.3 并行 tool calls 改造
"""parallel_tools.py — Agent 把 tool calls 并行化"""
import asyncio
import time
from anthropic import AsyncAnthropic
client = AsyncAnthropic()
TOOLS = [
{
"name": "get_market_data",
"description": "Get latest market data for a ticker",
"input_schema": {"type": "object", "properties": {"ticker": {"type": "string"}}, "required": ["ticker"]}
},
{
"name": "get_company_news",
"description": "Get recent news for a company",
"input_schema": {"type": "object", "properties": {"company": {"type": "string"}}, "required": ["company"]}
},
{
"name": "get_analyst_rating",
"description": "Get analyst ratings for a ticker",
"input_schema": {"type": "object", "properties": {"ticker": {"type": "string"}}, "required": ["ticker"]}
},
]
async def execute_tool(tool_use):
"""模拟 tool 执行,每个 1.5s"""
name = tool_use.name
args = tool_use.input
await asyncio.sleep(1.5) # 模拟 IO
if name == "get_market_data":
return f"AAPL price=$182.5, vol=42M"
if name == "get_company_news":
return f"Apple announces new chip"
if name == "get_analyst_rating":
return f"Buy: 28, Hold: 12, Sell: 2"
async def run_agent(query: str, parallel: bool = True):
t0 = time.time()
messages = [{"role": "user", "content": query}]
while True:
r = await client.messages.create(
model="claude-opus-4-7",
max_tokens=2048,
tools=TOOLS,
messages=messages,
)
# 收集所有 tool_use blocks
tool_uses = [b for b in r.content if b.type == "tool_use"]
if not tool_uses:
return {"text": r.content[0].text, "elapsed": time.time() - t0}
# 关键差异:并行 vs 串行
if parallel:
results = await asyncio.gather(*[execute_tool(tu) for tu in tool_uses])
else:
results = []
for tu in tool_uses:
results.append(await execute_tool(tu))
# 把 tool 结果加回对话
messages.append({"role": "assistant", "content": r.content})
messages.append({
"role": "user",
"content": [
{"type": "tool_result", "tool_use_id": tu.id, "content": res}
for tu, res in zip(tool_uses, results)
]
})
async def main():
q = "对 AAPL 做一个综合评估,包含价格、新闻、分析师评级。"
seq = await run_agent(q, parallel=False)
par = await run_agent(q, parallel=True)
print(f"串行: {seq['elapsed']:.2f}s")
print(f"并行: {par['elapsed']:.2f}s")
print(f"加速: {seq['elapsed']/par['elapsed']:.2f}x")
asyncio.run(main())
四、Cost & Performance 实测数据
4.1 streaming vs non-streaming(claude-sonnet-4-6, output=400 tok)
| 模式 | TTFT | 完成时间 | 用户感知(看到首字符) |
|---|---|---|---|
| Non-streaming | — | 8.5 s | 8.5 s |
| Streaming | 0.45 s | 8.6 s | 0.45 s |
| Streaming(prompt cache hit) | 0.12 s | 8.3 s | 0.12 s |
完成时间几乎一样,但用户感知延迟降 70x。流式是 LLM UX 的"必须"。
4.2 Speculative decoding(vLLM, Llama 4 70B + 1B draft, H100 x 4)
| 任务类型 | 接受率 α | TPOT (ms) | 加速比 |
|---|---|---|---|
| 关闭 spec | — | 42 | 1.0x |
| 翻译/重述 | 0.84 | 13 | 3.2x |
| 代码生成 | 0.71 | 20 | 2.1x |
| 复杂推理 | 0.52 | 28 | 1.5x |
| 金融 RAG 答案 | 0.78 | 16 | 2.6x |
4.3 并行 tool calls(3 个 tool,每个 1.5s)
| 模式 | Total |
|---|---|
| 串行 | 4.5s + 2× LLM round-trip(~2s) = 6.5s |
| 并行 | 1.5s + 2× LLM round-trip = 3.5s |
真实金融场景(5 个数据源 tool call):串行 9-12s,并行 2.8s
五、金融领域应用
- 实时风控对话:客户经理跟系统对话查客户征信、风险评级,必须流式 + 并行 tool call,P95 < 2s
- 交易员 copilot:盘中查多个标的、研报、新闻,3-5 个 tool 并行,节省决策窗口
- 客服 ASR + LLM:语音输入流式转文字 → LLM 流式生成 → TTS 流式合成,端到端 < 800ms 才自然
- 披露生成:长文(5K+ token),TPOT 必须低,否则一份披露要 3 分钟生成体验差,开 spec decode 砍半
- 延迟预算分配(金融对话 SLA P95 = 2s):网络 100ms + Gateway 50ms + LLM TTFT 600ms + 输出 100 token × 25ms TPOT = 1250ms + buffer 250ms
六、生产经验与陷阱
- streaming 不等于完整保护:网络中断时客户端可能拿到部分输出,必须实现 idempotent 重试 + checkpoint
- HTTP/1.1 keep-alive 超时:流式长输出(> 60s)可能被中间代理切断。CDN/Nginx
proxy_read_timeout调到 120s+ - spec decoding 不总是加速:α < 0.4 时反而变慢(draft 浪费的算力 > 加速)。复杂数学/SQL 类任务慎开
- 并行 tool 的依赖检测:并行只对独立 tool 有效,依赖链上的 tool(B 用 A 的结果)不能并行。Agent 框架要做依赖图分析
- TTFT 抖动:vLLM 高负载时 prefix prefill 排队,TTFT 从 200ms 飙到 2s。监控
time_to_first_token_seconds_p99 - stream 中的 error handling:流式中途报错(rate limit / context overflow),客户端要正确处理
errorevent 并触发重试 - 客户端渲染瓶颈:前端 markdown 重渲染每 token 都跑一次代价高,建议 buffer 50ms 批渲染
七、关键速查
| 指标 | 目标(金融对话) |
|---|---|
| TTFT P50 | < 500 ms |
| TTFT P95 | < 1.0 s |
| TPOT P50 | < 30 ms |
| End-to-end P95 | < 2.5 s |
| Tool call 并行度 | >= 3 |
| 优化手段 | 收益 | 成本 |
|---|---|---|
| Streaming | 70x 感知延迟 | 几乎 0 |
| Prompt caching | 90% 成本 + 50%+ TTFT | 实施工作量小 |
| Spec decoding | 1.5-3x TPOT | 模型部署复杂 |
| 并行 tool calls | 60-80% agent latency | 改造 agent 代码 |
| Chunked prefill | 稳 TTFT | vLLM flag |
八、面试题
-
TTFT 和 TPOT 的物理含义?哪个更难优化?
- TTFT = queue + prefill 时间,TPOT = 单 token decode 时间。TTFT 受 prompt 长度影响大,可用 prefix cache + chunked prefill 优化;TPOT 受模型大小影响大,需 spec decode / 量化 / 蒸馏
-
Speculative decoding 什么场景不该用?
- 接受率 α < 0.4 的复杂推理;draft 模型质量太差导致重写率高;GPU 内存吃紧时(draft 模型也要占显存)
-
Agent loop 如何做并行 tool call?依赖怎么办?
- 单轮 LLM 返回多个 tool_use 一起 await;依赖链拆成多轮,每轮内部并行;做静态依赖图(输入输出名分析)
-
为什么金融 chatbot 必须 streaming?
- 用户感知延迟从全等模型(5-10s)→ 看到首字符(<500ms),完成率/满意度差 5-10x;监管要求"实时响应"也只有 stream 才达标
-
客户在 P95 1.2s 体验差,如何分解定位?
- 用 Langfuse trace:客户端 → Gateway → 队列 → prefill → first chunk → decode → 渲染。常见瓶颈:prompt 太长(prefill 慢)、并发饱和(队列长)、跨 region 网络
明日预告
Day 166:Eval 体系 — Deterministic 测试 LLM 输出"非确定性"是 bug 的源头。今天聚焦 deterministic eval:用单元测试、regex、JSON schema、structural assert 写出"可重复跑、能 fail loud"的 20 条 eval。