[Merged by Bors] - tick local executor #6121

Closed · wants to merge 9 commits
Changes from 4 commits
7 changes: 7 additions & 0 deletions crates/bevy_core/src/lib.rs
@@ -4,6 +4,7 @@
mod name;
mod task_pool_options;

use bevy_ecs::schedule::IntoSystemDescriptor;
use bevy_ecs::system::Resource;
pub use bytemuck::{bytes_of, cast_slice, Pod, Zeroable};
pub use name::*;
@@ -17,6 +18,7 @@ pub mod prelude {

use bevy_app::prelude::*;
use bevy_ecs::entity::Entity;
use bevy_tasks::prelude::tick_global_task_pools_on_main_thread;
use bevy_utils::{Duration, HashSet, Instant};
use std::borrow::Cow;
use std::ops::Range;
@@ -34,6 +36,11 @@ impl Plugin for CorePlugin {
.unwrap_or_default()
.create_default_pools();

app.add_system_to_stage(
bevy_app::CoreStage::Last,
tick_global_task_pools_on_main_thread.at_end(),
);

app.register_type::<Entity>().register_type::<Name>();

register_rust_types(app);
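For an app that skips `CorePlugin`, a minimal sketch of registering the same system by hand (hypothetical `TickTaskPoolsPlugin`; assumes the global task pools have already been created, as `CorePlugin` does above):

```rust
use bevy_app::{App, CoreStage, Plugin};
use bevy_tasks::prelude::tick_global_task_pools_on_main_thread;

// Hypothetical plugin mirroring what CorePlugin now does: tick the global
// task pools' thread-local executors once per frame, late in the schedule.
pub struct TickTaskPoolsPlugin;

impl Plugin for TickTaskPoolsPlugin {
    fn build(&self, app: &mut App) {
        app.add_system_to_stage(CoreStage::Last, tick_global_task_pools_on_main_thread);
    }
}
```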
5 changes: 4 additions & 1 deletion crates/bevy_tasks/src/lib.rs
@@ -29,7 +29,10 @@ pub mod prelude {
pub use crate::{
iter::ParallelIterator,
slice::{ParallelSlice, ParallelSliceMut},
usages::{AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool},
usages::{
tick_global_task_pools_on_main_thread, AsyncComputeTaskPool, ComputeTaskPool,
IoTaskPool,
},
};
}

157 changes: 93 additions & 64 deletions crates/bevy_tasks/src/task_pool.rs
@@ -8,7 +8,7 @@ use std::{
};

use concurrent_queue::ConcurrentQueue;
use futures_lite::{future, pin};
use futures_lite::{future, pin, FutureExt};

use crate::Task;

@@ -117,9 +117,16 @@ impl TaskPool {

thread_builder
.spawn(move || {
let shutdown_future = ex.run(shutdown_rx.recv());
// Use unwrap_err because we expect a Closed error
future::block_on(shutdown_future).unwrap_err();
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
let tick_forever = async move {
loop {
local_executor.tick().await;
}
};
let shutdown_future = ex.run(tick_forever.or(shutdown_rx.recv()));
// Use unwrap_err because we expect a Closed error
future::block_on(shutdown_future).unwrap_err();
});
})
.expect("Failed to spawn thread.")
})
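The worker threads above now race an endless `local_executor.tick()` loop against the shutdown channel via futures_lite's `or` combinator, so each thread keeps driving its thread-local executor until the pool is dropped and `shutdown_rx.recv()` returns the expected `Closed` error. A minimal standalone sketch of that racing pattern (toy futures, not the pool's actual shutdown channel):

```rust
use futures_lite::{future, FutureExt};

fn main() {
    let winner = future::block_on(async {
        // This future never completes; it just keeps yielding.
        let run_forever = async {
            loop {
                future::yield_now().await;
            }
        };
        // `or` resolves with whichever side finishes first, so the
        // "shutdown" side always wins here.
        run_forever.or(async { "shutdown" }).await
    });
    assert_eq!(winner, "shutdown");
}
```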
@@ -216,71 +223,75 @@ impl TaskPool {
F: for<'scope> FnOnce(&'scope Scope<'scope, 'env, T>),
T: Send + 'static,
{
// SAFETY: This safety comment applies to all references transmuted to 'env.
// Any futures spawned with these references need to return before this function completes.
// This is guaranteed because we drive all the futures spawned onto the Scope
// to completion in this function. However, rust has no way of knowing this so we
// transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
let executor: &async_executor::Executor = &*self.executor;
let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) };
let task_scope_executor = &async_executor::Executor::default();
let task_scope_executor: &'env async_executor::Executor =
unsafe { mem::transmute(task_scope_executor) };
let spawned: ConcurrentQueue<async_executor::Task<T>> = ConcurrentQueue::unbounded();
let spawned_ref: &'env ConcurrentQueue<async_executor::Task<T>> =
unsafe { mem::transmute(&spawned) };

let scope = Scope {
executor,
task_scope_executor,
spawned: spawned_ref,
scope: PhantomData,
env: PhantomData,
};

let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) };

f(scope_ref);

if spawned.is_empty() {
Vec::new()
} else {
let get_results = async move {
let mut results = Vec::with_capacity(spawned.len());
while let Ok(task) = spawned.pop() {
results.push(task.await);
}

results
TaskPool::LOCAL_EXECUTOR.with(|local_executor| {
// SAFETY: This safety comment applies to all references transmuted to 'env.
// Any futures spawned with these references need to return before this function completes.
// This is guaranteed because we drive all the futures spawned onto the Scope
// to completion in this function. However, rust has no way of knowing this so we
// transmute the lifetimes to 'env here to appease the compiler as it is unable to validate safety.
let executor: &async_executor::Executor = &*self.executor;
let executor: &'env async_executor::Executor = unsafe { mem::transmute(executor) };
let task_scope_executor = &async_executor::Executor::default();
let task_scope_executor: &'env async_executor::Executor =
unsafe { mem::transmute(task_scope_executor) };
let spawned: ConcurrentQueue<async_executor::Task<T>> = ConcurrentQueue::unbounded();
let spawned_ref: &'env ConcurrentQueue<async_executor::Task<T>> =
unsafe { mem::transmute(&spawned) };

let scope = Scope {
executor,
task_scope_executor,
spawned: spawned_ref,
scope: PhantomData,
env: PhantomData,
};

// Pin the futures on the stack.
pin!(get_results);

// SAFETY: This function blocks until all futures complete, so we do not read/write
// the data from futures outside of the 'scope lifetime. However,
// rust has no way of knowing this so we must convert to 'static
// here to appease the compiler as it is unable to validate safety.
let get_results: Pin<&mut (dyn Future<Output = Vec<T>> + 'static + Send)> = get_results;
let get_results: Pin<&'static mut (dyn Future<Output = Vec<T>> + 'static + Send)> =
unsafe { mem::transmute(get_results) };

// The thread that calls scope() will participate in driving tasks in the pool
// forward until the tasks that are spawned by this scope() call
// complete. (If the caller of scope() happens to be a thread in
// this thread pool, and we only have one thread in the pool, then
// simply calling future::block_on(spawned) would deadlock.)
let mut spawned = task_scope_executor.spawn(get_results);

loop {
if let Some(result) = future::block_on(future::poll_once(&mut spawned)) {
break result;
let scope_ref: &'env Scope<'_, 'env, T> = unsafe { mem::transmute(&scope) };

f(scope_ref);

if spawned.is_empty() {
Vec::new()
} else {
let get_results = async move {
let mut results = Vec::with_capacity(spawned.len());
while let Ok(task) = spawned.pop() {
results.push(task.await);
}

results
};

self.executor.try_tick();
task_scope_executor.try_tick();
// Pin the futures on the stack.
pin!(get_results);

// SAFETY: This function blocks until all futures complete, so we do not read/write
// the data from futures outside of the 'scope lifetime. However,
// rust has no way of knowing this so we must convert to 'static
// here to appease the compiler as it is unable to validate safety.
let get_results: Pin<&mut (dyn Future<Output = Vec<T>> + 'static + Send)> =
get_results;
let get_results: Pin<&'static mut (dyn Future<Output = Vec<T>> + 'static + Send)> =
unsafe { mem::transmute(get_results) };

// The thread that calls scope() will participate in driving tasks in the pool
// forward until the tasks that are spawned by this scope() call
// complete. (If the caller of scope() happens to be a thread in
// this thread pool, and we only have one thread in the pool, then
// simply calling future::block_on(spawned) would deadlock.)
let mut spawned = task_scope_executor.spawn(get_results);

loop {
if let Some(result) = future::block_on(future::poll_once(&mut spawned)) {
break result;
};

self.executor.try_tick();
task_scope_executor.try_tick();
local_executor.try_tick();
}
}
}
})
}
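As a usage-level illustration of the blocking behaviour described in the comments above, a small sketch (not part of this PR) of calling `scope` on a fresh pool; with this change, the waiting loop also calls `local_executor.try_tick()`, so tasks spawned via `spawn_local` on the scoping thread keep making progress while the scope blocks:

```rust
use bevy_tasks::TaskPool;

fn main() {
    let pool = TaskPool::new();
    // scope() blocks until every spawned future has completed; the calling
    // thread participates in driving the executors while it waits.
    let outputs = pool.scope(|scope| {
        for i in 0..4 {
            scope.spawn(async move { i * i });
        }
    });
    assert_eq!(outputs.len(), 4);
}
```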

/// Spawns a static future onto the thread pool. The returned Task is a future. It can also be
@@ -306,6 +317,24 @@ impl TaskPool {
{
Task::new(TaskPool::LOCAL_EXECUTOR.with(|executor| executor.spawn(future)))
}

/// Runs a function with the local executor. Typically used to tick
/// the local executor on the main thread as it needs to share time with
/// other things.
///
/// ```rust
/// use bevy_tasks::TaskPool;
///
/// TaskPool::new().with_local_executor(|local_executor| {
/// local_executor.try_tick();
/// });
/// ```
pub fn with_local_executor<F, R>(&self, f: F) -> R
where
F: FnOnce(&async_executor::LocalExecutor) -> R,
{
Self::LOCAL_EXECUTOR.with(f)
}
}

impl Default for TaskPool {
24 changes: 24 additions & 0 deletions crates/bevy_tasks/src/usages.rs
@@ -109,3 +109,27 @@ impl Deref for IoTaskPool {
&self.0
}
}

/// Used by `bevy_app` to tick the global task pools on the main thread.
pub fn tick_global_task_pools_on_main_thread() {
COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|compute_local_executor| {
ASYNC_COMPUTE_TASK_POOL
.get()
.unwrap()
.with_local_executor(|async_local_executor| {
IO_TASK_POOL
.get()
.unwrap()
.with_local_executor(|io_local_executor| {
for _ in 0..20 {
compute_local_executor.try_tick();
async_local_executor.try_tick();
io_local_executor.try_tick();
}
});
});
});
}
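A minimal standalone sketch (outside Bevy's schedule) of what this function enables; the three globals must be initialized first, since the function unwraps them — `CorePlugin` normally does that — and a future spawned with `spawn_local` on the main thread only progresses when the pools are ticked:

```rust
use bevy_tasks::prelude::{
    tick_global_task_pools_on_main_thread, AsyncComputeTaskPool, ComputeTaskPool, IoTaskPool,
};
use bevy_tasks::TaskPool;
use futures_lite::future;

fn main() {
    // The tick function unwraps all three global pools, so initialize them
    // first (CorePlugin's create_default_pools() normally handles this).
    ComputeTaskPool::init(TaskPool::default);
    AsyncComputeTaskPool::init(TaskPool::default);
    IoTaskPool::init(TaskPool::default);

    // A future spawned on the calling thread's local executor only runs
    // when that executor is ticked.
    let mut task = IoTaskPool::get().spawn_local(async { "loaded" });

    let result = loop {
        if let Some(value) = future::block_on(future::poll_once(&mut task)) {
            break value;
        }
        // Stands in for the once-per-frame system registered by CorePlugin.
        tick_global_task_pools_on_main_thread();
    };
    assert_eq!(result, "loaded");
}
```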