From 2ac71e40533b7965eb0a1ec6b5699a780fa57100 Mon Sep 17 00:00:00 2001 From: Ryan Levick Date: Fri, 27 Oct 2023 18:51:45 +0200 Subject: [PATCH] Fix streaming request bodies Signed-off-by: Ryan Levick --- .../outbound-http-to-same-app/src/lib.rs | 2 +- sdk/rust/readme.md | 2 +- sdk/rust/src/http.rs | 15 +++++++++++---- sdk/rust/src/http/executor.rs | 3 +-- 4 files changed, 14 insertions(+), 8 deletions(-) diff --git a/examples/http-rust-outbound-http/outbound-http-to-same-app/src/lib.rs b/examples/http-rust-outbound-http/outbound-http-to-same-app/src/lib.rs index cf5038ea8a..180af7cc75 100644 --- a/examples/http-rust-outbound-http/outbound-http-to-same-app/src/lib.rs +++ b/examples/http-rust-outbound-http/outbound-http-to-same-app/src/lib.rs @@ -7,7 +7,7 @@ use spin_sdk::{ /// Send an HTTP request and return the response. #[http_component] async fn send_outbound(_req: Request) -> Result { - let mut res: http::Response<()> = spin_sdk::http::send( + let mut res: http::Response = spin_sdk::http::send( http::Request::builder() .method("GET") .uri("/hello") diff --git a/sdk/rust/readme.md b/sdk/rust/readme.md index ef24d00a4b..c7cbf89fee 100644 --- a/sdk/rust/readme.md +++ b/sdk/rust/readme.md @@ -41,7 +41,7 @@ server, modifies the result, then returns it: ```rust #[http_component] async fn hello_world(_req: Request) -> Result { - let mut res: http::Response<()> = spin_sdk::http::send( + let mut res: http::Response = spin_sdk::http::send( http::Request::builder() .method("GET") .uri("https://fermyon.com") diff --git a/sdk/rust/src/http.rs b/sdk/rust/src/http.rs index 536edadbe1..554336f2bd 100644 --- a/sdk/rust/src/http.rs +++ b/sdk/rust/src/http.rs @@ -535,6 +535,9 @@ impl ResponseOutparam { } /// Send an outgoing request +/// +/// If `request`` is an `OutgoingRequest` and you are streaming the body to the +/// outgoing request body sink, you need to ensure it is dropped before awaiting this function. pub async fn send(request: I) -> Result where I: TryIntoOutgoingRequest, @@ -544,18 +547,22 @@ where { let (request, body_buffer) = I::try_into_outgoing_request(request) .map_err(|e| SendError::RequestConversion(e.into()))?; - if let Some(body_buffer) = body_buffer { + let response = if let Some(body_buffer) = body_buffer { // It is part of the contract of the trait that implementors of `TryIntoOutgoingRequest` // do not call `OutgoingRequest::write`` if they return a buffered body. let mut body_sink = request.take_body(); + let response = executor::outgoing_request_send(request); body_sink .send(body_buffer) .await .map_err(|e| SendError::Http(Error::UnexpectedError(e.to_string())))?; + // The body sink needs to be dropped before we await the response, otherwise we deadlock + drop(body_sink); + response.await + } else { + executor::outgoing_request_send(request).await } - let response = executor::outgoing_request_send(request) - .await - .map_err(SendError::Http)?; + .map_err(SendError::Http)?; TryFromIncomingResponse::try_from_incoming_response(response) .await diff --git a/sdk/rust/src/http/executor.rs b/sdk/rust/src/http/executor.rs index b14c480ce7..aedc7182d5 100644 --- a/sdk/rust/src/http/executor.rs +++ b/sdk/rust/src/http/executor.rs @@ -143,9 +143,8 @@ pub(crate) fn outgoing_body(body: OutgoingBody) -> impl Sink, Error = ty pub(crate) fn outgoing_request_send( request: OutgoingRequest, ) -> impl Future> { + let response = outgoing_handler::handle(request, None); future::poll_fn({ - let response = outgoing_handler::handle(request, None); - move |context| match &response { Ok(response) => { if let Some(response) = response.get() {