diff --git a/Cargo.toml b/Cargo.toml index 06790d5f78cf..65f5bd27590a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,6 +31,7 @@ hyper = { version = "0.14", features = ["full"] } hyper-tls = "0.5.0" log = "0.4" metrics = "0.18" +minitrace = "0.4.0" once_cell = "1" pin-project = "1" quick-xml = { version = "0.22.0", features = ["serialize"] } @@ -42,8 +43,6 @@ thiserror = "1" time = "0.3.7" tokio = { version = "1.17", features = ["full"] } tower = "0.4" -minitrace = "0.4.0" - [dev-dependencies] anyhow = "1.0" @@ -60,4 +59,3 @@ rand = "0.8" sha2 = "0.10" size = "0.1" uuid = { version = "0.8", features = ["serde", "v4"] } - diff --git a/src/services/azblob/backend.rs b/src/services/azblob/backend.rs index e59144ced0e0..07845e468fbc 100644 --- a/src/services/azblob/backend.rs +++ b/src/services/azblob/backend.rs @@ -34,6 +34,7 @@ use log::error; use log::info; use log::warn; use metrics::increment_counter; +use minitrace::trace; use reqsign::services::azure::storage::Signer; use time::format_description::well_known::Rfc2822; use time::OffsetDateTime; @@ -42,11 +43,9 @@ use crate::credential::Credential; use crate::error::Error; use crate::error::Kind; use crate::error::Result; -use crate::object::BoxedObjectStream; use crate::object::Metadata; use crate::ops::HeaderRange; use crate::ops::OpDelete; -use crate::ops::OpList; use crate::ops::OpRead; use crate::ops::OpStat; use crate::ops::OpWrite; @@ -230,19 +229,20 @@ impl Backend { } #[async_trait] impl Accessor for Backend { + #[trace("read")] async fn read(&self, args: &OpRead) -> Result { increment_counter!("opendal_azure_read_requests"); let p = self.get_abs_path(&args.path); - info!( + debug!( "object {} read start: offset {:?}, size {:?}", &p, args.offset, args.size ); - let resp = self.get_object(&p, args.offset, args.size).await?; + let resp = self.get_blob(&p, args.offset, args.size).await?; match resp.status() { StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - info!( + debug!( "object {} reader created: offset {:?}, size {:?}", &p, args.offset, args.size ); @@ -252,25 +252,27 @@ impl Accessor for Backend { _ => Err(parse_error_response(resp, "read", &p).await), } } + #[trace("write")] async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> Result { let p = self.get_abs_path(&args.path); - info!("object {} write start: size {}", &p, args.size); + debug!("object {} write start: size {}", &p, args.size); - let resp = self.put_object(&p, r, args.size).await?; + let resp = self.put_blob(&p, r, args.size).await?; match resp.status() { http::StatusCode::CREATED | http::StatusCode::OK => { - info!("object {} write finished: size {:?}", &p, args.size); + debug!("object {} write finished: size {:?}", &p, args.size); Ok(args.size as usize) } _ => Err(parse_error_response(resp, "write", &p).await), } } + #[trace("stat")] async fn stat(&self, args: &OpStat) -> Result { increment_counter!("opendal_azure_stat_requests"); let p = self.get_abs_path(&args.path); - info!("object {} stat start", &p); + debug!("object {} stat start", &p); // Stat root always returns a DIR. if self.get_rel_path(&p).is_empty() { @@ -280,11 +282,11 @@ impl Accessor for Backend { m.set_mode(ObjectMode::DIR); m.set_complete(); - info!("backed root object stat finished"); + debug!("backed root object stat finished"); return Ok(m); } - let resp = self.head_object(&p).await?; + let resp = self.get_blob_properties(&p).await?; match resp.status() { http::StatusCode::OK => { let mut m = Metadata::default(); @@ -321,7 +323,7 @@ impl Accessor for Backend { m.set_complete(); - info!("object {} stat finished: {:?}", &p, m); + debug!("object {} stat finished: {:?}", &p, m); Ok(m) } StatusCode::NOT_FOUND if p.ends_with('/') => { @@ -331,36 +333,33 @@ impl Accessor for Backend { m.set_mode(ObjectMode::DIR); m.set_complete(); - info!("object {} stat finished", &p); + debug!("object {} stat finished", &p); Ok(m) } _ => Err(parse_error_response(resp, "stat", &p).await), } } + #[trace("delete")] async fn delete(&self, args: &OpDelete) -> Result<()> { increment_counter!("opendal_azure_delete_requests"); let p = self.get_abs_path(&args.path); - info!("object {} delete start", &p); + debug!("object {} delete start", &p); - let resp = self.delete_object(&p).await?; + let resp = self.delete_blob(&p).await?; match resp.status() { StatusCode::NO_CONTENT => { - info!("object {} delete finished", &p); + debug!("object {} delete finished", &p); Ok(()) } _ => Err(parse_error_response(resp, "delete", &p).await), } } - #[warn(dead_code)] - async fn list(&self, args: &OpList) -> Result { - let _ = args; - unimplemented!() - } } impl Backend { - pub(crate) async fn get_object( + #[trace("get_blob")] + pub(crate) async fn get_blob( &self, path: &str, offset: Option, @@ -394,7 +393,8 @@ impl Backend { } }) } - pub(crate) async fn put_object( + #[trace("put_blob")] + pub(crate) async fn put_blob( &self, path: &str, r: BoxedAsyncReader, @@ -427,8 +427,11 @@ impl Backend { }) } - #[warn(dead_code)] - pub(crate) async fn head_object(&self, path: &str) -> Result> { + #[trace("get_blob_properties")] + pub(crate) async fn get_blob_properties( + &self, + path: &str, + ) -> Result> { let req = hyper::Request::head(&format!( "https://{}.{}/{}/{}", self.account_name, self.endpoint, self.container, path @@ -450,7 +453,8 @@ impl Backend { }) } - pub(crate) async fn delete_object(&self, path: &str) -> Result> { + #[trace("delete_blob")] + pub(crate) async fn delete_blob(&self, path: &str) -> Result> { let req = hyper::Request::delete(&format!( "https://{}.{}/{}/{}", self.account_name, self.endpoint, self.container, path @@ -472,16 +476,6 @@ impl Backend { } }) } - #[allow(dead_code)] - pub(crate) async fn list_object( - &self, - path: &str, - continuation_token: &str, - ) -> Result> { - let _ = path; - let _ = continuation_token; - unimplemented!() - } } struct ByteStream(hyper::Response); diff --git a/src/services/azblob/mod.rs b/src/services/azblob/mod.rs index 09bab45c9758..aac0824d52b1 100644 --- a/src/services/azblob/mod.rs +++ b/src/services/azblob/mod.rs @@ -14,5 +14,3 @@ pub mod backend; pub use backend::Backend; pub use backend::Builder; - -mod object_stream; diff --git a/src/services/azblob/object_stream.rs b/src/services/azblob/object_stream.rs deleted file mode 100644 index ea0ed57e60ed..000000000000 --- a/src/services/azblob/object_stream.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index abeec84c9f9b..87ec6503597d 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -24,9 +24,11 @@ use futures::io; use futures::AsyncReadExt; use futures::AsyncSeekExt; use futures::AsyncWriteExt; +use log::debug; use log::error; use log::info; use metrics::increment_counter; +use minitrace::trace; use tokio::fs; use super::error::parse_io_error; @@ -125,11 +127,12 @@ impl Backend { #[async_trait] impl Accessor for Backend { + #[trace("read")] async fn read(&self, args: &OpRead) -> Result { increment_counter!("opendal_fs_read_requests"); let path = self.get_abs_path(&args.path); - info!( + debug!( "object {} read start: offset {:?}, size {:?}", &path, args.offset, args.size ); @@ -159,18 +162,19 @@ impl Accessor for Backend { None => Box::new(f), }; - info!( + debug!( "object {} reader created: offset {:?}, size {:?}", &path, args.offset, args.size ); Ok(r) } + #[trace("write")] async fn write(&self, mut r: BoxedAsyncReader, args: &OpWrite) -> Result { increment_counter!("opendal_fs_write_requests"); let path = self.get_abs_path(&args.path); - info!("object {} write start: size {}", &path, args.size); + debug!("object {} write start: size {}", &path, args.size); // Create dir before write path. // @@ -224,15 +228,16 @@ impl Accessor for Backend { e })?; - info!("object {} write finished: size {:?}", &path, args.size); + debug!("object {} write finished: size {:?}", &path, args.size); Ok(s as usize) } + #[trace("stat")] async fn stat(&self, args: &OpStat) -> Result { increment_counter!("opendal_fs_stat_requests"); let path = self.get_abs_path(&args.path); - info!("object {} stat start", &path); + debug!("object {} stat start", &path); let meta = fs::metadata(&path).await.map_err(|e| { let e = parse_io_error(e, "stat", &path); @@ -255,15 +260,16 @@ impl Accessor for Backend { ); m.set_complete(); - info!("object {} stat finished", &path); + debug!("object {} stat finished", &path); Ok(m) } + #[trace("delete")] async fn delete(&self, args: &OpDelete) -> Result<()> { increment_counter!("opendal_fs_delete_requests"); let path = self.get_abs_path(&args.path); - info!("object {} delete start", &path); + debug!("object {} delete start", &path); // PathBuf.is_dir() is not free, call metadata directly instead. let meta = fs::metadata(&path).await; @@ -289,15 +295,16 @@ impl Accessor for Backend { f.map_err(|e| parse_io_error(e, "delete", &path))?; - info!("object {} delete finished", &path); + debug!("object {} delete finished", &path); Ok(()) } + #[trace("list")] async fn list(&self, args: &OpList) -> Result { increment_counter!("opendal_fs_list_requests"); let path = self.get_abs_path(&args.path); - info!("object {} list start", &path); + debug!("object {} list start", &path); let f = fs::read_dir(&path).await.map_err(|e| { let e = parse_io_error(e, "read", &path); diff --git a/src/services/memory/backend.rs b/src/services/memory/backend.rs index e2a457d97b08..0ef9711e8b9e 100644 --- a/src/services/memory/backend.rs +++ b/src/services/memory/backend.rs @@ -24,6 +24,7 @@ use async_trait::async_trait; use bytes::Bytes; use futures::io; use futures::TryStreamExt; +use minitrace::trace; use crate::error::Error; use crate::error::Kind; @@ -79,6 +80,7 @@ impl Backend { #[async_trait] impl Accessor for Backend { + #[trace("read")] async fn read(&self, args: &OpRead) -> Result { let path = Backend::normalize_path(&args.path); @@ -119,6 +121,7 @@ impl Accessor for Backend { let r: BoxedAsyncReader = Box::new(BytesStream(data).into_async_read()); Ok(r) } + #[trace("write")] async fn write(&self, mut r: BoxedAsyncReader, args: &OpWrite) -> Result { let path = Backend::normalize_path(&args.path); @@ -146,6 +149,7 @@ impl Accessor for Backend { Ok(n as usize) } + #[trace("stat")] async fn stat(&self, args: &OpStat) -> Result { let path = Backend::normalize_path(&args.path); @@ -176,6 +180,7 @@ impl Accessor for Backend { Ok(meta) } + #[trace("delete")] async fn delete(&self, args: &OpDelete) -> Result<()> { let path = Backend::normalize_path(&args.path); @@ -184,6 +189,7 @@ impl Accessor for Backend { Ok(()) } + #[trace("list")] async fn list(&self, args: &OpList) -> Result { let path = Backend::normalize_path(&args.path); diff --git a/src/services/s3/backend.rs b/src/services/s3/backend.rs index f3139921ed4f..6e5c641e362d 100644 --- a/src/services/s3/backend.rs +++ b/src/services/s3/backend.rs @@ -35,7 +35,7 @@ use log::error; use log::info; use log::warn; use metrics::increment_counter; -use minitrace::prelude::*; +use minitrace::trace; use once_cell::sync::Lazy; use reqsign::services::aws::v4::Signer; use time::format_description::well_known::Rfc2822; @@ -400,7 +400,7 @@ impl Accessor for Backend { increment_counter!("opendal_s3_read_requests"); let p = self.get_abs_path(&args.path); - info!( + debug!( "object {} read start: offset {:?}, size {:?}", &p, args.offset, args.size ); @@ -409,7 +409,7 @@ impl Accessor for Backend { match resp.status() { StatusCode::OK | StatusCode::PARTIAL_CONTENT => { - info!( + debug!( "object {} reader created: offset {:?}, size {:?}", &p, args.offset, args.size ); @@ -422,12 +422,12 @@ impl Accessor for Backend { #[trace("write")] async fn write(&self, r: BoxedAsyncReader, args: &OpWrite) -> Result { let p = self.get_abs_path(&args.path); - info!("object {} write start: size {}", &p, args.size); + debug!("object {} write start: size {}", &p, args.size); let resp = self.put_object(&p, r, args.size).await?; match resp.status() { StatusCode::CREATED | StatusCode::OK => { - info!("object {} write finished: size {:?}", &p, args.size); + debug!("object {} write finished: size {:?}", &p, args.size); Ok(args.size as usize) } _ => Err(parse_error_response(resp, "write", &p).await), @@ -438,7 +438,7 @@ impl Accessor for Backend { increment_counter!("opendal_s3_stat_requests"); let p = self.get_abs_path(&args.path); - info!("object {} stat start", &p); + debug!("object {} stat start", &p); // Stat root always returns a DIR. if self.get_rel_path(&p).is_empty() { @@ -448,7 +448,7 @@ impl Accessor for Backend { m.set_mode(ObjectMode::DIR); m.set_complete(); - info!("backed root object stat finished"); + debug!("backed root object stat finished"); return Ok(m); } @@ -490,7 +490,7 @@ impl Accessor for Backend { m.set_complete(); - info!("object {} stat finished: {:?}", &p, m); + debug!("object {} stat finished: {:?}", &p, m); Ok(m) } StatusCode::NOT_FOUND if p.ends_with('/') => { @@ -500,7 +500,7 @@ impl Accessor for Backend { m.set_mode(ObjectMode::DIR); m.set_complete(); - info!("object {} stat finished", &p); + debug!("object {} stat finished", &p); Ok(m) } _ => Err(parse_error_response(resp, "stat", &p).await), @@ -511,13 +511,13 @@ impl Accessor for Backend { increment_counter!("opendal_s3_delete_requests"); let p = self.get_abs_path(&args.path); - info!("object {} delete start", &p); + debug!("object {} delete start", &p); let resp = self.delete_object(&p).await?; match resp.status() { StatusCode::NO_CONTENT => { - info!("object {} delete finished", &p); + debug!("object {} delete finished", &p); Ok(()) } _ => Err(parse_error_response(resp, "delete", &p).await), @@ -532,13 +532,14 @@ impl Accessor for Backend { if !path.ends_with('/') && !path.is_empty() { path.push('/') } - info!("object {} list start", &path); + debug!("object {} list start", &path); Ok(Box::new(S3ObjectStream::new(self.clone(), path))) } } impl Backend { + #[trace("get_object")] pub(crate) async fn get_object( &self, path: &str, @@ -570,6 +571,7 @@ impl Backend { }) } + #[trace("put_object")] pub(crate) async fn put_object( &self, path: &str, @@ -599,6 +601,7 @@ impl Backend { }) } + #[trace("head_object")] pub(crate) async fn head_object(&self, path: &str) -> Result> { let mut req = hyper::Request::head(&format!("{}/{}/{}", self.endpoint, self.bucket, path)) .body(hyper::Body::empty()) @@ -617,6 +620,7 @@ impl Backend { }) } + #[trace("delete_object")] pub(crate) async fn delete_object(&self, path: &str) -> Result> { let mut req = hyper::Request::delete(&format!("{}/{}/{}", self.endpoint, self.bucket, path)) @@ -636,7 +640,8 @@ impl Backend { }) } - pub(crate) async fn list_object( + #[trace("list_objects")] + pub(crate) async fn list_objects( &self, path: &str, continuation_token: &str, diff --git a/src/services/s3/object_stream.rs b/src/services/s3/object_stream.rs index 15cca590a40d..acb88fa32da0 100644 --- a/src/services/s3/object_stream.rs +++ b/src/services/s3/object_stream.rs @@ -75,7 +75,7 @@ impl futures::Stream for S3ObjectStream { let path = self.path.clone(); let token = self.token.clone(); let fut = async move { - let mut resp = backend.list_object(&path, &token).await?; + let mut resp = backend.list_objects(&path, &token).await?; if resp.status() != http::StatusCode::OK { let e = Err(Error::Object {