Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

expose methods to monitor threads when threads on_start and on_stop #35

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 86 additions & 2 deletions src/runtime/threadpool/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,12 @@ use std::{
io,
sync::{Arc, RwLock},
};
use std::fmt;
use tokio_02::runtime;
use tokio_timer_02::clock as clock_02;

type Callback = std::sync::Arc<dyn Fn() + Send + Sync>;

/// Builds a compatibility runtime with custom configuration values.
///
/// This runtime is compatible with code using both the current release version
Expand Down Expand Up @@ -43,11 +46,16 @@ use tokio_timer_02::clock as clock_02;
/// // use runtime ...
/// }
/// ```
#[derive(Debug)]
#[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))]
pub struct Builder {
inner: runtime::Builder,
clock: clock_02::Clock,

/// Callback to run after each thread starts.
pub(super) after_start: Option<Callback>,

/// To run before each worker thread stops
pub(super) before_stop: Option<Callback>,
}

impl Builder {
Expand All @@ -59,6 +67,9 @@ impl Builder {
Builder {
clock: clock_02::Clock::system(),
inner: runtime::Builder::new(),
// No worker thread callbacks
after_start: None,
before_stop: None,
}
}

Expand Down Expand Up @@ -119,6 +130,59 @@ impl Builder {
self
}

/// Executes function `f` after each thread is started but before it starts
/// doing work.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # use tokio_compat::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new()
/// .on_thread_start(|| {
/// println!("thread started");
/// })
/// .build();
/// # }
/// ```
#[cfg(not(loom))]
pub fn on_thread_start<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.after_start = Some(Arc::new(f));
self
}

/// Executes function `f` before each thread stops.
///
/// This is intended for bookkeeping and monitoring use cases.
///
/// # Examples
///
/// ```
/// # use tokio_compat::runtime;
///
/// # pub fn main() {
/// let runtime = runtime::Builder::new()
/// .on_thread_stop(|| {
/// println!("thread stopping");
/// })
/// .build();
/// # }
/// ```
#[cfg(not(loom))]
pub fn on_thread_stop<F>(&mut self, f: F) -> &mut Self
where
F: Fn() + Send + Sync + 'static,
{
self.before_stop = Some(Arc::new(f));
self
}

/// Set the stack size (in bytes) for worker threads.
///
/// The actual stack size may be greater than this value if the platform
Expand Down Expand Up @@ -181,11 +245,18 @@ impl Builder {
let compat_sender2 = Arc::downgrade(&compat_sender);
let mut lock = compat_sender.write().unwrap();

let after_start = self.after_start.clone();
let before_stop = self.before_stop.clone();

let runtime = self
.inner
.threaded_scheduler()
.enable_all()
.on_thread_start(move || {
match &after_start {
Some(callback) => callback(),
None => {},
}
// We need the threadpool's sender to set up the default tokio
// 0.1 executor. We also need to upgrade the weak pointer. If the
// pointer is no longer valid, then the runtime has shut down and the
Expand All @@ -202,7 +273,11 @@ impl Builder {
.clone();
compat::set_guards(compat_sender, &compat_timer, &compat_reactor);
})
.on_thread_stop(|| {
.on_thread_stop(move || {
match &before_stop {
Some(callback) => callback(),
None => {},
}
compat::unset_guards();
})
.build()?;
Expand All @@ -227,3 +302,12 @@ impl Default for Builder {
Self::new()
}
}

impl fmt::Debug for Builder {
fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
fmt.debug_struct("Builder")
.field("inner", &self.inner)
.field("clock", &self.clock)
.finish()
}
}