Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Implement a retry mechanism for the FileUploader #2244

Merged
merged 8 commits into from
Jan 28, 2025
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion mithril-aggregator/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mithril-aggregator"
version = "0.6.20"
version = "0.6.21"
description = "A Mithril Aggregator server"
authors = { workspace = true }
edition = { workspace = true }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,7 @@ mod tests {
use std::fs::File;
use std::io::Write;

use crate::file_uploaders::FileUploadRetryPolicy;
use crate::tools::url_sanitizer::SanitizedUrlWithTrailingSlash;
use mithril_common::test_utils::TempDir;

Expand Down Expand Up @@ -768,7 +769,12 @@ mod tests {

let url_prefix =
SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap();
let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout());
let uploader = LocalUploader::new(
url_prefix,
&target_dir,
FileUploadRetryPolicy::never(),
TestLogger::stdout(),
);
let location = ImmutableFilesUploader::batch_upload(
&uploader,
&[archive_1.clone(), archive_2.clone()],
Expand Down Expand Up @@ -802,7 +808,12 @@ mod tests {

let url_prefix =
SanitizedUrlWithTrailingSlash::parse("http://test.com:8080/base-root").unwrap();
let uploader = LocalUploader::new(url_prefix, &target_dir, TestLogger::stdout());
let uploader = LocalUploader::new(
url_prefix,
&target_dir,
FileUploadRetryPolicy::never(),
TestLogger::stdout(),
);

ImmutableFilesUploader::batch_upload(&uploader, &[archive])
.await
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ mod tests {
.unwrap();

let dumb_snapshotter = Arc::new(DumbSnapshotter::new());
let dumb_snapshot_uploader = Arc::new(DumbUploader::new());
let dumb_snapshot_uploader = Arc::new(DumbUploader::default());

let cardano_immutable_files_full_artifact_builder =
CardanoImmutableFilesFullArtifactBuilder::new(
Expand Down Expand Up @@ -245,7 +245,7 @@ mod tests {
fake_data::network(),
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbUploader::new()),
Arc::new(DumbUploader::default()),
CompressionAlgorithm::default(),
TestLogger::stdout(),
);
Expand All @@ -272,7 +272,7 @@ mod tests {
network,
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbUploader::new()),
Arc::new(DumbUploader::default()),
CompressionAlgorithm::Gzip,
TestLogger::stdout(),
);
Expand Down Expand Up @@ -301,7 +301,7 @@ mod tests {
fake_data::network(),
&Version::parse("1.0.0").unwrap(),
Arc::new(DumbSnapshotter::new()),
Arc::new(DumbUploader::new()),
Arc::new(DumbUploader::default()),
algorithm,
TestLogger::stdout(),
);
Expand Down
21 changes: 16 additions & 5 deletions mithril-aggregator/src/dependency_injection/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,8 @@ use crate::{
entities::AggregatorEpochSettings,
event_store::{EventMessage, EventStore, TransmitterService},
file_uploaders::{
CloudRemotePath, FileUploader, GcpBackendUploader, GcpUploader, LocalUploader,
CloudRemotePath, FileUploadRetryPolicy, FileUploader, GcpBackendUploader, GcpUploader,
LocalUploader,
},
http_server::{
routes::router::{self, RouterConfig, RouterState},
Expand Down Expand Up @@ -494,7 +495,7 @@ impl DependenciesBuilder {
}
}
} else {
Ok(Arc::new(DumbUploader::new()))
Ok(Arc::new(DumbUploader::new(FileUploadRetryPolicy::never())))
}
}

Expand Down Expand Up @@ -1243,6 +1244,7 @@ impl DependenciesBuilder {
)?),
remote_folder_path,
allow_overwrite,
FileUploadRetryPolicy::default(),
))
}

Expand Down Expand Up @@ -1282,12 +1284,15 @@ impl DependenciesBuilder {
Ok(vec![Arc::new(LocalUploader::new(
ancillary_url_prefix,
&target_dir,
FileUploadRetryPolicy::default(),
logger,
))])
}
}
} else {
Ok(vec![Arc::new(DumbUploader::new())])
Ok(vec![Arc::new(DumbUploader::new(
FileUploadRetryPolicy::never(),
))])
}
}

Expand Down Expand Up @@ -1320,12 +1325,15 @@ impl DependenciesBuilder {
Ok(vec![Arc::new(LocalUploader::new(
immutable_url_prefix,
&target_dir,
FileUploadRetryPolicy::default(),
logger,
))])
}
}
} else {
Ok(vec![Arc::new(DumbUploader::new())])
Ok(vec![Arc::new(DumbUploader::new(
FileUploadRetryPolicy::never(),
))])
}
}

Expand Down Expand Up @@ -1363,12 +1371,15 @@ impl DependenciesBuilder {
Ok(vec![Arc::new(LocalUploader::new(
digests_url_prefix,
&target_dir,
FileUploadRetryPolicy::default(),
logger,
))])
}
}
} else {
Ok(vec![Arc::new(DumbUploader::new())])
Ok(vec![Arc::new(DumbUploader::new(
FileUploadRetryPolicy::never(),
))])
}
}

Expand Down
32 changes: 26 additions & 6 deletions mithril-aggregator/src/file_uploaders/dumb_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ use async_trait::async_trait;
use mithril_common::{entities::FileUri, StdResult};
use std::{path::Path, sync::RwLock};

use crate::file_uploaders::FileUploader;
use crate::file_uploaders::{FileUploadRetryPolicy, FileUploader};

/// Dummy uploader for test purposes.
///
/// It actually does NOT upload any file but remembers the last file it
/// was asked to upload. This is intended to by used by integration tests.
pub struct DumbUploader {
last_uploaded: RwLock<Option<FileUri>>,
retry_policy: FileUploadRetryPolicy,
}

impl DumbUploader {
/// Create a new instance.
pub fn new() -> Self {
/// Create a new instance with a custom retry policy.
pub fn new(retry_policy: FileUploadRetryPolicy) -> Self {
Self {
last_uploaded: RwLock::new(None),
retry_policy,
}
}

Expand All @@ -34,14 +36,14 @@ impl DumbUploader {

impl Default for DumbUploader {
fn default() -> Self {
Self::new()
Self::new(FileUploadRetryPolicy::never())
}
}

#[async_trait]
impl FileUploader for DumbUploader {
/// Upload a file
async fn upload(&self, filepath: &Path) -> StdResult<FileUri> {
async fn upload_without_retry(&self, filepath: &Path) -> StdResult<FileUri> {
let mut value = self
.last_uploaded
.write()
Expand All @@ -52,15 +54,21 @@ impl FileUploader for DumbUploader {

Ok(location)
}

fn retry_policy(&self) -> FileUploadRetryPolicy {
self.retry_policy.clone()
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use super::*;

#[tokio::test]
async fn test_dumb_uploader() {
let uploader = DumbUploader::new();
let uploader = DumbUploader::default();
assert!(uploader
.get_last_upload()
.expect("uploader should not fail")
Expand All @@ -77,4 +85,16 @@ mod tests {
.expect("getting dumb uploader last value after a fake download should not fail")
);
}

#[tokio::test]
async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
let expected_policy = FileUploadRetryPolicy {
attempts: 10,
delay_between_attempts: Duration::from_millis(123),
};

let uploader: Box<dyn FileUploader> = Box::new(DumbUploader::new(expected_policy.clone()));

assert_eq!(expected_policy, uploader.retry_policy());
}
}
38 changes: 36 additions & 2 deletions mithril-aggregator/src/file_uploaders/gcp_uploader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ use mithril_common::{entities::FileUri, logging::LoggerExtensions, StdResult};

use crate::FileUploader;

use super::FileUploadRetryPolicy;

/// CloudRemotePath represents a cloud remote path
#[derive(Debug, Clone, PartialEq)]
pub struct CloudRemotePath(PathBuf);
Expand Down Expand Up @@ -198,6 +200,7 @@ pub struct GcpUploader {
cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
remote_folder: CloudRemotePath,
allow_overwrite: bool,
retry_policy: FileUploadRetryPolicy,
}

impl GcpUploader {
Expand All @@ -206,18 +209,20 @@ impl GcpUploader {
cloud_backend_uploader: Arc<dyn CloudBackendUploader>,
remote_folder: CloudRemotePath,
allow_overwrite: bool,
retry_policy: FileUploadRetryPolicy,
) -> Self {
Self {
cloud_backend_uploader,
remote_folder,
allow_overwrite,
retry_policy,
}
}
}

#[async_trait]
impl FileUploader for GcpUploader {
async fn upload(&self, file_path: &Path) -> StdResult<FileUri> {
async fn upload_without_retry(&self, file_path: &Path) -> StdResult<FileUri> {
let remote_file_path = self.remote_folder.join(get_file_name(file_path)?);
if !self.allow_overwrite {
if let Some(file_uri) = self
Expand All @@ -242,11 +247,17 @@ impl FileUploader for GcpUploader {

Ok(file_uri)
}

fn retry_policy(&self) -> FileUploadRetryPolicy {
self.retry_policy.clone()
}
}

#[cfg(test)]
mod tests {
use crate::test_tools::TestLogger;
use std::time::Duration;

use crate::{file_uploaders::FileUploadRetryPolicy, test_tools::TestLogger};

use super::*;

Expand Down Expand Up @@ -289,6 +300,7 @@ mod tests {
Arc::new(cloud_backend_uploader),
remote_folder_path,
allow_overwrite,
FileUploadRetryPolicy::never(),
);

let file_uri = file_uploader.upload(&local_file_path).await.unwrap();
Expand Down Expand Up @@ -320,6 +332,7 @@ mod tests {
Arc::new(cloud_backend_uploader),
remote_folder_path,
allow_overwrite,
FileUploadRetryPolicy::never(),
);

let file_uri = file_uploader.upload(&local_file_path).await.unwrap();
Expand Down Expand Up @@ -355,6 +368,7 @@ mod tests {
Arc::new(cloud_backend_uploader),
remote_folder_path,
allow_overwrite,
FileUploadRetryPolicy::never(),
);

let file_uri = file_uploader.upload(&local_file_path).await.unwrap();
Expand All @@ -377,6 +391,7 @@ mod tests {
Arc::new(cloud_backend_uploader),
CloudRemotePath::new("remote_folder"),
allow_overwrite,
FileUploadRetryPolicy::never(),
);

file_uploader
Expand All @@ -401,6 +416,7 @@ mod tests {
Arc::new(cloud_backend_uploader),
CloudRemotePath::new("remote_folder"),
allow_overwrite,
FileUploadRetryPolicy::never(),
);

file_uploader
Expand All @@ -427,6 +443,7 @@ mod tests {
Arc::new(cloud_backend_uploader),
CloudRemotePath::new("remote_folder"),
allow_overwrite,
FileUploadRetryPolicy::never(),
);

file_uploader
Expand Down Expand Up @@ -480,4 +497,21 @@ mod tests {
assert_eq!(FileUri(expected_location), location);
}
}

#[tokio::test]
async fn retry_policy_from_file_uploader_trait_should_be_implemented() {
let expected_policy = FileUploadRetryPolicy {
attempts: 10,
delay_between_attempts: Duration::from_millis(123),
};

let file_uploader: Box<dyn FileUploader> = Box::new(GcpUploader::new(
Arc::new(MockCloudBackendUploader::new()),
CloudRemotePath::new("remote_folder"),
true,
expected_policy.clone(),
));

assert_eq!(expected_policy, file_uploader.retry_policy());
}
}
Loading
Loading