diff --git a/tokio-util/Cargo.toml b/tokio-util/Cargo.toml index 4b406d81865..f16099d7829 100644 --- a/tokio-util/Cargo.toml +++ b/tokio-util/Cargo.toml @@ -34,7 +34,7 @@ rt = ["tokio/rt", "tokio/sync", "futures-util", "hashbrown"] __docs_rs = ["futures-util"] [dependencies] -tokio = { version = "1.22.0", path = "../tokio", features = ["sync"] } +tokio = { version = "1.28.0", path = "../tokio", features = ["sync"] } bytes = "1.0.0" futures-core = "0.3.0" futures-sink = "0.3.0" diff --git a/tokio-util/src/task/join_map.rs b/tokio-util/src/task/join_map.rs index c6bf5bc241a..c9aed537d4b 100644 --- a/tokio-util/src/task/join_map.rs +++ b/tokio-util/src/task/join_map.rs @@ -316,6 +316,60 @@ where self.insert(key, task); } + /// Spawn the blocking code on the blocking threadpool and store it in this `JoinMap` with the provided + /// key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// Note that blocking tasks cannot be cancelled after execution starts. + /// Replaced blocking tasks will still run to completion if the task has begun + /// to execute when it is replaced. A blocking task which is replaced before + /// it has been scheduled on a blocking worker thread will be cancelled. + /// + /// # Panics + /// + /// This method panics if called outside of a Tokio runtime. + /// + /// [`join_next`]: Self::join_next + #[track_caller] + pub fn spawn_blocking(&mut self, key: K, f: F) + where + F: FnOnce() -> V, + F: Send + 'static, + V: Send, + { + let task = self.tasks.spawn_blocking(f); + self.insert(key, task) + } + + /// Spawn the blocking code on the blocking threadpool of the provided runtime and store it in this + /// `JoinMap` with the provided key. + /// + /// If a task previously existed in the `JoinMap` for this key, that task + /// will be cancelled and replaced with the new one. The previous task will + /// be removed from the `JoinMap`; a subsequent call to [`join_next`] will + /// *not* return a cancelled [`JoinError`] for that task. + /// + /// Note that blocking tasks cannot be cancelled after execution starts. + /// Replaced blocking tasks will still run to completion if the task has begun + /// to execute when it is replaced. A blocking task which is replaced before + /// it has been scheduled on a blocking worker thread will be cancelled. + /// + /// [`join_next`]: Self::join_next + #[track_caller] + pub fn spawn_blocking_on(&mut self, key: K, f: F, handle: &Handle) + where + F: FnOnce() -> V, + F: Send + 'static, + V: Send, + { + let task = self.tasks.spawn_blocking_on(f, handle); + self.insert(key, task); + } + /// Spawn the provided task on the current [`LocalSet`] and store it in this /// `JoinMap` with the provided key. ///