Skip to content

第六步:命令执行

src/command/executor.rs

rust
use crate::store::engine::Store;
use crate::command::protocol::Response;

/// 命令执行器
pub struct CommandExecutor {
    store: Store,
}

impl CommandExecutor {
    pub fn new(store: Store) -> Self {
        Self { store }
    }
    
    /// 执行命令
    pub fn execute(&self, command: Vec<String>) -> Response {
        if command.is_empty() {
            return Response::error("空命令");
        }
        
        let cmd = command[0].to_uppercase();
        let args = &command[1..];
        
        match cmd.as_str() {
            // === String 命令 ===
            "SET" => self.cmd_set(args),
            "GET" => self.cmd_get(args),
            "DEL" => self.cmd_del(args),
            "EXISTS" => self.cmd_exists(args),
            "KEYS" => self.cmd_keys(args),
            "FLUSH" => self.cmd_flush(args),
            
            // === List 命令 ===
            "LPUSH" => self.cmd_lpush(args),
            "RPUSH" => self.cmd_rpush(args),
            "LPOP" => self.cmd_lpop(args),
            "RPOP" => self.cmd_rpop(args),
            "LLEN" => self.cmd_llen(args),
            "LRANGE" => self.cmd_lrange(args),
            
            // === Hash 命令 ===
            "HSET" => self.cmd_hset(args),
            "HGET" => self.cmd_hget(args),
            "HGETALL" => self.cmd_hgetall(args),
            "HDEL" => self.cmd_hdel(args),
            
            // === 其他 ===
            "PING" => Response::bulk("PONG"),
            "INFO" => self.cmd_info(args),
            "QUIT" => Response::ok(),
            
            _ => Response::error(format!("未知命令: {}", cmd)),
        }
    }
    
    // === String 命令实现 ===
    
    fn cmd_set(&self, args: &[String]) -> Response {
        if args.len() < 2 {
            return Response::error("SET 需要 2 个参数");
        }
        
        let key = args[0].clone();
        let value = crate::store::types::Value::string(args[1].clone());
        
        // 检查是否有 EX 参数
        if args.len() >= 4 && args[2].to_uppercase() == "EX" {
            let seconds: u64 = args[3].parse().unwrap_or(0);
            if seconds > 0 {
                self.store.setex(key, value, seconds);
            } else {
                self.store.set(key, value);
            }
        } else {
            self.store.set(key, value);
        }
        
        Response::ok()
    }
    
    fn cmd_get(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("GET 需要 1 个参数");
        }
        
        match self.store.get(&args[0]) {
            Some(value) => {
                match value.as_string() {
                    Some(s) => Response::bulk(s),
                    None => Response::error("键类型不是字符串"),
                }
            }
            None => Response::nil(),
        }
    }
    
    fn cmd_del(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("DEL 需要 1 个参数");
        }
        
        let deleted = self.store.del(&args[0]);
        Response::integer(if deleted { 1 } else { 0 })
    }
    
    fn cmd_exists(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("EXISTS 需要 1 个参数");
        }
        
        let exists = self.store.exists(&args[0]);
        Response::integer(if exists { 1 } else { 0 })
    }
    
    fn cmd_keys(&self, _args: &[String]) -> Response {
        let keys = self.store.keys();
        Response::array(keys.into_iter().map(Response::bulk).collect())
    }
    
    fn cmd_flush(&self, _args: &[String]) -> Response {
        self.store.flush();
        Response::ok()
    }
    
    // === List 命令实现 ===
    
    fn cmd_lpush(&self, args: &[String]) -> Response {
        if args.len() < 2 {
            return Response::error("LPUSH 需要 2 个参数");
        }
        
        match self.store.lpush(args[0].clone(), args[1].clone()) {
            Ok(len) => Response::integer(len as i64),
            Err(e) => Response::error(e.to_string()),
        }
    }
    
    fn cmd_rpush(&self, args: &[String]) -> Response {
        if args.len() < 2 {
            return Response::error("RPUSH 需要 2 个参数");
        }
        
        match self.store.rpush(args[0].clone(), args[1].clone()) {
            Ok(len) => Response::integer(len as i64),
            Err(e) => Response::error(e.to_string()),
        }
    }
    
    fn cmd_lpop(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("LPOP 需要 1 个参数");
        }
        
        match self.store.lpop(&args[0]) {
            Some(s) => Response::bulk(s),
            None => Response::nil(),
        }
    }
    
    fn cmd_rpop(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("RPOP 需要 1 个参数");
        }
        
        match self.store.rpop(&args[0]) {
            Some(s) => Response::bulk(s),
            None => Response::nil(),
        }
    }
    
    fn cmd_llen(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("LLEN 需要 1 个参数");
        }
        
        let len = self.store.llen(&args[0]);
        Response::integer(len as i64)
    }
    
    fn cmd_lrange(&self, args: &[String]) -> Response {
        if args.len() < 3 {
            return Response::error("LRANGE 需要 3 个参数");
        }
        
        let start: usize = args[1].parse().unwrap_or(0);
        let end: usize = args[2].parse().unwrap_or(0);
        
        match self.store.lrange(&args[0], start, end + 1) {
            Some(list) => Response::array(list.into_iter().map(Response::bulk).collect()),
            None => Response::nil(),
        }
    }
    
    // === Hash 命令实现 ===
    
    fn cmd_hset(&self, args: &[String]) -> Response {
        if args.len() < 3 {
            return Response::error("HSET 需要 3 个参数");
        }
        
        match self.store.hset(args[0].clone(), args[1].clone(), args[2].clone()) {
            Ok(is_new) => Response::integer(if is_new { 1 } else { 0 }),
            Err(e) => Response::error(e.to_string()),
        }
    }
    
    fn cmd_hget(&self, args: &[String]) -> Response {
        if args.len() < 2 {
            return Response::error("HGET 需要 2 个参数");
        }
        
        match self.store.hget(&args[0], &args[1]) {
            Some(s) => Response::bulk(s),
            None => Response::nil(),
        }
    }
    
    fn cmd_hgetall(&self, args: &[String]) -> Response {
        if args.is_empty() {
            return Response::error("HGETALL 需要 1 个参数");
        }
        
        match self.store.hgetall(&args[0]) {
            Some(hash) => {
                let items: Vec<Response> = hash
                    .into_iter()
                    .flat_map(|(k, v)| [Response::bulk(k), Response::bulk(v)])
                    .collect();
                Response::array(items)
            }
            None => Response::nil(),
        }
    }
    
    fn cmd_hdel(&self, args: &[String]) -> Response {
        if args.len() < 2 {
            return Response::error("HDEL 需要 2 个参数");
        }
        
        let deleted = self.store.hdel(&args[0], &args[1]);
        Response::integer(if deleted { 1 } else { 0 })
    }
    
    // === 其他命令 ===
    
    fn cmd_info(&self, _args: &[String]) -> Response {
        Response::bulk(format!(
            "kv-store v0.1.0\nkeys: {}\n",
            self.store.size()
        ))
    }
}
▶ Run

第七步:TCP 服务器

src/server.rs

rust
use tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
use crate::store::engine::Store;
use crate::command::{parser::CommandParser, executor::CommandExecutor, protocol::Response};

/// 启动服务器
pub async fn run(addr: &str) -> anyhow::Result<()> {
    let store = Arc::new(Store::new());
    let executor = Arc::new(CommandExecutor::new((*store).clone()));
    
    let listener = TcpListener::bind(addr).await?;
    tracing::info!("服务器启动: {}", addr);
    
    loop {
        let (socket, addr) = listener.accept().await?;
        tracing::debug!("新连接: {}", addr);
        
        let executor = executor.clone();
        tokio::spawn(async move {
            handle_connection(socket, executor).await;
        });
    }
}

/// 处理连接
async fn handle_connection(
    mut socket: TcpStream,
    executor: Arc<CommandExecutor>,
) {
    let mut buffer = vec![0u8; 1024];
    
    loop {
        let n = match socket.read(&mut buffer).await {
            Ok(0) => break,  // 连接关闭
            Ok(n) => n,
            Err(_) => break,
        };
        
        // 解析命令
        let command = CommandParser::parse_resp(&buffer[..n]);
        
        if let Some(cmd) = command {
            // 执行命令
            let response = executor.execute(cmd);
            
            // 发送响应
            let response_str = response.to_string();
            if socket.write_all(response_str.as_bytes()).await.is_err() {
                break;
            }
        }
    }
}
▶ Run

第八步:主入口

src/main.rs

rust
mod store;
mod command;
mod server;

use store::engine::Store;
use command::executor::CommandExecutor;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    tracing_subscriber::fmt::init();
    
    server::run("127.0.0.1:6379").await?;
    
    Ok(())
}
▶ Run

测试客户端

rust
// 可以用 telnet 或 redis-cli 测试
// telnet 127.0.0.1 6379

// SET key value
// GET key
// DEL key
// LPUSH list item
// LPOP list
// HSET hash field value
// HGET hash field
▶ Run

小结

本项目涵盖:

  • ✅ 并发数据结构 (DashMap)
  • ✅ 多种数据类型支持
  • ✅ 过期时间管理
  • ✅ 命令协议解析
  • ✅ TCP 服务器
  • ✅ 异步处理