SC Day 23
Rust async/await + tokio 运行时 + Future
### 一、为什么需要异步编程?
2026-04-23
第一阶段:基础构建 (Day 21-24)RustasynctokioFuture并发区块链
日期: 2026-04-23 方向: Rust 阶段: 第一阶段:基础构建 (Day 21-24) 标签: #Rust #async #tokio #Future #并发 #区块链
今日目标
- 理解 Rust 异步编程的核心模型(Future trait + Poll)
- 掌握 tokio 运行时的配置和使用
- 学会
async fn、.await、tokio::spawn、select! - 使用
mpscchannel 实现任务间通信 - 编写异步 HTTP 客户端获取区块链数据
核心概念
一、为什么需要异步编程?
在区块链应用中,绝大部分操作都是 IO 密集型的:
| 操作 | 类型 | 耗时 |
|---|---|---|
| 调用 RPC 节点查询余额 | 网络 IO | 50-500ms |
| 查询多个链的数据 | 并发网络 IO | N * 50-500ms (同步) / max(50-500ms) (异步) |
| 读取本地区块数据 | 磁盘 IO | 1-10ms |
| 监听区块链事件 (WebSocket) | 长连接 IO | 持续 |
| 计算哈希 | CPU | <1ms |
同步模型下,查询 10 个链的余额需要串行等待 10 次网络请求(约 5 秒)。异步模型下,所有请求并发发出,总耗时等于最慢的那一个(约 500ms)。
二、Rust 异步模型基础
2.1 Future Trait
Rust 的异步核心是 Future trait:
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}
pub enum Poll<T> {
Ready(T), // 计算完成,返回结果
Pending, // 尚未完成,稍后再 poll
}
关键理解:
- Future 是惰性的 — 创建 Future 不会执行任何操作
- Future 需要被驱动 — 通过运行时反复
poll直到Ready async fn返回一个实现了Future的匿名类型.await是让出控制权给运行时,等待 Future 完成
async fn 的执行流程:
1. 调用 async fn → 返回 Future (不执行!)
2. .await → 运行时开始 poll
3. poll() 返回 Pending → 运行时去执行其他任务
4. IO 就绪通知 → 运行时再次 poll
5. poll() 返回 Ready(value) → .await 返回 value
2.2 async/await 语法
// async fn 自动返回 impl Future<Output = T>
async fn fetch_balance(address: &str) -> Result<u64, String> {
// .await 让出控制权,等待结果
let response = make_rpc_call(address).await?;
let balance = parse_response(response).await?;
Ok(balance)
}
// 等价于 (概念上):
fn fetch_balance(address: &str) -> impl Future<Output = Result<u64, String>> {
async move {
let response = make_rpc_call(address).await?;
let balance = parse_response(response).await?;
Ok(balance)
}
}
三、tokio 运行时
tokio 是 Rust 生态最主流的异步运行时,提供:
| 功能 | 说明 |
|---|---|
| 多线程调度器 | 工作窃取算法分配任务到线程池 |
| IO 驱动 | 基于 epoll/kqueue/IOCP 的非阻塞 IO |
| 定时器 | tokio::time::sleep, interval, timeout |
| 同步原语 | Mutex, RwLock, Semaphore, Notify |
| 通道 | mpsc, oneshot, broadcast, watch |
3.1 运行时配置
// 方式一: 宏标注 (最常用)
#[tokio::main]
async fn main() {
println!("Running on tokio runtime");
}
// 方式二: 手动构建 (更灵活)
fn main() {
let rt = tokio::runtime::Builder::new_multi_thread()
.worker_threads(4) // 工作线程数
.enable_all() // 启用 IO + 时间驱动
.thread_name("blockchain") // 线程名前缀
.build()
.unwrap();
rt.block_on(async {
println!("Running on custom runtime");
});
}
// 方式三: 单线程运行时 (测试用)
#[tokio::main(flavor = "current_thread")]
async fn main() {
println!("Single-threaded runtime");
}
3.2 tokio::spawn — 任务调度
tokio::spawn 将 Future 提交给运行时并发执行:
use tokio::task::JoinHandle;
#[tokio::main]
async fn main() {
// spawn 返回 JoinHandle,可以 await 获取结果
let handle: JoinHandle<u64> = tokio::spawn(async {
// 这个任务会在另一个线程上执行
expensive_computation().await
});
// 同时可以做其他事情
do_something_else().await;
// 等待 spawn 的任务完成
let result = handle.await.unwrap(); // JoinError 如果任务 panic
println!("Result: {}", result);
}
spawn 的要求:
- Future 必须是
'static(不能引用栈上的数据) - Future 必须是
Send(可在线程间传递)
3.3 tokio::select! — 竞争选择
select! 同时等待多个 Future,第一个完成的胜出:
use tokio::time::{sleep, Duration};
async fn fetch_from_primary() -> String {
sleep(Duration::from_millis(100)).await;
"primary data".to_string()
}
async fn fetch_from_backup() -> String {
sleep(Duration::from_millis(200)).await;
"backup data".to_string()
}
#[tokio::main]
async fn main() {
// 竞争: 谁先完成用谁
let result = tokio::select! {
data = fetch_from_primary() => {
println!("Got from primary");
data
}
data = fetch_from_backup() => {
println!("Got from backup");
data
}
};
println!("Result: {}", result);
// 超时模式
tokio::select! {
result = long_running_task() => {
println!("Task completed: {:?}", result);
}
_ = sleep(Duration::from_secs(5)) => {
println!("Timeout! Task took too long");
}
}
}
async fn long_running_task() -> u64 {
sleep(Duration::from_secs(10)).await;
42
}
3.4 mpsc Channel — 任务间通信
use tokio::sync::mpsc;
#[derive(Debug)]
enum BlockchainEvent {
NewBlock { number: u64, hash: String },
NewTransaction { hash: String, value: u64 },
PriceUpdate { token: String, price: f64 },
}
#[tokio::main]
async fn main() {
// 创建有界通道 (缓冲 100 条消息)
let (tx, mut rx) = mpsc::channel::<BlockchainEvent>(100);
// 生产者 1: 区块监听
let tx1 = tx.clone();
tokio::spawn(async move {
for i in 1..=5 {
let event = BlockchainEvent::NewBlock {
number: 18000000 + i,
hash: format!("0x{:064x}", i),
};
tx1.send(event).await.unwrap();
tokio::time::sleep(Duration::from_millis(500)).await;
}
});
// 生产者 2: 价格监听
let tx2 = tx.clone();
tokio::spawn(async move {
let prices = vec![("ETH", 3500.0), ("BTC", 65000.0), ("SOL", 150.0)];
for (token, price) in prices {
let event = BlockchainEvent::PriceUpdate {
token: token.to_string(),
price,
};
tx2.send(event).await.unwrap();
tokio::time::sleep(Duration::from_millis(300)).await;
}
});
// 释放原始 tx,这样所有生产者结束后 rx 会收到 None
drop(tx);
// 消费者: 处理事件
while let Some(event) = rx.recv().await {
match event {
BlockchainEvent::NewBlock { number, hash } => {
println!("New Block #{}: {}", number, &hash[..10]);
}
BlockchainEvent::NewTransaction { hash, value } => {
println!("New TX {}: {} wei", &hash[..10], value);
}
BlockchainEvent::PriceUpdate { token, price } => {
println!("Price Update: {} = ${:.2}", token, price);
}
}
}
println!("All producers finished");
}
use tokio::time::Duration;
代码实战
异步 HTTP 客户端 — 获取区块链数据
// Cargo.toml 依赖:
// [dependencies]
// tokio = { version = "1", features = ["full"] }
// reqwest = { version = "0.12", features = ["json"] }
// serde = { version = "1", features = ["derive"] }
// serde_json = "1"
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::time::Instant;
use tokio::time::{timeout, Duration};
// ============ 数据结构 ============
#[derive(Debug, Serialize)]
struct JsonRpcRequest {
jsonrpc: String,
method: String,
params: Vec<serde_json::Value>,
id: u64,
}
#[derive(Debug, Deserialize)]
struct JsonRpcResponse {
jsonrpc: String,
id: u64,
result: Option<serde_json::Value>,
error: Option<JsonRpcError>,
}
#[derive(Debug, Deserialize)]
struct JsonRpcError {
code: i64,
message: String,
}
#[derive(Debug, Clone)]
struct ChainConfig {
name: String,
rpc_url: String,
chain_id: u64,
}
#[derive(Debug)]
struct BalanceResult {
chain: String,
address: String,
balance_wei: u128,
balance_eth: f64,
latency_ms: u128,
}
// ============ RPC 客户端 ============
struct BlockchainClient {
http_client: Client,
}
impl BlockchainClient {
fn new() -> Self {
let http_client = Client::builder()
.timeout(Duration::from_secs(10))
.build()
.expect("Failed to create HTTP client");
Self { http_client }
}
/// 查询 ETH 余额
async fn get_balance(
&self,
rpc_url: &str,
address: &str,
) -> Result<u128, Box<dyn std::error::Error + Send + Sync>> {
let request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
method: "eth_getBalance".to_string(),
params: vec![
serde_json::Value::String(address.to_string()),
serde_json::Value::String("latest".to_string()),
],
id: 1,
};
let response: JsonRpcResponse = self
.http_client
.post(rpc_url)
.json(&request)
.send()
.await?
.json()
.await?;
if let Some(error) = response.error {
return Err(format!("RPC Error: {}", error.message).into());
}
let hex_balance = response
.result
.ok_or("No result")?
.as_str()
.ok_or("Result not string")?
.to_string();
// 解析十六进制余额 "0x..." -> u128
let balance = u128::from_str_radix(hex_balance.trim_start_matches("0x"), 16)?;
Ok(balance)
}
/// 查询最新区块号
async fn get_block_number(
&self,
rpc_url: &str,
) -> Result<u64, Box<dyn std::error::Error + Send + Sync>> {
let request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
method: "eth_blockNumber".to_string(),
params: vec![],
id: 1,
};
let response: JsonRpcResponse = self
.http_client
.post(rpc_url)
.json(&request)
.send()
.await?
.json()
.await?;
let hex_number = response
.result
.ok_or("No result")?
.as_str()
.ok_or("Not string")?
.to_string();
let block_number = u64::from_str_radix(hex_number.trim_start_matches("0x"), 16)?;
Ok(block_number)
}
/// 并发查询多条链的余额
async fn get_multichain_balance(
&self,
chains: &[ChainConfig],
address: &str,
) -> Vec<BalanceResult> {
let mut handles = Vec::new();
for chain in chains {
let client = self.http_client.clone();
let rpc_url = chain.rpc_url.clone();
let chain_name = chain.name.clone();
let addr = address.to_string();
// 为每条链 spawn 一个异步任务
let handle = tokio::spawn(async move {
let start = Instant::now();
let bc = BlockchainClient {
http_client: client,
};
// 3秒超时
let result = timeout(
Duration::from_secs(3),
bc.get_balance(&rpc_url, &addr),
)
.await;
let latency = start.elapsed().as_millis();
match result {
Ok(Ok(balance_wei)) => BalanceResult {
chain: chain_name,
address: addr,
balance_wei,
balance_eth: balance_wei as f64 / 1e18,
latency_ms: latency,
},
Ok(Err(e)) => {
eprintln!("[{}] Error: {}", chain_name, e);
BalanceResult {
chain: chain_name,
address: addr,
balance_wei: 0,
balance_eth: 0.0,
latency_ms: latency,
}
}
Err(_) => {
eprintln!("[{}] Timeout after 3s", chain_name);
BalanceResult {
chain: chain_name,
address: addr,
balance_wei: 0,
balance_eth: 0.0,
latency_ms: 3000,
}
}
}
});
handles.push(handle);
}
// 等待所有任务完成
let mut results = Vec::new();
for handle in handles {
if let Ok(result) = handle.await {
results.push(result);
}
}
results
}
}
// ============ 主函数 ============
#[tokio::main]
async fn main() {
println!("=== Blockchain Multi-Chain Balance Checker ===\n");
let chains = vec![
ChainConfig {
name: "Ethereum".to_string(),
rpc_url: "https://eth.llamarpc.com".to_string(),
chain_id: 1,
},
ChainConfig {
name: "Arbitrum".to_string(),
rpc_url: "https://arb1.arbitrum.io/rpc".to_string(),
chain_id: 42161,
},
ChainConfig {
name: "Optimism".to_string(),
rpc_url: "https://mainnet.optimism.io".to_string(),
chain_id: 10,
},
ChainConfig {
name: "Base".to_string(),
rpc_url: "https://mainnet.base.org".to_string(),
chain_id: 8453,
},
ChainConfig {
name: "Polygon".to_string(),
rpc_url: "https://polygon-rpc.com".to_string(),
chain_id: 137,
},
];
// Vitalik 的地址
let address = "0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045";
let client = BlockchainClient::new();
// 并发查询所有链
let start = Instant::now();
let results = client.get_multichain_balance(&chains, address).await;
let total_time = start.elapsed().as_millis();
// 打印结果
println!("{:<12} {:>15} {:>10}", "Chain", "Balance (ETH)", "Latency");
println!("{}", "-".repeat(40));
let mut total_eth = 0.0;
for r in &results {
println!(
"{:<12} {:>15.6} {:>8}ms",
r.chain, r.balance_eth, r.latency_ms
);
total_eth += r.balance_eth;
}
println!("{}", "-".repeat(40));
println!("Total: {:>15.6} ETH", total_eth);
println!("Total time: {}ms (concurrent)", total_time);
println!(
"vs serial: ~{}ms (estimated)",
results.iter().map(|r| r.latency_ms).sum::<u128>()
);
}
使用 select! 实现带超时的事件监听
use tokio::sync::mpsc;
use tokio::time::{interval, sleep, Duration, Instant};
#[derive(Debug)]
enum Command {
CheckBalance(String), // 查询余额
GetBlockNumber, // 获取区块号
Shutdown, // 关闭
}
/// 事件循环: 监听命令 + 定时心跳 + 超时退出
async fn event_loop(mut cmd_rx: mpsc::Receiver<Command>) {
let mut heartbeat = interval(Duration::from_secs(5));
let shutdown_timer = sleep(Duration::from_secs(30));
tokio::pin!(shutdown_timer);
let mut request_count: u64 = 0;
loop {
tokio::select! {
// 分支1: 收到命令
Some(cmd) = cmd_rx.recv() => {
match cmd {
Command::CheckBalance(addr) => {
request_count += 1;
println!("[#{}] Checking balance for: {}...", request_count, &addr[..10]);
// 实际应调用 RPC
}
Command::GetBlockNumber => {
request_count += 1;
println!("[#{}] Getting block number...", request_count);
}
Command::Shutdown => {
println!("Received shutdown command");
break;
}
}
}
// 分支2: 定时心跳
_ = heartbeat.tick() => {
println!("[Heartbeat] Alive. Requests processed: {}", request_count);
}
// 分支3: 总超时
_ = &mut shutdown_timer => {
println!("[Timeout] Auto-shutdown after 30s");
break;
}
}
}
println!("Event loop terminated. Total requests: {}", request_count);
}
#[tokio::main]
async fn main() {
let (tx, rx) = mpsc::channel::<Command>(32);
// 启动事件循环
let loop_handle = tokio::spawn(event_loop(rx));
// 模拟发送命令
let addresses = vec![
"0xd8dA6BF26964aF9D7eEd9e03E53415D37aA96045",
"0x742d35Cc6634C0532925a3b844Bc9e7595f2bD38",
"0xAb5801a7D398351b8bE11C439e05C5B3259aeC9B",
];
for addr in addresses {
tx.send(Command::CheckBalance(addr.to_string())).await.unwrap();
sleep(Duration::from_millis(200)).await;
}
tx.send(Command::GetBlockNumber).await.unwrap();
sleep(Duration::from_secs(6)).await; // 等一个心跳
tx.send(Command::Shutdown).await.unwrap();
loop_handle.await.unwrap();
}
关键要点总结
async/await 心智模型
1. async fn 返回 Future (惰性, 不执行)
2. .await 驱动 Future 执行 (让出控制权)
3. tokio::spawn 并发执行任务
4. select! 多路复用 (谁先完成用谁)
5. mpsc channel 跨任务通信
tokio 常用模式
| 模式 | 代码 | 场景 |
|---|---|---|
| 并发请求 | JoinSet / futures::join_all | 多链并发查询 |
| 超时 | tokio::time::timeout(dur, fut) | RPC 超时控制 |
| 定时任务 | tokio::time::interval(dur) | 定期轮询区块 |
| 竞争 | tokio::select! | 多源取最快 |
| 限流 | tokio::sync::Semaphore | 限制并发 RPC 数 |
区块链应用中的 async 模式
| 场景 | 模式 |
|---|---|
| 多链余额查询 | spawn + join_all |
| 区块监听 | 事件循环 + select! |
| 交易广播 | 发送到多个节点 + 取最快确认 |
| 价格聚合 | 并发请求多个 API + 中位数 |
常见误区
- 在 async 中使用
std::thread::sleep: 会阻塞整个线程!必须用tokio::time::sleep - 忘记
.await:async fn只返回 Future,不.await就不执行 - spawn 中引用栈上变量: spawn 的 Future 必须是
'static,需要move或clone - 过度 spawn: 每个 spawn 有调度开销,简单的顺序操作不需要 spawn
- 在 tokio runtime 中调用阻塞函数: 用
tokio::task::spawn_blocking处理 CPU 密集操作 - mpsc sender 没有 drop: 接收端不会收到
None,while let Some永远等待
面试关联
| 面试题 | 本课关联 |
|---|---|
| "Rust 的 async/await 和 Go 的 goroutine 有什么区别?" | Rust 的 Future 是 lazy(需要 poll),Go 的 goroutine 是 eager |
| "如何实现多链并发查询?" | tokio::spawn + JoinSet |
| "如何处理 RPC 超时和重试?" | timeout + select! + 重试逻辑 |
| "Rust async 中常见的陷阱?" | 阻塞调用、忘记 await、'static 约束 |
| "tokio 的 select! 和 Go 的 select 有什么区别?" | tokio select! 是宏展开,Go select 是语言级别 |