diff --git a/Cargo.toml b/Cargo.toml index dd46cb1..4242fbe 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,6 +3,11 @@ name = "aika" version = "0.1.0" edition = "2021" +[[bin]] +name = "realtime" +path = "examples/realtime.rs" + + [dependencies] tokio = {version = "1.43.0", features = ["full", "time", "sync", "rt", "rt-multi-thread"]} tokio-macros = "2.5.0" diff --git a/examples/realtime.rs b/examples/realtime.rs new file mode 100644 index 0000000..2648b6f --- /dev/null +++ b/examples/realtime.rs @@ -0,0 +1,14 @@ +use aika::worlds::*; +use aika::universes::*; +use aika::logger::*; +use aika::TestAgent; + +#[tokio::main] +async fn main() { + let mut world = World::create(1.0, Some(2000000.0), 100, 100); + let agent_test = TestAgent::new(0, "Test".to_string()); + world.spawn(Box::new(agent_test)); + world.schedule(world.sender.clone(), 0.0, 0).await.unwrap(); + world.run(true, true).await.unwrap(); + // for testing real-time run command line features like pause, resume, and speed up and slow down +} \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index b6fc225..03d36bd 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -5,9 +5,9 @@ use worlds::{Action, Agent, Event, Loggable, Mailbox, Message, State}; extern crate tokio; -mod logger; -mod universes; -mod worlds; +pub mod logger; +pub mod universes; +pub mod worlds; pub struct TestAgent { pub id: usize, @@ -27,6 +27,7 @@ impl Agent for TestAgent { time: &f64, mailbox: &'a mut Mailbox, ) -> BoxFuture<'a, Event> { + println!("{}: {}", *time, self.name); let event = Event::new(*time, self.id, Action::Timeout(1.0)); Box::pin(async { event }) } diff --git a/src/worlds.rs b/src/worlds.rs index 44c59fb..fa89de7 100644 --- a/src/worlds.rs +++ b/src/worlds.rs @@ -171,6 +171,7 @@ pub enum ControlCommand { Resume, SetTimeScale(f64), Quit, + Schedule(f64, usize), } pub struct World { @@ -246,51 +247,89 @@ impl World { pub async fn run(&mut self, live: bool, logs: bool) -> Result<(), SimError> { // Command line interface for real-time simulation let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::channel(100); + if live { + // Spawn a single continuous CLI task let cmd_tx_clone = cmd_tx.clone(); tokio::spawn(async move { let mut reader = tokio::io::BufReader::new(tokio::io::stdin()); - let mut line = String::new(); loop { - line.clear(); + let mut line = String::new(); if reader.read_line(&mut line).await.is_ok() { let cmd = match line.trim() { "pause" => ControlCommand::Pause, "resume" => ControlCommand::Resume, "quit" => ControlCommand::Quit, cmd if cmd.starts_with("speed ") => { - if let Some(scale) = cmd - .split_whitespace() - .nth(1) - .and_then(|s| Some(s.parse::().unwrap())) + if let Some(scale) = cmd.split_whitespace().nth(1) + .and_then(|s| s.parse::().ok()) { ControlCommand::SetTimeScale(scale) } else { continue; } } + cmd if cmd.starts_with("schedule ") => { + let parts: Vec<_> = cmd.split_whitespace().collect(); + if parts.len() >= 3 { + if let (Some(time), Some(idx)) = ( + parts[1].parse::().ok(), + parts[2].parse::().ok(), + ) { + ControlCommand::Schedule(time, idx) + } else { + continue; + } + } else { + continue; + } + } _ => continue, }; if cmd_tx_clone.send(cmd).await.is_err() { - break; + break; // Exit if the channel is closed } + } else { + break; // Exit on read error } } }); } - loop { if live { - if let Some(cmd) = cmd_rx.recv().await { + while let Ok(cmd) = cmd_rx.try_recv() { match cmd { ControlCommand::Pause => self.pause()?, ControlCommand::Resume => self.resume()?, ControlCommand::SetTimeScale(scale) => self.rescale_time(scale), ControlCommand::Quit => break, + ControlCommand::Schedule(time, idx) => { + self.pending + .insert(Reverse(Event::new(time, idx, Action::Wait))); + } } } if *self.pause_rx.borrow() { - self.pause_rx.changed().await.unwrap(); + // Wait for either a state change or new commands + tokio::select! { + // Pause state changed (e.g., resumed) + _ = self.pause_rx.changed() => {}, + // New command received (e.g., "resume") + cmd = cmd_rx.recv() => { + if let Some(cmd) = cmd { + match cmd { + ControlCommand::Pause => self.pause()?, + ControlCommand::Resume => self.resume()?, + ControlCommand::SetTimeScale(scale) => self.rescale_time(scale), + ControlCommand::Quit => break, + ControlCommand::Schedule(time, idx) => { + self.pending.insert(Reverse(Event::new(time, idx, Action::Wait))); + } + } + } + } + } + // Re-check the loop after handling continue; } } @@ -354,12 +393,16 @@ impl World { } } if live { - if let Some(cmd) = cmd_rx.recv().await { + while let Ok(cmd) = cmd_rx.try_recv() { match cmd { ControlCommand::Pause => self.pause()?, ControlCommand::Resume => self.resume()?, ControlCommand::SetTimeScale(scale) => self.rescale_time(scale), ControlCommand::Quit => break, + ControlCommand::Schedule(time, idx) => { + self.pending + .insert(Reverse(Event::new(time, idx, Action::Wait))); + } } } }