diff --git a/io-uring-test/src/main.rs b/io-uring-test/src/main.rs
index 866a80e..633e4a0 100644
--- a/io-uring-test/src/main.rs
+++ b/io-uring-test/src/main.rs
@@ -103,6 +103,8 @@ fn test(
     #[cfg(not(feature = "ci"))]
     tests::fs::test_statx(&mut ring, &test)?;
     tests::fs::test_file_splice(&mut ring, &test)?;
+    tests::fs::test_ftruncate(&mut ring, &test)?;
+    tests::fs::test_fixed_fd_install(&mut ring, &test)?;
 
     // timeout
     tests::timeout::test_timeout(&mut ring, &test)?;
@@ -117,6 +119,7 @@ fn test(
     tests::net::test_tcp_write_read(&mut ring, &test)?;
     tests::net::test_tcp_writev_readv(&mut ring, &test)?;
     tests::net::test_tcp_send_recv(&mut ring, &test)?;
+    tests::net::test_tcp_send_bundle(&mut ring, &test)?;
     tests::net::test_tcp_zero_copy_send_recv(&mut ring, &test)?;
     tests::net::test_tcp_zero_copy_send_fixed(&mut ring, &test)?;
     tests::net::test_tcp_sendmsg_recvmsg(&mut ring, &test)?;
@@ -130,6 +133,9 @@ fn test(
     tests::net::test_tcp_buffer_select_recvmsg(&mut ring, &test)?;
     tests::net::test_tcp_buffer_select_readv(&mut ring, &test)?;
     tests::net::test_tcp_recv_multi(&mut ring, &test)?;
+    tests::net::test_tcp_recv_bundle(&mut ring, &test)?;
+    tests::net::test_tcp_recv_multi_bundle(&mut ring, &test)?;
+    tests::net::test_tcp_shutdown(&mut ring, &test)?;
     tests::net::test_socket(&mut ring, &test)?;
     tests::net::test_udp_recvmsg_multishot(&mut ring, &test)?;
 
fixed_fd_install"); + + let dir = tempfile::TempDir::new_in(".")?; + let dir = dir.path(); + let file = dir.join("io-uring-test-file-input"); + + let input = &[0x9f; 1024]; + let mut output = vec![0; 1024]; + + fs::write(&file, input)?; + let fd = fs::OpenOptions::new().read(true).open(&file)?; + let fd = types::Fd(fd.as_raw_fd()); + ring.submitter().register_files(&[fd.0])?; + let fd = types::Fixed(0); + + let read_e = opcode::Read::new(fd, output.as_mut_ptr(), output.len() as _); + unsafe { + ring.submission() + .push(&read_e.build().user_data(0x01).into()) + .expect("queue is full"); + } + + assert_eq!(ring.submit_and_wait(1)?, 1); + let cqes: Vec = ring.completion().map(Into::into).collect(); + assert_eq!(cqes.len(), 1); + assert_eq!(cqes[0].user_data(), 0x01); + assert_eq!(cqes[0].result(), 1024); + assert_eq!(output, input); + + let fixed_fd_install_e = opcode::FixedFdInstall::new(fd, 0); + + unsafe { + ring.submission() + .push(&fixed_fd_install_e.build().user_data(0x02).into()) + .expect("queue is full"); + } + + ring.submit_and_wait(1)?; + + let cqes: Vec = ring.completion().map(Into::into).collect(); + + assert_eq!(cqes.len(), 1); + assert_eq!(cqes[0].user_data(), 0x02); + let fd = cqes[0].result(); + assert!(fd > 0); + let mut file = unsafe { fs::File::from_raw_fd(fd) }; + file.read_exact(&mut output)?; + assert_eq!(output, input); + + Ok(()) +} diff --git a/io-uring-test/src/tests/net.rs b/io-uring-test/src/tests/net.rs index 85e0976..7b7936b 100644 --- a/io-uring-test/src/tests/net.rs +++ b/io-uring-test/src/tests/net.rs @@ -1,10 +1,12 @@ +use crate::tests::register_buf_ring; use crate::utils; use crate::Test; use io_uring::squeue::Flags; -use io_uring::types::Fd; +use io_uring::types::{BufRingEntry, Fd}; use io_uring::{cqueue, opcode, squeue, types, IoUring}; use once_cell::sync::OnceCell; use std::convert::TryInto; +use std::io::{Read, Write}; use std::net::{Shutdown, TcpListener, TcpStream}; use std::os::fd::FromRawFd; use std::os::unix::io::AsRawFd; @@ -117,6 +119,74 @@ pub fn test_tcp_send_recv( Ok(()) } +pub fn test_tcp_send_bundle( + ring: &mut IoUring, + test: &Test, +) -> anyhow::Result<()> { + require!( + test; + test.probe.is_supported(opcode::SendBundle::CODE); + ring.params().is_feature_recvsend_bundle(); // requires 6.10 + ); + + println!("test tcp_send_bundle"); + + let (send_stream, mut recv_stream) = tcp_pair()?; + + let send_fd = types::Fd(send_stream.as_raw_fd()); + + let text = b"The quick brown fox jumps over the lazy dog."; + let mut output = vec![0; text.len()]; + + let buf_ring = register_buf_ring::Builder::new(0xdead) + .ring_entries(2) + .buf_cnt(2) + .buf_len(22) + .build()?; + buf_ring.rc.register(ring)?; + let ptr1 = buf_ring.rc.ring_start.as_ptr_mut() as *mut BufRingEntry; + unsafe { + let ptr2 = ptr1.add(1); + std::ptr::copy_nonoverlapping(text.as_ptr(), ptr1.as_mut().unwrap().addr() as *mut u8, 22); + std::ptr::copy_nonoverlapping( + text[22..].as_ptr(), + ptr2.as_mut().unwrap().addr() as *mut u8, + 22, + ); + } + + let send_e = opcode::SendBundle::new(send_fd, 0xdead); + + unsafe { + let mut queue = ring.submission(); + let send_e = send_e + .build() + .user_data(0x01) + .flags(squeue::Flags::IO_LINK) + .into(); + queue.push(&send_e).expect("queue is full"); + } + + ring.submit_and_wait(1)?; + + let cqes: Vec = ring.completion().map(Into::into).collect(); + + assert_eq!(cqes.len(), 1); + assert_eq!(cqes[0].user_data(), 0x01); + assert_eq!(cqes[0].result(), text.len() as i32); + + assert_eq!( + recv_stream + .read(&mut output) + .expect("could 
not read stream"), + text.len() + ); + assert_eq!(&output, text); + buf_ring.rc.unregister(ring)?; + + Ok(()) +} + pub fn test_tcp_zero_copy_send_recv( ring: &mut IoUring, test: &Test, @@ -1156,6 +1226,212 @@ pub fn test_tcp_recv_multi( Ok(()) } +pub fn test_tcp_recv_bundle( + ring: &mut IoUring, + test: &Test, +) -> anyhow::Result<()> { + use std::io::Write; + + require!( + test; + test.probe.is_supported(opcode::RecvBundle::CODE); + ring.params().is_feature_recvsend_bundle(); // requires 6.10 + ); + + println!("test tcp_recv_bundle"); + + let (mut send_stream, recv_stream) = tcp_pair()?; + + let recv_fd = types::Fd(recv_stream.as_raw_fd()); + + // Send one package made of four segments, and receive as up to two buffer bundles + let mut input = vec![0x0d; 256]; + input.extend_from_slice(&[0x0e; 256]); + input.extend_from_slice(&[0x0a; 256]); + input.extend_from_slice(&[0x0d; 128]); + + // Prepare BufRing + let buf_ring = register_buf_ring::Builder::new(0xdeff) + .ring_entries(16) + .buf_cnt(32) + .buf_len(256) + .build()?; + buf_ring.rc.register(ring)?; + + send_stream.write_all(&input)?; + send_stream.shutdown(Shutdown::Write)?; + + let recv_e = opcode::RecvBundle::new(recv_fd, 0xdeff) + .build() + .user_data(0x30) + .into(); + + unsafe { + ring.submission().push(&recv_e).expect("queue is full"); + } + + ring.submit_and_wait(1)?; + + let mut cqe: cqueue::Entry = ring.completion().next().expect("cqueue is empty").into(); + + assert_eq!(cqe.user_data(), 0x30); + assert!(cqueue::buffer_select(cqe.flags()).is_some()); + let mut remaining = cqe.result() as usize; + let bufs = buf_ring + .rc + .get_bufs(&buf_ring, remaining as u32, cqe.flags()); + let mut section; + let mut input = input.as_slice(); + for buf in &bufs { + // In case of bundled recv first bundle may not be full + let to_check = std::cmp::min(256, remaining); + (section, input) = input.split_at(to_check); + assert_eq!(buf.as_slice(), section); + remaining -= to_check; + } + assert_eq!(remaining, 0); + + // Linux kernel 6.10 packs a single buffer into first recv and remaining buffers into second recv + // This behavior may change in the future + if !input.is_empty() { + assert!(cqueue::sock_nonempty(cqe.flags())); + + unsafe { + ring.submission().push(&recv_e).expect("queue is full"); + } + + ring.submit_and_wait(1)?; + + cqe = ring.completion().next().expect("cqueue is empty").into(); + + assert_eq!(cqe.user_data(), 0x30); + assert!(cqueue::buffer_select(cqe.flags()).is_some()); + remaining = cqe.result() as usize; + let second_bufs = buf_ring + .rc + .get_bufs(&buf_ring, remaining as u32, cqe.flags()); + for buf in &second_bufs { + let to_check = std::cmp::min(256, remaining); + (section, input) = input.split_at(to_check); + assert_eq!(buf.as_slice(), section); + remaining -= to_check; + } + assert_eq!(remaining, 0); + } + assert!(input.is_empty()); + + buf_ring.rc.unregister(ring)?; + + Ok(()) +} + +pub fn test_tcp_recv_multi_bundle( + ring: &mut IoUring, + test: &Test, +) -> anyhow::Result<()> { + require!( + test; + test.probe.is_supported(opcode::RecvMultiBundle::CODE); + ring.params().is_feature_recvsend_bundle(); // requires 6.10 + ); + + println!("test tcp_recv_multi_bundle"); + + let (mut send_stream, recv_stream) = tcp_pair()?; + + let recv_fd = types::Fd(recv_stream.as_raw_fd()); + + // Send one package made of four segments, and receive as up to two buffer bundles + let mut input = vec![0x0d; 256]; + input.extend_from_slice(&[0x0e; 256]); + input.extend_from_slice(&[0x0a; 256]); + input.extend_from_slice(&[0x0d; 
diff --git a/io-uring-test/src/tests/register_buf_ring.rs b/io-uring-test/src/tests/register_buf_ring.rs
index a3eb112..9c91ebe 100644
--- a/io-uring-test/src/tests/register_buf_ring.rs
+++ b/io-uring-test/src/tests/register_buf_ring.rs
@@ -89,7 +89,7 @@ impl Drop for AnonymousMmap {
     }
 }
 
-struct InnerBufRing {
+pub(crate) struct InnerBufRing {
     // All these fields are constant once the struct is instantiated except the one of type Cell.
     bgid: Bgid,
 
@@ -100,7 +100,7 @@ struct InnerBufRing {
 
     // `ring_start` holds the memory allocated for the buf_ring, the ring of entries describing
     // the buffers being made available to the uring interface for this buf group id.
-    ring_start: AnonymousMmap,
+    pub(crate) ring_start: AnonymousMmap,
 
     buf_list: Vec<Vec<u8>>,
 
@@ -178,7 +178,7 @@ impl InnerBufRing {
     // Normally this is done automatically when building a BufRing.
     //
     // Warning: requires the CURRENT driver is already in place or will panic.
-    fn register<S, C>(&self, ring: &mut IoUring<S, C>) -> io::Result<()>
+    pub(crate) fn register<S, C>(&self, ring: &mut IoUring<S, C>) -> io::Result<()>
     where
         S: squeue::EntryMarker,
         C: cqueue::EntryMarker,
@@ -239,7 +239,7 @@ impl InnerBufRing {
 
     // Unregister the buffer ring from the io_uring.
     // Normally this is done automatically when the BufRing goes out of scope.
-    fn unregister<S, C>(&self, ring: &mut IoUring<S, C>) -> io::Result<()>
+    pub(crate) fn unregister<S, C>(&self, ring: &mut IoUring<S, C>) -> io::Result<()>
     where
         S: squeue::EntryMarker,
         C: cqueue::EntryMarker,
@@ -270,6 +270,23 @@ impl InnerBufRing {
         Ok(GBuf::new(buf_ring, bid, len))
     }
 
+    // Returns a vector of buffers for completion results that can return a bundle
+    pub(crate) fn get_bufs(&self, buf_ring: &FixedSizeBufRing, res: u32, flags: u32) -> Vec<GBuf> {
+        let mut bid = io_uring::cqueue::buffer_select(flags).unwrap();
+        let mut len = res as usize;
+        let mut output = Vec::with_capacity(len / self.buf_len);
+        while len > 0 {
+            output.push(GBuf::new(
+                buf_ring.clone(),
+                bid,
+                std::cmp::min(len, self.buf_len),
+            ));
+            len = len.saturating_sub(self.buf_len);
+            bid += 1;
+        }
+        output
+    }
+
     // Safety: dropping a duplicate bid is likely to cause undefined behavior
     // as the kernel could use the same buffer for different data concurrently.
     unsafe fn dropping_bid(&self, bid: Bid) {
@@ -329,10 +346,10 @@ impl InnerBufRing {
 }
 
 #[derive(Clone)]
-struct FixedSizeBufRing {
+pub(crate) struct FixedSizeBufRing {
     // The BufRing is reference counted because each buffer handed out has a reference back to its
     // buffer group, or in this case, to its buffer ring.
-    rc: Rc<InnerBufRing>,
+    pub(crate) rc: Rc<InnerBufRing>,
 }
 
 impl FixedSizeBufRing {
@@ -345,7 +362,7 @@ impl FixedSizeBufRing {
 
 // The Builder API for a FixedSizeBufRing.
 #[derive(Copy, Clone)]
-struct Builder {
+pub(crate) struct Builder {
     bgid: Bgid,
     ring_entries: u16,
     buf_cnt: u16,
@@ -360,7 +377,7 @@ impl Builder {
     //
     // The caller is responsible for picking a bgid that does not conflict with other buffer
     // groups that have been registered with the same uring interface.
-    fn new(bgid: Bgid) -> Builder {
+    pub(crate) fn new(bgid: Bgid) -> Builder {
         Builder {
             bgid,
             ring_entries: 128,
@@ -373,25 +390,25 @@ impl Builder {
     //
     // The number will be made a power of 2, and will be the maximum of the ring_entries setting
     // and the buf_cnt setting. The interface will enforce a maximum of 2^15 (32768).
-    fn ring_entries(mut self, ring_entries: u16) -> Builder {
+    pub(crate) fn ring_entries(mut self, ring_entries: u16) -> Builder {
         self.ring_entries = ring_entries;
         self
     }
 
     // The number of buffers to allocate. If left zero, the ring_entries value will be used.
-    fn buf_cnt(mut self, buf_cnt: u16) -> Builder {
+    pub(crate) fn buf_cnt(mut self, buf_cnt: u16) -> Builder {
        self.buf_cnt = buf_cnt;
        self
    }
 
     // The length to be preallocated for each buffer.
-    fn buf_len(mut self, buf_len: usize) -> Builder {
+    pub(crate) fn buf_len(mut self, buf_len: usize) -> Builder {
         self.buf_len = buf_len;
         self
     }
 
     // Return a FixedSizeBufRing.
-    fn build(&self) -> io::Result<FixedSizeBufRing> {
+    pub(crate) fn build(&self) -> io::Result<FixedSizeBufRing> {
         let mut b: Builder = *self;
 
         // Two cases where both buf_cnt and ring_entries are set to the max of the two.
@@ -423,7 +440,7 @@ impl Builder {
 
 // This tracks a buffer that has been filled in by the kernel, having gotten the memory
 // from a buffer ring, and returned to userland via a cqe entry.
-struct GBuf {
+pub(crate) struct GBuf {
     bufgroup: FixedSizeBufRing,
     len: usize,
     bid: Bid,
@@ -472,7 +489,7 @@ impl GBuf {
     }
 
     // Return a byte slice reference.
-    fn as_slice(&self) -> &[u8] {
+    pub(crate) fn as_slice(&self) -> &[u8] {
         let p = self.bufgroup.rc.stable_ptr(self.bid);
         unsafe { std::slice::from_raw_parts(p, self.len) }
     }
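For reference, the bid/length arithmetic inside `get_bufs` can be isolated as a pure function: a bundle completion reports one total byte count but consumes consecutive buffer ids, and only the last buffer may be short. A stand-alone sketch of that accounting (names are illustrative, not crate API):

```rust
// Given the CQE result (total bytes), the first buffer id taken from the CQE
// flags, and the fixed per-buffer length, yield (bid, len) for each buffer.
fn bundle_parts(res: u32, first_bid: u16, buf_len: usize) -> Vec<(u16, usize)> {
    let mut parts = Vec::new();
    let mut remaining = res as usize;
    let mut bid = first_bid;
    while remaining > 0 {
        let take = remaining.min(buf_len); // the final buffer may be partial
        parts.push((bid, take));
        remaining -= take;
        bid += 1; // bundles occupy consecutive buffer ids
    }
    parts
}

#[test]
fn bundle_parts_splits_across_buffers() {
    // 896 bytes over 256-byte buffers starting at bid 4 -> 3 full + 1 partial.
    assert_eq!(
        bundle_parts(896, 4, 256),
        vec![(4, 256), (5, 256), (6, 256), (7, 128)]
    );
}
```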
diff --git a/src/lib.rs b/src/lib.rs
index 19b6fc4..61031bd 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -590,6 +590,14 @@ impl Parameters {
         self.0.features & sys::IORING_FEAT_LINKED_FILE != 0
     }
 
+    /// Whether the kernel supports `IORING_RECVSEND_BUNDLE`.
+    ///
+    /// This feature allows sending and receiving multiple buffers as a single bundle. Available
+    /// since kernel 6.10.
+    pub fn is_feature_recvsend_bundle(&self) -> bool {
+        self.0.features & sys::IORING_FEAT_RECVSEND_BUNDLE != 0
+    }
+
     /// The number of submission queue entries allocated.
     pub fn sq_entries(&self) -> u32 {
         self.0.sq_entries
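Callers are expected to gate bundle submissions on both the opcode probe and this feature bit, as the tests above do with `require!`. In application code that might look like the following sketch (the helper name is an assumption, not crate API):

```rust
use io_uring::IoUring;

fn can_use_bundles(ring: &IoUring) -> bool {
    // IORING_FEAT_RECVSEND_BUNDLE is advertised by kernels >= 6.10; on older
    // kernels the SQE's IORING_RECVSEND_BUNDLE ioprio bit would typically be
    // rejected with EINVAL.
    ring.params().is_feature_recvsend_bundle()
}
```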
diff --git a/src/opcode.rs b/src/opcode.rs
index e7a90d3..72ee846 100644
--- a/src/opcode.rs
+++ b/src/opcode.rs
@@ -523,7 +523,7 @@ opcode! {
         sqe.len = 1;
         sqe.__bindgen_anon_3.msg_flags = flags;
         sqe.__bindgen_anon_4.buf_group = buf_group;
-        sqe.flags |= 1 << sys::IOSQE_BUFFER_SELECT_BIT;
+        sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
         sqe.ioprio = ioprio | (sys::IORING_RECV_MULTISHOT as u16);
         Entry(sqe)
     }
@@ -1090,7 +1090,7 @@ opcode! {
         assign_fd!(sqe.fd = fd);
         sqe.__bindgen_anon_3.msg_flags = flags as _;
         sqe.__bindgen_anon_4.buf_group = buf_group;
-        sqe.flags |= 1 << sys::IOSQE_BUFFER_SELECT_BIT;
+        sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
         sqe.ioprio = sys::IORING_RECV_MULTISHOT as _;
         Entry(sqe)
     }
@@ -1847,3 +1847,159 @@ opcode! {
         Entry(sqe)
     }
 }
+
+// === 6.8 ===
+
+opcode! {
+    /// Install a fixed file descriptor.
+    ///
+    /// Turns a direct descriptor into a regular file descriptor that can later be used by
+    /// regular system calls that take a normal raw file descriptor.
+    #[derive(Debug)]
+    pub struct FixedFdInstall {
+        fd: { types::Fixed },
+        file_flags: { u32 },
+        ;;
+    }
+
+    pub const CODE = sys::IORING_OP_FIXED_FD_INSTALL;
+
+    pub fn build(self) -> Entry {
+        let FixedFdInstall { fd, file_flags } = self;
+
+        let mut sqe = sqe_zeroed();
+        sqe.opcode = Self::CODE;
+        sqe.fd = fd.0 as _;
+        sqe.flags = crate::squeue::Flags::FIXED_FILE.bits();
+        sqe.__bindgen_anon_3.install_fd_flags = file_flags;
+        Entry(sqe)
+    }
+}
+
+// === 6.9 ===
+
+opcode! {
+    /// Perform file truncation, equivalent to `ftruncate(2)`.
+    #[derive(Debug)]
+    pub struct Ftruncate {
+        fd: { impl sealed::UseFixed },
+        len: { u64 },
+        ;;
+    }
+
+    pub const CODE = sys::IORING_OP_FTRUNCATE;
+
+    pub fn build(self) -> Entry {
+        let Ftruncate { fd, len } = self;
+
+        let mut sqe = sqe_zeroed();
+        sqe.opcode = Self::CODE;
+        assign_fd!(sqe.fd = fd);
+        sqe.__bindgen_anon_1.off = len;
+        Entry(sqe)
+    }
+}
+
+// === 6.10 ===
+
+opcode! {
+    /// Send a bundle of buffers on a socket in a single request.
+    pub struct SendBundle {
+        fd: { impl sealed::UseFixed },
+        buf_group: { u16 },
+        ;;
+        flags: i32 = 0,
+        len: u32 = 0
+    }
+
+    pub const CODE = sys::IORING_OP_SEND;
+
+    pub fn build(self) -> Entry {
+        let SendBundle { fd, len, flags, buf_group } = self;
+
+        let mut sqe = sqe_zeroed();
+        sqe.opcode = Self::CODE;
+        assign_fd!(sqe.fd = fd);
+        sqe.len = len;
+        sqe.__bindgen_anon_3.msg_flags = flags as _;
+        sqe.ioprio |= sys::IORING_RECVSEND_BUNDLE as u16;
+        sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
+        sqe.__bindgen_anon_4.buf_group = buf_group;
+        Entry(sqe)
+    }
+}
+
+opcode! {
+    /// Receive a bundle of buffers from a socket.
+    ///
+    /// Parameter:
+    ///     buf_group: The id of the provided buffer pool to use for the bundle.
+    ///
+    /// Note that as of kernel 6.10 the first recv always gets a single buffer, while the
+    /// second obtains the bundle of remaining buffers. This behavior may change in the
+    /// future.
+    ///
+    /// The bundle variant is available since kernel 6.10.
+    pub struct RecvBundle {
+        fd: { impl sealed::UseFixed },
+        buf_group: { u16 },
+        ;;
+        flags: i32 = 0
+    }
+
+    pub const CODE = sys::IORING_OP_RECV;
+
+    pub fn build(self) -> Entry {
+        let RecvBundle { fd, buf_group, flags } = self;
+
+        let mut sqe = sqe_zeroed();
+        sqe.opcode = Self::CODE;
+        assign_fd!(sqe.fd = fd);
+        sqe.__bindgen_anon_3.msg_flags = flags as _;
+        sqe.__bindgen_anon_4.buf_group = buf_group;
+        sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
+        sqe.ioprio |= sys::IORING_RECVSEND_BUNDLE as u16;
+        Entry(sqe)
+    }
+}
+
+opcode! {
+    /// Receive multiple messages from a socket as a bundle.
+    ///
+    /// Parameter:
+    ///     buf_group: The id of the provided buffer pool to use for each received message.
+    ///
+    /// MSG_WAITALL should not be set in flags.
+    ///
+    /// The multishot version allows the application to issue a single receive request, which
+    /// repeatedly posts a CQE when data is available. Each CQE will take a bundle of buffers
+    /// out of a provided buffer pool for receiving. The application should check the flags of
+    /// each CQE, regardless of its result. If a posted CQE does not have the IORING_CQE_F_MORE
+    /// flag set, then the multishot receive is done and the application should issue a new
+    /// request.
+    ///
+    /// Note that as of kernel 6.10 the first CQE always gets a single buffer, while the
+    /// second obtains the bundle of remaining buffers. This behavior may change in the
+    /// future.
+    ///
+    /// The multishot bundle variant is available since kernel 6.10.
+    pub struct RecvMultiBundle {
+        fd: { impl sealed::UseFixed },
+        buf_group: { u16 },
+        ;;
+        flags: i32 = 0
+    }
+
+    pub const CODE = sys::IORING_OP_RECV;
+
+    pub fn build(self) -> Entry {
+        let RecvMultiBundle { fd, buf_group, flags } = self;
+
+        let mut sqe = sqe_zeroed();
+        sqe.opcode = Self::CODE;
+        assign_fd!(sqe.fd = fd);
+        sqe.__bindgen_anon_3.msg_flags = flags as _;
+        sqe.__bindgen_anon_4.buf_group = buf_group;
+        sqe.flags |= crate::squeue::Flags::BUFFER_SELECT.bits();
+        sqe.ioprio = sys::IORING_RECV_MULTISHOT as _;
+        sqe.ioprio |= sys::IORING_RECVSEND_BUNDLE as u16;
+        Entry(sqe)
+    }
+}
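Taken together, the new opcodes compose like any other entry in the crate. A quick sketch building each one (the buffer group id `0x42`, direct-descriptor slot `0`, and raw fd are illustrative; a real caller would register the files and buffer ring first):

```rust
use io_uring::{opcode, types};

fn build_new_opcodes() -> Vec<io_uring::squeue::Entry> {
    let fixed = types::Fixed(0);
    let fd = types::Fd(1);
    vec![
        // 6.8: turn direct-descriptor slot 0 into a regular fd (file_flags = 0).
        opcode::FixedFdInstall::new(fixed, 0).build().user_data(1),
        // 6.9: truncate the file behind `fd` to 4096 bytes.
        opcode::Ftruncate::new(fd, 4096).build().user_data(2),
        // 6.10: send buffers from group 0x42 as one bundle.
        opcode::SendBundle::new(fd, 0x42).build().user_data(3),
        // 6.10: single-shot and multishot bundle receives into group 0x42.
        opcode::RecvBundle::new(fd, 0x42).build().user_data(4),
        opcode::RecvMultiBundle::new(fd, 0x42).build().user_data(5),
    ]
}
```

Note that `SendBundle`, `RecvBundle`, and `RecvMultiBundle` reuse `IORING_OP_SEND`/`IORING_OP_RECV` and differ only in the `IORING_RECVSEND_BUNDLE` and `IORING_RECV_MULTISHOT` ioprio bits, which is why the tests probe the base opcodes and additionally require the `IORING_FEAT_RECVSEND_BUNDLE` feature bit.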