Skip to content

Commit

Permalink
Merge pull request #2 from lann/backport-7426
Browse files Browse the repository at this point in the history
  • Loading branch information
dicej authored Nov 8, 2023
2 parents 2448517 + 365c841 commit bdff582
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 4 deletions.
5 changes: 5 additions & 0 deletions crates/wasi-http/src/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ pub type HyperIncomingBody = BoxBody<Bytes, anyhow::Error>;
/// want to do that unless the user asks to consume the body.
pub struct HostIncomingBodyBuilder {
pub body: HyperIncomingBody,
pub worker: Option<Arc<AbortOnDropJoinHandle<anyhow::Result<()>>>>,
pub between_bytes_timeout: Duration,
}

Expand Down Expand Up @@ -96,6 +97,10 @@ impl HostIncomingBodyBuilder {
// loop running to relieve backpressure, so we get to the trailers.
let _ = body_writer.send(Ok(data)).await;
}

// Force the builder's worker to be owned by this task, ensuring that it's kept around
// until this task exits.
drop(self.worker);
});

HostIncomingBody {
Expand Down
8 changes: 5 additions & 3 deletions crates/wasi-http/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use crate::{
use anyhow::Context;
use http_body_util::BodyExt;
use std::any::Any;
use std::sync::Arc;
use std::time::Duration;
use tokio::net::TcpStream;
use tokio::time::timeout;
Expand Down Expand Up @@ -37,6 +38,7 @@ pub trait WasiHttpView: Send {
let (parts, body) = req.into_parts();
let body = HostIncomingBodyBuilder {
body,
worker: None,
// TODO: this needs to be plumbed through
between_bytes_timeout: std::time::Duration::from_millis(600 * 1000),
};
Expand Down Expand Up @@ -159,7 +161,7 @@ pub fn default_send_request(

Ok(IncomingResponseInternal {
resp,
worker,
worker: Arc::new(worker),
between_bytes_timeout,
})
});
Expand Down Expand Up @@ -220,7 +222,7 @@ pub struct HostIncomingResponse {
pub status: u16,
pub headers: FieldMap,
pub body: Option<HostIncomingBodyBuilder>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
pub worker: Arc<AbortOnDropJoinHandle<anyhow::Result<()>>>,
}

pub struct HostOutgoingResponse {
Expand Down Expand Up @@ -271,7 +273,7 @@ pub enum HostFields {

pub struct IncomingResponseInternal {
pub resp: hyper::Response<HyperIncomingBody>,
pub worker: AbortOnDropJoinHandle<anyhow::Result<()>>,
pub worker: Arc<AbortOnDropJoinHandle<anyhow::Result<()>>>,
pub between_bytes_timeout: std::time::Duration,
}

Expand Down
3 changes: 2 additions & 1 deletion crates/wasi-http/src/types_impl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::{
},
};
use anyhow::Context;
use std::any::Any;
use std::{any::Any, sync::Arc};
use wasmtime::component::Resource;
use wasmtime_wasi::preview2::{
bindings::io::streams::{InputStream, OutputStream},
Expand Down Expand Up @@ -541,6 +541,7 @@ impl<T: WasiHttpView> crate::bindings::http::types::HostFutureIncomingResponse f
headers: FieldMap::from(parts.headers),
body: Some(HostIncomingBodyBuilder {
body,
worker: Some(Arc::clone(&resp.worker)),
between_bytes_timeout: resp.between_bytes_timeout,
}),
worker: resp.worker,
Expand Down

0 comments on commit bdff582

Please # to comment.