Nesting Task scopes #4343

Closed · wants to merge 8 commits
2 changes: 1 addition & 1 deletion crates/bevy_ecs/src/schedule/executor_parallel.rs
@@ -179,7 +179,7 @@ impl ParallelExecutor {
/// queues systems with no dependencies to run (or skip) at next opportunity.
fn prepare_systems<'scope>(
&mut self,
- scope: &mut Scope<'scope, ()>,
+ scope: &Scope<'scope, ()>,
systems: &'scope mut [ParallelSystemContainer],
world: &'scope World,
) {
17 changes: 10 additions & 7 deletions crates/bevy_tasks/src/single_threaded_task_pool.rs
@@ -68,19 +68,22 @@ impl TaskPool {

let mut scope = Scope {
executor,
- results: Vec::new(),
+ results: Arc::new(Mutex::new(Vec::new())),
Contributor:
I think we want to avoid the use of Mutexes in the single-threaded executor. Its primary use case is wasm, and wasm doesn't support atomic wait on the main thread, which is what Mutexes use when you build with atomics for wasm.

I think that without enabling atomics, Mutexes become a no-op, so this might work with a default wasm build config today.

Contributor Author:

Right. I didn't implement it, but Joy came up with the idea of splitting the scope into a local variant and a non-local variant. Although we originally didn't settle on results storage (I was pro-shared), if we split them up we can make a thread-safe LocalScope.

Signatures would look something like this:

LocalScope::spawn_local(FnOnce(&LocalScope, &Scope));  // keep both scopes
Scope::spawn(FnOnce(&Scope));                          // keep scope, lose local scope

and scopes like this:

struct LocalScope<T> {
    local_executor: ...,
    results: RefCell<Vec<T>>,
    scope: &Scope,
}
struct Scope<T> {
    executor: ...,
    results: Mutex<Vec<T>>,
}

It's also worth mentioning that there have been talks about moving wasm to multithreading, although I'm not seeing much progress there (nor contributing much :/).
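
For illustration, a fleshed-out (hypothetical) version of that split might look like the sketch below; the lifetimes and concrete field types are assumptions, not code from this PR:

use std::cell::RefCell;
use std::sync::Mutex;

// Hypothetical sketch of the LocalScope/Scope split discussed above;
// names, fields, and lifetimes are illustrative assumptions.
pub struct Scope<'scope, T> {
    executor: &'scope async_executor::Executor<'scope>,
    // Results may be pushed from any thread, so they sit behind a lock.
    results: Mutex<Vec<T>>,
}

pub struct LocalScope<'scope, T> {
    local_executor: &'scope async_executor::LocalExecutor<'scope>,
    // Local tasks all run on one thread, so a RefCell is enough.
    results: RefCell<Vec<T>>,
    // A local task can still spawn Send tasks through the shared scope.
    scope: &'scope Scope<'scope, T>,
}

The appeal of the split is that only results produced by Send tasks would pay for a lock; everything spawned locally stays behind a RefCell.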

};

f(&mut scope);

// Loop until all tasks are done
while executor.try_tick() {}

- scope
+ let result = scope
.results
+ .lock()
+ .unwrap()
.iter()
.map(|result| result.lock().unwrap().take().unwrap())
- .collect()
+ .collect();
+ result
}

// Spawns a static future onto the JS event loop. For now it is returning FakeTask
@@ -122,17 +125,17 @@ impl FakeTask {
pub struct Scope<'scope, T> {
executor: &'scope async_executor::LocalExecutor<'scope>,
// Vector to gather results of all futures spawned during scope run
- results: Vec<Arc<Mutex<Option<T>>>>,
+ results: Arc<Mutex<Vec<Arc<Mutex<Option<T>>>>>>,
}

impl<'scope, T: Send + 'scope> Scope<'scope, T> {
- pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
+ pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&self, f: Fut) {
self.spawn_local(f);
}

- pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
+ pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
let result = Arc::new(Mutex::new(None));
- self.results.push(result.clone());
+ self.results.lock().unwrap().push(result.clone());
let f = async move {
result.lock().unwrap().replace(f.await);
};
64 changes: 51 additions & 13 deletions crates/bevy_tasks/src/task_pool.rs
@@ -2,7 +2,7 @@ use std::{
future::Future,
mem,
pin::Pin,
- sync::Arc,
+ sync::{Arc, Mutex},
thread::{self, JoinHandle},
};

@@ -164,7 +164,7 @@ impl TaskPool {
/// This is similar to `rayon::scope` and `crossbeam::scope`
pub fn scope<'scope, F, T>(&self, f: F) -> Vec<T>
where
- F: FnOnce(&mut Scope<'scope, T>) + 'scope + Send,
+ F: FnOnce(&Scope<'scope, T>) + 'scope + Send,
T: Send + 'static,
{
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
@@ -179,19 +179,20 @@
let mut scope = Scope {
executor,
local_executor,
- spawned: Vec::new(),
+ spawned: Arc::new(Mutex::new(Vec::new())),
};

f(&mut scope);

- if scope.spawned.is_empty() {
+ let mut spawned = scope.spawned.lock().unwrap();
+ if spawned.is_empty() {
Vec::default()
- } else if scope.spawned.len() == 1 {
- vec![future::block_on(&mut scope.spawned[0])]
+ } else if spawned.len() == 1 {
+ vec![future::block_on(&mut spawned[0])]
} else {
let fut = async move {
- let mut results = Vec::with_capacity(scope.spawned.len());
- for task in scope.spawned {
+ let mut results = Vec::with_capacity(spawned.len());
+ for task in spawned.iter_mut() {
results.push(task.await);
}

@@ -265,7 +266,7 @@ impl Default for TaskPool {
pub struct Scope<'scope, T> {
executor: &'scope async_executor::Executor<'scope>,
local_executor: &'scope async_executor::LocalExecutor<'scope>,
- spawned: Vec<async_executor::Task<T>>,
+ spawned: Arc<Mutex<Vec<async_executor::Task<T>>>>,
Member (@TheRawMeatball, Apr 6, 2022):
Replacing this Arc with a reference to a Mutex<Vec<T>> stored on the stack should be possible. Alternatively, a queue might be better, to reduce locking overhead.

Contributor Author (@MiniaczQ, Apr 7, 2022):

Stack of which thread? I've never seen a solution like that, so I'm slightly confused.

Member (@TheRawMeatball):

It'd be similar to how the code currently makes a fresh Executor on the stack when scope is called. We'd just also make a Mutex<Vec<Task<T>>> on the stack and store a regular Rust reference to it inside Scope, as sketched below.
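
For illustration, a minimal sketch of that suggestion, assuming simplified lifetimes (the real scope body transmutes lifetimes, which is elided here):

use std::sync::Mutex;

// Hypothetical sketch: the task list lives on TaskPool::scope's stack
// behind a Mutex, and Scope borrows it instead of owning an Arc.
pub struct Scope<'scope, T> {
    executor: &'scope async_executor::Executor<'scope>,
    local_executor: &'scope async_executor::LocalExecutor<'scope>,
    spawned: &'scope Mutex<Vec<async_executor::Task<T>>>,
}

// Inside TaskPool::scope, alongside the stack-allocated executor:
let spawned = Mutex::new(Vec::new());
let scope = Scope { executor, local_executor, spawned: &spawned };
f(&scope);
// ...then block on the executor and drain `spawned` as before...

This keeps the allocation and locking but drops the reference counting; to address the locking-overhead point, something like concurrent-queue's ConcurrentQueue could stand in for the Mutex<Vec<...>>.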

}

impl<'scope, T: Send + 'scope> Scope<'scope, T> {
@@ -277,9 +278,9 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> {
/// instead.
///
/// For more information, see [`TaskPool::scope`].
- pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&mut self, f: Fut) {
+ pub fn spawn<Fut: Future<Output = T> + 'scope + Send>(&self, f: Fut) {
let task = self.executor.spawn(f);
- self.spawned.push(task);
+ self.spawned.lock().unwrap().push(task);
}

/// Spawns a scoped future onto the thread-local executor. The scope *must* outlive
@@ -288,9 +289,9 @@ impl<'scope, T: Send + 'scope> Scope<'scope, T> {
/// [`Scope::spawn`] instead, unless the provided future is not `Send`.
///
/// For more information, see [`TaskPool::scope`].
- pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&mut self, f: Fut) {
+ pub fn spawn_local<Fut: Future<Output = T> + 'scope>(&self, f: Fut) {
let task = self.local_executor.spawn(f);
- self.spawned.push(task);
+ self.spawned.lock().unwrap().push(task);
}
}

@@ -334,6 +335,43 @@ mod tests {
assert_eq!(count.load(Ordering::Relaxed), 100);
}

#[test]
fn test_nested_spawn() {
let pool = TaskPool::new();

let foo = Box::new(42);
let foo = &*foo;

let count = Arc::new(AtomicI32::new(0));

let outputs = pool.scope(|scope| {
for _ in 0..10 {
let count_clone = count.clone();
scope.spawn(async move {
for _ in 0..10 {
let count_clone_clone = count_clone.clone();
scope.spawn(async move {
if *foo != 42 {
panic!("not 42!?!?")
} else {
count_clone_clone.fetch_add(1, Ordering::Relaxed);
*foo
}
});
}
*foo
});
}
});

for output in &outputs {
assert_eq!(*output, 42);
}

assert_eq!(outputs.len(), 100);
assert_eq!(count.load(Ordering::Relaxed), 100);
}

#[test]
fn test_mixed_spawn_local_and_spawn() {
let pool = TaskPool::new();