diff --git a/crates/core/src/io.rs b/crates/core/src/io.rs index a148e08acc..181bdad8bd 100644 --- a/crates/core/src/io.rs +++ b/crates/core/src/io.rs @@ -1,6 +1,7 @@ use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, HostOutputStream}; /// An in-memory stdio output buffer. +#[derive(Clone)] pub struct OutputBuffer(MemoryOutputPipe); impl OutputBuffer { diff --git a/crates/core/src/store.rs b/crates/core/src/store.rs index 4a5cbf550d..8d6e0738d9 100644 --- a/crates/core/src/store.rs +++ b/crates/core/src/store.rs @@ -1,8 +1,9 @@ use anyhow::{anyhow, Result}; +use bytes::Bytes; use std::{ io::{Read, Write}, path::{Path, PathBuf}, - sync::Mutex, + sync::{Arc, Mutex}, time::{Duration, Instant}, }; use system_interface::io::ReadReady; @@ -11,10 +12,12 @@ use wasi_common_preview1 as wasi_preview1; use wasmtime_wasi as wasmtime_wasi_preview1; use wasmtime_wasi::preview2::{ self as wasi_preview2, HostInputStream, HostOutputStream, StdinStream, StdoutStream, + StreamError, StreamResult, Subscribe, }; use wasmtime_wasi_http::types::WasiHttpCtx; use crate::{ + async_trait, host_component::{HostComponents, HostComponentsData}, io::OutputBuffer, limits::StoreLimitsAsync, @@ -182,9 +185,7 @@ impl StoreBuilder { ctx.set_stdin(Box::new(wasi_preview1::pipe::ReadPipe::new(r))) } WasiCtxBuilder::Preview2(ctx) => { - ctx.stdin(MyStdinStream(Mutex::new(Some(Box::new( - wasi_preview2::pipe::AsyncReadStream::new(r), - ))))); + ctx.stdin(PipeStdinStream::new(r)); } }) } @@ -221,9 +222,7 @@ impl StoreBuilder { ctx.set_stdout(Box::new(wasi_preview1::pipe::WritePipe::new(w))) } WasiCtxBuilder::Preview2(ctx) => { - ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new( - wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w), - ))))); + ctx.stdout(PipeStdoutStream::new(w)); } }) } @@ -238,7 +237,7 @@ impl StoreBuilder { "`Store::stdout_buffered` only supported with WASI Preview 2" )), WasiCtxBuilder::Preview2(ctx) => { - ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new(buffer.writer()))))); + ctx.stdout(BufferStdoutStream(buffer.clone())); Ok(()) } })?; @@ -264,9 +263,7 @@ impl StoreBuilder { ctx.set_stderr(Box::new(wasi_preview1::pipe::WritePipe::new(w))) } WasiCtxBuilder::Preview2(ctx) => { - ctx.stderr(MyStdoutStream(Mutex::new(Some(Box::new( - wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w), - ))))); + ctx.stderr(PipeStdoutStream::new(w)); } }) } @@ -426,15 +423,52 @@ impl StoreBuilder { } } -struct MyStdinStream(Mutex>>); +struct PipeStdinStream { + buffer: Vec, + inner: Arc>, +} -impl StdinStream for MyStdinStream { - fn stream(&self) -> Box { - self.0 +impl PipeStdinStream { + fn new(inner: T) -> Self { + Self { + buffer: vec![0_u8; 64 * 1024], + inner: Arc::new(Mutex::new(inner)), + } + } +} + +impl Clone for PipeStdinStream { + fn clone(&self) -> Self { + Self { + buffer: vec![0_u8; 64 * 1024], + inner: self.inner.clone(), + } + } +} + +impl HostInputStream for PipeStdinStream { + fn read(&mut self, size: usize) -> StreamResult { + let size = size.min(self.buffer.len()); + + let count = self + .inner .lock() .unwrap() - .take() - .expect("MyStdinStream::stream should only be called once") + .read(&mut self.buffer[..size]) + .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?; + + Ok(Bytes::copy_from_slice(&self.buffer[..count])) + } +} + +#[async_trait] +impl Subscribe for PipeStdinStream { + async fn ready(&mut self) {} +} + +impl StdinStream for PipeStdinStream { + fn stream(&self) -> Box { + Box::new(self.clone()) } fn isatty(&self) -> bool { @@ -442,15 +476,62 @@ impl StdinStream for MyStdinStream { } } -struct MyStdoutStream(Mutex>>); +struct PipeStdoutStream(Arc>); -impl StdoutStream for MyStdoutStream { - fn stream(&self) -> Box { +impl Clone for PipeStdoutStream { + fn clone(&self) -> Self { + Self(self.0.clone()) + } +} + +impl PipeStdoutStream { + fn new(inner: T) -> Self { + Self(Arc::new(Mutex::new(inner))) + } +} + +impl HostOutputStream for PipeStdoutStream { + fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> { self.0 .lock() .unwrap() - .take() - .expect("MyStdoutStream::stream should only be called once") + .write_all(&bytes) + .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e))) + } + + fn flush(&mut self) -> Result<(), StreamError> { + self.0 + .lock() + .unwrap() + .flush() + .map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e))) + } + + fn check_write(&mut self) -> Result { + Ok(1024 * 1024) + } +} + +impl StdoutStream for PipeStdoutStream { + fn stream(&self) -> Box { + Box::new(self.clone()) + } + + fn isatty(&self) -> bool { + false + } +} + +#[async_trait] +impl Subscribe for PipeStdoutStream { + async fn ready(&mut self) {} +} + +struct BufferStdoutStream(OutputBuffer); + +impl StdoutStream for BufferStdoutStream { + fn stream(&self) -> Box { + Box::new(self.0.writer()) } fn isatty(&self) -> bool {