Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Cannot be shared between threads safely #241

Closed
frederikbosch opened this issue Jan 15, 2020 · 7 comments
Closed

Cannot be shared between threads safely #241

frederikbosch opened this issue Jan 15, 2020 · 7 comments

Comments

@frederikbosch
Copy link

frederikbosch commented Jan 15, 2020

Bug Report

I cannot solve the following error. I guess I lack knowledge, but I have no idea where to start solving this one.

error[E0277]: `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely

Version

cargo / rust 1.40.0
tonic 0.1.0.

Platform

Ubuntu 18.04

Crates

reqwest = { version = "0.10.0", features = ["json"]}

Description

What I am trying to is the following. I have service that receives a gRPC call from client. That service will fire a HTTP request via reqwest before it answers the client. That reqwest is also async. In pseudo-code (sort of) I have the following.

impl Initialization for InitializationService {
  type InitializeStream = Pin<Box<dyn Stream<Item = Result<InitializeResponse, Status>> + Send + Sync + 'static>>;

  async fn initialize(&self, request: Request<tonic::Streaming<InitializeRequest>>) -> Result<Response<Self::InitializeStream>, Status> {
    // ....
    let stream = request.into_inner();
    let output = async_stream::try_stream! {
       futures::pin_mut!(stream);
       while let Some(a) = stream.next().await {
          &lib.fire_http_req(a.x.clone(), y, z).await;
       }
    };
    // ....
  }
}

And then in the library I am calling I have.

impl Lib {
  pub async fn fire_http_req(&self, x: PathBuf, y: String, z: String) -> Result<(), String> {
    let resp: Vec<serde_json::Value> = reqwest::get("http://localhost")
            .await
            .unwrap()
            .json()
            .await
            .unwrap();
   Ok(())
  }
}

The result is huge error.

error[E0277]: `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)` cannot be shared between threads safely
  |
  = help: the trait `std::marker::Sync` is not implemented for `(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)`
  = note: required because of the requirements on the impl of `std::marker::Sync` for `std::ptr::Unique<(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)>`
  = note: required because it appears within the type `std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)>`
  = note: required because it appears within the type `std::pin::Pin<std::boxed::Box<(dyn core::future::future::Future<Output = std::result::Result<http::response::Response<hyper::body::body::Body>, hyper::error::Error>> + std::marker::Send + 'static)>>`
  = note: required because it appears within the type `hyper::client::ResponseFuture`
@LucioFranco
Copy link
Member

This is a unfortunate consequence of trait objects. Hyper's response future doesn't impl Sync but streams currently require that. I would suggest spawning the future that awaits the hyper request and use a channel to get the reply back. Since the channel will impl Sync.

@frederikbosch
Copy link
Author

Thanks, I already had the idea that I had to wrap the operation. I will give it a try and if it does not work, I might come back here and reopen the issue. Thanks for the suggestion, and again for the wonderful library.

@frederikbosch
Copy link
Author

@LucioFranco Maybe you could include an example in the docs how to solve an issue like this. When I started using a channel, I got into more problems (type inside `async` block must be known in this context). Because of the errors originating inside macros - I guess #[tonic::async_trait] - I found debugging very hard.

Instead of streaming I will redesign my gRPC proto and use repeated. Hopefully this leads to better results.

@ghost
Copy link

ghost commented Jan 17, 2020

@frederikbosch this #117 (comment) might help since it is quite similar to your case, I think it might also be solved by spawning a task using tokio::spawn and awaiting on the returned JoinHandle but I haven't tried it yet.

@LucioFranco
Copy link
Member

@frederikbosch does this example help at all? https://github.com/hyperium/tonic/blob/master/examples/src/routeguide/server.rs#L51

@frederikbosch
Copy link
Author

I used the suggestion by @abdul-rehman0, and that worked. Fantastic. My own not working code used mpsc::channel instead of oneshot::channel, it looked more the same as line referenced by @LucioFranco.

However, I still have to add #[recursion_limit = "256"] according to the compiler. Maybe I refused to do that with the mpsc code, and that was why it not worked, but that I cannot recall. Why could that recursion limit be required?

Anyway, thank you both for helping me out.

@LucioFranco
Copy link
Member

@frederikbosch the recursion limit is usually due to usage of a select! but adding that isn't a big deal.

# for free to join this conversation on GitHub. Already have an account? # to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants