Skip to content

Commit

Permalink
fixed real time commands!
Browse files Browse the repository at this point in the history
  • Loading branch information
FiberedSkies committed Jan 27, 2025
1 parent 69d399c commit 0058519
Show file tree
Hide file tree
Showing 4 changed files with 77 additions and 14 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,11 @@ name = "aika"
version = "0.1.0"
edition = "2021"

[[bin]]
name = "realtime"
path = "examples/realtime.rs"


[dependencies]
tokio = {version = "1.43.0", features = ["full", "time", "sync", "rt", "rt-multi-thread"]}
tokio-macros = "2.5.0"
Expand Down
14 changes: 14 additions & 0 deletions examples/realtime.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
use aika::worlds::*;
use aika::universes::*;
use aika::logger::*;
use aika::TestAgent;

#[tokio::main]
async fn main() {
let mut world = World::create(1.0, Some(2000000.0), 100, 100);
let agent_test = TestAgent::new(0, "Test".to_string());
world.spawn(Box::new(agent_test));
world.schedule(world.sender.clone(), 0.0, 0).await.unwrap();
world.run(true, true).await.unwrap();
// for testing real-time run command line features like pause, resume, and speed up and slow down
}
7 changes: 4 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ use worlds::{Action, Agent, Event, Loggable, Mailbox, Message, State};

extern crate tokio;

mod logger;
mod universes;
mod worlds;
pub mod logger;
pub mod universes;
pub mod worlds;

pub struct TestAgent {
pub id: usize,
Expand All @@ -27,6 +27,7 @@ impl Agent for TestAgent {
time: &f64,
mailbox: &'a mut Mailbox,
) -> BoxFuture<'a, Event> {
println!("{}: {}", *time, self.name);
let event = Event::new(*time, self.id, Action::Timeout(1.0));
Box::pin(async { event })
}
Expand Down
65 changes: 54 additions & 11 deletions src/worlds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ pub enum ControlCommand {
Resume,
SetTimeScale(f64),
Quit,
Schedule(f64, usize),
}

pub struct World {
Expand Down Expand Up @@ -246,51 +247,89 @@ impl World {
pub async fn run(&mut self, live: bool, logs: bool) -> Result<(), SimError> {
// Command line interface for real-time simulation
let (cmd_tx, mut cmd_rx) = tokio::sync::mpsc::channel(100);

if live {
// Spawn a single continuous CLI task
let cmd_tx_clone = cmd_tx.clone();
tokio::spawn(async move {
let mut reader = tokio::io::BufReader::new(tokio::io::stdin());
let mut line = String::new();
loop {
line.clear();
let mut line = String::new();
if reader.read_line(&mut line).await.is_ok() {
let cmd = match line.trim() {
"pause" => ControlCommand::Pause,
"resume" => ControlCommand::Resume,
"quit" => ControlCommand::Quit,
cmd if cmd.starts_with("speed ") => {
if let Some(scale) = cmd
.split_whitespace()
.nth(1)
.and_then(|s| Some(s.parse::<f64>().unwrap()))
if let Some(scale) = cmd.split_whitespace().nth(1)
.and_then(|s| s.parse::<f64>().ok())
{
ControlCommand::SetTimeScale(scale)
} else {
continue;
}
}
cmd if cmd.starts_with("schedule ") => {
let parts: Vec<_> = cmd.split_whitespace().collect();
if parts.len() >= 3 {
if let (Some(time), Some(idx)) = (
parts[1].parse::<f64>().ok(),
parts[2].parse::<usize>().ok(),
) {
ControlCommand::Schedule(time, idx)
} else {
continue;
}
} else {
continue;
}
}
_ => continue,
};
if cmd_tx_clone.send(cmd).await.is_err() {
break;
break; // Exit if the channel is closed
}
} else {
break; // Exit on read error
}
}
});
}

loop {
if live {
if let Some(cmd) = cmd_rx.recv().await {
while let Ok(cmd) = cmd_rx.try_recv() {
match cmd {
ControlCommand::Pause => self.pause()?,
ControlCommand::Resume => self.resume()?,
ControlCommand::SetTimeScale(scale) => self.rescale_time(scale),
ControlCommand::Quit => break,
ControlCommand::Schedule(time, idx) => {
self.pending
.insert(Reverse(Event::new(time, idx, Action::Wait)));
}
}
}
if *self.pause_rx.borrow() {
self.pause_rx.changed().await.unwrap();
// Wait for either a state change or new commands
tokio::select! {
// Pause state changed (e.g., resumed)
_ = self.pause_rx.changed() => {},
// New command received (e.g., "resume")
cmd = cmd_rx.recv() => {
if let Some(cmd) = cmd {
match cmd {
ControlCommand::Pause => self.pause()?,
ControlCommand::Resume => self.resume()?,
ControlCommand::SetTimeScale(scale) => self.rescale_time(scale),
ControlCommand::Quit => break,
ControlCommand::Schedule(time, idx) => {
self.pending.insert(Reverse(Event::new(time, idx, Action::Wait)));
}
}
}
}
}
// Re-check the loop after handling
continue;
}
}
Expand Down Expand Up @@ -354,12 +393,16 @@ impl World {
}
}
if live {
if let Some(cmd) = cmd_rx.recv().await {
while let Ok(cmd) = cmd_rx.try_recv() {
match cmd {
ControlCommand::Pause => self.pause()?,
ControlCommand::Resume => self.resume()?,
ControlCommand::SetTimeScale(scale) => self.rescale_time(scale),
ControlCommand::Quit => break,
ControlCommand::Schedule(time, idx) => {
self.pending
.insert(Reverse(Event::new(time, idx, Action::Wait)));
}
}
}
}
Expand Down

0 comments on commit 0058519

Please # to comment.