返回 SC 笔记
SC Day 23

Rust async/await + tokio 运行时 + Future

### 一、为什么需要异步编程?

2026-04-23
第一阶段:基础构建 (Day 21-24)
RustasynctokioFuture并发区块链

日期: 2026-04-23 方向: Rust 阶段: 第一阶段:基础构建 (Day 21-24) 标签: #Rust #async #tokio #Future #并发 #区块链


今日目标

  1. 理解 Rust 异步编程的核心模型(Future trait + Poll)
  2. 掌握 tokio 运行时的配置和使用
  3. 学会 async fn.awaittokio::spawnselect!
  4. 使用 mpsc channel 实现任务间通信
  5. 编写异步 HTTP 客户端获取区块链数据

核心概念

一、为什么需要异步编程?

在区块链应用中,绝大部分操作都是 IO 密集型的:

操作类型耗时
调用 RPC 节点查询余额网络 IO50-500ms
查询多个链的数据并发网络 ION * 50-500ms (同步) / max(50-500ms) (异步)
读取本地区块数据磁盘 IO1-10ms
监听区块链事件 (WebSocket)长连接 IO持续
计算哈希CPU<1ms

同步模型下,查询 10 个链的余额需要串行等待 10 次网络请求(约 5 秒)。异步模型下,所有请求并发发出,总耗时等于最慢的那一个(约 500ms)。

二、Rust 异步模型基础

2.1 Future Trait

Rust 的异步核心是 Future trait:

pub trait Future {
    type Output;
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

pub enum Poll<T> {
    Ready(T),    // 计算完成,返回结果
    Pending,     // 尚未完成,稍后再 poll
}

关键理解

  • Future 是惰性的 — 创建 Future 不会执行任何操作
  • Future 需要被驱动 — 通过运行时反复 poll 直到 Ready
  • async fn 返回一个实现了 Future 的匿名类型
  • .await 是让出控制权给运行时,等待 Future 完成
async fn 的执行流程:

1. 调用 async fn → 返回 Future (不执行!)
2. .await → 运行时开始 poll
3. poll() 返回 Pending → 运行时去执行其他任务
4. IO 就绪通知 → 运行时再次 poll
5. poll() 返回 Ready(value) → .await 返回 value

2.2 async/await 语法

// async fn 自动返回 impl Future<Output = T>
async fn fetch_balance(address: &str) -> Result<u64, String> {
    // .await 让出控制权,等待结果
    let response = make_rpc_call(address).await?;
    let balance = parse_response(response).await?;
    Ok(balance)
}

// 等价于 (概念上):
fn fetch_balance(address: &str) -> impl Future<Output = Result<u64, String>> {
    async move {
        let response = make_rpc_call(address).await?;
        let balance = parse_response(response).await?;
        Ok(balance)
    }
}

三、tokio 运行时

tokio 是 Rust 生态最主流的异步运行时,提供:

功能说明
多线程调度器工作窃取算法分配任务到线程池
IO 驱动基于 epoll/kqueue/IOCP 的非阻塞 IO
定时器tokio::time::sleep, interval, timeout
同步原语Mutex, RwLock, Semaphore, Notify
通道mpsc, oneshot, broadcast, watch

3.1 运行时配置

// 方式一: 宏标注 (最常用)
#[tokio::main]
async fn main() {
    println!("Running on tokio runtime");
}

// 方式二: 手动构建 (更灵活)
fn main() {
    let rt = tokio::runtime::Builder::new_multi_thread()
        .worker_threads(4)           // 工作线程数
        .enable_all()                // 启用 IO + 时间驱动
        .thread_name("blockchain")   // 线程名前缀
        .build()
        .unwrap();

    rt.block_on(async {
        println!("Running on custom runtime");
    });
}

// 方式三: 单线程运行时 (测试用)
#[tokio::main(flavor = "current_thread")]
async fn main() {
    println!("Single-threaded runtime");
}

3.2 tokio::spawn — 任务调度

tokio::spawn 将 Future 提交给运行时并发执行:

use tokio::task::JoinHandle;

#[tokio::main]
async fn main() {
    // spawn 返回 JoinHandle,可以 await 获取结果
    let handle: JoinHandle<u64> = tokio::spawn(async {
        // 这个任务会在另一个线程上执行
        expensive_computation().await
    });

    // 同时可以做其他事情
    do_something_else().await;

    // 等待 spawn 的任务完成
    let result = handle.await.unwrap(); // JoinError 如果任务 panic
    println!("Result: {}", result);
}

spawn 的要求

  • Future 必须是 'static(不能引用栈上的数据)
  • Future 必须是 Send(可在线程间传递)

3.3 tokio::select! — 竞争选择

select! 同时等待多个 Future,第一个完成的胜出:

use tokio::time::{sleep, Duration};

async fn fetch_from_primary() -> String {
    sleep(Duration::from_millis(100)).await;
    "primary data".to_string()
}

async fn fetch_from_backup() -> String {
    sleep(Duration::from_millis(200)).await;
    "backup data".to_string()
}

#[tokio::main]
async fn main() {
    // 竞争: 谁先完成用谁
    let result = tokio::select! {
        data = fetch_from_primary() => {
            println!("Got from primary");
            data
        }
        data = fetch_from_backup() => {
            println!("Got from backup");
            data
        }
    };
    println!("Result: {}", result);

    // 超时模式
    tokio::select! {
        result = long_running_task() => {
            println!("Task completed: {:?}", result);
        }
        _ = sleep(Duration::from_secs(5)) => {
            println!("Timeout! Task took too long");
        }
    }
}

async fn long_running_task() -> u64 {
    sleep(Duration::from_secs(10)).await;
    42
}

3.4 mpsc Channel — 任务间通信

use tokio::sync::mpsc;

#[derive(Debug)]
enum BlockchainEvent {
    NewBlock { number: u64, hash: String },
    NewTransaction { hash: String, value: u64 },
    PriceUpdate { token: String, price: f64 },
}

#[tokio::main]
async fn main() {
    // 创建有界通道 (缓冲 100 条消息)
    let (tx, mut rx) = mpsc::channel::<BlockchainEvent>(100);

    // 生产者 1: 区块监听
    let tx1 = tx.clone();
    tokio::spawn(async move {
        for i in 1..=5 {
            let event = BlockchainEvent::NewBlock {
                number: 18000000 + i,
                hash: format!("0x{:064x}", i),
            };
            tx1.send(event).await.unwrap();
            tokio::time::sleep(Duration::from_millis(500)).await;
        }
    });

    // 生产者 2: 价格监听
    let tx2 = tx.clone();
    tokio::spawn(async move {
        let prices = vec![("ETH", 3500.0), ("BTC", 65000.0), ("SOL", 150.0)];
        for (token, price) in prices {
            let event = BlockchainEvent::PriceUpdate {
                token: token.to_string(),
                price,
            };
            tx2.send(event).await.unwrap();
            tokio::time::sleep(Duration::from_millis(300)).await;
        }
    });

    // 释放原始 tx,这样所有生产者结束后 rx 会收到 None
    drop(tx);

    // 消费者: 处理事件
    while let Some(event) = rx.recv().await {
        match event {
            BlockchainEvent::NewBlock { number, hash } => {
                println!("New Block #{}: {}", number, &hash[..10]);
            }
            BlockchainEvent::NewTransaction { hash, value } => {
                println!("New TX {}: {} wei", &hash[..10], value);
            }
            BlockchainEvent::PriceUpdate { token, price } => {
                println!("Price Update: {} = ${:.2}", token, price);
            }
        }
    }

    println!("All producers finished");
}

use tokio::time::Duration;

代码实战

异步 HTTP 客户端 — 获取区块链数据

// Cargo.toml 依赖:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// reqwest = { version = "0.12", features = ["json"] }
// serde = { version = "1", features = ["derive"] }
// serde_json = "1"

use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use tokio::time::{timeout, Duration};

// ============ 数据结构 ============

#[derive(Debug, Serialize)]
struct JsonRpcRequest {
    jsonrpc: String,
    method: String,
    params: Vec<serde_json::Value>,
    id: u64,
}

#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
    jsonrpc: String,
    id: u64,
    result: Option<serde_json::Value>,
    error: Option<JsonRpcError>,
}

#[derive(Debug, Deserialize)]
struct JsonRpcError {
    code: i64,
    message: String,
}

#[derive(Debug, Clone)]
struct ChainConfig {
    name: String,
    rpc_url: String,
    chain_id: u64,
}

#[derive(Debug)]
struct BalanceResult {
    chain: String,
    address: String,
    balance_wei: u128,
    balance_eth: f64,
    latency_ms: u128,
}

// ============ RPC 客户端 ============

struct BlockchainClient {
    http_client: Client,
}

impl BlockchainClient {
    fn new() -> Self {
        let http_client = Client::builder()
            .timeout(Duration::from_secs(10))
            .build()
            .expect("Failed to create HTTP client");

        Self { http_client }
    }

    /// 查询 ETH 余额
    async fn get_balance(
        &self,
        rpc_url: &str,
        address: &str,
    ) -> Result<u128, Box<dyn std::error::Error + Send + Sync>> {
        let request = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "eth_getBalance".to_string(),
            params: vec![
                serde_json::Value::String(address.to_string()),
                serde_json::Value::String("latest".to_string()),
            ],
            id: 1,
        };

        let response: JsonRpcResponse = self
            .http_client
            .post(rpc_url)
            .json(&request)
            .send()
            .await?
            .json()
            .await?;

        if let Some(error) = response.error {
            return Err(format!("RPC Error: {}", error.message).into());
        }

        let hex_balance = response
            .result
            .ok_or("No result")?
            .as_str()
            .ok_or("Result not string")?
            .to_string();

        // 解析十六进制余额 "0x..." -> u128
        let balance = u128::from_str_radix(hex_balance.trim_start_matches("0x"), 16)?;
        Ok(balance)
    }

    /// 查询最新区块号
    async fn get_block_number(
        &self,
        rpc_url: &str,
    ) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
        let request = JsonRpcRequest {
            jsonrpc: "2.0".to_string(),
            method: "eth_blockNumber".to_string(),
            params: vec![],
            id: 1,
        };

        let response: JsonRpcResponse = self
            .http_client
            .post(rpc_url)
            .json(&request)
            .send()
            .await?
            .json()
            .await?;

        let hex_number = response
            .result
            .ok_or("No result")?
            .as_str()
            .ok_or("Not string")?
            .to_string();

        let block_number = u64::from_str_radix(hex_number.trim_start_matches("0x"), 16)?;
        Ok(block_number)
    }

    /// 并发查询多条链的余额
    async fn get_multichain_balance(
        &self,
        chains: &[ChainConfig],
        address: &str,
    ) -> Vec<BalanceResult> {
        let mut handles = Vec::new();

        for chain in chains {
            let client = self.http_client.clone();
            let rpc_url = chain.rpc_url.clone();
            let chain_name = chain.name.clone();
            let addr = address.to_string();

            // 为每条链 spawn 一个异步任务
            let handle = tokio::spawn(async move {
                let start = Instant::now();
                let bc = BlockchainClient {
                    http_client: client,
                };

                // 3秒超时
                let result = timeout(
                    Duration::from_secs(3),
                    bc.get_balance(&rpc_url, &addr),
                )
                .await;

                let latency = start.elapsed().as_millis();

                match result {
                    Ok(Ok(balance_wei)) => BalanceResult {
                        chain: chain_name,
                        address: addr,
                        balance_wei,
                        balance_eth: balance_wei as f64 / 1e18,
                        latency_ms: latency,
                    },
                    Ok(Err(e)) => {
                        eprintln!("[{}] Error: {}", chain_name, e);
                        BalanceResult {
                            chain: chain_name,
                            address: addr,
                            balance_wei: 0,
                            balance_eth: 0.0,
                            latency_ms: latency,
                        }
                    }
                    Err(_) => {
                        eprintln!("[{}] Timeout after 3s", chain_name);
                        BalanceResult {
                            chain: chain_name,
                            address: addr,
                            balance_wei: 0,
                            balance_eth: 0.0,
                            latency_ms: 3000,
                        }
                    }
                }
            });

            handles.push(handle);
        }

        // 等待所有任务完成
        let mut results = Vec::new();
        for handle in handles {
            if let Ok(result) = handle.await {
                results.push(result);
            }
        }

        results
    }
}

// ============ 主函数 ============

#[tokio::main]
async fn main() {
    println!("=== Blockchain Multi-Chain Balance Checker ===\n");

    let chains = vec![
        ChainConfig {
            name: "Ethereum".to_string(),
            rpc_url: "https://eth.llamarpc.com".to_string(),
            chain_id: 1,
        },
        ChainConfig {
            name: "Arbitrum".to_string(),
            rpc_url: "https://arb1.arbitrum.io/rpc".to_string(),
            chain_id: 42161,
        },
        ChainConfig {
            name: "Optimism".to_string(),
            rpc_url: "https://mainnet.optimism.io".to_string(),
            chain_id: 10,
        },
        ChainConfig {
            name: "Base".to_string(),
            rpc_url: "https://mainnet.base.org".to_string(),
            chain_id: 8453,
        },
        ChainConfig {
            name: "Polygon".to_string(),
            rpc_url: "https://polygon-rpc.com".to_string(),
            chain_id: 137,
        },
    ];

    // Vitalik 的地址
    let address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045";
    let client = BlockchainClient::new();

    // 并发查询所有链
    let start = Instant::now();
    let results = client.get_multichain_balance(&chains, address).await;
    let total_time = start.elapsed().as_millis();

    // 打印结果
    println!("{:<12} {:>15} {:>10}", "Chain", "Balance (ETH)", "Latency");
    println!("{}", "-".repeat(40));

    let mut total_eth = 0.0;
    for r in &results {
        println!(
            "{:<12} {:>15.6} {:>8}ms",
            r.chain, r.balance_eth, r.latency_ms
        );
        total_eth += r.balance_eth;
    }

    println!("{}", "-".repeat(40));
    println!("Total:       {:>15.6} ETH", total_eth);
    println!("Total time:  {}ms (concurrent)", total_time);
    println!(
        "vs serial:   ~{}ms (estimated)",
        results.iter().map(|r| r.latency_ms).sum::<u128>()
    );
}

使用 select! 实现带超时的事件监听

use tokio::sync::mpsc;
use tokio::time::{interval, sleep, Duration, Instant};

#[derive(Debug)]
enum Command {
    CheckBalance(String),    // 查询余额
    GetBlockNumber,          // 获取区块号
    Shutdown,                // 关闭
}

/// 事件循环: 监听命令 + 定时心跳 + 超时退出
async fn event_loop(mut cmd_rx: mpsc::Receiver<Command>) {
    let mut heartbeat = interval(Duration::from_secs(5));
    let shutdown_timer = sleep(Duration::from_secs(30));
    tokio::pin!(shutdown_timer);

    let mut request_count: u64 = 0;

    loop {
        tokio::select! {
            // 分支1: 收到命令
            Some(cmd) = cmd_rx.recv() => {
                match cmd {
                    Command::CheckBalance(addr) => {
                        request_count += 1;
                        println!("[#{}] Checking balance for: {}...", request_count, &addr[..10]);
                        // 实际应调用 RPC
                    }
                    Command::GetBlockNumber => {
                        request_count += 1;
                        println!("[#{}] Getting block number...", request_count);
                    }
                    Command::Shutdown => {
                        println!("Received shutdown command");
                        break;
                    }
                }
            }

            // 分支2: 定时心跳
            _ = heartbeat.tick() => {
                println!("[Heartbeat] Alive. Requests processed: {}", request_count);
            }

            // 分支3: 总超时
            _ = &mut shutdown_timer => {
                println!("[Timeout] Auto-shutdown after 30s");
                break;
            }
        }
    }

    println!("Event loop terminated. Total requests: {}", request_count);
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel::<Command>(32);

    // 启动事件循环
    let loop_handle = tokio::spawn(event_loop(rx));

    // 模拟发送命令
    let addresses = vec![
        "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
        "0x742d35Cc6634C0532925a3b844Bc9e7595f2bD38",
        "0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B",
    ];

    for addr in addresses {
        tx.send(Command::CheckBalance(addr.to_string())).await.unwrap();
        sleep(Duration::from_millis(200)).await;
    }

    tx.send(Command::GetBlockNumber).await.unwrap();
    sleep(Duration::from_secs(6)).await; // 等一个心跳
    tx.send(Command::Shutdown).await.unwrap();

    loop_handle.await.unwrap();
}

关键要点总结

async/await 心智模型

1. async fn 返回 Future (惰性, 不执行)
2. .await 驱动 Future 执行 (让出控制权)
3. tokio::spawn 并发执行任务
4. select! 多路复用 (谁先完成用谁)
5. mpsc channel 跨任务通信

tokio 常用模式

模式代码场景
并发请求JoinSet / futures::join_all多链并发查询
超时tokio::time::timeout(dur, fut)RPC 超时控制
定时任务tokio::time::interval(dur)定期轮询区块
竞争tokio::select!多源取最快
限流tokio::sync::Semaphore限制并发 RPC 数

区块链应用中的 async 模式

场景模式
多链余额查询spawn + join_all
区块监听事件循环 + select!
交易广播发送到多个节点 + 取最快确认
价格聚合并发请求多个 API + 中位数

常见误区

  1. 在 async 中使用 std::thread::sleep: 会阻塞整个线程!必须用 tokio::time::sleep
  2. 忘记 .await: async fn 只返回 Future,不 .await 就不执行
  3. spawn 中引用栈上变量: spawn 的 Future 必须是 'static,需要 moveclone
  4. 过度 spawn: 每个 spawn 有调度开销,简单的顺序操作不需要 spawn
  5. 在 tokio runtime 中调用阻塞函数: 用 tokio::task::spawn_blocking 处理 CPU 密集操作
  6. mpsc sender 没有 drop: 接收端不会收到 Nonewhile let Some 永远等待

面试关联

面试题本课关联
"Rust 的 async/await 和 Go 的 goroutine 有什么区别?"Rust 的 Future 是 lazy(需要 poll),Go 的 goroutine 是 eager
"如何实现多链并发查询?"tokio::spawn + JoinSet
"如何处理 RPC 超时和重试?"timeout + select! + 重试逻辑
"Rust async 中常见的陷阱?"阻塞调用、忘记 await、'static 约束
"tokio 的 select! 和 Go 的 select 有什么区别?"tokio select! 是宏展开,Go select 是语言级别

参考资源