Buffered stream losing Send marker #2636

Closed
Tuetuopay opened this issue Aug 16, 2022 · 7 comments
Comments

@Tuetuopay

Hi,

I hit a higher-ranked lifetime error when using buffered streams: the compiler complains that it cannot prove the future consuming the stream is Send, despite the stream itself being Send.

use futures::stream::{empty, StreamExt};
use std::future::ready;

fn send<T: Send>(_: T) {}

fn main() {
    send(async {
        empty().map(ready::<&()>).buffered(0).next().await
    });
}

Resulting error:

   Compiling playground v0.0.1 (/playground)
error: higher-ranked lifetime error
 --> src/main.rs:7:5
  |
7 | /     send(async {
8 | |         empty().map(ready::<&()>).buffered(0).next().await
9 | |     });
  | |______^
  |
  = note: could not prove for<'r> impl futures::Future<Output = Option<&'r ()>>: std::marker::Send

error: could not compile `playground` due to previous error

Buffered is the key here: replacing .map() + .buffered() with .then() compiles fine. This shows that the stream and its futures are definitely Send, and that the buffering is what trips up the compiler. I cannot see which bound in the Buffered struct/future makes the resulting stream !Send.

use futures::stream::{empty, StreamExt};
use std::future::ready;

fn send<T: Send>(_: T) {}

fn main() {
    send(async {
        empty().then(ready::<&()>).next().await
    });
}

Obviously, I need buffered to run the futures concurrently, as they make some long-ish network calls. The Send requirement comes from Tonic's use of async_trait, which requires the futures to be Send.

Thanks!

@taiki-e
Member

taiki-e commented Aug 16, 2022

Hmm, if the stream, future, and output are Send, buffered should also be Send.

assert_impl!(Buffered<SendStream<SendFuture<()>>>: Send);
assert_not_impl!(Buffered<SendStream<SendFuture>>: Send);
assert_not_impl!(Buffered<SendStream<LocalFuture>>: Send);
assert_not_impl!(Buffered<LocalStream<SendFuture<()>>>: Send);
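
For readers unfamiliar with this style of check, here is a minimal sketch of how "does this type implement Send?" can be answered at compile time using only the standard library. It is not necessarily how the assert_impl!/assert_not_impl! macros above are implemented; the names MaybeNotSend and Probe are hypothetical and exist only for this illustration. The trick relies on inherent associated constants taking priority over trait constants, and it only works for concrete types, not inside generic code.

```rust
use std::marker::PhantomData;
use std::rc::Rc;

// Fallback (hypothetical name): every type gets `IS_SEND = false`
// through this blanket trait implementation.
trait MaybeNotSend {
    const IS_SEND: bool = false;
}
impl<T: ?Sized> MaybeNotSend for T {}

// Probe type (hypothetical name). The inherent constant below only exists
// when `T: Send`, and inherent items win over trait items during lookup.
#[allow(dead_code)]
struct Probe<T: ?Sized>(PhantomData<T>);

#[allow(dead_code)]
impl<T: ?Sized + Send> Probe<T> {
    const IS_SEND: bool = true;
}

fn main() {
    // `u8` is Send, so the inherent constant is selected.
    assert!(<Probe<u8>>::IS_SEND);
    // `Rc<u8>` is not Send, so lookup falls back to the trait constant.
    assert!(!<Probe<Rc<u8>>>::IS_SEND);
    // Raw pointers are not Send either.
    assert!(!<Probe<*const ()>>::IS_SEND);
}
```

Checks of this kind only confirm that the auto trait holds for a concrete type. They do not exercise the higher-ranked case from the original report, which is consistent with these assertions passing while the async block still fails to compile.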

@Tuetuopay
Author

Yeah, hence why I completely fail to see how the compiler gets confused here, and why some combinations do work while some don't. Do you think it could be a rustc bug?

@aleb

aleb commented Aug 31, 2022

Can be worked around with:

use futures::stream::{empty, StreamExt};
use std::future::ready;
use std::sync::Arc;

fn send<T: Send>(_: T) {}

fn main() {
    // send(async { empty().map(ready::<&()>).buffered(0).next().await });
    let sem = Arc::new(tokio::sync::Semaphore::new(5));
    send(async {
        empty()
            .map(ready::<&()>)
            .map(|x| {
                let sem = sem.clone();
                async move {
                    // This must be `_permit` not just `_` otherwise the object is destroyed immediately.
                    // https://users.rust-lang.org/t/tokio-semaphore-mystery-acquire-vs-aquire-owned/79646/3
                    let _permit = sem.acquire().await.unwrap();
                    x
                }
            })
            .next()
            .await
    });
    println!("done");
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=19ec390a8091772f8378597fa03ee038

@mikeyhew

mikeyhew commented Sep 11, 2022

I ran into this issue today. I won't post a code example because it's not materially different from the one that @Tuetuopay posted, but I should point out that the same issue exists for buffer_unordered as well as buffered.

It does seem like a rustc bug, or at least a limitation of the current compiler, especially given the "could not prove" error message and the fact that it gives no concrete reason why the future is not Send.

Another workaround is to call .boxed() on the stream before calling buffered on it:

use futures::stream::{empty, StreamExt};
use std::future::ready;

fn send<T: Send>(_: T) {}

fn main() {
    send(async {
        empty().map(ready::<&()>).boxed().buffered(0).next().await
    });
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=9c086487d1ff76b309796efa45846f67

@Tuetuopay
Author

@mikeyhew wait what? that actually works, including in my actual code that's much more complex than the example. thank you so much!

With this, it looks more and more like a rustc bug, because the Box<_> version of the stream literally has it written in its type that it's Send.
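
To make "written in its type" concrete: the futures crate defines BoxStream<'a, T> as Pin<Box<dyn Stream<Item = T> + Send + 'a>>, so after .boxed() the Send bound is part of the trait object type and the compiler no longer has to derive it from the combinator chain. The sketch below shows the same idea with a plain Future and only the standard library. BoxedSendFuture, boxed, NoopWaker, and poll_once are hypothetical helpers written for this illustration, not items from the futures crate.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Same shape as a boxed, type-erased future: `Send` is spelled out in the type.
type BoxedSendFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

// Hypothetical helper: erase the concrete future type, keeping only `Send`.
fn boxed<'a, F>(fut: F) -> BoxedSendFuture<'a, F::Output>
where
    F: Future + Send + 'a,
{
    Box::pin(fut)
}

// Same compile-time check as in the examples above, but returning the value.
fn send<T: Send>(value: T) -> T {
    value
}

// Waker that does nothing, so the future can be polled without a runtime.
struct NoopWaker;
impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

// Poll the boxed future exactly once.
fn poll_once<T>(fut: &mut BoxedSendFuture<'_, T>) -> Poll<T> {
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    fut.as_mut().poll(&mut cx)
}

fn main() {
    // The boxed future passes the `Send` check purely because of its type.
    let mut fut = send(boxed(async { 42u8 }));
    assert_eq!(poll_once(&mut fut), Poll::Ready(42u8));
}
```

The cost of this approach is one heap allocation and dynamic dispatch per boxed stream or future, which is usually negligible next to network calls.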

@mikeyhew

I just finished debugging a bunch of weird errors that were ultimately caused by this issue again (I must have forgotten to add the .boxed() workaround, and when I added it the errors went away), so I'm going to open an issue on https://github.com/rust-lang/rust referencing this, since it's gotta be a bug in rustc.

@taiki-e
Member

taiki-e commented Nov 14, 2022

Closing in favor of the upstream issue.

Thanks all for the investigation!

@taiki-e taiki-e closed this as completed Nov 14, 2022