diff --git a/Cargo.lock b/Cargo.lock index 498ce10e5c0f..3393a8c70d36 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2433,14 +2433,12 @@ dependencies = [ "common_utils", "dyn-clone", "error-stack", - "futures 0.3.28", "hyper", "hyper-proxy", "masking", "once_cell", "router_env", "serde", - "storage_impl", "thiserror", "tokio 1.32.0", ] @@ -2671,9 +2669,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -2681,9 +2679,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" @@ -2709,9 +2707,9 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-lite" @@ -2730,9 +2728,9 @@ dependencies = [ [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -2741,15 +2739,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -2759,9 +2757,9 @@ checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -5072,8 +5070,6 @@ dependencies = [ "async-bb8-diesel", "async-trait", "awc", - "aws-config", - "aws-sdk-s3", "base64 0.21.5", "bb8", "bigdecimal", diff --git a/config/config.example.toml b/config/config.example.toml index 732c9292c28a..7aa6da046de8 100644 --- a/config/config.example.toml +++ b/config/config.example.toml @@ -509,14 +509,6 @@ client_secret = "paypal_secret_key" # Secret key for PayPal onboarding partner_id = "paypal_partner_id" # Partner ID for PayPal onboarding enabled = true # Switch to enable or disable PayPal onboarding -[frm] -enabled = true - 
-[paypal_onboarding]
-client_id = "paypal_client_id" # Client ID for PayPal onboarding
-client_secret = "paypal_secret_key" # Secret key for PayPal onboarding
-partner_id = "paypal_partner_id" # Partner ID for PayPal onboarding
-enabled = true # Switch to enable or disable PayPal onboarding
 
 [events]
 source = "logs" # The event sink to push events supports kafka or logs (stdout)
@@ -531,9 +523,9 @@ connector_logs_topic = "topic" # Kafka topic to be used for connector
 outgoing_webhook_logs_topic = "topic" # Kafka topic to be used for outgoing webhook events
 
 # File storage configuration
-[file_storage_config]
-file_storage_scheme = "aws_s3" # File storage scheme to be used
+[file_storage]
+file_storage_backend = "aws_s3" # File storage backend to be used
 
-[file_storage_config.aws_s3]
+[file_storage.aws_s3]
 region = "us-east-1" # The AWS region used by the AWS S3 for file storage
 bucket_name = "bucket1" # The AWS S3 bucket name for file storage
diff --git a/config/development.toml b/config/development.toml
index 91269005a0f0..ea075ec326cf 100644
--- a/config/development.toml
+++ b/config/development.toml
@@ -542,3 +542,6 @@ client_id = ""
 client_secret = ""
 partner_id = ""
 enabled = true
+
+[file_storage]
+file_storage_backend = "file_system"
diff --git a/config/docker_compose.toml b/config/docker_compose.toml
index 450fe106a31f..4c1b1326e4ca 100644
--- a/config/docker_compose.toml
+++ b/config/docker_compose.toml
@@ -398,3 +398,6 @@ enabled = true
 
 [events]
 source = "logs"
+
+[file_storage]
+file_storage_backend = "file_system"
diff --git a/crates/common_utils/src/fs_utils.rs b/crates/common_utils/src/fs_utils.rs
index 5175a4f66908..e18e8e054598 100644
--- a/crates/common_utils/src/fs_utils.rs
+++ b/crates/common_utils/src/fs_utils.rs
@@ -3,7 +3,6 @@
 //!
 
 use std::{
-    fmt::{Display, Formatter},
     fs::{remove_file, File},
     io::{Read, Write},
     path::PathBuf,
@@ -15,12 +14,14 @@ use crate::errors::CustomResult;
 
 /// Constructs the file path for a given file key within the file system.
 /// The file path is generated based on the workspace path and the provided file key.
-pub fn get_file_path(file_key: String) -> PathBuf {
+fn get_file_path(file_key: impl AsRef<str>) -> PathBuf {
     let mut file_path = PathBuf::new();
     #[cfg(feature = "logs")]
     file_path.push(router_env::env::workspace_path());
+    #[cfg(not(feature = "logs"))]
+    file_path.push(std::env::current_dir());
     file_path.push("files");
-    file_path.push(file_key);
+    file_path.push(file_key.as_ref());
     file_path
 }
 
@@ -32,56 +33,68 @@ impl FileSystem {
     /// Saves the provided file data to the file system under the specified file key.
     pub fn save_file_to_fs(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
         file_data: Vec<u8>,
     ) -> CustomResult<(), FileSystemStorageError> {
         let file_path = get_file_path(file_key);
         let mut file = File::create(file_path)
             .into_report()
-            .change_context(FileSystemStorageError("Failed to create file"))?;
+            .change_context(FileSystemStorageError::CreateFailure)?;
         file.write_all(&file_data)
             .into_report()
-            .change_context(FileSystemStorageError("Failed while writing into file"))?;
+            .change_context(FileSystemStorageError::WriteFailure)?;
         Ok(())
     }
 
     /// Deletes the file associated with the specified file key from the file system.
     pub fn delete_file_from_fs(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
     ) -> CustomResult<(), FileSystemStorageError> {
         let file_path = get_file_path(file_key);
         remove_file(file_path)
             .into_report()
-            .change_context(FileSystemStorageError("Failed while deleting the file"))?;
+            .change_context(FileSystemStorageError::DeleteFailure)?;
         Ok(())
     }
 
     /// Retrieves the file content associated with the specified file key from the file system.
     pub fn retrieve_file_from_fs(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
     ) -> CustomResult<Vec<u8>, FileSystemStorageError> {
         let mut received_data: Vec<u8> = Vec::new();
         let file_path = get_file_path(file_key);
         let mut file = File::open(file_path)
             .into_report()
-            .change_context(FileSystemStorageError("Failed while opening the file"))?;
+            .change_context(FileSystemStorageError::FileOpenFailure)?;
         file.read_to_end(&mut received_data)
             .into_report()
-            .change_context(FileSystemStorageError("Failed while reading the file"))?;
+            .change_context(FileSystemStorageError::ReadFailure)?;
         Ok(received_data)
     }
 }
 
-/// Represents an error that can occur during file system storage operations locally.
-#[derive(Debug)]
-pub struct FileSystemStorageError(&'static str);
+/// Represents an error that can occur during local file system storage operations.
+#[derive(Debug, thiserror::Error)]
+pub enum FileSystemStorageError {
+    /// Error indicating opening a file failed
+    #[error("Failed while opening the file")]
+    FileOpenFailure,
 
-impl std::error::Error for FileSystemStorageError {}
+    /// Error indicating file creation failed.
+    #[error("Failed to create file")]
+    CreateFailure,
 
-impl Display for FileSystemStorageError {
-    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
-        write!(f, "Local file system storage error: {}", self.0)
-    }
+    /// Error indicating reading a file failed.
+    #[error("Failed while reading the file")]
+    ReadFailure,
+
+    /// Error indicating writing to a file failed.
+    #[error("Failed while writing into file")]
+    WriteFailure,
+
+    /// Error indicating file deletion failed.
+    #[error("Failed while deleting the file")]
+    DeleteFailure,
 }
diff --git a/crates/external_services/Cargo.toml b/crates/external_services/Cargo.toml
index a0b743af1bde..51c5ecc8f2a8 100644
--- a/crates/external_services/Cargo.toml
+++ b/crates/external_services/Cargo.toml
@@ -23,7 +23,6 @@ aws-smithy-client = "0.55.3"
 base64 = "0.21.2"
 dyn-clone = "1.0.11"
 error-stack = "0.3.1"
-futures = "0.3.28"
 once_cell = "1.18.0"
 serde = { version = "1.0.193", features = ["derive"] }
 thiserror = "1.0.40"
@@ -35,4 +34,3 @@ hyper = "0.14.26"
 common_utils = { version = "0.1.0", path = "../common_utils" }
 masking = { version = "0.1.0", path = "../masking" }
 router_env = { version = "0.1.0", path = "../router_env", features = ["log_extra_implicit_fields", "log_custom_entries_to_extra"] }
-storage_impl = { version = "0.1.0", path = "../storage_impl", default-features = false }
diff --git a/crates/external_services/src/file_storage.rs b/crates/external_services/src/file_storage.rs
index 1fd433d1bd54..b35626dc4038 100644
--- a/crates/external_services/src/file_storage.rs
+++ b/crates/external_services/src/file_storage.rs
@@ -2,12 +2,13 @@
 //! Module for managing file storage operations with support for multiple storage schemes.
 //!
 
+use std::fmt::{Display, Formatter};
+
 use common_utils::{
     errors::{CustomResult, FileStorageError},
     fs_utils,
 };
 use error_stack::ResultExt;
-use storage_impl::errors::ApplicationError;
 
 #[cfg(feature = "aws_s3")]
 use crate::file_storage::aws_s3::{AwsFileStorageClient, AwsFileStorageConfig};
@@ -18,7 +19,7 @@ pub mod aws_s3;
 
 /// Enum representing different file storage configurations, allowing for multiple storage schemes.
 #[derive(Debug, Clone, Default, serde::Deserialize)]
-#[serde(tag = "file_storage_scheme")]
+#[serde(tag = "file_storage_backend")]
 #[serde(rename_all = "snake_case")]
 pub enum FileStorageConfig {
     /// AWS S3 storage configuration.
@@ -34,24 +35,13 @@ impl FileStorageConfig {
     /// Validates the file storage configuration.
-    pub fn validate(&self) -> Result<(), ApplicationError> {
+    pub fn validate(&self) -> Result<(), InvalidFileStorageConfig> {
         match self {
             #[cfg(feature = "aws_s3")]
             Self::AwsS3 { aws_s3 } => aws_s3.validate(),
             Self::FileSystem => Ok(()),
         }
     }
-
-    /// Retrieves the appropriate file storage client based on the configuration.
-    pub async fn get_file_storage_client(&self) -> FileStorageScheme {
-        match self {
-            #[cfg(feature = "aws_s3")]
-            Self::AwsS3 { aws_s3 } => FileStorageScheme::AwsS3 {
-                client: aws_s3.get_aws_file_storage_client().await,
-            },
-            Self::FileSystem => FileStorageScheme::FileSystem(fs_utils::FileSystem),
-        }
-    }
 }
 
 /// Enum representing different file storage clients.
@@ -61,7 +51,7 @@ pub enum FileStorageScheme {
     #[cfg(feature = "aws_s3")]
     AwsS3 {
         /// AWS S3 file storage client.
-        client: &'static AwsFileStorageClient,
+        client: AwsFileStorageClient,
     },
     /// Local file system storage client.
     FileSystem(fs_utils::FileSystem),
@@ -71,7 +61,7 @@ impl FileStorageScheme {
     /// Uploads a file to the selected storage scheme.
     pub async fn upload_file(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
         file: Vec<u8>,
     ) -> CustomResult<(), FileStorageError> {
         match self {
@@ -87,7 +77,10 @@
     }
 
     /// Deletes a file from the selected storage scheme.
-    pub async fn delete_file(&self, file_key: String) -> CustomResult<(), FileStorageError> {
+    pub async fn delete_file(
+        &self,
+        file_key: impl AsRef<str>,
+    ) -> CustomResult<(), FileStorageError> {
         match self {
             #[cfg(feature = "aws_s3")]
             Self::AwsS3 { client } => client
@@ -101,7 +94,10 @@
     }
 
     /// Retrieves a file from the selected storage scheme.
-    pub async fn retrieve_file(&self, file_key: String) -> CustomResult<Vec<u8>, FileStorageError> {
+    pub async fn retrieve_file(
+        &self,
+        file_key: impl AsRef<str>,
+    ) -> CustomResult<Vec<u8>, FileStorageError> {
         match self {
             #[cfg(feature = "aws_s3")]
             Self::AwsS3 { client } => client
@@ -114,3 +110,15 @@
         }
     }
 }
+
+/// Error thrown when the file storage config is invalid
+#[derive(Debug, Clone)]
+pub struct InvalidFileStorageConfig(&'static str);
+
+impl std::error::Error for InvalidFileStorageConfig {}
+
+impl Display for InvalidFileStorageConfig {
+    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
+        write!(f, "file_storage: {}", self.0)
+    }
+}
diff --git a/crates/external_services/src/file_storage/aws_s3.rs b/crates/external_services/src/file_storage/aws_s3.rs
index d6de590bdeba..3c528f8f3815 100644
--- a/crates/external_services/src/file_storage/aws_s3.rs
+++ b/crates/external_services/src/file_storage/aws_s3.rs
@@ -7,12 +7,8 @@ use aws_sdk_s3::{
 };
 use aws_sdk_sts::config::Region;
 use common_utils::{errors::CustomResult, ext_traits::ConfigExt};
-use error_stack::{IntoReport, ResultExt};
-use futures::TryStreamExt;
-use storage_impl::errors::ApplicationError;
 
-static AWS_FILE_STORAGE_CLIENT: tokio::sync::OnceCell<AwsFileStorageClient> =
-    tokio::sync::OnceCell::const_new();
+use super::InvalidFileStorageConfig;
 
 /// Configuration for AWS S3 file storage.
 #[derive(Debug, serde::Deserialize, Clone, Default)]
@@ -25,27 +21,17 @@ pub struct AwsFileStorageConfig {
 }
 
 impl AwsFileStorageConfig {
-    /// Retrieves the AWS S3 file storage client, initializing it if necessary.
-    #[inline]
-    pub async fn get_aws_file_storage_client(&self) -> &'static AwsFileStorageClient {
-        AWS_FILE_STORAGE_CLIENT
-            .get_or_init(|| AwsFileStorageClient::new(self))
-            .await
-    }
-
     /// Validates the AWS S3 file storage configuration.
-    pub fn validate(&self) -> Result<(), ApplicationError> {
+    pub(super) fn validate(&self) -> Result<(), InvalidFileStorageConfig> {
         use common_utils::fp_utils::when;
 
         when(self.region.is_default_or_empty(), || {
-            Err(ApplicationError::InvalidConfigurationValueError(
-                "aws s3 region must not be empty".into(),
-            ))
+            Err(InvalidFileStorageConfig("aws s3 region must not be empty"))
         })?;
 
         when(self.bucket_name.is_default_or_empty(), || {
-            Err(ApplicationError::InvalidConfigurationValueError(
-                "aws s3 bucket name must not be empty".into(),
+            Err(InvalidFileStorageConfig(
+                "aws s3 bucket name must not be empty",
             ))
         })
     }
@@ -65,7 +51,6 @@ impl AwsFileStorageClient {
     pub async fn new(config: &AwsFileStorageConfig) -> Self {
         let region_provider = RegionProviderChain::first_try(Region::new(config.region.clone()));
         let sdk_config = aws_config::from_env().region(region_provider).load().await;
-        println!("config_check {:?}", config);
         Self {
             inner_client: Client::new(&sdk_config),
             bucket_name: config.bucket_name.clone(),
         }
     }
 
     /// Uploads a file to AWS S3.
-    pub async fn upload_file_to_s3(
+    pub(super) async fn upload_file_to_s3(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
         file: Vec<u8>,
     ) -> CustomResult<(), AwsS3StorageError> {
-        let bucket_name = self.bucket_name.clone();
-        let upload_res = self
-            .inner_client
+        self.inner_client
             .put_object()
-            .bucket(bucket_name)
-            .key(file_key.clone())
+            .bucket(&self.bucket_name)
+            .key(file_key.as_ref())
             .body(file.into())
             .send()
-            .await;
-        upload_res.map_err(AwsS3StorageError::UploadFailure)?;
+            .await
+            .map_err(AwsS3StorageError::UploadFailure)?;
         Ok(())
     }
 
     /// Deletes a file from AWS S3.
-    pub async fn delete_file_from_s3(
+    pub(super) async fn delete_file_from_s3(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
     ) -> CustomResult<(), AwsS3StorageError> {
-        let bucket_name = self.bucket_name.clone();
-        let delete_res = self
-            .inner_client
+        self.inner_client
             .delete_object()
-            .bucket(bucket_name)
-            .key(file_key)
+            .bucket(&self.bucket_name)
+            .key(file_key.as_ref())
             .send()
-            .await;
-        delete_res.map_err(AwsS3StorageError::DeleteFailure)?;
+            .await
+            .map_err(AwsS3StorageError::DeleteFailure)?;
         Ok(())
     }
 
     /// Retrieves a file from AWS S3.
-    pub async fn retrieve_file_from_s3(
+    pub(super) async fn retrieve_file_from_s3(
         &self,
-        file_key: String,
+        file_key: impl AsRef<str>,
     ) -> CustomResult<Vec<u8>, AwsS3StorageError> {
-        let bucket_name = self.bucket_name.clone();
-        let get_res = self
+        Ok(self
             .inner_client
             .get_object()
-            .bucket(bucket_name)
-            .key(file_key)
+            .bucket(&self.bucket_name)
+            .key(file_key.as_ref())
             .send()
-            .await;
-        let mut object = get_res.map_err(AwsS3StorageError::RetrieveFailure)?;
-        let mut received_data: Vec<u8> = Vec::new();
-        while let Some(bytes) = object
+            .await
+            .map_err(AwsS3StorageError::RetrieveFailure)?
             .body
-            .try_next()
+            .collect()
             .await
-            .into_report()
-            .change_context(AwsS3StorageError::InvalidFileRetrieved)?
-        {
-            received_data.extend_from_slice(&bytes); // Collect the bytes in the Vec
-        }
-        Ok(received_data)
+            .map_err(AwsS3StorageError::UnknownError)?
+            .to_vec())
     }
 }
@@ -154,4 +128,8 @@ pub enum AwsS3StorageError {
     /// Error indicating that invalid file data was received from S3.
     #[error("Invalid file data received from S3")]
     InvalidFileRetrieved,
+
+    /// Unknown error occurred.
+    #[error("Unknown error occurred: {0:?}")]
+    UnknownError(aws_sdk_s3::primitives::ByteStreamError),
 }
diff --git a/crates/router/Cargo.toml b/crates/router/Cargo.toml
index 88272033fb04..41ac30524297 100644
--- a/crates/router/Cargo.toml
+++ b/crates/router/Cargo.toml
@@ -10,9 +10,9 @@ license.workspace = true
 
 [features]
 default = ["kv_store", "stripe", "oltp", "olap", "backwards_compatibility", "accounts_cache", "dummy_connector", "payouts", "business_profile_routing", "connector_choice_mca_id", "profile_specific_fallback_routing", "retry", "frm"]
-aws_s3 = ["dep:aws-sdk-s3", "dep:aws-config"]
-kms = ["external_services/kms", "dep:aws-config"]
-email = ["external_services/email", "dep:aws-config", "olap"]
+aws_s3 = ["external_services/aws_s3"]
+kms = ["external_services/kms"]
+email = ["external_services/email", "olap"]
 frm = []
 stripe = ["dep:serde_qs"]
 release = ["kms", "stripe", "aws_s3", "email", "backwards_compatibility", "business_profile_routing", "accounts_cache", "kv_store", "connector_choice_mca_id", "profile_specific_fallback_routing", "vergen", "recon"]
@@ -41,8 +41,6 @@ actix-web = "4.3.1"
 async-bb8-diesel = { git = "https://github.com/jarnura/async-bb8-diesel", rev = "53b4ab901aab7635c8215fd1c2d542c8db443094" }
 argon2 = { version = "0.5.0", features = ["std"] }
 async-trait = "0.1.68"
-aws-config = { version = "0.55.3", optional = true }
-aws-sdk-s3 = { version = "0.28.0", optional = true }
 base64 = "0.21.2"
 bb8 = "0.8"
 bigdecimal = "0.3.1"
diff --git a/crates/router/src/configs/settings.rs b/crates/router/src/configs/settings.rs
index a7a08968c423..1c7fc2d00016 100644
--- a/crates/router/src/configs/settings.rs
+++ b/crates/router/src/configs/settings.rs
@@ -89,7 +89,7 @@ pub struct Settings {
     pub api_keys: ApiKeys,
     #[cfg(feature = "kms")]
     pub kms: kms::KmsConfig,
-    pub file_storage_config: FileStorageConfig,
+    pub file_storage: FileStorageConfig,
     pub tokenization: TokenizationConfig,
     pub connector_customer: ConnectorCustomer,
     #[cfg(feature = "dummy_connector")]
@@ -839,7 +839,10 @@ impl Settings {
             .validate()
             .map_err(|error| ApplicationError::InvalidConfigurationValueError(error.into()))?;
-        self.file_storage_config.validate()?;
+        self.file_storage
+            .validate()
+            .map_err(|err| ApplicationError::InvalidConfigurationValueError(err.to_string()))?;
+
         self.lock_settings.validate()?;
         self.events.validate()?;
         Ok(())
     }
diff --git a/crates/router/src/core/files/helpers.rs b/crates/router/src/core/files/helpers.rs
index 9d9324974d20..d7fdd34f7b3f 100644
--- a/crates/router/src/core/files/helpers.rs
+++ b/crates/router/src/core/files/helpers.rs
@@ -102,10 +102,8 @@ pub async fn delete_file_using_file_id(
     };
     match provider {
         diesel_models::enums::FileUploadProvider::Router => state
-            .conf
-            .file_storage_config
-            .get_file_storage_client()
-            .await
+            .file_storage_client
+            .as_ref()
             .delete_file(provider_file_id)
             .await
            .change_context(errors::ApiErrorResponse::InternalServerError),
@@ -204,10 +202,8 @@ pub async fn retrieve_file_and_provider_file_id_from_file_id(
         diesel_models::enums::FileUploadProvider::Router => Ok((
             Some(
                 state
-                    .conf
-                    .file_storage_config
-                    .get_file_storage_client()
-                    .await
+                    .file_storage_client
+                    .as_ref()
                     .retrieve_file(provider_file_id.clone())
                     .await
                     .change_context(errors::ApiErrorResponse::InternalServerError)?,
@@ -336,10 +332,8 @@ pub async fn upload_and_get_provider_provider_file_id_profile_id(
         ))
     } else {
         state
-            .conf
-            .file_storage_config
-            .get_file_storage_client()
-            .await
+            .file_storage_client
+            .as_ref()
             .upload_file(file_key.clone(), create_file_request.file.clone())
             .await
             .change_context(errors::ApiErrorResponse::InternalServerError)?;
diff --git a/crates/router/src/routes/app.rs b/crates/router/src/routes/app.rs
index 0c489dbe63a7..1efd6c665664 100644
--- a/crates/router/src/routes/app.rs
+++ b/crates/router/src/routes/app.rs
@@ -3,8 +3,12 @@ use std::sync::Arc;
 use actix_web::{web, Scope};
 #[cfg(all(feature = "kms", feature = "olap"))]
 use analytics::AnalyticsConfig;
+use common_utils::fs_utils;
 #[cfg(feature = "email")]
 use external_services::email::{ses::AwsSes, EmailService};
+#[cfg(feature = "aws_s3")]
+use external_services::file_storage::aws_s3::AwsFileStorageClient;
+use external_services::file_storage::{FileStorageConfig, FileStorageScheme};
 #[cfg(feature = "kms")]
 use external_services::kms::{self, decrypt::KmsDecrypt};
 #[cfg(all(feature = "olap", feature = "kms"))]
@@ -66,6 +70,7 @@ pub struct AppState {
     #[cfg(feature = "olap")]
     pub pool: crate::analytics::AnalyticsProvider,
     pub request_id: Option<RequestId>,
+    pub file_storage_client: Arc<FileStorageScheme>,
 }
 
 impl scheduler::SchedulerAppState for AppState {
@@ -132,6 +137,17 @@ pub async fn create_email_client(settings: &settings::Settings) -> impl EmailSer
     }
 }
 
+/// Retrieves the appropriate file storage client based on the file storage configuration.
+async fn get_file_storage_client(file_storage_config: &FileStorageConfig) -> FileStorageScheme { + match file_storage_config { + #[cfg(feature = "aws_s3")] + FileStorageConfig::AwsS3 { aws_s3 } => FileStorageScheme::AwsS3 { + client: AwsFileStorageClient::new(aws_s3).await, + }, + FileStorageConfig::FileSystem => FileStorageScheme::FileSystem(fs_utils::FileSystem), + } +} + impl AppState { /// # Panics /// @@ -220,6 +236,8 @@ impl AppState { #[cfg(feature = "email")] let email_client = Arc::new(create_email_client(&conf).await); + let file_storage_client = get_file_storage_client(&conf.file_storage).await; + Self { flow_name: String::from("default"), store, @@ -233,6 +251,7 @@ impl AppState { #[cfg(feature = "olap")] pool, request_id: None, + file_storage_client: Arc::new(file_storage_client), } }) .await
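
Reviewer note (not part of the diff): the renamed `[file_storage]` table is deserialized through the internally tagged `FileStorageConfig` enum, so the `file_storage_backend` key selects the variant. A minimal sketch of that mapping, assuming the standalone `toml` crate as a stand-in for the router's `config`-crate loading, and the `aws_s3` feature being enabled for the second case:

```rust
// Hypothetical parsing check; `toml::from_str` and the `main` wrapper are illustrative only.
use external_services::file_storage::FileStorageConfig;

fn main() {
    // `file_storage_backend` is the serde tag introduced by this change.
    let fs: FileStorageConfig = toml::from_str(r#"file_storage_backend = "file_system""#)
        .expect("file_system backend should deserialize");
    assert!(matches!(fs, FileStorageConfig::FileSystem));

    // Mirrors the `[file_storage]` / `[file_storage.aws_s3]` tables in config.example.toml.
    let s3: FileStorageConfig = toml::from_str(
        r#"
        file_storage_backend = "aws_s3"

        [aws_s3]
        region = "us-east-1"
        bucket_name = "bucket1"
        "#,
    )
    .expect("aws_s3 backend should deserialize");

    // Non-empty region and bucket name pass the new `InvalidFileStorageConfig` validation.
    assert!(s3.validate().is_ok());
}
```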
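A second sketch shows the `FileStorageScheme` interface that `AppState.file_storage_client` now exposes, using the `file_system` backend; the paths and method names come from the diff above, while the tokio `main`, the key name, and the `expect` calls are illustrative only. The S3 variant would instead wrap `AwsFileStorageClient::new(&aws_s3_config).await` behind the `aws_s3` feature, exactly as `get_file_storage_client` does at startup:

```rust
use common_utils::fs_utils;
use external_services::file_storage::FileStorageScheme;

#[tokio::main]
async fn main() {
    // Same construction as `get_file_storage_client` performs for the `file_system` backend.
    let storage = FileStorageScheme::FileSystem(fs_utils::FileSystem);

    // Keys are now `impl AsRef<str>`, so plain string slices work without cloning.
    // The local backend writes under a `files/` directory, which must already exist.
    let key = "connector_file_test";

    storage
        .upload_file(key, b"dispute evidence bytes".to_vec())
        .await
        .expect("upload should succeed");

    let bytes = storage
        .retrieve_file(key)
        .await
        .expect("retrieve should succeed");
    assert_eq!(bytes, b"dispute evidence bytes".to_vec());

    storage
        .delete_file(key)
        .await
        .expect("delete should succeed");
}
```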