Skip to content

Commit

Permalink
feat: try to support epoll (#478)
Browse files Browse the repository at this point in the history
* feat: try to support iouring

* chore: make cargo docs work

* feat: all in epoll

* chore: apply review

* chore: fix tokio macros

* chore: fix features
  • Loading branch information
Chojan Shang authored Jul 29, 2022
1 parent fd3ef54 commit b609cde
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 22 deletions.
3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ required-features = ["services-hdfs"]

[dependencies]
anyhow = "1.0.56"
async-compat = "0.2.1"
# Temp workaround, should come back to tagged version after https://github.com/Nemo157/async-compression/issues/150 resolved.
async-compression = { package = "async-compression-issue-150-workaround", version = "0.3.15-issue-150", features = [
"futures-io",
Expand All @@ -65,6 +64,7 @@ log = "0.4.16"
md5 = "0.7.0"
metrics = "0.20.0"
minitrace = "0.4.0"
nuclei = "0.2.1"
once_cell = "1.10.0"
parking_lot = "0.12.0"
percent-encoding = "2.1.0"
Expand All @@ -75,7 +75,6 @@ reqsign = "0.3.0"
serde = { version = "1.0.136", features = ["derive"] }
thiserror = "1.0.30"
time = "0.3.9"
tokio = { version = "1.20.1", features = ["fs", "macros"] }

[dev-dependencies]
anyhow = "1.0.56"
Expand Down
2 changes: 2 additions & 0 deletions oay/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion oay/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ env_logger = "0.9.0"
futures = "0.3.21"
log = "0.4.17"
opendal = "0.11.3"
tokio = { version = "1.20.1", features = ["rt-multi-thread"] }
tokio = { version = "1.20.1", features = ["rt-multi-thread", "macros"] }

# Please comment the following patch while releasing.
[patch.crates-io]
Expand Down
34 changes: 15 additions & 19 deletions src/services/fs/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::path::PathBuf;
use std::sync::Arc;

use anyhow::anyhow;
use async_compat::Compat;
use async_trait::async_trait;
use futures::AsyncReadExt;
use futures::AsyncSeekExt;
Expand All @@ -31,10 +30,10 @@ use log::warn;
use metrics::increment_counter;
use minitrace::trace;
use time::OffsetDateTime;
use tokio::fs;

use super::dir_stream::DirStream;
use super::error::parse_io_error;
use super::nfs;
use crate::accessor::AccessorMetadata;
use crate::error::other;
use crate::error::BackendError;
Expand Down Expand Up @@ -98,9 +97,9 @@ impl Builder {
};

// If root dir is not exist, we must create it.
if let Err(e) = fs::metadata(&root).await {
if let Err(e) = nfs::metadata(&root).await {
if e.kind() == std::io::ErrorKind::NotFound {
fs::create_dir_all(&root)
nfs::create_dir_all(&root)
.await
.map_err(|e| parse_io_error(e, "build", &root))?;
}
Expand Down Expand Up @@ -187,7 +186,7 @@ impl Accessor for Backend {
})?
.to_path_buf();

fs::create_dir_all(&parent).await.map_err(|e| {
nfs::create_dir_all(&parent).await.map_err(|e| {
let e = parse_io_error(e, "create", &parent.to_string_lossy());
error!(
"object {} create_dir_all for parent {:?}: {:?}",
Expand All @@ -196,11 +195,10 @@ impl Accessor for Backend {
e
})?;

fs::OpenOptions::new()
std::fs::OpenOptions::new()
.create(true)
.write(true)
.open(&path)
.await
.map_err(|e| {
let e = parse_io_error(e, "create", &path);
error!("object {} create: {:?}", &path, e);
Expand All @@ -211,7 +209,7 @@ impl Accessor for Backend {
}

if args.mode() == ObjectMode::DIR {
fs::create_dir_all(&path).await.map_err(|e| {
nfs::create_dir_all(&path).await.map_err(|e| {
let e = parse_io_error(e, "create", &path);
error!("object {} create: {:?}", &path, e);
e
Expand All @@ -235,17 +233,16 @@ impl Accessor for Backend {
args.size()
);

let f = fs::OpenOptions::new()
let f = std::fs::OpenOptions::new()
.read(true)
.open(&path)
.await
.map_err(|e| {
let e = parse_io_error(e, "read", &path);
error!("object {} open: {:?}", &path, e);
e
})?;

let mut f = Compat::new(f);
let mut f = nuclei::Handle::new(f)?;

if let Some(offset) = args.offset() {
f.seek(SeekFrom::Start(offset)).await.map_err(|e| {
Expand Down Expand Up @@ -293,7 +290,7 @@ impl Accessor for Backend {
})?
.to_path_buf();

fs::create_dir_all(&parent).await.map_err(|e| {
nfs::create_dir_all(&parent).await.map_err(|e| {
let e = parse_io_error(e, "write", &parent.to_string_lossy());
error!(
"object {} create_dir_all for parent {}: {:?}",
Expand All @@ -304,19 +301,18 @@ impl Accessor for Backend {
e
})?;

let f = fs::OpenOptions::new()
let f = std::fs::OpenOptions::new()
.create(true)
.write(true)
.open(&path)
.await
.map_err(|e| {
let e = parse_io_error(e, "write", &path);
error!("object {} open: {:?}", &path, e);
e
})?;

debug!("object {} write finished: size {:?}", &path, args.size());
Ok(Box::new(Compat::new(f)))
Ok(Box::new(nuclei::Handle::new(f)?))
}

#[trace("stat")]
Expand All @@ -326,7 +322,7 @@ impl Accessor for Backend {
let path = self.get_abs_path(args.path());
debug!("object {} stat start", &path);

let meta = fs::metadata(&path).await.map_err(|e| {
let meta = nfs::metadata(&path).await.map_err(|e| {
let e = parse_io_error(e, "stat", &path);
warn!("object {} stat: {:?}", &path, e);
e
Expand Down Expand Up @@ -359,7 +355,7 @@ impl Accessor for Backend {
debug!("object {} delete start", &path);

// PathBuf.is_dir() is not free, call metadata directly instead.
let meta = fs::metadata(&path).await;
let meta = nfs::metadata(&path).await;

if let Err(err) = meta {
return if err.kind() == ErrorKind::NotFound {
Expand All @@ -375,9 +371,9 @@ impl Accessor for Backend {
let meta = meta.ok().unwrap();

let f = if meta.is_dir() {
fs::remove_dir(&path).await
nfs::remove_dir(&path).await
} else {
fs::remove_file(&path).await
nfs::remove_file(&path).await
};

f.map_err(|e| parse_io_error(e, "delete", &path))?;
Expand Down
1 change: 1 addition & 0 deletions src/services/fs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,3 +92,4 @@ pub use backend::Builder;

mod dir_stream;
mod error;
mod nfs;
90 changes: 90 additions & 0 deletions src/services/fs/nfs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2022 Datafuse Labs.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Async filesystem primitives.
//! We use [`nuclei`] to support io-uring.
//!
//! [`nuclei`]: https://docs.rs/nuclei
#![warn(missing_docs)]
use std::io;
use std::path::Path;

use nuclei::spawn_blocking;

#[doc(no_inline)]
pub use std::fs::{FileType, Metadata, Permissions};

/// Creates a directory and its parent directories if they are missing.
///
/// # Errors
///
/// An error will be returned in the following situations:
///
/// * `path` already points to an existing file or directory.
/// * The current process lacks permissions to create the directory or its missing parents.
/// * Some other I/O error occurred.
pub async fn create_dir_all<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
spawn_blocking(move || std::fs::create_dir_all(&path)).await
}

/// Reads metadata for a path.
///
/// This function will traverse symbolic links to read metadata for the target file or directory.
/// If you want to read metadata without following symbolic links, use [`symlink_metadata()`]
/// instead.
///
/// # Errors
///
/// An error will be returned in the following situations:
///
/// * `path` does not point to an existing file or directory.
/// * The current process lacks permissions to read metadata for the path.
/// * Some other I/O error occurred.
pub async fn metadata<P: AsRef<Path>>(path: P) -> io::Result<Metadata> {
let path = path.as_ref().to_owned();
spawn_blocking(move || std::fs::metadata(path)).await
}

/// Removes an empty directory.
///
/// Note that this function can only delete an empty directory. If you want to delete a directory
/// and all of its contents, use [`remove_dir_all()`] instead.
///
/// # Errors
///
/// An error will be returned in the following situations:
///
/// * `path` is not an existing and empty directory.
/// * The current process lacks permissions to remove the directory.
/// * Some other I/O error occurred.
pub async fn remove_dir<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
spawn_blocking(move || std::fs::remove_dir(&path)).await
}

/// Removes a file.
///
/// # Errors
///
/// An error will be returned in the following situations:
///
/// * `path` does not point to an existing file.
/// * The current process lacks permissions to remove the file.
/// * Some other I/O error occurred.
pub async fn remove_file<P: AsRef<Path>>(path: P) -> io::Result<()> {
let path = path.as_ref().to_owned();
spawn_blocking(move || std::fs::remove_file(&path)).await
}

1 comment on commit b609cde

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for opendal ready!

✅ Preview
https://opendal-lt8u22wim-databend.vercel.app

Built with commit b609cde.
This pull request is being automatically deployed with vercel-action

Please # to comment.