diff --git a/core/src/raw/http_util/client.rs b/core/src/raw/http_util/client.rs index 7d756030da0b..b9eedc2bcefd 100644 --- a/core/src/raw/http_util/client.rs +++ b/core/src/raw/http_util/client.rs @@ -19,22 +19,36 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::future; use std::mem; +use std::ops::Deref; use std::str::FromStr; +use std::sync::Arc; +use futures::Future; use futures::TryStreamExt; use http::Request; use http::Response; +use once_cell::sync::Lazy; use raw::oio::Read; use super::parse_content_encoding; use super::parse_content_length; use super::HttpBody; +use crate::raw::*; use crate::*; +/// Http client used across opendal for loading credentials. +/// This is merely a temporary solution because reqsign requires a reqwest client to be passed. +/// We will remove it after the next major version of reqsign, which will enable users to provide their own client. +#[allow(dead_code)] +pub(crate) static GLOBAL_REQWEST_CLIENT: Lazy = Lazy::new(reqwest::Client::new); + +/// HttpFetcher is a type erased [`HttpFetch`]. +pub type HttpFetcher = Arc; + /// HttpClient that used across opendal. #[derive(Clone)] pub struct HttpClient { - client: reqwest::Client, + fetcher: HttpFetcher, } /// We don't want users to know details about our clients. @@ -47,26 +61,24 @@ impl Debug for HttpClient { impl HttpClient { /// Create a new http client in async context. pub fn new() -> Result { - Self::build(reqwest::ClientBuilder::new()) + let fetcher = Arc::new(reqwest::Client::new()); + Ok(Self { fetcher }) } /// Construct `Self` with given [`reqwest::Client`] - pub fn with(client: reqwest::Client) -> Self { - Self { client } + pub fn with(client: impl HttpFetch) -> Self { + let fetcher = Arc::new(client); + Self { fetcher } } /// Build a new http client in async context. + #[deprecated] pub fn build(builder: reqwest::ClientBuilder) -> Result { - Ok(Self { - client: builder.build().map_err(|err| { - Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err) - })?, - }) - } - - /// Get the async client from http client. - pub fn client(&self) -> reqwest::Client { - self.client.clone() + let client = builder.build().map_err(|err| { + Error::new(ErrorKind::Unexpected, "http client build failed").set_source(err) + })?; + let fetcher = Arc::new(client); + Ok(Self { fetcher }) } /// Send a request in async way. @@ -78,6 +90,44 @@ impl HttpClient { /// Fetch a request in async way. pub async fn fetch(&self, req: Request) -> Result> { + self.fetcher.fetch(req).await + } +} + +/// HttpFetch is the trait to fetch a request in async way. +/// User should implement this trait to provide their own http client. +pub trait HttpFetch: Send + Sync + Unpin + 'static { + /// Fetch a request in async way. + fn fetch( + &self, + req: Request, + ) -> impl Future>> + MaybeSend; +} + +/// HttpFetchDyn is the dyn version of [`HttpFetch`] +/// which make it possible to use as `Arc`. +/// User should never implement this trait, but use `HttpFetch` instead. +pub trait HttpFetchDyn: Send + Sync + Unpin + 'static { + /// The dyn version of [`HttpFetch::fetch`]. + /// + /// This function returns a boxed future to make it object safe. + fn fetch_dyn(&self, req: Request) -> BoxedFuture>>; +} + +impl HttpFetchDyn for T { + fn fetch_dyn(&self, req: Request) -> BoxedFuture>> { + Box::pin(self.fetch(req)) + } +} + +impl HttpFetch for Arc { + async fn fetch(&self, req: Request) -> Result> { + self.deref().fetch_dyn(req).await + } +} + +impl HttpFetch for reqwest::Client { + async fn fetch(&self, req: Request) -> Result> { // Uri stores all string alike data in `Bytes` which means // the clone here is cheap. let uri = req.uri().clone(); @@ -86,7 +136,6 @@ impl HttpClient { let (parts, body) = req.into_parts(); let mut req_builder = self - .client .request( parts.method, reqwest::Url::from_str(&uri.to_string()).expect("input request url must be valid"), diff --git a/core/src/raw/http_util/mod.rs b/core/src/raw/http_util/mod.rs index 965ea0bd494f..226fb17b7d47 100644 --- a/core/src/raw/http_util/mod.rs +++ b/core/src/raw/http_util/mod.rs @@ -24,6 +24,11 @@ mod client; pub use client::HttpClient; +pub use client::HttpFetch; + +/// temporary client used by several features +#[allow(unused_imports)] +pub(crate) use client::GLOBAL_REQWEST_CLIENT; mod body; pub use body::HttpBody; diff --git a/core/src/services/cos/backend.rs b/core/src/services/cos/backend.rs index ff4db12ee420..6065dca95e5d 100644 --- a/core/src/services/cos/backend.rs +++ b/core/src/services/cos/backend.rs @@ -205,7 +205,7 @@ impl Builder for CosBuilder { cfg.secret_key = Some(v); } - let cred_loader = TencentCosCredentialLoader::new(client.client(), cfg); + let cred_loader = TencentCosCredentialLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg); let signer = TencentCosSigner::new(); diff --git a/core/src/services/gcs/backend.rs b/core/src/services/gcs/backend.rs index 74d837c4c85e..4c7bc7eca69a 100644 --- a/core/src/services/gcs/backend.rs +++ b/core/src/services/gcs/backend.rs @@ -292,7 +292,7 @@ impl Builder for GcsBuilder { DEFAULT_GCS_SCOPE }; - let mut token_loader = GoogleTokenLoader::new(scope, client.client()); + let mut token_loader = GoogleTokenLoader::new(scope, GLOBAL_REQWEST_CLIENT.clone()); if let Some(account) = &self.config.service_account { token_loader = token_loader.with_service_account(account); } diff --git a/core/src/services/oss/backend.rs b/core/src/services/oss/backend.rs index 6b064d36821d..7e2d67b3caf5 100644 --- a/core/src/services/oss/backend.rs +++ b/core/src/services/oss/backend.rs @@ -381,7 +381,7 @@ impl Builder for OssBuilder { })? }; - let loader = AliyunLoader::new(client.client(), cfg); + let loader = AliyunLoader::new(GLOBAL_REQWEST_CLIENT.clone(), cfg); let signer = AliyunOssSigner::new(bucket); diff --git a/core/src/services/s3/backend.rs b/core/src/services/s3/backend.rs index c441f8ac453a..57b3a2f2fa46 100644 --- a/core/src/services/s3/backend.rs +++ b/core/src/services/s3/backend.rs @@ -800,7 +800,8 @@ impl Builder for S3Builder { // If role_arn is set, we must use AssumeRoleLoad. if let Some(role_arn) = self.config.role_arn { // use current env as source credential loader. - let default_loader = AwsDefaultLoader::new(client.client(), cfg.clone()); + let default_loader = + AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg.clone()); // Build the config for assume role. let mut assume_role_cfg = AwsConfig { @@ -817,7 +818,7 @@ impl Builder for S3Builder { } let assume_role_loader = AwsAssumeRoleLoader::new( - client.client(), + GLOBAL_REQWEST_CLIENT.clone().clone(), assume_role_cfg, Box::new(default_loader), ) @@ -835,7 +836,8 @@ impl Builder for S3Builder { let loader = match loader { Some(v) => v, None => { - let mut default_loader = AwsDefaultLoader::new(client.client(), cfg); + let mut default_loader = + AwsDefaultLoader::new(GLOBAL_REQWEST_CLIENT.clone().clone(), cfg); if self.config.disable_ec2_metadata { default_loader = default_loader.with_disable_ec2_metadata(); } diff --git a/core/src/services/s3/core.rs b/core/src/services/s3/core.rs index 66bcb14e2418..79ba66239235 100644 --- a/core/src/services/s3/core.rs +++ b/core/src/services/s3/core.rs @@ -114,7 +114,7 @@ impl S3Core { async fn load_credential(&self) -> Result> { let cred = self .loader - .load_credential(self.client.client()) + .load_credential(GLOBAL_REQWEST_CLIENT.clone()) .await .map_err(new_request_credential_error)?;