Skip to content

feat: Push tasks directly to the local runner #36

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
- uses: actions/checkout@v3
- name: Install Rust
run: rustup update stable
- run: cargo clippy --all-features --tests --examples
- run: cargo clippy --all-features --all-targets

fmt:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ exclude = ["/.*"]
[dependencies]
async-lock = "2.6"
async-task = "4.0.0"
atomic-waker = "1.1.0"
concurrent-queue = "2.0.0"
fastrand = "1.3.4"
futures-lite = "1.11.0"
Expand Down
183 changes: 129 additions & 54 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]

use std::cell::RefCell;
use std::fmt;
use std::future::Future;
use std::marker::PhantomData;
Expand Down Expand Up @@ -247,8 +248,24 @@ impl<'a> Executor<'a> {
fn schedule(&self) -> impl Fn(Runnable) + Send + Sync + 'static {
let state = self.state().clone();

// TODO(stjepang): If possible, push into the current local queue and notify the ticker.
move |runnable| {
// If possible, push into the current local queue and notify the ticker.
move |mut runnable| {
// Try to push the runnable into the current local queue and notify the ticker.
if let Some(index) = RunnerIndex::get() {
let local_queues = state.local_queues.read().unwrap();

if let Some(local_queue) = local_queues.get(index) {
match local_queue.queue.push(runnable) {
Ok(()) => {
// Wake up the ticker.
local_queue.waker.wake();
return;
}
Err(r) => runnable = r.into_inner(),
}
}
}

state.queue.push(runnable).unwrap();
state.notify();
}
Expand Down Expand Up @@ -475,7 +492,7 @@ struct State {
queue: ConcurrentQueue<Runnable>,

/// Local queues created by runners.
local_queues: RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>,
local_queues: RwLock<Slab<Arc<LocalQueue>>>,

/// Set to `true` when a sleeping ticker is notified or no tickers are sleeping.
notified: AtomicBool,
Expand All @@ -492,7 +509,7 @@ impl State {
fn new() -> State {
State {
queue: ConcurrentQueue::unbounded(),
local_queues: RwLock::new(Vec::new()),
local_queues: RwLock::new(Slab::new()),
notified: AtomicBool::new(true),
sleepers: Mutex::new(Sleepers {
count: 0,
Expand Down Expand Up @@ -660,15 +677,24 @@ impl Ticker<'_> {

/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
self.runnable_with(|| self.state.queue.pop().ok()).await
self.runnable_with(|| self.state.queue.pop().ok(), |_| {})
.await
}

/// Waits for the next runnable task to run, given a function that searches for a task.
async fn runnable_with(&self, mut search: impl FnMut() -> Option<Runnable>) -> Runnable {
/// Waits for the next runnable task to run, given a function that searches for a task and a function
/// that registers the waker into some other data structure.
async fn runnable_with(
&self,
mut search: impl FnMut() -> Option<Runnable>,
mut register: impl FnMut(&Waker),
) -> Runnable {
future::poll_fn(|cx| {
loop {
match search() {
None => {
// Register our waker.
register(cx.waker());

// Move to sleeping and unnotified state.
if !self.sleep(cx.waker()) {
// If already sleeping and unnotified, return.
Expand Down Expand Up @@ -713,6 +739,15 @@ impl Drop for Ticker<'_> {
}
}

/// An entry in the executor's list of local queues; each runner inserts one for itself.
///
/// Pairs a runner's queue with the waker needed to wake that runner when a task is
/// pushed straight into its queue instead of the global one.
struct LocalQueue {
/// The concurrent queue that runnables are pushed to and popped from.
queue: ConcurrentQueue<Runnable>,

/// The waker to notify when a runnable is pushed directly to the local queue.
///
/// The owning runner registers its current waker here while searching for work.
waker: atomic_waker::AtomicWaker,
}

/// A worker in a work-stealing executor.
///
/// This is just a ticker that also has an associated local queue for improved cache locality.
Expand All @@ -724,7 +759,10 @@ struct Runner<'a> {
ticker: Ticker<'a>,

/// The local queue.
local: Arc<ConcurrentQueue<Runnable>>,
local: Arc<LocalQueue>,

/// The index of this runner in the `local_queues` list.
index: usize,

/// Bumped every time a runnable task is found.
ticks: AtomicUsize,
Expand All @@ -733,69 +771,79 @@ struct Runner<'a> {
impl Runner<'_> {
/// Creates a runner and registers it in the executor state.
fn new(state: &State) -> Runner<'_> {
let runner = Runner {
let mut runner = Runner {
state,
ticker: Ticker::new(state),
local: Arc::new(ConcurrentQueue::bounded(512)),
local: Arc::new(LocalQueue {
queue: ConcurrentQueue::bounded(512),
waker: atomic_waker::AtomicWaker::new(),
}),
index: 0,
ticks: AtomicUsize::new(0),
};
state
runner.index = state
.local_queues
.write()
.unwrap()
.push(runner.local.clone());
.insert(runner.local.clone());
runner
}

/// Waits for the next runnable task to run.
async fn runnable(&self) -> Runnable {
// Set the task's index to our index in the list.
let _guard = RunnerIndex::set(self.index);

let runnable = self
.ticker
.runnable_with(|| {
// Try the local queue.
if let Ok(r) = self.local.pop() {
return Some(r);
}

// Try stealing from the global queue.
if let Ok(r) = self.state.queue.pop() {
steal(&self.state.queue, &self.local);
return Some(r);
}
.runnable_with(
|| {
// Try the local queue.
if let Ok(r) = self.local.queue.pop() {
return Some(r);
}

// Try stealing from other runners.
let local_queues = self.state.local_queues.read().unwrap();

// Pick a random starting point in the iterator list and rotate the list.
let n = local_queues.len();
let start = fastrand::usize(..n);
let iter = local_queues
.iter()
.chain(local_queues.iter())
.skip(start)
.take(n);

// Remove this runner's local queue.
let iter = iter.filter(|local| !Arc::ptr_eq(local, &self.local));

// Try stealing from each local queue in the list.
for local in iter {
steal(local, &self.local);
if let Ok(r) = self.local.pop() {
// Try stealing from the global queue.
if let Ok(r) = self.state.queue.pop() {
steal(&self.state.queue, &self.local.queue);
return Some(r);
}
}

None
})
// Try stealing from other runners.
let local_queues = self.state.local_queues.read().unwrap();

// Pick a random starting point in the iterator list and rotate the list.
let n = local_queues.len();
let start = fastrand::usize(..n);
let iter = local_queues
.iter()
.chain(local_queues.iter())
.skip(start)
.take(n);

// Remove this runner's local queue.
let iter = iter.filter(|(_, local)| !Arc::ptr_eq(local, &self.local));

// Try stealing from each local queue in the list.
for (_, local) in iter {
steal(&local.queue, &self.local.queue);
if let Ok(r) = self.local.queue.pop() {
return Some(r);
}
}

None
},
|waker| self.local.waker.register(waker),
)
.await;

// Bump the tick counter.
let ticks = self.ticks.fetch_add(1, Ordering::SeqCst);

if ticks % 64 == 0 {
// Steal tasks from the global queue to ensure fair task scheduling.
steal(&self.state.queue, &self.local);
steal(&self.state.queue, &self.local.queue);
}

runnable
Expand All @@ -805,14 +853,10 @@ impl Runner<'_> {
impl Drop for Runner<'_> {
fn drop(&mut self) {
// Remove the local queue.
self.state
.local_queues
.write()
.unwrap()
.retain(|local| !Arc::ptr_eq(local, &self.local));
self.state.local_queues.write().unwrap().remove(self.index);

// Re-schedule remaining tasks in the local queue.
while let Ok(r) = self.local.pop() {
while let Ok(r) = self.local.queue.pop() {
r.schedule();
}
}
Expand Down Expand Up @@ -840,6 +884,37 @@ fn steal<T>(src: &ConcurrentQueue<T>, dest: &ConcurrentQueue<T>) {
}
}

std::thread_local! {
    /// Indices of the runners currently active on this thread, most recently set last.
    static RUNNER_INDICES: RefCell<Vec<usize>> = RefCell::new(Vec::with_capacity(1));
}

/// Guard that publishes the index of the currently running `Runner` for this thread.
///
/// The index is the runner's key in the executor's `local_queues` list. While the guard
/// is alive, `RunnerIndex::get()` on the same thread reports the most recently set index
/// that has not been dropped yet. Dropping the guard withdraws exactly the entry it added.
struct RunnerIndex {
    /// The index this guard registered; remembered so `drop` removes the right entry.
    index: usize,
}

impl RunnerIndex {
    /// Returns the index of the currently running `Runner` on this thread.
    ///
    /// Returns `None` when no guard is alive on this thread, or when the thread-local
    /// storage has already been destroyed (e.g. during thread teardown).
    fn get() -> Option<usize> {
        RUNNER_INDICES
            .try_with(|indices| indices.borrow().last().copied())
            .ok()
            .flatten()
    }

    /// Sets the index of the currently running `Runner` and returns the guard for it.
    ///
    /// # Panics
    ///
    /// Panics if the thread-local storage has already been destroyed.
    fn set(index: usize) -> Self {
        RUNNER_INDICES.with(|indices| indices.borrow_mut().push(index));
        RunnerIndex { index }
    }
}

impl Drop for RunnerIndex {
    fn drop(&mut self) {
        let index = self.index;

        // Guards are held across `.await` points, so several of them may be alive on one
        // thread and may be dropped in any order. Remove the most recent entry that
        // matches *this* guard rather than blindly popping the top of the stack, which
        // would discard another guard's index and leave a stale one behind.
        //
        // The result is ignored on purpose: if the thread-local has already been torn
        // down there is nothing left to clean up.
        let _ = RUNNER_INDICES.try_with(|indices| {
            let mut indices = indices.borrow_mut();
            if let Some(position) = indices.iter().rposition(|&i| i == index) {
                indices.remove(position);
            }
        });
    }
}

/// Debug implementation for `Executor` and `LocalExecutor`.
fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Get a reference to the state.
Expand Down Expand Up @@ -873,14 +948,14 @@ fn debug_executor(executor: &Executor<'_>, name: &str, f: &mut fmt::Formatter<'_
}

/// Debug wrapper for the local runners.
struct LocalRunners<'a>(&'a RwLock<Vec<Arc<ConcurrentQueue<Runnable>>>>);
struct LocalRunners<'a>(&'a RwLock<Slab<Arc<LocalQueue>>>);

impl fmt::Debug for LocalRunners<'_> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self.0.try_read() {
Ok(lock) => f
.debug_list()
.entries(lock.iter().map(|queue| queue.len()))
.entries(lock.iter().map(|(_, local)| local.queue.len()))
.finish(),
Err(TryLockError::WouldBlock) => f.write_str("<locked>"),
Err(TryLockError::Poisoned(_)) => f.write_str("<poisoned>"),
Expand Down