diff --git a/Cargo.lock b/Cargo.lock
index fc36ce9..56611ae 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -41,6 +41,12 @@ dependencies = [
  "libc",
 ]
 
+[[package]]
+name = "async-task"
+version = "4.7.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8b75356056920673b02621b35afd0f7dda9306d03c79a30f5c56c44cf256e3de"
+
 [[package]]
 name = "async-trait"
 version = "0.1.85"
@@ -244,17 +250,44 @@ checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b"
 name = "cpu-io-executor"
 version = "0.1.0"
 dependencies = [
+ "async-task",
  "async-trait",
  "bytes",
  "futures",
  "log",
  "object_store",
+ "rayon",
  "testcontainers",
  "testcontainers-modules",
  "tokio",
  "tokio-stream",
 ]
 
+[[package]]
+name = "crossbeam-deque"
+version = "0.8.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9dd111b7b7f7d55b72c0a6ae361660ee5853c9af73f70c3c2ef6858b950e2e51"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-epoch"
+version = "0.9.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e"
+dependencies = [
+ "crossbeam-utils",
+]
+
+[[package]]
+name = "crossbeam-utils"
+version = "0.8.21"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "d0a5c400df2834b80a4c3327b3aad3a4c4cd4de0629063962b03235697506a28"
+
 [[package]]
 name = "crypto-common"
 version = "0.1.6"
@@ -1277,6 +1310,26 @@ dependencies = [
  "getrandom",
 ]
 
+[[package]]
+name = "rayon"
+version = "1.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa"
+dependencies = [
+ "either",
+ "rayon-core",
+]
+
+[[package]]
+name = "rayon-core"
+version = "1.12.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2"
+dependencies = [
+ "crossbeam-deque",
+ "crossbeam-utils",
+]
+
 [[package]]
 name = "redox_syscall"
 version = "0.3.5"
diff --git a/Cargo.toml b/Cargo.toml
index 2cce8c8..7eb4a85 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -13,6 +13,8 @@ testcontainers = "0.23"
 testcontainers-modules = { version = "0.11", features = ["localstack"] }
 tokio = { version = "1.43.0", features = ["rt-multi-thread", "macros"] }
 tokio-stream = "0.1.17"
+rayon = "1.10.0"
+async-task = "4.7.1"
 
 [[bin]]
 name = "single_runtime"
@@ -21,3 +23,7 @@ path = "src/single_runtime.rs"
 [[bin]]
 name = "two_runtimes"
 path = "src/two_runtimes.rs"
+
+[[bin]]
+name = "custom_runtime"
+path = "src/custom_runtime.rs"
diff --git a/src/custom_runtime.rs b/src/custom_runtime.rs
new file mode 100644
index 0000000..68422f2
--- /dev/null
+++ b/src/custom_runtime.rs
@@ -0,0 +1,106 @@
+//! Demo binary driving mixed IO/CPU streams on the custom [`AsyncExecutor`].
+
+use std::{iter, pin::Pin, sync::Arc, time::Duration};
+
+use crate::executor::AsyncExecutor;
+use futures::{
+    stream::{self, BoxStream},
+    Stream, StreamExt, TryStreamExt,
+};
+use object_store::{aws::AmazonS3Builder, ObjectStore, PutPayload, Result};
+
+mod executor;
+mod localstack;
+
+/// Seconds each simulated CPU stage blocks its thread.
+const CPU_TIME: u64 = 2;
+/// Number of times each stream fetches the test object.
+const N_FILES: usize = 2;
+/// Key of the object that is uploaded once and read back by every stream.
+const OBJECT_KEY: &str = "test";
+/// Cores deliberately left free of CPU-bound tasks.
+const N_IO_THREADS: usize = 2;
+
+/// Uploads a 10 MiB zero-filled object to localstack, then runs one execution stream
+/// per core not reserved for IO and waits for all of them to finish.
+#[tokio::main]
+async fn main() {
+    let num_threads = std::thread::available_parallelism().unwrap().get();
+    // `num_threads - N_IO_THREADS` underflows `usize` on hosts with fewer than
+    // `N_IO_THREADS` cores, so saturate and always run at least one task.
+    let n_tasks = num_threads.saturating_sub(N_IO_THREADS).max(1);
+
+    // Start localstack container
+    let localstack = localstack::localstack_container().await;
+    let localstack_host = localstack.get_host().await.unwrap();
+    let localstack_port = localstack.get_host_port_ipv4(4566).await.unwrap();
+
+    let object_store: Arc<dyn ObjectStore> = Arc::new(
+        AmazonS3Builder::new()
+            .with_config("aws_access_key_id".parse().unwrap(), "user")
+            .with_config("aws_secret_access_key".parse().unwrap(), "password")
+            .with_config(
+                "endpoint".parse().unwrap(),
+                format!("http://{localstack_host}:{localstack_port}"),
+            )
+            .with_config("region".parse().unwrap(), "us-east-1")
+            .with_config("allow_http".parse().unwrap(), "true")
+            .with_config("bucket".parse().unwrap(), "warehouse")
+            .build()
+            .unwrap(),
+    );
+
+    // Insert object
+    object_store
+        .put(
+            &OBJECT_KEY.into(),
+            PutPayload::from_static(&[0; 10 * 1024 * 1024]),
+        )
+        .await
+        .unwrap();
+
+    let executor = AsyncExecutor::new();
+    let mut handles = Vec::with_capacity(n_tasks);
+
+    // Leave `N_IO_THREADS` cores unoccupied
+    for _ in 0..n_tasks {
+        let object_store = Arc::clone(&object_store);
+        handles.push(executor.spawn(async move {
+            execution_stream(object_store)
+                .try_collect::<Vec<Vec<u8>>>()
+                .await
+                .unwrap();
+        }));
+    }
+
+    futures::future::join_all(handles).await;
+}
+
+/// Pipes the IO stream through two blocking CPU stages.
+fn execution_stream(
+    object_store: Arc<dyn ObjectStore>,
+) -> Pin<Box<dyn Stream<Item = Result<Vec<u8>, object_store::Error>> + Send>> {
+    Box::pin(io_stream(object_store).map_ok(cpu_work).map_ok(cpu_work))
+}
+
+/// Yields the byte chunks of the test object, fetched `N_FILES` times in sequence.
+///
+/// A failed `get` is surfaced as an `Err` item instead of panicking inside the stream.
+fn io_stream(
+    object_store: Arc<dyn ObjectStore>,
+) -> BoxStream<'static, Result<Vec<u8>, object_store::Error>> {
+    Box::pin(
+        stream::iter(iter::repeat_n(object_store, N_FILES))
+            .then(|store| async move {
+                let response = store.get(&OBJECT_KEY.into()).await?;
+                Ok::<_, object_store::Error>(response.into_stream().map_ok(Vec::from))
+            })
+            .try_flatten(),
+    )
+}
+
+/// Simulates CPU-bound work by blocking the calling thread for `CPU_TIME` seconds.
+fn cpu_work(bytes: Vec<u8>) -> Vec<u8> {
+    std::thread::sleep(Duration::from_secs(CPU_TIME));
+    bytes
+}
diff --git a/src/executor.rs b/src/executor.rs
new file mode 100644
index 0000000..f53ac1c
--- /dev/null
+++ b/src/executor.rs
@@ -0,0 +1,99 @@
+//! Executor that polls futures on a rayon thread pool inside a tokio runtime context.
+
+use async_task::Runnable;
+use futures::FutureExt;
+use std::future::Future;
+use std::pin::Pin;
+use std::sync::Arc;
+use std::task::{Context, Poll};
+
+/// An executor designed to run potentially blocking futures
+///
+/// Futures are polled on a dedicated rayon thread pool, with the tokio runtime that was
+/// current at construction entered for the duration of every poll.
+pub struct AsyncExecutor {
+    /// Handle to the tokio runtime entered while polling.
+    io: tokio::runtime::Handle,
+    /// Pool whose threads poll the spawned futures.
+    cpu: Arc<rayon::ThreadPool>,
+}
+
+impl AsyncExecutor {
+    /// Creates an executor bound to the current tokio runtime.
+    ///
+    /// The pool uses rayon's default sizing instead of a hard-coded thread count, and
+    /// the constructing thread is not enrolled as a worker: a `use_current_thread`
+    /// worker does not run rayon's work-stealing loop, so it would only occupy a slot.
+    ///
+    /// # Panics
+    ///
+    /// Panics when called outside a tokio runtime or if the pool cannot be built.
+    pub fn new() -> Self {
+        let io = tokio::runtime::Handle::current();
+        let cpu = rayon::ThreadPoolBuilder::new()
+            .build()
+            .expect("failed to build rayon thread pool");
+
+        Self {
+            io,
+            cpu: Arc::new(cpu),
+        }
+    }
+
+    /// Spawns `fut` onto the rayon pool and returns a handle resolving to its output.
+    ///
+    /// Dropping the returned [`SpawnHandle`] cancels the task.
+    pub fn spawn<F>(&self, fut: F) -> SpawnHandle<F::Output>
+    where
+        F: Future + Send + 'static,
+        F::Output: Send + 'static,
+    {
+        let (sender, receiver) = futures::channel::oneshot::channel();
+        let handle = self.io.clone();
+
+        // This box is technically unnecessary, but avoids some pin shenanigans
+        let mut boxed = Box::pin(fut);
+
+        // Enter tokio runtime whilst polling future - allowing IO and timers to work
+        let io_fut = futures::future::poll_fn(move |cx| {
+            let _guard = handle.enter();
+            boxed.poll_unpin(cx)
+        });
+        // Route result back to oneshot
+        let remote_fut = io_fut.map(|out| {
+            let _ = sender.send(out);
+        });
+
+        // Task execution is scheduled on rayon
+        let cpu = Arc::clone(&self.cpu);
+        let (runnable, task) = async_task::spawn(remote_fut, move |runnable: Runnable<()>| {
+            cpu.spawn(move || {
+                let _ = runnable.run();
+            });
+        });
+        runnable.schedule();
+        SpawnHandle {
+            _task: task,
+            receiver,
+        }
+    }
+}
+
+/// Handle returned by [`AsyncExecutor`]
+///
+/// Cancels task on drop
+pub struct SpawnHandle<T> {
+    /// Receives the future's output once it completes.
+    receiver: futures::channel::oneshot::Receiver<T>,
+    /// Owns the spawned task so that dropping the handle cancels it.
+    _task: async_task::Task<()>,
+}
+
+impl<T> Future for SpawnHandle<T> {
+    /// `None` when the sender was dropped without delivering the future's output.
+    type Output = Option<T>;
+
+    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+        self.receiver.poll_unpin(cx).map(Result::ok)
+    }
+}