Add a passive stream read handler for input-stream #31

Open
autodidaddict opened this issue May 23, 2023 · 3 comments

@autodidaddict

The current methods for accessing a stream line up nicely with our current mental models for how streams work in "regular" programming environments and languages like Rust, Java, etc. You can execute a read method that will pull some number of bytes off of the stream. Executing read in a tight loop will churn through the entire contents of the stream.

The problem that arises in wasm components, but not in "normal" code, appears when we attempt to do either of the following:

  • Process a single chunk of bytes bigger than what the host runtime sandbox allows us to allocate
  • Perform processing that can take long enough to exceed a per-call timeout enforced by the host.

I was looking into how to support these types of behaviors for the blob store specification and noticed that its current WIT files use a wasi-io input-stream. The challenge is how to allow processing of very large files, which are fairly common when blob stores are being used.

I propose that we add a passive handler. Rather than the component reading an explicit number of bytes with a synchronous call like read, we should be able to start streaming the blob in the background. This would allow the host to determine how much of the blob should be delivered to the component and how fast.

We could add a function to the io interface that begins chunking:

start-chunking-read: func(this: input-stream, chunk-size: u64, correlation-id: u64) -> result<_, stream-error>

When there is no error, this call simply returns and the component then finishes its current execution context. At some later time determined by the host (by virtue of the provider of the blob store capability), the host starts making repeated calls that deliver individual chunks to the component. This would work essentially like the inverse of subscribe-to-input-stream, using the following interface:

interface chunked-stream-handler {
  handle-chunk: func(
    stream: input-stream,
    correlation-id: u64,
    current-chunk: u64,
    total-chunks: u64,
    data: list<u8>
  ) -> bool
}

The component can return true to let the host know to continue delivering chunks or false to cancel the chunking stream.
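To make the continue/cancel contract concrete, here is a minimal guest-side sketch in Rust. Everything in it is an assumption for illustration: `ChunkHandler`, `ChunkState`, the byte limit, and the `deliver` function standing in for the host are hypothetical, the `stream` handle is omitted, and `current-chunk` is assumed to be zero-based. It is not generated bindings for any real WIT file.

```rust
use std::collections::HashMap;

/// Hypothetical per-read state kept by the guest between callbacks.
#[derive(Debug, Default)]
pub struct ChunkState {
    pub bytes_seen: u64,
    pub checksum: u32,
}

/// Hypothetical guest-side mirror of `handle-chunk` (the `stream` handle is omitted).
pub struct ChunkHandler {
    states: HashMap<u64, ChunkState>,
    byte_limit: u64,
}

impl ChunkHandler {
    pub fn new(byte_limit: u64) -> Self {
        Self { states: HashMap::new(), byte_limit }
    }

    /// Returns `true` to keep receiving chunks and `false` to cancel.
    /// Assumes `current_chunk` is zero-based.
    pub fn handle_chunk(
        &mut self,
        correlation_id: u64,
        current_chunk: u64,
        total_chunks: u64,
        data: &[u8],
    ) -> bool {
        let state = self.states.entry(correlation_id).or_default();
        state.bytes_seen += data.len() as u64;
        state.checksum = data
            .iter()
            .fold(state.checksum, |acc, b| acc.wrapping_add(u32::from(*b)));

        let is_last = current_chunk + 1 >= total_chunks;
        let over_limit = state.bytes_seen >= self.byte_limit;
        !(is_last || over_limit)
    }

    pub fn state(&self, correlation_id: u64) -> Option<&ChunkState> {
        self.states.get(&correlation_id)
    }
}

/// Stand-in for the host: delivers `blob` in `chunk_size` pieces until the
/// handler returns `false`. Returns the number of chunks delivered.
pub fn deliver(
    handler: &mut ChunkHandler,
    correlation_id: u64,
    blob: &[u8],
    chunk_size: usize,
) -> u64 {
    // `chunks` panics on a size of zero, so clamp it to at least one byte.
    let chunks: Vec<&[u8]> = blob.chunks(chunk_size.max(1)).collect();
    let total = chunks.len() as u64;
    let mut delivered = 0;
    for (index, chunk) in chunks.iter().enumerate() {
        delivered += 1;
        if !handler.handle_chunk(correlation_id, index as u64, total, chunk) {
            break;
        }
    }
    delivered
}
```

Keying the state by `correlation-id` is what lets each callback be a separate, short invocation: nothing has to stay on the stack between chunks.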

⚠️ disclaimer: if this type of functionality is already available in the spec and I've read things wrong, feel free to dismiss this issue.

@sunfishcode
Member

sunfishcode commented May 23, 2023

> Process a single chunk of bytes bigger than what the host runtime sandbox allows us to allocate

It's unclear how any approach could meet this requirement. For example, the handle-chunk function in chunked-stream-handler takes a data: list<u8>, which would need to be allocated in the guest, so it doesn't seem to avoid this issue.

> Perform processing that can take long enough to exceed a per-call timeout enforced by the host.

Is the issue here the time it takes to wait for the data to arrive? The current wasi-io/wasi-poll API expects that you can call poll_oneoff and (appear to) block for as long as needed. Some WASI hosts implement this by performing stack switching under the covers, and instead of having a strict per-call timeout, have a timeout that doesn't count time spent waiting. The eventual plan is to solve this properly in Preview 3 with integrated async. But until then, the question is, is this explicit-polling approach sufficient, or should we also add an explicit callback model?

If we do add a callback model, it's tempting to look for ways this could be modeled as just an alternative to poll_oneoff: keep the existing subscribe-to- approach with pollables, and just add a way to request a callback when a pollable has an event. That way, it would be usable with any API that uses pollables.

If the blob API wants the ability to request chunks at specific offsets, perhaps the blob API could have a function where you give it an offset and a size, and it gives you an input-stream starting at that offset.
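As a rough illustration of the offset-plus-size idea, the sketch below uses Rust's standard `Read` and `Seek` traits as a stand-in for a blob and an input-stream. The names `open_range` and `read_range` are hypothetical and do not come from any blob store or wasi-io interface.

```rust
use std::io::{self, Cursor, Read, Seek, SeekFrom, Take};

/// Hypothetical blob-store helper: returns a reader that starts at `offset`
/// and yields at most `len` bytes.
pub fn open_range<R: Read + Seek>(mut blob: R, offset: u64, len: u64) -> io::Result<Take<R>> {
    blob.seek(SeekFrom::Start(offset))?;
    Ok(blob.take(len))
}

/// In-memory stand-in: reads one whole range of `blob` into a buffer.
pub fn read_range(blob: &[u8], offset: u64, len: u64) -> io::Result<Vec<u8>> {
    let mut out = Vec::new();
    open_range(Cursor::new(blob), offset, len)?.read_to_end(&mut out)?;
    Ok(out)
}
```

With a shape like this, the guest decides which window of the blob it wants on each invocation, and the bounded reader keeps any single read from running past that window.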

One thing we might consider adding to wasi-io would be a way to do something like subscribe-to-input-stream that lets you specify how much data you want/need. That may depend on the question of how to deal with data larger than the host permits, though.

@autodidaddict
Author

Thanks for the quick reply @sunfishcode! I'll outline a couple of high-level use cases that drove this. I'm clearly not well-enough informed about how stuff in wasi-io gets converted into code.

Let's say there's a component that has been asked to perform some kind of processing on a file with a size measured in GB. It's impractical (and, for most engines, impossible) for a component to allocate that much in linear memory, so trying to deal with it all in bulk like a regular app will cause a number of different types of exceptions in the host, depending on which runtime you're using.

Secondly, let's say that the code knows about this limitation, and so it makes a bunch of read calls in a loop to perform the processing. In systems that use "gas" or other forms of metered time or CPU, looping through small chunks of a multi-gig file synchronously like this will cause a different set of failures in the host/runtime, either through timeouts or consumption of too many resources like CPU-seconds.

The callback model attempts to mitigate this by having the host deliver chunks of large blobs. The host decides how to slice the time, and the wasm component processes one chunk within its allotted time rather than attempting to process the entire file within a single timeout.

You're right that it's entirely possible to blow up allocations using the chunking callback model. However, the host could potentially limit the chunk size, and so an initial request for chunks that are too big would be rejected.
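A host-side check along these lines might look like the following Rust sketch. `plan_chunking`, `ChunkPlan`, `StartError`, and the `max_chunk_size` parameter are all hypothetical names used only for illustration; the proposal above returns a `stream-error` and does not define how a limit would be reported.

```rust
/// Hypothetical reasons a host could refuse to start a chunked read.
#[derive(Debug, PartialEq, Eq)]
pub enum StartError {
    ZeroChunkSize,
    ChunkTooLarge { requested: u64, max: u64 },
}

/// Hypothetical result of accepting a chunked read request.
#[derive(Debug, PartialEq, Eq)]
pub struct ChunkPlan {
    pub chunk_size: u64,
    pub total_chunks: u64,
}

/// Validates the requested chunk size against a host-defined maximum and
/// computes how many chunks the blob would be split into.
pub fn plan_chunking(
    blob_len: u64,
    chunk_size: u64,
    max_chunk_size: u64,
) -> Result<ChunkPlan, StartError> {
    if chunk_size == 0 {
        return Err(StartError::ZeroChunkSize);
    }
    if chunk_size > max_chunk_size {
        return Err(StartError::ChunkTooLarge {
            requested: chunk_size,
            max: max_chunk_size,
        });
    }
    // Ceiling division written so that it cannot overflow.
    let total_chunks = blob_len / chunk_size + u64::from(blob_len % chunk_size != 0);
    Ok(ChunkPlan { chunk_size, total_chunks })
}
```

Rejecting the request up front means the guest learns about the limit before any chunk is delivered, rather than failing on an allocation in the middle of the stream.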

If this callback model belongs in the blob store API and not in the low-level streams API, then I'm fine with proposing that change and discarding this issue.

In code, the following are what I'm trying to avoid:

// This could either allocate more bytes than the component can handle
// or take so much time that the component is shut off before the read finishes
let bytes = stream.read(x)?;

// Each individual read could succeed, but the total aggregate of all the reads could 
// take more time than allowed and cause the host to shut the component off
while !finished {
   process_piece(stream.read(x)?);
}

The callback model gives us a way to guarantee we can finish processing without relying on unique threading characteristics of specific host runtimes.

@sunfishcode
Member

I think the question of how to negotiate chunk/buffer sizes is separable from the question of callback vs. polling (vs. integrated async).

For chunk/buffer size negotiation, in what situation would the guest in your example here keep calling read after the first read? Is it because it needs more data to finish its logical task, or because it wants to do as much as it can within its timeslice, or something else? If the host isn't in control of finished, then it's possible the host could end the "chunk" before the guest thinks it's done.

However we structure the negotiation, though, we should be able to design an API to express it, on top of either poll+read or callbacks.
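One possible shape for the "do as much as it can within its timeslice" case on top of plain reads is sketched below in Rust, with `std::io::Read` standing in for an input-stream. The names `process_slice` and `Progress` and the `max_reads` budget are assumptions for illustration; a real host would more likely meter time or fuel than count reads.

```rust
use std::io::{self, Read};

/// Hypothetical outcome of one bounded invocation.
#[derive(Debug, PartialEq, Eq)]
pub enum Progress {
    /// The read budget ran out; call again later to continue.
    Yielded,
    /// The stream reported end of data.
    Finished,
}

/// Reads and processes at most `max_reads` pieces, then returns so the
/// caller can end its current invocation. `buf` must be non-empty,
/// because a zero-length read is treated as end of stream.
pub fn process_slice<R: Read>(
    stream: &mut R,
    buf: &mut [u8],
    max_reads: usize,
    mut process_piece: impl FnMut(&[u8]),
) -> io::Result<Progress> {
    for _ in 0..max_reads {
        let n = stream.read(buf)?;
        if n == 0 {
            return Ok(Progress::Finished);
        }
        process_piece(&buf[..n]);
    }
    Ok(Progress::Yielded)
}
```

Here the guest, not the host, decides when the logical task is finished, while the budget bounds how much work happens per invocation. The same negotiation could equally be driven by a callback per chunk.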
