Skip to content

完整示例

示例 1:线程池

rust
use std::sync::{mpsc, Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

struct ThreadPool {
    workers: Vec<Worker>,
    sender: Option<mpsc::Sender<Job>>,
}

struct Worker {
    id: usize,
    thread: Option<thread::JoinHandle<()>>,
}

impl ThreadPool {
    fn new(size: usize) -> ThreadPool {
        assert!(size > 0);

        let (sender, receiver) = mpsc::channel();
        let receiver = Arc::new(Mutex::new(receiver));

        let mut workers = Vec::with_capacity(size);

        for id in 0..size {
            workers.push(Worker::new(id, Arc::clone(&receiver)));
        }

        ThreadPool {
            workers,
            sender: Some(sender),
        }
    }

    fn execute<F>(&self, f: F)
    where
        F: FnOnce() + Send + 'static,
    {
        let job = Box::new(f);
        self.sender.as_ref().unwrap().send(job).unwrap();
    }
}

impl Worker {
    fn new(id: usize, receiver: Arc<Mutex<mpsc::Receiver<Job>>>) -> Worker {
        let thread = thread::spawn(move || loop {
            let message = receiver.lock().unwrap().recv();

            match message {
                Ok(job) => {
                    println!("Worker {} got a job; executing.", id);
                    job();
                }
                Err(_) => {
                    println!("Worker {} disconnected; shutting down.", id);
                    break;
                }
            }
        });

        Worker {
            id,
            thread: Some(thread),
        }
    }
}

impl Drop for ThreadPool {
    fn drop(&mut self) {
        drop(self.sender.take());

        for worker in &mut self.workers {
            println!("Shutting down worker {}", worker.id);
            if let Some(thread) = worker.thread.take() {
                thread.join().unwrap();
            }
        }
    }
}

fn main() {
    let pool = ThreadPool::new(4);

    for i in 0..8 {
        pool.execute(move || {
            println!("Task {} executed", i);
        });
    }

    thread::sleep(std::time::Duration::from_secs(1));
}
▶ Run

示例 2:并行计算

rust
use std::thread;

fn parallel_sum(data: &[i32]) -> i32 {
    if data.len() < 1000 {
        // 小数据量直接求和
        return data.iter().sum();
    }

    let mid = data.len() / 2;
    let (left, right) = data.split_at(mid);

    let handle = thread::spawn(move || {
        parallel_sum(left)
    });

    let right_sum = parallel_sum(right);
    let left_sum = handle.join().unwrap();

    left_sum + right_sum
}

fn main() {
    let data: Vec<i32> = (1..=10000).collect();
    let result = parallel_sum(&data);
    println!("总和:{}", result);
}
▶ Run

常见错误

错误 1:数据竞争

rust
use std::thread;

fn main() {
    let mut data = 0;

    // ❌ 错误:多个线程同时修改
    // let h1 = thread::spawn(|| {
    //     data += 1;  // 编译错误
    // });

    // ✅ 正确:使用 Mutex
    use std::sync::{Arc, Mutex};
    let data = Arc::new(Mutex::new(0));

    let h1 = thread::spawn({
        let data = Arc::clone(&data);
        move || {
            *data.lock().unwrap() += 1;
        }
    });

    h1.join().unwrap();
}
▶ Run

错误 2:死锁

rust
use std::sync::{Mutex, Arc};
use std::thread;

fn main() {
    let lock1 = Arc::new(Mutex::new(0));
    let lock2 = Arc::new(Mutex::new(0));

    let l1 = Arc::clone(&lock1);
    let l2 = Arc::clone(&lock2);

    // 线程 1:先锁 lock1,再锁 lock2
    thread::spawn(move || {
        let _g1 = l1.lock().unwrap();
        thread::sleep(std::time::Duration::from_millis(10));
        let _g2 = l2.lock().unwrap();  // 可能死锁
    });

    // 线程 2:先锁 lock2,再锁 lock1
    let l1 = Arc::clone(&lock1);
    let l2 = Arc::clone(&lock2);
    thread::spawn(move || {
        let _g2 = l2.lock().unwrap();
        thread::sleep(std::time::Duration::from_millis(10));
        let _g1 = l1.lock().unwrap();  // 可能死锁
    });

    thread::sleep(std::time::Duration::from_millis(100));
}

// 解决:固定锁的顺序
▶ Run

错误 3:忘记 join

rust
use std::thread;

fn main() {
    let handle = thread::spawn(|| {
        println!("工作中...");
    });

    // ❌ 忘记 join
    // handle.join().unwrap();

    // ✅ 正确
    handle.join().unwrap();
}
▶ Run

并发原语对比

原语用途性能适用场景
Mutex独占访问中等通用
RwLock多读单写读多写少时优配置、缓存
Atomic无锁操作最快计数器、标志
Channel线程通信中等消息传递
Condition条件等待中等生产者 - 消费者

练习

练习 1:并行求和

创建一个程序,使用多个线程计算大数组的和。

练习 2:生产者 - 消费者

使用通道实现一个生产者 - 消费者模型,多个生产者发送数据,单个消费者处理。

练习 3:原子计数器

使用原子类型实现一个无锁计数器,比较与 Mutex 的性能差异。

小结

本章我们学习了:

  • ✅ 创建线程和 move 闭包
  • ✅ 消息传递(通道)
  • ✅ 互斥锁(Mutex)
  • ✅ 读写锁(RwLock)
  • ✅ 原子类型
  • ✅ Send 和 Sync Trait

并发选择指南

需要共享数据?
├── 是,需要修改
│   ├── 简单计数 → Atomic
│   ├── 复杂数据 → Mutex
│   └── 读多写少 → RwLock
├── 是,只读 → Arc
└── 否 → 消息传递(Channel)

第 24 章:Unsafe Rust