完整示例
示例 1:线程池
rust
▶ Runuse std::sync::{mpsc, Arc, Mutex};
use std::thread;
type Job = Box<dyn FnOnce() + Send + 'static>;
struct ThreadPool {
workers: Vec<Worker>,
sender: Option<mpsc::Sender<Job>>,
}
struct Worker {
id: usize,
thread: Option<thread::JoinHandle<()>>,
}
impl ThreadPool {
fn new(size: usize) -> ThreadPool {
assert!(size > 0);
let (sender, receiver) = mpsc::channel();
let receiver = Arc::new(Mutex::new(receiver));
let mut workers = Vec::with_capacity(size);
for id in 0..size {
workers.push(Worker::new(id, Arc::clone(&receiver)));
}
ThreadPool {
workers,
sender: Some(sender),
}
}
fn execute<F>(&self, f: F)
where
F: FnOnce() + Send + 'static,
{
let job = Box::new(f);
self.sender.as_ref().unwrap().send(job).unwrap();
}
}
impl Worker {
fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
let thread = thread::spawn(move || loop {
let message = receiver.lock().unwrap().recv();
match message {
Ok(job) => {
println!("Worker {} got a job; executing.", id);
job();
}
Err(_) => {
println!("Worker {} disconnected; shutting down.", id);
break;
}
}
});
Worker {
id,
thread: Some(thread),
}
}
}
impl Drop for ThreadPool {
fn drop(&mut self) {
drop(self.sender.take());
for worker in &mut self.workers {
println!("Shutting down worker {}", worker.id);
if let Some(thread) = worker.thread.take() {
thread.join().unwrap();
}
}
}
}
fn main() {
let pool = ThreadPool::new(4);
for i in 0..8 {
pool.execute(move || {
println!("Task {} executed", i);
});
}
thread::sleep(std::time::Duration::from_secs(1));
}示例 2:并行计算
rust
▶ Runuse std::thread;
fn parallel_sum(data: &[i32]) -> i32 {
if data.len() < 1000 {
// 小数据量直接求和
return data.iter().sum();
}
let mid = data.len() / 2;
let (left, right) = data.split_at(mid);
let handle = thread::spawn(move || {
parallel_sum(left)
});
let right_sum = parallel_sum(right);
let left_sum = handle.join().unwrap();
left_sum + right_sum
}
fn main() {
let data: Vec<i32> = (1..=10000).collect();
let result = parallel_sum(&data);
println!("总和:{}", result);
}常见错误
错误 1:数据竞争
rust
▶ Runuse std::thread;
fn main() {
let mut data = 0;
// ❌ 错误:多个线程同时修改
// let h1 = thread::spawn(|| {
// data += 1; // 编译错误
// });
// ✅ 正确:使用 Mutex
use std::sync::{Arc, Mutex};
let data = Arc::new(Mutex::new(0));
let h1 = thread::spawn({
let data = Arc::clone(&data);
move || {
*data.lock().unwrap() += 1;
}
});
h1.join().unwrap();
}错误 2:死锁
rust
▶ Runuse std::sync::{Mutex, Arc};
use std::thread;
fn main() {
let lock1 = Arc::new(Mutex::new(0));
let lock2 = Arc::new(Mutex::new(0));
let l1 = Arc::clone(&lock1);
let l2 = Arc::clone(&lock2);
// 线程 1:先锁 lock1,再锁 lock2
thread::spawn(move || {
let _g1 = l1.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(10));
let _g2 = l2.lock().unwrap(); // 可能死锁
});
// 线程 2:先锁 lock2,再锁 lock1
let l1 = Arc::clone(&lock1);
let l2 = Arc::clone(&lock2);
thread::spawn(move || {
let _g2 = l2.lock().unwrap();
thread::sleep(std::time::Duration::from_millis(10));
let _g1 = l1.lock().unwrap(); // 可能死锁
});
thread::sleep(std::time::Duration::from_millis(100));
}
// 解决:固定锁的顺序错误 3:忘记 join
rust
▶ Runuse std::thread;
fn main() {
let handle = thread::spawn(|| {
println!("工作中...");
});
// ❌ 忘记 join
// handle.join().unwrap();
// ✅ 正确
handle.join().unwrap();
}并发原语对比
| 原语 | 用途 | 性能 | 适用场景 |
|---|---|---|---|
| Mutex | 独占访问 | 中等 | 通用 |
| RwLock | 多读单写 | 读多写少时优 | 配置、缓存 |
| Atomic | 无锁操作 | 最快 | 计数器、标志 |
| Channel | 线程通信 | 中等 | 消息传递 |
| Condition | 条件等待 | 中等 | 生产者 - 消费者 |
练习
练习 1:并行求和
创建一个程序,使用多个线程计算大数组的和。
练习 2:生产者 - 消费者
使用通道实现一个生产者 - 消费者模型,多个生产者发送数据,单个消费者处理。
练习 3:原子计数器
使用原子类型实现一个无锁计数器,比较与 Mutex 的性能差异。
小结
本章我们学习了:
- ✅ 创建线程和 move 闭包
- ✅ 消息传递(通道)
- ✅ 互斥锁(Mutex)
- ✅ 读写锁(RwLock)
- ✅ 原子类型
- ✅ Send 和 Sync Trait
并发选择指南
需要共享数据?
├── 是,需要修改
│ ├── 简单计数 → Atomic
│ ├── 复杂数据 → Mutex
│ └── 读多写少 → RwLock
├── 是,只读 → Arc
└── 否 → 消息传递(Channel)