实战案例
> 通过实际案例掌握异步编程的应用。
HTTP 客户端
简单 HTTP GET
rust
use reqwest;
use tokio;
/// Issues a one-off GET request to `url` and returns the response body as text.
///
/// # Errors
/// Returns the `reqwest::Error` raised while sending the request or while
/// reading the body.
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    reqwest::get(url).await?.text().await
}
#[tokio::main]
async fn main() {
match fetch_url("https://example.com").await {
Ok(body) => println!("Body length: {}", body.len()),
Err(e) => println!("Error: {}", e),
}
}
配置客户端
rust
use reqwest::Client;
use std::time::Duration;
async fn configured_client() -> Result<(), reqwest::Error> {
let client = Client::builder()
.timeout(Duration::from_secs(10))
.connect_timeout(Duration::from_secs(5))
.user_agent("MyApp/1.0")
.build()?;
let response = client
.get("https://api.example.com/data")
.header("Authorization", "Bearer token")
.send()
.await?;
let data: serde_json::Value = response.json().await?;
println!("Data: {:?}", data);
Ok(())
}
并发请求
rust
use reqwest::Client;
use futures::future::join_all;
/// Fetches every URL in `urls` concurrently and returns one result per URL,
/// in the same order as the input.
///
/// Each entry is the response body on success, or the `reqwest::Error` raised
/// while sending that request or reading its body. One failing URL does not
/// affect the others.
async fn concurrent_requests(urls: Vec<&str>) -> Vec<Result<String, reqwest::Error>> {
    let client = Client::new();
    let fetches = urls.into_iter().map(|url| {
        // Build the request eagerly so the future owns it and borrows nothing
        // from this stack frame.
        let request = client.get(url);
        async move {
            // Send and read the body inside the same future, so both phases
            // of every request run concurrently under a single `join_all`.
            let response = request.send().await?;
            response.text().await
        }
    });
    join_all(fetches).await
}
#[tokio::main]
async fn main() {
let urls = vec![
"https://example.com",
"https://example.org",
"https://example.net",
];
let results = concurrent_requests(urls).await;
for (i, result) in results.into_iter().enumerate() {
match result {
Ok(body) => println!("URL {}: {} bytes", i, body.len()),
Err(e) => println!("URL {} error: {}", i, e),
}
}
}
POST 请求
rust
use reqwest::Client;
use serde::{Serialize, Deserialize};
/// Payload describing the user to create; `create_user` serializes it as the
/// JSON body of its POST request.
#[derive(Serialize)]
struct CreateUser {
username: String,
email: String,
}
/// User record that `create_user` deserializes from the JSON response body.
#[derive(Deserialize)]
struct User {
id: u64,
username: String,
}
async fn create_user(client: &Client, user: CreateUser) -> Result<User, reqwest::Error> {
let response = client
.post("https://api.example.com/users")
.json(&user)
.send()
.await?;
let created_user: User = response.json().await?;
Ok(created_user)
}
文件处理
异步文件读写
rust
use tokio::fs::{File, read_to_string, write};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
/// Asynchronously reads the entire file at `path` into a `String`.
///
/// # Errors
/// Returns the `std::io::Error` reported by `tokio::fs::read_to_string`.
async fn read_file(path: &str) -> Result<String, std::io::Error> {
    let contents = read_to_string(path).await?;
    Ok(contents)
}
/// Asynchronously writes `content` to the file at `path` via `tokio::fs::write`.
///
/// # Errors
/// Returns the `std::io::Error` reported by `tokio::fs::write`.
async fn write_file(path: &str, content: &str) -> Result<(), std::io::Error> {
    write(path, content).await?;
    Ok(())
}
async fn buffered_read(path: &str) -> Result<Vec<String>, std::io::Error> {
let file = File::open(path).await?;
let reader = BufReader::new(file);
use tokio::io::AsyncBufReadExt;
let mut lines = reader.lines();
let mut result = Vec::new();
while let Some(line) = lines.next_line().await? {
result.push(line);
}
Ok(result)
}
并发文件处理
rust
use tokio::fs;
use std::path::Path;
/// Reads all of `files` concurrently and returns one result per path, in
/// input order.
///
/// Each entry is the file's contents, or the `std::io::Error` from reading
/// that particular file.
async fn process_files(files: Vec<&Path>) -> Vec<Result<String, std::io::Error>> {
    let mut pending = Vec::with_capacity(files.len());
    for file in files {
        // Futures are lazy: nothing is read until `join_all` polls them.
        pending.push(fs::read_to_string(file));
    }
    futures::future::join_all(pending).await
}
#[tokio::main]
async fn main() {
let files = vec![
Path::new("file1.txt"),
Path::new("file2.txt"),
Path::new("file3.txt"),
];
let contents = process_files(files).await;
for (i, content) in contents.into_iter().enumerate() {
match content {
Ok(text) => println!("File {}: {} bytes", i, text.len()),
Err(e) => println!("File {} error: {}", i, e),
}
}
}
文件监控
rust
use tokio::fs;
use tokio::time::{sleep, Duration};
async fn watch_file(path: &str) {
let mut last_size = 0;
loop {
match fs::metadata(path).await {
Ok(metadata) => {
let current_size = metadata.len();
if current_size != last_size {
println!("File size changed: {} → {}", last_size, current_size);
last_size = current_size;
// 读取新增内容
if let Ok(content) = fs::read_to_string(path).await {
println!("Content length: {}", content.len());
}
}
}
Err(e) => println!("Error: {}", e),
}
sleep(Duration::from_secs(1)).await;
}
}
大文件流式处理
rust
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};
/// Streams the file at `path` through `process_chunk` in pieces of at most
/// 8 KiB and returns the total number of bytes read.
///
/// # Errors
/// Returns the `std::io::Error` from opening the file or from any read.
async fn process_large_file(path: &str) -> Result<usize, std::io::Error> {
    let mut reader = BufReader::new(File::open(path).await?);
    let mut chunk = [0; 8192]; // 8KB scratch buffer, reused for every read
    let mut total_bytes = 0;
    loop {
        match reader.read(&mut chunk).await? {
            // A zero-length read signals end of file.
            0 => return Ok(total_bytes),
            len => {
                total_bytes += len;
                // Hand over only the bytes filled by this read.
                process_chunk(&chunk[..len]);
            }
        }
    }
}
fn process_chunk(chunk: &[u8]) {
// 模拟处理
println!("Processed {} bytes", chunk.len());
}
数据库操作
使用 sqlx
toml
[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres"] }
rust
use sqlx::postgres::PgPoolOptions;
/// Row type that `sqlx::query_as` results from the `users` table are decoded
/// into (columns `id`, `username`, `email`).
#[derive(sqlx::FromRow)]
struct User {
id: i32,
username: String,
email: String,
}
async fn database_example() -> Result<(), sqlx::Error> {
// 创建连接池
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://user:pass@localhost/db")
.await?;
// 查询
let users: Vec<User> = sqlx::query_as("SELECT id, username, email FROM users")
.fetch_all(&pool)
.await?;
for user in users {
println!("User: {} ({})", user.username, user.email);
}
// 插入
sqlx::query("INSERT INTO users (username, email) VALUES ($1, $2)")
.bind("alice")
.bind("alice@example.com")
.execute(&pool)
.await?;
Ok(())
}
并发查询
rust
use sqlx::PgPool;
/// Looks up the users with ids 1 through 10 concurrently and returns those
/// that exist, in id order.
///
/// # Errors
/// Returns the first `sqlx::Error` (in id order) if any lookup fails. A failed
/// query is reported to the caller rather than being treated as "user not
/// found".
async fn concurrent_queries(pool: PgPool) -> Result<Vec<User>, sqlx::Error> {
    let lookups = (1..=10).map(|id| {
        // Each future needs its own pool handle because it is `async move`.
        let pool = pool.clone();
        async move {
            sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
                .bind(id)
                .fetch_optional(&pool)
                .await
        }
    });

    let mut users = Vec::new();
    for outcome in futures::future::join_all(lookups).await {
        // `Ok(None)` means no row has that id and contributes nothing;
        // `Err` aborts with the database error.
        users.extend(outcome?);
    }
    Ok(users)
}
事务处理
rust
use sqlx::{PgPool, Postgres, Transaction};
async fn transfer_money(
pool: &PgPool,
from_user: i32,
to_user: i32,
amount: i64,
) -> Result<(), sqlx::Error> {
// 开始事务
let mut tx = pool.begin().await?;
// 减少余额
sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE user_id = $2")
.bind(amount)
.bind(from_user)
.execute(&mut *tx)
.await?;
// 增加余额
sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE user_id = $2")
.bind(amount)
.bind(to_user)
.execute(&mut *tx)
.await?;
// 提交事务
tx.commit().await?;
Ok(())
}
Web 爬虫
简单爬虫
rust
use reqwest::Client;
use tokio::time::{timeout, Duration};
use std::collections::HashSet;
/// Crawler state: the HTTP client used for every request and the set of URLs
/// that have already been visited.
struct Crawler {
client: Client,
visited: HashSet<String>,
}
impl Crawler {
    /// Crawls `url` and, recursively, the links found on it, following links
    /// for at most `depth` levels.
    ///
    /// Returns the URLs crawled successfully in this subtree, descendants
    /// first and `url` last. Returns an empty list when `depth` is 0 or `url`
    /// was already visited. Failures while crawling a linked page are ignored.
    ///
    /// # Errors
    /// Returns the `reqwest::Error` from fetching `url` itself, including the
    /// error reqwest reports when the 5-second request timeout elapses.
    async fn crawl(&mut self, url: &str, depth: usize) -> Result<Vec<String>, reqwest::Error> {
        if depth == 0 || self.visited.contains(url) {
            return Ok(vec![]);
        }
        self.visited.insert(url.to_string());
        println!("Crawling: {} (depth {})", url, depth);

        // Use reqwest's per-request timeout so an expired deadline is a
        // `reqwest::Error` and `?` applies. `tokio::time::timeout` would yield
        // an `Elapsed` error that cannot convert into this return type.
        let response = self
            .client
            .get(url)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;
        let html = response.text().await?;

        let mut all_links = Vec::new();
        // Simplified link extraction; see `extract_links`.
        for link in extract_links(&html) {
            // A recursive call to an `async fn` must be boxed, otherwise the
            // future would have to contain itself.
            if let Ok(found_links) = Box::pin(self.crawl(&link, depth - 1)).await {
                all_links.extend(found_links);
            }
        }
        all_links.push(url.to_string());
        Ok(all_links)
    }
}
/// Returns the value of every double-quoted `href="..."` attribute in `html`,
/// in document order.
///
/// This is a plain substring scan, not an HTML parser. An `href="` with no
/// closing quote is skipped.
fn extract_links(html: &str) -> Vec<String> {
    const MARKER: &str = "href=\"";
    html.split(MARKER)
        // The piece before the first marker belongs to no link.
        .skip(1)
        .filter_map(|rest| {
            // `rest` starts right after the opening quote, so the link runs
            // up to the next quote.
            let end = rest.find('"')?;
            Some(rest[..end].to_string())
        })
        .collect()
}
并发爬虫
rust
use tokio::sync::Semaphore;
async fn concurrent_crawl(urls: Vec<String>, max_concurrent: usize) {
let sem = Arc::new(Semaphore::new(max_concurrent));
let client = Arc::new(Client::new());
let tasks: Vec<_> = urls
.into_iter()
.map(|url| {
let sem = sem.clone();
let client = client.clone();
tokio::spawn(async move {
let permit = sem.acquire().await.unwrap();
println!("Fetching: {}", url);
match timeout(Duration::from_secs(5), client.get(&url).send()).await {
Ok(Ok(resp)) => {
println!("Success: {} (status {})", url, resp.status());
}
Ok(Err(e)) => {
println!("Error: {} - {}", url, e);
}
Err(_) => {
println!("Timeout: {}", url);
}
}
// permit 自动释放
})
})
.collect();
for task in tasks {
task.await.unwrap();
}
}
TCP 服务器
Echo 服务器
rust
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// Echoes everything received on `stream` back to the peer until the
/// connection closes or an I/O error occurs.
async fn handle_client(mut stream: TcpStream) {
    let mut buf = [0; 1024];
    // A read error ends the loop, and so does either condition below.
    while let Ok(len) = stream.read(&mut buf).await {
        // A zero-length read means the peer closed the connection; otherwise
        // echo the bytes and stop if the write fails.
        if len == 0 || stream.write_all(&buf[..len]).await.is_err() {
            break;
        }
    }
}
#[tokio::main]
async fn main() {
let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
println!("Server listening on 127.0.0.1:8080");
loop {
let (stream, addr) = listener.accept().await.unwrap();
println!("New client: {}", addr);
tokio::spawn(handle_client(stream));
}
}
负载均衡
rust
use tokio::sync::Semaphore;
/// Accepts TCP connections on 127.0.0.1:8080 and serves each with
/// `handle_client`, allowing at most 100 connections to be handled at once.
///
/// A permit is acquired before each `accept`, so the server stops accepting
/// new connections while 100 clients are still being served.
///
/// # Panics
/// Panics if binding the listener or accepting a connection fails.
async fn load_balanced_server() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let sem = Arc::new(Semaphore::new(100)); // at most 100 concurrent connections
    loop {
        let permit = sem.clone().acquire_owned().await.unwrap();
        let (stream, addr) = listener.accept().await.unwrap();
        println!("New client: {}", addr);
        tokio::spawn(async move {
            // Move the permit into the task. An `async move` block captures
            // only what it mentions, so without this binding the permit would
            // drop at the end of the loop iteration and limit nothing.
            let _permit = permit;
            handle_client(stream).await;
            // `_permit` drops here, freeing the slot.
        });
    }
}
实时聊天
WebSocket 服务器
rust
use tokio::net::TcpListener;
use tokio_tungstenite::{accept_async, tungstenite::Message};
use futures::StreamExt;
use tokio::sync::broadcast;
/// Runs a WebSocket chat server on 127.0.0.1:8080.
///
/// Every text message received from any client is published on a broadcast
/// channel and forwarded to all connected clients, including the sender.
/// Each connection is served by one task that both reads from the socket and
/// writes broadcast messages to it, so the task ends when the client leaves.
///
/// # Panics
/// Panics if binding the listener or accepting a TCP connection fails.
async fn websocket_server() {
    // Needed only for `write.send(..)` on the sink half of the socket.
    use futures::SinkExt;

    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let (tx, _) = broadcast::channel::<String>(100);
    println!("WebSocket server listening on 127.0.0.1:8080");
    loop {
        let (stream, addr) = listener.accept().await.unwrap();
        let tx = tx.clone();
        let mut rx = tx.subscribe();
        tokio::spawn(async move {
            // A failed handshake affects only this connection.
            let ws = match accept_async(stream).await {
                Ok(ws) => ws,
                Err(e) => {
                    println!("Handshake with {} failed: {}", addr, e);
                    return;
                }
            };
            let (mut write, mut read) = ws.split();
            println!("New WebSocket client: {}", addr);
            loop {
                tokio::select! {
                    // Message from this client: publish it to everyone.
                    incoming = read.next() => match incoming {
                        Some(Ok(Message::Text(text))) => {
                            // `send` fails only when there are no receivers;
                            // this task holds `rx`, so the result can be ignored.
                            let _ = tx.send(text);
                        }
                        Some(Ok(Message::Close(_))) | Some(Err(_)) | None => break,
                        Some(Ok(_)) => {}
                    },
                    // Message published by any client: forward it to this one.
                    outgoing = rx.recv() => match outgoing {
                        Ok(msg) => {
                            if write.send(Message::Text(msg)).await.is_err() {
                                break;
                            }
                        }
                        // This client fell behind and missed messages; keep going.
                        Err(broadcast::error::RecvError::Lagged(_)) => continue,
                        Err(broadcast::error::RecvError::Closed) => break,
                    },
                }
            }
            println!("Client {} disconnected", addr);
        });
    }
}
性能优化案例
批量处理优化
rust
// ❌ 低效:逐个处理
/// Processes `items` strictly one after another: each `process` call is
/// awaited to completion before the next one starts.
async fn slow_process(items: Vec<Item>) {
    for entry in items {
        process(entry).await;
    }
}
// ✅ 高效:批量并发处理
/// Processes `items` concurrently, one spawned task per item, with at most
/// 50 `process` calls running at any moment.
///
/// Returns once every item has been processed.
///
/// # Panics
/// Panics if any processing task panicked.
async fn fast_process(items: Vec<Item>) {
    use tokio::sync::Semaphore;
    let sem = Arc::new(Semaphore::new(50)); // at most 50 concurrent calls
    // Collect eagerly so every task is spawned before the first is awaited.
    let tasks: Vec<_> = items
        .into_iter()
        .map(|item| {
            let sem = sem.clone();
            tokio::spawn(async move {
                // Named `_permit` rather than `_` so it is held until the end
                // of the task instead of being dropped immediately.
                let _permit = sem.acquire().await.unwrap();
                process(item).await;
            })
        })
        .collect();
    // The function returns `()`, so wait for completion and surface any task
    // panic rather than returning a results vector.
    for task in tasks {
        task.await.unwrap();
    }
}
连接池优化
rust
use tokio::sync::Mutex;
use std::collections::VecDeque;
/// Pool of idle connections behind an async mutex. `max_size` caps how many
/// idle connections are kept when connections are returned.
struct ConnectionPool {
connections: Mutex<VecDeque<Connection>>,
max_size: usize,
}
impl ConnectionPool {
    /// Returns an idle connection if one is available, otherwise creates a
    /// new one with `create_connection`.
    async fn get(&self) -> Connection {
        // The guard is a temporary of this statement, so the lock is released
        // before the potentially slow `create_connection` await and other
        // callers are not blocked behind connection setup.
        let idle = self.connections.lock().await.pop_front();
        match idle {
            Some(conn) => conn,
            None => create_connection().await,
        }
    }

    /// Hands `conn` back to the pool. It is kept for reuse while fewer than
    /// `max_size` connections are idle; otherwise it is closed.
    async fn return_connection(&self, conn: Connection) {
        // Decide under the lock, but close outside it so a slow shutdown does
        // not stall other users of the pool.
        let surplus = {
            let mut pool = self.connections.lock().await;
            if pool.len() < self.max_size {
                pool.push_back(conn);
                None
            } else {
                Some(conn)
            }
        };
        if let Some(conn) = surplus {
            close_connection(conn).await;
        }
    }
}
小结
实战案例核心:
| 类别 | 关键技术 | 应用场景 |
|---|---|---|
| HTTP 客户端 | reqwest、并发请求 | API 调用、爬虫 |
| 文件处理 | tokio::fs、流式读写 | 数据处理、日志分析 |
| 数据库 | sqlx、连接池、事务 | 数据持久化 |
| Web 爬虫 | 并发、超时、信号量 | 数据采集 |
| TCP 服务器 | TcpListener、连接管理 | 网络服务 |
关键技能:
- 使用合适的异步库(reqwest、sqlx)
- 控制并发度(Semaphore)
- 处理超时和错误
- 实现优雅关闭
- 性能优化(批量、连接池)
恭喜完成异步编程学习! 你已经掌握了异步基础、Future、Tokio 运行时、异步模式和实战应用。
练习
练习 1:实现 HTTP 爬虫
编写一个爬取多个网站标题的爬虫:
rust
// TODO: 使用 reqwest 获取页面
// TODO: 解析 HTML 提取标题
// TODO: 并发爬取,控制并发度
// TODO: 处理超时和错误
练习 2:文件批处理系统
实现并发文件处理系统:
rust
// TODO: 读取多个文件
// TODO: 并发处理文件内容
// TODO: 写入结果到输出文件
// TODO: 处理大文件的流式读写
练习 3:简单数据库应用
实现用户管理系统:
rust
// TODO: 使用 sqlx 连接数据库
// TODO: 实现 CRUD 操作
// TODO: 使用事务保证一致性
// TODO: 并发查询优化