Skip to content

Commit 2500e5a

Browse files
authored
feat: make TerminatableTask terminate itself when dropped (#1151)
1 parent 93f93d2 commit 2500e5a

File tree

4 files changed

+18
-34
lines changed

4 files changed

+18
-34
lines changed

commons/zenoh-task/src/lib.rs

+17-9
Original file line numberDiff line numberDiff line change
@@ -130,10 +130,16 @@ impl TaskController {
130130
}
131131

132132
pub struct TerminatableTask {
133-
handle: JoinHandle<()>,
133+
handle: Option<JoinHandle<()>>,
134134
token: CancellationToken,
135135
}
136136

137+
impl Drop for TerminatableTask {
138+
fn drop(&mut self) {
139+
self.terminate(std::time::Duration::from_secs(10));
140+
}
141+
}
142+
137143
impl TerminatableTask {
138144
pub fn create_cancellation_token() -> CancellationToken {
139145
CancellationToken::new()
@@ -147,7 +153,7 @@ impl TerminatableTask {
147153
T: Send + 'static,
148154
{
149155
TerminatableTask {
150-
handle: rt.spawn(future.map(|_f| ())),
156+
handle: Some(rt.spawn(future.map(|_f| ()))),
151157
token,
152158
}
153159
}
@@ -168,24 +174,26 @@ impl TerminatableTask {
168174
};
169175

170176
TerminatableTask {
171-
handle: rt.spawn(task),
177+
handle: Some(rt.spawn(task)),
172178
token,
173179
}
174180
}
175181

176182
/// Attempts to terminate the task.
177183
/// Returns true if task completed / aborted within timeout duration, false otherwise.
178-
pub fn terminate(self, timeout: Duration) -> bool {
184+
pub fn terminate(&mut self, timeout: Duration) -> bool {
179185
ResolveFuture::new(async move { self.terminate_async(timeout).await }).res_sync()
180186
}
181187

182188
/// Async version of [`TerminatableTask::terminate()`].
183-
pub async fn terminate_async(self, timeout: Duration) -> bool {
189+
pub async fn terminate_async(&mut self, timeout: Duration) -> bool {
184190
self.token.cancel();
185-
if tokio::time::timeout(timeout, self.handle).await.is_err() {
186-
tracing::error!("Failed to terminate the task");
187-
return false;
188-
};
191+
if let Some(handle) = self.handle.take() {
192+
if tokio::time::timeout(timeout, handle).await.is_err() {
193+
tracing::error!("Failed to terminate the task");
194+
return false;
195+
};
196+
}
189197
true
190198
}
191199
}

zenoh-ext/src/publication_cache.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ impl<'a> PublicationCache<'a> {
257257
let PublicationCache {
258258
_queryable,
259259
local_sub,
260-
task,
260+
mut task,
261261
} = self;
262262
_queryable.undeclare().res_async().await?;
263263
local_sub.undeclare().res_async().await?;

zenoh/src/net/routing/hat/linkstate_peer/mod.rs

-10
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,6 @@ use std::{
4747
any::Any,
4848
collections::{HashMap, HashSet},
4949
sync::Arc,
50-
time::Duration,
5150
};
5251
use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, ZenohId};
5352
use zenoh_protocol::{
@@ -116,15 +115,6 @@ struct HatTables {
116115
peers_trees_task: Option<TerminatableTask>,
117116
}
118117

119-
impl Drop for HatTables {
120-
fn drop(&mut self) {
121-
if self.peers_trees_task.is_some() {
122-
let task = self.peers_trees_task.take().unwrap();
123-
task.terminate(Duration::from_secs(10));
124-
}
125-
}
126-
}
127-
128118
impl HatTables {
129119
fn new() -> Self {
130120
Self {

zenoh/src/net/routing/hat/router/mod.rs

-14
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,6 @@ use std::{
5252
collections::{hash_map::DefaultHasher, HashMap, HashSet},
5353
hash::Hasher,
5454
sync::Arc,
55-
time::Duration,
5655
};
5756
use zenoh_config::{unwrap_or_default, ModeDependent, WhatAmI, WhatAmIMatcher, ZenohId};
5857
use zenoh_protocol::{
@@ -127,19 +126,6 @@ struct HatTables {
127126
router_peers_failover_brokering: bool,
128127
}
129128

130-
impl Drop for HatTables {
131-
fn drop(&mut self) {
132-
if self.peers_trees_task.is_some() {
133-
let task = self.peers_trees_task.take().unwrap();
134-
task.terminate(Duration::from_secs(10));
135-
}
136-
if self.routers_trees_task.is_some() {
137-
let task = self.routers_trees_task.take().unwrap();
138-
task.terminate(Duration::from_secs(10));
139-
}
140-
}
141-
}
142-
143129
impl HatTables {
144130
fn new(router_peers_failover_brokering: bool) -> Self {
145131
Self {

0 commit comments

Comments
 (0)