Skip to content

Commit

Permalink
[release-13.0] Backport stream backpressure and associated fixups to …
Browse files Browse the repository at this point in the history
…13.0 (#7009)

* WASI preview 2 output-streams: new backpressure and flushing design (#6877)

* Stream backpressure v2

Co-authored-by: Pat Hickey <phickey@fastly.com>
Co-authored-by: Trevor Elliott <telliott@fastly.com>
Co-authored-by: Dan Gohman <dev@sunfishcode.online>

Stop testing pseudocode

Restructure when notifications are sent, and make sure to flush the writer

Fix the wasi-http module versions of flush and blocking_flush

Use blocking_write_and_flush for blocking writes in the adapters

Fix a warning in wasi-http

Remove an unused DropPollable

add comment explaining try_write for tcpstream

refactor: separate struct for representing TcpReadStream

by factoring into HostTcpSocket a little bit

tcp read stream: handle stream closing

tcp tests: use blocking_read where its expecting to wait for input

move common test body into wasi-sockets-tests/src/lib.rs

ensure parent socket outlives pollable

input and output streams can be children now

tcp's streams are the sockets children

tcp.wit: document child relationships

tcp tests: fix to drop socket after its child streams

review feedback: propogate worker task panic

style

error source fix

tcp: use preview2::spawn, and propogate worker panics

join handle await always propogates panic

background task handles ewouldblock as well

document choice of constant

* sync wit notes into wasi-http

* improve wit docs for output-stream

* doc: document `HostOutputStream` (#6980)

* doc: document `HostOutputStream`

Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Co-authored-by: Pat Hickey <pat@moreproductive.org>

* fix(wasi): fail when `MemoryOutputStream` buffer is full

Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>

---------

Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Co-authored-by: Pat Hickey <pat@moreproductive.org>

* rustfmt

prtest:full

* windows and doc fixes

* cli test wasi-http: use blocking-write-and-flush

* Disable some tests, and adjust timeouts when running under qemu

* Try to reproduce the riscv64 failures

* Update riscv to LLVM 17 with beta rust

* Revert "Try to reproduce the riscv64 failures"

This reverts commit 8ac6781.

* Pin the beta version for riscv64

* Fix a warning on nightly

---------

Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Co-authored-by: Roman Volosatovs <rvolosatovs@users.noreply.github.com>
Co-authored-by: Trevor Elliott <telliott@fastly.com>
Co-authored-by: Alex Crichton <alex@alexcrichton.com>

* Un-ignore now-passing test

With the merging of #6877 prints to stdout with preview2 should now work
without requiring extra sleeps or such.

* Remove submodule re-added by accident

This was removed in #6195 but re-added in #6877, I believe by accident,
so this re-deletes it. I've also edited `.gitmodules` a bit while I was
here to remove it and additionally keep other entries up-to-date with
matching paths.

* add to release notes

---------

Signed-off-by: Roman Volosatovs <rvolosatovs@riseup.net>
Co-authored-by: Roman Volosatovs <rvolosatovs@users.noreply.github.com>
Co-authored-by: Trevor Elliott <telliott@fastly.com>
Co-authored-by: Alex Crichton <alex@alexcrichton.com>
  • Loading branch information
4 people authored Sep 13, 2023
1 parent 194f095 commit 409bdd0
Show file tree
Hide file tree
Showing 41 changed files with 2,048 additions and 1,295 deletions.
11 changes: 4 additions & 7 deletions .gitmodules
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
[submodule "spec_testsuite"]
[submodule "tests/spec_testsuite"]
path = tests/spec_testsuite
url = https://github.com/WebAssembly/testsuite
[submodule "crates/c-api/examples/wasm-c-api"]
[submodule "crates/c-api/wasm-c-api"]
path = crates/c-api/wasm-c-api
url = https://github.com/WebAssembly/wasm-c-api
[submodule "WASI"]
[submodule "crates/wasi-common/WASI"]
path = crates/wasi-common/WASI
url = https://github.com/WebAssembly/WASI
[submodule "crates/wasi-nn/spec"]
Expand All @@ -13,10 +13,7 @@
[submodule "tests/wasi_testsuite/wasi-threads"]
path = tests/wasi_testsuite/wasi-threads
url = https://github.com/WebAssembly/wasi-threads
[submodule "crates/wasi-http/wasi-http"]
path = crates/wasi-http/wasi-http
url = https://github.com/WebAssembly/wasi-http
[submodule "tests/wasi_testsuite/wasi"]
[submodule "tests/wasi_testsuite/wasi-common"]
path = tests/wasi_testsuite/wasi-common
url = https://github.com/WebAssembly/wasi-testsuite.git
branch = prod/testsuite-base
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,8 @@ futures = { version = "0.3.27", default-features = false }
indexmap = "2.0.0"
pretty_env_logger = "0.5.0"
syn = "2.0.25"
test-log = { version = "0.2", default-features = false, features = ["trace"] }
tracing-subscriber = { version = "0.3.1", default-features = false, features = ['fmt', 'env-filter'] }

[features]
default = [
Expand Down
5 changes: 5 additions & 0 deletions RELEASES.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@ Unreleased.
be turned off.
[#6547](https://github.com/bytecodealliance/wasmtime/pull/6547)

* WASI Preview 2 output-stream has been redesigned with changes to
backpressure and flushing. The `HostOutputStream` trait has changed
substantially.
[#6877](https://github.com/bytecodealliance/wasmtime/pull/6877)

### Removed

* Wasmtime's experimental implementation of wasi-crypto has been removed. More
Expand Down
8 changes: 7 additions & 1 deletion ci/build-test-matrix.js
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,13 @@ const array = [
"qemu_target": "riscv64-linux-user",
"name": "Test Linux riscv64",
"filter": "linux-riscv64",
"isa": "riscv64"
"isa": "riscv64",
// There appears to be a miscompile in Rust 1.72 for riscv64 where
// wasmtime-wasi tests are segfaulting in CI with the stack pointing in
// Tokio. Updating rustc seems to do the trick, so without doing a full
// rigorous investigation this uses beta for now but Rust 1.73 should be
// good to go for this.
"rust": "beta-2023-09-10",
}
];

Expand Down
7 changes: 2 additions & 5 deletions crates/test-programs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,9 @@ tracing = { workspace = true }
[dev-dependencies]
anyhow = { workspace = true }
tempfile = { workspace = true }
test-log = { version = "0.2", default-features = false, features = ["trace"] }
test-log = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { version = "0.3.1", default-features = false, features = [
'fmt',
'env-filter',
] }
tracing-subscriber = { workspace = true }
lazy_static = "1"
wasmtime = { workspace = true, features = ['cranelift', 'component-model'] }

Expand Down
42 changes: 41 additions & 1 deletion crates/test-programs/reactor-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,21 @@ wit_bindgen::generate!("test-reactor" in "../../wasi/wit");
export_test_reactor!(T);

struct T;
use wasi::io::streams;
use wasi::poll::poll;

static mut STATE: Vec<String> = Vec::new();

struct DropPollable {
pollable: poll::Pollable,
}

impl Drop for DropPollable {
fn drop(&mut self) {
poll::drop_pollable(self.pollable);
}
}

impl TestReactor for T {
fn add_strings(ss: Vec<String>) -> u32 {
for s in ss {
Expand All @@ -24,10 +36,38 @@ impl TestReactor for T {
}

fn write_strings_to(o: OutputStream) -> Result<(), ()> {
let sub = DropPollable {
pollable: streams::subscribe_to_output_stream(o),
};
unsafe {
for s in STATE.iter() {
wasi::io::streams::write(o, s.as_bytes()).map_err(|_| ())?;
let mut out = s.as_bytes();
while !out.is_empty() {
poll::poll_oneoff(&[sub.pollable]);
let n = match streams::check_write(o) {
Ok(n) => n,
Err(_) => return Err(()),
};

let len = (n as usize).min(out.len());
match streams::write(o, &out[..len]) {
Ok(_) => out = &out[len..],
Err(_) => return Err(()),
}
}
}

match streams::flush(o) {
Ok(_) => {}
Err(_) => return Err(()),
}

poll::poll_oneoff(&[sub.pollable]);
match streams::check_write(o) {
Ok(_) => {}
Err(_) => return Err(()),
}

Ok(())
}
}
Expand Down
2 changes: 1 addition & 1 deletion crates/test-programs/tests/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ async fn reactor_tests() -> Result<()> {
// `host` and `wasi-common` crate.
// Note, this works because of the add_to_linker invocations using the
// `host` crate for `streams`, not because of `with` in the bindgen macro.
let writepipe = preview2::pipe::MemoryOutputPipe::new();
let writepipe = preview2::pipe::MemoryOutputPipe::new(4096);
let table_ix = preview2::TableStreamExt::push_output_stream(
store.data_mut().table_mut(),
Box::new(writepipe.clone()),
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-http-components-sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ fn instantiate_component(
}

fn run(name: &str) -> anyhow::Result<()> {
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut table = Table::new();
let component = get_component(name);
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-http-components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ async fn instantiate_component(
}

async fn run(name: &str) -> anyhow::Result<()> {
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut table = Table::new();
let component = get_component(name);
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-http-modules.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ async fn instantiate_module(module: Module, ctx: Ctx) -> Result<(Store<Ctx>, Fun
}

async fn run(name: &str) -> anyhow::Result<()> {
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut table = Table::new();
let module = get_module(name);
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-preview1-host-in-preview2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> {

async fn run(name: &str, inherit_stdio: bool) -> Result<()> {
let workspace = prepare_workspace(name)?;
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut linker = Linker::new(&ENGINE);
add_to_linker_async(&mut linker)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-preview2-components-sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> {

fn run(name: &str, inherit_stdio: bool) -> Result<()> {
let workspace = prepare_workspace(name)?;
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut linker = Linker::new(&ENGINE);
add_to_linker(&mut linker)?;
Expand Down
4 changes: 2 additions & 2 deletions crates/test-programs/tests/wasi-preview2-components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub fn prepare_workspace(exe_name: &str) -> Result<TempDir> {

async fn run(name: &str, inherit_stdio: bool) -> Result<()> {
let workspace = prepare_workspace(name)?;
let stdout = MemoryOutputPipe::new();
let stderr = MemoryOutputPipe::new();
let stdout = MemoryOutputPipe::new(4096);
let stderr = MemoryOutputPipe::new(4096);
let r = {
let mut linker = Linker::new(&ENGINE);
add_to_linker(&mut linker)?;
Expand Down
58 changes: 40 additions & 18 deletions crates/test-programs/wasi-http-tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ pub mod bindings {
});
}

use anyhow::{anyhow, Context, Result};
use anyhow::{anyhow, Result};
use std::fmt;
use std::sync::OnceLock;

Expand Down Expand Up @@ -42,6 +42,16 @@ impl Response {
}
}

struct DropPollable {
pollable: poll::Pollable,
}

impl Drop for DropPollable {
fn drop(&mut self) {
poll::drop_pollable(self.pollable);
}
}

pub async fn request(
method: http_types::Method,
scheme: http_types::Scheme,
Expand Down Expand Up @@ -72,27 +82,39 @@ pub async fn request(
let request_body = http_types::outgoing_request_write(request)
.map_err(|_| anyhow!("outgoing request write failed"))?;

if let Some(body) = body {
let output_stream_pollable = streams::subscribe_to_output_stream(request_body);
let len = body.len();
if len == 0 {
let (_written, _status) = streams::write(request_body, &[])
.map_err(|_| anyhow!("request_body stream write failed"))
.context("writing empty request body")?;
} else {
let mut body_cursor = 0;
while body_cursor < body.len() {
let (written, _status) = streams::write(request_body, &body[body_cursor..])
.map_err(|_| anyhow!("request_body stream write failed"))
.context("writing request body")?;
body_cursor += written as usize;
if let Some(mut buf) = body {
let sub = DropPollable {
pollable: streams::subscribe_to_output_stream(request_body),
};
while !buf.is_empty() {
poll::poll_oneoff(&[sub.pollable]);

let permit = match streams::check_write(request_body) {
Ok(n) => usize::try_from(n)?,
Err(_) => anyhow::bail!("output stream error"),
};

let len = buf.len().min(permit);
let (chunk, rest) = buf.split_at(len);
buf = rest;

match streams::write(request_body, chunk) {
Err(_) => anyhow::bail!("output stream error"),
_ => {}
}
}

// TODO: enable when working as expected
// let _ = poll::poll_oneoff(&[output_stream_pollable]);
match streams::flush(request_body) {
Err(_) => anyhow::bail!("output stream error"),
_ => {}
}

poll::poll_oneoff(&[sub.pollable]);

poll::drop_pollable(output_stream_pollable);
match streams::check_write(request_body) {
Ok(_) => {}
Err(_) => anyhow::bail!("output stream error"),
};
}

let future_response = outgoing_handler::handle(request, None);
Expand Down
Loading

0 comments on commit 409bdd0

Please # to comment.