diff --git a/src/workerd/api/streams.h b/src/workerd/api/streams.h index 66b49a5ef9e..deab628c2b9 100644 --- a/src/workerd/api/streams.h +++ b/src/workerd/api/streams.h @@ -37,6 +37,7 @@ namespace workerd::api { api::TransformStream, \ api::FixedLengthStream, \ api::IdentityTransformStream, \ + api::IdentityTransformStream::QueuingStrategy, \ api::ReadableStream::ValuesOptions, \ api::ReadableStream::ReadableStreamAsyncIterator, \ api::ReadableStream::ReadableStreamAsyncIterator::Next, \ diff --git a/src/workerd/api/streams/identitytransformstream-backpressure-test.js b/src/workerd/api/streams/identitytransformstream-backpressure-test.js new file mode 100644 index 00000000000..591ad085cd0 --- /dev/null +++ b/src/workerd/api/streams/identitytransformstream-backpressure-test.js @@ -0,0 +1,141 @@ +import { + notStrictEqual, + strictEqual, +} from 'node:assert'; + +export const identityTransformStream = { + async test(ctrl, env, ctx) { + const ts = new IdentityTransformStream({ highWaterMark: 10 }); + const writer = ts.writable.getWriter(); + const reader = ts.readable.getReader(); + + strictEqual(writer.desiredSize, 10); + + // We shouldn't have to wait here. + const firstReady = writer.ready; + await writer.ready; + + writer.write(new Uint8Array(1)); + strictEqual(writer.desiredSize, 9); + + // Let's write a second chunk that will be buffered. This one + // should impact the desiredSize and the backpressure signal. + writer.write(new Uint8Array(9)); + strictEqual(writer.desiredSize, 0); + + // The ready promise should have been replaced + notStrictEqual(firstReady, writer.ready); + + async function waitForReady() { + strictEqual(writer.desiredSize, 0); + await writer.ready; + // The backpressure should have been relieved a bit, + // but only by the amount of what we've currently read. + strictEqual(writer.desiredSize, 1); + } + + await Promise.all([ + // We call the waitForReady first to ensure that we set up waiting on + // the ready promise before we relieve the backpressure using the read. + // If the backpressure signal is not working correctly, the test will + // fail with an error indicating that a hanging promise was canceled. + waitForReady(), + reader.read(), + ]); + + // If we read again, the backpressure should be fully resolved. + await reader.read(); + strictEqual(writer.desiredSize, 10); + } +}; + +export const identityTransformStreamNoHWM = { + async test(ctrl, env, ctx) { + // Test that the original default behavior still works as expected. + + const ts = new IdentityTransformStream(); + const writer = ts.writable.getWriter(); + const reader = ts.readable.getReader(); + + strictEqual(writer.desiredSize, 1); + + // We shouldn't have to wait here. + const firstReady = writer.ready; + await writer.ready; + + writer.write(new Uint8Array(1)); + strictEqual(writer.desiredSize, 1); + + // Let's write a second chunk that will be buffered. There should + // be no indication that the desired size has changed. + writer.write(new Uint8Array(9)); + strictEqual(writer.desiredSize, 1); + + // The ready promise should be exactly the same... + strictEqual(firstReady, writer.ready); + + async function waitForReady() { + strictEqual(writer.desiredSize, 1); + await writer.ready; + strictEqual(writer.desiredSize, 1); + } + + await Promise.all([ + // We call the waitForReady first to ensure that we set up waiting on + // the ready promise before we relieve the backpressure using the read. + // If the backpressure signal is not working correctly, the test will + // fail with an error indicating that a hanging promise was canceled. + waitForReady(), + reader.read(), + ]); + + // If we read again, the backpressure should be fully resolved. + await reader.read(); + strictEqual(writer.desiredSize, 1); + } +}; + +export const fixedLengthStream = { + async test(ctrl, env, ctx) { + const ts = new FixedLengthStream(10, { highWaterMark: 100 }); + const writer = ts.writable.getWriter(); + const reader = ts.readable.getReader(); + + // Even tho we specified 100 as our highWaterMark, we only expect 10 + // bytes total, so we'll make that our highWaterMark instead. + strictEqual(writer.desiredSize, 10); + + // We shouldn't have to wait here. + const firstReady = writer.ready; + await writer.ready; + + writer.write(new Uint8Array(1)); + strictEqual(writer.desiredSize, 9); + + // Let's write a second chunk that will be buffered. This one + // should impact the desiredSize and the backpressure signal. + writer.write(new Uint8Array(9)); + strictEqual(writer.desiredSize, 0); + + // The ready promise should have been replaced + notStrictEqual(firstReady, writer.ready); + + async function waitForReady() { + await writer.ready; + // The backpressure should have been relieved a bit, + // but only by the amount of what we've currently read. + strictEqual(writer.desiredSize, 1); + } + + await Promise.all([ + // We call the waitForReady first to ensure that we set up waiting on + // the ready promise before we relieve the backpressure using the read. + waitForReady(), + reader.read(), + ]); + + // If we read again, the backpressure should be fully resolved. + await reader.read(); + strictEqual(writer.desiredSize, 10); + } +}; diff --git a/src/workerd/api/streams/identitytransformstream-backpressure-test.wd-test b/src/workerd/api/streams/identitytransformstream-backpressure-test.wd-test new file mode 100644 index 00000000000..ef24eb871a9 --- /dev/null +++ b/src/workerd/api/streams/identitytransformstream-backpressure-test.wd-test @@ -0,0 +1,15 @@ +using Workerd = import "/workerd/workerd.capnp"; + +const unitTests :Workerd.Config = ( + services = [ + ( name = "identitytransformstream-backpressure-test", + worker = ( + modules = [ + (name = "worker", esModule = embed "identitytransformstream-backpressure-test.js") + ], + compatibilityDate = "2023-01-15", + compatibilityFlags = ["nodejs_compat"] + ) + ), + ], +); diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 3e8eb14b9e7..ab0495f3360 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -763,6 +763,7 @@ jsg::Promise<void> WritableStreamInternalController::write( } auto prp = js.newPromiseAndResolver<void>(); + increaseCurrentWriteBufferSize(js, byteLength); queue.push_back(WriteEvent { .outputLock = IoContext::current().waitForOutputLocksIfNecessaryIoOwn(), .event = Write { @@ -780,6 +781,45 @@ jsg::Promise<void> WritableStreamInternalController::write( KJ_UNREACHABLE; } +void WritableStreamInternalController::increaseCurrentWriteBufferSize( + jsg::Lock& js, + uint64_t amount) { + currentWriteBufferSize += amount; + KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) { + updateBackpressure(js, (*highWaterMark) - currentWriteBufferSize <= 0); + } +} + +void WritableStreamInternalController::decreaseCurrentWriteBufferSize( + jsg::Lock& js, + uint64_t amount) { + currentWriteBufferSize -= amount; + KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) { + updateBackpressure(js, (*highWaterMark) - currentWriteBufferSize <= 0); + } +} + +void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool backpressure) { + KJ_IF_MAYBE(writerLock, writeState.tryGet<WriterLocked>()) { + if (backpressure) { + // Per the spec, when backpressure is updated and is true, we replace the existing + // ready promise on the writer with a new pending promise, regardless of whether + // the existing one is resolved or not. + auto prp = js.newPromiseAndResolver<void>(); + prp.promise.markAsHandled(); + writerLock->setReadyFulfiller(prp); + return; + } + + // When backpressure is updated and is false, we resolve the ready promise on the writer + maybeResolvePromise(writerLock->getReadyFulfiller()); + } +} + +void WritableStreamInternalController::setHighWaterMark(uint64_t highWaterMark) { + maybeHighWaterMark = highWaterMark; +} + jsg::Promise<void> WritableStreamInternalController::close( jsg::Lock& js, bool markAsHandled) { @@ -1010,7 +1050,12 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() { KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return uint(0); } KJ_CASE_ONEOF(errored, StreamStates::Errored) { return nullptr; } - KJ_CASE_ONEOF(writable, Writable) { return 1; } + KJ_CASE_ONEOF(writable, Writable) { + KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) { + return (*highWaterMark) - currentWriteBufferSize; + } + return 1; + } } KJ_UNREACHABLE; @@ -1024,24 +1069,27 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer) auto closedPrp = js.newPromiseAndResolver<void>(); closedPrp.promise.markAsHandled(); - auto readyPromise = js.resolvedPromise(); + auto readyPrp = js.newPromiseAndResolver<void>(); + readyPrp.promise.markAsHandled(); - auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver)); + auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver)); KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { maybeResolvePromise(lock.getClosedFulfiller()); + maybeResolvePromise(lock.getReadyFulfiller()); } KJ_CASE_ONEOF(errored, StreamStates::Errored) { maybeRejectPromise<void>(lock.getClosedFulfiller(), errored.getHandle(js)); + maybeRejectPromise<void>(lock.getReadyFulfiller(), errored.getHandle(js)); } KJ_CASE_ONEOF(writable, Writable) { - // Nothing to do. + maybeResolvePromise(lock.getReadyFulfiller()); } } writeState = kj::mv(lock); - writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPromise)); + writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise)); return true; } @@ -1076,6 +1124,7 @@ void WritableStreamInternalController::doClose() { state.init<StreamStates::Closed>(); KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) { maybeResolvePromise(locked->getClosedFulfiller()); + maybeResolvePromise(locked->getReadyFulfiller()); writeState.init<Locked>(); } else KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) { writeState.init<Unlocked>(); @@ -1087,6 +1136,7 @@ void WritableStreamInternalController::doError(jsg::Lock& js, v8::Local<v8::Valu state.init<StreamStates::Errored>(js.v8Ref(reason)); KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) { maybeRejectPromise<void>(locked->getClosedFulfiller(), reason); + maybeResolvePromise(locked->getReadyFulfiller()); writeState.init<Locked>(); } else KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) { writeState.init<Unlocked>(); @@ -1197,6 +1247,8 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo auto& writable = state.get<Writable>(); auto check = makeChecker(request); + auto amountToWrite = request.bytes.size(); + auto promise = writable->write(request.bytes.begin(), request.bytes.size()) .attach(kj::mv(request.ownBytes)); @@ -1210,18 +1262,20 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo // That's a larger refactor, though. return ioContext.awaitIoLegacy(kj::mv(promise)).then(js, ioContext.addFunctor( - [this, check, maybeAbort](jsg::Lock& js) -> jsg::Promise<void> { + [this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise<void> { auto& request = check(); maybeResolvePromise(request.promise); + decreaseCurrentWriteBufferSize(js, amountToWrite); queue.pop_front(); maybeAbort(js, request); return writeLoop(js, IoContext::current()); }), ioContext.addFunctor( - [this, check, maybeAbort](jsg::Lock& js, jsg::Value reason) + [this, check, maybeAbort, amountToWrite](jsg::Lock& js, jsg::Value reason) -> jsg::Promise<void> { auto handle = reason.getHandle(js); auto& request = check(); auto& writable = state.get<Writable>(); + decreaseCurrentWriteBufferSize(js, amountToWrite); maybeRejectPromise<void>(request.promise, handle); queue.pop_front(); if (!maybeAbort(js, request)) { diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index baa29dcd4a2..dde67e14630 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -226,8 +226,11 @@ class WritableStreamInternalController: public WritableStreamController { : state(closed) {} explicit WritableStreamInternalController(StreamStates::Errored errored) : state(kj::mv(errored)) {} - explicit WritableStreamInternalController(Writable writable) - : state(kj::mv(writable)) {} + explicit WritableStreamInternalController(Writable writable, + kj::Maybe<uint64_t> maybeHighWaterMark = nullptr) + : state(kj::mv(writable)), + maybeHighWaterMark(maybeHighWaterMark) { +} WritableStreamInternalController(WritableStreamInternalController&& other) = default; WritableStreamInternalController& operator=(WritableStreamInternalController&& other) = default; @@ -270,6 +273,8 @@ class WritableStreamInternalController: public WritableStreamController { void visitForGc(jsg::GcVisitor& visitor) override; + void setHighWaterMark(uint64_t highWaterMark); + private: bool isClosedOrClosing(); @@ -304,6 +309,17 @@ class WritableStreamInternalController: public WritableStreamController { kj::Maybe<PendingAbort> maybePendingAbort; + uint64_t currentWriteBufferSize = 0; + kj::Maybe<uint64_t> maybeHighWaterMark; + // The highWaterMark is the total amount of data currently buffered in + // the controller waiting to be flushed out to the underlying WritableStreamSink. + // It is used to implement backpressure signaling using desiredSize and the ready + // promise on the writer. + + void increaseCurrentWriteBufferSize(jsg::Lock& js, uint64_t amount); + void decreaseCurrentWriteBufferSize(jsg::Lock& js, uint64_t amount); + void updateBackpressure(jsg::Lock& js, bool backpressure); + struct Write { kj::Maybe<jsg::Promise<void>::Resolver> promise; std::shared_ptr<v8::BackingStore> ownBytes; diff --git a/src/workerd/api/streams/transform.c++ b/src/workerd/api/streams/transform.c++ index 8b643bbca33..0a45e16714a 100644 --- a/src/workerd/api/streams/transform.c++ +++ b/src/workerd/api/streams/transform.c++ @@ -122,20 +122,28 @@ jsg::Ref<TransformStream> TransformStream::constructor( return IdentityTransformStream::constructor(js); } -jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor(jsg::Lock& js) { +jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor( + jsg::Lock& js, + jsg::Optional<IdentityTransformStream::QueuingStrategy> maybeQueuingStrategy) { auto readableSide = kj::refcounted<IdentityTransformStreamImpl>(); auto writableSide = kj::addRef(*readableSide); auto& ioContext = IoContext::current(); + kj::Maybe<uint64_t> maybeHighWaterMark = nullptr; + KJ_IF_MAYBE(queuingStrategy, maybeQueuingStrategy) { + maybeHighWaterMark = queuingStrategy->highWaterMark; + } + return jsg::alloc<IdentityTransformStream>( jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)), - jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide))); + jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide), maybeHighWaterMark)); } jsg::Ref<FixedLengthStream> FixedLengthStream::constructor( jsg::Lock& js, - uint64_t expectedLength) { + uint64_t expectedLength, + jsg::Optional<IdentityTransformStream::QueuingStrategy> maybeQueuingStrategy) { constexpr uint64_t MAX_SAFE_INTEGER = (1ull << 53) - 1; JSG_REQUIRE(expectedLength <= MAX_SAFE_INTEGER, TypeError, @@ -146,9 +154,17 @@ jsg::Ref<FixedLengthStream> FixedLengthStream::constructor( auto& ioContext = IoContext::current(); + kj::Maybe<uint64_t> maybeHighWaterMark = nullptr; + // For a FixedLengthStream we do not want a highWaterMark higher than the expectedLength. + KJ_IF_MAYBE(queuingStrategy, maybeQueuingStrategy) { + maybeHighWaterMark = queuingStrategy->highWaterMark.map([&](uint64_t highWaterMark) { + return kj::min(expectedLength, highWaterMark); + }); + } + return jsg::alloc<FixedLengthStream>( jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)), - jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide))); + jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide), maybeHighWaterMark)); } } // namespace workerd::api diff --git a/src/workerd/api/streams/transform.h b/src/workerd/api/streams/transform.h index 63ab4f70d97..fa98b6e2fc2 100644 --- a/src/workerd/api/streams/transform.h +++ b/src/workerd/api/streams/transform.h @@ -81,7 +81,15 @@ class IdentityTransformStream: public TransformStream { public: using TransformStream::TransformStream; - static jsg::Ref<IdentityTransformStream> constructor(jsg::Lock& js); + struct QueuingStrategy { + jsg::Optional<uint64_t> highWaterMark; + + JSG_STRUCT(highWaterMark); + }; + + static jsg::Ref<IdentityTransformStream> constructor( + jsg::Lock& js, + jsg::Optional<QueuingStrategy> queuingStrategy = nullptr); JSG_RESOURCE_TYPE(IdentityTransformStream) { JSG_INHERIT(TransformStream); @@ -98,7 +106,10 @@ class FixedLengthStream: public IdentityTransformStream { public: using IdentityTransformStream::IdentityTransformStream; - static jsg::Ref<FixedLengthStream> constructor(jsg::Lock& js, uint64_t expectedLength); + static jsg::Ref<FixedLengthStream> constructor( + jsg::Lock& js, + uint64_t expectedLength, + jsg::Optional<QueuingStrategy> queuingStrategy = nullptr); JSG_RESOURCE_TYPE(FixedLengthStream) { JSG_INHERIT(IdentityTransformStream); diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index 5467673e15c..cf5b2aa87dc 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -188,9 +188,11 @@ void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) { WritableStream::WritableStream( IoContext& ioContext, - kj::Own<WritableStreamSink> sink) + kj::Own<WritableStreamSink> sink, + kj::Maybe<uint64_t> maybeHighWaterMark) : ioContext(ioContext), - controller(kj::heap<WritableStreamInternalController>(ioContext.addObject(kj::mv(sink)))) { + controller(kj::heap<WritableStreamInternalController>(ioContext.addObject(kj::mv(sink)), + maybeHighWaterMark)) { getController().setOwnerRef(*this); } diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index 450ceef7299..9d544f76ca6 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -102,7 +102,8 @@ class WritableStream: public jsg::Object { kj::Own<WritableStreamJsController>>; explicit WritableStream(IoContext& ioContext, - kj::Own<WritableStreamSink> sink); + kj::Own<WritableStreamSink> sink, + kj::Maybe<uint64_t> maybeHighWaterMark = nullptr); explicit WritableStream(Controller controller);