Help with timed events on a event loop structure #234

Open
fabracht opened this issue Apr 19, 2023 · 0 comments


Hello,
I'm migrating my event loop from the mio crate to io-uring and I'm running into some problems. I haven't been able to find the culprit, but I suspect I'm doing something wrong when registering the timer and also when trying to reset it.
The socket trigger works (you can run nc -u 127.0.0.1 44571 in a second terminal and watch the messages come in). The loop duration triggers properly and the loop breaks as expected. The problem is with the timers: I'm having a hard time spotting what I'm doing wrong, so I thought I'd ask the experts for help.
I've pasted the code below.

I'm running this on a VM on my M2 Mac, so I build with this linker setting: linker = "aarch64-linux-gnu-gcc"
I don't have a non-ARM computer to test on, so I can't confirm whether that is related. I don't think it is the culprit, but I thought it wouldn't hurt to mention it.

event_loop.rs

use io_uring::{opcode, types};
use std::{
    collections::HashMap,
    io,
    os::fd::{AsRawFd, RawFd},
    time::Duration,
};

pub type CommonError = Box<dyn std::error::Error>;

#[derive(Debug)]
pub struct Itimerspec {
    pub it_interval: Duration,
    pub it_value: Duration,
}

pub type TimedSources<T> = (
    RawFd,
    usize,
    Box<dyn FnMut(&mut T) -> Result<i32, CommonError>>,
);

pub struct PolledSource<T: AsRawFd> {
    fd: T,
    pub callback: Box<dyn FnMut(&mut T) -> Result<i32, CommonError> + 'static>,
}

pub struct UringEventLoop<T: AsRawFd> {
    ring: io_uring::IoUring,
    pub sources: HashMap<usize, PolledSource<T>>,
    timed_sources: HashMap<usize, TimedSources<T>>,
    next_token: usize,
}

impl<T: AsRawFd> UringEventLoop<T> {
    pub fn new(event_capacity: usize) -> Self {
        Self {
            ring: io_uring::IoUring::new(event_capacity.try_into().unwrap()).unwrap(),
            sources: HashMap::new(),
            timed_sources: HashMap::new(),
            next_token: 0,
        }
    }

    pub fn generate_token(&mut self) -> usize {
        let token = self.next_token;
        self.next_token += 1;
        token
    }

    pub fn register_event_source<F>(
        &mut self,
        event_source: T,
        callback: F,
    ) -> Result<usize, CommonError>
    where
        F: FnMut(&mut T) -> Result<i32, CommonError> + 'static,
    {
        let token = self.generate_token();
        let fd = event_source.as_raw_fd();
        let polled_source = PolledSource {
            fd: event_source,
            callback: Box::new(callback),
        };
        let _ = self.sources.insert(token, polled_source);
        eprintln!("Registering event source with token {}", token);
        let poll_e = opcode::PollAdd::new(types::Fd(fd), libc::POLLIN as _)
            .multi(true) // Add this line to enable the multi feature
            .build()
            .user_data(token as _);

        let (submitter, mut submit_queue, _) = self.ring.split();

        loop {
            if submit_queue.is_full() {
                submitter.submit()?;
            }
            submit_queue.sync(); // sync with the real current queue
            match unsafe { submit_queue.push(&poll_e) } {
                Ok(_) => break,
                Err(_) => continue,
            };
        }
        eprintln!("Registered event source with token {}", token);
        Ok(token)
    }

    pub fn run(&mut self) -> Result<(), CommonError> {
        let (submitter, mut _submit_queue, mut completion_queue) = self.ring.split();
        for timed_entry in &self.timed_sources {
            eprintln!("-------  timed_entry = {:?} -----------", timed_entry.0);
        }
        'outer: loop {
            eprintln!("Running loop");
            // Submit queued events and wait
            match submitter.submit_and_wait(1) {
                Ok(_) => (),
                Err(ref err) if err.raw_os_error() == Some(libc::EBUSY) => eprintln!("EBUSY"),
                Err(err) => return Err(err.into()),
            }
            eprintln!("Finished waiting");
            // Sync with the real current queue
            completion_queue.sync();
            eprintln!("completion_queue.len() = {}", completion_queue.len());
            // Process events from the completion queue
            for completion_event in &mut completion_queue {
                let result = completion_event.result();
                let token_index = completion_event.user_data() as usize;

                // Check for errors in the event result. -ETIME is the normal
                // completion of an expired timeout, so it is not treated as an error.
                if result < 0 && result != -libc::ETIME {
                    let error = io::Error::from_raw_os_error(-result);
                    eprintln!("token {} error {:?}", token_index, error);
                    return Err(error.into());
                }

                eprintln!("token {} ready", token_index);

                if let Some(polled_source) = self.sources.get_mut(&token_index) {
                    let fd = &mut polled_source.fd;
                    eprintln!("Running polled source");
                    polled_source.callback.as_mut()(fd)?;
                } else if let Some(timed_source) = self.timed_sources.get_mut(&token_index) {
                    eprintln!("Running timed source");
                    if let Some(polled_source) = self.sources.get_mut(&timed_source.1) {
                        let fd = &mut polled_source.fd;
                        timed_source.2.as_mut()(fd)?;
                    }
                    eprintln!("Resetting timer"); // Doing manually for the sake of simplicity in the example
                    let updated_time_spec = Itimerspec {
                        it_interval: Duration::from_secs(0),
                        it_value: Duration::from_millis(250),
                    };
                    Self::reset_timer(&mut _submit_queue, timed_source, &updated_time_spec)?;
                } else {
                    eprintln!("Max duration on token {} reached. Exiting", token_index);
                    break 'outer;
                }
            }
        }

        Ok(())
    }

    pub fn add_duration(&mut self, time_spec: Itimerspec) -> Result<usize, CommonError> {
        let token = self.generate_token();

        let timespec = types::Timespec::new()
            .nsec(time_spec.it_value.subsec_nanos() as u32)
            .sec(time_spec.it_value.as_secs() as u64);
        eprintln!("timespec = {:?}", timespec);
        let timeout = opcode::Timeout::new(&timespec as _)
            .build()
            .user_data(token as u64);
        let (submitter, mut submit_queue, _) = self.ring.split();

        loop {
            if submit_queue.is_full() {
                submitter.submit()?;
            }
            submit_queue.sync(); // sync with the real current queue
            match unsafe { submit_queue.push(&timeout) } {
                Ok(_) => break,
                Err(e) => {
                    eprintln!("Error pushing timeout: {:?}", e);
                    continue;
                }
            };
        }
        eprintln!("Registered duration {:?} with token {}", timeout, token);
        Ok(token)
    }

    pub fn add_timer<F>(
        &mut self,
        time_spec: Itimerspec,
        token: &usize,
        callback: F,
    ) -> Result<usize, CommonError>
    where
        F: FnMut(&mut T) -> Result<i32, CommonError> + 'static,
    {
        let new_token = self.add_duration(time_spec)?;
        if let Some(polled_source) = self.sources.get_mut(token) {
            self.timed_sources.insert(
                new_token,
                (polled_source.fd.as_raw_fd(), *token, Box::new(callback)),
            );
            eprintln!(
                "Registered timer with token {} for source with token {}",
                new_token, token
            );
        }

        Ok(new_token)
    }

    fn reset_timer(
        submit_queue: &mut io_uring::SubmissionQueue,
        timed_source: &mut TimedSources<T>,
        updated_time_spec: &Itimerspec,
    ) -> Result<(), CommonError> {
        let (_, source_token, _) = timed_source;

        // Create a new timespec with the updated Itimerspec
        let new_timespec = types::Timespec::new()
            .nsec(updated_time_spec.it_value.subsec_nanos() as u32)
            .sec(updated_time_spec.it_value.as_secs() as u64);

        let new_timeout = opcode::TimeoutUpdate::new(*source_token as u64, &new_timespec as _)
            .build()
            .user_data(*source_token as u64);

        // Replace the old timer with the new one
        loop {
            submit_queue.sync(); // Sync with the real current queue
            match unsafe { submit_queue.push(&new_timeout) } {
                Ok(_) => break,
                Err(e) => {
                    eprintln!("Error pushing new timeout: {:?}", e);
                    continue;
                }
            };
        }
        eprintln!("Reset timer with token {} ", source_token);

        Ok(())
    }
}
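
One thing that may be worth checking in add_duration and reset_timer, offered as an assumption rather than a confirmed diagnosis: the Timespec handed to opcode::Timeout::new is a local variable, and the entry is only pushed (not submitted) before the function returns. If the kernel reads the timespec when the entry is actually submitted, the pointer would be dangling by then. Separately, reset_timer passes the socket's token rather than the timer's own token to TimeoutUpdate, and a timeout that has already fired may no longer exist to be updated. The sketch below models only the storage side using the standard library: KernelTimespec, TimerTable, arm and is_expiry are hypothetical names, and KernelTimespec is a stand-in for io_uring::types::Timespec.

```rust
use std::collections::HashMap;
use std::time::Duration;

/// Hypothetical stand-in for `io_uring::types::Timespec`, so the sketch
/// compiles with the standard library alone.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct KernelTimespec {
    pub tv_sec: u64,
    pub tv_nsec: u32,
}

impl From<Duration> for KernelTimespec {
    fn from(d: Duration) -> Self {
        Self { tv_sec: d.as_secs(), tv_nsec: d.subsec_nanos() }
    }
}

/// The value the code above compares against (`-62`); in real code
/// `libc::ETIME` is the safer spelling.
pub const ETIME: i32 = 62;

/// Owns one heap-allocated timespec per timer token, so a pointer to it
/// outlives the function that queued the timeout.
#[derive(Default)]
pub struct TimerTable {
    slots: HashMap<usize, Box<KernelTimespec>>,
}

impl TimerTable {
    /// Stores (or overwrites) the timespec for `token` and returns a pointer
    /// that stays valid until the table is dropped. The `Box` keeps the value
    /// at a fixed heap address even when the map itself reallocates.
    pub fn arm(&mut self, token: usize, after: Duration) -> *const KernelTimespec {
        let slot = self
            .slots
            .entry(token)
            .or_insert_with(|| Box::new(after.into()));
        **slot = after.into();
        &**slot as *const KernelTimespec
    }

    /// True when `result` is the expiry (`-ETIME`) of a timer known under `token`.
    pub fn is_expiry(&self, token: usize, result: i32) -> bool {
        result == -ETIME && self.slots.contains_key(&token)
    }
}
```

With a table like this stored in the event loop, the pointer returned by arm could be the one passed to opcode::Timeout::new. Re-arming after an expiry would then mean calling arm again and pushing a fresh Timeout entry under the timer's own token, rather than updating a timeout that has already completed.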

main.rs

use std::{
    net::{IpAddr, Ipv4Addr, SocketAddr},
    os::fd::{AsRawFd, RawFd},
    time::Duration,
};

use event_loop::Itimerspec;

use crate::event_loop::UringEventLoop;

mod event_loop;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let udp_socket = std::net::UdpSocket::bind("127.0.0.1:44571")?;
    udp_socket.set_nonblocking(true)?;
    let udp_fd = udp_socket.as_raw_fd();
    let mut event_loop: UringEventLoop<RawFd> = UringEventLoop::new(1024);

    let socket_token = event_loop.register_event_source(udp_fd, move |fd| {
        let mut buffer = [0u8; 1024];

        let mut sockaddr = libc::sockaddr_in {
            sin_family: libc::AF_INET as libc::sa_family_t,
            sin_port: 0,
            sin_addr: libc::in_addr { s_addr: 0 },
            sin_zero: [0; 8],
        };

        // Receive the message using `recvfrom` from the libc crate.
        // `addr_len` is in/out: the kernel writes back the actual address size.
        let mut addr_len = std::mem::size_of::<libc::sockaddr_in>() as libc::socklen_t;
        let n: isize = unsafe {
            libc::recvfrom(
                fd.as_raw_fd(),
                buffer.as_mut_ptr() as *mut _,
                buffer.len(),
                0,
                &mut sockaddr as *mut libc::sockaddr_in as *mut libc::sockaddr,
                &mut addr_len,
            )
        };

        // Decode the sender address. Both fields are in network byte order,
        // so the in-memory bytes of `s_addr` are already a.b.c.d.
        let ip_bytes = sockaddr.sin_addr.s_addr.to_ne_bytes();
        let socket_addr = SocketAddr::new(
            IpAddr::V4(Ipv4Addr::new(
                ip_bytes[0],
                ip_bytes[1],
                ip_bytes[2],
                ip_bytes[3],
            )),
            u16::from_be(sockaddr.sin_port),
        );
        println!("Received {} bytes from {}", n, socket_addr);
        Ok(0)
    })?;

    let it_interval1 = Duration::from_millis(0);
    let it_value1 = Duration::from_millis(250);
    let time_spec = Itimerspec {
        it_interval: it_interval1,
        it_value: it_value1,
    };
    event_loop.add_timer(time_spec, &socket_token, move |_socket| {
        println!("Timer fired");
        // Do send_to operation with socket
        Ok(0)
    })?;

    let it_interval = Duration::from_millis(0);
    let it_value = Duration::from_millis(1000);
    event_loop.add_duration(Itimerspec {
        it_interval,
        it_value,
    })?;

    event_loop.run()?;

    Ok(())
}
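
The address decoding in the receive callback depends on byte order, so here is a small standalone sketch of that step that can be checked without a socket. It assumes the usual sockaddr_in convention that both s_addr and sin_port are stored in network byte order; decode_peer is a hypothetical helper name, not part of libc or io-uring.

```rust
use std::net::{IpAddr, Ipv4Addr, SocketAddr};

/// Decodes the `sin_addr.s_addr` and `sin_port` fields of a `sockaddr_in`.
/// Both are assumed to be in network byte order (big-endian).
pub fn decode_peer(s_addr: u32, sin_port: u16) -> SocketAddr {
    // The in-memory bytes of `s_addr` are already a.b.c.d, so read them
    // in native order instead of forcing a particular endianness.
    let [a, b, c, d] = s_addr.to_ne_bytes();
    // The port needs a big-endian to host conversion.
    SocketAddr::new(IpAddr::V4(Ipv4Addr::new(a, b, c, d)), u16::from_be(sin_port))
}
```

On a little-endian target such as aarch64 Linux this gives the same result as to_le_bytes and to_be, but the version above does not depend on the host's endianness.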

Cargo.toml

[package]
name = "eq-uring"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
io-uring = "0.6.0"
libc = "0.2.141"