Expert Day 114

On-Chain Data Infrastructure

The Graph / Subgraph architecture, Goldsky as an alternative, Flipside SDK, Dune SDK, ETL pipeline design

2026-08-23
Phase 2 - MEV and DEX Quant (Day 103-116)
#TheGraph #Subgraph #Goldsky #Flipside #DataInfra

Date: 2026-08-23 Track: MEV / DEX Quant Phase: Phase 2 - MEV and DEX Quant (Day 103-116) Tags: #TheGraph #Subgraph #Goldsky #Flipside #DataInfra


Today's Objectives

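| Type | Content |
| --- | --- |
| Study | The Graph / Subgraph architecture, Goldsky as an alternative, Flipside SDK, Dune SDK, ETL pipeline design |
| Hands-on | Build a simple subgraph that indexes Swap events from the USDC/WETH V3 pool |
| Deliverable | subgraph_demo: complete schema.graphql + mapping.ts + subgraph.yaml |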

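1. Core Mechanics

1.1 Why Data Infrastructure Is Needed

Limitations of querying an Ethereum / EVM RPC node directly:

  • eth_getLogs filters only by block range, contract address, and topics; it supports no complex filtering
  • No aggregate queries (e.g. "total swap volume today")
  • No joins across data from multiple contracts
  • No built-in reorg handling for data that was already fetched
  • No GraphQL/SQL query interface

A subgraph (The Graph) addresses these pain points by running ETL on on-chain events into PostgreSQL and exposing the result through a GraphQL API.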
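
To make the first limitation concrete, here is a minimal Python sketch of an eth_getLogs request body. The filter object has room only for a block range, an address, and topic positions, so anything like "sum today's volume" has to be computed client-side. The helper name build_get_logs_payload is hypothetical, and the topic0 value is a placeholder rather than the real Swap event hash.

```python
import json

def build_get_logs_payload(address, topic0, from_block, to_block, request_id=1):
    """Build a JSON-RPC 2.0 body for eth_getLogs (hypothetical helper)."""
    return {
        "jsonrpc": "2.0",
        "id": request_id,
        "method": "eth_getLogs",
        "params": [{
            # These four keys are essentially all the filtering the node offers here.
            "address": address,
            "topics": [topic0],
            "fromBlock": hex(from_block),
            "toBlock": hex(to_block),
        }],
    }

payload = build_get_logs_payload(
    address="0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640",
    topic0="0x" + "00" * 32,  # placeholder: use keccak-256 of the event signature
    from_block=12369621,
    to_block=12369621 + 999,
)
print(json.dumps(payload, indent=2))
```

Aggregation, joins, and reorg bookkeeping would all sit on top of the raw log list this call returns, which is the work a subgraph takes over.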

1.2 The Graph Architecture

┌──── Ethereum nodes (full / archive) ─────┐
│  Erigon, Nethermind, etc.                │
└────────────────┬─────────────────────────┘
                 │ Firehose / RPC
                 ▼
┌──── Indexer (graph-node) ────────────────┐
│  - Subscribe to events                    │
│  - Run mapping.ts (AssemblyScript)        │
│  - Write to PostgreSQL                    │
│  - Reorg handling                         │
└────────────────┬─────────────────────────┘
                 │ GraphQL endpoint
                 ▼
┌──── DApp / Analyzer ─────────────────────┐
│  Query: "Top 10 LP by fee"                │
└──────────────────────────────────────────┘

Hosting options:

  • The Graph Network (decentralized, GRT-staked indexers)
  • Goldsky (faster, centralized)
  • Subgraph Studio (the official development platform)

1.3 The Three Subgraph Files

A. subgraph.yaml (Manifest)

specVersion: 0.0.5
schema:
  file: ./schema.graphql
dataSources:
  - kind: ethereum
    name: UniswapV3Pool
    network: mainnet
    source:
      address: "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"  # USDC/WETH V3 0.05%
      abi: UniswapV3Pool
      startBlock: 12369621
    mapping:
      kind: ethereum/events
      apiVersion: 0.0.7
      language: wasm/assemblyscript
      file: ./src/mapping.ts
      entities:
        - Swap
        - Pool
        - Hourly
      abis:
        - name: UniswapV3Pool
          file: ./abis/UniswapV3Pool.json
      eventHandlers:
        - event: Swap(indexed address,indexed address,int256,int256,uint160,uint128,int24)
          handler: handleSwap

B. schema.graphql

type Pool @entity {
  id: ID!
  token0: String!
  token1: String!
  totalVolumeUSD: BigDecimal!
  totalSwaps: BigInt!
  hourlyData: [Hourly!]! @derivedFrom(field: "pool")
}

type Swap @entity(immutable: true) {
  id: ID!  # tx_hash + log_index
  pool: Pool!
  sender: String!
  recipient: String!
  amount0: BigInt!
  amount1: BigInt!
  amountUSD: BigDecimal!
  sqrtPriceX96: BigInt!
  liquidity: BigInt!
  tick: Int!
  blockNumber: BigInt!
  timestamp: BigInt!
}

type Hourly @entity {
  id: ID!  # pool_id + hour_index
  pool: Pool!
  hourStart: BigInt!
  volumeUSD: BigDecimal!
  txCount: BigInt!
  feeUSD: BigDecimal!
}

C. src/mapping.ts

import { Swap as SwapEvent } from "../generated/UniswapV3Pool/UniswapV3Pool"
import { Pool, Swap, Hourly } from "../generated/schema"
import { BigDecimal, BigInt, log } from "@graphprotocol/graph-ts"

const POOL_ID = "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640"

export function handleSwap(event: SwapEvent): void {
  // Load or create pool
  let pool = Pool.load(POOL_ID)
  if (!pool) {
    pool = new Pool(POOL_ID)
    pool.token0 = "USDC"
    pool.token1 = "WETH"
    pool.totalVolumeUSD = BigDecimal.fromString("0")
    pool.totalSwaps = BigInt.fromI32(0)
  }

  // Compute USD value (simplified: assume 1 USDC = 1 USD; |amount0| in USDC, 6 decimals)
  // Take abs() on the BigInt first, then convert to BigDecimal
  let amount0_usd = event.params.amount0.abs().toBigDecimal()
    .div(BigDecimal.fromString("1000000"))  // 6 decimals

  // Save Swap entity
  let swapId = event.transaction.hash.toHexString() + "-" + event.logIndex.toString()
  let swap = new Swap(swapId)
  swap.pool = POOL_ID
  swap.sender = event.params.sender.toHexString()
  swap.recipient = event.params.recipient.toHexString()
  swap.amount0 = event.params.amount0
  swap.amount1 = event.params.amount1
  swap.amountUSD = amount0_usd
  swap.sqrtPriceX96 = event.params.sqrtPriceX96
  swap.liquidity = event.params.liquidity
  swap.tick = event.params.tick
  swap.blockNumber = event.block.number
  swap.timestamp = event.block.timestamp
  swap.save()

  // Update pool stats
  pool.totalVolumeUSD = pool.totalVolumeUSD.plus(amount0_usd)
  pool.totalSwaps = pool.totalSwaps.plus(BigInt.fromI32(1))
  pool.save()

  // Update hourly bucket
  let hour = event.block.timestamp.toI32() / 3600
  let hourId = POOL_ID + "-" + hour.toString()
  let hourly = Hourly.load(hourId)
  if (!hourly) {
    hourly = new Hourly(hourId)
    hourly.pool = POOL_ID
    hourly.hourStart = BigInt.fromI32(hour * 3600)
    hourly.volumeUSD = BigDecimal.fromString("0")
    hourly.txCount = BigInt.fromI32(0)
    hourly.feeUSD = BigDecimal.fromString("0")
  }
  hourly.volumeUSD = hourly.volumeUSD.plus(amount0_usd)
  hourly.txCount = hourly.txCount.plus(BigInt.fromI32(1))
  hourly.feeUSD = hourly.feeUSD.plus(
    amount0_usd.times(BigDecimal.fromString("0.0005"))  // 0.05% fee
  )
  hourly.save()
}
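
The mapping stores sqrtPriceX96 but prices swaps only through the USDC leg. As a side note, the stored value can be converted into a USDC-per-WETH price offline. The Python sketch below runs outside the subgraph and assumes, as the mapping does, that token0 is USDC (6 decimals) and token1 is WETH (18 decimals); the function name usdc_per_weth is hypothetical.

```python
from fractions import Fraction

Q96 = 2 ** 96
USDC_DECIMALS = 6    # token0 in this pool (assumption taken from the mapping)
WETH_DECIMALS = 18   # token1 in this pool (assumption taken from the mapping)

def usdc_per_weth(sqrt_price_x96: int) -> Fraction:
    """Convert a V3 sqrtPriceX96 into a human-readable USDC-per-WETH price."""
    # Raw price = token1 base units per token0 base unit = (sqrtPriceX96 / 2^96)^2
    raw = Fraction(sqrt_price_x96 * sqrt_price_x96, Q96 * Q96)
    # Scale base units to whole tokens: WETH per USDC = raw * 10^(6 - 18)
    weth_per_usdc = raw * Fraction(10 ** USDC_DECIMALS, 10 ** WETH_DECIMALS)
    # Invert to quote WETH in USDC
    return 1 / weth_per_usdc

# Synthetic input chosen so the arithmetic is easy to follow: sqrt(raw) = 20,000
example = 20_000 * Q96
print(float(usdc_per_weth(example)))  # 2500.0
```

Fraction keeps the arithmetic exact; a production pipeline would more likely use Decimal or fixed-point integers for speed.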

1.4 Deploying the Subgraph

# 1. Install graph-cli
npm install -g @graphprotocol/graph-cli

# 2. Init project (interactive prompts ask for network, contract address, and ABI)
graph init usdc-weth-v3
cd usdc-weth-v3

# 3. Codegen + build
graph codegen && graph build

# 4. Auth + deploy to Subgraph Studio (the hosted service is deprecated).
#    Older graph-cli releases expect --studio on both commands.
graph auth <DEPLOY_KEY>
graph deploy usdc-weth-v3

Query example:

{
  pool(id: "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640") {
    totalVolumeUSD
    totalSwaps
    hourlyData(first: 24, orderBy: hourStart, orderDirection: desc) {
      hourStart
      volumeUSD
      feeUSD
    }
  }
}
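
A minimal sketch of sending the query above from Python with only the standard library. SUBGRAPH_URL is a hypothetical placeholder (the real endpoint comes from wherever the subgraph is deployed), and build_request and fetch_pool are illustrative names, not part of any SDK.

```python
import json
import urllib.request

# Hypothetical endpoint; replace with the query URL of your deployment.
SUBGRAPH_URL = "https://example.invalid/subgraphs/name/yourname/usdc-weth-v3"

POOL_QUERY = """
query ($id: ID!, $hours: Int!) {
  pool(id: $id) {
    totalVolumeUSD
    totalSwaps
    hourlyData(first: $hours, orderBy: hourStart, orderDirection: desc) {
      hourStart
      volumeUSD
      feeUSD
    }
  }
}
"""

def build_request(url, pool_id, hours=24):
    """Wrap the GraphQL query and its variables in an HTTP POST request."""
    body = json.dumps(
        {"query": POOL_QUERY, "variables": {"id": pool_id, "hours": hours}}
    ).encode("utf-8")
    return urllib.request.Request(
        url, data=body, headers={"Content-Type": "application/json"}, method="POST"
    )

def fetch_pool(url, pool_id, hours=24):
    """Send the request and return the pool object (performs network I/O)."""
    with urllib.request.urlopen(build_request(url, pool_id, hours), timeout=10) as resp:
        payload = json.load(resp)
    if "errors" in payload:
        raise RuntimeError(payload["errors"])
    return payload["data"]["pool"]

# Building the request does not touch the network; fetch_pool() does.
req = build_request(SUBGRAPH_URL, "0x88e6A0c2dDD26FEEb64F039a2c41296FcB3f5640")
print(req.get_method(), req.full_url)
```

Passing the pool ID and window size as GraphQL variables keeps the query string constant, which makes it easier to cache and log.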

2. Alternatives Comparison

2.1 Goldsky (commercial subgraph hosting)

Advantages:

  • Deployment roughly 10x faster than The Graph (minutes vs. hours)
  • Supports SQL queries (direct PostgreSQL access)
  • Mirror to BigQuery / Snowflake
  • 30+ chains supported

Disadvantages:

  • Paid service: $0-2K+ per month
  • Centralized (no GRT staking)

2.2 Flipside Crypto SDK

Features:

  • Run SQL directly on Snowflake (Flipside has already ETL'd the data)
  • Python SDK available
  • Generous free tier, well suited to analytics

Example:

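from flipside import Flipside

# Replace API_KEY with your own key; the base URL is the one used in Flipside's SDK docs.
flipside = Flipside("API_KEY", "https://api-v2.flipsidecrypto.xyz")

# Hourly USD swap volume for one pool over the last 7 days.
# Table and column names are assumptions; check them against Flipside's data catalog.
result = flipside.query("""
  SELECT date_trunc('hour', block_timestamp) AS hour,
         sum(amount_in_usd) AS volume
  FROM ethereum.dex.ez_swaps
  WHERE pool_address = LOWER('0x88e6...640')
    AND block_timestamp >= current_date - 7
  GROUP BY 1
  ORDER BY 1
""")

for row in result.records:
    print(row["hour"], row["volume"])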

2.3 Dune SDK

Features:

  • SQL on Dune's curated tables
  • Best for DEX/lending/NFT queries
  • Web UI + API
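
For the API side, a minimal sketch of fetching the latest results of a saved Dune query over plain HTTP. It assumes the v1 REST endpoint and the X-Dune-API-Key header from Dune's API docs; the query ID and key below are placeholders, and build_results_request and fetch_rows are hypothetical helper names.

```python
import json
import urllib.request

DUNE_API_BASE = "https://api.dune.com/api/v1"

def build_results_request(query_id: int, api_key: str) -> urllib.request.Request:
    """Build a GET request for the latest results of a saved query."""
    url = f"{DUNE_API_BASE}/query/{query_id}/results"
    return urllib.request.Request(url, headers={"X-Dune-API-Key": api_key})

def fetch_rows(query_id: int, api_key: str) -> list:
    """Fetch result rows (performs network I/O and consumes API quota)."""
    with urllib.request.urlopen(build_results_request(query_id, api_key), timeout=30) as resp:
        payload = json.load(resp)
    return payload["result"]["rows"]

# Placeholder values; building the request does not call the API.
req = build_results_request(1234567, "YOUR_API_KEY")
print(req.full_url)
```

The same call is wrapped by Dune's official Python client; the raw request is shown here only to make the moving parts visible.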

2.4 Self-Built ETL (custom)

Best for: high-frequency trading that needs < 100 ms data latency. Typical stack:

  • Erigon archive node
  • Custom Go/Rust indexer
  • ClickHouse / TimescaleDB
  • Redis cache

Cost of entry: roughly $50K-100K/year for infra + ops.
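
A toy version of that pipeline, sketched in Python so the extract, transform, and load steps are visible. SQLite stands in for ClickHouse / TimescaleDB, the log dictionaries use a simplified hypothetical shape (real JSON-RPC logs carry hex fields and no timestamp), and all function names are illustrative.

```python
import sqlite3

WORD = 64  # hex characters per 32-byte ABI word

def decode_swap_data(data_hex: str) -> dict:
    """Transform: decode the five non-indexed fields of a V3 Swap log."""
    raw = data_hex[2:] if data_hex.startswith("0x") else data_hex
    words = [bytes.fromhex(raw[i:i + WORD]) for i in range(0, 5 * WORD, WORD)]
    return {
        "amount0": int.from_bytes(words[0], "big", signed=True),   # int256
        "amount1": int.from_bytes(words[1], "big", signed=True),   # int256
        "sqrt_price_x96": int.from_bytes(words[2], "big"),         # uint160
        "liquidity": int.from_bytes(words[3], "big"),              # uint128
        "tick": int.from_bytes(words[4], "big", signed=True),      # int24
    }

def make_log(tx_hash, log_index, ts, amount0, amount1):
    """Extract stand-in: build a synthetic log instead of calling a node."""
    words = [amount0, amount1, 20_000 * 2 ** 96, 10 ** 18, 198_000]
    data = "0x" + "".join(v.to_bytes(32, "big", signed=True).hex() for v in words)
    return {"tx_hash": tx_hash, "log_index": log_index, "ts": ts, "data": data}

def load_swaps(conn, logs):
    """Load: upsert by (tx_hash, log_index) so replays do not double count."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS swaps ("
        "tx_hash TEXT, log_index INTEGER, ts INTEGER, usdc REAL, "
        "PRIMARY KEY (tx_hash, log_index))"
    )
    for log in logs:
        fields = decode_swap_data(log["data"])
        usdc = abs(fields["amount0"]) / 1e6  # token0 = USDC, 6 decimals
        conn.execute(
            "INSERT OR REPLACE INTO swaps VALUES (?, ?, ?, ?)",
            (log["tx_hash"], log["log_index"], log["ts"], usdc),
        )
    conn.commit()

def hourly_volume(conn):
    """Aggregate USDC volume into hour buckets (integer division on ts)."""
    return conn.execute(
        "SELECT (ts / 3600) * 3600 AS hour_start, SUM(usdc) "
        "FROM swaps GROUP BY hour_start ORDER BY hour_start"
    ).fetchall()

conn = sqlite3.connect(":memory:")
logs = [
    make_log("0xaa", 0, 7200, -1_000_000_000, 4 * 10 ** 17),
    make_log("0xbb", 3, 7300, 500_000_000, -2 * 10 ** 17),
    make_log("0xcc", 1, 10805, 250_000_000, -(10 ** 17)),
]
load_swaps(conn, logs)
load_swaps(conn, logs[:1])  # replaying a log leaves the totals unchanged
rows = hourly_volume(conn)
print(rows)  # [(7200, 1500.0), (10800, 250.0)]
```

Keying the table on (tx_hash, log_index) mirrors the Swap entity ID used in the subgraph and makes the load step idempotent.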


3. Real Data

| Platform | Adoption | Pricing | Use case |
| --- | --- | --- | --- |
| The Graph hosted | 2,000+ subgraphs | Free (deprecated) → migrating to Network | DApp dev, fast prototype |
| The Graph Network | 1,000+ subgraphs | GRT signal + query fees | Production decentralized |
| Goldsky | 500+ projects | $0-2K/mo+ | Production with SLA |
| Flipside | 250K+ analysts | Free + premium | Analytics, dashboards |
| Dune | 500K+ users | Free + paid plans | Public dashboards |
| Allium | Enterprise | $$$$ | Hedge funds, banks |
| Footprint | Mid-market | $0-1K/mo | Protocol analytics |

Key events:

  • 2018-12: The Graph launched
  • 2020-12: GRT token launch
  • 2023-Q3: Hosted Service sunset plan announced
  • 2024-Q1: Goldsky raises $20M Series A
  • 2024-Q4: Allium raises $16.5M Series A for institutional-grade data infra

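4. Economic Analysis

4.1 Data Infrastructure Market

Estimated 2025 market size:
  - On-chain data SaaS:    $200-300M/yr
  - On-chain analytics (Dune/Flipside): $50-100M
  - Custom infra (institutional in-house): $500M+ (including headcount)
  - Total market:          $1B+

4.2 The Graph Network Economics

Indexer revenue = query fees (settled in GRT) + GRT inflation rewards
  - 2024 total query fees: ~$2-5M
  - GRT inflation: ~3% / yr
Curator revenue = a share of query fees on the subgraphs they signal on
Delegator revenue = a share of the rewards earned by the indexers they stake to

Economic problem: query fee revenue is far below inflation rewards → long-term inflationary pressure on the token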
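
A back-of-the-envelope check of that gap, as a Python sketch. The ~3% inflation rate and the $2-5M fee range come from the figures above; the GRT supply and price are hypothetical round numbers chosen only to show the arithmetic, not market data.

```python
def inflation_to_fee_ratio(supply_grt, price_usd, inflation_rate, query_fees_usd):
    """Ratio of yearly inflation rewards (in USD) to yearly query fees (in USD)."""
    rewards_usd = supply_grt * inflation_rate * price_usd
    return rewards_usd / query_fees_usd

SUPPLY_GRT = 10_000_000_000  # hypothetical circulating supply
PRICE_USD = 0.20             # hypothetical GRT price
INFLATION = 0.03             # ~3% / yr, from the figures above

for fees in (2_000_000, 5_000_000):  # $2-5M query fee range from the figures above
    ratio = inflation_to_fee_ratio(SUPPLY_GRT, PRICE_USD, INFLATION, fees)
    print(f"fees=${fees:,}: inflation rewards are about {ratio:.0f}x query fees")
```

Under these assumed inputs the rewards come to about $60M per year, roughly 12x to 30x the query fees; different supply or price assumptions move the ratio but not the direction of the conclusion.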

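4.3 Build vs. Buy Decision Framework

| Use case | Recommendation |
| --- | --- |
| MEV searcher (low latency) | Self-built |
| DApp frontend (moderate latency) | The Graph / Goldsky |
| Analytics / dashboard | Dune / Flipside |
| Institutional risk monitoring | Allium / self-built |
| One-off research | Dune (free) |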

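5. Institutional Perspective

Design principles for an institutional data stack:

  1. Multi-source redundancy: cross-check each metric against at least 2 sources (to avoid a single-source bug)
  2. Latency tiers: real-time (mempool, < 1s) / near-real-time (subgraph, 5-30s) / historical (Dune, batch)
  3. Reorg handling: label data with ≤ 6 confirmations as "preliminary"
  4. Audit trail: keep a hash + timestamp for every data query, for compliance and post-mortems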
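
Principles 1 and 3 can be sketched in a few lines of Python. The 6-confirmation threshold comes from the list above; the 5% tolerance, the Observation type, and both function names are hypothetical choices for illustration.

```python
from dataclasses import dataclass

FINALITY_CONFIRMATIONS = 6   # threshold from principle 3
MAX_RELATIVE_GAP = 0.05      # hypothetical tolerance between two sources

@dataclass
class Observation:
    source: str
    block_number: int
    value: float

def confirmation_status(block_number: int, head_number: int,
                        threshold: int = FINALITY_CONFIRMATIONS) -> str:
    """Label a data point by how many blocks have been built on top of it."""
    confirmations = head_number - block_number
    return "preliminary" if confirmations <= threshold else "confirmed"

def cross_check(a: Observation, b: Observation,
                max_gap: float = MAX_RELATIVE_GAP) -> bool:
    """Return True when two sources agree within the relative tolerance."""
    baseline = max(abs(a.value), abs(b.value))
    if baseline == 0:
        return True
    return abs(a.value - b.value) / baseline <= max_gap

subgraph = Observation("subgraph", 100, 100.0)
warehouse = Observation("warehouse", 100, 103.0)
print(confirmation_status(100, head_number=104))  # preliminary
print(cross_check(subgraph, warehouse))           # True
```

A metric that fails cross_check would typically be held back from reports and flagged for review rather than averaged across sources.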

Typical institutional stack:

Real-time:    Erigon node + custom Go listener → Kafka
Near-real:    Goldsky subgraph → Snowflake mirror
Historical:   Allium / Flipside SDK → BigQuery
Compliance:   Chainalysis / TRM Labs
Visual:       Internal dashboard (Grafana + Metabase)

What institutional PMs ask: Is the data vendor SOC2 compliant? Does it support private (on-prem) deployment? What is the data retention period?


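6. Risks & Pitfalls

  1. Reorg handling bugs: if data within the last 7 blocks is reorged out and the indexer does not roll back, the data stays wrong permanently
  2. mapping.ts performance: complex logic keeps the indexer from keeping up with the chain head, pushing data latency past 1 min
  3. Schema migration pain: a schema change requires re-syncing the entire history (days to weeks)
  4. Data quality: USD value estimates depend on external oracles, so platforms give different answers (Dune vs. Flipside differ by 5-15%)
  5. API rate limits: free tiers are easily exhausted; production needs a paid plan
  6. Chain reorgs: the PoW era occasionally saw 12+ block reorgs; under PoS they are rare but still possible (LMD-GHOST attacks)
  7. L2 finality: data on an optimistic rollup can still be reorged during the 7-day challenge period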
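
Pitfall 1 comes down to detecting a fork and rolling back before applying the new branch. The Python sketch below shows one way to do that with parent-hash links. It assumes blocks arrive in order along each branch and that the fork point is still inside the stored window; the class and its methods are hypothetical and far simpler than what graph-node does.

```python
class ReorgAwareIndex:
    """Keeps per-block rows so that orphaned blocks can be rolled back."""

    def __init__(self):
        self.blocks = []  # canonical chain as seen so far, oldest first

    def apply(self, number, block_hash, parent_hash, rows):
        """Append a block; drop stored blocks that are not its ancestors.

        Returns the number of blocks that were rolled back.
        """
        rolled_back = 0
        if self.blocks:
            known = [b["hash"] for b in self.blocks]
            if parent_hash not in known:
                raise ValueError("parent not indexed: fork is deeper than the stored window")
            keep = known.index(parent_hash) + 1
            rolled_back = len(self.blocks) - keep
            del self.blocks[keep:]  # undo every block after the fork point
        self.blocks.append(
            {"number": number, "hash": block_hash, "parent": parent_hash, "rows": list(rows)}
        )
        return rolled_back

    def total(self):
        """Aggregate over canonical blocks only."""
        return sum(sum(b["rows"]) for b in self.blocks)

index = ReorgAwareIndex()
index.apply(1, "a1", "a0", [100])
index.apply(2, "a2", "a1", [50])
index.apply(3, "a3", "a2", [25])
dropped = index.apply(2, "b2", "a1", [70])  # competing block 2 replaces a2 and a3
print(dropped, index.total())  # 2 170
```

Storing rows per block is what makes the rollback cheap; an indexer that only keeps running totals has nothing to subtract when a block is orphaned.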

7. Quick Reference

| Tool | Endpoint / Repo |
| --- | --- |
| The Graph CLI | npm i -g @graphprotocol/graph-cli |
| Subgraph Studio | https://thegraph.com/studio |
| Goldsky | https://goldsky.com |
| Flipside SDK | pip install flipside |
| Dune API docs | https://dune.com/docs/api |
| Allium API | https://docs.allium.so |
| Erigon | github.com/ledgerwatch/erigon |
| graph-node | github.com/graphprotocol/graph-node |

Subgraph debugging tools:

  • Local graph-node + Postgres + IPFS via docker-compose
  • graph debug CLI command
  • The Graph Indexer Office Hours (Discord)

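8. Interview Questions

  1. Explain the fundamental difference between a subgraph and the Etherscan API. What problem does a subgraph solve?
  2. Design a monitoring system that watches a DEX pool for abnormal behavior (e.g. large liquidity drains, price manipulation). How would you choose the data stack?
  3. How do you choose between Goldsky and The Graph Network? Compare them on cost, SLA, decentralization, and performance.
  4. As a quant on an institutional DD team, how would you use on-chain data to estimate a vault protocol's real TVL (excluding wash deposits)?
  5. What is the core mechanism a subgraph uses to handle reorgs? If a mapping function has side effects (e.g. writing to an external DB), how do you guarantee idempotency?

9. Tomorrow

Day 115: Real-time trading infrastructure. We move down from the tens of seconds of subgraph latency to the millisecond latency of the mempool: setting up a real-time stack with Geth/Erigon tracing, mempool subscriptions, and Chainstream, and writing a Python mempool listener.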