Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

replace MyStd{in|out}Stream with shareable streams #1982

Merged
merged 1 commit into from
Oct 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/core/src/io.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use wasmtime_wasi::preview2::{pipe::MemoryOutputPipe, HostOutputStream};

/// An in-memory stdio output buffer.
#[derive(Clone)]
pub struct OutputBuffer(MemoryOutputPipe);

impl OutputBuffer {
Expand Down
125 changes: 103 additions & 22 deletions crates/core/src/store.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use anyhow::{anyhow, Result};
use bytes::Bytes;
use std::{
io::{Read, Write},
path::{Path, PathBuf},
sync::Mutex,
sync::{Arc, Mutex},
time::{Duration, Instant},
};
use system_interface::io::ReadReady;
Expand All @@ -11,10 +12,12 @@ use wasi_common_preview1 as wasi_preview1;
use wasmtime_wasi as wasmtime_wasi_preview1;
use wasmtime_wasi::preview2::{
self as wasi_preview2, HostInputStream, HostOutputStream, StdinStream, StdoutStream,
StreamError, StreamResult, Subscribe,
};
use wasmtime_wasi_http::types::WasiHttpCtx;

use crate::{
async_trait,
host_component::{HostComponents, HostComponentsData},
io::OutputBuffer,
limits::StoreLimitsAsync,
Expand Down Expand Up @@ -182,9 +185,7 @@ impl StoreBuilder {
ctx.set_stdin(Box::new(wasi_preview1::pipe::ReadPipe::new(r)))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.stdin(MyStdinStream(Mutex::new(Some(Box::new(
wasi_preview2::pipe::AsyncReadStream::new(r),
)))));
ctx.stdin(PipeStdinStream::new(r));
}
})
}
Expand Down Expand Up @@ -221,9 +222,7 @@ impl StoreBuilder {
ctx.set_stdout(Box::new(wasi_preview1::pipe::WritePipe::new(w)))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new(
wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w),
)))));
ctx.stdout(PipeStdoutStream::new(w));
}
})
}
Expand All @@ -238,7 +237,7 @@ impl StoreBuilder {
"`Store::stdout_buffered` only supported with WASI Preview 2"
)),
WasiCtxBuilder::Preview2(ctx) => {
ctx.stdout(MyStdoutStream(Mutex::new(Some(Box::new(buffer.writer())))));
ctx.stdout(BufferStdoutStream(buffer.clone()));
Ok(())
}
})?;
Expand All @@ -264,9 +263,7 @@ impl StoreBuilder {
ctx.set_stderr(Box::new(wasi_preview1::pipe::WritePipe::new(w)))
}
WasiCtxBuilder::Preview2(ctx) => {
ctx.stderr(MyStdoutStream(Mutex::new(Some(Box::new(
wasi_preview2::pipe::AsyncWriteStream::new(1024 * 1024, w),
)))));
ctx.stderr(PipeStdoutStream::new(w));
}
})
}
Expand Down Expand Up @@ -426,31 +423,115 @@ impl StoreBuilder {
}
}

struct MyStdinStream(Mutex<Option<Box<dyn HostInputStream>>>);
struct PipeStdinStream<T> {
buffer: Vec<u8>,
inner: Arc<Mutex<T>>,
}

impl StdinStream for MyStdinStream {
fn stream(&self) -> Box<dyn HostInputStream> {
self.0
impl<T> PipeStdinStream<T> {
fn new(inner: T) -> Self {
Self {
buffer: vec![0_u8; 64 * 1024],
inner: Arc::new(Mutex::new(inner)),
}
}
}

impl<T> Clone for PipeStdinStream<T> {
fn clone(&self) -> Self {
Self {
buffer: vec![0_u8; 64 * 1024],
inner: self.inner.clone(),
}
}
}

impl<T: Read + Send + Sync + 'static> HostInputStream for PipeStdinStream<T> {
fn read(&mut self, size: usize) -> StreamResult<Bytes> {
let size = size.min(self.buffer.len());

let count = self
.inner
.lock()
.unwrap()
.take()
.expect("MyStdinStream::stream should only be called once")
.read(&mut self.buffer[..size])
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))?;

Ok(Bytes::copy_from_slice(&self.buffer[..count]))
}
}

#[async_trait]
impl<T: Read + Send + Sync + 'static> Subscribe for PipeStdinStream<T> {
async fn ready(&mut self) {}
}

impl<T: Read + Send + Sync + 'static> StdinStream for PipeStdinStream<T> {
fn stream(&self) -> Box<dyn HostInputStream> {
Box::new(self.clone())
}

fn isatty(&self) -> bool {
false
}
}

struct MyStdoutStream(Mutex<Option<Box<dyn HostOutputStream>>>);
struct PipeStdoutStream<T>(Arc<Mutex<T>>);

impl StdoutStream for MyStdoutStream {
fn stream(&self) -> Box<dyn HostOutputStream> {
impl<T> Clone for PipeStdoutStream<T> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}

impl<T> PipeStdoutStream<T> {
fn new(inner: T) -> Self {
Self(Arc::new(Mutex::new(inner)))
}
}

impl<T: Write + Send + Sync + 'static> HostOutputStream for PipeStdoutStream<T> {
fn write(&mut self, bytes: Bytes) -> Result<(), StreamError> {
self.0
.lock()
.unwrap()
.take()
.expect("MyStdoutStream::stream should only be called once")
.write_all(&bytes)
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}

fn flush(&mut self) -> Result<(), StreamError> {
self.0
.lock()
.unwrap()
.flush()
.map_err(|e| StreamError::LastOperationFailed(anyhow::anyhow!(e)))
}

fn check_write(&mut self) -> Result<usize, StreamError> {
Ok(1024 * 1024)
}
}

impl<T: Write + Send + Sync + 'static> StdoutStream for PipeStdoutStream<T> {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(self.clone())
}

fn isatty(&self) -> bool {
false
}
}

#[async_trait]
impl<T: Write + Send + Sync + 'static> Subscribe for PipeStdoutStream<T> {
async fn ready(&mut self) {}
}

struct BufferStdoutStream(OutputBuffer);

impl StdoutStream for BufferStdoutStream {
fn stream(&self) -> Box<dyn HostOutputStream> {
Box::new(self.0.writer())
}

fn isatty(&self) -> bool {
Expand Down