diff --git a/core/src/services/monoiofs/backend.rs b/core/src/services/monoiofs/backend.rs index 89c5b0ebdc30..76f6880dc77a 100644 --- a/core/src/services/monoiofs/backend.rs +++ b/core/src/services/monoiofs/backend.rs @@ -21,8 +21,10 @@ use std::path::PathBuf; use std::sync::Arc; use chrono::DateTime; +use monoio::fs::OpenOptions; use super::core::MonoiofsCore; +use super::core::BUFFER_SIZE; use super::reader::MonoiofsReader; use super::writer::MonoiofsWriter; use crate::raw::*; @@ -114,7 +116,11 @@ impl Access for MonoiofsBackend { stat: true, read: true, write: true, + write_can_append: true, delete: true, + rename: true, + create_dir: true, + copy: true, ..Default::default() }); am.into() @@ -150,10 +156,9 @@ impl Access for MonoiofsBackend { Ok((RpRead::default(), reader)) } - async fn write(&self, path: &str, _args: OpWrite) -> Result<(RpWrite, Self::Writer)> { - // TODO: create parent directory before write - let path = self.core.prepare_path(path); - let writer = MonoiofsWriter::new(self.core.clone(), path).await?; + async fn write(&self, path: &str, args: OpWrite) -> Result<(RpWrite, Self::Writer)> { + let path = self.core.prepare_write_path(path).await?; + let writer = MonoiofsWriter::new(self.core.clone(), path, args.append()).await?; Ok((RpWrite::default(), writer)) } @@ -186,4 +191,87 @@ impl Access for MonoiofsBackend { Err(err) => Err(new_std_io_error(err)), } } + + async fn rename(&self, from: &str, to: &str, _args: OpRename) -> Result { + let from = self.core.prepare_path(from); + // ensure file exists + self.core + .dispatch({ + let from = from.clone(); + move || monoio::fs::metadata(from) + }) + .await + .map_err(new_std_io_error)?; + let to = self.core.prepare_write_path(to).await?; + self.core + .dispatch(move || monoio::fs::rename(from, to)) + .await + .map_err(new_std_io_error)?; + Ok(RpRename::default()) + } + + async fn create_dir(&self, path: &str, _args: OpCreateDir) -> Result { + let path = self.core.prepare_path(path); + self.core + .dispatch(move || monoio::fs::create_dir_all(path)) + .await + .map_err(new_std_io_error)?; + Ok(RpCreateDir::default()) + } + + async fn copy(&self, from: &str, to: &str, _args: OpCopy) -> Result { + let from = self.core.prepare_path(from); + // ensure file exists + self.core + .dispatch({ + let from = from.clone(); + move || monoio::fs::metadata(from) + }) + .await + .map_err(new_std_io_error)?; + let to = self.core.prepare_write_path(to).await?; + self.core + .dispatch({ + let core = self.core.clone(); + move || async move { + let from = OpenOptions::new().read(true).open(from).await?; + let to = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(to) + .await?; + + // AsyncReadRent and AsyncWriteRent is not implemented + // for File, so we can't write this: + // monoio::io::copy(&mut from, &mut to).await?; + + let mut pos = 0; + // allocate and resize buffer + let mut buf = core.buf_pool.get(); + // set capacity of buf to exact size to avoid excessive read + buf.reserve(BUFFER_SIZE); + let _ = buf.split_off(BUFFER_SIZE); + + loop { + let result; + (result, buf) = from.read_at(buf, pos).await; + if result? == 0 { + // EOF + break; + } + let result; + (result, buf) = to.write_all_at(buf, pos).await; + result?; + pos += buf.len() as u64; + buf.clear(); + } + core.buf_pool.put(buf); + Ok(()) + } + }) + .await + .map_err(new_std_io_error)?; + Ok(RpCopy::default()) + } } diff --git a/core/src/services/monoiofs/core.rs b/core/src/services/monoiofs/core.rs index 25acc7e8cea2..30d14adc987c 100644 --- a/core/src/services/monoiofs/core.rs +++ b/core/src/services/monoiofs/core.rs @@ -75,10 +75,30 @@ impl MonoiofsCore { &self.root } + /// join root and path pub fn prepare_path(&self, path: &str) -> PathBuf { self.root.join(path.trim_end_matches('/')) } + /// join root and path, create parent dirs + pub async fn prepare_write_path(&self, path: &str) -> Result { + let path = self.prepare_path(path); + let parent = path + .parent() + .ok_or_else(|| { + Error::new( + ErrorKind::Unexpected, + "path should have parent but not, it must be malformed", + ) + .with_context("input", path.to_string_lossy()) + })? + .to_path_buf(); + self.dispatch(move || monoio::fs::create_dir_all(parent)) + .await + .map_err(new_std_io_error)?; + Ok(path) + } + /// entrypoint of each worker thread, sets up monoio runtimes and channels fn worker_entrypoint(rx: Receiver, io_uring_entries: u32) { let mut rt = RuntimeBuilder::::new() diff --git a/core/src/services/monoiofs/docs.md b/core/src/services/monoiofs/docs.md index 2db804fd6f7f..5926d460a3dd 100644 --- a/core/src/services/monoiofs/docs.md +++ b/core/src/services/monoiofs/docs.md @@ -5,11 +5,11 @@ This service can be used to: - [x] stat - [x] read - [x] write -- [ ] append -- [ ] create_dir +- [x] append +- [x] create_dir - [x] delete -- [ ] copy -- [ ] rename +- [x] copy +- [x] rename - [ ] list - [ ] ~~presign~~ - [ ] blocking diff --git a/core/src/services/monoiofs/writer.rs b/core/src/services/monoiofs/writer.rs index 20ffe65e1eff..6af3d485cfdd 100644 --- a/core/src/services/monoiofs/writer.rs +++ b/core/src/services/monoiofs/writer.rs @@ -45,10 +45,10 @@ pub struct MonoiofsWriter { } impl MonoiofsWriter { - pub async fn new(core: Arc, path: PathBuf) -> Result { + pub async fn new(core: Arc, path: PathBuf, append: bool) -> Result { let (open_result_tx, open_result_rx) = oneshot::channel(); let (tx, rx) = mpsc::unbounded(); - core.spawn(move || Self::worker_entrypoint(path, rx, open_result_tx)) + core.spawn(move || Self::worker_entrypoint(path, append, rx, open_result_tx)) .await; core.unwrap(open_result_rx.await)?; Ok(Self { core, tx, pos: 0 }) @@ -57,13 +57,15 @@ impl MonoiofsWriter { /// entrypoint of worker task that runs in context of monoio async fn worker_entrypoint( path: PathBuf, + append: bool, mut rx: mpsc::UnboundedReceiver, open_result_tx: oneshot::Sender>, ) { let result = OpenOptions::new() .write(true) .create(true) - .truncate(true) + .append(append) + .truncate(!append) .open(path) .await; // [`monoio::fs::File`] is non-Send, hence it is kept within