Real-time Trading Infrastructure
Geth/Erigon tracing API, mempool subscriptions, commercial feeds (Chainstream/BloXroute), and tx pool internals
Date: 2026-08-24 Track: MEV / DEX Quant Phase: Phase 2 - MEV and DEX Quant (Day 103-116) Tags: #Mempool #Geth #Erigon #Chainstream #BloXroute #Tracing
Today's Objectives
| Type | Content |
|---|---|
| Study | Geth/Erigon tracing API, mempool subscriptions, commercial feeds (Chainstream/BloXroute), tx pool internals |
| Practice | Subscribe to the mempool in Python and decode swap txs |
| Deliverable | mempool.py: WebSocket mempool listener + decoded swap output |
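1. Core Mechanics
1.1 What the Mempool Is
Mempool = pending transaction pool: the set of txs a node has received but that have not yet been included in a block.
Key facts:
- Every node keeps its own mempool, so contents differ slightly between nodes
- The public mempool propagates over P2P (DEVp2p, the eth wire protocol)
- Once a tx enters the public mempool, any listening node can see it
- Private mempool: relays such as Flashbots accept txs but do not broadcast them
1.2 Ways to Subscribe to the Mempool
A. Self-hosted Erigon node + WebSocket
Most accurate, but requires running your own full-node infrastructure (roughly $1-2K/month for an archive node).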
# Start Erigon with WebSocket enabled
erigon --http.api=eth,net,trace,txpool,debug \
  --ws --ws.api=eth,net,trace,txpool \
  --txpool.pricelimit=1000000000 # 1 gwei
Subscribe to pending txs:
import json
import websocket

ws = websocket.create_connection("ws://localhost:8545")
ws.send(json.dumps({
    "jsonrpc": "2.0", "id": 1,
    "method": "eth_subscribe",
    "params": ["newPendingTransactions", True]  # True = full tx body
}))
while True:
    msg = json.loads(ws.recv())
    print(msg)
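The loop above prints every frame, but a subscription socket carries two kinds of message: the one-off reply to eth_subscribe (which holds the subscription id) and the later eth_subscription notifications. A minimal sketch of telling them apart, assuming the standard JSON-RPC pub/sub shapes; the function name parse_pending_message and the sample ids are hypothetical:

```python
import json
from typing import Any, Tuple


def parse_pending_message(raw: str) -> Tuple[str, Any]:
    """Classify one websocket frame from an eth_subscribe session.

    Returns (kind, payload) where kind is one of:
      "subscribed" - reply to eth_subscribe; payload is the subscription id
      "hash"       - notification carrying only a tx hash (str)
      "tx"         - notification carrying a full tx object (dict)
      "other"      - anything else (errors, unrelated replies)
    """
    msg = json.loads(raw)
    if msg.get("method") == "eth_subscription":
        result = msg.get("params", {}).get("result")
        if isinstance(result, str):
            return "hash", result
        if isinstance(result, dict):
            return "tx", result
        return "other", msg
    if "result" in msg and "id" in msg:
        return "subscribed", msg["result"]
    return "other", msg


# Sample frames (made-up subscription id and hash, for illustration only)
ack = '{"jsonrpc":"2.0","id":1,"result":"0x9cef"}'
note = ('{"jsonrpc":"2.0","method":"eth_subscription",'
        '"params":{"subscription":"0x9cef","result":"0xabc"}}')
print(parse_pending_message(ack))
print(parse_pending_message(note))
```

Branching on the payload type keeps the listener working whether the node honours the full-body flag or falls back to hashes only.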
B. Commercial RPC (BloXroute, Chainstack, Alchemy)
Pros: a global node network that delivers a tx in under 100 ms (50-200 ms sooner than a self-hosted node). Cons: monthly fees of $200-5,000+.
BloXroute subscription:
ws = websocket.create_connection(
    "wss://api.blxrbdn.com/ws",
    header=["Authorization: <BLOXROUTE_AUTH>"],
)
ws.send(json.dumps({
    "jsonrpc": "2.0",
    "id": 1,
    "method": "subscribe",
    "params": ["pendingTxs", {"include": ["tx_hash", "tx_contents"]}],
}))
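Chainstream (Solana-focused): Solana has no mempool in the traditional sense; Jito ShredStream can be used instead to obtain the leader's partially built block.
C. Flashbots Stream API
Subscribe to transaction events from the Flashbots Protect RPC (limited to the user's own txs, not the whole network).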
1.3 Tx Pool Internal Machinery
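Geth tx pool data structures (conceptual):
TxPool {
    pending: map[address]*nonce→*Transaction  // executable txs
    queued:  map[address]*nonce→*Transaction  // not yet executable (nonce gap)
    all:     map[hash]*Transaction            // every tx known to the pool
    priced:  all txs ordered by gas price     // price-sorted view, drives eviction
    // promotion: queued → pending once the nonce gap closes (prerequisite tx arrives or confirms)
    // demotion: pending → queued when a nonce gap is detected
}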
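Key parameters (Geth):
- --txpool.pricelimit: minimum gas price accepted into the pool
- --txpool.globalslots: maximum executable (pending) tx slots across all accounts (default 5120)
- --txpool.accountslots: executable tx slots guaranteed per account (default 16)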
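The pending/queued split and the promotion rule are easier to see in a toy model. The sketch below tracks nonces only, assuming no pricing, eviction, or slot limits; ToyTxPool and its methods are hypothetical names and do not mirror Geth's actual implementation:

```python
from collections import defaultdict
from typing import Dict


class ToyTxPool:
    """Per-account nonce bookkeeping only: no pricing, eviction, or limits."""

    def __init__(self) -> None:
        self.chain_nonce: Dict[str, int] = defaultdict(int)      # next nonce expected on chain
        self.pending: Dict[str, Dict[int, str]] = defaultdict(dict)  # executable, gap-free run
        self.queued: Dict[str, Dict[int, str]] = defaultdict(dict)   # waiting behind a nonce gap

    def add(self, sender: str, nonce: int, tx_hash: str) -> str:
        """Insert a tx and return the sub-pool it landed in."""
        base = self.chain_nonce[sender]
        if nonce < base:
            return "rejected"  # nonce already consumed on chain
        pending, queued = self.pending[sender], self.queued[sender]
        if nonce <= base + len(pending):
            # Either extends the executable run or replaces a tx inside it
            pending[nonce] = tx_hash
            self._promote(sender)
            return "pending"
        queued[nonce] = tx_hash
        return "queued"

    def _promote(self, sender: str) -> None:
        """Move queued txs into pending while they continue the nonce run."""
        pending, queued = self.pending[sender], self.queued[sender]
        next_nonce = self.chain_nonce[sender] + len(pending)
        while next_nonce in queued:
            pending[next_nonce] = queued.pop(next_nonce)
            next_nonce += 1


pool = ToyTxPool()
print(pool.add("0xA", 0, "t0"))  # nonce 0 is executable immediately
print(pool.add("0xA", 2, "t2"))  # gap at nonce 1, so this waits in queued
print(pool.add("0xA", 1, "t1"))  # fills the gap and promotes t2
print(sorted(pool.pending["0xA"]))
```

A real pool additionally enforces price bumps on replacement and evicts the cheapest txs when the slot limits are hit.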
1.4 Tracing API
RPC methods for debugging and simulating tx execution:
debug_traceTransaction
trace = w3.provider.make_request("debug_traceTransaction", [
    "0xabc...", {"tracer": "callTracer"}
])
Returns the full call hierarchy: who called whom, how much gas each call used, and the return data.
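A callTracer result is a nested tree of call frames, so a small recursive walker is usually the first helper to write. A minimal sketch, assuming the usual callTracer frame fields (type, from, to, gasUsed, calls); the sample frame uses placeholder addresses and made-up gas values:

```python
from typing import Dict, Iterator, Tuple


def walk_calls(frame: Dict, depth: int = 0) -> Iterator[Tuple[int, str, str, str, int]]:
    """Yield (depth, type, from, to, gas_used) for every frame, depth-first."""
    gas_used = int(frame.get("gasUsed", "0x0"), 16)
    yield depth, frame.get("type", "?"), frame.get("from", ""), frame.get("to", ""), gas_used
    for child in frame.get("calls", []):
        yield from walk_calls(child, depth + 1)


# Placeholder trace: addresses and gas values are illustrative only
sample = {
    "type": "CALL", "from": "0xuser", "to": "0xrouter", "gasUsed": "0x1d4c0",
    "calls": [
        {"type": "STATICCALL", "from": "0xrouter", "to": "0xpair", "gasUsed": "0x9c4"},
        {"type": "CALL", "from": "0xrouter", "to": "0xpair", "gasUsed": "0xea60",
         "calls": [{"type": "CALL", "from": "0xpair", "to": "0xtoken", "gasUsed": "0x7530"}]},
    ],
}

for depth, kind, src, dst, gas in walk_calls(sample):
    print(f"{'  ' * depth}{kind} {src} -> {dst} gasUsed={gas}")
```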
trace_call (Parity-style trace API: supported by Erigon, not by Geth; better performance)
result = w3.provider.make_request("trace_call", [
{"to": "0x...", "data": "0x..."}, ["trace", "stateDiff"], "latest"
])
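When stateDiff is requested, the reply lists per-account changes. A hedged sketch of pulling balance deltas out of it, assuming the Parity/OpenEthereum-style layout ("=" for unchanged, {"*": {"from", "to"}} for modified, {"+": value} for created, {"-": value} for removed); the helper name and the sample addresses are hypothetical:

```python
from typing import Dict


def balance_deltas(state_diff: Dict[str, Dict]) -> Dict[str, int]:
    """Map address -> change in wei balance for accounts whose balance moved."""
    deltas: Dict[str, int] = {}
    for address, diff in state_diff.items():
        balance = diff.get("balance", "=")
        if balance == "=":            # unchanged
            continue
        if "*" in balance:            # modified: {"*": {"from": ..., "to": ...}}
            change = balance["*"]
            deltas[address] = int(change["to"], 16) - int(change["from"], 16)
        elif "+" in balance:          # account created with this balance
            deltas[address] = int(balance["+"], 16)
        elif "-" in balance:          # account removed
            deltas[address] = -int(balance["-"], 16)
    return deltas


# Illustrative stateDiff fragment with placeholder addresses
sample_diff = {
    "0xaaa": {"balance": {"*": {"from": "0x64", "to": "0x32"}}},
    "0xbbb": {"balance": "="},
    "0xccc": {"balance": {"+": "0x10"}},
}
print(balance_deltas(sample_diff))
```

Token balances live in contract storage rather than the balance field, so ERC-20 flows need the storage section or the call trace instead.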
eth_callBundle (Flashbots)
Simulates the full execution of a bundle and returns gasUsed plus whether any tx reverted.
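For reference, the request body has a simple shape. The sketch below only builds the JSON payload, assuming the parameter names documented by Flashbots (txs, blockNumber, stateBlockNumber, timestamp); sending it also requires a signed X-Flashbots-Signature header, which is out of scope here. The helper name and the raw tx string are placeholders:

```python
import json
from typing import List, Optional


def build_call_bundle(signed_txs: List[str], target_block: int,
                      state_block: str = "latest",
                      timestamp: Optional[int] = None,
                      request_id: int = 1) -> str:
    """Return the JSON-RPC body for an eth_callBundle simulation request."""
    bundle = {
        "txs": signed_txs,                  # raw signed txs, hex-encoded
        "blockNumber": hex(target_block),   # block the bundle targets
        "stateBlockNumber": state_block,    # state to simulate on top of
    }
    if timestamp is not None:
        bundle["timestamp"] = timestamp
    return json.dumps({"jsonrpc": "2.0", "id": request_id,
                       "method": "eth_callBundle", "params": [bundle]})


# "0x<signed-raw-tx>" is a placeholder, not a real transaction
print(build_call_bundle(["0x<signed-raw-tx>"], target_block=19_000_000))
```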
2. Architecture & Latency Budget
            ┌────── User wallet ──────┐
            │     signs + sends tx    │
            └────────────┬────────────┘
                         │
                         │ ~10-50ms (RPC propagation)
                         ▼
        ┌───── Public mempool (P2P) ──────┐
        │ Tx broadcasts via DEVp2p        │
        │ Reaches 80% nodes in < 200ms    │
        └────────────────┬────────────────┘
                         │
      ┌──────────────────┼────────────────────┐
      │                  │                    │
      ▼                  ▼                    ▼
┌────────────┐  ┌──────────────────┐  ┌───────────────┐
│ Self-hosted│  │ BloXroute / etc  │  │ Searcher bot  │
│ Erigon ws  │  │ Cloud node       │  │ (mempool sub) │
└─────┬──────┘  └────────┬─────────┘  └───────┬───────┘
      │                  │                    │
      │ 1-30ms           │ 100-500ms          │
      │                  │                    │
      └────────┬─────────┴─────────┬──────────┘
               │                   │
               ▼                   ▼
            ┌── Decoder (pyev / abi) ──┐
            │  signature lookup +      │
            │  parameter decode        │
            └────────────┬─────────────┘
                         │
                         ▼
            ┌── Strategy engine ───────┐
            │  - Sandwich PnL?         │
            │  - Arb opportunity?      │
            │  - JIT setup?            │
            └────────────┬─────────────┘
                         │
                         ▼
          Bundle build → submit (Day 104)
End-to-end budget (top-tier searcher): < 100ms from tx broadcast to bundle submission
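One way to keep that budget honest is to add up per-stage timings and watch the headroom. A minimal sketch; the stage names and the millisecond values are hypothetical and not measurements from these notes:

```python
from typing import Dict

BUDGET_MS = 100.0  # end-to-end target quoted above


def budget_report(stages_ms: Dict[str, float], budget_ms: float = BUDGET_MS) -> Dict[str, object]:
    """Sum the stage latencies and report headroom and the slowest stage."""
    total = sum(stages_ms.values())
    slowest = max(stages_ms, key=stages_ms.get)
    return {
        "total_ms": total,
        "headroom_ms": budget_ms - total,
        "slowest_stage": slowest,
        "within_budget": total <= budget_ms,
    }


# Hypothetical per-stage timings in milliseconds
stages = {"feed": 30.0, "decode": 2.0, "simulate": 25.0, "build_bundle": 5.0, "submit": 20.0}
print(budget_report(stages))
```

Tracking the slowest stage makes it obvious where the next optimisation effort should go.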
3. Implementation: mempool.py
"""
mempool.py: subscribe to the mempool, decode swaps, and flag potential MEV opportunities.
Reqs: pip install web3 websocket-client
Env: WS_URL (e.g. ws://localhost:8545 or wss://api.blxrbdn.com/ws)
     ETH_RPC (HTTP endpoint used to fetch full txs when only hashes arrive)
"""
import json
import os
from typing import Optional

import websocket
from eth_abi import decode
from web3 import Web3

WS = os.environ.get("WS_URL", "ws://localhost:8545")
w3 = Web3(Web3.HTTPProvider(os.environ.get("ETH_RPC", "https://eth.llamarpc.com")))

# Common router selectors
SELECTORS = {
    "0x38ed1739": "swapExactTokensForTokens",  # V2
    "0x7ff36ab5": "swapExactETHForTokens",     # V2
    "0x18cbafe5": "swapExactTokensForETH",     # V2
    "0x414bf389": "exactInputSingle",          # V3
    "0xc04b8d59": "exactInput",                # V3 multi-hop
    "0x3593564c": "execute",                   # Universal Router
}

UNI_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
UNI_V3_ROUTER = "0xE592427A0AEce92De3Edee1F18E0157C05861564"
UNIVERSAL_ROUTER = "0x66a9893cC07D91D95644AEDD05D03f95e1dBA8Af"

# V2 argument layouts. swapExactETHForTokens has no amountIn argument:
# the input amount is the ETH sent with the tx (tx.value).
V2_TYPES = ["uint256", "uint256", "address[]", "address", "uint256"]
V2_ETH_IN_TYPES = ["uint256", "address[]", "address", "uint256"]


def decode_swap_v2(input_hex: str, eth_in: bool = False):
    """swapExactTokensForTokens(amountIn, amountOutMin, path, to, deadline)

    With eth_in=True: swapExactETHForTokens(amountOutMin, path, to, deadline).
    """
    raw = bytes.fromhex(input_hex[10:])  # strip "0x" + 4-byte selector
    return decode(V2_ETH_IN_TYPES if eth_in else V2_TYPES, raw)


def decode_swap_v3_single(input_hex: str):
    """exactInputSingle(ExactInputSingleParams), selector 0x414bf389.

    Fields: tokenIn, tokenOut, fee, recipient, deadline, amountIn,
    amountOutMinimum, sqrtPriceLimitX96.
    """
    raw = bytes.fromhex(input_hex[10:])
    # struct ExactInputSingleParams encoded as a tuple (8 fields, incl. deadline)
    types = ["(address,address,uint24,address,uint256,uint256,uint256,uint160)"]
    return decode(types, raw)


def to_int(value) -> int:
    """Accept ints (web3 objects) or hex strings (raw JSON-RPC)."""
    return int(value, 16) if isinstance(value, str) else int(value)


def classify(tx) -> Optional[dict]:
    """Return decoded swap info, or None if the tx is not a known swap call."""
    info = {"hash": tx.get("hash", ""), "to": tx.get("to", ""), "value": tx.get("value", "0x0")}
    data = tx.get("input") or tx.get("data") or ""
    if not data or len(data) < 10:
        return None
    selector = data[:10].lower()
    fn_name = SELECTORS.get(selector)
    if not fn_name:
        return None
    info["fn"] = fn_name
    try:
        if fn_name == "swapExactETHForTokens":
            params = decode_swap_v2(data, eth_in=True)
            info["amountIn"] = to_int(info["value"])  # ETH in, wei (18 decimals)
            info["amountOutMin"] = params[0]
            info["path"] = list(params[1])  # eth_abi returns addresses as str
            info["recipient"] = params[2]
        elif fn_name in ("swapExactTokensForTokens", "swapExactTokensForETH"):
            params = decode_swap_v2(data)
            info["amountIn"] = params[0]
            info["amountOutMin"] = params[1]
            info["path"] = list(params[2])
            info["recipient"] = params[3]
        elif fn_name == "exactInputSingle":
            p = decode_swap_v3_single(data)[0]
            info["tokenIn"] = p[0]
            info["tokenOut"] = p[1]
            info["fee"] = p[2]
            info["amountIn"] = p[5]       # index 4 is the deadline
            info["amountOutMin"] = p[6]
    except Exception as e:
        info["decode_error"] = str(e)
    return info


def opportunity_score(swap_info: dict) -> int:
    """Simple heuristic: large swap = higher MEV potential."""
    if not swap_info or "amountIn" not in swap_info:
        return 0
    # Thresholds assume a 6-decimal USD stablecoin; 18-decimal amounts
    # (WETH, ETH value) are over-scored until per-token decimals are looked up.
    amount = swap_info["amountIn"]
    if amount > 10**6 * 10**6:  # > 1M USDC eq
        return 10
    if amount > 10**5 * 10**6:  # > 100k
        return 5
    if amount > 10**3 * 10**6:  # > 1k
        return 1
    return 0


def connect():
    """Open the websocket and (re)send the subscription request."""
    ws = websocket.create_connection(WS, timeout=10)
    ws.send(json.dumps({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_subscribe",
        "params": ["newPendingTransactions", True],  # full tx body if supported
    }))
    return ws


def main():
    print(f"[+] Connecting to {WS}")
    ws = connect()
    print("[+] Subscribed; listening...")
    while True:
        try:
            raw = ws.recv()
            msg = json.loads(raw)
            if "params" not in msg:
                continue
            tx = msg["params"]["result"]
            # If only hash returned, fetch full tx
            if isinstance(tx, str):
                full = w3.eth.get_transaction(tx)
                tx = dict(full)
                # web3 returns HexBytes; normalise to 0x-prefixed strings
                tx["input"] = Web3.to_hex(full["input"])
                tx["hash"] = Web3.to_hex(full["hash"])
            info = classify(tx)
            if info is None:
                continue
            score = opportunity_score(info)
            if score >= 5:
                print(f"[!] MEV-relevant tx: {info['hash']} fn={info['fn']} score={score}")
                print(f" amountIn={info.get('amountIn')} to={info.get('to')}")
        except websocket.WebSocketTimeoutException:
            continue  # no message within the timeout; keep listening
        except websocket.WebSocketException as e:
            print(f"[err] ws: {e}; reconnecting...")
            ws = connect()  # a new connection needs a new subscription
        except KeyboardInterrupt:
            print("[+] shutting down")
            break
        except Exception as e:
            print(f"[err] {e}")


if __name__ == "__main__":
    main()
Run:
WS_URL=ws://localhost:8545 \
ETH_RPC=https://eth.llamarpc.com \
python mempool.py
Expected output:
[+] Connecting to ws://localhost:8545
[+] Subscribed; listening...
[!] MEV-relevant tx: 0xabc1234... fn=swapExactTokensForTokens score=5
amountIn=120000000000 to=0x7a25...488D
[!] MEV-relevant tx: 0xdef5678... fn=exactInputSingle score=10
amountIn=2500000000000 to=0xE592...1564
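SELECTORS lists exactInput (V3 multi-hop), which the listener recognises but does not decode. Its path argument is a packed byte string of the form token (20 bytes), fee (3 bytes), token (20 bytes), and so on. A pure-Python sketch of splitting it; decode_v3_path is a hypothetical helper, and the sample path strings together the mainnet WETH, USDC, and DAI addresses for illustration:

```python
from typing import List, Tuple

ADDR_LEN, FEE_LEN = 20, 3


def decode_v3_path(path: bytes) -> Tuple[List[str], List[int]]:
    """Split a packed V3 path into token addresses and pool fees.

    Layout: token(20 bytes) [fee(3 bytes) token(20 bytes)]...
    """
    if len(path) < ADDR_LEN or (len(path) - ADDR_LEN) % (FEE_LEN + ADDR_LEN) != 0:
        raise ValueError(f"unexpected path length: {len(path)}")
    tokens = ["0x" + path[:ADDR_LEN].hex()]
    fees: List[int] = []
    offset = ADDR_LEN
    while offset < len(path):
        fees.append(int.from_bytes(path[offset:offset + FEE_LEN], "big"))
        offset += FEE_LEN
        tokens.append("0x" + path[offset:offset + ADDR_LEN].hex())
        offset += ADDR_LEN
    return tokens, fees


WETH = "c02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
USDC = "a0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
DAI = "6b175474e89094c44da98b954eedeac495271d0f"
# WETH -(0.05% pool)-> USDC -(0.3% pool)-> DAI
sample_path = bytes.fromhex(WETH + "0001f4" + USDC + "000bb8" + DAI)

tokens, fees = decode_v3_path(sample_path)
print(tokens)
print(fees)
```

The first and last tokens give tokenIn and tokenOut, which is enough to route a multi-hop swap into the same scoring path as exactInputSingle.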
4. Real Data
| Metric | Value |
|---|---|
| Ethereum daily mempool throughput | ~1.2M tx |
| Average lifetime of a tx in the network-wide mempool | 30-90 s (including dropped txs) |
| Average tx propagation P50 | ~150ms (DEVp2p) |
| Top mempool services latency P50 | < 50ms (BloXroute, Chainstack) |
| Erigon mempool size | 4096 default; 5K-10K observed in practice |
| Private mempool share | ~20-25% (Flashbots Protect, MM RPC) |
| Commercial mempool provider | Monthly fee (starting) | Latency P50 |
|---|---|---|
| BloXroute Cloud | $200 | 30ms |
| Chainstack | $99-499 | 50ms |
| Alchemy Subscriptions | $49+ | 80ms |
| QuickNode | $49+ | 60ms |
| Eden Network | $$$ | 20ms (direct connection to builders) |
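5. Economic Analysis
5.1 The Value of Latency
Typical lifetime of an atomic arb opportunity: 1-3 seconds. Value decay: being 100 ms late cuts profit by 5-10%, because other searchers get there first.
Co-location economics:
- Self-hosted Erigon colocated in AWS Tokyo: ~$1.5K/month
- BloXroute Pro Tier (top quartile latency): ~$2-5K/month
- Eden / private builder relationship: $10K+/month + revenue share
ROI: a top atomic arb searcher makes $1-5M in monthly profit against $5-30K of latency infrastructure cost, so infrastructure takes at most ~3% of profit (margin ≥ 95%).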
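The margin claim can be checked by plugging in the endpoints of the two ranges. A small sketch, assuming the quoted profit figures are gross of infrastructure cost (the notes do not say either way):

```python
def infra_margin(gross_profit_usd: float, infra_cost_usd: float) -> float:
    """Share of gross profit left after paying for latency infrastructure."""
    return (gross_profit_usd - infra_cost_usd) / gross_profit_usd


# Endpoints of the ranges quoted above
worst = infra_margin(1_000_000, 30_000)  # lowest profit, highest cost
best = infra_margin(5_000_000, 5_000)    # highest profit, lowest cost
print(f"margin after infra: {worst:.1%} to {best:.1%}")
```

Even the least favourable pairing leaves about 97%, which is consistent with the ≥ 95% figure.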
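5.2 Data Infrastructure as a Moat
Roughly half of an institutional quant's alpha comes from strategy and half from infrastructure. Building an end-to-end system with < 50ms latency takes $1-3M of upfront investment plus 6-12 months of iteration. This is why top searchers such as jaredfromsubway and MEV Bot 0x000... have stayed dominant for so long.
5.3 Decay of the Public Mempool
Trend: a growing share of txs goes through private mempools (Flashbots Protect, OFA)
- 2022: ~5% private
- 2024: ~20-25% private
- 2026 (estimate): ~35-40% private
Impact on searchers: the public mempool "feed" keeps getting worse; sustaining alpha requires plugging into OFA filler flow and receiving order flow directly from wallets and MMs.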
6. Institutional Perspective
Institutional real-time stack choices:
| Use case | Recommended setup |
|---|---|
| HFT searcher | Self-hosted Erigon + colocate + private builder relay |
| Mid-tier MM | BloXroute / Chainstack + Erigon backup |
| Risk monitoring | Alchemy / QuickNode subscription |
| Research / slow analysis | Alchemy free / QuickNode standard |
| Compliance (TRM/Chainalysis) | Commercial service |
Institutional PM checklist:
- Multi-region failover (US East + EU + Asia)
- Backup public RPC (Infura, llamarpc) in case the self-hosted node goes down
- Tx submission redundancy (send to ≥3 builders)
- Latency monitoring (P99 alert)
- tx_pool snapshot backups (so strategy state can be recovered)
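The P99 alert in the checklist can be prototyped with the standard library. A minimal sketch, assuming latency samples are collected in milliseconds over some window; the 50 ms threshold and the sample values are hypothetical:

```python
import statistics
from typing import List


def p99(samples_ms: List[float]) -> float:
    """99th percentile via statistics.quantiles (needs at least 2 samples)."""
    return statistics.quantiles(samples_ms, n=100, method="inclusive")[98]


def should_alert(samples_ms: List[float], threshold_ms: float) -> bool:
    """True when the window's P99 latency exceeds the threshold."""
    return p99(samples_ms) > threshold_ms


# Hypothetical window: mostly 20 ms, with two slow outliers
window = [20.0] * 98 + [500.0] * 2
print(p99(window), should_alert(window, threshold_ms=50.0))
```

Alerting on P99 rather than the mean catches the tail events that actually lose races, since a handful of slow deliveries barely moves the average.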
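Legal issues institutions are sensitive to:
- Wire-fraud liability for on-chain exploitation has been litigated in SDNY: United States v. Eisenberg (charged 2023) included a wire-fraud count, although that case concerned the Mango Markets exploit rather than mempool snooping as such
- Frontrunning is legal in most jurisdictions (no fiduciary duty is owed), but sandwiching remains a gray area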
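7. Risks & Pitfalls
- Dropped WS connections: long-running WebSocket sessions disconnect frequently; robust reconnect logic is required
- Decode errors: tx data comes in many formats (Universal Router, multicall); a single decoder covers < 70% of txs
- Txs that never land (dropped): a tx seen in the mempool may be replaced or time out 30 s later
- Public mempool poisoning: adversaries deliberately send honeypot txs (revert on simulate)
- Node sync lag: a self-hosted node occasionally falls behind head, so the txs it shows are already stale
- Memory blow-up: under high-frequency mempool subscription, Python memory usage keeps growing (use lru_cache plus periodic cleanup)
- Provider rate limits: the Alchemy free tier allows 3 requests per second, far too few for mempool monitoring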
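Two of these pitfalls, unbounded memory growth and flaky connections, have small standard-library mitigations. The sketch below shows a bounded "seen hashes" set and a capped exponential backoff schedule; the class and function names, the capacity, and the delay values are all hypothetical choices:

```python
from collections import OrderedDict
from typing import Iterator


class SeenHashes:
    """Remember the most recent `capacity` tx hashes, evicting the oldest first."""

    def __init__(self, capacity: int = 100_000) -> None:
        self.capacity = capacity
        self._seen: "OrderedDict[str, None]" = OrderedDict()

    def add(self, tx_hash: str) -> bool:
        """Return True if the hash is new, False if it was already seen."""
        if tx_hash in self._seen:
            self._seen.move_to_end(tx_hash)  # refresh recency
            return False
        self._seen[tx_hash] = None
        if len(self._seen) > self.capacity:
            self._seen.popitem(last=False)   # drop the oldest entry
        return True

    def __len__(self) -> int:
        return len(self._seen)


def backoff_delays(base: float = 0.5, cap: float = 30.0) -> Iterator[float]:
    """Yield exponentially growing reconnect delays, capped at `cap` seconds."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= 2


seen = SeenHashes(capacity=2)
print([seen.add(h) for h in ("0xa", "0xb", "0xa", "0xc")], len(seen))
delays = backoff_delays()
print([next(delays) for _ in range(4)])
```

In a listener, seen.add(tx_hash) would gate the decode step, and the delay generator would be reset after each successful reconnect.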
8. Quick Reference
| RPC method | Purpose |
|---|---|
| eth_subscribe newPendingTransactions | Subscribe to pending txs |
| txpool_content (Erigon) | Snapshot of the current mempool |
| txpool_status | Mempool size statistics |
| debug_traceTransaction | Trace a mined tx |
| trace_call (Erigon) | Trace a pending tx |
| eth_callBundle (Flashbots) | Simulate a bundle |
| eth_simulateV1 (new, Geth 1.13+) | Multi-tx state-aware simulation |
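Most of the methods above are plain JSON-RPC calls, so one small payload builder covers them. A sketch with hypothetical helper names; the txpool_status parsing assumes the reply carries hex-encoded pending and queued counters, and the sample reply is made up:

```python
import json
from typing import Dict, List, Optional


def rpc_payload(method: str, params: Optional[List] = None, request_id: int = 1) -> str:
    """Build a JSON-RPC 2.0 request body for the given method."""
    return json.dumps({"jsonrpc": "2.0", "id": request_id,
                       "method": method, "params": params or []})


def parse_txpool_status(result: Dict[str, str]) -> Dict[str, int]:
    """Convert hex-encoded counters to ints; other keys are ignored."""
    return {key: int(result[key], 16) for key in ("pending", "queued") if key in result}


print(rpc_payload("txpool_status"))
# Illustrative reply body, not captured from a real node
print(parse_txpool_status({"pending": "0x10", "queued": "0x7"}))
```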
| Provider | Access |
|---|---|
| BloXroute | wss://api.blxrbdn.com/ws + Auth header |
| Chainstack | wss://nd-XXX-XXX.p2pify.com |
| Erigon | ws://your-host:8545 + enable via flag |
| Alchemy WS | wss://eth-mainnet.g.alchemy.com/v2/<KEY> |
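9. Interview Questions
- Design the end-to-end architecture of a latency-sensitive MEV searcher. From node selection to bundle submission, which layer is most worth optimizing?
- Explain the core difference between eth_subscribe newPendingTransactions and txpool_content. Which one fits which scenario?
- What long-term impact does the privatization of the public mempool (~25% in 2024) have on the searcher industry?
- If you were head of infra at Wintermute with a $5M/year budget, how would you allocate it across mempool / builder / inventory / R&D?
- Write a unit-test framework for a mempool listener (consider: reconnect, decode coverage, dropped txs, reorgs).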
10. Tomorrow
Day 116: Week 17 review, full-stack DEX quant. Consolidate all the DEX quant capabilities from Day 110-115 into a unified dex_strategies Python library v1, covering router, LVR calc, bribe analysis, mempool listener, and subgraph helper.