Skip to content

Commit

Permalink
Add automatic retries
Browse files Browse the repository at this point in the history
  • Loading branch information
durch committed Sep 1, 2024
1 parent f0f67da commit af9f4fa
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 12 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
<!-- [![Join the chat at https://gitter.im/durch/rust-s3](https://badges.gitter.im/durch/rust-s3.svg)](https://gitter.im/durch/rust-s3?utm_source=badge&utm_medium=badge&utm_campaign=pr-badge&utm_content=badge) -->
## rust-s3 [[docs](https://docs.rs/rust-s3/)]

Rust library for working with Amazon S3 or arbitrary S3 compatible APIs, fully compatible with **async/await** and `futures ^0.3`. All `async` features can be turned off and sync only implementations can be used.
Rust library for working with Amazon S3 or arbitrary S3 compatible APIs, fully compatible with **async/await** and `futures ^0.3`. All `async` features can be turned off and sync only implementations can be used. All requests are automatically retried once, can be further modified with a `set_retries` call.

### :raised_hands: Support further maintenance and development

Expand Down
43 changes: 43 additions & 0 deletions s3/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
extern crate serde_derive;

use std::sync::atomic::AtomicBool;
use std::sync::atomic::AtomicU8;

pub use awscreds as creds;
pub use awsregion as region;
Expand Down Expand Up @@ -35,6 +36,48 @@ const EMPTY_PAYLOAD_SHA: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934
#[cfg(not(feature = "disable-call-for-funding"))]
static INITIALIZED: AtomicBool = AtomicBool::new(false);

static RETRIES: AtomicU8 = AtomicU8::new(1);

/// Sets the number of retries for operations that may fail and need to be retried.
///
/// This function stores the specified number of retries in an atomic variable,
/// which can be safely shared across threads. This is used by the retry! macro to automatically retry all requests.
///
/// # Arguments
///
/// * `retries` - The number of retries to set.
///
/// # Example
///
/// ```rust
/// use s3::set_retries;
///
/// set_retries(3);
/// ```
pub fn set_retries(retries: u8) {
RETRIES.store(retries, std::sync::atomic::Ordering::SeqCst);
}

/// Retrieves the current number of retries set for operations.
///
/// This function loads the value of the atomic variable storing the number of retries,
/// which can be safely accessed across threads.
///
/// # Returns
///
/// The number of retries currently set, as a `u64`.
///
/// # Example
///
/// ```rust
/// use s3::get_retries;
///
/// let retries = get_retries();
/// ```
pub fn get_retries() -> u64 {
RETRIES.load(std::sync::atomic::Ordering::Relaxed) as u64
}

#[cfg(not(feature = "disable-call-for-funding"))]
#[inline(always)]
pub(crate) fn init_once() {
Expand Down
8 changes: 4 additions & 4 deletions s3/src/request/async_std_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ impl<'a> Request for SurfRequest<'a> {
}

async fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error> {
let mut response = self.response().await?;
let mut response = crate::retry! {self.response().await}?;
let status_code = response.status();

let response_headers = response
Expand Down Expand Up @@ -120,7 +120,7 @@ impl<'a> Request for SurfRequest<'a> {
) -> Result<u16, S3Error> {
let mut buffer = Vec::new();

let response = self.response().await?;
let response = crate::retry! {self.response().await}?;

let status_code = response.status();

Expand All @@ -135,7 +135,7 @@ impl<'a> Request for SurfRequest<'a> {

async fn response_header(&self) -> Result<(HeaderMap, u16), S3Error> {
let mut header_map = HeaderMap::new();
let response = self.response().await?;
let response = crate::retry! {self.response().await}?;
let status_code = response.status();

for (name, value) in response.iter() {
Expand All @@ -150,7 +150,7 @@ impl<'a> Request for SurfRequest<'a> {
}

async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error> {
let mut response = self.response().await?;
let mut response = crate::retry! {self.response().await}?;
let status_code = response.status();

let body = response
Expand Down
6 changes: 3 additions & 3 deletions s3/src/request/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ impl<'a> Request for AttoRequest<'a> {
}

fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error> {
let response = self.response()?;
let response = crate::retry! {self.response()}?;
let status_code = response.status().as_u16();

let response_headers = response
Expand Down Expand Up @@ -112,7 +112,7 @@ impl<'a> Request for AttoRequest<'a> {
}

fn response_data_to_writer<T: Write + ?Sized>(&self, writer: &mut T) -> Result<u16, S3Error> {
let mut response = self.response()?;
let mut response = crate::retry! {self.response()}?;

let status_code = response.status();
io::copy(&mut response, writer)?;
Expand All @@ -121,7 +121,7 @@ impl<'a> Request for AttoRequest<'a> {
}

fn response_header(&self) -> Result<(Self::HeaderMap, u16), S3Error> {
let response = self.response()?;
let response = crate::retry! {self.response()}?;
let status_code = response.status().as_u16();
let headers = response.headers().clone();
Ok((headers, status_code))
Expand Down
9 changes: 5 additions & 4 deletions s3/src/request/tokio_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ use crate::bucket::Bucket;
use crate::command::Command;
use crate::command::HttpMethod;
use crate::error::S3Error;
use crate::retry;
use crate::utils::now_utc;

use tokio_stream::StreamExt;
Expand Down Expand Up @@ -113,7 +114,7 @@ impl<'a> Request for ReqwestRequest<'a> {
}

async fn response_data(&self, etag: bool) -> Result<ResponseData, S3Error> {
let response = self.response().await?;
let response = retry! {self.response().await }?;
let status_code = response.status().as_u16();
let mut headers = response.headers().clone();
let response_headers = headers
Expand Down Expand Up @@ -145,7 +146,7 @@ impl<'a> Request for ReqwestRequest<'a> {
writer: &mut T,
) -> Result<u16, S3Error> {
use tokio::io::AsyncWriteExt;
let response = self.response().await?;
let response = retry! {self.response().await}?;

let status_code = response.status();
let mut stream = response.bytes_stream();
Expand All @@ -158,7 +159,7 @@ impl<'a> Request for ReqwestRequest<'a> {
}

async fn response_data_to_stream(&self) -> Result<ResponseDataStream, S3Error> {
let response = self.response().await?;
let response = retry! {self.response().await}?;
let status_code = response.status();
let stream = response.bytes_stream().map_err(S3Error::Reqwest);

Expand All @@ -169,7 +170,7 @@ impl<'a> Request for ReqwestRequest<'a> {
}

async fn response_header(&self) -> Result<(Self::HeaderMap, u16), S3Error> {
let response = self.response().await?;
let response = retry! {self.response().await}?;
let status_code = response.status().as_u16();
let headers = response.headers().clone();
Ok((headers, status_code))
Expand Down
53 changes: 53 additions & 0 deletions s3/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,59 @@ pub(crate) fn error_from_response_data(response_data: ResponseData) -> Result<S3
))
}

/// Retries a given expression a specified number of times with exponential backoff.
///
/// This macro attempts to execute the provided expression up to `N` times, where `N`
/// is the value set by `set_retries`. If the expression returns `Ok`, it returns the value.
/// If the expression returns `Err`, it logs a warning and retries after a delay that increases
/// exponentially with each retry.
///
/// The delay between retries is calculated as `1 * retry_cnt.pow(2)` seconds, where `retry_cnt`
/// is the current retry attempt.
///
/// This macro supports both asynchronous and synchronous contexts:
/// - For `tokio` users, it uses `tokio::time::sleep`.
/// - For `async-std` users, it uses `async_std::task::sleep`.
/// - For synchronous contexts, it uses `std::thread::sleep`.
///
/// # Features
///
/// - `with-tokio`: Uses `tokio::time::sleep` for async retries.
/// - `with-async-std`: Uses `async_std::task::sleep` for async retries.
/// - `sync`: Uses `std::thread::sleep` for sync retries.
///
/// # Errors
///
/// If all retry attempts fail, the last error is returned.
#[macro_export]
macro_rules! retry {
($e:expr) => {{
let mut retry_cnt: u64 = 0;
let max_retries = $crate::get_retries();

loop {
match $e {
Ok(v) => break Ok(v),
Err(e) => {
log::warn!("Retrying {e}");
if retry_cnt >= max_retries {
break Err(e);
}
retry_cnt += 1;
let delay = std::time::Duration::from_secs(1 * retry_cnt.pow(2));
#[cfg(feature = "with-tokio")]
tokio::time::sleep(delay).await;
#[cfg(feature = "with-async-std")]
async_std::task::sleep(delay).await;
#[cfg(feature = "sync")]
std::thread::sleep(delay);
continue;
}
}
}
}};
}

#[cfg(test)]
mod test {
use crate::utils::etag_for_path;
Expand Down

0 comments on commit af9f4fa

Please sign in to comment.