Skip to content

消息传递

通道基础

┌─────────────────────────────────────────────────────┐
│              通道(Channel)                         │
├─────────────────────────────────────────────────────┤
│                                                     │
│  发送端 (tx) ──────> 通道 ──────> 接收端 (rx)        │
│                                                     │
│  mpsc = Multiple Producer, Single Consumer          │
│  多生产者,单消费者                                  │
│                                                     │
│  两种通道:                                          │
│  • 无界通道:send 不会阻塞                          │
│  • 有界通道:容量满时 send 阻塞                      │
│                                                     │
└─────────────────────────────────────────────────────┘

基本用法

rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // 创建通道
    let (tx, rx) = mpsc::channel();

    // 发送数据
    thread::spawn(move || {
        let val = String::from("hello");
        tx.send(val).unwrap();
        // val 所有权已转移
    });

    // 接收数据
    let received = rx.recv().unwrap();  // 阻塞
    println!("收到:{}", received);
}
▶ Run

多生产者

rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 克隆发送端
    let tx2 = tx.clone();

    // 第一个生产者
    thread::spawn(move || {
        let vals = vec!["hi", "from", "thread 1"];
        for val in vals {
            tx.send(val).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    // 第二个生产者
    thread::spawn(move || {
        let vals = vec!["more", "from", "thread 2"];
        for val in vals {
            tx2.send(val).unwrap();
            thread::sleep(Duration::from_millis(200));
        }
    });

    // 消费者
    for received in rx {
        println!("收到:{}", received);
    }
}
▶ Run

有界通道

rust
use std::sync::mpsc;
use std::thread;

fn main() {
    // 容量为 1 的有界通道
    let (tx, rx) = mpsc::sync_channel(1);

    thread::spawn(move || {
        tx.send("message").unwrap();
        println!("发送完成");
    });

    // 模拟延迟接收
    thread::sleep(std::time::Duration::from_millis(100));

    let msg = rx.recv().unwrap();
    println!("收到:{}", msg);
}
▶ Run

通道迭代器

rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let (tx, rx) = mpsc::channel();

    // 生产者线程
    thread::spawn(move || {
        for i in 1..5 {
            tx.send(i).unwrap();
        }
        // tx 离开作用域,通道关闭
    });

    // 消费者:使用迭代器
    for num in rx {
        println!("收到:{}", num);
    }
    // rx 迭代到通道关闭
}
▶ Run