Skip to content

Commit 0284d1b

Browse files
authored
macros: make select! budget-aware (#7164)
1 parent 710bc80 commit 0284d1b

File tree

4 files changed

+64
-5
lines changed

4 files changed

+64
-5
lines changed

tokio/src/macros/select.rs

+4
Original file line numberDiff line numberDiff line change
@@ -660,6 +660,10 @@ doc! {macro_rules! select {
660660
let mut futures = &mut futures;
661661

662662
$crate::macros::support::poll_fn(|cx| {
663+
// Return `Pending` when the task budget is depleted since budget-aware futures
664+
// are going to yield anyway and other futures will not cooperate.
665+
::std::task::ready!($crate::macros::support::poll_budget_available(cx));
666+
663667
// Track if any branch returns pending. If no branch completes
664668
// **or** returns pending, this implies that all branches are
665669
// disabled.

tokio/src/macros/support.rs

+17-1
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,24 @@ cfg_macros! {
77
pub fn thread_rng_n(n: u32) -> u32 {
88
crate::runtime::context::thread_rng_n(n)
99
}
10+
11+
cfg_coop! {
12+
#[doc(hidden)]
13+
#[inline]
14+
pub fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
15+
crate::task::coop::poll_budget_available(cx)
16+
}
17+
}
18+
19+
cfg_not_coop! {
20+
#[doc(hidden)]
21+
#[inline]
22+
pub fn poll_budget_available(_: &mut Context<'_>) -> Poll<()> {
23+
Poll::Ready(())
24+
}
25+
}
1026
}
1127

1228
pub use std::future::{Future, IntoFuture};
1329
pub use std::pin::Pin;
14-
pub use std::task::Poll;
30+
pub use std::task::{Context, Poll};

tokio/src/task/coop/mod.rs

+19-4
Original file line numberDiff line numberDiff line change
@@ -305,12 +305,27 @@ cfg_coop! {
305305

306306
Poll::Ready(restore)
307307
} else {
308-
defer(cx);
308+
register_waker(cx);
309309
Poll::Pending
310310
}
311311
}).unwrap_or(Poll::Ready(RestoreOnPending(Cell::new(Budget::unconstrained()))))
312312
}
313313

314+
/// Returns `Poll::Ready` if the current task has budget to consume, and `Poll::Pending` otherwise.
315+
///
316+
/// Note that in contrast to `poll_proceed`, this method does not consume any budget and is used when
317+
/// polling for budget availability.
318+
#[inline]
319+
pub(crate) fn poll_budget_available(cx: &mut Context<'_>) -> Poll<()> {
320+
if has_budget_remaining() {
321+
Poll::Ready(())
322+
} else {
323+
register_waker(cx);
324+
325+
Poll::Pending
326+
}
327+
}
328+
314329
cfg_rt! {
315330
cfg_unstable_metrics! {
316331
#[inline(always)]
@@ -326,7 +341,7 @@ cfg_coop! {
326341
fn inc_budget_forced_yield_count() {}
327342
}
328343

329-
fn defer(cx: &mut Context<'_>) {
344+
fn register_waker(cx: &mut Context<'_>) {
330345
context::defer(cx.waker());
331346
}
332347
}
@@ -335,8 +350,8 @@ cfg_coop! {
335350
#[inline(always)]
336351
fn inc_budget_forced_yield_count() {}
337352

338-
fn defer(cx: &mut Context<'_>) {
339-
cx.waker().wake_by_ref();
353+
fn register_waker(cx: &mut Context<'_>) {
354+
cx.waker().wake_by_ref()
340355
}
341356
}
342357

tokio/tests/macros_select.rs

+24
Original file line numberDiff line numberDiff line change
@@ -716,3 +716,27 @@ async fn temporary_lifetime_extension() {
716716
() = &mut std::future::ready(()) => {},
717717
}
718718
}
719+
720+
#[tokio::test]
721+
async fn select_is_budget_aware() {
722+
const BUDGET: usize = 128;
723+
724+
let task = || {
725+
Box::pin(async move {
726+
tokio::select! {
727+
biased;
728+
729+
() = tokio::task::coop::consume_budget() => {},
730+
() = std::future::ready(()) => {}
731+
}
732+
})
733+
};
734+
735+
for _ in 0..BUDGET {
736+
let poll = futures::poll!(&mut task());
737+
assert!(poll.is_ready());
738+
}
739+
740+
let poll = futures::poll!(&mut task());
741+
assert!(poll.is_pending());
742+
}

0 commit comments

Comments
 (0)