Skip to content

异步模式

> 掌握异步编程中的常见模式和最佳实践。

异步迭代器

Stream

Stream 是异步版本的 Iterator:

rust
// futures::Stream trait(类似 Iterator)
pub trait Stream {
    type Item;
    
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) 
        -> Poll<Option<Self::Item>>;
}
▶ Run

使用 Stream

rust
use futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

async fn stream_example() {
    let (tx, rx) = tokio::sync::mpsc::channel(10);
    
    // 将通道转换为 Stream
    let stream = ReceiverStream::new(rx);
    
    // 发送数据
    tokio::spawn(async move {
        for i in 0..5 {
            tx.send(i).await.unwrap();
        }
    });
    
    // 消费 Stream
    while let Some(value) = stream.next().await {
        println!("Received: {}", value);
    }
}
▶ Run

Stream 组合器

rust
use futures::StreamExt;

async fn stream_operations() {
    let stream = futures::stream::iter(0..10);
    
    // map: 转换每个元素
    let mapped = stream.map(|x| x * 2);
    
    // filter: 过滤元素
    let filtered = mapped.filter(|x| futures::future::ready(x > 5));
    
    // take: 取前 N 个
    let taken = filtered.take(3);
    
    // collect: 收集结果
    let result: Vec<i32> = taken.collect().await;
    println!("Result: {:?}", result);  // [6, 8, 10]
}
▶ Run

并发处理 Stream

rust
use futures::StreamExt;

async fn concurrent_processing() {
    let stream = futures::stream::iter(1..=10);
    
    // for_each_concurrent: 并发处理
    stream
        .for_each_concurrent(5, |item| async move {
            process(item).await;
        })
        .await;
}

// buffer_unordered: 并发执行,无序返回
async fn buffer_example() {
    let stream = futures::stream::iter(1..=10)
        .map(|x| async move {
            tokio::time::sleep(Duration::from_millis(100 * x)).await;
            x
        });
    
    let results = stream
        .buffer_unordered(5)  // 最多 5 个并发
        .collect::<Vec<_>>()
        .await;
}
▶ Run

异步通道

mpsc - 多生产者单消费者

rust
use tokio::sync::mpsc;

async fn mpsc_channel() {
    let (tx, mut rx) = mpsc::channel::<i32>(32);
    
    // 多个生产者
    let producers: Vec<_> = (0..3)
        .map(|id| {
            let tx = tx.clone();
            tokio::spawn(async move {
                for i in 0..5 {
                    tx.send(id * 10 + i).await.unwrap();
                    println!("Producer {} sent {}", id, id * 10 + i);
                }
            })
        })
        .collect();
    
    // 单个消费者
    tokio::spawn(async move {
        while let Some(value) = rx.recv().await {
            println!("Consumer received {}", value);
        }
    });
    
    // 等待生产者完成
    for producer in producers {
        producer.await.unwrap();
    }
}
▶ Run

oneshot - 单次通信

rust
use tokio::sync::oneshot;

async fn request_response() {
    let (tx, rx) = oneshot::channel();
    
    // 发送请求
    tokio::spawn(async move {
        // 处理请求
        let result = compute_result();
        tx.send(result).unwrap();
    });
    
    // 等待响应
    match rx.await {
        Ok(result) => println!("Got result: {}", result),
        Err(_) => println!("Sender dropped"),
    }
}
▶ Run

broadcast - 广播消息

rust
use tokio::sync::broadcast;

async fn pub_sub() {
    let (tx, _) = broadcast::channel::<String>(16);
    
    // 多个订阅者
    let mut rx1 = tx.subscribe();
    let mut rx2 = tx.subscribe();
    
    // 发布者
    tokio::spawn(async move {
        tx.send("Message 1".to_string()).unwrap();
        tx.send("Message 2".to_string()).unwrap();
    });
    
    // 订阅者 1
    tokio::spawn(async move {
        while let Ok(msg) = rx1.recv().await {
            println!("Subscriber 1: {}", msg);
        }
    });
    
    // 订阅者 2
    tokio::spawn(async move {
        while let Ok(msg) = rx2.recv().await {
            println!("Subscriber 2: {}", msg);
        }
    });
}
▶ Run

watch - 监视值变化

rust
use tokio::sync::watch;

async fn watch_example() {
    let (tx, mut rx) = watch::channel(0);
    
    // 更新值
    tokio::spawn(async move {
        for i in 1..=5 {
            tx.send(i);
            tokio::time::sleep(Duration::from_secs(1)).await;
        }
    });
    
    // 监视变化
    while rx.changed().await.is_ok() {
        println!("Value changed to: {}", *rx.borrow());
    }
}
▶ Run

异步锁

Mutex - 异步互斥锁

rust
use tokio::sync::Mutex;

async fn mutex_example() {
    let counter = Arc::new(Mutex::new(0));
    
    let tasks: Vec<_> = (0..10)
        .map(|_| {
            let counter = counter.clone();
            tokio::spawn(async move {
                let mut lock = counter.lock().await;
                *lock += 1;
            })
        })
        .collect();
    
    for task in tasks {
        task.await.unwrap();
    }
    
    println!("Final: {}", *counter.lock().await);
}
▶ Run

注意:避免长时间持有锁

rust
// ❌ 错误:在持有锁时 await
async fn bad_lock_usage(state: Arc<Mutex<Data>>) {
    let mut lock = state.lock().await;
    
    // 在持有锁时 await
    process_data(&lock).await;  // 其他任务无法获取锁
    
    lock.update();
}

// ✅ 正确:只在必要时持有锁
async fn good_lock_usage(state: Arc<Mutex<Data>>) {
    // 获取数据副本
    let data = {
        let lock = state.lock().await;
        lock.clone()
    };
    
    // 释放锁后处理
    let result = process_data(&data).await;
    
    // 仅在更新时获取锁
    let mut lock = state.lock().await;
    lock.update(result);
}
▶ Run

RwLock - 异步读写锁

rust
use tokio::sync::RwLock;

async fn rwlock_example() {
    let cache = Arc::new(RwLock::new(HashMap::new()));
    
    // 多个读者可以并发
    let readers: Vec<_> = (0..5)
        .map(|_| {
            let cache = cache.clone();
            tokio::spawn(async move {
                let lock = cache.read().await;
                println!("Read: {:?}", lock.get("key"));
            })
        })
        .collect();
    
    // 写者独占访问
    tokio::spawn(async move {
        let mut lock = cache.write().await;
        lock.insert("key".to_string(), "value".to_string());
    });
    
    for reader in readers {
        reader.await.unwrap();
    }
}
▶ Run

Semaphore - 信号量

rust
use tokio::sync::Semaphore;

async fn semaphore_example() {
    let sem = Arc::new(Semaphore::new(3));  // 最多 3 个并发
    
    let tasks: Vec<_> = (0..10)
        .map(|i| {
            let sem = sem.clone();
            tokio::spawn(async move {
                // 获取许可
                let permit = sem.acquire().await.unwrap();
                println!("Task {} started", i);
                
                tokio::time::sleep(Duration::from_secs(1)).await;
                
                println!("Task {} done", i);
                // 释放许可(自动)
            })
        })
        .collect();
    
    for task in tasks {
        task.await.unwrap();
    }
}
▶ Run

取消和超时

超时处理

rust
use tokio::time::{timeout, Duration};

async fn with_timeout() {
    match timeout(Duration::from_secs(2), long_operation()).await {
        Ok(result) => println!("Completed: {}", result),
        Err(_) => println!("Timeout"),
    }
}

// 嵌套超时
async fn nested_timeout() {
    let fut1 = timeout(Duration::from_secs(1), operation1());
    let fut2 = timeout(Duration::from_secs(2), operation2());
    
    let (r1, r2) = tokio::try_join!(fut1, fut2);
}
▶ Run

任务取消

rust
use tokio::task::AbortHandle;

async fn cancellable_task() {
    let handle = tokio::spawn(async {
        loop {
            println!("Working...");
            tokio::time::sleep(Duration::from_millis(100)).await;
        }
    });
    
    // 获取取消句柄
    let abort_handle = handle.abort_handle();
    
    // 在某些条件下取消
    tokio::spawn(async move {
        tokio::time::sleep(Duration::from_secs(3)).await;
        abort_handle.abort();
    });
    
    // 等待任务(检测取消)
    match handle.await {
        Ok(_) => println!("Task completed"),
        Err(e) if e.is_cancelled() => println!("Task cancelled"),
        Err(e) => println!("Task error: {}", e),
    }
}
▶ Run

优雅关闭

rust
use tokio::sync::broadcast;

async fn graceful_shutdown() {
    let (shutdown_tx, _) = broadcast::channel::<()>(1);
    
    // 多个任务监听关闭信号
    let mut shutdown_rx1 = shutdown_tx.subscribe();
    let mut shutdown_rx2 = shutdown_tx.subscribe();
    
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = shutdown_rx1.recv() => {
                    println!("Task 1 shutting down");
                    break;
                }
                _ = do_work() => {
                    println!("Task 1 working");
                }
            }
        }
    });
    
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = shutdown_rx2.recv() => {
                    println!("Task 2 shutting down");
                    break;
                }
                _ = do_work() => {
                    println!("Task 2 working");
                }
            }
        }
    });
    
    // 发送关闭信号
    tokio::time::sleep(Duration::from_secs(5)).await;
    shutdown_tx.send(()).unwrap();
}
▶ Run

select! 宏

基本用法

rust
use tokio::select;

async fn select_example() {
    let mut stream1 = stream1();
    let mut stream2 = stream2();
    
    loop {
        select! {
            item1 = stream1.next() => {
                match item1 {
                    Some(v) => println!("Stream 1: {}", v),
                    None => break,
                }
            }
            item2 = stream2.next() => {
                match item2 {
                    Some(v) => println!("Stream 2: {}", v),
                    None => break,
                }
            }
        }
    }
}
▶ Run

带 default 分支

rust
async fn with_default() {
    let mut interval = tokio::time::interval(Duration::from_secs(1));
    
    loop {
        select! {
            _ = interval.tick() => {
                println!("Tick");
            }
            // 其他分支...
            
            default => {
                // 无事件时立即执行
                println!("No events ready");
                tokio::time::sleep(Duration::from_millis(100)).await;
            }
        }
    }
}
▶ Run

取消未完成的 Future

rust
async fn cancellation() {
    let task1 = tokio::spawn(task1());
    let task2 = tokio::spawn(task2());
    
    tokio::select! {
        result1 = task1 => {
            // task2 被自动取消
            println!("Task 1 completed: {:?}", result1);
        }
        result2 = task2 => {
            // task1 被自动取消
            println!("Task 2 completed: {:?}", result2);
        }
    }
}
▶ Run

模式匹配

rust
async fn pattern_matching() {
    let (tx, mut rx) = tokio::sync::mpsc::channel(10);
    
    loop {
        select! {
            Some(value) = rx.recv() => {
                println!("Received: {}", value);
            }
            // 需要显式处理 None
            None = rx.recv() => {
                println!("Channel closed");
                break;
            }
            _ = tokio::time::sleep(Duration::from_secs(10)) => {
                println!("Timeout");
                break;
            }
        }
    }
}
▶ Run

异步错误处理

TryFuture

rust
use futures::TryFutureExt;

async fn try_future_example() {
    let result = async_operation()
        .map_err(|e| format!("Error: {}", e))
        .await;
    
    match result {
        Ok(value) => println!("Success: {}", value),
        Err(e) => println!("Failed: {}", e),
    }
}
▶ Run

TryStream

rust
use futures::TryStreamExt;

async fn try_stream_example() {
    let stream = futures::stream::iter(vec![
        Ok(1),
        Ok(2),
        Err("error"),
        Ok(3),
    ]);
    
    // 处理成功和失败
    while let Some(item) = stream.try_next().await.transpose() {
        match item {
            Ok(value) => println!("Value: {}", value),
            Err(e) => println!("Error: {}", e),
        }
    }
}
▶ Run

异步编程陷阱

1. 阻塞异步运行时

rust
// ❌ 错误:在异步代码中使用阻塞调用
async fn bad() {
    std::thread::sleep(Duration::from_secs(1));  // 阻塞整个运行时
    let data = std::fs::read_to_string("file.txt").unwrap();  // 阻塞 I/O
}

// ✅ 正确:使用异步版本或 spawn_blocking
async fn good() {
    tokio::time::sleep(Duration::from_secs(1)).await;
    let data = tokio::fs::read_to_string("file.txt").await.unwrap();
}

// ✅ 正确:使用 spawn_blocking 处理阻塞库
async fn handle_blocking_lib() {
    let result = tokio::task::spawn_blocking(|| {
        blocking_lib_call()
    }).await.unwrap();
}
▶ Run

2. 死锁

rust
// ❌ 错误:在持有锁时再次获取锁
async fn deadlock(state: Arc<Mutex<Data>>) {
    let lock1 = state.lock().await;
    
    // 在持有 lock1 时尝试获取 lock2
    let lock2 = state.lock().await;  // 永久阻塞
}

// ✅ 正确:避免嵌套锁
async fn no_deadlock(state: Arc<Mutex<Data>>) {
    {
        let lock = state.lock().await;
        // 快速完成操作
        lock.update();
    }
    // 释放锁后再获取
    let lock = state.lock().await;
}
▶ Run

3. 忘记 await

rust
// ❌ 错误:创建 Future 但不 await
async fn forgotten_await() {
    tokio::spawn(some_task());  // 创建任务
    // 但没有等待任务完成,可能提前退出
}

// ✅ 正确:等待任务完成
async fn proper_await() {
    let handle = tokio::spawn(some_task());
    handle.await.unwrap();  // 确保任务完成
}
▶ Run

小结

异步模式核心:

  • Stream: 异步迭代器
  • 通道: mpsc、oneshot、broadcast、watch
  • 锁: Mutex、RwLock、Semaphore
  • 取消: timeout、abort、select!

最佳实践:

  • 避免长时间持有异步锁
  • 使用 select! 处理多事件
  • 合理控制并发度(Semaphore)
  • 优雅关闭任务

常见陷阱:

  • 阻塞运行时 → 使用异步版本
  • 死锁 → 避免嵌套锁
  • 忘记 await → 确保等待任务

下一步: 下一节我们将通过实战案例巩固异步编程技能。

练习

练习 1:实现异步管道

使用 Stream 和通道实现数据处理管道:

rust
// TODO: 生产者 → 处理器 → 消费者
// TODO: 使用 buffer_unordered 并发处理
▶ Run

练习 2:并发任务管理器

实现一个可以取消任务的并发管理器:

rust
// TODO: 使用 select! 处理任务和取消信号
// TODO: 使用 broadcast 发送关闭信号
▶ Run

练习 3:超时和重试

实现带超时和重试机制的异步操作:

rust
// TODO: 使用 timeout 处理超时
// TODO: 失败时自动重试 3 次
// TODO: 使用 backoff 策略
▶ Run