消息传递
通道基础
┌─────────────────────────────────────────────────────┐
│ 通道(Channel) │
├─────────────────────────────────────────────────────┤
│ │
│ 发送端 (tx) ──────> 通道 ──────> 接收端 (rx) │
│ │
│ mpsc = Multiple Producer, Single Consumer │
│ 多生产者,单消费者 │
│ │
│ 两种通道: │
│ • 无界通道:send 不会阻塞 │
│ • 有界通道:容量满时 send 阻塞 │
│ │
└─────────────────────────────────────────────────────┘1
2
3
4
5
6
7
8
9
10
11
12
13
14
2
3
4
5
6
7
8
9
10
11
12
13
14
基本用法
rust
▶ Runuse std::sync::mpsc;
use std::thread;
fn main() {
// 创建通道
let (tx, rx) = mpsc::channel();
// 发送数据
thread::spawn(move || {
let val = String::from("hello");
tx.send(val).unwrap();
// val 所有权已转移
});
// 接收数据
let received = rx.recv().unwrap(); // 阻塞
println!("收到:{}", received);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
多生产者
rust
▶ Runuse std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
// 克隆发送端
let tx2 = tx.clone();
// 第一个生产者
thread::spawn(move || {
let vals = vec!["hi", "from", "thread 1"];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
// 第二个生产者
thread::spawn(move || {
let vals = vec!["more", "from", "thread 2"];
for val in vals {
tx2.send(val).unwrap();
thread::sleep(Duration::from_millis(200));
}
});
// 消费者
for received in rx {
println!("收到:{}", received);
}
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
有界通道
rust
▶ Runuse std::sync::mpsc;
use std::thread;
fn main() {
// 容量为 1 的有界通道
let (tx, rx) = mpsc::sync_channel(1);
thread::spawn(move || {
tx.send("message").unwrap();
println!("发送完成");
});
// 模拟延迟接收
thread::sleep(std::time::Duration::from_millis(100));
let msg = rx.recv().unwrap();
println!("收到:{}", msg);
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
通道迭代器
rust
▶ Runuse std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
// 生产者线程
thread::spawn(move || {
for i in 1..5 {
tx.send(i).unwrap();
}
// tx 离开作用域,通道关闭
});
// 消费者:使用迭代器
for num in rx {
println!("收到:{}", num);
}
// rx 迭代到通道关闭
}1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20