diff --git a/integrations/virtiofs/src/buffer.rs b/integrations/virtiofs/src/buffer.rs new file mode 100644 index 000000000000..0cd8d1bb0e65 --- /dev/null +++ b/integrations/virtiofs/src/buffer.rs @@ -0,0 +1,117 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::cell::RefCell; +use std::cmp::min; +use std::ptr; + +use vm_memory::bitmap::BitmapSlice; +use vm_memory::VolatileSlice; + +use crate::error::*; + +/// ReadWriteAtVolatile is a trait that allows reading and writing from a slice of VolatileSlice. +pub trait ReadWriteAtVolatile { + fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result; + fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result; +} + +impl<'a, B: BitmapSlice, T: ReadWriteAtVolatile + ?Sized> ReadWriteAtVolatile for &'a T { + fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + (**self).read_vectored_at_volatile(bufs) + } + + fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + (**self).write_vectored_at_volatile(bufs) + } +} + +/// BufferWrapper is a wrapper around opendal::Buffer that implements ReadWriteAtVolatile. +pub struct BufferWrapper { + buffer: RefCell, +} + +impl BufferWrapper { + pub fn new(buffer: opendal::Buffer) -> BufferWrapper { + BufferWrapper { + buffer: RefCell::new(buffer), + } + } + + pub fn get_buffer(&self) -> opendal::Buffer { + return self.buffer.borrow().clone(); + } +} + +impl ReadWriteAtVolatile for BufferWrapper { + fn read_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + let slice_guards: Vec<_> = bufs.iter().map(|s| s.ptr_guard_mut()).collect(); + let iovecs: Vec<_> = slice_guards + .iter() + .map(|s| libc::iovec { + iov_base: s.as_ptr() as *mut libc::c_void, + iov_len: s.len() as libc::size_t, + }) + .collect(); + if iovecs.is_empty() { + return Ok(0); + } + let data = self.buffer.borrow().to_vec(); + let mut result = 0; + for (index, iovec) in iovecs.iter().enumerate() { + let num = min(data.len() - result, iovec.iov_len); + if num == 0 { + break; + } + unsafe { + ptr::copy_nonoverlapping(data[result..].as_ptr(), iovec.iov_base as *mut u8, num) + } + bufs[index].bitmap().mark_dirty(0, num); + result += num; + } + Ok(result) + } + + fn write_vectored_at_volatile(&self, bufs: &[&VolatileSlice]) -> Result { + let slice_guards: Vec<_> = bufs.iter().map(|s| s.ptr_guard()).collect(); + let iovecs: Vec<_> = slice_guards + .iter() + .map(|s| libc::iovec { + iov_base: s.as_ptr() as *mut libc::c_void, + iov_len: s.len() as libc::size_t, + }) + .collect(); + if iovecs.is_empty() { + return Ok(0); + } + let len = iovecs.iter().map(|iov| iov.iov_len).sum(); + let mut data = vec![0; len]; + let mut offset = 0; + for iov in iovecs.iter() { + unsafe { + ptr::copy_nonoverlapping( + iov.iov_base as *const u8, + data.as_mut_ptr().add(offset), + iov.iov_len, + ); + } + offset += iov.iov_len; + } + *self.buffer.borrow_mut() = opendal::Buffer::from(data); + Ok(len) + } +} diff --git a/integrations/virtiofs/src/filesystem.rs b/integrations/virtiofs/src/filesystem.rs index 5c4b1e84b855..764d03596648 100644 --- a/integrations/virtiofs/src/filesystem.rs +++ b/integrations/virtiofs/src/filesystem.rs @@ -16,6 +16,7 @@ // under the License. use std::collections::HashMap; +use std::ffi::CStr; use std::io::Read; use std::io::Write; use std::mem::size_of; @@ -23,6 +24,7 @@ use std::sync::Mutex; use std::time::Duration; use log::debug; +use opendal::Buffer; use opendal::ErrorKind; use opendal::Operator; use sharded_slab::Slab; @@ -30,6 +32,7 @@ use tokio::runtime::Builder; use tokio::runtime::Runtime; use vm_memory::ByteValued; +use crate::buffer::BufferWrapper; use crate::error::*; use crate::filesystem_message::*; use crate::virtiofs_util::Reader; @@ -55,21 +58,17 @@ enum FileType { File, } +struct InnerWriter { + writer: opendal::Writer, + written: u64, +} + #[derive(Clone)] struct OpenedFile { path: String, metadata: Attr, } -struct InnerWriter { - // #[allow(dead_code)] here will be removed after write is implemented. - #[allow(dead_code)] - writer: opendal::Writer, - // #[allow(dead_code)] here will be removed after write is implemented. - #[allow(dead_code)] - written: u64, -} - impl OpenedFile { fn new(file_type: FileType, path: &str, uid: u32, gid: u32) -> OpenedFile { let mut attr: Attr = unsafe { std::mem::zeroed() }; @@ -130,7 +129,7 @@ pub struct Filesystem { // Since we need to manually manage the allocation of inodes, // we record the inode of each opened file here. opened_files_map: Mutex>, - opened_files_writer: Mutex>, + opened_files_writer: tokio::sync::Mutex>, } impl Filesystem { @@ -149,7 +148,7 @@ impl Filesystem { gid: 1000, opened_files: Slab::new(), opened_files_map: Mutex::new(HashMap::new()), - opened_files_writer: Mutex::new(HashMap::new()), + opened_files_writer: tokio::sync::Mutex::new(HashMap::new()), } } @@ -170,8 +169,12 @@ impl Filesystem { Opcode::Setattr => self.setattr(in_header, r, w), Opcode::Create => self.create(in_header, r, w), Opcode::Unlink => self.unlink(in_header, r, w), - Opcode::Open => self.open(in_header, r, w), Opcode::Release => self.release(in_header, r, w), + Opcode::Flush => self.flush(in_header, r, w), + Opcode::Forget => self.forget(in_header, r), + Opcode::Open => self.open(in_header, r, w), + Opcode::Read => self.read(in_header, r, w), + Opcode::Write => self.write(in_header, r, w), } } else { Filesystem::reply_error(in_header.unique, w) @@ -226,21 +229,24 @@ impl Filesystem { Ok(w.bytes_written()) } - fn check_write_flags(&self, flags: u32) -> Result<(bool, bool)> { + fn bytes_to_str(buf: &[u8]) -> Result<&str> { + Filesystem::bytes_to_cstr(buf)?.to_str().map_err(|e| { + new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) + }) + } + + fn bytes_to_cstr(buf: &[u8]) -> Result<&CStr> { + CStr::from_bytes_with_nul(buf).map_err(|e| { + new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) + }) + } + + fn check_flags(&self, flags: u32) -> Result<(bool, bool)> { let is_trunc = flags & libc::O_TRUNC as u32 != 0 || flags & libc::O_CREAT as u32 != 0; let is_append = flags & libc::O_APPEND as u32 != 0; let mode = flags & libc::O_ACCMODE as u32; let is_write = mode == libc::O_WRONLY as u32 || mode == libc::O_RDWR as u32 || is_append; - if !is_write { - Err(Error::from(libc::EINVAL))?; - } - - // OpenDAL only supports truncate write and append write, - // so O_TRUNC or O_APPEND needs to be specified explicitly - if (is_write && !is_trunc && !is_append) || is_trunc && !is_write { - Err(Error::from(libc::EINVAL))?; - } let capability = self.core.info().full_capability(); if is_trunc && !capability.write { @@ -250,7 +256,7 @@ impl Filesystem { Err(Error::from(libc::EACCES))?; } - Ok((is_trunc, is_append)) + Ok((is_write, is_append)) } } @@ -264,7 +270,19 @@ impl Filesystem { return Filesystem::reply_error(in_header.unique, w); } - // We will directly return ok and do nothing for now. + let mut attr = OpenedFile::new(FileType::Dir, "/", self.uid, self.gid); + attr.metadata.ino = 1; + // We need to allocate the inode 1 for the root directory. The double insertion + // here makes 1 the first inode and avoids extra alignment and processing elsewhere. + self.opened_files + .insert(attr.clone()) + .expect("failed to allocate inode"); + self.opened_files + .insert(attr.clone()) + .expect("failed to allocate inode"); + let mut opened_files_map = self.opened_files_map.lock().unwrap(); + opened_files_map.insert("/".to_string(), 1); + let out = InitOut { major: KERNEL_VERSION, minor: KERNEL_MINOR_VERSION, @@ -279,15 +297,26 @@ impl Filesystem { Ok(0) } + fn flush(&self, _in_header: InHeader, _r: Reader, _w: Writer) -> Result { + // do nothing for flush. + Ok(0) + } + + fn forget(&self, _in_header: InHeader, _r: Reader) -> Result { + // do nothing for forget. + Ok(0) + } + fn lookup(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result { let name_len = in_header.len as usize - size_of::(); let mut buf = vec![0; name_len]; r.read_exact(&mut buf).map_err(|e| { new_unexpected_error("failed to decode protocol messages", Some(e.into())) })?; - let name = String::from_utf8(buf).map_err(|e| { - new_unexpected_error("failed to decode protocol messages", Some(e.into())) - })?; + let name = match Filesystem::bytes_to_str(buf.as_ref()) { + Ok(name) => name, + Err(_) => return Filesystem::reply_error(in_header.unique, w), + }; debug!("lookup: parent inode={} name={}", in_header.nodeid, name); @@ -361,9 +390,10 @@ impl Filesystem { r.read_exact(&mut buf).map_err(|e| { new_unexpected_error("failed to decode protocol messages", Some(e.into())) })?; - let name = String::from_utf8(buf).map_err(|e| { - new_unexpected_error("failed to decode protocol messages", Some(e.into())) - })?; + let name = match Filesystem::bytes_to_str(buf.as_ref()) { + Ok(name) => name, + Err(_) => return Filesystem::reply_error(in_header.unique, w), + }; debug!("create: parent inode={} name={}", in_header.nodeid, name); @@ -386,14 +416,11 @@ impl Filesystem { let mut opened_files_map = self.opened_files_map.lock().unwrap(); opened_files_map.insert(path.to_string(), inode as u64); - let writer = match self.rt.block_on(self.do_get_writer(&path, flags)) { + match self.rt.block_on(self.do_set_writer(&path, flags)) { Ok(writer) => writer, Err(_) => return Filesystem::reply_error(in_header.unique, w), }; - let mut opened_file_writer = self.opened_files_writer.lock().unwrap(); - opened_file_writer.insert(path, writer); - let entry_out = EntryOut { nodeid: attr.metadata.ino, entry_valid: DEFAULT_TTL.as_secs(), @@ -420,9 +447,10 @@ impl Filesystem { r.read_exact(&mut buf).map_err(|e| { new_unexpected_error("failed to decode protocol messages", Some(e.into())) })?; - let name = String::from_utf8(buf).map_err(|e| { - new_unexpected_error("failed to decode protocol messages", Some(e.into())) - })?; + let name = match Filesystem::bytes_to_str(buf.as_ref()) { + Ok(name) => name, + Err(_) => return Filesystem::reply_error(in_header.unique, w), + }; debug!("unlink: parent inode={} name={}", in_header.nodeid, name); @@ -440,13 +468,31 @@ impl Filesystem { return Filesystem::reply_error(in_header.unique, w); } - self.opened_files.remove(in_header.nodeid as usize); let mut opened_files_map = self.opened_files_map.lock().unwrap(); opened_files_map.remove(&path); Filesystem::reply_ok(None::, None, in_header.unique, w) } + fn release(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result { + debug!("release: inode={}", in_header.nodeid); + + let path = match self + .opened_files + .get(in_header.nodeid as usize) + .map(|f| f.path.clone()) + { + Some(path) => path, + None => return Filesystem::reply_error(in_header.unique, w), + }; + + if self.rt.block_on(self.do_release_writer(&path)).is_err() { + return Filesystem::reply_error(in_header.unique, w); + } + + Filesystem::reply_ok(None::, None, in_header.unique, w) + } + fn open(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result { debug!("open: inode={}", in_header.nodeid); @@ -463,22 +509,61 @@ impl Filesystem { None => return Filesystem::reply_error(in_header.unique, w), }; - let writer = match self.rt.block_on(self.do_get_writer(&path, flags)) { + match self.rt.block_on(self.do_set_writer(&path, flags)) { Ok(writer) => writer, Err(_) => return Filesystem::reply_error(in_header.unique, w), }; - let mut opened_file_writer = self.opened_files_writer.lock().unwrap(); - opened_file_writer.insert(path, writer); - let out = OpenOut { ..Default::default() }; Filesystem::reply_ok(Some(out), None, in_header.unique, w) } - fn release(&self, in_header: InHeader, _r: Reader, w: Writer) -> Result { - debug!("release: inode={}", in_header.nodeid); + fn read(&self, in_header: InHeader, mut r: Reader, mut w: Writer) -> Result { + let path = match self + .opened_files + .get(in_header.nodeid as usize) + .map(|f| f.path.clone()) + { + Some(path) => path, + None => return Filesystem::reply_error(in_header.unique, w), + }; + + let ReadIn { offset, size, .. } = r.read_obj().map_err(|e| { + new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) + })?; + + debug!( + "read: inode={} offset={} size={}", + in_header.nodeid, offset, size + ); + + let data = match self.rt.block_on(self.do_read(&path, offset)) { + Ok(data) => data, + Err(_) => return Filesystem::reply_error(in_header.unique, w), + }; + let len = data.len(); + let buffer = BufferWrapper::new(data); + + let mut data_writer = w.split_at(size_of::()).unwrap(); + data_writer.write_from_at(&buffer, len).map_err(|e| { + new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + })?; + + let out = OutHeader { + len: (size_of::() + len) as u32, + error: 0, + unique: in_header.unique, + }; + w.write_all(out.as_slice()).map_err(|e| { + new_vhost_user_fs_error("failed to encode protocol messages", Some(e.into())) + })?; + Ok(out.len as usize) + } + + fn write(&self, in_header: InHeader, mut r: Reader, w: Writer) -> Result { + debug!("write: inode={}", in_header.nodeid); let path = match self .opened_files @@ -489,10 +574,26 @@ impl Filesystem { None => return Filesystem::reply_error(in_header.unique, w), }; - let mut opened_file_writer = self.opened_files_writer.lock().unwrap(); - opened_file_writer.remove(&path); + let WriteIn { offset, size, .. } = r.read_obj().map_err(|e| { + new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) + })?; - Filesystem::reply_ok(None::, None, in_header.unique, w) + let buffer = BufferWrapper::new(Buffer::new()); + r.read_to_at(&buffer, size as usize).map_err(|e| { + new_vhost_user_fs_error("failed to decode protocol messages", Some(e.into())) + })?; + let buffer = buffer.get_buffer(); + + match self.rt.block_on(self.do_write(&path, offset, buffer)) { + Ok(writer) => writer, + Err(_) => return Filesystem::reply_error(in_header.unique, w), + }; + + let out = WriteOut { + size, + ..Default::default() + }; + Filesystem::reply_ok(Some(out), None, in_header.unique, w) } } @@ -500,7 +601,7 @@ impl Filesystem { async fn do_get_metadata(&self, path: &str) -> Result { let metadata = self.core.stat(path).await.map_err(opendal_error2error)?; let mut attr = opendal_metadata2opened_file(path, &metadata, self.uid, self.gid); - + attr.metadata.size = metadata.content_length(); let mut opened_files_map = self.opened_files_map.lock().unwrap(); if let Some(inode) = opened_files_map.get(path) { attr.metadata.ino = *inode; @@ -516,8 +617,12 @@ impl Filesystem { Ok(attr) } - async fn do_get_writer(&self, path: &str, flags: u32) -> Result { - let (_, is_append) = self.check_write_flags(flags)?; + async fn do_set_writer(&self, path: &str, flags: u32) -> Result<()> { + let (is_write, is_append) = self.check_flags(flags)?; + if !is_write { + return Ok(()); + } + let writer = self .core .writer_with(path) @@ -534,10 +639,61 @@ impl Filesystem { 0 }; - Ok(InnerWriter { writer, written }) + let inner_writer = InnerWriter { writer, written }; + let mut opened_file_writer = self.opened_files_writer.lock().await; + opened_file_writer.insert(path.to_string(), inner_writer); + + Ok(()) + } + + async fn do_release_writer(&self, path: &str) -> Result<()> { + let mut opened_file_writer = self.opened_files_writer.lock().await; + let inner_writer = opened_file_writer + .get_mut(path) + .ok_or(Error::from(libc::EINVAL))?; + inner_writer + .writer + .close() + .await + .map_err(opendal_error2error)?; + opened_file_writer.remove(path); + + Ok(()) } async fn do_delete(&self, path: &str) -> Result<()> { - self.core.delete(path).await.map_err(opendal_error2error) + self.core.delete(path).await.map_err(opendal_error2error)?; + + Ok(()) + } + + async fn do_read(&self, path: &str, offset: u64) -> Result { + let data = self + .core + .read_with(path) + .range(offset..) + .await + .map_err(opendal_error2error)?; + + Ok(data) + } + + async fn do_write(&self, path: &str, offset: u64, data: Buffer) -> Result { + let len = data.len(); + let mut opened_file_writer = self.opened_files_writer.lock().await; + let inner_writer = opened_file_writer + .get_mut(path) + .ok_or(Error::from(libc::EINVAL))?; + if offset != inner_writer.written { + return Err(Error::from(libc::EINVAL)); + } + inner_writer + .writer + .write_from(data) + .await + .map_err(opendal_error2error)?; + inner_writer.written += len as u64; + + Ok(len) } } diff --git a/integrations/virtiofs/src/filesystem_message.rs b/integrations/virtiofs/src/filesystem_message.rs index 1c7d9bb3f402..50562aa7014d 100644 --- a/integrations/virtiofs/src/filesystem_message.rs +++ b/integrations/virtiofs/src/filesystem_message.rs @@ -24,11 +24,15 @@ use crate::error::*; #[non_exhaustive] pub enum Opcode { Lookup = 1, + Forget = 2, Getattr = 3, Setattr = 4, Unlink = 10, Open = 14, + Read = 15, + Write = 16, Release = 18, + Flush = 25, Init = 26, Create = 35, Destroy = 38, @@ -40,11 +44,15 @@ impl TryFrom for Opcode { fn try_from(value: u32) -> Result { match value { 1 => Ok(Opcode::Lookup), + 2 => Ok(Opcode::Forget), 3 => Ok(Opcode::Getattr), 4 => Ok(Opcode::Setattr), 10 => Ok(Opcode::Unlink), 14 => Ok(Opcode::Open), + 15 => Ok(Opcode::Read), + 16 => Ok(Opcode::Write), 18 => Ok(Opcode::Release), + 25 => Ok(Opcode::Flush), 26 => Ok(Opcode::Init), 35 => Ok(Opcode::Create), 38 => Ok(Opcode::Destroy), @@ -217,6 +225,52 @@ pub struct OpenOut { pub padding: u32, } +/// ReadIn is used to parse the parameters passed in the Read filesystem call. +/// +/// The fields of the struct need to conform to the specific format of the virtiofs message. +/// Currently, we only need to align them exactly with virtiofsd. +/// Reference: https://gitlab.com/virtio-fs/virtiofsd/-/blob/main/src/fuse.rs?ref_type=heads#920 +#[repr(C)] +#[derive(Debug, Default, Clone, Copy)] +pub struct ReadIn { + pub fh: u64, + pub offset: u64, + pub size: u32, + pub read_flags: u32, + pub lock_owner: u64, + pub flags: u32, + pub padding: u32, +} + +/// WriteIn is used to parse the parameters passed in the Write filesystem call. +/// +/// The fields of the struct need to conform to the specific format of the virtiofs message. +/// Currently, we only need to align them exactly with virtiofsd. +/// Reference: https://gitlab.com/virtio-fs/virtiofsd/-/blob/main/src/fuse.rs?ref_type=heads#933 +#[repr(C)] +#[derive(Debug, Default, Clone, Copy)] +pub struct WriteIn { + pub fh: u64, + pub offset: u64, + pub size: u32, + pub write_flags: u32, + pub lock_owner: u64, + pub flags: u32, + pub padding: u32, +} + +/// WriteOut is used to return the number of bytes written in the Write filesystem call. +/// +/// The fields of the struct need to conform to the specific format of the virtiofs message. +/// Currently, we only need to align them exactly with virtiofsd. +/// Reference: https://gitlab.com/virtio-fs/virtiofsd/-/blob/main/src/fuse.rs?ref_type=heads#L946 +#[repr(C)] +#[derive(Debug, Default, Clone, Copy)] +pub struct WriteOut { + pub size: u32, + pub padding: u32, +} + /// We will use ByteValued to implement the encoding and decoding /// of these structures in shared memory. unsafe impl ByteValued for Attr {} @@ -229,3 +283,6 @@ unsafe impl ByteValued for AttrOut {} unsafe impl ByteValued for CreateIn {} unsafe impl ByteValued for OpenIn {} unsafe impl ByteValued for OpenOut {} +unsafe impl ByteValued for ReadIn {} +unsafe impl ByteValued for WriteIn {} +unsafe impl ByteValued for WriteOut {} diff --git a/integrations/virtiofs/src/lib.rs b/integrations/virtiofs/src/lib.rs index 4abea18c87b5..b2b00907a094 100644 --- a/integrations/virtiofs/src/lib.rs +++ b/integrations/virtiofs/src/lib.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +mod buffer; mod error; mod filesystem; mod filesystem_message; diff --git a/integrations/virtiofs/src/virtiofs_util.rs b/integrations/virtiofs/src/virtiofs_util.rs index ea531c69078d..cf0ab041a5eb 100644 --- a/integrations/virtiofs/src/virtiofs_util.rs +++ b/integrations/virtiofs/src/virtiofs_util.rs @@ -36,6 +36,7 @@ use vm_memory::GuestMemoryRegion; use vm_memory::VolatileMemory; use vm_memory::VolatileSlice; +use crate::buffer::ReadWriteAtVolatile; use crate::error::*; /// Used to consume and use data areas in shared memory between host and VMs. @@ -94,6 +95,48 @@ impl<'a, B: BitmapSlice> DescriptorChainConsumer<'a, B> { self.bytes_consumed = total_bytes_consumed; Ok(bytes_consumed) } + + fn split_at(&mut self, offset: usize) -> Result> { + let mut remain = offset; + let pos = self.buffers.iter().position(|vs| { + if remain < vs.len() { + true + } else { + remain -= vs.len(); + false + } + }); + if let Some(at) = pos { + let mut other = self.buffers.split_off(at); + if remain > 0 { + let front = other.pop_front().expect("empty VecDeque after split"); + self.buffers.push_back( + front + .subslice(0, remain) + .map_err(|_| new_vhost_user_fs_error("volatile memory error", None))?, + ); + other.push_front( + front + .offset(remain) + .map_err(|_| new_vhost_user_fs_error("volatile memory error", None))?, + ); + } + Ok(DescriptorChainConsumer { + buffers: other, + bytes_consumed: 0, + }) + } else if remain == 0 { + Ok(DescriptorChainConsumer { + buffers: VecDeque::new(), + bytes_consumed: 0, + }) + } else { + Err(new_vhost_user_fs_error( + "DescriptorChain split is out of bounds", + None, + )) + } + } } /// Provides a high-level interface for reading data in shared memory sequences. @@ -152,6 +195,16 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Reader<'a, B> { Ok(unsafe { obj.assume_init() }) } + pub fn read_to_at>( + &mut self, + dst: F, + count: usize, + ) -> io::Result { + self.buffer + .consume(count, |bufs| dst.write_vectored_at_volatile(bufs)) + .map_err(|err| err.into()) + } + #[cfg(test)] pub fn available_bytes(&self) -> usize { self.buffer.available_bytes() @@ -231,6 +284,20 @@ impl<'a, B: Bitmap + BitmapSlice + 'static> Writer<'a, B> { }) } + pub fn split_at(&mut self, offset: usize) -> Result> { + self.buffer.split_at(offset).map(|buffer| Writer { buffer }) + } + + pub fn write_from_at>( + &mut self, + src: F, + count: usize, + ) -> io::Result { + self.buffer + .consume(count, |bufs| src.read_vectored_at_volatile(bufs)) + .map_err(|err| err.into()) + } + #[cfg(test)] pub fn available_bytes(&self) -> usize { self.buffer.available_bytes()