Skip to content

实战案例

> 通过实际案例掌握异步编程的应用。

HTTP 客户端

简单 HTTP GET

rust
use reqwest;
use tokio;

/// Downloads `url` with a one-off GET request and returns the response body as text.
///
/// Errors from sending the request or from reading the body are passed to the caller.
async fn fetch_url(url: &str) -> Result<String, reqwest::Error> {
    reqwest::get(url).await?.text().await
}

/// Fetches `https://example.com` and prints either the body length or the error.
#[tokio::main]
async fn main() {
    let report = match fetch_url("https://example.com").await {
        Ok(body) => format!("Body length: {}", body.len()),
        Err(e) => format!("Error: {}", e),
    };
    println!("{}", report);
}
▶ Run

配置客户端

rust
use reqwest::Client;
use std::time::Duration;

/// Builds a client with explicit timeouts and a user agent, then performs an
/// authenticated GET and prints the JSON payload.
///
/// Returns any error raised while building the client, sending the request,
/// or decoding the response body as JSON.
async fn configured_client() -> Result<(), reqwest::Error> {
    let builder = Client::builder()
        .timeout(Duration::from_secs(10))
        .connect_timeout(Duration::from_secs(5))
        .user_agent("MyApp/1.0");
    let http = builder.build()?;

    let request = http
        .get("https://api.example.com/data")
        .header("Authorization", "Bearer token");
    let reply = request.send().await?;

    let payload = reply.json::<serde_json::Value>().await?;
    println!("Data: {:?}", payload);

    Ok(())
}
▶ Run

并发请求

rust
use reqwest::Client;
use futures::future::join_all;

/// Fetches every URL concurrently and returns one result per URL, in input order.
///
/// Each entry is the response body on success, or the error from either sending
/// the request or reading the body. One failing URL does not affect the others.
async fn concurrent_requests(urls: Vec<&str>) -> Vec<Result<String, reqwest::Error>> {
    let client = Client::new();

    // Each future covers the whole request (send + read body), so body downloads
    // run concurrently as well, and everything is awaited by `join_all`.
    let requests = urls.into_iter().map(|url| {
        let pending = client.get(url).send();
        async move {
            let response = pending.await?;
            response.text().await
        }
    });

    join_all(requests).await
}

/// Requests three example sites concurrently and prints the size of each body
/// (or the error) together with the URL's index.
#[tokio::main]
async fn main() {
    let targets = vec![
        "https://example.com",
        "https://example.org",
        "https://example.net",
    ];

    let outcomes = concurrent_requests(targets).await;

    for (index, outcome) in outcomes.iter().enumerate() {
        match outcome {
            Ok(body) => println!("URL {}: {} bytes", index, body.len()),
            Err(err) => println!("URL {} error: {}", index, err),
        }
    }
}
▶ Run

POST 请求

rust
use reqwest::Client;
use serde::{Serialize, Deserialize};

/// Request payload that `create_user` serializes as the JSON body of its POST.
#[derive(Serialize)]
struct CreateUser {
    /// Username for the new account.
    username: String,
    /// E-mail address for the new account.
    email: String,
}

/// User record that `create_user` deserializes from the JSON response body.
#[derive(Deserialize)]
struct User {
    /// Numeric identifier taken from the response.
    id: u64,
    /// Username taken from the response.
    username: String,
}

/// Creates a user by POSTing `user` as JSON and decoding the response as a [`User`].
///
/// # Errors
///
/// Returns an error if the request cannot be sent, if the server answers with a
/// 4xx/5xx status, or if the response body is not a valid `User`.
async fn create_user(client: &Client, user: CreateUser) -> Result<User, reqwest::Error> {
    client
        .post("https://api.example.com/users")
        .json(&user)
        .send()
        .await?
        // Surface HTTP failures as such instead of as a confusing JSON decode
        // error on the server's error body.
        .error_for_status()?
        .json::<User>()
        .await
}
▶ Run

文件处理

异步文件读写

rust
use tokio::fs::{File, read_to_string, write};
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};

/// Reads the whole file at `path` into a `String`.
async fn read_file(path: &str) -> Result<String, std::io::Error> {
    let contents = read_to_string(path).await?;
    Ok(contents)
}

/// Writes `content` to the file at `path`.
async fn write_file(path: &str, content: &str) -> Result<(), std::io::Error> {
    write(path, content).await?;
    Ok(())
}

/// Reads the file at `path` line by line through a buffered reader and returns
/// the lines in order.
///
/// Stops with the I/O error if opening the file or reading a line fails.
async fn buffered_read(path: &str) -> Result<Vec<String>, std::io::Error> {
    use tokio::io::AsyncBufReadExt;

    let mut line_stream = BufReader::new(File::open(path).await?).lines();

    let mut collected = Vec::new();
    while let Some(text) = line_stream.next_line().await? {
        collected.push(text);
    }

    Ok(collected)
}
▶ Run

并发文件处理

rust
use tokio::fs;
use std::path::Path;

/// Reads all given files concurrently and returns one result per file, in input order.
async fn process_files(files: Vec<&Path>) -> Vec<Result<String, std::io::Error>> {
    let pending_reads = files.into_iter().map(|file| fs::read_to_string(file));
    futures::future::join_all(pending_reads).await
}

/// Reads three text files concurrently and prints each file's size or error.
#[tokio::main]
async fn main() {
    let inputs: Vec<&Path> = ["file1.txt", "file2.txt", "file3.txt"]
        .into_iter()
        .map(Path::new)
        .collect();

    let outcomes = process_files(inputs).await;

    for (index, outcome) in outcomes.iter().enumerate() {
        match outcome {
            Ok(text) => println!("File {}: {} bytes", index, text.len()),
            Err(err) => println!("File {} error: {}", index, err),
        }
    }
}
▶ Run

文件监控

rust
use tokio::fs;
use tokio::time::{sleep, Duration};

/// Polls the size of the file at `path` once per second, forever.
///
/// Whenever the size differs from the last observed one, the change is printed
/// and the file is re-read in full to report its content length. Metadata errors
/// are printed and polling continues.
async fn watch_file(path: &str) {
    let poll_interval = Duration::from_secs(1);
    let mut known_size = 0;

    loop {
        match fs::metadata(path).await.map(|meta| meta.len()) {
            Ok(size) if size != known_size => {
                println!("File size changed: {} → {}", known_size, size);
                known_size = size;

                // Re-reads the whole file; a failed read is ignored.
                if let Ok(text) = fs::read_to_string(path).await {
                    println!("Content length: {}", text.len());
                }
            }
            Ok(_) => {}
            Err(e) => println!("Error: {}", e),
        }

        sleep(poll_interval).await;
    }
}
▶ Run

大文件流式处理

rust
use tokio::fs::File;
use tokio::io::{AsyncReadExt, BufReader};

/// Streams the file at `path` through an 8 KiB buffer, passing every chunk to
/// `process_chunk`, and returns the total number of bytes read.
async fn process_large_file(path: &str) -> Result<usize, std::io::Error> {
    let mut reader = BufReader::new(File::open(path).await?);

    let mut chunk = [0u8; 8192];
    let mut consumed = 0;

    loop {
        match reader.read(&mut chunk).await? {
            // A zero-length read signals end of file.
            0 => return Ok(consumed),
            len => {
                consumed += len;
                // Only the first `len` bytes of the buffer hold fresh data.
                process_chunk(&chunk[..len]);
            }
        }
    }
}

/// Placeholder for real chunk handling: prints how many bytes were received.
fn process_chunk(chunk: &[u8]) {
    let size = chunk.len();
    println!("Processed {} bytes", size);
}
▶ Run

数据库操作

使用 sqlx

toml
[dependencies]
sqlx = { version = "0.7", features = ["runtime-tokio", "postgres"] }
rust
use sqlx::postgres::PgPoolOptions;

/// A row from the `users` table, mapped from query results through `sqlx::FromRow`.
#[derive(sqlx::FromRow)]
struct User {
    /// Value of the `id` column.
    id: i32,
    /// Value of the `username` column.
    username: String,
    /// Value of the `email` column.
    email: String,
}

/// Connects to Postgres through a pool of at most five connections, prints every
/// user, then inserts a sample user.
///
/// Returns the first `sqlx::Error` raised by connecting, querying or inserting.
async fn database_example() -> Result<(), sqlx::Error> {
    // Connection pool
    let pool_options = PgPoolOptions::new().max_connections(5);
    let pool = pool_options
        .connect("postgres://user:pass@localhost/db")
        .await?;

    // Query
    let select_users = sqlx::query_as::<_, User>("SELECT id, username, email FROM users");
    let users = select_users.fetch_all(&pool).await?;
    users
        .iter()
        .for_each(|user| println!("User: {} ({})", user.username, user.email));

    // Insert
    let insert_user = sqlx::query("INSERT INTO users (username, email) VALUES ($1, $2)")
        .bind("alice")
        .bind("alice@example.com");
    insert_user.execute(&pool).await?;

    Ok(())
}
▶ Run

并发查询

rust
use sqlx::PgPool;

/// Looks up users with ids 1 through 10 concurrently and returns those that exist.
///
/// Ids without a matching row are skipped.
///
/// # Errors
///
/// Returns the first query error instead of silently dropping failed lookups, so
/// a database failure cannot be mistaken for "no such users".
async fn concurrent_queries(pool: PgPool) -> Result<Vec<User>, sqlx::Error> {
    let queries = (1..=10).map(|id| {
        // Each future owns its own handle to the shared pool.
        let pool = pool.clone();
        async move {
            sqlx::query_as::<_, User>("SELECT * FROM users WHERE id = $1")
                .bind(id)
                .fetch_optional(&pool)
                .await
        }
    });

    let rows = futures::future::try_join_all(queries).await?;

    // `flatten` drops the `None`s produced by ids with no matching row.
    Ok(rows.into_iter().flatten().collect())
}
▶ Run

事务处理

rust
use sqlx::{PgPool, Postgres, Transaction};

/// Moves `amount` from `from_user`'s account to `to_user`'s account inside a
/// single transaction.
///
/// If either update or the commit fails, the error is returned and the
/// transaction is dropped without being committed.
async fn transfer_money(
    pool: &PgPool,
    from_user: i32,
    to_user: i32,
    amount: i64,
) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;

    // Debit the sender.
    let debit = sqlx::query("UPDATE accounts SET balance = balance - $1 WHERE user_id = $2")
        .bind(amount)
        .bind(from_user);
    debit.execute(&mut *tx).await?;

    // Credit the receiver.
    let credit = sqlx::query("UPDATE accounts SET balance = balance + $1 WHERE user_id = $2")
        .bind(amount)
        .bind(to_user);
    credit.execute(&mut *tx).await?;

    tx.commit().await
}
▶ Run

Web 爬虫

简单爬虫

rust
use reqwest::Client;
use tokio::time::{timeout, Duration};
use std::collections::HashSet;

/// Crawler state: an HTTP client plus the set of URLs already visited.
struct Crawler {
    /// Client used to issue the page requests.
    client: Client,
    /// URLs already crawled; `crawl` returns early for any URL found here.
    visited: HashSet<String>,
}

impl Crawler {
    /// Crawls `url` and the pages it links to, down to `depth` levels.
    ///
    /// Returns the URLs that were crawled successfully, children first and `url`
    /// last. Returns an empty list when `depth` is 0 or `url` was already visited.
    /// Failures of linked pages are ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if fetching `url` itself fails or takes longer than five
    /// seconds.
    async fn crawl(&mut self, url: &str, depth: usize) -> Result<Vec<String>, reqwest::Error> {
        if depth == 0 || self.visited.contains(url) {
            return Ok(vec![]);
        }

        self.visited.insert(url.to_string());

        println!("Crawling: {} (depth {})", url, depth);

        // Use the per-request timeout so expiry is reported as a `reqwest::Error`;
        // `tokio::time::timeout` yields `Elapsed`, which `?` cannot convert into
        // this function's error type.
        let response = self
            .client
            .get(url)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;

        let html = response.text().await?;

        // Simplified link extraction.
        let links = extract_links(&html);

        let mut all_links = Vec::new();
        for link in links {
            // A recursive async call must be boxed; otherwise the future would
            // contain itself and have infinite size.
            if let Ok(found_links) = Box::pin(self.crawl(&link, depth - 1)).await {
                all_links.extend(found_links);
            }
        }

        all_links.push(url.to_string());
        Ok(all_links)
    }
}

/// Extracts the values of all double-quoted `href` attributes from `html`.
///
/// This is a simplified textual scan, not an HTML parser: it returns whatever
/// lies between each `href="` and the next `"`, in document order. An `href="`
/// with no closing quote is skipped.
fn extract_links(html: &str) -> Vec<String> {
    html.split("href=\"")
        // The first piece is the text before the first `href="`.
        .skip(1)
        .filter_map(|rest| rest.split_once('"').map(|(link, _)| link.to_string()))
        .collect()
}
▶ Run

并发爬虫

rust
use tokio::sync::Semaphore;

async fn concurrent_crawl(urls: Vec<String>, max_concurrent: usize) {
    let sem = Arc::new(Semaphore::new(max_concurrent));
    let client = Arc::new(Client::new());
    
    let tasks: Vec<_> = urls
        .into_iter()
        .map(|url| {
            let sem = sem.clone();
            let client = client.clone();
            
            tokio::spawn(async move {
                let permit = sem.acquire().await.unwrap();
                
                println!("Fetching: {}", url);
                match timeout(Duration::from_secs(5), client.get(&url).send()).await {
                    Ok(Ok(resp)) => {
                        println!("Success: {} (status {})", url, resp.status());
                    }
                    Ok(Err(e)) => {
                        println!("Error: {} - {}", url, e);
                    }
                    Err(_) => {
                        println!("Timeout: {}", url);
                    }
                }
                
                // permit 自动释放
            })
        })
        .collect();
    
    for task in tasks {
        task.await.unwrap();
    }
}
▶ Run

TCP 服务器

Echo 服务器

rust
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

/// Echoes everything received on `stream` back to the peer.
///
/// Returns when the peer closes the connection (a zero-length read) or when any
/// read or write fails.
async fn handle_client(mut stream: TcpStream) {
    let mut scratch = [0u8; 1024];

    while let Ok(received) = stream.read(&mut scratch).await {
        if received == 0 {
            // Connection closed by the peer.
            break;
        }

        // Echo back exactly the bytes that were read.
        if stream.write_all(&scratch[..received]).await.is_err() {
            break;
        }
    }
}

/// Runs the echo server on 127.0.0.1:8080, handling every connection in its own task.
///
/// Panics if binding the listener or accepting a connection fails.
#[tokio::main]
async fn main() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    println!("Server listening on 127.0.0.1:8080");

    loop {
        let (socket, peer) = listener.accept().await.unwrap();
        println!("New client: {}", peer);

        tokio::spawn(async move { handle_client(socket).await });
    }
}
▶ Run

负载均衡

rust
use tokio::sync::Semaphore;

async fn load_balanced_server() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let sem = Arc::new(Semaphore::new(100));  // 最多 100 个并发连接
    
    loop {
        let permit = sem.clone().acquire_owned().await.unwrap();
        
        let (stream, addr) = listener.accept().await.unwrap();
        println!("New client: {}", addr);
        
        tokio::spawn(async move {
            handle_client(stream).await;
            // permit 自动释放
        });
    }
}
▶ Run

实时聊天

WebSocket 服务器

rust
use tokio::net::TcpListener;
use tokio_tungstenite::{accept_async, tungstenite::Message};
use futures::StreamExt;
use tokio::sync::broadcast;

async fn websocket_server() {
    let listener = TcpListener::bind("127.0.0.1:8080").await.unwrap();
    let (tx, _) = broadcast::channel::<String>(100);
    
    println!("WebSocket server listening on 127.0.0.1:8080");
    
    loop {
        let (stream, addr) = listener.accept().await.unwrap();
        let tx = tx.clone();
        let mut rx = tx.subscribe();
        
        tokio::spawn(async move {
            let ws = accept_async(stream).await.unwrap();
            let (mut write, mut read) = ws.split();
            
            println!("New WebSocket client: {}", addr);
            
            // 处理消息
            while let Some(msg) = read.next().await {
                match msg {
                    Ok(Message::Text(text)) => {
                        // 广播消息
                        tx.send(text.clone()).unwrap();
                    }
                    Ok(Message::Close(_)) => break,
                    Err(_) => break,
                    _ => {}
                }
            }
            
            println!("Client {} disconnected", addr);
        });
        
        // 广播消息给所有客户端
        tokio::spawn(async move {
            while let Ok(msg) = rx.recv().await {
                // 发送给客户端
                // write.send(Message::Text(msg)).await.unwrap();
            }
        });
    }
}
▶ Run

性能优化案例

批量处理优化

rust
// ❌ Inefficient: items are handled strictly one after another
/// Processes `items` sequentially, waiting for each `process` call to finish
/// before starting the next.
async fn slow_process(items: Vec<Item>) {
    for entry in items {
        process(entry).await;
    }
}

// ✅ 高效:批量并发处理
async fn fast_process(items: Vec<Item>) {
    use tokio::sync::Semaphore;
    
    let sem = Arc::new(Semaphore::new(50));  // 最多 50 个并发
    let tasks: Vec<_> = items
        .into_iter()
        .map(|item| {
            let sem = sem.clone();
            tokio::spawn(async move {
                let permit = sem.acquire().await.unwrap();
                let result = process(item).await;
                // permit 自动释放
                result
            })
        })
        .collect();
    
    let results: Vec<_> = futures::future::join_all(tasks)
        .await
        .into_iter()
        .map(|r| r.unwrap())
        .collect();
    
    results
}
▶ Run

连接池优化

rust
use tokio::sync::Mutex;
use std::collections::VecDeque;

/// A pool of idle connections guarded by an async mutex.
struct ConnectionPool {
    /// Idle connections; taken from the front and returned to the back.
    connections: Mutex<VecDeque<Connection>>,
    /// Maximum number of idle connections kept; a connection returned beyond
    /// this limit is closed instead.
    max_size: usize,
}

impl ConnectionPool {
    /// Returns an idle connection if one is available, otherwise creates a new one.
    async fn get(&self) -> Connection {
        // The lock guard is dropped at the end of this statement, so other
        // callers are not blocked while a new connection is being created.
        let pooled = self.connections.lock().await.pop_front();

        match pooled {
            Some(conn) => conn,
            None => create_connection().await,
        }
    }

    /// Puts `conn` back into the pool, or closes it if the pool already holds
    /// `max_size` idle connections.
    async fn return_connection(&self, conn: Connection) {
        let surplus = {
            let mut pool = self.connections.lock().await;

            if pool.len() < self.max_size {
                pool.push_back(conn);
                None
            } else {
                Some(conn)
            }
        };

        // Close the surplus connection only after the lock has been released.
        if let Some(conn) = surplus {
            close_connection(conn).await;
        }
    }
}
▶ Run

小结

实战案例核心:

| 类别 | 关键技术 | 应用场景 |
| --- | --- | --- |
| HTTP 客户端 | reqwest、并发请求 | API 调用、爬虫 |
| 文件处理 | tokio::fs、流式读写 | 数据处理、日志分析 |
| 数据库 | sqlx、连接池、事务 | 数据持久化 |
| Web 爬虫 | 并发、超时、信号量 | 数据采集 |
| TCP 服务器 | TcpListener、连接管理 | 网络服务 |

关键技能:

  • 使用合适的异步库(reqwest、sqlx)
  • 控制并发度(Semaphore)
  • 处理超时和错误
  • 实现优雅关闭
  • 性能优化(批量、连接池)

恭喜完成异步编程学习! 你已经掌握了异步基础、Future、Tokio 运行时、异步模式和实战应用。

练习

练习 1:实现 HTTP 爬虫

编写一个爬取多个网站标题的爬虫:

rust
// TODO: 使用 reqwest 获取页面
// TODO: 解析 HTML 提取标题
// TODO: 并发爬取,控制并发度
// TODO: 处理超时和错误
▶ Run

练习 2:文件批处理系统

实现并发文件处理系统:

rust
// TODO: 读取多个文件
// TODO: 并发处理文件内容
// TODO: 写入结果到输出文件
// TODO: 处理大文件的流式读写
▶ Run

练习 3:简单数据库应用

实现用户管理系统:

rust
// TODO: 使用 sqlx 连接数据库
// TODO: 实现 CRUD 操作
// TODO: 使用事务保证一致性
// TODO: 并发查询优化
▶ Run