From 07cd1ec67e794ae7a615722423a3e81a3877345e Mon Sep 17 00:00:00 2001 From: sfauvel Date: Wed, 22 Jan 2025 17:05:43 +0100 Subject: [PATCH 1/8] feat: Implements a `RetryableFileUploader` over `FileUploader` --- mithril-aggregator/src/file_uploaders/mod.rs | 2 + .../file_uploaders/retryable_file_uploader.rs | 104 ++++++++++++++++++ 2 files changed, 106 insertions(+) create mode 100644 mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs diff --git a/mithril-aggregator/src/file_uploaders/mod.rs b/mithril-aggregator/src/file_uploaders/mod.rs index 211b63ddc1a..405403b4565 100644 --- a/mithril-aggregator/src/file_uploaders/mod.rs +++ b/mithril-aggregator/src/file_uploaders/mod.rs @@ -3,12 +3,14 @@ mod gcp_uploader; mod interface; mod local_snapshot_uploader; mod local_uploader; +mod retryable_file_uploader; pub use dumb_uploader::*; pub use gcp_uploader::{CloudRemotePath, GcpBackendUploader, GcpUploader}; pub use interface::FileUploader; pub use local_snapshot_uploader::LocalSnapshotUploader; pub use local_uploader::LocalUploader; +pub use retryable_file_uploader::RetryableFileUploader; #[cfg(test)] pub use interface::MockFileUploader; diff --git a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs new file mode 100644 index 00000000000..4796192871c --- /dev/null +++ b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs @@ -0,0 +1,104 @@ +use std::path::Path; + +use async_trait::async_trait; +use mithril_common::{entities::FileUri, StdResult}; + +use super::FileUploader; + +/// A FileUploader decorator that retries the upload using the decorated FileUploader until it succeeds. +/// The upload fails after a number of attempts. +pub struct RetryableFileUploader { + wrapped_uploader: T, + call_limit: u32, +} + +impl RetryableFileUploader { + pub fn new(wrapped_uploader: T, call_limit: u32) -> Self { + Self { + wrapped_uploader, + call_limit, + } + } +} + +#[async_trait] +impl FileUploader for RetryableFileUploader { + async fn upload(&self, filepath: &Path) -> StdResult { + for _retry in 0..self.call_limit { + let result = self.wrapped_uploader.upload(filepath).await; + if result.is_ok() { + return result; + } + } + + Err(anyhow::anyhow!("Upload retry limit reached")) + } +} + +#[cfg(test)] +mod tests { + use anyhow::anyhow; + use std::path::PathBuf; + + use mithril_common::entities::FileUri; + use mockall::predicate::eq; + + use crate::file_uploaders::MockFileUploader; + + use super::*; + + #[tokio::test] + async fn should_call_inner_uploader() { + let mut inner_uploader = MockFileUploader::new(); + inner_uploader + .expect_upload() + .with(eq(PathBuf::from("file_to_upload"))) + .times(1) + .returning(|_| Ok(FileUri("http://test.com".to_string()))); + + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4); + retry_uploader + .upload(&PathBuf::from("file_to_upload")) + .await + .unwrap(); + } + + #[tokio::test] + async fn should_recall_inner_uploader_if_fail() { + let mut inner_uploader = MockFileUploader::new(); + inner_uploader + .expect_upload() + .with(eq(PathBuf::from("file_to_upload"))) + .times(2) + .returning(|_| Err(anyhow!("Failure while uploading..."))); + inner_uploader + .expect_upload() + .with(eq(PathBuf::from("file_to_upload"))) + .times(1) + .returning(|_| Ok(FileUri("http://test.com".to_string()))); + + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4); + retry_uploader + .upload(&PathBuf::from("file_to_upload")) + .await + .unwrap(); + } + + #[tokio::test] + async fn should_recall_a_failing_inner_uploader_up_to_the_limit() { + let mut inner_uploader = MockFileUploader::new(); + + inner_uploader + .expect_upload() + .with(eq(PathBuf::from("file_to_upload"))) + .times(4) + .returning(move |_| Err(anyhow!("Failure while uploading..."))); + + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4); + + retry_uploader + .upload(&PathBuf::from("file_to_upload")) + .await + .expect_err("An error should be returned when all retries are done"); + } +} From 3baf4ee57b52b7b9861f0671591e2b65f41346c8 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Wed, 22 Jan 2025 17:43:13 +0100 Subject: [PATCH 2/8] feat: Add a delay between two upload attempts on `RetryableFileUploader` --- .../file_uploaders/retryable_file_uploader.rs | 64 +++++++++++++++---- 1 file changed, 52 insertions(+), 12 deletions(-) diff --git a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs index 4796192871c..e7a76b18e28 100644 --- a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs @@ -1,7 +1,8 @@ -use std::path::Path; +use std::{path::Path, time::Duration}; use async_trait::async_trait; use mithril_common::{entities::FileUri, StdResult}; +use tokio::time; use super::FileUploader; @@ -10,13 +11,15 @@ use super::FileUploader; pub struct RetryableFileUploader { wrapped_uploader: T, call_limit: u32, + delay_between_attemps: Duration, } impl RetryableFileUploader { - pub fn new(wrapped_uploader: T, call_limit: u32) -> Self { + pub fn new(wrapped_uploader: T, call_limit: u32, delay_between_attemps: Duration) -> Self { Self { wrapped_uploader, call_limit, + delay_between_attemps, } } } @@ -24,21 +27,24 @@ impl RetryableFileUploader { #[async_trait] impl FileUploader for RetryableFileUploader { async fn upload(&self, filepath: &Path) -> StdResult { - for _retry in 0..self.call_limit { - let result = self.wrapped_uploader.upload(filepath).await; - if result.is_ok() { - return result; + let mut nb_attemps = 0; + loop { + nb_attemps += 1; + match self.wrapped_uploader.upload(filepath).await { + Ok(result) => return Ok(result), + Err(_) if nb_attemps >= self.call_limit => { + return Err(anyhow::anyhow!("Upload retry limit reached")); + } + _ => time::sleep(self.delay_between_attemps).await, } } - - Err(anyhow::anyhow!("Upload retry limit reached")) } } #[cfg(test)] mod tests { use anyhow::anyhow; - use std::path::PathBuf; + use std::{path::PathBuf, time::Instant}; use mithril_common::entities::FileUri; use mockall::predicate::eq; @@ -56,7 +62,7 @@ mod tests { .times(1) .returning(|_| Ok(FileUri("http://test.com".to_string()))); - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4); + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, Duration::ZERO); retry_uploader .upload(&PathBuf::from("file_to_upload")) .await @@ -77,7 +83,8 @@ mod tests { .times(1) .returning(|_| Ok(FileUri("http://test.com".to_string()))); - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4); + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, Duration::ZERO); + retry_uploader .upload(&PathBuf::from("file_to_upload")) .await @@ -94,11 +101,44 @@ mod tests { .times(4) .returning(move |_| Err(anyhow!("Failure while uploading..."))); - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4); + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, Duration::ZERO); + + retry_uploader + .upload(&PathBuf::from("file_to_upload")) + .await + .expect_err("An error should be returned when all retries are done"); + } + + #[tokio::test] + async fn should_delay_between_retries() { + let mut inner_uploader = MockFileUploader::new(); + + inner_uploader + .expect_upload() + .times(4) + .returning(move |_| Err(anyhow!("Failure while uploading..."))); + + let delay = Duration::from_millis(50); + let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, delay); + let start = Instant::now(); retry_uploader .upload(&PathBuf::from("file_to_upload")) .await .expect_err("An error should be returned when all retries are done"); + let duration = start.elapsed(); + + assert!( + duration >= delay * 3, + "Duration should be at least 3 times the delay ({}ms) but was {}ms", + delay.as_millis() * 3, + duration.as_millis() + ); + assert!( + duration < delay * 4, + "Duration should be less than 4 times the delay ({}ms) but was {}ms", + delay.as_millis() * 4, + duration.as_millis() + ); } } From 8c0052c3cd54838a25c13857b3afaab50bf69450 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Thu, 23 Jan 2025 11:01:05 +0100 Subject: [PATCH 3/8] refacto: Replace generic by `Box dyn` in `RetryableFileUploader` --- .../src/file_uploaders/retryable_file_uploader.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs index e7a76b18e28..c4b46a49bc7 100644 --- a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs @@ -11,15 +11,15 @@ use super::FileUploader; pub struct RetryableFileUploader { wrapped_uploader: T, call_limit: u32, - delay_between_attemps: Duration, + delay_between_attempts: Duration, } impl RetryableFileUploader { - pub fn new(wrapped_uploader: T, call_limit: u32, delay_between_attemps: Duration) -> Self { + pub fn new(wrapped_uploader: T, call_limit: u32, delay_between_attempts: Duration) -> Self { Self { wrapped_uploader, call_limit, - delay_between_attemps, + delay_between_attempts, } } } @@ -35,7 +35,7 @@ impl FileUploader for RetryableFileUploader { Err(_) if nb_attemps >= self.call_limit => { return Err(anyhow::anyhow!("Upload retry limit reached")); } - _ => time::sleep(self.delay_between_attemps).await, + _ => time::sleep(self.delay_between_attempts).await, } } } From 89dda3f744785d0273727f771cbe0e0af5829184 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Fri, 24 Jan 2025 12:18:32 +0100 Subject: [PATCH 4/8] refacto: Implement retry behavior on `FileUploader`trait instead of creating a decorator --- .../src/file_uploaders/dumb_uploader.rs | 2 +- .../src/file_uploaders/gcp_uploader.rs | 2 +- .../src/file_uploaders/interface.rs | 204 +++++++++++++++++- .../file_uploaders/local_snapshot_uploader.rs | 2 +- .../src/file_uploaders/local_uploader.rs | 2 +- mithril-aggregator/src/file_uploaders/mod.rs | 2 - .../file_uploaders/retryable_file_uploader.rs | 144 ------------- 7 files changed, 203 insertions(+), 155 deletions(-) delete mode 100644 mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs diff --git a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs index 08e6ded8d02..1325d1bbf4d 100644 --- a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs @@ -41,7 +41,7 @@ impl Default for DumbUploader { #[async_trait] impl FileUploader for DumbUploader { /// Upload a file - async fn upload(&self, filepath: &Path) -> StdResult { + async fn upload_without_retry(&self, filepath: &Path) -> StdResult { let mut value = self .last_uploaded .write() diff --git a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs index 91626c9867e..8f98c614c5b 100644 --- a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs @@ -217,7 +217,7 @@ impl GcpUploader { #[async_trait] impl FileUploader for GcpUploader { - async fn upload(&self, file_path: &Path) -> StdResult { + async fn upload_without_retry(&self, file_path: &Path) -> StdResult { let remote_file_path = self.remote_folder.join(get_file_name(file_path)?); if !self.allow_overwrite { if let Some(file_uri) = self diff --git a/mithril-aggregator/src/file_uploaders/interface.rs b/mithril-aggregator/src/file_uploaders/interface.rs index eda9e0c7ea9..7cd5c75037c 100644 --- a/mithril-aggregator/src/file_uploaders/interface.rs +++ b/mithril-aggregator/src/file_uploaders/interface.rs @@ -1,11 +1,205 @@ use async_trait::async_trait; use mithril_common::{entities::FileUri, StdResult}; -use std::path::Path; +use std::{ + any::{Any, TypeId}, + path::Path, + time::Duration, +}; -/// FileUploader represents a file uploader interactor +/// Policy for retrying file uploads. +pub struct FileUploadRetryPolicy { + attempts: usize, + delay_between_attempts: Duration, +} + +impl FileUploadRetryPolicy { + fn never() -> Self { + Self { + attempts: 1, + delay_between_attempts: Duration::from_secs(0), + } + } +} + +impl Default for FileUploadRetryPolicy { + fn default() -> Self { + Self { + attempts: 3, + delay_between_attempts: Duration::from_secs(5), + } + } +} + +/// FileUploader represents a file uploader interactor. +/// It retries the upload operation according to the retry policy. #[cfg_attr(test, mockall::automock)] #[async_trait] -pub trait FileUploader: Sync + Send { - /// Upload a file - async fn upload(&self, filepath: &Path) -> StdResult; +pub trait FileUploader: Any + Sync + Send { + /// Try to upload once. + async fn upload_without_retry(&self, filepath: &Path) -> StdResult; + + fn retry_policy(&self) -> FileUploadRetryPolicy { + FileUploadRetryPolicy::never() + } + + // Upload a file + async fn upload(&self, filepath: &Path) -> StdResult { + let retry_policy = self.retry_policy(); + + let mut nb_attempts = 0; + loop { + nb_attempts += 1; + match self.upload_without_retry(filepath).await { + Ok(result) => return Ok(result), + Err(_) if nb_attempts >= retry_policy.attempts => { + return Err(anyhow::anyhow!("Upload retry limit reached")); + } + _ => tokio::time::sleep(retry_policy.delay_between_attempts).await, + } + } + } +} + +#[cfg(test)] +mod tests { + use std::{path::PathBuf, time::Instant}; + + use super::*; + use anyhow::anyhow; + use mockall::{mock, predicate::eq}; + + mock! { + TestFileUploaderWithDefaultRetryPolicy { + } + #[async_trait] + impl FileUploader for TestFileUploaderWithDefaultRetryPolicy { + async fn upload_without_retry(&self, filepath: &Path) -> StdResult; + } + } + + mock! { + TestFileUploader { + } + + #[async_trait] + impl FileUploader for TestFileUploader { + async fn upload_without_retry(&self, filepath: &Path) -> StdResult; + fn retry_policy(&self) -> FileUploadRetryPolicy; + } + } + + #[tokio::test] + async fn upload_return_the_result_of_upload_without_retry() { + let mut uploader = MockTestFileUploaderWithDefaultRetryPolicy::new(); + uploader + .expect_upload_without_retry() + .with(eq(PathBuf::from("file_to_upload"))) + .times(1) + .returning(|_| Ok(FileUri("file_uploaded".to_string()))); + + let file_uploaded = uploader.upload(Path::new("file_to_upload")).await.unwrap(); + assert_eq!(FileUri("file_uploaded".to_string()), file_uploaded); + } + + #[tokio::test] + async fn when_upload_fails_do_not_retry_by_default() { + let mut uploader = MockTestFileUploaderWithDefaultRetryPolicy::new(); + uploader + .expect_upload_without_retry() + .with(eq(PathBuf::from("file_to_upload"))) + .times(1) + .returning(|_| Err(anyhow!("Failure while uploading..."))); + + uploader + .upload(Path::new("file_to_upload")) + .await + .expect_err("Should fail on upload"); + } + + #[tokio::test] + async fn should_retry_if_fail() { + let mut uploader = MockTestFileUploader::new(); + + uploader + .expect_retry_policy() + .returning(|| FileUploadRetryPolicy { + attempts: 50, + delay_between_attempts: Duration::ZERO, + }); + + uploader + .expect_upload_without_retry() + .with(eq(PathBuf::from("file_to_upload"))) + .times(2) + .returning(|_| Err(anyhow!("Failure while uploading..."))); + uploader + .expect_upload_without_retry() + .with(eq(PathBuf::from("file_to_upload"))) + .times(1) + .returning(|_| Ok(FileUri("file_uploaded".to_string()))); + + let file_uploaded = uploader.upload(Path::new("file_to_upload")).await.unwrap(); + assert_eq!(FileUri("file_uploaded".to_string()), file_uploaded); + } + + #[tokio::test] + async fn should_recall_a_failing_inner_uploader_up_to_the_limit() { + let mut uploader = MockTestFileUploader::new(); + + uploader + .expect_retry_policy() + .returning(|| FileUploadRetryPolicy { + attempts: 4, + delay_between_attempts: Duration::ZERO, + }); + + uploader + .expect_upload_without_retry() + .with(eq(PathBuf::from("file_to_upload"))) + .times(4) + .returning(|_| Err(anyhow!("Failure while uploading..."))); + + uploader + .upload(&PathBuf::from("file_to_upload")) + .await + .expect_err("An error should be returned when all retries are done"); + } + + #[tokio::test] + async fn should_delay_between_retries() { + let mut uploader = MockTestFileUploader::new(); + + let delay = Duration::from_millis(50); + uploader + .expect_retry_policy() + .returning(move || FileUploadRetryPolicy { + attempts: 4, + delay_between_attempts: delay, + }); + + uploader + .expect_upload_without_retry() + .times(4) + .returning(move |_| Err(anyhow!("Failure while uploading..."))); + + let start = Instant::now(); + uploader + .upload(&PathBuf::from("file_to_upload")) + .await + .expect_err("An error should be returned when all retries are done"); + let duration = start.elapsed(); + + assert!( + duration >= delay * 3, + "Duration should be at least 3 times the delay ({}ms) but was {}ms", + delay.as_millis() * 3, + duration.as_millis() + ); + assert!( + duration < delay * 4, + "Duration should be less than 4 times the delay ({}ms) but was {}ms", + delay.as_millis() * 4, + duration.as_millis() + ); + } } diff --git a/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs b/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs index 98e90d53bba..841b3ada3fd 100644 --- a/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_snapshot_uploader.rs @@ -42,7 +42,7 @@ impl LocalSnapshotUploader { #[async_trait] impl FileUploader for LocalSnapshotUploader { - async fn upload(&self, filepath: &Path) -> StdResult { + async fn upload_without_retry(&self, filepath: &Path) -> StdResult { let archive_name = filepath.file_name().unwrap().to_str().unwrap(); let target_path = &self.target_location.join(archive_name); tokio::fs::copy(filepath, target_path) diff --git a/mithril-aggregator/src/file_uploaders/local_uploader.rs b/mithril-aggregator/src/file_uploaders/local_uploader.rs index da52a6cba24..b06e9c8c186 100644 --- a/mithril-aggregator/src/file_uploaders/local_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_uploader.rs @@ -40,7 +40,7 @@ impl LocalUploader { #[async_trait] impl FileUploader for LocalUploader { - async fn upload(&self, filepath: &Path) -> StdResult { + async fn upload_without_retry(&self, filepath: &Path) -> StdResult { let archive_name = filepath.file_name().unwrap().to_str().unwrap(); let target_path = &self.target_location.join(archive_name); tokio::fs::copy(filepath, target_path) diff --git a/mithril-aggregator/src/file_uploaders/mod.rs b/mithril-aggregator/src/file_uploaders/mod.rs index 405403b4565..211b63ddc1a 100644 --- a/mithril-aggregator/src/file_uploaders/mod.rs +++ b/mithril-aggregator/src/file_uploaders/mod.rs @@ -3,14 +3,12 @@ mod gcp_uploader; mod interface; mod local_snapshot_uploader; mod local_uploader; -mod retryable_file_uploader; pub use dumb_uploader::*; pub use gcp_uploader::{CloudRemotePath, GcpBackendUploader, GcpUploader}; pub use interface::FileUploader; pub use local_snapshot_uploader::LocalSnapshotUploader; pub use local_uploader::LocalUploader; -pub use retryable_file_uploader::RetryableFileUploader; #[cfg(test)] pub use interface::MockFileUploader; diff --git a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs b/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs deleted file mode 100644 index c4b46a49bc7..00000000000 --- a/mithril-aggregator/src/file_uploaders/retryable_file_uploader.rs +++ /dev/null @@ -1,144 +0,0 @@ -use std::{path::Path, time::Duration}; - -use async_trait::async_trait; -use mithril_common::{entities::FileUri, StdResult}; -use tokio::time; - -use super::FileUploader; - -/// A FileUploader decorator that retries the upload using the decorated FileUploader until it succeeds. -/// The upload fails after a number of attempts. -pub struct RetryableFileUploader { - wrapped_uploader: T, - call_limit: u32, - delay_between_attempts: Duration, -} - -impl RetryableFileUploader { - pub fn new(wrapped_uploader: T, call_limit: u32, delay_between_attempts: Duration) -> Self { - Self { - wrapped_uploader, - call_limit, - delay_between_attempts, - } - } -} - -#[async_trait] -impl FileUploader for RetryableFileUploader { - async fn upload(&self, filepath: &Path) -> StdResult { - let mut nb_attemps = 0; - loop { - nb_attemps += 1; - match self.wrapped_uploader.upload(filepath).await { - Ok(result) => return Ok(result), - Err(_) if nb_attemps >= self.call_limit => { - return Err(anyhow::anyhow!("Upload retry limit reached")); - } - _ => time::sleep(self.delay_between_attempts).await, - } - } - } -} - -#[cfg(test)] -mod tests { - use anyhow::anyhow; - use std::{path::PathBuf, time::Instant}; - - use mithril_common::entities::FileUri; - use mockall::predicate::eq; - - use crate::file_uploaders::MockFileUploader; - - use super::*; - - #[tokio::test] - async fn should_call_inner_uploader() { - let mut inner_uploader = MockFileUploader::new(); - inner_uploader - .expect_upload() - .with(eq(PathBuf::from("file_to_upload"))) - .times(1) - .returning(|_| Ok(FileUri("http://test.com".to_string()))); - - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, Duration::ZERO); - retry_uploader - .upload(&PathBuf::from("file_to_upload")) - .await - .unwrap(); - } - - #[tokio::test] - async fn should_recall_inner_uploader_if_fail() { - let mut inner_uploader = MockFileUploader::new(); - inner_uploader - .expect_upload() - .with(eq(PathBuf::from("file_to_upload"))) - .times(2) - .returning(|_| Err(anyhow!("Failure while uploading..."))); - inner_uploader - .expect_upload() - .with(eq(PathBuf::from("file_to_upload"))) - .times(1) - .returning(|_| Ok(FileUri("http://test.com".to_string()))); - - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, Duration::ZERO); - - retry_uploader - .upload(&PathBuf::from("file_to_upload")) - .await - .unwrap(); - } - - #[tokio::test] - async fn should_recall_a_failing_inner_uploader_up_to_the_limit() { - let mut inner_uploader = MockFileUploader::new(); - - inner_uploader - .expect_upload() - .with(eq(PathBuf::from("file_to_upload"))) - .times(4) - .returning(move |_| Err(anyhow!("Failure while uploading..."))); - - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, Duration::ZERO); - - retry_uploader - .upload(&PathBuf::from("file_to_upload")) - .await - .expect_err("An error should be returned when all retries are done"); - } - - #[tokio::test] - async fn should_delay_between_retries() { - let mut inner_uploader = MockFileUploader::new(); - - inner_uploader - .expect_upload() - .times(4) - .returning(move |_| Err(anyhow!("Failure while uploading..."))); - - let delay = Duration::from_millis(50); - let retry_uploader = RetryableFileUploader::new(inner_uploader, 4, delay); - - let start = Instant::now(); - retry_uploader - .upload(&PathBuf::from("file_to_upload")) - .await - .expect_err("An error should be returned when all retries are done"); - let duration = start.elapsed(); - - assert!( - duration >= delay * 3, - "Duration should be at least 3 times the delay ({}ms) but was {}ms", - delay.as_millis() * 3, - duration.as_millis() - ); - assert!( - duration < delay * 4, - "Duration should be less than 4 times the delay ({}ms) but was {}ms", - delay.as_millis() * 4, - duration.as_millis() - ); - } -} From 5eafafc1b36ddeab0ef4c33fe02dfbee12093b6c Mon Sep 17 00:00:00 2001 From: sfauvel Date: Fri, 24 Jan 2025 17:43:11 +0100 Subject: [PATCH 5/8] feature: implement file upload retry and configure it in builder --- .../src/dependency_injection/builder.rs | 15 ++++--- .../src/file_uploaders/dumb_uploader.rs | 31 +++++++++++++ .../src/file_uploaders/gcp_uploader.rs | 44 ++++++++++++++++++- .../src/file_uploaders/interface.rs | 20 +++++---- .../src/file_uploaders/local_uploader.rs | 41 ++++++++++++++++- mithril-aggregator/src/file_uploaders/mod.rs | 2 +- 6 files changed, 136 insertions(+), 17 deletions(-) diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index 1e22cde1692..fa748839d28 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -67,7 +67,8 @@ use crate::{ entities::AggregatorEpochSettings, event_store::{EventMessage, EventStore, TransmitterService}, file_uploaders::{ - CloudRemotePath, FileUploader, GcpBackendUploader, GcpUploader, LocalUploader, + CloudRemotePath, FileUploadRetryPolicy, FileUploader, GcpBackendUploader, GcpUploader, + LocalUploader, }, http_server::{ routes::router::{self, RouterConfig, RouterState}, @@ -1235,7 +1236,7 @@ impl DependenciesBuilder { DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string()) })?; - Ok(GcpUploader::new( + Ok(GcpUploader::with_retry_policy( Arc::new(GcpBackendUploader::try_new( bucket, self.configuration.snapshot_use_cdn_domain, @@ -1243,6 +1244,7 @@ impl DependenciesBuilder { )?), remote_folder_path, allow_overwrite, + FileUploadRetryPolicy::default(), )) } @@ -1279,9 +1281,10 @@ impl DependenciesBuilder { } })?; - Ok(vec![Arc::new(LocalUploader::new( + Ok(vec![Arc::new(LocalUploader::with_retry_policy( ancillary_url_prefix, &target_dir, + FileUploadRetryPolicy::default(), logger, ))]) } @@ -1317,9 +1320,10 @@ impl DependenciesBuilder { .join(CARDANO_DB_ARTIFACTS_DIR) .join("immutable"); - Ok(vec![Arc::new(LocalUploader::new( + Ok(vec![Arc::new(LocalUploader::with_retry_policy( immutable_url_prefix, &target_dir, + FileUploadRetryPolicy::default(), logger, ))]) } @@ -1360,9 +1364,10 @@ impl DependenciesBuilder { } })?; - Ok(vec![Arc::new(LocalUploader::new( + Ok(vec![Arc::new(LocalUploader::with_retry_policy( digests_url_prefix, &target_dir, + FileUploadRetryPolicy::default(), logger, ))]) } diff --git a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs index 1325d1bbf4d..421203f9d0d 100644 --- a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs @@ -5,12 +5,15 @@ use std::{path::Path, sync::RwLock}; use crate::file_uploaders::FileUploader; +use super::interface::FileUploadRetryPolicy; + /// Dummy uploader for test purposes. /// /// It actually does NOT upload any file but remembers the last file it /// was asked to upload. This is intended to by used by integration tests. pub struct DumbUploader { last_uploaded: RwLock>, + retry_policy: FileUploadRetryPolicy, } impl DumbUploader { @@ -18,6 +21,15 @@ impl DumbUploader { pub fn new() -> Self { Self { last_uploaded: RwLock::new(None), + retry_policy: FileUploadRetryPolicy::never(), + } + } + + /// Create a new instance with a custom retry policy. + pub fn with_retry_policy(retry_policy: FileUploadRetryPolicy) -> Self { + Self { + last_uploaded: RwLock::new(None), + retry_policy, } } @@ -52,10 +64,16 @@ impl FileUploader for DumbUploader { Ok(location) } + + fn retry_policy(&self) -> FileUploadRetryPolicy { + self.retry_policy.clone() + } } #[cfg(test)] mod tests { + use std::time::Duration; + use super::*; #[tokio::test] @@ -77,4 +95,17 @@ mod tests { .expect("getting dumb uploader last value after a fake download should not fail") ); } + + #[tokio::test] + async fn retry_policy_from_file_uploader_trait_should_be_implemented() { + let expected_policy = FileUploadRetryPolicy { + attempts: 10, + delay_between_attempts: Duration::from_millis(123), + }; + + let uploader: Box = + Box::new(DumbUploader::with_retry_policy(expected_policy.clone())); + + assert_eq!(expected_policy, uploader.retry_policy()); + } } diff --git a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs index 8f98c614c5b..1ca8b14727a 100644 --- a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs @@ -17,6 +17,8 @@ use mithril_common::{entities::FileUri, logging::LoggerExtensions, StdResult}; use crate::FileUploader; +use super::FileUploadRetryPolicy; + /// CloudRemotePath represents a cloud remote path #[derive(Debug, Clone, PartialEq)] pub struct CloudRemotePath(PathBuf); @@ -198,6 +200,7 @@ pub struct GcpUploader { cloud_backend_uploader: Arc, remote_folder: CloudRemotePath, allow_overwrite: bool, + retry_policy: FileUploadRetryPolicy, } impl GcpUploader { @@ -211,6 +214,22 @@ impl GcpUploader { cloud_backend_uploader, remote_folder, allow_overwrite, + retry_policy: FileUploadRetryPolicy::never(), + } + } + + /// Create a new instance with a custom retry policy. + pub fn with_retry_policy( + cloud_backend_uploader: Arc, + remote_folder: CloudRemotePath, + allow_overwrite: bool, + retry_policy: FileUploadRetryPolicy, + ) -> Self { + Self { + cloud_backend_uploader, + remote_folder, + allow_overwrite, + retry_policy, } } } @@ -242,11 +261,17 @@ impl FileUploader for GcpUploader { Ok(file_uri) } + + fn retry_policy(&self) -> FileUploadRetryPolicy { + self.retry_policy.clone() + } } #[cfg(test)] mod tests { - use crate::test_tools::TestLogger; + use std::time::Duration; + + use crate::{file_uploaders::FileUploadRetryPolicy, test_tools::TestLogger}; use super::*; @@ -480,4 +505,21 @@ mod tests { assert_eq!(FileUri(expected_location), location); } } + + #[tokio::test] + async fn retry_policy_from_file_uploader_trait_should_be_implemented() { + let expected_policy = FileUploadRetryPolicy { + attempts: 10, + delay_between_attempts: Duration::from_millis(123), + }; + + let file_uploader: Box = Box::new(GcpUploader::with_retry_policy( + Arc::new(MockCloudBackendUploader::new()), + CloudRemotePath::new("remote_folder"), + true, + expected_policy.clone(), + )); + + assert_eq!(expected_policy, file_uploader.retry_policy()); + } } diff --git a/mithril-aggregator/src/file_uploaders/interface.rs b/mithril-aggregator/src/file_uploaders/interface.rs index 7cd5c75037c..809b43446c3 100644 --- a/mithril-aggregator/src/file_uploaders/interface.rs +++ b/mithril-aggregator/src/file_uploaders/interface.rs @@ -1,19 +1,19 @@ use async_trait::async_trait; use mithril_common::{entities::FileUri, StdResult}; -use std::{ - any::{Any, TypeId}, - path::Path, - time::Duration, -}; +use std::{any::Any, path::Path, time::Duration}; /// Policy for retrying file uploads. +#[derive(Debug, PartialEq, Clone)] pub struct FileUploadRetryPolicy { - attempts: usize, - delay_between_attempts: Duration, + /// Number of attempts to upload a file. + pub attempts: usize, + /// Delay between two attempts. + pub delay_between_attempts: Duration, } impl FileUploadRetryPolicy { - fn never() -> Self { + /// Create a policy that never retries. + pub fn never() -> Self { Self { attempts: 1, delay_between_attempts: Duration::from_secs(0), @@ -22,6 +22,7 @@ impl FileUploadRetryPolicy { } impl Default for FileUploadRetryPolicy { + /// Create a default retry policy. fn default() -> Self { Self { attempts: 3, @@ -38,11 +39,12 @@ pub trait FileUploader: Any + Sync + Send { /// Try to upload once. async fn upload_without_retry(&self, filepath: &Path) -> StdResult; + /// Get the retry policy for this uploader. fn retry_policy(&self) -> FileUploadRetryPolicy { FileUploadRetryPolicy::never() } - // Upload a file + /// Upload a file with retries according to the retry policy. async fn upload(&self, filepath: &Path) -> StdResult { let retry_policy = self.retry_policy(); diff --git a/mithril-aggregator/src/file_uploaders/local_uploader.rs b/mithril-aggregator/src/file_uploaders/local_uploader.rs index b06e9c8c186..1c1324ae347 100644 --- a/mithril-aggregator/src/file_uploaders/local_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_uploader.rs @@ -6,7 +6,7 @@ use std::path::{Path, PathBuf}; use mithril_common::StdResult; use mithril_common::{entities::FileUri, logging::LoggerExtensions}; -use crate::file_uploaders::FileUploader; +use crate::file_uploaders::{FileUploadRetryPolicy, FileUploader}; use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash; /// LocalUploader is a file uploader working using local files @@ -17,6 +17,7 @@ pub struct LocalUploader { /// Target folder where to store files archive target_location: PathBuf, + retry_policy: FileUploadRetryPolicy, logger: Logger, } @@ -26,6 +27,20 @@ impl LocalUploader { server_url_prefix: SanitizedUrlWithTrailingSlash, target_location: &Path, logger: Logger, + ) -> Self { + Self::with_retry_policy( + server_url_prefix, + target_location, + FileUploadRetryPolicy::never(), + logger, + ) + } + + pub(crate) fn with_retry_policy( + server_url_prefix: SanitizedUrlWithTrailingSlash, + target_location: &Path, + retry_policy: FileUploadRetryPolicy, + logger: Logger, ) -> Self { let logger = logger.new_with_component_name::(); debug!(logger, "New LocalUploader created"; "server_url_prefix" => &server_url_prefix.as_str()); @@ -33,6 +48,7 @@ impl LocalUploader { Self { server_url_prefix, target_location: target_location.to_path_buf(), + retry_policy, logger, } } @@ -56,6 +72,10 @@ impl FileUploader for LocalUploader { ); Ok(FileUri(location)) } + + fn retry_policy(&self) -> FileUploadRetryPolicy { + self.retry_policy.clone() + } } #[cfg(test)] @@ -63,6 +83,7 @@ mod tests { use std::fs::File; use std::io::Write; use std::path::{Path, PathBuf}; + use std::time::Duration; use mithril_common::test_utils::TempDir; @@ -150,4 +171,22 @@ mod tests { .await .expect_err("Uploading a directory should fail"); } + + #[tokio::test] + async fn retry_policy_from_file_uploader_trait_should_be_implemented() { + let target_dir = TempDir::create("local_uploader", "test_retry_policy"); + let expected_policy = FileUploadRetryPolicy { + attempts: 10, + delay_between_attempts: Duration::from_millis(123), + }; + + let uploader: Box = Box::new(LocalUploader::with_retry_policy( + SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), + &target_dir, + expected_policy.clone(), + TestLogger::stdout(), + )); + + assert_eq!(expected_policy, uploader.retry_policy()); + } } diff --git a/mithril-aggregator/src/file_uploaders/mod.rs b/mithril-aggregator/src/file_uploaders/mod.rs index 211b63ddc1a..c47075e4b49 100644 --- a/mithril-aggregator/src/file_uploaders/mod.rs +++ b/mithril-aggregator/src/file_uploaders/mod.rs @@ -6,7 +6,7 @@ mod local_uploader; pub use dumb_uploader::*; pub use gcp_uploader::{CloudRemotePath, GcpBackendUploader, GcpUploader}; -pub use interface::FileUploader; +pub use interface::{FileUploadRetryPolicy, FileUploader}; pub use local_snapshot_uploader::LocalSnapshotUploader; pub use local_uploader::LocalUploader; From a99ec3062d3367364a55243bacdcccd07e067f3e Mon Sep 17 00:00:00 2001 From: sfauvel Date: Tue, 28 Jan 2025 09:58:51 +0100 Subject: [PATCH 6/8] refacto: Add parameter `FileUploadRetryPolicy` to `new` function of the Uploaders and remove `with_retry_policy` --- .../cardano_database_artifacts/immutable.rs | 15 ++++++++++-- .../cardano_immutable_files_full.rs | 8 +++---- .../src/dependency_injection/builder.rs | 22 ++++++++++------- .../src/file_uploaders/dumb_uploader.rs | 21 ++++------------ .../src/file_uploaders/gcp_uploader.rs | 22 ++++++----------- .../src/file_uploaders/interface.rs | 4 ++-- .../src/file_uploaders/local_uploader.rs | 24 +++++++------------ .../tests/test_extensions/runtime_tester.rs | 2 +- 8 files changed, 55 insertions(+), 63 deletions(-) diff --git a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs index 6896aa59a40..41d9a98400f 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_database_artifacts/immutable.rs @@ -735,6 +735,7 @@ mod tests { use std::fs::File; use std::io::Write; + use crate::file_uploaders::FileUploadRetryPolicy; use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash; use mithril_common::test_utils::TempDir; @@ -768,7 +769,12 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); - let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()); + let uploader = LocalUploader::new( + url_prefix, + &target_dir, + FileUploadRetryPolicy::never(), + TestLogger::stdout(), + ); let location = ImmutableFilesUploader::batch_upload( &uploader, &[archive_1.clone(), archive_2.clone()], @@ -802,7 +808,12 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); - let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()); + let uploader = LocalUploader::new( + url_prefix, + &target_dir, + FileUploadRetryPolicy::never(), + TestLogger::stdout(), + ); ImmutableFilesUploader::batch_upload(&uploader, &[archive]) .await diff --git a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs index 64aa4efbb94..823e2dffad5 100644 --- a/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs +++ b/mithril-aggregator/src/artifact_builder/cardano_immutable_files_full.rs @@ -197,7 +197,7 @@ mod tests { .unwrap(); let dumb_snapshotter = Arc::new(DumbSnapshotter::new()); - let dumb_snapshot_uploader = Arc::new(DumbUploader::new()); + let dumb_snapshot_uploader = Arc::new(DumbUploader::default()); let cardano_immutable_files_full_artifact_builder = CardanoImmutableFilesFullArtifactBuilder::new( @@ -245,7 +245,7 @@ mod tests { fake_data::network(), &Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), - Arc::new(DumbUploader::new()), + Arc::new(DumbUploader::default()), CompressionAlgorithm::default(), TestLogger::stdout(), ); @@ -272,7 +272,7 @@ mod tests { network, &Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), - Arc::new(DumbUploader::new()), + Arc::new(DumbUploader::default()), CompressionAlgorithm::Gzip, TestLogger::stdout(), ); @@ -301,7 +301,7 @@ mod tests { fake_data::network(), &Version::parse("1.0.0").unwrap(), Arc::new(DumbSnapshotter::new()), - Arc::new(DumbUploader::new()), + Arc::new(DumbUploader::default()), algorithm, TestLogger::stdout(), ); diff --git a/mithril-aggregator/src/dependency_injection/builder.rs b/mithril-aggregator/src/dependency_injection/builder.rs index fa748839d28..648788cb64d 100644 --- a/mithril-aggregator/src/dependency_injection/builder.rs +++ b/mithril-aggregator/src/dependency_injection/builder.rs @@ -495,7 +495,7 @@ impl DependenciesBuilder { } } } else { - Ok(Arc::new(DumbUploader::new())) + Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never()))) } } @@ -1236,7 +1236,7 @@ impl DependenciesBuilder { DependenciesBuilderError::MissingConfiguration("snapshot_bucket_name".to_string()) })?; - Ok(GcpUploader::with_retry_policy( + Ok(GcpUploader::new( Arc::new(GcpBackendUploader::try_new( bucket, self.configuration.snapshot_use_cdn_domain, @@ -1281,7 +1281,7 @@ impl DependenciesBuilder { } })?; - Ok(vec![Arc::new(LocalUploader::with_retry_policy( + Ok(vec![Arc::new(LocalUploader::new( ancillary_url_prefix, &target_dir, FileUploadRetryPolicy::default(), @@ -1290,7 +1290,9 @@ impl DependenciesBuilder { } } } else { - Ok(vec![Arc::new(DumbUploader::new())]) + Ok(vec![Arc::new(DumbUploader::new( + FileUploadRetryPolicy::never(), + ))]) } } @@ -1320,7 +1322,7 @@ impl DependenciesBuilder { .join(CARDANO_DB_ARTIFACTS_DIR) .join("immutable"); - Ok(vec![Arc::new(LocalUploader::with_retry_policy( + Ok(vec![Arc::new(LocalUploader::new( immutable_url_prefix, &target_dir, FileUploadRetryPolicy::default(), @@ -1329,7 +1331,9 @@ impl DependenciesBuilder { } } } else { - Ok(vec![Arc::new(DumbUploader::new())]) + Ok(vec![Arc::new(DumbUploader::new( + FileUploadRetryPolicy::never(), + ))]) } } @@ -1364,7 +1368,7 @@ impl DependenciesBuilder { } })?; - Ok(vec![Arc::new(LocalUploader::with_retry_policy( + Ok(vec![Arc::new(LocalUploader::new( digests_url_prefix, &target_dir, FileUploadRetryPolicy::default(), @@ -1373,7 +1377,9 @@ impl DependenciesBuilder { } } } else { - Ok(vec![Arc::new(DumbUploader::new())]) + Ok(vec![Arc::new(DumbUploader::new( + FileUploadRetryPolicy::never(), + ))]) } } diff --git a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs index 421203f9d0d..eeebfb6cdca 100644 --- a/mithril-aggregator/src/file_uploaders/dumb_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/dumb_uploader.rs @@ -3,9 +3,7 @@ use async_trait::async_trait; use mithril_common::{entities::FileUri, StdResult}; use std::{path::Path, sync::RwLock}; -use crate::file_uploaders::FileUploader; - -use super::interface::FileUploadRetryPolicy; +use crate::file_uploaders::{FileUploadRetryPolicy, FileUploader}; /// Dummy uploader for test purposes. /// @@ -17,16 +15,8 @@ pub struct DumbUploader { } impl DumbUploader { - /// Create a new instance. - pub fn new() -> Self { - Self { - last_uploaded: RwLock::new(None), - retry_policy: FileUploadRetryPolicy::never(), - } - } - /// Create a new instance with a custom retry policy. - pub fn with_retry_policy(retry_policy: FileUploadRetryPolicy) -> Self { + pub fn new(retry_policy: FileUploadRetryPolicy) -> Self { Self { last_uploaded: RwLock::new(None), retry_policy, @@ -46,7 +36,7 @@ impl DumbUploader { impl Default for DumbUploader { fn default() -> Self { - Self::new() + Self::new(FileUploadRetryPolicy::never()) } } @@ -78,7 +68,7 @@ mod tests { #[tokio::test] async fn test_dumb_uploader() { - let uploader = DumbUploader::new(); + let uploader = DumbUploader::default(); assert!(uploader .get_last_upload() .expect("uploader should not fail") @@ -103,8 +93,7 @@ mod tests { delay_between_attempts: Duration::from_millis(123), }; - let uploader: Box = - Box::new(DumbUploader::with_retry_policy(expected_policy.clone())); + let uploader: Box = Box::new(DumbUploader::new(expected_policy.clone())); assert_eq!(expected_policy, uploader.retry_policy()); } diff --git a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs index 1ca8b14727a..f6fce7b910c 100644 --- a/mithril-aggregator/src/file_uploaders/gcp_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/gcp_uploader.rs @@ -209,20 +209,6 @@ impl GcpUploader { cloud_backend_uploader: Arc, remote_folder: CloudRemotePath, allow_overwrite: bool, - ) -> Self { - Self { - cloud_backend_uploader, - remote_folder, - allow_overwrite, - retry_policy: FileUploadRetryPolicy::never(), - } - } - - /// Create a new instance with a custom retry policy. - pub fn with_retry_policy( - cloud_backend_uploader: Arc, - remote_folder: CloudRemotePath, - allow_overwrite: bool, retry_policy: FileUploadRetryPolicy, ) -> Self { Self { @@ -314,6 +300,7 @@ mod tests { Arc::new(cloud_backend_uploader), remote_folder_path, allow_overwrite, + FileUploadRetryPolicy::never(), ); let file_uri = file_uploader.upload(&local_file_path).await.unwrap(); @@ -345,6 +332,7 @@ mod tests { Arc::new(cloud_backend_uploader), remote_folder_path, allow_overwrite, + FileUploadRetryPolicy::never(), ); let file_uri = file_uploader.upload(&local_file_path).await.unwrap(); @@ -380,6 +368,7 @@ mod tests { Arc::new(cloud_backend_uploader), remote_folder_path, allow_overwrite, + FileUploadRetryPolicy::never(), ); let file_uri = file_uploader.upload(&local_file_path).await.unwrap(); @@ -402,6 +391,7 @@ mod tests { Arc::new(cloud_backend_uploader), CloudRemotePath::new("remote_folder"), allow_overwrite, + FileUploadRetryPolicy::never(), ); file_uploader @@ -426,6 +416,7 @@ mod tests { Arc::new(cloud_backend_uploader), CloudRemotePath::new("remote_folder"), allow_overwrite, + FileUploadRetryPolicy::never(), ); file_uploader @@ -452,6 +443,7 @@ mod tests { Arc::new(cloud_backend_uploader), CloudRemotePath::new("remote_folder"), allow_overwrite, + FileUploadRetryPolicy::never(), ); file_uploader @@ -513,7 +505,7 @@ mod tests { delay_between_attempts: Duration::from_millis(123), }; - let file_uploader: Box = Box::new(GcpUploader::with_retry_policy( + let file_uploader: Box = Box::new(GcpUploader::new( Arc::new(MockCloudBackendUploader::new()), CloudRemotePath::new("remote_folder"), true, diff --git a/mithril-aggregator/src/file_uploaders/interface.rs b/mithril-aggregator/src/file_uploaders/interface.rs index 809b43446c3..91b131cd3b4 100644 --- a/mithril-aggregator/src/file_uploaders/interface.rs +++ b/mithril-aggregator/src/file_uploaders/interface.rs @@ -1,6 +1,6 @@ use async_trait::async_trait; use mithril_common::{entities::FileUri, StdResult}; -use std::{any::Any, path::Path, time::Duration}; +use std::{path::Path, time::Duration}; /// Policy for retrying file uploads. #[derive(Debug, PartialEq, Clone)] @@ -35,7 +35,7 @@ impl Default for FileUploadRetryPolicy { /// It retries the upload operation according to the retry policy. #[cfg_attr(test, mockall::automock)] #[async_trait] -pub trait FileUploader: Any + Sync + Send { +pub trait FileUploader: Sync + Send { /// Try to upload once. async fn upload_without_retry(&self, filepath: &Path) -> StdResult; diff --git a/mithril-aggregator/src/file_uploaders/local_uploader.rs b/mithril-aggregator/src/file_uploaders/local_uploader.rs index 1c1324ae347..1fb27a3d921 100644 --- a/mithril-aggregator/src/file_uploaders/local_uploader.rs +++ b/mithril-aggregator/src/file_uploaders/local_uploader.rs @@ -24,19 +24,6 @@ pub struct LocalUploader { impl LocalUploader { /// LocalUploader factory pub(crate) fn new( - server_url_prefix: SanitizedUrlWithTrailingSlash, - target_location: &Path, - logger: Logger, - ) -> Self { - Self::with_retry_policy( - server_url_prefix, - target_location, - FileUploadRetryPolicy::never(), - logger, - ) - } - - pub(crate) fn with_retry_policy( server_url_prefix: SanitizedUrlWithTrailingSlash, target_location: &Path, retry_policy: FileUploadRetryPolicy, @@ -122,7 +109,12 @@ mod tests { let url_prefix = SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap(); - let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout()); + let uploader = LocalUploader::new( + url_prefix, + &target_dir, + FileUploadRetryPolicy::never(), + TestLogger::stdout(), + ); let location = FileUploader::upload(&uploader, &archive) .await .expect("local upload should not fail"); @@ -145,6 +137,7 @@ mod tests { let uploader = LocalUploader::new( SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), &target_dir, + FileUploadRetryPolicy::never(), TestLogger::stdout(), ); FileUploader::upload(&uploader, &archive).await.unwrap(); @@ -165,6 +158,7 @@ mod tests { let uploader = LocalUploader::new( SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), &target_dir, + FileUploadRetryPolicy::never(), TestLogger::stdout(), ); FileUploader::upload(&uploader, &source_dir) @@ -180,7 +174,7 @@ mod tests { delay_between_attempts: Duration::from_millis(123), }; - let uploader: Box = Box::new(LocalUploader::with_retry_policy( + let uploader: Box = Box::new(LocalUploader::new( SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root/").unwrap(), &target_dir, expected_policy.clone(), diff --git a/mithril-aggregator/tests/test_extensions/runtime_tester.rs b/mithril-aggregator/tests/test_extensions/runtime_tester.rs index 3b22296bf86..40d1b3ee58f 100644 --- a/mithril-aggregator/tests/test_extensions/runtime_tester.rs +++ b/mithril-aggregator/tests/test_extensions/runtime_tester.rs @@ -130,7 +130,7 @@ impl RuntimeTester { let logger = build_logger(); let global_logger = slog_scope::set_global_logger(logger.clone()); let network = configuration.network.clone(); - let snapshot_uploader = Arc::new(DumbUploader::new()); + let snapshot_uploader = Arc::new(DumbUploader::default()); let immutable_file_observer = Arc::new(DumbImmutableFileObserver::new()); immutable_file_observer .shall_return(Some(start_time_point.immutable_file_number)) From 96d4a5c61f1a436b8e5fb05404cc8916fcc9d154 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Tue, 28 Jan 2025 11:33:39 +0100 Subject: [PATCH 7/8] refacto: Add nb attempts in error message --- mithril-aggregator/src/file_uploaders/interface.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mithril-aggregator/src/file_uploaders/interface.rs b/mithril-aggregator/src/file_uploaders/interface.rs index 91b131cd3b4..854a4f91608 100644 --- a/mithril-aggregator/src/file_uploaders/interface.rs +++ b/mithril-aggregator/src/file_uploaders/interface.rs @@ -54,7 +54,10 @@ pub trait FileUploader: Sync + Send { match self.upload_without_retry(filepath).await { Ok(result) => return Ok(result), Err(_) if nb_attempts >= retry_policy.attempts => { - return Err(anyhow::anyhow!("Upload retry limit reached")); + return Err(anyhow::anyhow!( + "Upload failed after {} attempts", + nb_attempts + )); } _ => tokio::time::sleep(retry_policy.delay_between_attempts).await, } From 26f599639973c97ea9cd7c77a87b94fd925b5f86 Mon Sep 17 00:00:00 2001 From: sfauvel Date: Tue, 28 Jan 2025 10:50:52 +0100 Subject: [PATCH 8/8] chore: upgrade crate versions * mithril-aggregator from `0.6.20` to `0.6.21` --- Cargo.lock | 2 +- mithril-aggregator/Cargo.toml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0710cd3fc4b..46f155c0063 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3578,7 +3578,7 @@ dependencies = [ [[package]] name = "mithril-aggregator" -version = "0.6.20" +version = "0.6.21" dependencies = [ "anyhow", "async-trait", diff --git a/mithril-aggregator/Cargo.toml b/mithril-aggregator/Cargo.toml index f249bc6a212..5ea9e3a3b9b 100644 --- a/mithril-aggregator/Cargo.toml +++ b/mithril-aggregator/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mithril-aggregator" -version = "0.6.20" +version = "0.6.21" description = "A Mithril Aggregator server" authors = { workspace = true } edition = { workspace = true }