Expert Day 115

Real-time Trading Infrastructure

Geth/Erigon tracing API, mempool subscriptions, commercial options (Chainstream/BloXroute), and tx pool internals

2026-08-24
Phase 2 - MEV and DEX Quant (Day 103-116)

Date: 2026-08-24
Track: MEV / DEX quant
Phase: Phase 2 - MEV and DEX Quant (Day 103-116)
Tags: #Mempool #Geth #Erigon #Chainstream #BloXroute #Tracing


Today's Objectives

Type      Content
Study     Geth/Erigon tracing API, mempool subscriptions, commercial options (Chainstream/BloXroute), tx pool internals
Practice  Subscribe to the mempool in Python and decode swap txs
Output    mempool.py: WebSocket mempool listener + decoded swap output

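1. Core Mechanics

1.1 What the mempool is

Mempool = pending transaction pool: the set of txs a node has received but that have not yet been included in a block.

Key facts:

  • Every node keeps its own mempool, so contents differ slightly from node to node
  • The public mempool propagates over P2P (devp2p, eth wire protocol)
  • Once a tx enters the public mempool, any listening node can see it
  • Private mempool: relays such as Flashbots accept txs but do not broadcast them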

1.2 Ways to subscribe to the mempool

A. Self-hosted Erigon node + WebSocket

Most accurate, but requires full-node infra ($1-2K/mo for an archive node).

# Start Erigon with WebSocket enabled
# (Erigon serves the --http.api namespaces over WS as well; --ws.api is a Geth flag)
erigon --http.api=eth,net,trace,txpool,debug \
       --ws \
       --txpool.pricelimit=1000000000  # 1 gwei

Subscribe to pending txs:

import websocket, json
ws = websocket.create_connection("ws://localhost:8545")
ws.send(json.dumps({
  "jsonrpc": "2.0", "id": 1,
  "method": "eth_subscribe",
  # True = full tx body where the node supports it; otherwise only hashes are returned
  "params": ["newPendingTransactions", True]
}))
while True:
    msg = json.loads(ws.recv())
    print(msg)
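
The loop above prints every message, including the subscription acknowledgement. A minimal sketch of separating the acknowledgement from actual notifications follows; the helper name extract_pending_tx and the sample payloads are made up for illustration.

```python
import json


def extract_pending_tx(raw: str):
    """Return the pending-tx payload from an eth_subscription notification, else None."""
    msg = json.loads(raw)
    # The first reply is the subscription ack: {"id": 1, "result": "<subscription id>"}
    if msg.get("method") != "eth_subscription":
        return None
    # Either a tx hash (str) or a full tx object (dict), depending on the node
    return msg["params"]["result"]


# Made-up sample messages in the JSON-RPC pub/sub shape
ack = '{"jsonrpc":"2.0","id":1,"result":"0x9cef478923ff08bf67fde6c64013158d"}'
note = ('{"jsonrpc":"2.0","method":"eth_subscription","params":'
        '{"subscription":"0x9cef478923ff08bf67fde6c64013158d","result":"0xabc"}}')

print(extract_pending_tx(ack))   # None
print(extract_pending_tx(note))  # 0xabc
```

Filtering on the method field rather than on the presence of "params" keeps the listener from misreading other server-initiated messages.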

B. Commercial RPC (BloXroute, Chainstack, Alchemy)

Pros: global node network, txs arrive in < 100ms (50-200ms faster than a self-hosted node). Cons: monthly fee of $200-5000+.

BloXroute subscription:

ws = websocket.create_connection(
    "wss://api.blxrbdn.com/ws",
    header=["Authorization: <BLOXROUTE_AUTH>"],
)
ws.send(json.dumps({
    "jsonrpc": "2.0",
    "id": 1,
    "method": "subscribe",
    "params": ["pendingTxs", {"include": ["tx_hash", "tx_contents"]}],
}))

Chainstream (Solana-focused): Solana has no mempool in the traditional sense; Jito ShredStream instead exposes the leader's partially built block.

C. Flashbots Stream API

Subscribes to transaction events from the Flashbots Protect RPC (limited to the user's own txs, not the whole network).

1.3 Tx Pool Internal Machinery

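Geth tx pool data structures (conceptual):

TxPool {
  pending: map[address] -> (nonce -> Transaction)   // executable txs
  queued:  map[address] -> (nonce -> Transaction)   // non-executable txs (nonce gap)
  all:     map[hash] -> Transaction                  // lookup of every tx in the pool
  priced:  heap ordered by gas price                 // used to evict the cheapest txs

  // promotion: queued -> pending once the missing lower-nonce tx arrives or is mined
  // demotion:  pending -> queued when a nonce gap is detected
}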

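Key parameters:

  • --txpool.pricelimit: minimum gas price accepted into the pool
  • --txpool.globalslots: global cap on pending (executable) tx slots (4096 in older Geth releases; current Geth defaults to 5120)
  • --txpool.accountslots: pending tx slots guaranteed per account (16 default)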
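
The pending/queued split and the promotion rule can be illustrated with a toy model. This is a sketch under simplifying assumptions, not Geth's implementation: every account starts at nonce 0, and replacement (price bump), eviction and slot limits are ignored. The class name ToyTxPool is hypothetical.

```python
from collections import defaultdict


class ToyTxPool:
    """Toy model of pending/queued bookkeeping; not Geth's actual code."""

    def __init__(self):
        self.pending = defaultdict(dict)     # sender -> {nonce: tx}, executable
        self.queued = defaultdict(dict)      # sender -> {nonce: tx}, blocked by a nonce gap
        self.next_nonce = defaultdict(int)   # sender -> next nonce that would be executable

    def add(self, sender: str, nonce: int, tx: str) -> str:
        expected = self.next_nonce[sender]
        if nonce < expected:
            return "rejected"                # stale nonce (replacement rules omitted)
        if nonce > expected:
            self.queued[sender][nonce] = tx  # gap: park until lower nonces show up
            return "queued"
        self.pending[sender][nonce] = tx
        self.next_nonce[sender] = nonce + 1
        self._promote(sender)
        return "pending"

    def _promote(self, sender: str) -> None:
        # Move queued txs to pending while they continue the nonce sequence
        while self.next_nonce[sender] in self.queued[sender]:
            n = self.next_nonce[sender]
            self.pending[sender][n] = self.queued[sender].pop(n)
            self.next_nonce[sender] = n + 1


pool = ToyTxPool()
print(pool.add("alice", 1, "tx1"))      # queued (nonce 0 is missing)
print(pool.add("alice", 0, "tx0"))      # pending, and tx1 is promoted
print(sorted(pool.pending["alice"]))    # [0, 1]
```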

1.4 Tracing API

RPCs for debugging / simulating tx execution:

debug_traceTransaction

trace = w3.provider.make_request("debug_traceTransaction", [
    "0xabc...", {"tracer": "callTracer"}
])

Returns the full call hierarchy: who called whom, gas used, and return data.
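
To make the call hierarchy concrete, here is a minimal sketch that flattens a callTracer result (nested frames under a "calls" key) into rows. The sample frame uses made-up placeholder addresses and gas values; flatten_calls is a hypothetical helper name.

```python
def flatten_calls(frame: dict, depth: int = 0):
    """Yield (depth, type, from, to, gas_used) for every frame, depth-first."""
    yield (depth, frame.get("type"), frame.get("from"), frame.get("to"),
           int(frame.get("gasUsed", "0x0"), 16))
    for child in frame.get("calls", []):
        yield from flatten_calls(child, depth + 1)


# Made-up sample in the shape callTracer returns (addresses shortened to placeholders)
sample = {
    "type": "CALL", "from": "0xuser", "to": "0xrouter", "gasUsed": "0x1d4c0",
    "calls": [
        {"type": "STATICCALL", "from": "0xrouter", "to": "0xpair", "gasUsed": "0x9c4"},
        {"type": "CALL", "from": "0xrouter", "to": "0xpair", "gasUsed": "0xea60",
         "calls": [{"type": "CALL", "from": "0xpair", "to": "0xtoken", "gasUsed": "0x7530"}]},
    ],
}

for depth, kind, src, dst, gas in flatten_calls(sample):
    print(f"{'  ' * depth}{kind} {src} -> {dst} gasUsed={gas}")
```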

trace_call (Parity-style trace API: available on Erigon, Nethermind and Reth but not on Geth; better performance)

result = w3.provider.make_request("trace_call", [
    {"to": "0x...", "data": "0x..."}, ["trace", "stateDiff"], "latest"
])
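
When "stateDiff" is requested, the result contains a per-address diff. The sketch below extracts balance changes, assuming the Parity-style encoding where "=" marks an unchanged field and {"*": {"from": ..., "to": ...}} marks a modified one. The sample data and the helper name balance_deltas are illustrative.

```python
def balance_deltas(state_diff: dict) -> dict:
    """Map address -> signed wei change for accounts whose balance was modified."""
    deltas = {}
    for addr, diff in state_diff.items():
        bal = diff.get("balance")
        # "=" means unchanged; {"*": {...}} means the value changed
        if isinstance(bal, dict) and "*" in bal:
            change = bal["*"]
            deltas[addr] = int(change["to"], 16) - int(change["from"], 16)
    return deltas


# Made-up sample: the sender's balance drops from 1 ETH to 0.5 ETH
sample_diff = {
    "0xsender": {
        "balance": {"*": {"from": "0xde0b6b3a7640000", "to": "0x6f05b59d3b20000"}},
        "nonce": {"*": {"from": "0x1", "to": "0x2"}},
        "code": "=",
        "storage": {},
    },
    "0xpool": {"balance": "=", "code": "=", "nonce": "=", "storage": {}},
}

print(balance_deltas(sample_diff))
```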

eth_callBundle (Flashbots)

Simulates the full execution of a bundle and returns gasUsed plus whether it reverted.
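
A minimal sketch of the request body for eth_callBundle follows. It only builds the JSON-RPC payload; the Flashbots relay additionally expects the request to be signed (X-Flashbots-Signature header), which is left out here. The function name and the placeholder signed txs are hypothetical.

```python
import json


def build_call_bundle(signed_txs, target_block: int, request_id: int = 1) -> str:
    """Build the JSON-RPC body for eth_callBundle (signing and sending are out of scope)."""
    return json.dumps({
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_callBundle",
        "params": [{
            "txs": list(signed_txs),            # raw signed txs as hex strings
            "blockNumber": hex(target_block),   # block the bundle targets
            "stateBlockNumber": "latest",       # state to simulate on top of
        }],
    })


# Placeholder strings stand in for real signed transactions
body = build_call_bundle(["0xsignedtx1", "0xsignedtx2"], target_block=19_000_000)
print(body)
```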


2. Architecture & Latency Budget

                  ┌────── User wallet ──────┐
                  │  signs + sends tx        │
                  └────────────┬─────────────┘
                               │
                               │  ~10-50ms (RPC propagation)
                               ▼
              ┌───── Public mempool (P2P) ──────┐
              │  Tx broadcasts via DEVp2p        │
              │  Reaches 80% nodes in < 200ms    │
              └────────────┬─────────────────────┘
                           │
   ┌───────────────────────┼───────────────────────┐
   │                       │                       │
   ▼                       ▼                       ▼
┌───────────┐    ┌──────────────────┐    ┌───────────────┐
│ Self-hosted│    │ BloXroute / etc │    │ Searcher bot  │
│ Erigon ws  │    │ Cloud node      │    │ (mempool sub) │
└─────┬──────┘    └─────────┬────────┘    └──────┬────────┘
      │                     │                    │
      │   1-30ms             │  100-500ms         │
      │                     │                    │
      └─────────┬───────────┴───────┬────────────┘
                │                    │
                ▼                    ▼
        ┌── Decoder (pyev / abi) ──┐
        │  signature lookup +       │
        │  parameter decode         │
        └────────┬──────────────────┘
                 │
                 ▼
       ┌── Strategy engine ───────┐
       │  - Sandwich PnL?          │
       │  - Arb opportunity?       │
       │  - JIT setup?             │
       └────────┬──────────────────┘
                │
                ▼
        Bundle build → submit (Day 104)
        
End-to-end budget (top-tier searcher): < 100ms from tx broadcast to bundle submission
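
One way to check a pipeline against such a budget is to time each stage. The sketch below is an illustration only: the class name LatencyBudget, the stage names and the stand-in workloads are assumptions, and it measures in-process time, not network propagation.

```python
import time
from contextlib import contextmanager


class LatencyBudget:
    """Record per-stage wall-clock time and compare the total with a budget."""

    def __init__(self, budget_ms: float):
        self.budget_ms = budget_ms
        self.stages = {}  # stage name -> elapsed milliseconds

    @contextmanager
    def stage(self, name: str):
        start = time.perf_counter_ns()
        try:
            yield
        finally:
            self.stages[name] = (time.perf_counter_ns() - start) / 1e6

    def total_ms(self) -> float:
        return sum(self.stages.values())

    def over_budget(self) -> bool:
        return self.total_ms() > self.budget_ms


budget = LatencyBudget(budget_ms=100)
with budget.stage("decode"):
    sum(range(10_000))                      # stand-in for calldata decoding
with budget.stage("strategy"):
    sorted(range(10_000), reverse=True)     # stand-in for strategy evaluation
print(budget.stages, "over budget:", budget.over_budget())
```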

3. Implementation / mempool.py

"""
mempool.py — Subscribe to mempool, decode swaps, output potential MEV ops.
Reqs: pip install web3 websocket-client
Env:  WS_URL (e.g. ws://localhost:8545 or wss://api.blxrbdn.com/ws)
"""
import json
import os
import websocket
from web3 import Web3
from eth_abi import decode

WS = os.environ.get("WS_URL", "ws://localhost:8545")
w3 = Web3(Web3.HTTPProvider(os.environ.get("ETH_RPC", "https://eth.llamarpc.com")))

# Common router selectors
SELECTORS = {
    "0x38ed1739": "swapExactTokensForTokens",          # V2
    "0x7ff36ab5": "swapExactETHForTokens",             # V2
    "0x18cbafe5": "swapExactTokensForETH",             # V2
    "0x414bf389": "exactInputSingle",                  # V3
    "0xc04b8d59": "exactInput",                        # V3 multi-hop
    "0x3593564c": "execute",                           # Universal Router
}

UNI_V2_ROUTER = "0x7a250d5630B4cF539739dF2C5dAcb4c659F2488D"
UNI_V3_ROUTER = "0xE592427A0AEce92De3Edee1F18E0157C05861564"
UNIVERSAL_ROUTER = "0x66a9893cC07D91D95644AEDD05D03f95e1dBA8Af"


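# V2 router argument layouts differ per function
V2_ARG_TYPES = {
    # (amountIn, amountOutMin, path, to, deadline)
    "swapExactTokensForTokens": ["uint256", "uint256", "address[]", "address", "uint256"],
    "swapExactTokensForETH": ["uint256", "uint256", "address[]", "address", "uint256"],
    # (amountOutMin, path, to, deadline); amountIn is the tx value
    "swapExactETHForTokens": ["uint256", "address[]", "address", "uint256"],
}


def decode_swap_v2(input_hex: str, fn_name: str = "swapExactTokensForTokens"):
    """Decode V2 router calldata using the argument layout of fn_name."""
    raw = bytes.fromhex(input_hex[10:])  # strip "0x" + 4-byte selector
    return decode(V2_ARG_TYPES[fn_name], raw)


def decode_swap_v3_single(input_hex: str):
    """exactInputSingle((tokenIn,tokenOut,fee,recipient,deadline,amountIn,amountOutMinimum,sqrtPriceLimitX96))"""
    raw = bytes.fromhex(input_hex[10:])
    # Selector 0x414bf389 belongs to the original SwapRouter struct, which includes a deadline field
    types = ["(address,address,uint24,address,uint256,uint256,uint256,uint160)"]
    return decode(types, raw)


def classify(tx) -> "dict | None":
    """Return decoded swap info, or None if the tx is not a known swap call."""
    info = {"hash": tx.get("hash", ""), "to": tx.get("to", ""), "value": tx.get("value", "0x0")}
    data = tx.get("input") or tx.get("data") or ""
    if not data or len(data) < 10:
        return None
    selector = data[:10].lower()
    fn_name = SELECTORS.get(selector)
    if not fn_name:
        return None
    info["fn"] = fn_name
    try:
        if fn_name == "swapExactETHForTokens":
            params = decode_swap_v2(data, fn_name)
            value = info["value"]
            # ETH in = tx value (hex string from the subscription, int from web3)
            info["amountIn"] = int(value, 16) if isinstance(value, str) else int(value)
            info["amountOutMin"] = params[0]
            info["path"] = list(params[1])  # eth_abi returns addresses as hex strings
            info["recipient"] = params[2]
        elif fn_name in ("swapExactTokensForTokens", "swapExactTokensForETH"):
            params = decode_swap_v2(data, fn_name)
            info["amountIn"] = params[0]
            info["amountOutMin"] = params[1]
            info["path"] = list(params[2])
            info["recipient"] = params[3]
        elif fn_name == "exactInputSingle":
            params = decode_swap_v3_single(data)
            info["tokenIn"] = params[0][0]
            info["tokenOut"] = params[0][1]
            info["fee"] = params[0][2]
            info["amountIn"] = params[0][5]
            info["amountOutMin"] = params[0][6]
    except Exception as e:
        info["decode_error"] = str(e)
    return info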


def opportunity_score(swap_info: dict) -> int:
    """Simple heuristic: large swap = higher MEV potential."""
    if not swap_info or "amountIn" not in swap_info:
        return 0
    # Thresholds assume a 6-decimal stablecoin (USDC/USDT); 18-decimal tokens are over-scored
    amount = swap_info["amountIn"]
    if amount > 10**6 * 10**6:    # > 1M USDC eq
        return 10
    if amount > 10**5 * 10**6:    # > 100k
        return 5
    if amount > 10**3 * 10**6:    # > 1k
        return 1
    return 0


def subscribe():
    """Open the WebSocket and subscribe to pending transactions."""
    ws = websocket.create_connection(WS, timeout=10)
    ws.send(json.dumps({
        "jsonrpc": "2.0",
        "id": 1,
        "method": "eth_subscribe",
        "params": ["newPendingTransactions", True],   # full tx body if supported
    }))
    return ws


def main():
    print(f"[+] Connecting to {WS}")
    ws = subscribe()
    print("[+] Subscribed; listening...")

    while True:
        try:
            raw = ws.recv()
            msg = json.loads(raw)
            if "params" not in msg:
                continue  # subscription ack or unrelated message
            tx = msg["params"]["result"]
            # If only the hash was returned, fetch the full tx over HTTP
            if isinstance(tx, str):
                full = w3.eth.get_transaction(tx)
                tx = dict(full)
                # web3 (v6+) returns HexBytes; classify() expects 0x-prefixed hex strings
                tx["input"] = Web3.to_hex(full["input"])
                tx["hash"] = Web3.to_hex(full["hash"])

            info = classify(tx)
            if info is None:
                continue

            score = opportunity_score(info)
            if score >= 5:
                print(f"[!] MEV-relevant tx: {info['hash']}  fn={info['fn']}  score={score}")
                print(f"    amountIn={info.get('amountIn')}  to={info.get('to')}")
        except websocket.WebSocketTimeoutException:
            continue  # no message within the timeout; keep listening
        except websocket.WebSocketException as e:
            print(f"[err] ws: {e}; reconnecting...")
            ws = subscribe()  # a new connection needs a new subscription
        except KeyboardInterrupt:
            print("[+] shutting down")
            break
        except Exception as e:
            print(f"[err] {e}")

if __name__ == "__main__":
    main()

Run:

WS_URL=ws://localhost:8545 \
ETH_RPC=https://eth.llamarpc.com \
python mempool.py

Expected output:

[+] Connecting to ws://localhost:8545
[+] Subscribed; listening...
[!] MEV-relevant tx: 0xabc1234...  fn=swapExactTokensForTokens  score=5
    amountIn=120000000000  to=0x7a25...488D
[!] MEV-relevant tx: 0xdef5678...  fn=exactInputSingle  score=10
    amountIn=2500000000000  to=0xE592...1564
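
The opportunity_score heuristic above compares raw token units, so 18-decimal tokens are over-scored. A decimals-aware variant might look like the sketch below. The TOKEN_META table is hypothetical: the WETH price is an assumed constant, and a real listener would read decimals() on-chain and use a price feed.

```python
from decimal import Decimal

# Hypothetical lookup table (addresses lowercased); prices are assumptions for illustration
USDC = "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48"
WETH = "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2"
TOKEN_META = {
    USDC: {"decimals": 6, "usd": Decimal("1")},
    WETH: {"decimals": 18, "usd": Decimal("3000")},  # assumed price
}


def usd_notional(token: str, raw_amount: int):
    """Convert a raw token amount to USD, or None for unknown tokens."""
    meta = TOKEN_META.get(token.lower())
    if meta is None:
        return None
    return Decimal(raw_amount) / (Decimal(10) ** meta["decimals"]) * meta["usd"]


def score_usd(token: str, raw_amount: int) -> int:
    """Same thresholds as opportunity_score, applied to USD notional."""
    usd = usd_notional(token, raw_amount)
    if usd is None:
        return 0
    if usd > 1_000_000:
        return 10
    if usd > 100_000:
        return 5
    if usd > 1_000:
        return 1
    return 0


print(score_usd(USDC, 120_000_000_000))  # 120,000 USDC -> 5
print(score_usd(WETH, 10**18))           # 1 WETH at the assumed price -> 1
```

Under the raw-unit heuristic, the 1 WETH swap would score 10, since 10**18 exceeds every threshold.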

4. Real Data

Data point                                  Value
Ethereum daily mempool throughput           ~1.2M tx
Average lifetime of a tx in the mempool     30-90 s (incl. dropped)
Average tx propagation P50                  ~150ms (devp2p)
Top mempool services latency P50            < 50ms (BloXroute, Chainstack)
Erigon mempool size                         4096 default, 5K-10K observed in practice
Private mempool share                       ~20-25% (Flashbots Protect, MM RPC)

Commercial mempool provider   Monthly fee (entry)   Latency P50
BloXroute Cloud               $200                  30ms
Chainstack                    $99-499               50ms
Alchemy Subscriptions         $49+                  80ms
QuickNode                     $49+                  60ms
Eden Network                  $$$                   20ms (direct connection to builders)

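5. Economic Analysis

5.1 The value of latency

Typical lifetime of an atomic arb opportunity: 1-3 seconds. Value decay: being 100ms late cuts expected profit by 5-10%, because other searchers have already taken the opportunity.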
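
As a rough worked example of that decay, assume (this is an assumption, not something the figures above establish) that the loss compounds: each additional 100ms keeps a fraction (1 - d) of the remaining expected profit, with d between 5% and 10%.

```python
def retained_profit(delay_ms: float, decay_per_100ms: float) -> float:
    """Fraction of expected profit left after delay_ms, assuming compounding decay."""
    return (1.0 - decay_per_100ms) ** (delay_ms / 100.0)


for delay in (100, 300, 1000):
    low = retained_profit(delay, 0.05)
    high = retained_profit(delay, 0.10)
    print(f"{delay:>5} ms late: keeps {high:.0%} to {low:.0%} of expected profit")
```

Under this assumption a searcher that is a full second late keeps only about a third to three fifths of the expected profit, which is consistent with opportunities living for just 1-3 seconds.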

Co-location economics:

  • Self-hosted Erigon + colocation in AWS Tokyo: ~$1.5K/month
  • BloXroute Pro Tier (top-quartile latency): ~$2-5K/month
  • Eden / private builder relationship: $10K+/month + revenue share

ROI: a top atomic arb searcher earns $1-5M per month against $5-30K of latency infrastructure cost, so infrastructure consumes at most ~3% of profit (margin ≥ 95%).
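
The margin claim can be checked with simple arithmetic. The sketch treats the $1-5M figure as profit before infrastructure cost, which is an interpretation of the numbers above.

```python
def infra_margin(monthly_profit_usd: float, monthly_infra_cost_usd: float) -> float:
    """Fraction of profit left after paying for latency infrastructure."""
    return 1.0 - monthly_infra_cost_usd / monthly_profit_usd


worst = infra_margin(1_000_000, 30_000)  # lowest profit, highest cost
best = infra_margin(5_000_000, 5_000)    # highest profit, lowest cost
print(f"{worst:.1%} .. {best:.1%}")      # 97.0% .. 99.9%
```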

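5.2 Data infrastructure as a moat

For institutional quants, roughly half the alpha comes from strategy and half from infrastructure. Building an end-to-end system with < 50ms latency takes $1-3M of upfront investment plus 6-12 months of iteration. This is why top searchers such as jaredfromsubway and MEV Bot 0x000... have stayed dominant for so long.

5.3 The decline of the public mempool

Trend: a growing share of txs goes through private mempools (Flashbots Protect, OFA)

  • 2022: ~5% private
  • 2024: ~20-25% private
  • 2026 (estimate): ~35-40% private

Impact on searchers: the public mempool "feed" keeps getting worse; sustaining alpha requires access to OFA filler flow and to direct order flow from wallets/MMs.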


6. Institutional Perspective

Institutional real-time stack choices:

Use case                        Recommended stack
HFT searcher                    Self-hosted Erigon + colocation + private builder relay
Mid-tier MM                     BloXroute / Chainstack + Erigon backup
Risk monitoring                 Alchemy / QuickNode subscription
Research / slow analysis        Alchemy free / QuickNode standard
Compliance (TRM/Chainalysis)    Commercial service

Institutional PM checklist:

  • Multi-region failover (US East + EU + Asia)
  • Backup public RPC (Infura, llamarpc) in case the self-hosted node goes down
  • Tx submission redundancy (send to ≥3 builders)
  • Latency monitoring (P99 alert)
  • tx_pool snapshot backups (so strategy state can be recovered)
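
For the P99 alert item, a minimal sketch using the nearest-rank percentile is shown below. The threshold and the sample latencies are made up; a production setup would more likely rely on a metrics system than on hand-rolled code.

```python
def percentile(samples, pct: int):
    """Nearest-rank percentile for an integer pct in 1..100."""
    ordered = sorted(samples)
    rank = max(1, (pct * len(ordered) + 99) // 100)  # integer ceil(pct * n / 100)
    return ordered[rank - 1]


def p99_alert(samples_ms, threshold_ms: float) -> bool:
    """True when the P99 latency exceeds the alert threshold."""
    return percentile(samples_ms, 99) > threshold_ms


# Made-up latency samples in milliseconds
normal = [20] * 98 + [45, 400]          # a single outlier does not move P99
degraded = [20] * 97 + [150, 300, 400]  # a fatter tail does

print(percentile(normal, 99), p99_alert(normal, 100))      # 45 False
print(percentile(degraded, 99), p99_alert(degraded, 100))  # 300 True
```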

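Legal issues institutions are sensitive to:

  • Whether mempool-based trading can constitute wire fraud has been tested in SDNY, but without settled precedent: United States v. Eisenberg (2023) concerned Mango Markets price manipulation rather than mempool observation, and United States v. Peraire-Bueno (indicted 2024) concerns an MEV-Boost exploit
  • Frontrunning is legal in most jurisdictions (no fiduciary duty is owed), but sandwiching is a gray area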

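7. Risks & Pitfalls

  1. Dropped WS connections: long-running WebSocket sessions disconnect frequently; robust reconnect logic is required
  2. Decode errors: tx data comes in many formats (Universal Router, multicall); a single decoder covers < 70% of swaps
  3. Txs that never land (dropped): a tx seen in the mempool may be replaced or time out 30s later
  4. Public mempool poisoning: adversaries deliberately send honeypot txs (revert on simulate)
  5. Node sync lag: a self-hosted node occasionally falls behind head, so the txs it shows are already stale
  6. Memory blow-up: under high-frequency mempool subscription, Python memory usage keeps growing (use lru_cache plus periodic cleanup)
  7. Provider rate limits: the Alchemy free tier allows 3 requests per second, far too few for mempool monitoring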
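
Pitfalls 1 and 6 can be addressed with two small building blocks: capped exponential backoff for reconnects and a bounded cache of seen tx hashes. The sketch below is one possible shape; the names backoff_delays and SeenCache and the default sizes are assumptions.

```python
from collections import OrderedDict


def backoff_delays(base: float = 0.5, cap: float = 30.0):
    """Yield reconnect delays in seconds: base, 2*base, 4*base, ... capped at cap."""
    delay = base
    while True:
        yield min(delay, cap)
        delay *= 2


class SeenCache:
    """Bounded set of recently seen tx hashes; evicts the oldest entry when full."""

    def __init__(self, max_size: int = 100_000):
        self.max_size = max_size
        self._items = OrderedDict()

    def add(self, tx_hash: str) -> bool:
        """Return True if the hash is new, False if it was already seen."""
        if tx_hash in self._items:
            self._items.move_to_end(tx_hash)  # refresh recency
            return False
        self._items[tx_hash] = None
        if len(self._items) > self.max_size:
            self._items.popitem(last=False)   # drop the least recently seen hash
        return True

    def __len__(self) -> int:
        return len(self._items)


delays = backoff_delays(base=0.5, cap=4.0)
print([next(delays) for _ in range(5)])  # [0.5, 1.0, 2.0, 4.0, 4.0]
```

In a listener, the reconnect handler would sleep for next(delays) before each attempt and create a fresh generator after a successful reconnect, while SeenCache.add() guards the decode path against reprocessing the same hash.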

8. Quick Reference

RPC method                               Purpose
eth_subscribe newPendingTransactions     Subscribe to pending txs
txpool_content (Geth/Erigon)             Snapshot of the current mempool
txpool_status                            Mempool size statistics
debug_traceTransaction                   Trace a mined tx
trace_call (Erigon)                      Trace a pending tx
eth_callBundle (Flashbots)               Simulate a bundle
eth_simulateV1 (new, recent Geth)        Multi-tx, state-aware simulation

Provider      Endpoint
BloXroute     wss://api.blxrbdn.com/ws + Auth header
Chainstack    wss://nd-XXX-XXX.p2pify.com
Erigon        ws://your-host:8545 + enable via flag
Alchemy WS    wss://eth-mainnet.g.alchemy.com/v2/<KEY>
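
As an illustration of working with a txpool_content snapshot, the sketch below summarizes a response shaped as {"pending": {address: {nonce: tx}}, "queued": {...}}. The sample snapshot is made up and summarize_txpool is a hypothetical helper.

```python
from collections import Counter


def summarize_txpool(content: dict, top_n: int = 3) -> dict:
    """Count pending/queued txs and list the senders with the most pending txs."""
    pending = content.get("pending", {})
    queued = content.get("queued", {})
    per_sender = Counter({addr: len(txs) for addr, txs in pending.items()})
    return {
        "pending": sum(per_sender.values()),
        "queued": sum(len(txs) for txs in queued.values()),
        "top_senders": per_sender.most_common(top_n),
    }


# Made-up snapshot with placeholder addresses and empty tx bodies
snapshot = {
    "pending": {"0xaaa": {"5": {}, "6": {}}, "0xbbb": {"0": {}}},
    "queued": {"0xccc": {"9": {}}},
}
print(summarize_txpool(snapshot))
```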

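9. Interview Questions

  1. Design the end-to-end architecture of a latency-sensitive MEV searcher. From node selection to bundle submission, which layer is most worth optimizing?
  2. Explain the core differences between eth_subscribe newPendingTransactions and txpool_content. Which would you use in which scenario?
  3. What long-term impact does the privatization of the public mempool (~25% in 2024) have on the searcher industry?
  4. If you were head of infra at Wintermute with a $5M/year budget, how would you allocate it across mempool / builder / inventory / R&D?
  5. Write a unit-test framework for a mempool listener (consider: reconnect, decode coverage, dropped tx, reorg).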

10. Tomorrow

Day 116: Week 17 review, full-stack DEX quant. Consolidate all the DEX quant capabilities from Day 110-115 into a unified dex_strategies Python library v1, including router, LVR calc, bribe analysis, mempool listener, and subgraph helper.