diff --git a/src/runtime/threadpool/builder.rs b/src/runtime/threadpool/builder.rs index 5a5fb7f..cce0d7e 100644 --- a/src/runtime/threadpool/builder.rs +++ b/src/runtime/threadpool/builder.rs @@ -4,9 +4,12 @@ use std::{ io, sync::{Arc, RwLock}, }; +use std::fmt; use tokio_02::runtime; use tokio_timer_02::clock as clock_02; +type Callback = std::sync::Arc; + /// Builds a compatibility runtime with custom configuration values. /// /// This runtime is compatible with code using both the current release version @@ -43,11 +46,16 @@ use tokio_timer_02::clock as clock_02; /// // use runtime ... /// } /// ``` -#[derive(Debug)] #[cfg_attr(docsrs, doc(cfg(feature = "rt-full")))] pub struct Builder { inner: runtime::Builder, clock: clock_02::Clock, + + /// Callback to run after each thread starts. + pub(super) after_start: Option, + + /// To run before each worker thread stops + pub(super) before_stop: Option, } impl Builder { @@ -59,6 +67,9 @@ impl Builder { Builder { clock: clock_02::Clock::system(), inner: runtime::Builder::new(), + // No worker thread callbacks + after_start: None, + before_stop: None, } } @@ -119,6 +130,59 @@ impl Builder { self } + /// Executes function `f` after each thread is started but before it starts + /// doing work. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # use tokio_compat::runtime; + /// + /// # pub fn main() { + /// let runtime = runtime::Builder::new() + /// .on_thread_start(|| { + /// println!("thread started"); + /// }) + /// .build(); + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_start(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.after_start = Some(Arc::new(f)); + self + } + + /// Executes function `f` before each thread stops. + /// + /// This is intended for bookkeeping and monitoring use cases. + /// + /// # Examples + /// + /// ``` + /// # use tokio_compat::runtime; + /// + /// # pub fn main() { + /// let runtime = runtime::Builder::new() + /// .on_thread_stop(|| { + /// println!("thread stopping"); + /// }) + /// .build(); + /// # } + /// ``` + #[cfg(not(loom))] + pub fn on_thread_stop(&mut self, f: F) -> &mut Self + where + F: Fn() + Send + Sync + 'static, + { + self.before_stop = Some(Arc::new(f)); + self + } + /// Set the stack size (in bytes) for worker threads. /// /// The actual stack size may be greater than this value if the platform @@ -181,11 +245,18 @@ impl Builder { let compat_sender2 = Arc::downgrade(&compat_sender); let mut lock = compat_sender.write().unwrap(); + let after_start = self.after_start.clone(); + let before_stop = self.before_stop.clone(); + let runtime = self .inner .threaded_scheduler() .enable_all() .on_thread_start(move || { + match &after_start { + Some(callback) => callback(), + None => {}, + } // We need the threadpool's sender to set up the default tokio // 0.1 executor. We also need to upgrade the weak pointer. If the // pointer is no longer valid, then the runtime has shut down and the @@ -202,7 +273,11 @@ impl Builder { .clone(); compat::set_guards(compat_sender, &compat_timer, &compat_reactor); }) - .on_thread_stop(|| { + .on_thread_stop(move || { + match &before_stop { + Some(callback) => callback(), + None => {}, + } compat::unset_guards(); }) .build()?; @@ -227,3 +302,12 @@ impl Default for Builder { Self::new() } } + +impl fmt::Debug for Builder { + fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt.debug_struct("Builder") + .field("inner", &self.inner) + .field("clock", &self.clock) + .finish() + } +} \ No newline at end of file