From 365c841dd0dd468bde4825f2625262f67b318370 Mon Sep 17 00:00:00 2001 From: Trevor Elliott Date: Tue, 31 Oct 2023 10:54:43 -0700 Subject: [PATCH] Keep the worker handle in an Arc to fix 7413 (#7426) --- crates/wasi-http/src/body.rs | 5 +++++ crates/wasi-http/src/types.rs | 8 +++++--- crates/wasi-http/src/types_impl.rs | 3 ++- 3 files changed, 12 insertions(+), 4 deletions(-) diff --git a/crates/wasi-http/src/body.rs b/crates/wasi-http/src/body.rs index 95bf4fa3a702..338741538052 100644 --- a/crates/wasi-http/src/body.rs +++ b/crates/wasi-http/src/body.rs @@ -16,6 +16,7 @@ pub type HyperIncomingBody = BoxBody; /// want to do that unless the user asks to consume the body. pub struct HostIncomingBodyBuilder { pub body: HyperIncomingBody, + pub worker: Option>>>, pub between_bytes_timeout: Duration, } @@ -96,6 +97,10 @@ impl HostIncomingBodyBuilder { // loop running to relieve backpressure, so we get to the trailers. let _ = body_writer.send(Ok(data)).await; } + + // Force the builder's worker to be owned by this task, ensuring that it's kept around + // until this task exits. + drop(self.worker); }); HostIncomingBody { diff --git a/crates/wasi-http/src/types.rs b/crates/wasi-http/src/types.rs index 5176fa9add04..826060e6cb42 100644 --- a/crates/wasi-http/src/types.rs +++ b/crates/wasi-http/src/types.rs @@ -8,6 +8,7 @@ use crate::{ use anyhow::Context; use http_body_util::BodyExt; use std::any::Any; +use std::sync::Arc; use std::time::Duration; use tokio::net::TcpStream; use tokio::time::timeout; @@ -37,6 +38,7 @@ pub trait WasiHttpView: Send { let (parts, body) = req.into_parts(); let body = HostIncomingBodyBuilder { body, + worker: None, // TODO: this needs to be plumbed through between_bytes_timeout: std::time::Duration::from_millis(600 * 1000), }; @@ -159,7 +161,7 @@ pub fn default_send_request( Ok(IncomingResponseInternal { resp, - worker, + worker: Arc::new(worker), between_bytes_timeout, }) }); @@ -220,7 +222,7 @@ pub struct HostIncomingResponse { pub status: u16, pub headers: FieldMap, pub body: Option, - pub worker: AbortOnDropJoinHandle>, + pub worker: Arc>>, } pub struct HostOutgoingResponse { @@ -271,7 +273,7 @@ pub enum HostFields { pub struct IncomingResponseInternal { pub resp: hyper::Response, - pub worker: AbortOnDropJoinHandle>, + pub worker: Arc>>, pub between_bytes_timeout: std::time::Duration, } diff --git a/crates/wasi-http/src/types_impl.rs b/crates/wasi-http/src/types_impl.rs index 0cbde9f5cd85..6777c0b28dc6 100644 --- a/crates/wasi-http/src/types_impl.rs +++ b/crates/wasi-http/src/types_impl.rs @@ -10,7 +10,7 @@ use crate::{ }, }; use anyhow::Context; -use std::any::Any; +use std::{any::Any, sync::Arc}; use wasmtime::component::Resource; use wasmtime_wasi::preview2::{ bindings::io::streams::{InputStream, OutputStream}, @@ -541,6 +541,7 @@ impl crate::bindings::http::types::HostFutureIncomingResponse f headers: FieldMap::from(parts.headers), body: Some(HostIncomingBodyBuilder { body, + worker: Some(Arc::clone(&resp.worker)), between_bytes_timeout: resp.between_bytes_timeout, }), worker: resp.worker,