diff --git a/src/driver/accept.rs b/src/driver/accept.rs index 0b5f73b9..0dc2d505 100644 --- a/src/driver/accept.rs +++ b/src/driver/accept.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::driver::{Op, SharedFd, Socket}; use std::net::SocketAddr; use std::{boxed::Box, io}; @@ -37,8 +37,8 @@ impl Op { impl Completable for Accept { type Output = io::Result<(Socket, Option)>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - let fd = result?; + fn complete(self, cqe: op::CqeResult) -> Self::Output { + let fd = cqe.result?; let fd = SharedFd::new(fd as i32); let socket = Socket { fd }; let (_, addr) = unsafe { diff --git a/src/driver/close.rs b/src/driver/close.rs index 6219e6e0..006cdc39 100644 --- a/src/driver/close.rs +++ b/src/driver/close.rs @@ -1,6 +1,6 @@ use crate::driver::Op; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::io; use std::os::unix::io::RawFd; @@ -21,8 +21,8 @@ impl Op { impl Completable for Close { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - let _ = result?; + fn complete(self, cqe: op::CqeResult) -> Self::Output { + let _ = cqe.result?; Ok(()) } diff --git a/src/driver/connect.rs b/src/driver/connect.rs index c3168fc1..07270c09 100644 --- a/src/driver/connect.rs +++ b/src/driver/connect.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::driver::{Op, SharedFd}; use socket2::SockAddr; use std::io; @@ -36,7 +36,7 @@ impl Op { impl Completable for Connect { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/fsync.rs b/src/driver/fsync.rs index 1cd274f5..4e0faf80 100644 --- a/src/driver/fsync.rs +++ b/src/driver/fsync.rs @@ -2,7 +2,7 @@ use crate::driver::{Op, SharedFd}; use std::io; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use io_uring::{opcode, types}; pub(crate) struct Fsync { @@ -28,7 +28,7 @@ impl Op { impl Completable for Fsync { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/mod.rs b/src/driver/mod.rs index f09c0d4f..78d121e7 100644 --- a/src/driver/mod.rs +++ b/src/driver/mod.rs @@ -39,7 +39,7 @@ mod write; mod writev; -use io_uring::{cqueue, IoUring}; +use io_uring::IoUring; use scoped_tls::scoped_thread_local; use slab::Slab; use std::cell::RefCell; @@ -61,9 +61,11 @@ pub(crate) struct Inner { pub(crate) uring: IoUring, } -// When dropping the driver, all in-flight operations must have completed. This -// type wraps the slab and ensures that, on drop, the slab is empty. -struct Ops(Slab); +struct Ops { + // When dropping the driver, all in-flight operations must have completed. This + // type wraps the slab and ensures that, on drop, the slab is empty. + lifecycle: Slab, +} scoped_thread_local!(pub(crate) static CURRENT: Rc>); @@ -98,7 +100,7 @@ impl Driver { fn num_operations(&self) -> usize { let inner = self.inner.borrow(); - inner.ops.0.len() + inner.ops.lifecycle.len() } } @@ -117,7 +119,7 @@ impl Inner { let index = cqe.user_data() as _; - self.ops.complete(index, resultify(&cqe), cqe.flags()); + self.ops.complete(index, cqe.into()); } } @@ -158,42 +160,34 @@ impl Drop for Driver { impl Ops { fn new() -> Ops { - Ops(Slab::with_capacity(64)) + Ops { + lifecycle: Slab::with_capacity(64), + } } fn get_mut(&mut self, index: usize) -> Option<&mut op::Lifecycle> { - self.0.get_mut(index) + self.lifecycle.get_mut(index) } // Insert a new operation fn insert(&mut self) -> usize { - self.0.insert(op::Lifecycle::Submitted) + self.lifecycle.insert(op::Lifecycle::Submitted) } // Remove an operation fn remove(&mut self, index: usize) { - self.0.remove(index); + self.lifecycle.remove(index); } - fn complete(&mut self, index: usize, result: io::Result, flags: u32) { - if self.0[index].complete(result, flags) { - self.0.remove(index); + fn complete(&mut self, index: usize, cqe: op::CqeResult) { + if self.lifecycle[index].complete(cqe) { + self.lifecycle.remove(index); } } } impl Drop for Ops { fn drop(&mut self) { - assert!(self.0.is_empty()); - } -} - -fn resultify(cqe: &cqueue::Entry) -> io::Result { - let res = cqe.result(); - - if res >= 0 { - Ok(res as u32) - } else { - Err(io::Error::from_raw_os_error(-res)) + assert!(self.lifecycle.is_empty()); } } diff --git a/src/driver/noop.rs b/src/driver/noop.rs index 35f45c8e..0d1cdb98 100644 --- a/src/driver/noop.rs +++ b/src/driver/noop.rs @@ -1,4 +1,7 @@ -use crate::driver::{op::Completable, Op}; +use crate::driver::{ + op::{self, Completable}, + Op, +}; use std::io; /// No operation. Just posts a completion event, nothing else. @@ -17,8 +20,8 @@ impl Op { impl Completable for NoOp { type Output = io::Result<()>; - fn complete(self, _result: io::Result, _flags: u32) -> Self::Output { - Ok(()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/op.rs b/src/driver/op.rs index 30cbb4cf..eac3d5ca 100644 --- a/src/driver/op.rs +++ b/src/driver/op.rs @@ -5,7 +5,7 @@ use std::pin::Pin; use std::rc::Rc; use std::task::{Context, Poll, Waker}; -use io_uring::squeue; +use io_uring::{cqueue, squeue}; use crate::driver; @@ -23,7 +23,7 @@ pub(crate) struct Op { pub(crate) trait Completable { type Output; - fn complete(self, result: io::Result, flags: u32) -> Self::Output; + fn complete(self, cqe: CqeResult) -> Self::Output; } pub(crate) enum Lifecycle { @@ -37,8 +37,28 @@ pub(crate) enum Lifecycle { /// must be passed to the driver and held until the operation completes. Ignored(Box), - /// The operation has completed. - Completed(io::Result, u32), + /// The operation has completed with a single cqe result + Completed(CqeResult), +} + +/// A single CQE entry +pub(crate) struct CqeResult { + pub(crate) result: io::Result, + #[allow(dead_code)] + pub(crate) flags: u32, +} + +impl From for CqeResult { + fn from(cqe: cqueue::Entry) -> Self { + let res = cqe.result(); + let flags = cqe.flags(); + let result = if res >= 0 { + Ok(res as u32) + } else { + Err(io::Error::from_raw_os_error(-res)) + }; + CqeResult { result, flags } + } } impl Op @@ -46,7 +66,7 @@ where T: Completable, { /// Create a new operation - fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc>) -> Op { + fn new(data: T, inner: &mut driver::Inner, inner_rc: &Rc>) -> Self { Op { driver: inner_rc.clone(), index: inner.ops.insert(), @@ -58,7 +78,7 @@ where /// /// `state` is stored during the operation tracking any state submitted to /// the kernel. - pub(super) fn submit_with(data: T, f: F) -> io::Result> + pub(super) fn submit_with(data: T, f: F) -> io::Result where F: FnOnce(&mut T) -> squeue::Entry, { @@ -83,7 +103,7 @@ where } /// Try submitting an operation to uring - pub(super) fn try_submit_with(data: T, f: F) -> io::Result> + pub(super) fn try_submit_with(data: T, f: F) -> io::Result where F: FnOnce(&mut T) -> squeue::Entry, { @@ -122,11 +142,10 @@ where Poll::Pending } Lifecycle::Ignored(..) => unreachable!(), - Lifecycle::Completed(result, flags) => { + Lifecycle::Completed(cqe) => { inner.ops.remove(me.index); me.index = usize::MAX; - - Poll::Ready(me.data.take().unwrap().complete(result, flags)) + Poll::Ready(me.data.take().unwrap().complete(cqe)) } } } @@ -153,16 +172,16 @@ impl Drop for Op { } impl Lifecycle { - pub(super) fn complete(&mut self, result: io::Result, flags: u32) -> bool { + pub(super) fn complete(&mut self, cqe: CqeResult) -> bool { use std::mem; match mem::replace(self, Lifecycle::Submitted) { Lifecycle::Submitted => { - *self = Lifecycle::Completed(result, flags); + *self = Lifecycle::Completed(cqe); false } Lifecycle::Waiting(waker) => { - *self = Lifecycle::Completed(result, flags); + *self = Lifecycle::Completed(cqe); waker.wake(); false } @@ -190,10 +209,10 @@ mod test { impl Completable for Rc<()> { type Output = Completion; - fn complete(self, result: io::Result, flags: u32) -> Self::Output { + fn complete(self, cqe: CqeResult) -> Self::Output { Completion { - result, - flags, + result: cqe.result, + flags: cqe.flags, data: self.clone(), } } @@ -304,7 +323,11 @@ mod test { assert_eq!(2, Rc::strong_count(&data)); assert_eq!(1, driver.num_operations()); - driver.inner.borrow_mut().ops.complete(index, Ok(1), 0); + let cqe = CqeResult { + result: Ok(1), + flags: 0, + }; + driver.inner.borrow_mut().ops.complete(index, cqe); assert_eq!(1, Rc::strong_count(&data)); assert_eq!(0, driver.num_operations()); release(driver); @@ -326,11 +349,12 @@ mod test { } fn complete(op: &Op>, result: io::Result) { - op.driver.borrow_mut().ops.complete(op.index, result, 0); + let cqe = CqeResult { result, flags: 0 }; + op.driver.borrow_mut().ops.complete(op.index, cqe); } fn release(driver: crate::driver::Driver) { // Clear ops, we aren't really doing any I/O - driver.inner.borrow_mut().ops.0.clear(); + driver.inner.borrow_mut().ops.lifecycle.clear(); } } diff --git a/src/driver/open.rs b/src/driver/open.rs index 83753c75..0d4cc8db 100644 --- a/src/driver/open.rs +++ b/src/driver/open.rs @@ -1,7 +1,7 @@ use crate::driver::{self, Op, SharedFd}; use crate::fs::{File, OpenOptions}; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -40,7 +40,7 @@ impl Op { impl Completable for Open { type Output = io::Result; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - Ok(File::from_shared_fd(SharedFd::new(result? as _))) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + Ok(File::from_shared_fd(SharedFd::new(cqe.result? as _))) } } diff --git a/src/driver/read.rs b/src/driver/read.rs index ecdc3e4b..0d2e7887 100644 --- a/src/driver/read.rs +++ b/src/driver/read.rs @@ -2,7 +2,7 @@ use crate::buf::IoBufMut; use crate::driver::{Op, SharedFd}; use crate::BufResult; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::io; pub(crate) struct Read { @@ -42,9 +42,9 @@ where { type Output = BufResult; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut buf = self.buf; diff --git a/src/driver/readv.rs b/src/driver/readv.rs index 118b5936..3921724c 100644 --- a/src/driver/readv.rs +++ b/src/driver/readv.rs @@ -2,7 +2,7 @@ use crate::buf::IoBufMut; use crate::driver::{Op, SharedFd}; use crate::BufResult; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use libc::iovec; use std::io; @@ -61,9 +61,9 @@ where { type Output = BufResult>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut bufs = self.bufs; diff --git a/src/driver/recv_from.rs b/src/driver/recv_from.rs index 2fd81b48..5f2aee02 100644 --- a/src/driver/recv_from.rs +++ b/src/driver/recv_from.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::{ buf::IoBufMut, driver::{Op, SharedFd}, @@ -60,9 +60,9 @@ where { type Output = BufResult<(usize, SocketAddr), T>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let mut buf = self.buf; diff --git a/src/driver/rename_at.rs b/src/driver/rename_at.rs index 7af8acb5..c7ddfc8e 100644 --- a/src/driver/rename_at.rs +++ b/src/driver/rename_at.rs @@ -1,6 +1,6 @@ use crate::driver::{self, Op}; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -44,7 +44,7 @@ impl Op { impl Completable for RenameAt { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/send_to.rs b/src/driver/send_to.rs index 82dfe78c..3659942f 100644 --- a/src/driver/send_to.rs +++ b/src/driver/send_to.rs @@ -1,5 +1,5 @@ use crate::buf::IoBuf; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::driver::{Op, SharedFd}; use crate::BufResult; use socket2::SockAddr; @@ -62,9 +62,9 @@ where { type Output = BufResult; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let buf = self.buf; diff --git a/src/driver/unlink_at.rs b/src/driver/unlink_at.rs index 59b6f297..fbebfb67 100644 --- a/src/driver/unlink_at.rs +++ b/src/driver/unlink_at.rs @@ -1,6 +1,6 @@ use crate::driver::{self, Op}; -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use std::ffi::CString; use std::io; use std::path::Path; @@ -42,7 +42,7 @@ impl Op { impl Completable for Unlink { type Output = io::Result<()>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { - result.map(|_| ()) + fn complete(self, cqe: op::CqeResult) -> Self::Output { + cqe.result.map(|_| ()) } } diff --git a/src/driver/write.rs b/src/driver/write.rs index 38b08a05..6a5b708a 100644 --- a/src/driver/write.rs +++ b/src/driver/write.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::{ buf::IoBuf, driver::{Op, SharedFd}, @@ -43,9 +43,9 @@ where { type Output = BufResult; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let buf = self.buf; diff --git a/src/driver/writev.rs b/src/driver/writev.rs index c9018093..24954a02 100644 --- a/src/driver/writev.rs +++ b/src/driver/writev.rs @@ -1,4 +1,4 @@ -use crate::driver::op::Completable; +use crate::driver::op::{self, Completable}; use crate::{ buf::IoBuf, driver::{Op, SharedFd}, @@ -61,9 +61,9 @@ where { type Output = BufResult>; - fn complete(self, result: io::Result, _flags: u32) -> Self::Output { + fn complete(self, cqe: op::CqeResult) -> Self::Output { // Convert the operation result to `usize` - let res = result.map(|v| v as usize); + let res = cqe.result.map(|v| v as usize); // Recover the buffer let buf = self.bufs;