Threadpool blocking #317

Merged: 15 commits, Apr 15, 2018
17 changes: 15 additions & 2 deletions .travis.yml
@@ -35,15 +35,28 @@ script:
# Make sure the benchmarks compile
cargo build --benches --all

export ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0"
export TSAN_OPTIONS="suppressions=`pwd`/ci/tsan"

# === tokio-timer ====

# Run address sanitizer
ASAN_OPTIONS="detect_odr_violation=0 detect_leaks=0" \
RUSTFLAGS="-Z sanitizer=address" \
cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu

# Run thread sanitizer
TSAN_OPTIONS="suppressions=`pwd`/ci/tsan" \
RUSTFLAGS="-Z sanitizer=thread" \
cargo test -p tokio-timer --test hammer --target x86_64-unknown-linux-gnu

# === tokio-threadpool ====

# Run address sanitizer
RUSTFLAGS="-Z sanitizer=address" \
cargo test -p tokio-threadpool --tests

# Run thread sanitizer
RUSTFLAGS="-Z sanitizer=thread" \
cargo test -p tokio-threadpool --tests
fi
- |
set -e
18 changes: 18 additions & 0 deletions ci/tsan
@@ -4,3 +4,21 @@
# This causes many false positives.
race:Arc*drop
race:arc*Weak*drop
race:crossbeam_deque
race:std*mpsc_queue

# This race is excluded because it shows up when using the stealing features of
# the deque. Unfortunately, the implementation uses a fence, which makes tsan
# unhappy.
#
# TODO: It would be nice to not have to filter this out.
race:try_steal_task

# This filters out an expected data race in the treiber stack implementation.
# Treiber stacks are inherently racy. The pop operation will attempt to access
# the "next" pointer on the node it is attempting to pop. However, at this
# point it has not gained ownership of the node and another thread might beat
# it and take ownership of the node first (touching the next pointer). The
# original pop operation will fail due to the ABA guard, but tsan still picks
# up the access on the next pointer.
race:Backup::next_sleeper
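To make the suppressed race concrete, here is a minimal generic Treiber-stack `pop` in Rust. This is an illustrative sketch, not the pool's actual `Backup` stack (`Node` and `Stack` are made-up names, and the real implementation's ABA guard is elided):

```rust
use std::sync::atomic::{AtomicPtr, Ordering};

struct Node<T> {
    value: T,
    next: *mut Node<T>,
}

struct Stack<T> {
    head: AtomicPtr<Node<T>>,
}

impl<T> Stack<T> {
    fn pop(&self) -> Option<Box<Node<T>>> {
        loop {
            let head = self.head.load(Ordering::Acquire);
            if head.is_null() {
                return None;
            }
            // Racy read: this thread does not own `head` yet, so a
            // concurrent pop may take the node and touch `next` at the same
            // time. This is the access tsan reports.
            let next = unsafe { (*head).next };
            // If another thread won the race, `head` has changed, the CAS
            // fails, and we retry with a fresh snapshot. (The real code also
            // carries an ABA guard, elided in this sketch.)
            if self
                .head
                .compare_exchange(head, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                return Some(unsafe { Box::from_raw(head) });
            }
        }
    }
}
```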
83 changes: 83 additions & 0 deletions tokio-threadpool/src/blocking.rs
@@ -0,0 +1,83 @@
use worker::Worker;

use futures::Poll;

/// Error raised by `blocking`.
#[derive(Debug)]
pub struct BlockingError {
_p: (),
}

/// Enter a blocking section of code.
///
/// The `blocking` function annotates a section of code that performs a blocking
/// operation, either by issuing a blocking syscall or by performing a long
/// running CPU bound computation.
///
/// When the `blocking` function enters, it hands off the responsibility of
/// processing the current work queue to another thread. Then, it calls the
/// supplied closure. The closure is permitted to block indefinitely.
///
/// If the maximum number of concurrent `blocking` calls has been reached, then
/// `NotReady` is returned and the task is notified once existing `blocking`
/// calls complete. The maximum value is specified when creating a thread pool
/// using [`Builder::max_blocking`][build].
///
/// [build]: struct.Builder.html#method.max_blocking
///
/// # Background
///
/// By default, the Tokio thread pool expects that tasks will only run for short
/// periods at a time before yielding back to the thread pool. This is the basic
/// premise of cooperative multitasking.
///
/// However, it is common to want to perform a blocking operation while
/// processing an asynchronous computation. Examples of blocking operations
/// include:
///
/// * Performing synchronous file operations (reading and writing).
/// * Blocking on acquiring a mutex.
/// * Performing a CPU bound computation, like cryptographic encryption or
Member commented: "CPU-bound"

/// decryption.
///
/// One option for dealing with blocking operations in an asynchronous context
/// is to use a thread pool dedicated to performing these operations. This is
/// not ideal, as it requires bidirectional message passing as well as a channel
/// to communicate, which adds a level of buffering.
///
/// Instead, `blocking` hands off the responsibility of processing the work queue
/// to another thread. This handoff is light compared to a channel and does not
/// require buffering.
///
/// # Panics
///
/// This function panics if not called from the context of a thread pool worker.
pub fn blocking<F, T>(f: F) -> Poll<T, BlockingError>
where F: FnOnce() -> T,
{
let res = Worker::with_current(|worker| {
let worker = worker.expect("not called from a runtime thread");

// Transition the worker state to blocking. This will exit the fn early
// with `NotReady` if the pool does not have enough capacity to enter
// blocking mode.
worker.transition_to_blocking()
});

// If the transition cannot happen, exit early
try_ready!(res);

// Currently in blocking mode, so call the inner closure
let ret = f();

// Try to transition out of blocking mode. This is a fast path that takes
// back ownership of the worker if the worker handoff didn't complete yet.
Worker::with_current(|worker| {
// Worker must be set since it was set above.
worker.unwrap()
.transition_from_blocking();
});

// Return the result
Ok(ret.into())
}
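For context, a minimal usage sketch of the new `blocking` function. It assumes `ThreadPool::new`, `spawn`, and `shutdown_on_idle` from the crate's existing API and is not part of this diff:

```rust
extern crate futures;
extern crate tokio_threadpool;

use std::thread;
use std::time::Duration;

use futures::future::poll_fn;
use futures::Future;
use tokio_threadpool::{blocking, ThreadPool};

fn main() {
    let pool = ThreadPool::new();

    // Wrapping `blocking` in `poll_fn` turns the blocking section into a
    // future: while the pool is at its `max_blocking` limit, the call
    // returns `NotReady` and the task is notified once capacity frees up.
    pool.spawn(poll_fn(|| {
        blocking(|| {
            // Stand-in for a blocking call (sync file I/O, a mutex, etc.).
            thread::sleep(Duration::from_millis(100));
            println!("blocking section complete");
        })
    }).map_err(|_| panic!("`blocking` called outside the pool")));

    pool.shutdown_on_idle().wait().unwrap();
}
```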
92 changes: 91 additions & 1 deletion tokio-threadpool/src/builder.rs
@@ -2,7 +2,7 @@ use callback::Callback;
use config::{Config, MAX_WORKERS};
use park::{BoxPark, BoxedPark, DefaultPark};
use sender::Sender;
use pool::Pool;
use pool::{Pool, MAX_BACKUP};
use thread_pool::ThreadPool;
use worker::{self, Worker, WorkerId};

@@ -63,6 +63,10 @@ pub struct Builder {
/// Number of workers to spawn
pool_size: usize,

/// Maximum number of futures that can be in a blocking section
/// concurrently.
max_blocking: usize,

/// Generates the `Park` instances
new_park: Box<Fn(&WorkerId) -> BoxPark>,
}
@@ -99,11 +103,14 @@ impl Builder {

Builder {
pool_size: num_cpus,
max_blocking: 100,
Member commented:

is there a reason this isn't stored on the Config?

Member (author) replied:

The Config object is passed into the pool. Values not in Config don't need to be stored and are just used to initialize state (in this case, populate the stack of sleeping threads).

config: Config {
keep_alive: None,
name_prefix: None,
stack_size: None,
around_worker: None,
after_start: None,
before_stop: None,
},
new_park,
}
@@ -138,6 +145,33 @@ impl Builder {
self
}

/// Set the maximum number of concurrent blocking sections.
///
/// This must be a number between 1 and 32,768, though it is advised to keep
/// this value on the smaller side.
Member commented:

What happens when the number of blocking calls exceeds max_blocking? Should be called out here.

///
/// The default value is 100.
///
/// # Examples
///
/// ```
/// # extern crate tokio_threadpool;
/// # extern crate futures;
/// # use tokio_threadpool::Builder;
///
/// # pub fn main() {
/// // Create a thread pool with default configuration values
/// let thread_pool = Builder::new()
/// .max_blocking(200)
/// .build();
/// # }
/// ```
pub fn max_blocking(&mut self, val: usize) -> &mut Self {
assert!(val <= MAX_BACKUP, "max value is {}", MAX_BACKUP);
self.max_blocking = val;
self
}

/// Set the worker thread keep alive duration
///
/// If set, a worker thread will wait for up to the specified duration for
@@ -255,6 +289,61 @@ impl Builder {
self
}

/// Execute function `f` after each thread is started but before it starts
/// doing work.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # extern crate tokio_threadpool;
/// # extern crate futures;
/// # use tokio_threadpool::Builder;
///
/// # pub fn main() {
/// // Create a thread pool with default configuration values
/// let thread_pool = Builder::new()
/// .after_start(|| {
/// println!("thread started");
/// })
/// .build();
/// # }
/// ```
pub fn after_start<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
{
self.config.after_start = Some(Arc::new(f));
self
}

/// Execute function `f` before each thread stops.
///
/// This is intended for bookkeeping and monitoring use cases.
Member commented:

Presumably this cannot be guaranteed to run (whereas after_start is guaranteed to run before work can be scheduled?). Maybe worth mentioning explicitly.

Member (author) replied:

If the thread exits, it should run (as long as the threadpool lib itself doesn't panic).

What sort of guarantee would you expect?

Member replied:

Basically, I'm wondering how a process needs to manage the threadpool during process shutdown. Servers typically are waiting on some set of futures that complete once the process has received a shutdown signal. Once these futures complete, is there a way to shut down the pool gracefully? If the pool is just dropped, are threads guaranteed to exit cleanly before the main thread exits?
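For context on the question above, a sketch of one graceful-shutdown pattern. It assumes the crate's `ThreadPool::shutdown` method and the exported `Shutdown` future; this is an illustration, not part of the diff:

```rust
extern crate futures;
extern crate tokio_threadpool;

use futures::Future;
use tokio_threadpool::ThreadPool;

fn main() {
    let pool = ThreadPool::new();

    // ... spawn work; wait for the server's shutdown signal ...

    // `shutdown` consumes the pool and returns a `Shutdown` future that
    // resolves once the worker threads have exited, which (per the author's
    // reply above) is when any `before_stop` callbacks should have run.
    // Blocking on it keeps the main thread alive until the pool winds down.
    pool.shutdown().wait().unwrap();
}
```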

///
/// # Examples
///
/// ```
/// # extern crate tokio_threadpool;
/// # extern crate futures;
/// # use tokio_threadpool::Builder;
///
/// # pub fn main() {
/// // Create a thread pool with default configuration values
/// let thread_pool = Builder::new()
/// .before_stop(|| {
/// println!("thread stopping");
/// })
/// .build();
/// # }
/// ```
pub fn before_stop<F>(&mut self, f: F) -> &mut Self
where F: Fn() + Send + Sync + 'static
{
self.config.before_stop = Some(Arc::new(f));
self
}

/// Customize the `park` instance used by each worker thread.
///
/// The provided closure `f` is called once per worker and returns a `Park`
@@ -331,6 +420,7 @@ impl Builder {
let inner = Arc::new(
Pool::new(
workers.into_boxed_slice(),
self.max_blocking,
self.config.clone()));

// Wrap with `Sender`
16 changes: 15 additions & 1 deletion tokio-threadpool/src/config.rs
@@ -1,18 +1,32 @@
use callback::Callback;

use std::fmt;
use std::sync::Arc;
use std::time::Duration;

/// Thread pool specific configuration values
#[derive(Debug, Clone)]
#[derive(Clone)]
pub(crate) struct Config {
pub keep_alive: Option<Duration>,
// Used to configure a worker thread
pub name_prefix: Option<String>,
pub stack_size: Option<usize>,
pub around_worker: Option<Callback>,
pub after_start: Option<Arc<Fn() + Send + Sync>>,
pub before_stop: Option<Arc<Fn() + Send + Sync>>,
}

/// Max number of workers that can be part of a pool. This is the most that can
/// fit in the scheduler state. Note that this is the max number of **active**
/// threads. There can be more standby threads.
pub(crate) const MAX_WORKERS: usize = 1 << 15;

impl fmt::Debug for Config {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
fmt.debug_struct("Config")
.field("keep_alive", &self.keep_alive)
.field("name_prefix", &self.name_prefix)
.field("stack_size", &self.stack_size)
.finish()
}
}
6 changes: 5 additions & 1 deletion tokio-threadpool/src/lib.rs
@@ -4,8 +4,10 @@
#![deny(warnings, missing_docs, missing_debug_implementations)]

extern crate tokio_executor;
extern crate futures;

extern crate crossbeam_deque as deque;
#[macro_use]
extern crate futures;
extern crate num_cpus;
extern crate rand;

@@ -17,6 +19,7 @@ extern crate futures2;

pub mod park;

mod blocking;
mod builder;
mod callback;
mod config;
@@ -31,6 +34,7 @@ mod task;
mod thread_pool;
mod worker;

pub use blocking::{blocking, BlockingError};
pub use builder::Builder;
pub use sender::Sender;
pub use shutdown::Shutdown;