Skip to content

Commit 9ae19f8

Browse files
Sushisourcetaiki-e
authored andcommitted
Fix incorrect termination of select_with_strategy streams (#2635)
1 parent 7bff2be commit 9ae19f8

File tree

2 files changed

+50
-3
lines changed

2 files changed

+50
-3
lines changed

futures-util/src/stream/select_with_strategy.rs

+8-3
Original file line numberDiff line numberDiff line change
@@ -231,18 +231,23 @@ where
231231
St1: Stream,
232232
St2: Stream<Item = St1::Item>,
233233
{
234-
match poll_side(select, side, cx) {
234+
let first_done = match poll_side(select, side, cx) {
235235
Poll::Ready(Some(item)) => return Poll::Ready(Some(item)),
236236
Poll::Ready(None) => {
237237
select.internal_state.finish(side);
238+
true
238239
}
239-
Poll::Pending => (),
240+
Poll::Pending => false,
240241
};
241242
let other = side.other();
242243
match poll_side(select, other, cx) {
243244
Poll::Ready(None) => {
244245
select.internal_state.finish(other);
245-
Poll::Ready(None)
246+
if first_done {
247+
Poll::Ready(None)
248+
} else {
249+
Poll::Pending
250+
}
246251
}
247252
a => a,
248253
}

futures/tests/stream.rs

+42
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
1+
use std::cell::Cell;
12
use std::iter;
3+
use std::pin::Pin;
4+
use std::rc::Rc;
25
use std::sync::Arc;
6+
use std::task::Context;
37

48
use futures::channel::mpsc;
59
use futures::executor::block_on;
@@ -9,6 +13,7 @@ use futures::sink::SinkExt;
913
use futures::stream::{self, StreamExt};
1014
use futures::task::Poll;
1115
use futures::{ready, FutureExt};
16+
use futures_core::Stream;
1217
use futures_test::task::noop_context;
1318

1419
#[test]
@@ -419,3 +424,40 @@ fn ready_chunks() {
419424
assert_eq!(s.next().await.unwrap(), vec![4]);
420425
});
421426
}
427+
428+
struct SlowStream {
429+
times_should_poll: usize,
430+
times_polled: Rc<Cell<usize>>,
431+
}
432+
impl Stream for SlowStream {
433+
type Item = usize;
434+
435+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
436+
self.times_polled.set(self.times_polled.get() + 1);
437+
if self.times_polled.get() % 2 == 0 {
438+
cx.waker().wake_by_ref();
439+
return Poll::Pending;
440+
}
441+
if self.times_polled.get() >= self.times_should_poll {
442+
return Poll::Ready(None);
443+
}
444+
Poll::Ready(Some(self.times_polled.get()))
445+
}
446+
}
447+
448+
#[test]
449+
fn select_with_strategy_doesnt_terminate_early() {
450+
for side in [stream::PollNext::Left, stream::PollNext::Right] {
451+
let times_should_poll = 10;
452+
let count = Rc::new(Cell::new(0));
453+
let b = stream::iter([10, 20]);
454+
455+
let mut selected = stream::select_with_strategy(
456+
SlowStream { times_should_poll, times_polled: count.clone() },
457+
b,
458+
|_: &mut ()| side,
459+
);
460+
block_on(async move { while selected.next().await.is_some() {} });
461+
assert_eq!(count.get(), times_should_poll + 1);
462+
}
463+
}

0 commit comments

Comments
 (0)