From 3fdc1166cfbe22f51428ea608b504618dfce50aa Mon Sep 17 00:00:00 2001
From: Tevin Thuku
Date: Tue, 23 Apr 2024 08:06:24 +0300
Subject: [PATCH] test application

---
 ratelimiter/README.md                          | 20 +++++++-
 ratelimiter/src/bin/server_1.rs                | 22 +++++++++
 ratelimiter/src/bin/server_2.rs                | 22 +++++++++
 ratelimiter/src/lib.rs                         | 33 +++++++++++++
 ratelimiter/src/main.rs                        | 46 -------------------
 .../sliding_window_counter/distributed.rs      | 19 +++-----
 6 files changed, 103 insertions(+), 59 deletions(-)
 create mode 100644 ratelimiter/src/bin/server_1.rs
 create mode 100644 ratelimiter/src/bin/server_2.rs
 delete mode 100644 ratelimiter/src/main.rs

diff --git a/ratelimiter/README.md b/ratelimiter/README.md
index daf65b0..df8fbab 100644
--- a/ratelimiter/README.md
+++ b/ratelimiter/README.md
@@ -1,6 +1,24 @@
-## Running the dragonfly backed sliding window rate limiter
+## How to run the two servers plus the rate limiter
+
+### Run the Dragonfly-backed sliding window rate limiter
 
 ```bash
 docker-compose up -d
 ```
 
+### Run the first server
+
+```bash
+cargo run --bin server_1
+```
+
+### Run the second server
+
+```bash
+cargo run --bin server_2
+```
+
+- Using Postman or Insomnia, call `localhost:8080/limited` until it returns `429` (automate the requests; 60 of them, to be exact).
+
+- Then call `localhost:8081/limited`: it still returns `429`, because both servers share the same rate-limit counters in Dragonfly.
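The manual test described in the README above can also be scripted. The sketch below is illustrative only and not part of this patch: it assumes `reqwest` and `tokio` are available to the test client (neither is added here) and that Dragonfly plus both servers are already running.

```rust
// Hypothetical test client (not part of this patch): exhaust the window on
// server_1, then confirm server_2 reports the same 429.
#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    for i in 1..=61 {
        // With max_window_tokens = 60, the 61st request in a fresh window
        // should be the first to come back as 429 Too Many Requests.
        let status = reqwest::get("http://127.0.0.1:8080/limited").await?.status();
        println!("request {i} -> :8080 responded {status}");
    }

    // The counters live in Dragonfly, not in the server process, so the
    // second instance should already be rate limited as well.
    let status = reqwest::get("http://127.0.0.1:8081/limited").await?.status();
    println!(":8081 responded {status}");
    Ok(())
}
```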
diff --git a/ratelimiter/src/bin/server_1.rs b/ratelimiter/src/bin/server_1.rs
new file mode 100644
index 0000000..f2b6983
--- /dev/null
+++ b/ratelimiter/src/bin/server_1.rs
@@ -0,0 +1,22 @@
+use actix_web::{web, App, HttpServer};
+use anyhow::Context;
+use ratelimiter::{
+    limited, rate_limiters::sliding_window_counter::distributed::DistributedSlidingWindowCounter,
+    AppStateWithIpRateLimiter,
+};
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let rate_limiter = DistributedSlidingWindowCounter::new().await?;
+    HttpServer::new(move || {
+        let rate_limiter = AppStateWithIpRateLimiter::new(rate_limiter.clone());
+        App::new()
+            .app_data(web::Data::new(rate_limiter))
+            .service(limited)
+    })
+    .bind(("127.0.0.1", 8080))
+    .context("Failed to bind to port")?
+    .run()
+    .await
+    .context("Failed to run the server")
+}
diff --git a/ratelimiter/src/bin/server_2.rs b/ratelimiter/src/bin/server_2.rs
new file mode 100644
index 0000000..a529cbc
--- /dev/null
+++ b/ratelimiter/src/bin/server_2.rs
@@ -0,0 +1,22 @@
+use actix_web::{web, App, HttpServer};
+use anyhow::Context;
+use ratelimiter::{
+    limited, rate_limiters::sliding_window_counter::distributed::DistributedSlidingWindowCounter,
+    AppStateWithIpRateLimiter,
+};
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let rate_limiter = DistributedSlidingWindowCounter::new().await?;
+    HttpServer::new(move || {
+        let rate_limiter = AppStateWithIpRateLimiter::new(rate_limiter.clone());
+        App::new()
+            .app_data(web::Data::new(rate_limiter))
+            .service(limited)
+    })
+    .bind(("127.0.0.1", 8081))
+    .context("Failed to bind to port")?
+    .run()
+    .await
+    .context("Failed to run the server")
+}
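Review note rather than part of the patch: `server_1.rs` and `server_2.rs` are identical except for the port they bind to. One way to remove the duplication would be a shared helper in the `ratelimiter` crate; the hypothetical `run_server` below (name, module placement, and the `u16` port parameter are all assumptions) keeps the exact wiring both binaries already use.

```rust
// Hypothetical shared helper, e.g. in a new `server` module of the
// ratelimiter crate. Same wiring as server_1.rs / server_2.rs, with only
// the port left variable.
use actix_web::{web, App, HttpServer};
use anyhow::Context;

use crate::rate_limiters::sliding_window_counter::distributed::DistributedSlidingWindowCounter;
use crate::{limited, AppStateWithIpRateLimiter};

pub async fn run_server(port: u16) -> anyhow::Result<()> {
    let rate_limiter = DistributedSlidingWindowCounter::new().await?;
    HttpServer::new(move || {
        // One app-state wrapper per worker, cloned from the shared counter.
        let rate_limiter = AppStateWithIpRateLimiter::new(rate_limiter.clone());
        App::new()
            .app_data(web::Data::new(rate_limiter))
            .service(limited)
    })
    .bind(("127.0.0.1", port))
    .context("Failed to bind to port")?
    .run()
    .await
    .context("Failed to run the server")
}
```

Each binary would then reduce to calling the helper from its `#[tokio::main]` main, for example `ratelimiter::server::run_server(8081).await`.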
diff --git a/ratelimiter/src/lib.rs b/ratelimiter/src/lib.rs
index b1a75b6..8eb023c 100644
--- a/ratelimiter/src/lib.rs
+++ b/ratelimiter/src/lib.rs
@@ -1 +1,34 @@
+use actix_web::{get, web, HttpResponse, Responder};
+use rate_limiters::sliding_window_counter::distributed::DistributedSlidingWindowCounter;
+use tokio::sync::Mutex;
+
 pub mod rate_limiters;
+
+#[get("/limited")]
+async fn limited(data: web::Data<AppStateWithIpRateLimiter>) -> impl Responder {
+    let limiter = &mut data.limiter.lock().await;
+    let consumed_result = limiter.consume_token().await;
+    let consumed = match consumed_result {
+        Ok(consumed) => consumed,
+        Err(err) => {
+            return HttpResponse::InternalServerError()
+                .body(format!("Internal Server Error: {err:?}"))
+        }
+    };
+    if consumed {
+        return HttpResponse::Ok().body("Limited, but ok for now, don't over use me!");
+    }
+    HttpResponse::TooManyRequests().body("Rate limit exceeded, try again later")
+}
+
+pub struct AppStateWithIpRateLimiter {
+    limiter: Mutex<DistributedSlidingWindowCounter>,
+}
+
+impl AppStateWithIpRateLimiter {
+    pub fn new(limiter: DistributedSlidingWindowCounter) -> Self {
+        Self {
+            limiter: Mutex::new(limiter),
+        }
+    }
+}
diff --git a/ratelimiter/src/main.rs b/ratelimiter/src/main.rs
deleted file mode 100644
index a126b8b..0000000
--- a/ratelimiter/src/main.rs
+++ /dev/null
@@ -1,46 +0,0 @@
-use std::sync::Mutex;
-
-use actix_web::{get, web, App, HttpRequest, HttpResponse, HttpServer, Responder};
-use ratelimiter::rate_limiters::sliding_window_counter::IpRateLimiter;
-
-#[get("/limited")]
-async fn limited(data: web::Data<AppStateWithIpRateLimiter>, req: HttpRequest) -> impl Responder {
-    let consumed = req
-        .connection_info()
-        .peer_addr()
-        .map(|ip| {
-            let mut limiter = data.limiter.lock().unwrap();
-            limiter.consume_token(ip.to_owned())
-        })
-        .unwrap_or_else(|| {
-            eprintln!("Failed to get IP address");
-            false
-        });
-    if consumed {
-        return HttpResponse::Ok().body("Limited, but ok for now, don't over use me!");
-    }
-    HttpResponse::TooManyRequests().body("Rate limit exceeded, try again later")
-}
-
-#[get("/unlimited")]
-async fn unlimited() -> impl Responder {
-    HttpResponse::Ok().body("Unlimited! Let's Go!")
-}
-
-struct AppStateWithIpRateLimiter {
-    limiter: Mutex<IpRateLimiter>,
-}
-#[tokio::main]
-async fn main() -> std::io::Result<()> {
-    HttpServer::new(|| {
-        App::new()
-            .app_data(web::Data::new(AppStateWithIpRateLimiter {
-                limiter: Mutex::new(IpRateLimiter::default()),
-            }))
-            .service(limited)
-            .service(unlimited)
-    })
-    .bind(("127.0.0.1", 8080))?
-    .run()
-    .await
-}
diff --git a/ratelimiter/src/rate_limiters/sliding_window_counter/distributed.rs b/ratelimiter/src/rate_limiters/sliding_window_counter/distributed.rs
index f81b61d..6a7913f 100644
--- a/ratelimiter/src/rate_limiters/sliding_window_counter/distributed.rs
+++ b/ratelimiter/src/rate_limiters/sliding_window_counter/distributed.rs
@@ -3,6 +3,7 @@ use std::time::Duration;
 use anyhow::{anyhow, Context};
 use redis::aio::ConnectionManager;
 
+#[derive(Clone)]
 pub struct DistributedSlidingWindowCounter {
     client: ConnectionManager,
     window_duration: Duration,
@@ -10,23 +11,16 @@ pub struct DistributedSlidingWindowCounter {
 }
 
 impl DistributedSlidingWindowCounter {
-    /// Create a new DistributedSlidingWindowCounter
-    ///
-    /// if `window_duration` is not provided, it defaults to 60 seconds
-    ///
-    /// if `max_window_tokens` is not provided, it defaults to 60 tokens
-    pub async fn new(
-        window_duration: Option<Duration>,
-        max_window_tokens: Option<usize>,
-    ) -> anyhow::Result<Self> {
+    pub async fn new() -> anyhow::Result<Self> {
         let url = std::env::var("REDIS_URL").unwrap_or_else(|_| "redis://127.0.0.1/".to_owned());
         let client = redis::Client::open(url).context("Failed to create redis client")?;
         let conn_manager = ConnectionManager::new(client)
            .await
            .context("Failed to create connection manager")?;
 
-        let window_duration = window_duration.unwrap_or(Duration::from_secs(60));
-        let max_window_tokens = max_window_tokens.unwrap_or(60);
+        let window_duration = Duration::from_secs(60);
+        let max_window_tokens = 60;
+
         Ok(Self {
             client: conn_manager,
             window_duration,
@@ -60,7 +54,8 @@ impl DistributedSlidingWindowCounter {
             .ignore()
             .set("previous_window_count", current_window_count.unwrap_or(0))
             .ignore()
-            .mget(&["current_window_count", "previous_window_count"])
+            .get("current_window_count")
+            .get("previous_window_count")
             .query_async(&mut self.client)
             .await
             .context("Failed to set values in the DB")?;
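On the `distributed.rs` change: replacing the single `MGET` with two `GET`s gives each key its own slot in the pipeline reply (one slot per command that is not `.ignore()`d), so the result destructures as a flat tuple instead of one nested array. The standalone sketch below shows only that read path; it is not the crate's `consume_token` implementation, and it assumes the `redis` crate with Tokio/connection-manager support and a Dragonfly or Redis instance on the default local port.

```rust
use redis::aio::ConnectionManager;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // Connect roughly the way DistributedSlidingWindowCounter::new does
    // (minus the REDIS_URL environment fallback).
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let mut conn = ConnectionManager::new(client).await?;

    // Two GETs -> two slots in the reply tuple; a missing key decodes as None.
    let (current, previous): (Option<u64>, Option<u64>) = redis::pipe()
        .get("current_window_count")
        .get("previous_window_count")
        .query_async(&mut conn)
        .await?;

    println!("current_window_count={current:?}, previous_window_count={previous:?}");
    Ok(())
}
```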