异步模式
> 掌握异步编程中的常见模式和最佳实践。
异步迭代器
Stream
Stream 是异步版本的 Iterator:
rust
▶ Run// futures::Stream trait(类似 Iterator)
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context)
-> Poll<Option<Self::Item>>;
}使用 Stream
rust
▶ Runuse futures::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
async fn stream_example() {
let (tx, rx) = tokio::sync::mpsc::channel(10);
// 将通道转换为 Stream
let stream = ReceiverStream::new(rx);
// 发送数据
tokio::spawn(async move {
for i in 0..5 {
tx.send(i).await.unwrap();
}
});
// 消费 Stream
while let Some(value) = stream.next().await {
println!("Received: {}", value);
}
}Stream 组合器
rust
▶ Runuse futures::StreamExt;
async fn stream_operations() {
let stream = futures::stream::iter(0..10);
// map: 转换每个元素
let mapped = stream.map(|x| x * 2);
// filter: 过滤元素
let filtered = mapped.filter(|x| futures::future::ready(x > 5));
// take: 取前 N 个
let taken = filtered.take(3);
// collect: 收集结果
let result: Vec<i32> = taken.collect().await;
println!("Result: {:?}", result); // [6, 8, 10]
}并发处理 Stream
rust
▶ Runuse futures::StreamExt;
async fn concurrent_processing() {
let stream = futures::stream::iter(1..=10);
// for_each_concurrent: 并发处理
stream
.for_each_concurrent(5, |item| async move {
process(item).await;
})
.await;
}
// buffer_unordered: 并发执行,无序返回
async fn buffer_example() {
let stream = futures::stream::iter(1..=10)
.map(|x| async move {
tokio::time::sleep(Duration::from_millis(100 * x)).await;
x
});
let results = stream
.buffer_unordered(5) // 最多 5 个并发
.collect::<Vec<_>>()
.await;
}异步通道
mpsc - 多生产者单消费者
rust
▶ Runuse tokio::sync::mpsc;
async fn mpsc_channel() {
let (tx, mut rx) = mpsc::channel::<i32>(32);
// 多个生产者
let producers: Vec<_> = (0..3)
.map(|id| {
let tx = tx.clone();
tokio::spawn(async move {
for i in 0..5 {
tx.send(id * 10 + i).await.unwrap();
println!("Producer {} sent {}", id, id * 10 + i);
}
})
})
.collect();
// 单个消费者
tokio::spawn(async move {
while let Some(value) = rx.recv().await {
println!("Consumer received {}", value);
}
});
// 等待生产者完成
for producer in producers {
producer.await.unwrap();
}
}oneshot - 单次通信
rust
▶ Runuse tokio::sync::oneshot;
async fn request_response() {
let (tx, rx) = oneshot::channel();
// 发送请求
tokio::spawn(async move {
// 处理请求
let result = compute_result();
tx.send(result).unwrap();
});
// 等待响应
match rx.await {
Ok(result) => println!("Got result: {}", result),
Err(_) => println!("Sender dropped"),
}
}broadcast - 广播消息
rust
▶ Runuse tokio::sync::broadcast;
async fn pub_sub() {
let (tx, _) = broadcast::channel::<String>(16);
// 多个订阅者
let mut rx1 = tx.subscribe();
let mut rx2 = tx.subscribe();
// 发布者
tokio::spawn(async move {
tx.send("Message 1".to_string()).unwrap();
tx.send("Message 2".to_string()).unwrap();
});
// 订阅者 1
tokio::spawn(async move {
while let Ok(msg) = rx1.recv().await {
println!("Subscriber 1: {}", msg);
}
});
// 订阅者 2
tokio::spawn(async move {
while let Ok(msg) = rx2.recv().await {
println!("Subscriber 2: {}", msg);
}
});
}watch - 监视值变化
rust
▶ Runuse tokio::sync::watch;
async fn watch_example() {
let (tx, mut rx) = watch::channel(0);
// 更新值
tokio::spawn(async move {
for i in 1..=5 {
tx.send(i);
tokio::time::sleep(Duration::from_secs(1)).await;
}
});
// 监视变化
while rx.changed().await.is_ok() {
println!("Value changed to: {}", *rx.borrow());
}
}异步锁
Mutex - 异步互斥锁
rust
▶ Runuse tokio::sync::Mutex;
async fn mutex_example() {
let counter = Arc::new(Mutex::new(0));
let tasks: Vec<_> = (0..10)
.map(|_| {
let counter = counter.clone();
tokio::spawn(async move {
let mut lock = counter.lock().await;
*lock += 1;
})
})
.collect();
for task in tasks {
task.await.unwrap();
}
println!("Final: {}", *counter.lock().await);
}注意:避免长时间持有锁
rust
▶ Run// ❌ 错误:在持有锁时 await
async fn bad_lock_usage(state: Arc<Mutex<Data>>) {
let mut lock = state.lock().await;
// 在持有锁时 await
process_data(&lock).await; // 其他任务无法获取锁
lock.update();
}
// ✅ 正确:只在必要时持有锁
async fn good_lock_usage(state: Arc<Mutex<Data>>) {
// 获取数据副本
let data = {
let lock = state.lock().await;
lock.clone()
};
// 释放锁后处理
let result = process_data(&data).await;
// 仅在更新时获取锁
let mut lock = state.lock().await;
lock.update(result);
}RwLock - 异步读写锁
rust
▶ Runuse tokio::sync::RwLock;
async fn rwlock_example() {
let cache = Arc::new(RwLock::new(HashMap::new()));
// 多个读者可以并发
let readers: Vec<_> = (0..5)
.map(|_| {
let cache = cache.clone();
tokio::spawn(async move {
let lock = cache.read().await;
println!("Read: {:?}", lock.get("key"));
})
})
.collect();
// 写者独占访问
tokio::spawn(async move {
let mut lock = cache.write().await;
lock.insert("key".to_string(), "value".to_string());
});
for reader in readers {
reader.await.unwrap();
}
}Semaphore - 信号量
rust
▶ Runuse tokio::sync::Semaphore;
async fn semaphore_example() {
let sem = Arc::new(Semaphore::new(3)); // 最多 3 个并发
let tasks: Vec<_> = (0..10)
.map(|i| {
let sem = sem.clone();
tokio::spawn(async move {
// 获取许可
let permit = sem.acquire().await.unwrap();
println!("Task {} started", i);
tokio::time::sleep(Duration::from_secs(1)).await;
println!("Task {} done", i);
// 释放许可(自动)
})
})
.collect();
for task in tasks {
task.await.unwrap();
}
}取消和超时
超时处理
rust
▶ Runuse tokio::time::{timeout, Duration};
async fn with_timeout() {
match timeout(Duration::from_secs(2), long_operation()).await {
Ok(result) => println!("Completed: {}", result),
Err(_) => println!("Timeout"),
}
}
// 嵌套超时
async fn nested_timeout() {
let fut1 = timeout(Duration::from_secs(1), operation1());
let fut2 = timeout(Duration::from_secs(2), operation2());
let (r1, r2) = tokio::try_join!(fut1, fut2);
}任务取消
rust
▶ Runuse tokio::task::AbortHandle;
async fn cancellable_task() {
let handle = tokio::spawn(async {
loop {
println!("Working...");
tokio::time::sleep(Duration::from_millis(100)).await;
}
});
// 获取取消句柄
let abort_handle = handle.abort_handle();
// 在某些条件下取消
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(3)).await;
abort_handle.abort();
});
// 等待任务(检测取消)
match handle.await {
Ok(_) => println!("Task completed"),
Err(e) if e.is_cancelled() => println!("Task cancelled"),
Err(e) => println!("Task error: {}", e),
}
}优雅关闭
rust
▶ Runuse tokio::sync::broadcast;
async fn graceful_shutdown() {
let (shutdown_tx, _) = broadcast::channel::<()>(1);
// 多个任务监听关闭信号
let mut shutdown_rx1 = shutdown_tx.subscribe();
let mut shutdown_rx2 = shutdown_tx.subscribe();
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx1.recv() => {
println!("Task 1 shutting down");
break;
}
_ = do_work() => {
println!("Task 1 working");
}
}
}
});
tokio::spawn(async move {
loop {
tokio::select! {
_ = shutdown_rx2.recv() => {
println!("Task 2 shutting down");
break;
}
_ = do_work() => {
println!("Task 2 working");
}
}
}
});
// 发送关闭信号
tokio::time::sleep(Duration::from_secs(5)).await;
shutdown_tx.send(()).unwrap();
}select! 宏
基本用法
rust
▶ Runuse tokio::select;
async fn select_example() {
let mut stream1 = stream1();
let mut stream2 = stream2();
loop {
select! {
item1 = stream1.next() => {
match item1 {
Some(v) => println!("Stream 1: {}", v),
None => break,
}
}
item2 = stream2.next() => {
match item2 {
Some(v) => println!("Stream 2: {}", v),
None => break,
}
}
}
}
}带 default 分支
rust
▶ Runasync fn with_default() {
let mut interval = tokio::time::interval(Duration::from_secs(1));
loop {
select! {
_ = interval.tick() => {
println!("Tick");
}
// 其他分支...
default => {
// 无事件时立即执行
println!("No events ready");
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
}
}取消未完成的 Future
rust
▶ Runasync fn cancellation() {
let task1 = tokio::spawn(task1());
let task2 = tokio::spawn(task2());
tokio::select! {
result1 = task1 => {
// task2 被自动取消
println!("Task 1 completed: {:?}", result1);
}
result2 = task2 => {
// task1 被自动取消
println!("Task 2 completed: {:?}", result2);
}
}
}模式匹配
rust
▶ Runasync fn pattern_matching() {
let (tx, mut rx) = tokio::sync::mpsc::channel(10);
loop {
select! {
Some(value) = rx.recv() => {
println!("Received: {}", value);
}
// 需要显式处理 None
None = rx.recv() => {
println!("Channel closed");
break;
}
_ = tokio::time::sleep(Duration::from_secs(10)) => {
println!("Timeout");
break;
}
}
}
}异步错误处理
TryFuture
rust
▶ Runuse futures::TryFutureExt;
async fn try_future_example() {
let result = async_operation()
.map_err(|e| format!("Error: {}", e))
.await;
match result {
Ok(value) => println!("Success: {}", value),
Err(e) => println!("Failed: {}", e),
}
}TryStream
rust
▶ Runuse futures::TryStreamExt;
async fn try_stream_example() {
let stream = futures::stream::iter(vec![
Ok(1),
Ok(2),
Err("error"),
Ok(3),
]);
// 处理成功和失败
while let Some(item) = stream.try_next().await.transpose() {
match item {
Ok(value) => println!("Value: {}", value),
Err(e) => println!("Error: {}", e),
}
}
}异步编程陷阱
1. 阻塞异步运行时
rust
▶ Run// ❌ 错误:在异步代码中使用阻塞调用
async fn bad() {
std::thread::sleep(Duration::from_secs(1)); // 阻塞整个运行时
let data = std::fs::read_to_string("file.txt").unwrap(); // 阻塞 I/O
}
// ✅ 正确:使用异步版本或 spawn_blocking
async fn good() {
tokio::time::sleep(Duration::from_secs(1)).await;
let data = tokio::fs::read_to_string("file.txt").await.unwrap();
}
// ✅ 正确:使用 spawn_blocking 处理阻塞库
async fn handle_blocking_lib() {
let result = tokio::task::spawn_blocking(|| {
blocking_lib_call()
}).await.unwrap();
}2. 死锁
rust
▶ Run// ❌ 错误:在持有锁时再次获取锁
async fn deadlock(state: Arc<Mutex<Data>>) {
let lock1 = state.lock().await;
// 在持有 lock1 时尝试获取 lock2
let lock2 = state.lock().await; // 永久阻塞
}
// ✅ 正确:避免嵌套锁
async fn no_deadlock(state: Arc<Mutex<Data>>) {
{
let lock = state.lock().await;
// 快速完成操作
lock.update();
}
// 释放锁后再获取
let lock = state.lock().await;
}3. 忘记 await
rust
▶ Run// ❌ 错误:创建 Future 但不 await
async fn forgotten_await() {
tokio::spawn(some_task()); // 创建任务
// 但没有等待任务完成,可能提前退出
}
// ✅ 正确:等待任务完成
async fn proper_await() {
let handle = tokio::spawn(some_task());
handle.await.unwrap(); // 确保任务完成
}小结
异步模式核心:
- Stream: 异步迭代器
- 通道: mpsc、oneshot、broadcast、watch
- 锁: Mutex、RwLock、Semaphore
- 取消: timeout、abort、select!
最佳实践:
- 避免长时间持有异步锁
- 使用 select! 处理多事件
- 合理控制并发度(Semaphore)
- 优雅关闭任务
常见陷阱:
- 阻塞运行时 → 使用异步版本
- 死锁 → 避免嵌套锁
- 忘记 await → 确保等待任务
下一步: 下一节我们将通过实战案例巩固异步编程技能。
练习
练习 1:实现异步管道
使用 Stream 和通道实现数据处理管道:
rust
▶ Run// TODO: 生产者 → 处理器 → 消费者
// TODO: 使用 buffer_unordered 并发处理练习 2:并发任务管理器
实现一个可以取消任务的并发管理器:
rust
▶ Run// TODO: 使用 select! 处理任务和取消信号
// TODO: 使用 broadcast 发送关闭信号练习 3:超时和重试
实现带超时和重试机制的异步操作:
rust
▶ Run// TODO: 使用 timeout 处理超时
// TODO: 失败时自动重试 3 次
// TODO: 使用 backoff 策略