From cf029755516494642a1123993736bf8d1bb9e9ca Mon Sep 17 00:00:00 2001 From: Joel Dice Date: Fri, 27 Oct 2023 14:52:39 -0600 Subject: [PATCH] replace `MyStd{in|out}Stream` with shareable streams `MyStd{in|out}Stream` were written based on the assumption that `Std{in|out}Stream::stream` would only ever be called once. Turns out that's not true for guest components which are composed of multiple subcomponents, since each subcomponent will potentially want its own handle, so they need to be shareable. The easiest way to do that is provide cloneable implementations of `Host{In|Out}putStream` which operate synchronously. Note that this amounts to doing synchronous I/O in an asynchronous context, which we'd normally prefer to avoid, but the properly asynchronous implementations `Host{In|Out}putStream` based on `AsyncRead`/`AsyncWrite` are quite hairy and probably not worth it for "normal" stdio streams in Spin. If this does prove to be a performance bottleneck, though, we can certainly revisit it. Signed-off-by: Joel Dice --- crates/core/src/io.rs | 1 + crates/core/src/store.rs | 125 ++++++++++++++++++++++++++++++++------- 2 files changed, 104 insertions(+), 22 deletions(-) diff --git a/crates/core/src/io.rs b/crates/core/src/io.rs index a148e08acc..181bdad8bd 100644 --- a/crates/core/src/io.rs +++ b/crates/core/src/io.rs @@ -1,6 +1,7 @@ use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, HostOutputStream}; /// An in-memory stdio output buffer. +#[derive(Clone)] pub struct OutputBuffer(MemoryOutputPipe); impl OutputBuffer { diff --git a/crates/core/src/store.rs b/crates/core/src/store.rs index 4a5cbf550d..8d6e0738d9 100644 --- a/crates/core/src/store.rs +++ b/crates/core/src/store.rs @@ -1,8 +1,9 @@ use anyhow::{anyhow, Result}; +use bytes::Bytes; use std::{ io::{Read, Write}, path::{Path, PathBuf}, - sync::Mutex, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; use system_interface::io::ReadReady; @@ -11,10 +12,12 @@ use wasi_common_preview1 as wasi_preview1; use wasmtime_wasi as wasmtime_wasi_preview1; use wasmtime_wasi::preview2::{ self as wasi_preview2, HostInputStream, HostOutputStream, StdinStream, StdoutStream, + StreamError, StreamResult, Subscribe, }; use wasmtime_wasi_http::types::WasiHttpCtx; use crate::{ + async_trait, host_component::{HostComponents, HostComponentsData}, io::OutputBuffer, limits::StoreLimitsAsync, @@ -182,9 +185,7 @@ impl StoreBuilder { ctx.set_stdin(Box::new(wasi_preview1::pipe::ReadPipe::new(r))) } WasiCtxBuilder::Preview2(ctx) => { - ctx.stdin(MyStdinStream(Mutex::new(Some(Box::new( - wasi_preview2::pipe::AsyncReadStream::new(r), - ))))); + ctx.stdin(PipeStdinStream::new(r)); } }) } @@ -221,9 +222,7 @@ impl StoreBuilder { ctx.set_stdout(Box::new(wasi_preview1::pipe::WritePipe::new(w))) } WasiCtxBuilder::Preview2(ctx) => { - ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new( - wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w), - ))))); + ctx.stdout(PipeStdoutStream::new(w)); } }) } @@ -238,7 +237,7 @@ impl StoreBuilder { "`Store::stdout_buffered` only supported with WASI Preview 2" )), WasiCtxBuilder::Preview2(ctx) => { - ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new(buffer.writer()))))); + ctx.stdout(BufferStdoutStream(buffer.clone())); Ok(()) } })?; @@ -264,9 +263,7 @@ impl StoreBuilder { ctx.set_stderr(Box::new(wasi_preview1::pipe::WritePipe::new(w))) } WasiCtxBuilder::Preview2(ctx) => { - ctx.stderr(MyStdoutStream(Mutex::new(Some(Box::new( - wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w), - ))))); + ctx.stderr(PipeStdoutStream::new(w)); } }) } @@ -426,15 +423,52 @@ impl StoreBuilder { } } -struct MyStdinStream(Mutex>>); +struct PipeStdinStream { + buffer: Vec, + inner: Arc>, +} -impl StdinStream for MyStdinStream { - fn stream(&self) -> Box { - self.0 +impl PipeStdinStream { + fn new(inner: T) -> Self { + Self { + buffer: vec![0_u8; 64 * 1024], + inner: Arc::new(Mutex::new(inner)), + } + } +} + +impl Clone for PipeStdinStream { + fn clone(&self) -> Self { + Self { + buffer: vec![0_u8; 64 * 1024], + inner: self.inner.clone(), + } + } +} + +impl HostInputStream for PipeStdinStream { + fn read(&mut self, size: usize) -> StreamResult { + let size = size.min(self.buffer.len()); + + let count = self + .inner .lock() .unwrap() - .take() - .expect("MyStdinStream::stream should only be called once") + .read(&mut self.buffer[..size]) + .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?; + + Ok(Bytes::copy_from_slice(&self.buffer[..count])) + } +} + +#[async_trait] +impl Subscribe for PipeStdinStream { + async fn ready(&mut self) {} +} + +impl StdinStream for PipeStdinStream { + fn stream(&self) -> Box { + Box::new(self.clone()) } fn isatty(&self) -> bool { @@ -442,15 +476,62 @@ impl StdinStream for MyStdinStream { } } -struct MyStdoutStream(Mutex>>); +struct PipeStdoutStream(Arc>); -impl StdoutStream for MyStdoutStream { - fn stream(&self) -> Box { +impl Clone for PipeStdoutStream { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl PipeStdoutStream { + fn new(inner: T) -> Self { + Self(Arc::new(Mutex::new(inner))) + } +} + +impl HostOutputStream for PipeStdoutStream { + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { self.0 .lock() .unwrap() - .take() - .expect("MyStdoutStream::stream should only be called once") + .write_all(&bytes) + .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e))) + } + + fn flush(&mut self) -> Result<(), StreamError> { + self.0 + .lock() + .unwrap() + .flush() + .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e))) + } + + fn check_write(&mut self) -> Result { + Ok(1024 * 1024) + } +} + +impl StdoutStream for PipeStdoutStream { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + false + } +} + +#[async_trait] +impl Subscribe for PipeStdoutStream { + async fn ready(&mut self) {} +} + +struct BufferStdoutStream(OutputBuffer); + +impl StdoutStream for BufferStdoutStream { + fn stream(&self) -> Box { + Box::new(self.0.writer()) } fn isatty(&self) -> bool {