diff --git a/Cargo.toml b/Cargo.toml index e2368e0549e3..91386643750b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -38,7 +38,6 @@ required-features = ["services-hdfs"] [dependencies] anyhow = "1.0.56" -async-compat = "0.2.1" # Temp workaround, should come back to tagged version after https://github.com/Nemo157/async-compression/issues/150 resolved. async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [ "futures-io", @@ -65,6 +64,7 @@ log = "0.4.16" md5 = "0.7.0" metrics = "0.20.0" minitrace = "0.4.0" +nuclei = "0.2.1" once_cell = "1.10.0" parking_lot = "0.12.0" percent-encoding = "2.1.0" @@ -75,7 +75,6 @@ reqsign = "0.3.0" serde = { version = "1.0.136", features = ["derive"] } thiserror = "1.0.30" time = "0.3.9" -tokio = { version = "1.20.1", features = ["fs", "macros"] } [dev-dependencies] anyhow = "1.0.56" diff --git a/oay/Cargo.lock b/oay/Cargo.lock index b28625e674d5..707f0f56164e 100644 --- a/oay/Cargo.lock +++ b/oay/Cargo.lock @@ -1358,6 +1358,8 @@ checksum = "18a6dbe30758c9f83eb00cbea4ac95966305f5a7772f3f42ebfc7fc7eddbd8e1" [[package]] name = "opendal" version = "0.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "470a35200a050b8dd34d4713ba32d9e145c363115e989860ea03e784a5040533" dependencies = [ "anyhow", "async-compat", diff --git a/oay/Cargo.toml b/oay/Cargo.toml index 3d210bee7534..05c4aaa5c4a2 100644 --- a/oay/Cargo.toml +++ b/oay/Cargo.toml @@ -18,7 +18,7 @@ env_logger = "0.9.0" futures = "0.3.21" log = "0.4.17" opendal = "0.11.3" -tokio = { version = "1.20.1", features = ["rt-multi-thread"] } +tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] } # Please comment the following patch while releasing. [patch.crates-io] diff --git a/src/services/fs/backend.rs b/src/services/fs/backend.rs index 96daad5e3b7e..6fa810c9611b 100644 --- a/src/services/fs/backend.rs +++ b/src/services/fs/backend.rs @@ -20,7 +20,6 @@ use std::path::PathBuf; use std::sync::Arc; use anyhow::anyhow; -use async_compat::Compat; use async_trait::async_trait; use futures::AsyncReadExt; use futures::AsyncSeekExt; @@ -31,10 +30,10 @@ use log::warn; use metrics::increment_counter; use minitrace::trace; use time::OffsetDateTime; -use tokio::fs; use super::dir_stream::DirStream; use super::error::parse_io_error; +use super::nfs; use crate::accessor::AccessorMetadata; use crate::error::other; use crate::error::BackendError; @@ -98,9 +97,9 @@ impl Builder { }; // If root dir is not exist, we must create it. - if let Err(e) = fs::metadata(&root).await { + if let Err(e) = nfs::metadata(&root).await { if e.kind() == std::io::ErrorKind::NotFound { - fs::create_dir_all(&root) + nfs::create_dir_all(&root) .await .map_err(|e| parse_io_error(e, "build", &root))?; } @@ -187,7 +186,7 @@ impl Accessor for Backend { })? .to_path_buf(); - fs::create_dir_all(&parent).await.map_err(|e| { + nfs::create_dir_all(&parent).await.map_err(|e| { let e = parse_io_error(e, "create", &parent.to_string_lossy()); error!( "object {} create_dir_all for parent {:?}: {:?}", @@ -196,11 +195,10 @@ impl Accessor for Backend { e })?; - fs::OpenOptions::new() + std::fs::OpenOptions::new() .create(true) .write(true) .open(&path) - .await .map_err(|e| { let e = parse_io_error(e, "create", &path); error!("object {} create: {:?}", &path, e); @@ -211,7 +209,7 @@ impl Accessor for Backend { } if args.mode() == ObjectMode::DIR { - fs::create_dir_all(&path).await.map_err(|e| { + nfs::create_dir_all(&path).await.map_err(|e| { let e = parse_io_error(e, "create", &path); error!("object {} create: {:?}", &path, e); e @@ -235,17 +233,16 @@ impl Accessor for Backend { args.size() ); - let f = fs::OpenOptions::new() + let f = std::fs::OpenOptions::new() .read(true) .open(&path) - .await .map_err(|e| { let e = parse_io_error(e, "read", &path); error!("object {} open: {:?}", &path, e); e })?; - let mut f = Compat::new(f); + let mut f = nuclei::Handle::new(f)?; if let Some(offset) = args.offset() { f.seek(SeekFrom::Start(offset)).await.map_err(|e| { @@ -293,7 +290,7 @@ impl Accessor for Backend { })? .to_path_buf(); - fs::create_dir_all(&parent).await.map_err(|e| { + nfs::create_dir_all(&parent).await.map_err(|e| { let e = parse_io_error(e, "write", &parent.to_string_lossy()); error!( "object {} create_dir_all for parent {}: {:?}", @@ -304,11 +301,10 @@ impl Accessor for Backend { e })?; - let f = fs::OpenOptions::new() + let f = std::fs::OpenOptions::new() .create(true) .write(true) .open(&path) - .await .map_err(|e| { let e = parse_io_error(e, "write", &path); error!("object {} open: {:?}", &path, e); @@ -316,7 +312,7 @@ impl Accessor for Backend { })?; debug!("object {} write finished: size {:?}", &path, args.size()); - Ok(Box::new(Compat::new(f))) + Ok(Box::new(nuclei::Handle::new(f)?)) } #[trace("stat")] @@ -326,7 +322,7 @@ impl Accessor for Backend { let path = self.get_abs_path(args.path()); debug!("object {} stat start", &path); - let meta = fs::metadata(&path).await.map_err(|e| { + let meta = nfs::metadata(&path).await.map_err(|e| { let e = parse_io_error(e, "stat", &path); warn!("object {} stat: {:?}", &path, e); e @@ -359,7 +355,7 @@ impl Accessor for Backend { debug!("object {} delete start", &path); // PathBuf.is_dir() is not free, call metadata directly instead. - let meta = fs::metadata(&path).await; + let meta = nfs::metadata(&path).await; if let Err(err) = meta { return if err.kind() == ErrorKind::NotFound { @@ -375,9 +371,9 @@ impl Accessor for Backend { let meta = meta.ok().unwrap(); let f = if meta.is_dir() { - fs::remove_dir(&path).await + nfs::remove_dir(&path).await } else { - fs::remove_file(&path).await + nfs::remove_file(&path).await }; f.map_err(|e| parse_io_error(e, "delete", &path))?; diff --git a/src/services/fs/mod.rs b/src/services/fs/mod.rs index cf7cc1563ddb..c14fac82893e 100644 --- a/src/services/fs/mod.rs +++ b/src/services/fs/mod.rs @@ -92,3 +92,4 @@ pub use backend::Builder; mod dir_stream; mod error; +mod nfs; diff --git a/src/services/fs/nfs.rs b/src/services/fs/nfs.rs new file mode 100644 index 000000000000..a62a2002ce4c --- /dev/null +++ b/src/services/fs/nfs.rs @@ -0,0 +1,90 @@ +// Copyright 2022 Datafuse Labs. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Async filesystem primitives. +//! We use [`nuclei`] to support io-uring. +//! +//! [`nuclei`]: https://docs.rs/nuclei + +#![warn(missing_docs)] +use std::io; +use std::path::Path; + +use nuclei::spawn_blocking; + +#[doc(no_inline)] +pub use std::fs::{FileType, Metadata, Permissions}; + +/// Creates a directory and its parent directories if they are missing. +/// +/// # Errors +/// +/// An error will be returned in the following situations: +/// +/// * `path` already points to an existing file or directory. +/// * The current process lacks permissions to create the directory or its missing parents. +/// * Some other I/O error occurred. +pub async fn create_dir_all>(path: P) -> io::Result<()> { + let path = path.as_ref().to_owned(); + spawn_blocking(move || std::fs::create_dir_all(&path)).await +} + +/// Reads metadata for a path. +/// +/// This function will traverse symbolic links to read metadata for the target file or directory. +/// If you want to read metadata without following symbolic links, use [`symlink_metadata()`] +/// instead. +/// +/// # Errors +/// +/// An error will be returned in the following situations: +/// +/// * `path` does not point to an existing file or directory. +/// * The current process lacks permissions to read metadata for the path. +/// * Some other I/O error occurred. +pub async fn metadata>(path: P) -> io::Result { + let path = path.as_ref().to_owned(); + spawn_blocking(move || std::fs::metadata(path)).await +} + +/// Removes an empty directory. +/// +/// Note that this function can only delete an empty directory. If you want to delete a directory +/// and all of its contents, use [`remove_dir_all()`] instead. +/// +/// # Errors +/// +/// An error will be returned in the following situations: +/// +/// * `path` is not an existing and empty directory. +/// * The current process lacks permissions to remove the directory. +/// * Some other I/O error occurred. +pub async fn remove_dir>(path: P) -> io::Result<()> { + let path = path.as_ref().to_owned(); + spawn_blocking(move || std::fs::remove_dir(&path)).await +} + +/// Removes a file. +/// +/// # Errors +/// +/// An error will be returned in the following situations: +/// +/// * `path` does not point to an existing file. +/// * The current process lacks permissions to remove the file. +/// * Some other I/O error occurred. +pub async fn remove_file>(path: P) -> io::Result<()> { + let path = path.as_ref().to_owned(); + spawn_blocking(move || std::fs::remove_file(&path)).await +}