From 9b30146b4e693e428397ccc5269a21f11bd71ed0 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Sun, 7 Apr 2024 19:01:27 +0300
Subject: [PATCH 01/14] wip

---
 Cargo.lock                    | 20 ++++++++-
 Cargo.toml                    |  3 +-
 memcached/Cargo.toml          | 12 +++++
 memcached/src/commands/get.rs | 13 ++++++
 memcached/src/commands/mod.rs | 71 ++++++++++++++++++++++++++++++
 memcached/src/commands/set.rs | 83 +++++++++++++++++++++++++++++++++++
 memcached/src/db.rs           |  0
 memcached/src/main.rs         | 54 +++++++++++++++++++++++
 8 files changed, 253 insertions(+), 3 deletions(-)
 create mode 100644 memcached/Cargo.toml
 create mode 100644 memcached/src/commands/get.rs
 create mode 100644 memcached/src/commands/mod.rs
 create mode 100644 memcached/src/commands/set.rs
 create mode 100644 memcached/src/db.rs
 create mode 100644 memcached/src/main.rs

diff --git a/Cargo.lock b/Cargo.lock
index 1a1bce1..04fd767 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -331,9 +331,9 @@ dependencies = [

 [[package]]
 name = "anyhow"
-version = "1.0.80"
+version = "1.0.81"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1"
+checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247"

 [[package]]
 name = "async-trait"
@@ -1514,6 +1514,16 @@ dependencies = [
  "digest",
 ]

+[[package]]
+name = "memcached"
+version = "0.1.0"
+dependencies = [
+ "anyhow",
+ "crossbeam",
+ "itertools",
+ "multipeek",
+]
+
 [[package]]
 name = "memchr"
 version = "2.7.1"
@@ -1553,6 +1563,12 @@ dependencies = [
  "windows-sys 0.48.0",
 ]

+[[package]]
+name = "multipeek"
+version = "0.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1d6b1cf1c2ae7c8c3898cbf8354ee836bc7037e35592d3739a9901d53c97b6a2"
+
 [[package]]
 name = "native-tls"
 version = "0.2.11"

diff --git a/Cargo.toml b/Cargo.toml
index 22bcff9..ee97de6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -7,7 +7,8 @@ members = [
     "http-load-tester",
     "huffman-compression",
     "json-parser",
-    "load-balancer",
+    "load-balancer",
+    "memcached",
     "redis-server",
     "shell",
     "sort-tool",

diff --git a/memcached/Cargo.toml b/memcached/Cargo.toml
new file mode 100644
index 0000000..f5c873b
--- /dev/null
+++ b/memcached/Cargo.toml
@@ -0,0 +1,12 @@
+[package]
+name = "memcached"
+version = "0.1.0"
+edition = "2021"
+
+# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
+
+[dependencies]
+anyhow = "1.0.81"
+crossbeam = "0.8"
+itertools = "0.12.1"
+multipeek = "0.1.2"

diff --git a/memcached/src/commands/get.rs b/memcached/src/commands/get.rs
new file mode 100644
index 0000000..692fab8
--- /dev/null
+++ b/memcached/src/commands/get.rs
@@ -0,0 +1,13 @@
+use super::Parser;
+use anyhow::anyhow;
+
+pub struct GetCommand {
+    key: String,
+}
+
+impl GetCommand {
+    pub fn parse(mut parser: Parser) -> anyhow::Result<Self> {
+        let key = parser.next_string().ok_or(anyhow!("Expected a key"))?;
+        Ok(Self { key })
+    }
+}

diff --git a/memcached/src/commands/mod.rs b/memcached/src/commands/mod.rs
new file mode 100644
index 0000000..5b12e80
--- /dev/null
+++ b/memcached/src/commands/mod.rs
@@ -0,0 +1,71 @@
+use itertools::Itertools;
+use multipeek::multipeek;
+mod set;
+pub mod get;
+
+pub struct Parser {
+    content: multipeek::MultiPeek<std::vec::IntoIter<u8>>,
+}
+
+impl Parser {
+    pub fn new(content: Vec<u8>) -> Self {
+        let content = content
+            .into_iter()
+            .filter(|b| !matches!(*b, b'\n' | b'\r'))
+            .collect_vec();
+        Self {
+            content: multipeek(content),
+        }
+    }
+
+    pub fn next_string(&mut self) -> Option<String> {
+        let (content, skipped) = self.peek_next_bytes()?;
+        for _ in 0..skipped {
+            self.content.next();
+        }
+        String::from_utf8(content).ok()
+    }
+
+    pub fn peek_next_string(&mut self) -> Option<String> {
+        let (content, _) = self.peek_next_bytes()?;
+        String::from_utf8(content).ok()
+    }
+
+    pub fn next_bytes(&mut self) -> Option<Vec<u8>> {
+        let (content, counter) = self.peek_next_bytes()?;
+        for _ in 0..counter {
+            self.content.next();
+        }
+        Some(content)
+    }
+
+    pub fn peek_next_bytes(&mut self) -> Option<(Vec<u8>, usize)> {
+        let mut skip_white_space_counter = 0;
+
+        loop {
+            let byte = self.content.by_ref().peek_nth(skip_white_space_counter)?;
+            if *byte != b' ' {
+                break;
+            }
+            skip_white_space_counter += 1;
+        }
+        let mut counter = skip_white_space_counter;
+        let mut buf = Vec::new();
+
+        loop {
+            let byte = self.content.peek_nth(counter);
+            match byte {
+                None => break,
+                Some(byte) => {
+                    if *byte == b' ' {
+                        break;
+                    }
+                    buf.push(*byte);
+                    counter += 1;
+                }
+            }
+        }
+
+        Some((buf, counter))
+    }
+}

diff --git a/memcached/src/commands/set.rs b/memcached/src/commands/set.rs
new file mode 100644
index 0000000..319a0c9
--- /dev/null
+++ b/memcached/src/commands/set.rs
@@ -0,0 +1,83 @@
+use anyhow::{anyhow, Context};
+
+use super::Parser;
+
+#[derive(Debug)]
+pub struct SetCommand {
+    key: String,
+    flags: u32,
+    exptime: i64,
+    bytes: usize,
+    noreply: bool,
+    content: Vec<u8>,
+}
+
+impl SetCommand {
+    pub fn parse(mut parser: Parser) -> anyhow::Result<Self> {
+        let key = parser.next_string().ok_or(anyhow!("Expected a key"))?;
+
+        let flags = parser
+            .next_string()
+            .ok_or(anyhow!("Expected a flag"))?
+            .parse()
+            .context("Failed to parse flags")?;
+
+        let exptime = parser
+            .next_string()
+            .ok_or(anyhow!("Expected expiry time"))?
+            .parse()
+            .context("Failed to parse exptime")?;
+
+        let bytes = parser
+            .next_string()
+            .ok_or(anyhow!("Expected bytes count"))?
+            .parse()
+            .context("Failed to parse number of bytes")?;
+
+        let maybe_noreply = parser
+            .peek_next_string()
+            .ok_or(anyhow!("Expected to get noreply or bytes"))?;
+
+        let noreply = if maybe_noreply == "noreply" {
+            let _ = parser.next_string();
+            true
+        } else {
+            false
+        };
+
+        let content = parser.next_bytes().ok_or(anyhow!("Expected bytes"))?;
+
+        Ok(Self {
+            key,
+            flags,
+            exptime,
+            bytes,
+            noreply,
+            content,
+        })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::commands::Parser;
+
+    use super::SetCommand;
+
+    #[test]
+    fn test_set_command() {
+        let content = "set test 0 0 4\r\n
+        1234\r\n"
+            .as_bytes()
+            .to_vec();
+        let mut parser = Parser::new(content);
+        let _command = parser.next_string().unwrap();
+        let set_command = SetCommand::parse(parser).unwrap();
+        assert_eq!(set_command.key, "test");
+        assert_eq!(set_command.flags, 0);
+        assert_eq!(set_command.exptime, 0);
+        assert_eq!(set_command.bytes, 4);
+        assert!(!set_command.noreply);
+        assert_eq!(set_command.content, b"1234".to_vec());
+    }
+}

diff --git a/memcached/src/db.rs b/memcached/src/db.rs
new file mode 100644
index 0000000..e69de29

diff --git a/memcached/src/main.rs b/memcached/src/main.rs
new file mode 100644
index 0000000..958db39
--- /dev/null
+++ b/memcached/src/main.rs
@@ -0,0 +1,54 @@
+pub mod commands;
+pub mod db;
+
+use crossbeam::channel::Receiver;
+use itertools::Itertools;
+use std::{
+    env,
+    net::{TcpListener, TcpStream},
+    num::NonZeroUsize,
+};
+
+fn main() -> std::io::Result<()> {
+    let port = env::var("PORT").unwrap_or("11211".to_string());
+    let address = format!("127.0.0.1:{port}");
+    let listener = TcpListener::bind(address)?;
+    let (tx, rx) = crossbeam::channel::unbounded::<TcpStream>();
+
+    let thread_count = std::thread::available_parallelism()
+        .unwrap_or(NonZeroUsize::MIN)
+        .get();
+    let threads = (0..thread_count)
+        .map(|_| {
+            let rx = rx.clone();
+            std::thread::spawn(move || {
+                handle_streams(rx);
+            })
+        })
+        .collect_vec();
+    for stream in listener.incoming() {
+        tx.send(stream?)
+            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
+    }
+    drop(tx);
+    for thread in threads {
+        if let Err(err) = thread.join() {
+            eprintln!("Error joining thread: {:?}", err)
+        }
+    }
+    Ok(())
+}
+
+fn handle_streams(receiver: Receiver<TcpStream>) {
+    for stream in receiver {
+        handle_stream(stream);
+    }
+}
+
+fn handle_stream(mut stream: TcpStream) {
+    use std::io::Read;
+    use std::io::Write;
+    let mut buf = Vec::with_capacity(1024);
+    let _ = stream.read_to_end(&mut buf);
+    let _ = stream.write(&buf);
+}

From d76740ecf54f6a48086e22b2c469abf8e30cfd90 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Wed, 10 Apr 2024 07:20:55 +0300
Subject: [PATCH 02/14] setup db and connection fixes

---
 Cargo.lock                    |  5 +--
 memcached/Cargo.toml          |  1 +
 memcached/src/commands/get.rs | 14 ++++++++
 memcached/src/commands/mod.rs | 28 +++++++++++++--
 memcached/src/commands/set.rs | 46 ++++++++++++++++++++---
 memcached/src/db.rs           | 54 +++++++++++++++++++++++++++++
 memcached/src/main.rs         | 64 +++++++++++++++++++++++++++++----
 memcached/src/response.rs     | 50 +++++++++++++++++++++++++++
 8 files changed, 246 insertions(+), 16 deletions(-)
 create mode 100644 memcached/src/response.rs

diff --git a/Cargo.lock b/Cargo.lock
index 04fd767..293068c 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -475,9 +475,9 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"

 [[package]]
 name = "bytes"
-version = "1.5.0"
+version = "1.6.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "a2bd12c1caf447e69cd4528f47f94d203fd2582878ecb9e9465484c4148a8223"
+checksum = "514de17de45fdb8dc022b1a7975556c53c86f9f0aa5f534b98977b171857c2c9"
 dependencies = [
  "serde",
 ]
@@ -1519,6 +1519,7 @@ name = "memcached"
 version = "0.1.0"
 dependencies = [
  "anyhow",
+ "bytes",
  "crossbeam",
  "itertools",
  "multipeek",

diff --git a/memcached/Cargo.toml b/memcached/Cargo.toml
index f5c873b..0e54bfe 100644
--- a/memcached/Cargo.toml
+++ b/memcached/Cargo.toml
@@ -7,6 +7,7 @@ edition = "2021"

 [dependencies]
 anyhow = "1.0.81"
+bytes = "1.6.0"
 crossbeam = "0.8"
 itertools = "0.12.1"
 multipeek = "0.1.2"

diff --git a/memcached/src/commands/get.rs b/memcached/src/commands/get.rs
index 692fab8..3128dc1 100644
--- a/memcached/src/commands/get.rs
+++ b/memcached/src/commands/get.rs
@@ -1,3 +1,5 @@
+use crate::{db::Db, response::Response};
+
 use super::Parser;
 use anyhow::anyhow;
@@ -10,4 +12,16 @@ impl GetCommand {
         let key = parser.next_string().ok_or(anyhow!("Expected a key"))?;
         Ok(Self { key })
     }
+
+    pub fn execute(self, db: &Db) -> Response {
+        db.with_data(|data| {
+            let content = data.get(&self.key);
+            match content {
+                Some(content) if !content.is_expired() => {
+                    Response::Value((content, self.key).into())
+                }
+                _ => Response::End,
+            }
+        })
+    }
 }

diff --git a/memcached/src/commands/mod.rs b/memcached/src/commands/mod.rs
index 5b12e80..c5401f1 100644
--- a/memcached/src/commands/mod.rs
+++ b/memcached/src/commands/mod.rs
@@ -1,20 +1,28 @@
+use anyhow::{anyhow, Context};
 use itertools::Itertools;
 use multipeek::multipeek;
-mod set;
+
+use crate::{db::Db, response::Response};
 pub mod get;
+mod set;

 pub struct Parser {
     content: multipeek::MultiPeek<std::vec::IntoIter<u8>>,
+    /// useful for debugging purposes
+    full_command: String,
 }

 impl Parser {
-    pub fn new(content: Vec<u8>) -> Self {
+    pub fn new(content: &[u8]) -> Self {
         let content = content
-            .into_iter()
+            .iter()
+            .copied()
             .filter(|b| !matches!(*b, b'\n' | b'\r'))
             .collect_vec();
+        let full_command = String::from_utf8_lossy(&content).to_string();
         Self {
             content: multipeek(content),
+            full_command,
         }
     }
@@ -69,3 +77,17 @@ impl Parser {
         Some((buf, counter))
     }
 }
+
+pub fn execute_command(data: &[u8], db: &Db) -> anyhow::Result<Response> {
+    let mut parser = Parser::new(data);
+    let full_command = parser.full_command.clone();
+    println!("Executing command: {}", full_command);
+    let command = parser.next_string().ok_or(anyhow!("Expected a command"))?;
+    match command.as_str() {
+        "get" => Ok(get::GetCommand::parse(parser)?.execute(db)),
+        "set" => Ok(set::SetCommand::parse(parser)
+            .with_context(|| format!("Failed to parse set command: {}", full_command))?
+            .execute(db)),
+        cmd => Err(anyhow!("Unknown command {cmd}")),
+    }
+}

diff --git a/memcached/src/commands/set.rs b/memcached/src/commands/set.rs
index 319a0c9..ff3b2d6 100644
--- a/memcached/src/commands/set.rs
+++ b/memcached/src/commands/set.rs
@@ -1,5 +1,10 @@
 use anyhow::{anyhow, Context};

+use crate::{
+    db::{Content, Db},
+    response::Response,
+};
+
 use super::Parser;

 #[derive(Debug)]
@@ -56,6 +61,39 @@ impl SetCommand {
             content,
         })
     }
+
+    pub fn execute(self, db: &Db) -> Response {
+        use std::cmp::Ordering;
+
+        let exp_duration = match self.exptime.cmp(&0) {
+            Ordering::Equal => None,
+            // expires immediately
+            Ordering::Less => Some(std::time::Duration::from_secs(0)),
+            Ordering::Greater => {
+                let exptime = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    + std::time::Duration::from_secs(self.exptime as u64);
+                Some(exptime)
+            }
+        };
+        db.with_data_mut(|data| {
+            data.insert(
+                self.key,
+                Content {
+                    data: self.content,
+                    byte_count: self.bytes,
+                    flags: self.flags,
+                    exp_duration,
+                },
+            );
+            if self.noreply {
+                Response::NoReply
+            } else {
+                Response::Stored
+            }
+        })
+    }
 }

 #[cfg(test)]
@@ -66,10 +104,10 @@ mod tests {

     #[test]
     fn test_set_command() {
-        let content = "set test 0 0 4\r\n
-        1234\r\n"
-            .as_bytes()
-            .to_vec();
+        let content = "set test 0 0 4
+        1234
+        "
+        .as_bytes();
         let mut parser = Parser::new(content);
         let _command = parser.next_string().unwrap();
         let set_command = SetCommand::parse(parser).unwrap();

diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index e69de29..bb31222 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -0,0 +1,54 @@
+use std::{collections::HashMap, sync::RwLock, time::Duration};
+
+pub struct Db {
+    data: RwLock<HashMap<String, Content>>,
+}
+
+impl Db {
+    pub fn new() -> Self {
+        Self {
+            data: RwLock::new(HashMap::new()),
+        }
+    }
+
+    pub fn with_data_mut<F, T>(&self, f: F) -> T
+    where
+        F: FnOnce(&mut HashMap<String, Content>) -> T,
+    {
+        f(&mut self.data.write().unwrap())
+    }
+
+    pub fn with_data<F, T>(&self, f: F) -> T
+    where
+        F: FnOnce(&HashMap<String, Content>) -> T,
+    {
+        f(&self.data.read().unwrap())
+    }
+}
+
+impl Default for Db {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+pub struct Content {
+    pub data: Vec<u8>,
+    pub byte_count: usize,
+    pub flags: u32,
+    pub exp_duration: Option<Duration>,
+}
+
+impl Content {
+    pub fn is_expired(&self) -> bool {
+        if let Some(exp_duration) = self.exp_duration {
+            exp_duration.as_secs()
+                < std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    .as_secs()
+        } else {
+            false
+        }
+    }
+}

diff --git a/memcached/src/main.rs b/memcached/src/main.rs
index 958db39..93947ee 100644
--- a/memcached/src/main.rs
+++ b/memcached/src/main.rs
@@ -1,12 +1,17 @@
 pub mod commands;
 pub mod db;
+pub mod response;

+use anyhow::anyhow;
+use bytes::{BufMut, BytesMut};
 use crossbeam::channel::Receiver;
+use db::Db;
 use itertools::Itertools;
 use std::{
     env,
     net::{TcpListener, TcpStream},
     num::NonZeroUsize,
+    sync::Arc,
 };

 fn main() -> std::io::Result<()> {
@@ -18,11 +23,14 @@ fn main() -> std::io::Result<()> {
     let thread_count = std::thread::available_parallelism()
         .unwrap_or(NonZeroUsize::MIN)
         .get();
+
+    let db = Arc::new(Db::new());
     let threads = (0..thread_count)
         .map(|_| {
             let rx = rx.clone();
+            let db = db.clone();
             std::thread::spawn(move || {
-                handle_streams(rx);
+                handle_streams(rx, db);
             })
         })
         .collect_vec();
     for stream in listener.incoming() {
         tx.send(stream?)
@@ -36,19 +44,61 @@ fn main() -> std::io::Result<()> {
             eprintln!("Error joining thread: {:?}", err)
         }
     }
+    println!("Closing server");
     Ok(())
 }

-fn handle_streams(receiver: Receiver<TcpStream>) {
+fn handle_streams(receiver: Receiver<TcpStream>, db: Arc<Db>) {
     for stream in receiver {
-        handle_stream(stream);
+        handle_stream(stream, &db);
     }
 }

-fn handle_stream(mut stream: TcpStream) {
+fn handle_stream(mut stream: TcpStream, db: &Db) {
     use std::io::Read;
     use std::io::Write;
-    let mut buf = Vec::with_capacity(1024);
-    let _ = stream.read_to_end(&mut buf);
-    let _ = stream.write(&buf);
+    let mut buffer = BytesMut::with_capacity(1024);
+
+    loop {
+        let response = loop {
+            let mut buf = [0; 1024];
+            let bytes_read = stream.read(&mut buf);
+            let bytes_read = match bytes_read {
+                Ok(bytes_read) => bytes_read,
+                Err(err) => {
+                    if err.kind() == std::io::ErrorKind::BrokenPipe {
+                        break Ok(None);
+                    }
+                    eprintln!("Failed to read from stream: {:?}", err);
+                    break Err(anyhow!(err).context("Failed to read from stream"));
+                }
+            };
+            let buf = &buf[..bytes_read];
+            buffer.extend_from_slice(buf);
+            buffer.put_u8(b' ');
+            let response = commands::execute_command(&buffer, db).map(Some);
+            if response.is_ok() {
+                buffer.clear();
+                break response;
+            }
+            if bytes_read == 0 {
+                break response;
+            }
+        };
+        let response = response.unwrap_or_else(|err| {
+            eprintln!("Failed to execute command: {:?}", err);
+            Some(response::Response::Error(format!("{err}")))
+        });
+        let response = match response {
+            Some(response) => response,
+            None => break,
+        };
+        let response = response.into_bytes();
+        if let Err(err) = stream.write(&response) {
+            eprintln!("Failed to write to stream: {:?}", err);
+        }
+        if let Err(err) = stream.flush() {
+            eprintln!("Failed to flush stream: {:?}", err);
+        }
+    }
 }

diff --git a/memcached/src/response.rs b/memcached/src/response.rs
new file mode 100644
index 0000000..51d3d9b
--- /dev/null
+++ b/memcached/src/response.rs
@@ -0,0 +1,50 @@
+use crate::db::Content;
+
+#[derive(Debug)]
+pub struct ValueResponse {
+    pub key: String,
+    pub flags: u32,
+    pub data: Vec<u8>,
+    byte_count: usize,
+}
+#[derive(Debug)]
+pub enum Response {
+    Stored,
+    NoReply,
+    End,
+    Value(ValueResponse),
+    Error(String),
+}
+
+impl From<(&Content, String)> for ValueResponse {
+    fn from((content, key): (&Content, String)) -> Self {
+        Self {
+            key,
+            flags: content.flags,
+            data: content.data.clone(),
+            byte_count: content.byte_count,
+        }
+    }
+}
+
+impl Response {
+    pub fn into_bytes(self) -> Vec<u8> {
+        match self {
+            Response::Stored => b"STORED\r\n".to_vec(),
+            Response::NoReply => Vec::new(),
+            Response::End => b"END\r\n".to_vec(),
+            Response::Value(value) => {
+                let mut bytes = Vec::new();
+                let response = format!(
+                    "VALUE {} {} {}\r\n",
+                    value.key, value.flags, value.byte_count,
+                );
+                bytes.extend_from_slice(response.as_bytes());
+                bytes.extend_from_slice(&value.data);
+                bytes.extend_from_slice(b"\r\n");
+                bytes
+            }
+            Response::Error(message) => format!("ERROR {}\r\n", message).into_bytes(),
+        }
+    }
+}

From b346f283849a9b6dd024ff8b79b372a7f501ffa1 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Wed, 10 Apr 2024 10:56:55 +0300
Subject: [PATCH 03/14] gracefully handle connection closing

---
 Cargo.lock                |   1 +
 memcached/Cargo.toml      |   1 +
 memcached/src/main.rs     | 106 +++----------------------
 memcached/src/response.rs |   1 +
 memcached/src/server.rs   | 151 ++++++++++++++++++++++++++++++++++++++
 5 files changed, 166 insertions(+), 94 deletions(-)
 create mode 100644 memcached/src/server.rs

diff --git a/Cargo.lock b/Cargo.lock
index 293068c..9e054c0 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1523,6 +1523,7 @@ dependencies = [
  "crossbeam",
  "itertools",
  "multipeek",
+ "tokio",
 ]

diff --git a/memcached/Cargo.toml b/memcached/Cargo.toml
index 0e54bfe..b42c177 100644
--- a/memcached/Cargo.toml
+++ b/memcached/Cargo.toml
@@ -11,3 +11,4 @@ bytes = "1.6.0"
 crossbeam = "0.8"
 itertools = "0.12.1"
 multipeek = "0.1.2"
+tokio = { version = "1.37.0", features = ["full"] }

diff --git a/memcached/src/main.rs b/memcached/src/main.rs
index 93947ee..625d6b3 100644
--- a/memcached/src/main.rs
+++ b/memcached/src/main.rs
@@ -1,104 +1,22 @@
 pub mod commands;
 pub mod db;
 pub mod response;
+pub mod server;

-use anyhow::anyhow;
-use bytes::{BufMut, BytesMut};
-use crossbeam::channel::Receiver;
-use db::Db;
-use itertools::Itertools;
-use std::{
-    env,
-    net::{TcpListener, TcpStream},
-    num::NonZeroUsize,
-    sync::Arc,
-};
+use std::env;

-fn main() -> std::io::Result<()> {
-    let port = env::var("PORT").unwrap_or("11211".to_string());
-    let address = format!("127.0.0.1:{port}");
-    let listener = TcpListener::bind(address)?;
-    let (tx, rx) = crossbeam::channel::unbounded::<TcpStream>();
+use anyhow::{anyhow, Context};

-    let thread_count = std::thread::available_parallelism()
-        .unwrap_or(NonZeroUsize::MIN)
-        .get();
-
-    let db = Arc::new(Db::new());
-    let threads = (0..thread_count)
-        .map(|_| {
-            let rx = rx.clone();
-            let db = db.clone();
-            std::thread::spawn(move || {
-                handle_streams(rx, db);
-            })
-        })
-        .collect_vec();
-    for stream in listener.incoming() {
-        tx.send(stream?)
-            .map_err(|err| std::io::Error::new(std::io::ErrorKind::Other, err))?;
-    }
-    drop(tx);
-    for thread in threads {
-        if let Err(err) = thread.join() {
-            eprintln!("Error joining thread: {:?}", err)
-        }
-    }
-    println!("Closing server");
-    Ok(())
-}
+use tokio::{net::TcpListener, signal};

-fn handle_streams(receiver: Receiver<TcpStream>, db: Arc<Db>) {
-    for stream in receiver {
-        handle_stream(stream, &db);
-    }
-}
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let port = env::var("PORT").unwrap_or("11211".to_string());
+    let address = format!("127.0.0.1:{port}");

-fn handle_stream(mut stream: TcpStream, db: &Db) {
-    use std::io::Read;
-    use std::io::Write;
-    let mut buffer = BytesMut::with_capacity(1024);
+    let tcp_listener = TcpListener::bind(address)
+        .await
+        .context("Failed to bind to address")?;

-    loop {
-        let response = loop {
-            let mut buf = [0; 1024];
-            let bytes_read = stream.read(&mut buf);
-            let bytes_read = match bytes_read {
-                Ok(bytes_read) => bytes_read,
-                Err(err) => {
-                    if err.kind() == std::io::ErrorKind::BrokenPipe {
-                        break Ok(None);
-                    }
-                    eprintln!("Failed to read from stream: {:?}", err);
-                    break Err(anyhow!(err).context("Failed to read from stream"));
-                }
-            };
-            let buf = &buf[..bytes_read];
-            buffer.extend_from_slice(buf);
-            buffer.put_u8(b' ');
-            let response = commands::execute_command(&buffer, db).map(Some);
-            if response.is_ok() {
-                buffer.clear();
-                break response;
-            }
-            if bytes_read == 0 {
-                break response;
-            }
-        };
-        let response = response.unwrap_or_else(|err| {
-            eprintln!("Failed to execute command: {:?}", err);
-            Some(response::Response::Error(format!("{err}")))
-        });
-        let response = match response {
-            Some(response) => response,
-            None => break,
-        };
-        let response = response.into_bytes();
-        if let Err(err) = stream.write(&response) {
-            eprintln!("Failed to write to stream: {:?}", err);
-        }
-        if let Err(err) = stream.flush() {
-            eprintln!("Failed to flush stream: {:?}", err);
-        }
-    }
+    server::run(tcp_listener, signal::ctrl_c()).await
 }

diff --git a/memcached/src/response.rs b/memcached/src/response.rs
index 51d3d9b..e222448 100644
--- a/memcached/src/response.rs
+++ b/memcached/src/response.rs
@@ -42,6 +42,7 @@ impl Response {
                 bytes.extend_from_slice(response.as_bytes());
                 bytes.extend_from_slice(&value.data);
                 bytes.extend_from_slice(b"\r\n");
+                bytes.extend_from_slice("END\r\n".as_bytes());
                 bytes
             }
             Response::Error(message) => format!("ERROR {}\r\n", message).into_bytes(),

diff --git a/memcached/src/server.rs b/memcached/src/server.rs
new file mode 100644
index 0000000..a7e52b6
--- /dev/null
+++ b/memcached/src/server.rs
@@ -0,0 +1,151 @@
+use std::{future::Future, sync::Arc};
+
+use crate::{anyhow, commands, response};
+use anyhow::Context;
+use bytes::{BufMut, BytesMut};
+use tokio::{
+    io::{AsyncReadExt, AsyncWriteExt},
+    net::{TcpListener, TcpStream},
+};
+
+use tokio::sync::broadcast::Receiver as BroadCastReceiver;
+use tokio::sync::broadcast::Sender as BroadCastSender;
+
+use tokio::sync::mpsc::Sender;
+
+use crate::db::Db;
+
+pub async fn run(tcp_listener: TcpListener, shut_down: impl Future) -> anyhow::Result<()> {
+    let db = Arc::new(Db::new());
+    let (shut_down_signal_sender, _) = tokio::sync::broadcast::channel(1);
+    let (shut_down_complete_sender, mut shut_down_complete_receiver) =
+        tokio::sync::mpsc::channel::<()>(1);
+
+    let mut listener = Listener {
+        tcp_listener,
+        db,
+        shut_down_signal: shut_down_signal_sender,
+        _shut_down_complete: shut_down_complete_sender,
+    };
+
+    tokio::select! {
+        _ = shut_down => {
+            println!("Shutting down server");
+        }
+        _ = listener.accept() => {}
+    }
+    let Listener {
+        shut_down_signal,
+        _shut_down_complete,
+        ..
+    } = listener;
+    // notify all connections that the server is shutting down
+    drop(shut_down_signal);
+
+    // Drop the final Listener sender.
+    drop(_shut_down_complete);
+
+    let _ = shut_down_complete_receiver.recv().await;
+
+    Ok(())
+}
+
+struct Listener {
+    tcp_listener: TcpListener,
+    db: Arc<Db>,
+    // notifies all connections that subscribed to the broadcast sender that the server is shutting down.
+    shut_down_signal: BroadCastSender<()>,
+    // goes out of scope once the ConnectionHandler is dropped, thus signaling to the server that it is finally safe to shut down once all senders are dropped.
+    _shut_down_complete: Sender<()>,
+}
+
+impl Listener {
+    pub async fn accept(&mut self) -> anyhow::Result<()> {
+        loop {
+            let (stream, _) = self
+                .tcp_listener
+                .accept()
+                .await
+                .context("Failed to accept a new connection")?;
+
+            let mut handler = ConnectionHandler {
+                connection: Connection {
+                    stream,
+                    buffer: BytesMut::with_capacity(1024),
+                    db: self.db.clone(),
+                },
+                shut_down_signal: self.shut_down_signal.subscribe(),
+                _shut_down_complete: self._shut_down_complete.clone(),
+            };
+
+            tokio::spawn(async move { handler.run().await });
+        }
+    }
+}
+
+struct ConnectionHandler {
+    connection: Connection,
+    shut_down_signal: BroadCastReceiver<()>,
+    // once the ConnectionHandler is dropped, the sender is dropped as well, thus notifying the server that it is safe to shut down.
+    _shut_down_complete: Sender<()>,
+}
+
+impl ConnectionHandler {
+    async fn run(&mut self) {
+        tokio::select! {
+            _ = self.shut_down_signal.recv() => {
+                println!("Shutting down connection")
+            }
+            _ = self.connection.execute() => {}
+        }
+    }
+}
+
+struct Connection {
+    stream: TcpStream,
+    db: Arc<Db>,
+    buffer: BytesMut,
+}
+
+impl Connection {
+    async fn execute(&mut self) {
+        loop {
+            let response = loop {
+                let bytes_read = self.stream.read_buf(&mut self.buffer).await;
+                let bytes_read = match bytes_read {
+                    Ok(bytes_read) => bytes_read,
+                    Err(err) => {
+                        eprintln!("Failed to read from stream: {:?}", err);
+                        break Err(anyhow!(err).context("Failed to read from stream"));
+                    }
+                };
+
+                self.buffer.put_u8(b' ');
+                let response = commands::execute_command(&self.buffer, &self.db).map(Some);
+                if response.is_ok() {
+                    self.buffer.clear();
+                    break response;
+                }
+                if bytes_read == 0 {
+                    // This means that the connection was closed (BrokenPipe)
+                    break Ok(None);
+                }
+            };
+            let response = response.unwrap_or_else(|err| {
+                eprintln!("Failed to execute command: {:?}", err);
+                Some(response::Response::Error(format!("{err}")))
+            });
+            let response = match response {
+                Some(response) => response,
+                None => break,
+            };
+            let response = response.into_bytes();
+            if let Err(err) = self.stream.write(&response).await {
+                eprintln!("Failed to write to stream: {:?}", err);
+            }
+            if let Err(err) = self.stream.flush().await {
+                eprintln!("Failed to flush stream: {:?}", err);
+            }
+        }
+    }
+}

From 89671cc9ee17a43a5b33711cf5ef1f6f43f8eaad Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Thu, 11 Apr 2024 07:25:59 +0300
Subject: [PATCH 04/14] add remaining commands

---
 memcached/src/commands/add.rs            | 33 ++++++++
 memcached/src/commands/append.rs         | 33 ++++++++
 memcached/src/commands/extractors/mod.rs | 86 +++++++++++++++++++++
 memcached/src/commands/mod.rs            | 29 +++++++-
 memcached/src/commands/prepend.rs        | 43 +++++++++++
 memcached/src/commands/replace.rs        | 33 ++++++++
 memcached/src/commands/set.rs            | 95 ++++--------------------
 memcached/src/response.rs                |  2 +
 8 files changed, 271 insertions(+), 83 deletions(-)
 create mode 100644 memcached/src/commands/add.rs
 create mode 100644 memcached/src/commands/append.rs
 create mode 100644 memcached/src/commands/extractors/mod.rs
 create mode 100644 memcached/src/commands/prepend.rs
 create mode 100644 memcached/src/commands/replace.rs

diff --git a/memcached/src/commands/add.rs b/memcached/src/commands/add.rs
new file mode 100644
index 0000000..6a48f76
--- /dev/null
+++ b/memcached/src/commands/add.rs
@@ -0,0 +1,33 @@
+use crate::{
+    db::{Content, Db},
+    response::Response,
+};
+
+use super::{extractors::ExtractedData, Parser};
+
+pub struct AddCommand {
+    data: ExtractedData,
+}
+
+impl AddCommand {
+    pub fn parse(parser: Parser) -> anyhow::Result<Self> {
+        let data = ExtractedData::parse(parser)?;
+
+        Ok(Self { data })
+    }
+
+    pub fn execute(self, db: &Db) -> Response {
+        db.with_data_mut(|data| {
+            if data.contains_key(&self.data.key) {
+                Response::NotStored
+            } else {
+                data.insert(self.data.key.clone(), Content::from(&self.data));
+                if self.data.noreply {
+                    Response::NoReply
+                } else {
+                    Response::Stored
+                }
+            }
+        })
+    }
+}

diff --git a/memcached/src/commands/append.rs b/memcached/src/commands/append.rs
new file mode 100644
index 0000000..a969f50
--- /dev/null
+++ b/memcached/src/commands/append.rs
@@ -0,0 +1,33 @@
+use crate::{db::Db, response::Response};
+
+use super::{extractors::ExtractedData, Parser};
+
+pub struct AppendCommand {
+    data: ExtractedData,
+}
+
+impl AppendCommand {
+    pub fn parse(parser: Parser) -> anyhow::Result<Self> {
+        let data = ExtractedData::parse(parser)?;
+
+        Ok(Self { data })
+    }
+
+    pub fn execute(self, db: &Db) -> Response {
+        db.with_data_mut(|data| {
+            let entry = data.entry(self.data.key.clone()).and_modify(|content| {
+                content.data.extend(self.data.content);
+            });
+            match entry {
+                std::collections::hash_map::Entry::Occupied(_) => {
+                    if self.data.noreply {
+                        Response::NoReply
+                    } else {
+                        Response::Stored
+                    }
+                }
+                std::collections::hash_map::Entry::Vacant(_) => Response::NotStored,
+            }
+        })
+    }
+}

diff --git a/memcached/src/commands/extractors/mod.rs b/memcached/src/commands/extractors/mod.rs
new file mode 100644
index 0000000..d012cdf
--- /dev/null
+++ b/memcached/src/commands/extractors/mod.rs
@@ -0,0 +1,86 @@
+use anyhow::Context;
+
+use crate::{anyhow, db::Content};
+use std::cmp::Ordering;
+use std::time::Duration;
+
+use super::Parser;
+#[derive(Debug)]
+pub struct ExtractedData {
+    pub key: String,
+    pub flags: u32,
+    pub exptime: Option<Duration>,
+    pub bytes: usize,
+    pub noreply: bool,
+    pub content: Vec<u8>,
+}
+
+impl ExtractedData {
+    pub fn parse(mut parser: Parser) -> anyhow::Result<Self> {
+        let key = parser.next_string().ok_or(anyhow!("Expected a key"))?;
+
+        let flags = parser
+            .next_string()
+            .ok_or(anyhow!("Expected a flag"))?
+            .parse()
+            .context("Failed to parse flags")?;
+
+        let exptime_in_sec = parser
+            .next_string()
+            .ok_or(anyhow!("Expected expiry time"))?
+            .parse::<i64>()
+            .context("Failed to parse exptime")?;
+
+        let exptime = match exptime_in_sec.cmp(&0) {
+            Ordering::Equal => None,
+            // expires immediately
+            Ordering::Less => Some(std::time::Duration::from_secs(0)),
+            Ordering::Greater => {
+                let exptime = std::time::SystemTime::now()
+                    .duration_since(std::time::UNIX_EPOCH)
+                    .unwrap()
+                    + std::time::Duration::from_secs(exptime_in_sec as u64);
+                Some(exptime)
+            }
+        };
+
+        let bytes = parser
+            .next_string()
+            .ok_or(anyhow!("Expected bytes count"))?
+            .parse()
+            .context("Failed to parse number of bytes")?;
+
+        let maybe_noreply = parser
+            .peek_next_string()
+            .ok_or(anyhow!("Expected to get noreply or bytes"))?;
+
+        let noreply = if maybe_noreply == "noreply" {
+            let _ = parser.next_string();
+            true
+        } else {
+            false
+        };
+
+        let content = parser.next_bytes().ok_or(anyhow!("Expected bytes"))?;
+
+        Ok(Self {
+            key,
+            flags,
+            exptime,
+            bytes,
+            noreply,
+            content,
+        })
+    }
+}
+
+impl From<&ExtractedData> for Content {
+    fn from(value: &ExtractedData) -> Self {
+        Self {
+            data: value.content.clone(),
+            byte_count: value.bytes,
+            flags: value.flags,
+            exp_duration: value.exptime,
+        }
+    }
+}

diff --git a/memcached/src/commands/mod.rs b/memcached/src/commands/mod.rs
index c5401f1..74049dc 100644
--- a/memcached/src/commands/mod.rs
+++ b/memcached/src/commands/mod.rs
@@ -3,7 +3,12 @@ use itertools::Itertools;
 use multipeek::multipeek;

 use crate::{db::Db, response::Response};
-pub mod get;
+mod add;
+mod append;
+pub mod extractors;
+mod get;
+mod prepend;
+mod replace;
 mod set;
@@ -84,10 +89,30 @@ pub fn execute_command(data: &[u8], db: &Db) -> anyhow::Result<Response> {
     println!("Executing command: {}", full_command);
     let command = parser.next_string().ok_or(anyhow!("Expected a command"))?;
     match command.as_str() {
-        "get" => Ok(get::GetCommand::parse(parser)?.execute(db)),
+        "get" => Ok(get::GetCommand::parse(parser)
+            .with_context(|| format!("Failed to parse get command: {}", full_command))?
+            .execute(db)),
+
         "set" => Ok(set::SetCommand::parse(parser)
             .with_context(|| format!("Failed to parse set command: {}", full_command))?
             .execute(db)),
+
+        "add" => Ok(add::AddCommand::parse(parser)
+            .with_context(|| format!("Failed to parse add command: {}", full_command))?
+            .execute(db)),
+
+        "replace" => Ok(replace::ReplaceCommand::parse(parser)
+            .with_context(|| format!("Failed to parse replace command: {}", full_command))?
+            .execute(db)),
+
+        "append" => Ok(append::AppendCommand::parse(parser)
+            .with_context(|| format!("Failed to parse append command: {}", full_command))?
+            .execute(db)),
+
+        "prepend" => Ok(prepend::Prepend::parse(parser)
+            .with_context(|| format!("Failed to parse prepend command: {}", full_command))?
+            .execute(db)),
+
         cmd => Err(anyhow!("Unknown command {cmd}")),
     }
 }

diff --git a/memcached/src/commands/prepend.rs b/memcached/src/commands/prepend.rs
new file mode 100644
index 0000000..becae2c
--- /dev/null
+++ b/memcached/src/commands/prepend.rs
@@ -0,0 +1,43 @@
+use std::mem;
+
+use itertools::Itertools;
+
+use crate::{db::Db, response::Response};
+
+use super::{extractors::ExtractedData, Parser};
+
+pub struct Prepend {
+    data: ExtractedData,
+}
+
+impl Prepend {
+    pub fn parse(parser: Parser) -> anyhow::Result<Self> {
+        let data = ExtractedData::parse(parser)?;
+
+        Ok(Self { data })
+    }
+
+    pub fn execute(self, db: &Db) -> Response {
+        db.with_data_mut(|data| {
+            let entry = data.entry(self.data.key.clone()).and_modify(|content| {
+                let existing_content = mem::take(&mut content.data);
+                content.data = self
+                    .data
+                    .content
+                    .into_iter()
+                    .chain(existing_content)
+                    .collect_vec();
+            });
+            match entry {
+                std::collections::hash_map::Entry::Occupied(_) => {
+                    if self.data.noreply {
+                        Response::NoReply
+                    } else {
+                        Response::Stored
+                    }
+                }
+                std::collections::hash_map::Entry::Vacant(_) => Response::NotStored,
+            }
+        })
+    }
+}

diff --git a/memcached/src/commands/replace.rs b/memcached/src/commands/replace.rs
new file mode 100644
index 0000000..9fdcb4b
--- /dev/null
+++ b/memcached/src/commands/replace.rs
@@ -0,0 +1,33 @@
+use crate::{
+    db::{Content, Db},
+    response::Response,
+};
+
+use super::{extractors::ExtractedData, Parser};
+
+pub struct ReplaceCommand {
+    data: ExtractedData,
+}
+
+impl ReplaceCommand {
+    pub fn parse(parser: Parser) -> anyhow::Result<Self> {
+        let data = ExtractedData::parse(parser)?;
+
+        Ok(Self { data })
+    }
+
+    pub fn execute(self, db: &Db) -> Response {
+        db.with_data_mut(|data| {
+            if data.contains_key(&self.data.key) {
+                data.insert(self.data.key.clone(), Content::from(&self.data));
+                if self.data.noreply {
+                    Response::NoReply
+                } else {
+                    Response::Stored
+                }
+            } else {
+                Response::NotStored
+            }
+        })
+    }
+}

diff --git a/memcached/src/commands/set.rs b/memcached/src/commands/set.rs
index ff3b2d6..8c0b566 100644
--- a/memcached/src/commands/set.rs
+++ b/memcached/src/commands/set.rs
@@ -1,93 +1,26 @@
-use anyhow::{anyhow, Context};
-
 use crate::{
     db::{Content, Db},
     response::Response,
 };

-use super::Parser;
+use super::{extractors::ExtractedData, Parser};

 #[derive(Debug)]
 pub struct SetCommand {
-    key: String,
-    flags: u32,
-    exptime: i64,
-    bytes: usize,
-    noreply: bool,
-    content: Vec<u8>,
+    data: ExtractedData,
 }

 impl SetCommand {
-    pub fn parse(mut parser: Parser) -> anyhow::Result<Self> {
-        let key = parser.next_string().ok_or(anyhow!("Expected a key"))?;
-
-        let flags = parser
-            .next_string()
-            .ok_or(anyhow!("Expected a flag"))?
-            .parse()
-            .context("Failed to parse flags")?;
-
-        let exptime = parser
-            .next_string()
-            .ok_or(anyhow!("Expected expiry time"))?
-            .parse()
-            .context("Failed to parse exptime")?;
-
-        let bytes = parser
-            .next_string()
-            .ok_or(anyhow!("Expected bytes count"))?
-            .parse()
-            .context("Failed to parse number of bytes")?;
-
-        let maybe_noreply = parser
-            .peek_next_string()
-            .ok_or(anyhow!("Expected to get noreply or bytes"))?;
+    pub fn parse(parser: Parser) -> anyhow::Result<Self> {
+        let data = ExtractedData::parse(parser)?;

-        let noreply = if maybe_noreply == "noreply" {
-            let _ = parser.next_string();
-            true
-        } else {
-            false
-        };
-
-        let content = parser.next_bytes().ok_or(anyhow!("Expected bytes"))?;
-
-        Ok(Self {
-            key,
-            flags,
-            exptime,
-            bytes,
-            noreply,
-            content,
-        })
+        Ok(Self { data })
     }

     pub fn execute(self, db: &Db) -> Response {
-        use std::cmp::Ordering;
-
-        let exp_duration = match self.exptime.cmp(&0) {
-            Ordering::Equal => None,
-            // expires immediately
-            Ordering::Less => Some(std::time::Duration::from_secs(0)),
-            Ordering::Greater => {
-                let exptime = std::time::SystemTime::now()
-                    .duration_since(std::time::UNIX_EPOCH)
-                    .unwrap()
-                    + std::time::Duration::from_secs(self.exptime as u64);
-                Some(exptime)
-            }
-        };
         db.with_data_mut(|data| {
-            data.insert(
-                self.key,
-                Content {
-                    data: self.content,
-                    byte_count: self.bytes,
-                    flags: self.flags,
-                    exp_duration,
-                },
-            );
-            if self.noreply {
+            data.insert(self.data.key.clone(), Content::from(&self.data));
+            if self.data.noreply {
                 Response::NoReply
             } else {
                 Response::Stored
@@ -110,12 +43,12 @@ mod tests {
         let mut parser = Parser::new(content);
         let _command = parser.next_string().unwrap();
-        let set_command = SetCommand::parse(parser).unwrap();
-        assert_eq!(set_command.key, "test");
-        assert_eq!(set_command.flags, 0);
-        assert_eq!(set_command.exptime, 0);
-        assert_eq!(set_command.bytes, 4);
-        assert!(!set_command.noreply);
-        assert_eq!(set_command.content, b"1234".to_vec());
+        let set_command_data = SetCommand::parse(parser).unwrap().data;
+        assert_eq!(set_command_data.key, "test");
+        assert_eq!(set_command_data.flags, 0);
+        assert_eq!(set_command_data.exptime, None);
+        assert_eq!(set_command_data.bytes, 4);
+        assert!(!set_command_data.noreply);
+        assert_eq!(set_command_data.content, b"1234".to_vec());
     }
 }

diff --git a/memcached/src/response.rs b/memcached/src/response.rs
index e222448..b415a79 100644
--- a/memcached/src/response.rs
+++ b/memcached/src/response.rs
@@ -11,6 +11,7 @@ pub struct ValueResponse {
 pub enum Response {
     Stored,
     NoReply,
+    NotStored,
     End,
     Value(ValueResponse),
     Error(String),
@@ -31,6 +32,7 @@ impl Response {
     pub fn into_bytes(self) -> Vec<u8> {
         match self {
             Response::Stored => b"STORED\r\n".to_vec(),
+            Response::NotStored => b"NOT_STORED\r\n".to_vec(),
             Response::NoReply => Vec::new(),
             Response::End => b"END\r\n".to_vec(),

From 59b71a12e58c0dd3ab9a31cc5f8294d448c4af98 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Fri, 12 Apr 2024 07:17:33 +0300
Subject: [PATCH 05/14] setup cache size

---
 Cargo.lock                        |  1 +
 memcached/Cargo.toml              |  1 +
 memcached/src/commands/append.rs  |  5 +-
 memcached/src/commands/prepend.rs |  5 +-
 memcached/src/db.rs               | 94 +++++++++++++++++++++++++------
 memcached/src/main.rs             |  6 +-
 memcached/src/server.rs           |  4 +-
 7 files changed, 93 insertions(+), 23 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 9e054c0..a82eb95 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1522,6 +1522,7 @@ dependencies = [
  "bytes",
  "crossbeam",
  "itertools",
+ "linked-hash-map",
  "multipeek",
  "tokio",
 ]

diff --git a/memcached/Cargo.toml b/memcached/Cargo.toml
index b42c177..728bc16 100644
--- a/memcached/Cargo.toml
+++ b/memcached/Cargo.toml
@@ -10,5 +10,6 @@ anyhow = "1.0.81"
 bytes = "1.6.0"
 crossbeam = "0.8"
 itertools = "0.12.1"
+linked-hash-map = "0.5.6" multipeek = "0.1.2" tokio = { version = "1.37.0", features = ["full"] } diff --git a/memcached/src/commands/append.rs b/memcached/src/commands/append.rs index a969f50..3ac68c7 100644 --- a/memcached/src/commands/append.rs +++ b/memcached/src/commands/append.rs @@ -1,4 +1,5 @@ use crate::{db::Db, response::Response}; +use linked_hash_map::Entry; use super::{extractors::ExtractedData, Parser}; @@ -19,14 +20,14 @@ impl AppendCommand { content.data.extend(self.data.content); }); match entry { - std::collections::hash_map::Entry::Occupied(_) => { + Entry::Occupied(_) => { if self.data.noreply { Response::NoReply } else { Response::Stored } } - std::collections::hash_map::Entry::Vacant(_) => Response::NotStored, + Entry::Vacant(_) => Response::NotStored, } }) } diff --git a/memcached/src/commands/prepend.rs b/memcached/src/commands/prepend.rs index becae2c..b89866b 100644 --- a/memcached/src/commands/prepend.rs +++ b/memcached/src/commands/prepend.rs @@ -1,6 +1,7 @@ use std::mem; use itertools::Itertools; +use linked_hash_map::Entry; use crate::{db::Db, response::Response}; @@ -29,14 +30,14 @@ impl Prepend { .collect_vec(); }); match entry { - std::collections::hash_map::Entry::Occupied(_) => { + Entry::Occupied(_) => { if self.data.noreply { Response::NoReply } else { Response::Stored } } - std::collections::hash_map::Entry::Vacant(_) => Response::NotStored, + Entry::Vacant(_) => Response::NotStored, } }) } diff --git a/memcached/src/db.rs b/memcached/src/db.rs index bb31222..3792ed2 100644 --- a/memcached/src/db.rs +++ b/memcached/src/db.rs @@ -1,34 +1,64 @@ -use std::{collections::HashMap, sync::RwLock, time::Duration}; +use std::{ + sync::{Arc, RwLock}, + time::Duration, +}; + +use linked_hash_map::LinkedHashMap; +use tokio::sync::Notify; pub struct Db { - data: RwLock>, + inner: Arc, +} + +// TODO: Setup a special HashMap that will allow us to store the data in a hashmap and also keep track of the oldest data to be removed once the cache size is above the threshold + +struct DbInner { + data: RwLock, + background_task: Notify, +} + +impl DbInner { + fn is_shutting_down(&self) -> bool { + self.data.read().unwrap().shut_down + } +} + +struct DbState { + max_cache_size_in_bytes: u64, + shut_down: bool, + entries: LinkedHashMap, } impl Db { - pub fn new() -> Self { - Self { - data: RwLock::new(HashMap::new()), - } + pub fn new(max_cache_size_in_bytes: u64) -> Self { + let db = DbInner { + data: RwLock::new(DbState { + entries: LinkedHashMap::new(), + max_cache_size_in_bytes, + shut_down: false, + }), + background_task: Notify::new(), + }; + + let inner = Arc::new(db); + tokio::spawn(purge_older_keys_if_cache_size_is_above_threshold( + inner.clone(), + )); + Self { inner } } pub fn with_data_mut(&self, f: F) -> T where - F: FnOnce(&mut HashMap) -> T, + F: FnOnce(&mut LinkedHashMap) -> T, { - f(&mut self.data.write().unwrap()) + f(&mut self.inner.data.write().unwrap().entries) } pub fn with_data(&self, f: F) -> T where - F: FnOnce(&HashMap) -> T, + F: FnOnce(&LinkedHashMap) -> T, { - f(&self.data.read().unwrap()) - } -} - -impl Default for Db { - fn default() -> Self { - Self::new() + f(&self.inner.data.read().unwrap().entries) } } @@ -52,3 +82,35 @@ impl Content { } } } + +async fn purge_older_keys_if_cache_size_is_above_threshold(db: Arc) { + while !db.is_shutting_down() { + db.background_task.notified().await; + let data = db.data.read().unwrap(); + let current_content_byte_size = data + .entries + .values() + .map(|data| data.byte_count as u64) + .sum::(); + let 
cache_size = data.max_cache_size_in_bytes; + if current_content_byte_size > cache_size { + // using i64 because we can go below 0 + let mut min_bytes_to_remove = (current_content_byte_size - cache_size) as i64; + let mut keys_to_remove = Vec::new(); + // linked hashmap maintains insertion order, so iter() gives us the oldest data first. + for (key, value) in data.entries.iter() { + if min_bytes_to_remove <= 0 { + break; + } + min_bytes_to_remove -= value.byte_count as i64; + keys_to_remove.push(key.clone()); + } + // dropping the read lock. + drop(data); + let mut content = db.data.write().unwrap(); + for key in keys_to_remove { + content.entries.remove(&key); + } + } + } +} diff --git a/memcached/src/main.rs b/memcached/src/main.rs index 625d6b3..9d1b22b 100644 --- a/memcached/src/main.rs +++ b/memcached/src/main.rs @@ -13,10 +13,14 @@ use tokio::{net::TcpListener, signal}; async fn main() -> anyhow::Result<()> { let port = env::var("PORT").unwrap_or("11211".to_string()); let address = format!("127.0.0.1:{port}"); + let cache_size = env::var("CACHE_SIZE") + .unwrap_or("10000".to_string()) + .parse::() + .context("Failed to parse CACHE_SIZE")?; let tcp_listener = TcpListener::bind(address) .await .context("Failed to bind to address")?; - server::run(tcp_listener, signal::ctrl_c()).await + server::run(tcp_listener, cache_size, signal::ctrl_c()).await } diff --git a/memcached/src/server.rs b/memcached/src/server.rs index a7e52b6..f632840 100644 --- a/memcached/src/server.rs +++ b/memcached/src/server.rs @@ -15,8 +15,8 @@ use tokio::sync::mpsc::Sender; use crate::db::Db; -pub async fn run(tcp_listener: TcpListener, shut_down: impl Future) -> anyhow::Result<()> { - let db = Arc::new(Db::new()); +pub async fn run(tcp_listener: TcpListener, cache_size: u64, shut_down: impl Future) -> anyhow::Result<()> { + let db = Arc::new(Db::new(cache_size)); let (shut_down_signal_sender, _) = tokio::sync::broadcast::channel(1); let (shut_down_complete_sender, mut shut_down_complete_receiver) = tokio::sync::mpsc::channel::<()>(1); From 910d5312d3c7c5acd25a63055eb7c2bf3c1f988a Mon Sep 17 00:00:00 2001 From: Tevin Thuku Date: Fri, 12 Apr 2024 07:51:47 +0300 Subject: [PATCH 06/14] attempt at dropping task --- memcached/src/db.rs | 87 +++++++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 27 deletions(-) diff --git a/memcached/src/db.rs b/memcached/src/db.rs index 3792ed2..395a705 100644 --- a/memcached/src/db.rs +++ b/memcached/src/db.rs @@ -10,6 +10,24 @@ pub struct Db { inner: Arc, } +// pub(crate) struct DbDropGuard { +// db: Db, +// } + +// impl DbDropGuard { +// pub fn new(max_cache_size_in_bytes: u64) -> Self { +// Self { +// db: Db::new(max_cache_size_in_bytes), +// } +// } +// } + +// impl Drop for DbDropGuard { +// fn drop(&mut self) { +// self.db.signal_shut_down(); +// } +// } + // TODO: Setup a special HashMap that will allow us to store the data in a hashmap and also keep track of the oldest data to be removed once the cache size is above the threshold struct DbInner { @@ -41,7 +59,7 @@ impl Db { }; let inner = Arc::new(db); - tokio::spawn(purge_older_keys_if_cache_size_is_above_threshold( + tokio::spawn(purge_older_keys_if_cache_size_is_above_threshold_task( inner.clone(), )); Self { inner } @@ -51,7 +69,9 @@ impl Db { where F: FnOnce(&mut LinkedHashMap) -> T, { - f(&mut self.inner.data.write().unwrap().entries) + let result = f(&mut self.inner.data.write().unwrap().entries); + self.inner.background_task.notify_one(); + result } pub fn with_data(&self, f: F) -> T @@ -60,6 
+80,13 @@ impl Db { { f(&self.inner.data.read().unwrap().entries) } + + fn signal_shut_down(&self) { + let mut data = self.inner.data.write().unwrap(); + data.shut_down = true; + drop(data); + self.inner.background_task.notify_one(); + } } pub struct Content { @@ -83,34 +110,40 @@ impl Content { } } -async fn purge_older_keys_if_cache_size_is_above_threshold(db: Arc) { +async fn purge_older_keys_if_cache_size_is_above_threshold_task(db: Arc) { while !db.is_shutting_down() { + remove_old_entries(&db); db.background_task.notified().await; - let data = db.data.read().unwrap(); - let current_content_byte_size = data - .entries - .values() - .map(|data| data.byte_count as u64) - .sum::(); - let cache_size = data.max_cache_size_in_bytes; - if current_content_byte_size > cache_size { - // using i64 because we can go below 0 - let mut min_bytes_to_remove = (current_content_byte_size - cache_size) as i64; - let mut keys_to_remove = Vec::new(); - // linked hashmap maintains insertion order, so iter() gives us the oldest data first. - for (key, value) in data.entries.iter() { - if min_bytes_to_remove <= 0 { - break; - } - min_bytes_to_remove -= value.byte_count as i64; - keys_to_remove.push(key.clone()); - } - // dropping the read lock. - drop(data); - let mut content = db.data.write().unwrap(); - for key in keys_to_remove { - content.entries.remove(&key); + println!("Notified"); + } + println!("Shutting down old entry removing background task") +} + +fn remove_old_entries(db: &DbInner) { + let data = db.data.read().unwrap(); + let current_content_byte_size = data + .entries + .values() + .map(|data| data.byte_count as u64) + .sum::(); + let cache_size = data.max_cache_size_in_bytes; + if current_content_byte_size > cache_size { + // using i64 because we can go below 0 + let mut min_bytes_to_remove = (current_content_byte_size - cache_size) as i64; + let mut keys_to_remove = Vec::new(); + // linked hashmap maintains insertion order, so iter() gives us the oldest data first. + for (key, value) in data.entries.iter() { + if min_bytes_to_remove <= 0 { + break; } + min_bytes_to_remove -= value.byte_count as i64; + keys_to_remove.push(key.clone()); + } + // dropping the read lock. 
+        drop(data);
+        let mut content = db.data.write().unwrap();
+        for key in keys_to_remove {
+            content.entries.remove(&key);
+        }
     }
 }

From 915270ffe53889de84a5fea25de104d632b14692 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Fri, 12 Apr 2024 15:58:01 +0300
Subject: [PATCH 07/14] drop guard implementation

---
 memcached/src/db.rs     | 37 ++++++++++++++++++++++---------------
 memcached/src/server.rs | 18 +++++++++++-------
 2 files changed, 33 insertions(+), 22 deletions(-)

diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index 395a705..9cdd6cb 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -6,27 +6,32 @@ use std::{
 use linked_hash_map::LinkedHashMap;
 use tokio::sync::Notify;

+#[derive(Clone)]
 pub struct Db {
     inner: Arc<DbInner>,
 }

-// pub(crate) struct DbDropGuard {
-//     db: Db,
-// }
+pub(crate) struct DbDropGuard {
+    db: Db,
+}

-// impl DbDropGuard {
-//     pub fn new(max_cache_size_in_bytes: u64) -> Self {
-//         Self {
-//             db: Db::new(max_cache_size_in_bytes),
-//         }
-//     }
-// }
+impl DbDropGuard {
+    pub fn new(max_cache_size_in_bytes: u64) -> Self {
+        Self {
+            db: Db::new(max_cache_size_in_bytes),
+        }
+    }

-// impl Drop for DbDropGuard {
-//     fn drop(&mut self) {
-//         self.db.signal_shut_down();
-//     }
-// }
+    pub fn db(&self) -> Db {
+        self.db.clone()
+    }
+}
+
+impl Drop for DbDropGuard {
+    fn drop(&mut self) {
+        self.db.signal_shut_down();
+    }
+}
@@ -86,6 +91,7 @@ impl Db {
         data.shut_down = true;
         drop(data);
         self.inner.background_task.notify_one();
+        println!("Signaled shut down");
     }
 }
@@ -121,6 +127,7 @@ async fn purge_older_keys_if_cache_size_is_above_threshold_task(db: Arc<DbInner>)

 fn remove_old_entries(db: &DbInner) {
     let data = db.data.read().unwrap();
+    println!("Here");
     let current_content_byte_size = data
         .entries
         .values()

diff --git a/memcached/src/server.rs b/memcached/src/server.rs
index f632840..c519e4f 100644
--- a/memcached/src/server.rs
+++ b/memcached/src/server.rs
@@ -1,6 +1,6 @@
-use std::{future::Future, sync::Arc};
+use std::future::Future;

-use crate::{anyhow, commands, response};
+use crate::{anyhow, commands, db::DbDropGuard, response};
 use anyhow::Context;
 use bytes::{BufMut, BytesMut};
 use tokio::{
@@ -15,8 +15,12 @@ use tokio::sync::mpsc::Sender;

 use crate::db::Db;

-pub async fn run(tcp_listener: TcpListener, cache_size: u64, shut_down: impl Future) -> anyhow::Result<()> {
-    let db = Arc::new(Db::new(cache_size));
+pub async fn run(
+    tcp_listener: TcpListener,
+    cache_size: u64,
+    shut_down: impl Future,
+) -> anyhow::Result<()> {
+    let db = DbDropGuard::new(cache_size);
     let (shut_down_signal_sender, _) = tokio::sync::broadcast::channel(1);
     let (shut_down_complete_sender, mut shut_down_complete_receiver) =
         tokio::sync::mpsc::channel::<()>(1);
@@ -52,7 +56,7 @@ pub async fn run(

 struct Listener {
     tcp_listener: TcpListener,
-    db: Arc<Db>,
+    db: DbDropGuard,
     // notifies all connections that subscribed to the broadcast sender that the server is shutting down.
     shut_down_signal: BroadCastSender<()>,
     // goes out of scope once the ConnectionHandler is dropped, thus signaling to the server that it is finally safe to shut down once all senders are dropped.
@@ -72,7 +76,7 @@ impl Listener {
                 connection: Connection {
                     stream,
                     buffer: BytesMut::with_capacity(1024),
-                    db: self.db.clone(),
+                    db: self.db.db(),
                 },
                 shut_down_signal: self.shut_down_signal.subscribe(),
                 _shut_down_complete: self._shut_down_complete.clone(),
@@ -103,7 +107,7 @@ impl ConnectionHandler {

 struct Connection {
     stream: TcpStream,
-    db: Arc<Db>,
+    db: Db,
     buffer: BytesMut,
 }

From a5d9f920226a054f6df0c5b5bdb0921b30ca5c78 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Mon, 15 Apr 2024 06:56:59 +0300
Subject: [PATCH 08/14] remove comment

---
 memcached/src/db.rs | 14 ++++----------
 1 file changed, 4 insertions(+), 10 deletions(-)

diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index 9cdd6cb..199e3c9 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -33,8 +33,6 @@ impl Drop for DbDropGuard {
     }
 }

-// TODO: Setup a special HashMap that will allow us to store the data in a hashmap and also keep track of the oldest data to be removed once the cache size is above the threshold
-
 struct DbInner {
     data: RwLock<DbState>,
     background_task: Notify,
@@ -89,8 +87,7 @@ impl Db {
     fn signal_shut_down(&self) {
         let mut data = self.inner.data.write().unwrap();
         data.shut_down = true;
-        drop(data);
-        self.inner.background_task.notify_one();
+        // self.inner.background_task.notify_one();
         println!("Signaled shut down");
     }
 }
@@ -127,7 +124,6 @@ async fn purge_older_keys_if_cache_size_is_above_threshold_task(db: Arc<DbInner>)

 fn remove_old_entries(db: &DbInner) {
     let data = db.data.read().unwrap();
-    println!("Here");
     let current_content_byte_size = data
         .entries
         .values()
@@ -138,19 +134,17 @@ fn remove_old_entries(db: &DbInner) {
         // using i64 because we can go below 0
         let mut min_bytes_to_remove = (current_content_byte_size - cache_size) as i64;
         let mut keys_to_remove = Vec::new();
-        // linked hashmap maintains insertion order, so iter() gives us the oldest data first.
+        // linked hashmap maintains insertion order, so iter() gives us the oldest data first which is what we want to remove
         for (key, value) in data.entries.iter() {
             if min_bytes_to_remove <= 0 {
                 break;
             }
             min_bytes_to_remove -= value.byte_count as i64;
-            keys_to_remove.push(key.clone());
+            keys_to_remove.push(key);
         }
-        // dropping the read lock.
-        drop(data);
         let mut content = db.data.write().unwrap();
         for key in keys_to_remove {
-            content.entries.remove(&key);
+            content.entries.remove(key);
         }
     }
 }

From c646ea0a955da1e95808a10f8ee4ff9f743f997e Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Mon, 15 Apr 2024 07:30:48 +0300
Subject: [PATCH 09/14] introduce new data structure for storing the entries
 together with bytes count

---
 memcached/src/commands/append.rs  | 25 +++++----
 memcached/src/commands/prepend.rs | 35 +++++--------
 memcached/src/db.rs               | 85 +++++++++++++++++++++++++++----
 3 files changed, 98 insertions(+), 47 deletions(-)

diff --git a/memcached/src/commands/append.rs b/memcached/src/commands/append.rs
index 3ac68c7..d4c06c1 100644
--- a/memcached/src/commands/append.rs
+++ b/memcached/src/commands/append.rs
@@ -1,5 +1,7 @@
-use crate::{db::Db, response::Response};
-use linked_hash_map::Entry;
+use crate::{
+    db::{Content, Db},
+    response::Response,
+};

 use super::{extractors::ExtractedData, Parser};
@@ -16,18 +18,15 @@ impl AppendCommand {

     pub fn execute(self, db: &Db) -> Response {
         db.with_data_mut(|data| {
-            let entry = data.entry(self.data.key.clone()).and_modify(|content| {
-                content.data.extend(self.data.content);
-            });
-            match entry {
-                Entry::Occupied(_) => {
-                    if self.data.noreply {
-                        Response::NoReply
-                    } else {
-                        Response::Stored
-                    }
+            let appended = data.append(&self.data.key, Content::from(&self.data));
+            if appended {
+                if self.data.noreply {
+                    Response::NoReply
+                } else {
+                    Response::Stored
                 }
-                Entry::Vacant(_) => Response::NotStored,
+            } else {
+                Response::NotStored
             }
         })
     }

diff --git a/memcached/src/commands/prepend.rs b/memcached/src/commands/prepend.rs
index b89866b..1c041b7 100644
--- a/memcached/src/commands/prepend.rs
+++ b/memcached/src/commands/prepend.rs
@@ -1,9 +1,7 @@
-use std::mem;
-
-use itertools::Itertools;
-use linked_hash_map::Entry;
-
-use crate::{db::Db, response::Response};
+use crate::{
+    db::{Content, Db},
+    response::Response,
+};

 use super::{extractors::ExtractedData, Parser};
@@ -20,24 +18,15 @@ impl Prepend {

     pub fn execute(self, db: &Db) -> Response {
         db.with_data_mut(|data| {
-            let entry = data.entry(self.data.key.clone()).and_modify(|content| {
-                let existing_content = mem::take(&mut content.data);
-                content.data = self
-                    .data
-                    .content
-                    .into_iter()
-                    .chain(existing_content)
-                    .collect_vec();
-            });
-            match entry {
-                Entry::Occupied(_) => {
-                    if self.data.noreply {
-                        Response::NoReply
-                    } else {
-                        Response::Stored
-                    }
+            let prepended = data.prepend(&self.data.key, Content::from(&self.data));
+            if prepended {
+                if self.data.noreply {
+                    Response::NoReply
+                } else {
+                    Response::Stored
                 }
-                Entry::Vacant(_) => Response::NotStored,
+            } else {
+                Response::NotStored
             }
         })
     }

diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index 199e3c9..47d96ba 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -1,8 +1,10 @@
 use std::{
+    mem,
     sync::{Arc, RwLock},
     time::Duration,
 };

+use itertools::Itertools;
 use linked_hash_map::LinkedHashMap;
 use tokio::sync::Notify;
@@ -47,14 +49,80 @@ impl DbInner {
 struct DbState {
     max_cache_size_in_bytes: u64,
     shut_down: bool,
-    entries: LinkedHashMap<String, Content>,
+    entries: MapWithByteSizeCount,
 }

+// We want to keep track of the byte size of the content so that we can remove the oldest content if the cache size exceeds the max_cache_size_in_bytes
+// The LinkedHashMap maintains insertion order, so we can remove the oldest content first
+// the byte_count field provides a single lookup to get the total byte size of the content stored in the map
From cbff4014fcf2be70d7cd8c05e432cbc2fc65a862 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Mon, 15 Apr 2024 07:42:41 +0300
Subject: [PATCH 10/14] feedback: use unwrap_or_else

---
 memcached/src/main.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/memcached/src/main.rs b/memcached/src/main.rs
index 9d1b22b..55c177d 100644
--- a/memcached/src/main.rs
+++ b/memcached/src/main.rs
@@ -14,7 +14,7 @@ async fn main() -> anyhow::Result<()> {
     let port = env::var("PORT").unwrap_or("11211".to_string());
     let address = format!("127.0.0.1:{port}");
     let cache_size = env::var("CACHE_SIZE")
-        .unwrap_or("10000".to_string())
+        .unwrap_or_else(|_| "10000".to_string())
         .parse::<u64>()
         .context("Failed to parse CACHE_SIZE")?;
 
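The review feedback here is about evaluation order: `unwrap_or` builds its fallback eagerly, even when the `Result` is `Ok`, while `unwrap_or_else` only runs its closure on the error path. A small standalone illustration, not the crate's code:

```rust
fn main() {
    let from_env: Result<String, std::env::VarError> = std::env::var("CACHE_SIZE");

    // Eager: the "10000" String is allocated before unwrap_or even looks at
    // the Result, so the allocation is wasted whenever CACHE_SIZE is set.
    let eager = from_env.clone().unwrap_or("10000".to_string());

    // Lazy: the closure, and therefore the allocation, runs only when the
    // variable is missing or not valid unicode.
    let lazy = from_env.unwrap_or_else(|_| "10000".to_string());

    assert_eq!(eager, lazy);
}
```

For a one-off read at startup the difference is negligible; clippy's `or_fun_call` lint flags the pattern mainly because it matters on hot paths.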
From ca48ac02025780b53ee956a85a51cca534a510f3 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Mon, 15 Apr 2024 07:48:03 +0300
Subject: [PATCH 11/14] readme and remove printlns

---
 memcached/README.md | 24 ++++++++++++++++++++++++
 memcached/src/db.rs |  2 --
 2 files changed, 24 insertions(+), 2 deletions(-)
 create mode 100644 memcached/README.md

diff --git a/memcached/README.md b/memcached/README.md
new file mode 100644
index 0000000..d247a38
--- /dev/null
+++ b/memcached/README.md
@@ -0,0 +1,24 @@
+## Running the server
+
+`cargo run`
+
+In a new terminal, connect to the server via telnet:
+
+```
+telnet localhost 11211
+```
+
+Passing some commands:
+
+set a value
+
+```
+set test 0 0 4
+1234
+```
+
+get a value
+
+```
+get test
+```
diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index 47d96ba..671a8dc 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -184,9 +184,7 @@ async fn purge_older_keys_if_cache_size_is_above_threshold_task(db: Arc<DbInner>) {
     while !db.is_shutting_down() {
         remove_old_entries(&db);
         db.background_task.notified().await;
-        println!("Notified");
     }
-    println!("Shutting down old entry removing background task")
 }
 
 fn remove_old_entries(db: &DbInner) {
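For scripting the same session without telnet, here is a rough Rust client for the text protocol, assuming the server is on `127.0.0.1:11211` and answers with memcached's standard `STORED`/`VALUE`/`END` replies; this toy server's exact framing may differ, so treat it as a sketch.

```rust
use std::io::{BufRead, BufReader, Read, Write};
use std::net::TcpStream;

fn main() -> std::io::Result<()> {
    let mut stream = TcpStream::connect("127.0.0.1:11211")?;
    let mut reader = BufReader::new(stream.try_clone()?);
    let mut line = String::new();

    // set <key> <flags> <exptime> <bytes>, then the data block; every line
    // in the text protocol ends with \r\n.
    stream.write_all(b"set test 0 0 4\r\n1234\r\n")?;
    reader.read_line(&mut line)?; // expect "STORED"
    print!("{line}");

    // get <key> replies: VALUE <key> <flags> <bytes>, the data, then END.
    stream.write_all(b"get test\r\n")?;
    line.clear();
    reader.read_line(&mut line)?; // "VALUE test 0 4"
    print!("{line}");
    let mut payload = vec![0u8; 4 + 2]; // 4 data bytes plus trailing \r\n
    reader.read_exact(&mut payload)?;
    println!("{}", String::from_utf8_lossy(&payload[..4])); // "1234"
    line.clear();
    reader.read_line(&mut line)?; // "END"
    print!("{line}");
    Ok(())
}
```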
From a3febb415d5d346cb012a0f090fabb413fb993f1 Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Mon, 15 Apr 2024 07:55:09 +0300
Subject: [PATCH 12/14] improve readme

---
 memcached/README.md | 16 ++++++++++++----
 memcached/src/db.rs |  2 +-
 2 files changed, 13 insertions(+), 5 deletions(-)

diff --git a/memcached/README.md b/memcached/README.md
index d247a38..7968dac 100644
--- a/memcached/README.md
+++ b/memcached/README.md
@@ -1,10 +1,18 @@
 ## Running the server
 
-`cargo run`
+```bash
+cargo run
+```
 
-In a new terminal, connect to the server via telnet:
-
-```
+you can also set the cache size. ie: The max number of bytes the server can hold. The default is `1000 bytes` if the `CACHE_SIZE` flag is not provided.
+
+```bash
+CACHE_SIZE=2000 cargo run
+```
+
+In a new terminal, connect to the server via telnet:
+
+```bash
 telnet localhost 11211
 ```
 
@@ -12,13 +20,13 @@ Passing some commands:
 
 set a value
 
-```
+```bash
 set test 0 0 4
 1234
 ```
 
 get a value
 
-```
+```bash
 get test
 ```
diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index 671a8dc..d34976f 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -52,7 +52,7 @@ struct DbState {
     entries: MapWithByteSizeCount,
 }
 
-// We want to keep track of the byte size of the content so that we can remove the oldest content if the cache size exceeds the max_cache_size_in_bytes
+// We want to keep track of the byte size of the content so that we can remove the oldest content if the entire content byte count exceeds the max_cache_size_in_bytes
 // The LinkedHashMap maintains insertion order, so we can remove the oldest content first
 // the byte_count field provides a single lookup to get the total byte size of the content stored in the map
 #[derive(Default)]

From 1ce12f5b8e5c59e4c1b8819aec0d7c63e690f81d Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Mon, 15 Apr 2024 08:14:14 +0300
Subject: [PATCH 13/14] remove expired key in the get operation

---
 memcached/src/commands/get.rs | 14 +++++---------
 memcached/src/db.rs           | 17 ++++++++++++++++-
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/memcached/src/commands/get.rs b/memcached/src/commands/get.rs
index 3128dc1..092bd39 100644
--- a/memcached/src/commands/get.rs
+++ b/memcached/src/commands/get.rs
@@ -14,14 +14,10 @@ impl GetCommand {
     }
 
     pub fn execute(self, db: &Db) -> Response {
-        db.with_data(|data| {
-            let content = data.get(&self.key);
-            match content {
-                Some(content) if !content.is_expired() => {
-                    Response::Value((content, self.key).into())
-                }
-                _ => Response::End,
-            }
-        })
+        let content = db.get(&self.key);
+        content
+            .as_ref()
+            .map(|content| Response::Value((content, self.key).into()))
+            .unwrap_or(Response::End)
     }
 }
diff --git a/memcached/src/db.rs b/memcached/src/db.rs
index d34976f..c7c48c4 100644
--- a/memcached/src/db.rs
+++ b/memcached/src/db.rs
@@ -145,13 +145,27 @@ impl Db {
         result
     }
 
-    pub fn with_data<F, T>(&self, f: F) -> T
+    fn with_data<F, T>(&self, f: F) -> T
     where
         F: FnOnce(&MapWithByteSizeCount) -> T,
     {
         f(&self.inner.data.read().unwrap().entries)
     }
 
+    pub fn get(&self, key: &str) -> Option<Content> {
+        let content = self.with_data(|data| data.get(key).cloned());
+        if let Some(content) = content {
+            if content.is_expired() {
+                self.with_data_mut(|data| {
+                    data.remove(key);
+                });
+                return None;
+            }
+            return Some(content);
+        }
+        None
+    }
+
     fn signal_shut_down(&self) {
         let mut data = self.inner.data.write().unwrap();
         data.shut_down = true;
@@ -159,6 +173,7 @@ impl Db {
     }
 }
 
+#[derive(Clone)]
 pub struct Content {
     pub data: Vec<u8>,
     pub byte_count: usize,
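Patch 13 moves expiry handling into `Db::get`: expiry is checked at read time, and an expired entry is deleted on the spot so it stops occupying cache space. The standalone sketch below shows that read-then-upgrade shape with illustrative types (`Entry`, a bare `HashMap`), not the crate's own.

```rust
use std::collections::HashMap;
use std::sync::RwLock;
use std::time::Instant;

struct Entry {
    data: Vec<u8>,
    expires_at: Option<Instant>, // None means the entry never expires
}

impl Entry {
    fn is_expired(&self) -> bool {
        self.expires_at.is_some_and(|t| Instant::now() >= t)
    }
}

fn get(store: &RwLock<HashMap<String, Entry>>, key: &str) -> Option<Vec<u8>> {
    // Clone the value out under the read lock so the guard is released
    // before we ever need the write lock.
    let found = {
        let map = store.read().unwrap();
        map.get(key).map(|e| (e.data.clone(), e.is_expired()))
    };
    match found {
        Some((_, true)) => {
            // Expired: take the write lock and remove the entry lazily.
            store.write().unwrap().remove(key);
            None
        }
        Some((data, false)) => Some(data),
        None => None,
    }
}
```

Between releasing the read lock and taking the write lock, another thread could set a fresh value under the same key and have it removed here; the patch's `get` has the same small window, which a cache can usually tolerate.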
From b74fce89dfa9376d1eaa521bc9773f6c21ae3d75 Mon Sep 17 00:00:00 2001
From: Tev
Date: Mon, 15 Apr 2024 08:19:03 +0300
Subject: [PATCH 14/14] Update memcached/README.md

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
---
 memcached/README.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/memcached/README.md b/memcached/README.md
index 7968dac..1046991 100644
--- a/memcached/README.md
+++ b/memcached/README.md
@@ -4,7 +4,7 @@
 cargo run
 ```
 
-you can also set the cache size. ie: The max number of bytes the server can hold. The default is `1000 bytes` if the `CACHE_SIZE` flag is not provided.
+You can also set the cache size. ie: The max number of bytes the server can hold. The default is `1000 bytes` if the `CACHE_SIZE` flag is not provided.
 
 ```bash
 CACHE_SIZE=2000 cargo run