第六步:命令执行
src/command/executor.rs
rust
▶ Runuse crate::store::engine::Store;
use crate::command::protocol::Response;
/// 命令执行器
pub struct CommandExecutor {
store: Store,
}
impl CommandExecutor {
pub fn new(store: Store) -> Self {
Self { store }
}
/// 执行命令
pub fn execute(&self, command: Vec<String>) -> Response {
if command.is_empty() {
return Response::error("空命令");
}
let cmd = command[0].to_uppercase();
let args = &command[1..];
match cmd.as_str() {
// === String 命令 ===
"SET" => self.cmd_set(args),
"GET" => self.cmd_get(args),
"DEL" => self.cmd_del(args),
"EXISTS" => self.cmd_exists(args),
"KEYS" => self.cmd_keys(args),
"FLUSH" => self.cmd_flush(args),
// === List 命令 ===
"LPUSH" => self.cmd_lpush(args),
"RPUSH" => self.cmd_rpush(args),
"LPOP" => self.cmd_lpop(args),
"RPOP" => self.cmd_rpop(args),
"LLEN" => self.cmd_llen(args),
"LRANGE" => self.cmd_lrange(args),
// === Hash 命令 ===
"HSET" => self.cmd_hset(args),
"HGET" => self.cmd_hget(args),
"HGETALL" => self.cmd_hgetall(args),
"HDEL" => self.cmd_hdel(args),
// === 其他 ===
"PING" => Response::bulk("PONG"),
"INFO" => self.cmd_info(args),
"QUIT" => Response::ok(),
_ => Response::error(format!("未知命令: {}", cmd)),
}
}
// === String 命令实现 ===
fn cmd_set(&self, args: &[String]) -> Response {
if args.len() < 2 {
return Response::error("SET 需要 2 个参数");
}
let key = args[0].clone();
let value = crate::store::types::Value::string(args[1].clone());
// 检查是否有 EX 参数
if args.len() >= 4 && args[2].to_uppercase() == "EX" {
let seconds: u64 = args[3].parse().unwrap_or(0);
if seconds > 0 {
self.store.setex(key, value, seconds);
} else {
self.store.set(key, value);
}
} else {
self.store.set(key, value);
}
Response::ok()
}
fn cmd_get(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("GET 需要 1 个参数");
}
match self.store.get(&args[0]) {
Some(value) => {
match value.as_string() {
Some(s) => Response::bulk(s),
None => Response::error("键类型不是字符串"),
}
}
None => Response::nil(),
}
}
fn cmd_del(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("DEL 需要 1 个参数");
}
let deleted = self.store.del(&args[0]);
Response::integer(if deleted { 1 } else { 0 })
}
fn cmd_exists(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("EXISTS 需要 1 个参数");
}
let exists = self.store.exists(&args[0]);
Response::integer(if exists { 1 } else { 0 })
}
fn cmd_keys(&self, _args: &[String]) -> Response {
let keys = self.store.keys();
Response::array(keys.into_iter().map(Response::bulk).collect())
}
fn cmd_flush(&self, _args: &[String]) -> Response {
self.store.flush();
Response::ok()
}
// === List 命令实现 ===
fn cmd_lpush(&self, args: &[String]) -> Response {
if args.len() < 2 {
return Response::error("LPUSH 需要 2 个参数");
}
match self.store.lpush(args[0].clone(), args[1].clone()) {
Ok(len) => Response::integer(len as i64),
Err(e) => Response::error(e.to_string()),
}
}
fn cmd_rpush(&self, args: &[String]) -> Response {
if args.len() < 2 {
return Response::error("RPUSH 需要 2 个参数");
}
match self.store.rpush(args[0].clone(), args[1].clone()) {
Ok(len) => Response::integer(len as i64),
Err(e) => Response::error(e.to_string()),
}
}
fn cmd_lpop(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("LPOP 需要 1 个参数");
}
match self.store.lpop(&args[0]) {
Some(s) => Response::bulk(s),
None => Response::nil(),
}
}
fn cmd_rpop(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("RPOP 需要 1 个参数");
}
match self.store.rpop(&args[0]) {
Some(s) => Response::bulk(s),
None => Response::nil(),
}
}
fn cmd_llen(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("LLEN 需要 1 个参数");
}
let len = self.store.llen(&args[0]);
Response::integer(len as i64)
}
fn cmd_lrange(&self, args: &[String]) -> Response {
if args.len() < 3 {
return Response::error("LRANGE 需要 3 个参数");
}
let start: usize = args[1].parse().unwrap_or(0);
let end: usize = args[2].parse().unwrap_or(0);
match self.store.lrange(&args[0], start, end + 1) {
Some(list) => Response::array(list.into_iter().map(Response::bulk).collect()),
None => Response::nil(),
}
}
// === Hash 命令实现 ===
fn cmd_hset(&self, args: &[String]) -> Response {
if args.len() < 3 {
return Response::error("HSET 需要 3 个参数");
}
match self.store.hset(args[0].clone(), args[1].clone(), args[2].clone()) {
Ok(is_new) => Response::integer(if is_new { 1 } else { 0 }),
Err(e) => Response::error(e.to_string()),
}
}
fn cmd_hget(&self, args: &[String]) -> Response {
if args.len() < 2 {
return Response::error("HGET 需要 2 个参数");
}
match self.store.hget(&args[0], &args[1]) {
Some(s) => Response::bulk(s),
None => Response::nil(),
}
}
fn cmd_hgetall(&self, args: &[String]) -> Response {
if args.is_empty() {
return Response::error("HGETALL 需要 1 个参数");
}
match self.store.hgetall(&args[0]) {
Some(hash) => {
let items: Vec<Response> = hash
.into_iter()
.flat_map(|(k, v)| [Response::bulk(k), Response::bulk(v)])
.collect();
Response::array(items)
}
None => Response::nil(),
}
}
fn cmd_hdel(&self, args: &[String]) -> Response {
if args.len() < 2 {
return Response::error("HDEL 需要 2 个参数");
}
let deleted = self.store.hdel(&args[0], &args[1]);
Response::integer(if deleted { 1 } else { 0 })
}
// === 其他命令 ===
fn cmd_info(&self, _args: &[String]) -> Response {
Response::bulk(format!(
"kv-store v0.1.0\nkeys: {}\n",
self.store.size()
))
}
}第七步:TCP 服务器
src/server.rs
rust
▶ Runuse tokio::net::{TcpListener, TcpStream};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use std::sync::Arc;
use crate::store::engine::Store;
use crate::command::{parser::CommandParser, executor::CommandExecutor, protocol::Response};
/// 启动服务器
pub async fn run(addr: &str) -> anyhow::Result<()> {
let store = Arc::new(Store::new());
let executor = Arc::new(CommandExecutor::new((*store).clone()));
let listener = TcpListener::bind(addr).await?;
tracing::info!("服务器启动: {}", addr);
loop {
let (socket, addr) = listener.accept().await?;
tracing::debug!("新连接: {}", addr);
let executor = executor.clone();
tokio::spawn(async move {
handle_connection(socket, executor).await;
});
}
}
/// 处理连接
async fn handle_connection(
mut socket: TcpStream,
executor: Arc<CommandExecutor>,
) {
let mut buffer = vec![0u8; 1024];
loop {
let n = match socket.read(&mut buffer).await {
Ok(0) => break, // 连接关闭
Ok(n) => n,
Err(_) => break,
};
// 解析命令
let command = CommandParser::parse_resp(&buffer[..n]);
if let Some(cmd) = command {
// 执行命令
let response = executor.execute(cmd);
// 发送响应
let response_str = response.to_string();
if socket.write_all(response_str.as_bytes()).await.is_err() {
break;
}
}
}
}第八步:主入口
src/main.rs
rust
▶ Runmod store;
mod command;
mod server;
use store::engine::Store;
use command::executor::CommandExecutor;
#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
server::run("127.0.0.1:6379").await?;
Ok(())
}测试客户端
rust
▶ Run// 可以用 telnet 或 redis-cli 测试
// telnet 127.0.0.1 6379
// SET key value
// GET key
// DEL key
// LPUSH list item
// LPOP list
// HSET hash field value
// HGET hash field小结
本项目涵盖:
- ✅ 并发数据结构 (DashMap)
- ✅ 多种数据类型支持
- ✅ 过期时间管理
- ✅ 命令协议解析
- ✅ TCP 服务器
- ✅ 异步处理