diff --git a/Cargo.toml b/Cargo.toml index b7355597a78c..15d4e7d225a2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,8 +24,13 @@ anyhow = "1" async-trait = "0.1" aws-config = "0.8" aws-endpoint = "0.8" +aws-http = "0.8" aws-sdk-s3 = "0.8" +aws-sig-auth = "0.8" +aws-sigv4 = "0.8" +aws-smithy-client = "0.38" aws-smithy-http = "0.38" +aws-smithy-http-tower = "0.38" aws-types = { version = "0.8", features = ["hardcoded-credentials"] } bitflags = "1" blocking = "1" @@ -37,6 +42,7 @@ once_cell = "1" pin-project = "1" reqwest = "0.11" thiserror = "1" +tower = "0.4" [dev-dependencies] anyhow = "1.0" diff --git a/opendal_test/src/services/s3.rs b/opendal_test/src/services/s3.rs index d73cf18441c5..90e5e3e5045e 100644 --- a/opendal_test/src/services/s3.rs +++ b/opendal_test/src/services/s3.rs @@ -42,8 +42,8 @@ pub async fn new() -> Result>> { builder.bucket(&env::var("OPENDAL_S3_BUCKET").expect("OPENDAL_S3_BUCKET must set")); builder.endpoint(&env::var("OPENDAL_S3_ENDPOINT").unwrap_or_default()); builder.credential(Credential::hmac( - &env::var("OPENDAL_S3_ACCESS_KEY_ID").expect("OPENDAL_S3_ACCESS_KEY_ID must set"), - &env::var("OPENDAL_S3_SECRET_ACCESS_KEY").expect("OPENDAL_S3_SECRET_ACCESS_KEY must set"), + &env::var("OPENDAL_S3_ACCESS_KEY_ID").unwrap_or_default(), + &env::var("OPENDAL_S3_SECRET_ACCESS_KEY").unwrap_or_default(), )); Ok(Some(builder.finish().await?)) diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index 3cf2dc27059c..5c81222ab311 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -34,6 +34,7 @@ use once_cell::sync::Lazy; use super::error::parse_get_object_error; use super::error::parse_head_object_error; use super::error::parse_unexpect_error; +use super::middleware::DefaultMiddleware; use super::object_stream::S3ObjectStream; use crate::credential::Credential; use crate::error::Error; @@ -278,10 +279,20 @@ impl Builder { } } + let hyper_connector = aws_smithy_client::hyper_ext::Adapter::builder() + .build(aws_smithy_client::conns::https()); + + let aws_client = aws_smithy_client::Builder::new() + .connector(hyper_connector) + .middleware(aws_smithy_client::erase::DynMiddleware::new( + DefaultMiddleware::new(), + )) + .build(); + Ok(Arc::new(Backend { root, bucket: self.bucket.clone(), - client: aws_sdk_s3::Client::from_conf(cfg.build()), + client: aws_sdk_s3::Client::with_config(aws_client.into_dyn(), cfg.build()), })) } } diff --git a/src/services/s3/middleware.rs b/src/services/s3/middleware.rs new file mode 100644 index 000000000000..b8b07a6b7e38 --- /dev/null +++ b/src/services/s3/middleware.rs @@ -0,0 +1,88 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::fmt::Debug; + +use aws_endpoint::AwsEndpointStage; +use aws_http::auth::CredentialsStage; +use aws_http::recursion_detection::RecursionDetectionStage; +use aws_http::user_agent::UserAgentStage; +use aws_sig_auth::signer::SigV4Signer; +use aws_smithy_http_tower::map_request::AsyncMapRequestLayer; +use aws_smithy_http_tower::map_request::MapRequestLayer; +use tower::layer::util::Identity; +use tower::layer::util::Stack; +use tower::ServiceBuilder; + +use super::signer::SigningStage; + +type DefaultMiddlewareStack = Stack< + MapRequestLayer, + Stack< + MapRequestLayer, + Stack< + AsyncMapRequestLayer, + Stack< + MapRequestLayer, + Stack, Identity>, + >, + >, + >, +>; + +/// AWS Middleware Stack +/// +/// This implements the middleware stack for this service. It will: +/// 1. Load credentials asynchronously into the property bag +/// 2. Sign the request with SigV4 +/// 3. Resolve an Endpoint for the request +/// 4. Add a user agent to the request +#[derive(Debug, Default, Clone)] +#[non_exhaustive] +pub struct DefaultMiddleware; + +impl DefaultMiddleware { + pub fn new() -> Self { + Self {} + } +} + +// define the middleware stack in a non-generic location to reduce code bloat. +fn base() -> ServiceBuilder { + let credential_provider = AsyncMapRequestLayer::for_mapper(CredentialsStage::new()); + let signer = MapRequestLayer::for_mapper(SigningStage::new(SigV4Signer::new())); + let endpoint_resolver = MapRequestLayer::for_mapper(AwsEndpointStage); + let user_agent = MapRequestLayer::for_mapper(UserAgentStage::new()); + let recursion_detection = MapRequestLayer::for_mapper(RecursionDetectionStage::new()); + // These layers can be considered as occurring in order, that is: + // 1. Resolve an endpoint + // 2. Add a user agent + // 3. Acquire credentials + // 4. Sign with credentials + // (5. Dispatch over the wire) + ServiceBuilder::new() + .layer(endpoint_resolver) + .layer(user_agent) + .layer(credential_provider) + .layer(signer) + .layer(recursion_detection) +} + +impl tower::Layer for DefaultMiddleware { + type Service = >::Service; + + fn layer(&self, inner: S) -> Self::Service { + base().service(inner) + } +} diff --git a/src/services/s3/mod.rs b/src/services/s3/mod.rs index 63b26d66c0b6..27ade4d536df 100644 --- a/src/services/s3/mod.rs +++ b/src/services/s3/mod.rs @@ -17,4 +17,6 @@ pub use backend::Backend; pub use backend::Builder; mod error; +mod middleware; mod object_stream; +mod signer; diff --git a/src/services/s3/signer.rs b/src/services/s3/signer.rs new file mode 100644 index 000000000000..f8372705a77a --- /dev/null +++ b/src/services/s3/signer.rs @@ -0,0 +1,119 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +/// We borrowed code from `aws_sig_auth` here to make anonymous access possible. +/// +/// The original implementations requires `Credentials` and signing all requests. +/// We did a simple trick here: rewrite the `SigningStage` and only sign request +/// when we have a valid credentials. +/// +/// For users who specify Credentials, nothing changed. +/// For users who doesn't specify Credentials, there are two situations: +/// +/// - The env could have valid credentials, we will load credentials from env. +/// - There aren't any credentials, we will sending request without any signing +/// just like sending requests via browser or `curl`. +/// +/// # TODO +/// +/// There is a potential CVE. Users could construct an anonymous client to read +/// credentials from the environment. We should address it in the future. +use std::time::SystemTime; + +use aws_sig_auth::middleware::SigningStageError; +use aws_sig_auth::signer::OperationSigningConfig; +use aws_sig_auth::signer::RequestConfig; +use aws_sig_auth::signer::SigV4Signer; +use aws_sig_auth::signer::SigningRequirements; +use aws_sigv4::http_request::SignableBody; +use aws_smithy_http::middleware::MapRequest; +use aws_smithy_http::operation::Request; +use aws_smithy_http::property_bag::PropertyBag; +use aws_types::region::SigningRegion; +use aws_types::Credentials; +use aws_types::SigningService; + +#[derive(Clone, Debug)] +pub struct SigningStage { + signer: SigV4Signer, +} + +impl SigningStage { + pub fn new(signer: SigV4Signer) -> Self { + Self { signer } + } +} + +fn signing_config( + config: &PropertyBag, +) -> Result<(&OperationSigningConfig, RequestConfig, Option), SigningStageError> { + let operation_config = config + .get::() + .ok_or(SigningStageError::MissingSigningConfig)?; + // Here is a trick. + // We will return `Option` here instead of `Credentials`. + let credentials = config.get::().cloned(); + let region = config + .get::() + .ok_or(SigningStageError::MissingSigningRegion)?; + let signing_service = config + .get::() + .ok_or(SigningStageError::MissingSigningService)?; + let payload_override = config.get::>(); + let request_config = RequestConfig { + request_ts: config + .get::() + .copied() + .unwrap_or_else(SystemTime::now), + region, + payload_override, + service: signing_service, + }; + Ok((operation_config, request_config, credentials)) +} + +impl MapRequest for SigningStage { + type Error = SigningStageError; + + fn apply(&self, req: Request) -> Result { + req.augment(|mut req, config| { + let operation_config = config + .get::() + .ok_or(SigningStageError::MissingSigningConfig)?; + let (operation_config, request_config, creds) = + match &operation_config.signing_requirements { + SigningRequirements::Disabled => return Ok(req), + SigningRequirements::Optional => match signing_config(config) { + Ok(parts) => parts, + Err(_) => return Ok(req), + }, + SigningRequirements::Required => signing_config(config)?, + }; + + // The most tricky part here. + // + // We will try to load the credentials and only sign it when we have a + // valid credential. + if let Some(creds) = creds { + let signature = self + .signer + .sign(operation_config, &request_config, &creds, &mut req) + .map_err(|err| SigningStageError::SigningFailure(err))?; + config.insert(signature); + } + + Ok(req) + }) + } +}