Skip to content

Implement backpressure signaling for IdentityTransformStream writable #473

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 1 commit into from
Mar 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/workerd/api/streams.h
Original file line number Diff line number Diff line change
@@ -37,6 +37,7 @@ namespace workerd::api {
api::TransformStream, \
api::FixedLengthStream, \
api::IdentityTransformStream, \
api::IdentityTransformStream::QueuingStrategy, \
api::ReadableStream::ValuesOptions, \
api::ReadableStream::ReadableStreamAsyncIterator, \
api::ReadableStream::ReadableStreamAsyncIterator::Next, \
141 changes: 141 additions & 0 deletions src/workerd/api/streams/identitytransformstream-backpressure-test.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
import {
notStrictEqual,
strictEqual,
} from 'node:assert';

export const identityTransformStream = {
async test(ctrl, env, ctx) {
const ts = new IdentityTransformStream({ highWaterMark: 10 });
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

strictEqual(writer.desiredSize, 10);

// We shouldn't have to wait here.
const firstReady = writer.ready;
await writer.ready;

writer.write(new Uint8Array(1));
strictEqual(writer.desiredSize, 9);

// Let's write a second chunk that will be buffered. This one
// should impact the desiredSize and the backpressure signal.
writer.write(new Uint8Array(9));
strictEqual(writer.desiredSize, 0);

// The ready promise should have been replaced
notStrictEqual(firstReady, writer.ready);

async function waitForReady() {
strictEqual(writer.desiredSize, 0);
await writer.ready;
// The backpressure should have been relieved a bit,
// but only by the amount of what we've currently read.
strictEqual(writer.desiredSize, 1);
}

await Promise.all([
// We call the waitForReady first to ensure that we set up waiting on
// the ready promise before we relieve the backpressure using the read.
// If the backpressure signal is not working correctly, the test will
// fail with an error indicating that a hanging promise was canceled.
waitForReady(),
reader.read(),
]);

// If we read again, the backpressure should be fully resolved.
await reader.read();
strictEqual(writer.desiredSize, 10);
}
};

export const identityTransformStreamNoHWM = {
async test(ctrl, env, ctx) {
// Test that the original default behavior still works as expected.

const ts = new IdentityTransformStream();
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

strictEqual(writer.desiredSize, 1);

// We shouldn't have to wait here.
const firstReady = writer.ready;
await writer.ready;

writer.write(new Uint8Array(1));
strictEqual(writer.desiredSize, 1);

// Let's write a second chunk that will be buffered. There should
// be no indication that the desired size has changed.
writer.write(new Uint8Array(9));
strictEqual(writer.desiredSize, 1);

// The ready promise should be exactly the same...
strictEqual(firstReady, writer.ready);

async function waitForReady() {
strictEqual(writer.desiredSize, 1);
await writer.ready;
strictEqual(writer.desiredSize, 1);
}

await Promise.all([
// We call the waitForReady first to ensure that we set up waiting on
// the ready promise before we relieve the backpressure using the read.
// If the backpressure signal is not working correctly, the test will
// fail with an error indicating that a hanging promise was canceled.
waitForReady(),
reader.read(),
]);

// If we read again, the backpressure should be fully resolved.
await reader.read();
strictEqual(writer.desiredSize, 1);
}
};

export const fixedLengthStream = {
async test(ctrl, env, ctx) {
const ts = new FixedLengthStream(10, { highWaterMark: 100 });
const writer = ts.writable.getWriter();
const reader = ts.readable.getReader();

// Even tho we specified 100 as our highWaterMark, we only expect 10
// bytes total, so we'll make that our highWaterMark instead.
strictEqual(writer.desiredSize, 10);

// We shouldn't have to wait here.
const firstReady = writer.ready;
await writer.ready;

writer.write(new Uint8Array(1));
strictEqual(writer.desiredSize, 9);

// Let's write a second chunk that will be buffered. This one
// should impact the desiredSize and the backpressure signal.
writer.write(new Uint8Array(9));
strictEqual(writer.desiredSize, 0);

// The ready promise should have been replaced
notStrictEqual(firstReady, writer.ready);

async function waitForReady() {
await writer.ready;
// The backpressure should have been relieved a bit,
// but only by the amount of what we've currently read.
strictEqual(writer.desiredSize, 1);
}

await Promise.all([
// We call the waitForReady first to ensure that we set up waiting on
// the ready promise before we relieve the backpressure using the read.
waitForReady(),
reader.read(),
]);

// If we read again, the backpressure should be fully resolved.
await reader.read();
strictEqual(writer.desiredSize, 10);
}
};
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
using Workerd = import "/workerd/workerd.capnp";

const unitTests :Workerd.Config = (
services = [
( name = "identitytransformstream-backpressure-test",
worker = (
modules = [
(name = "worker", esModule = embed "identitytransformstream-backpressure-test.js")
],
compatibilityDate = "2023-01-15",
compatibilityFlags = ["nodejs_compat"]
)
),
],
);
68 changes: 61 additions & 7 deletions src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
@@ -763,6 +763,7 @@ jsg::Promise<void> WritableStreamInternalController::write(
}

auto prp = js.newPromiseAndResolver<void>();
increaseCurrentWriteBufferSize(js, byteLength);
queue.push_back(WriteEvent {
.outputLock = IoContext::current().waitForOutputLocksIfNecessaryIoOwn(),
.event = Write {
@@ -780,6 +781,45 @@ jsg::Promise<void> WritableStreamInternalController::write(
KJ_UNREACHABLE;
}

void WritableStreamInternalController::increaseCurrentWriteBufferSize(
jsg::Lock& js,
uint64_t amount) {
currentWriteBufferSize += amount;
KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) {
updateBackpressure(js, (*highWaterMark) - currentWriteBufferSize <= 0);
}
}

void WritableStreamInternalController::decreaseCurrentWriteBufferSize(
jsg::Lock& js,
uint64_t amount) {
currentWriteBufferSize -= amount;
KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) {
updateBackpressure(js, (*highWaterMark) - currentWriteBufferSize <= 0);
}
}

void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool backpressure) {
KJ_IF_MAYBE(writerLock, writeState.tryGet<WriterLocked>()) {
if (backpressure) {
// Per the spec, when backpressure is updated and is true, we replace the existing
// ready promise on the writer with a new pending promise, regardless of whether
// the existing one is resolved or not.
auto prp = js.newPromiseAndResolver<void>();
prp.promise.markAsHandled();
writerLock->setReadyFulfiller(prp);
return;
}

// When backpressure is updated and is false, we resolve the ready promise on the writer
maybeResolvePromise(writerLock->getReadyFulfiller());
}
}

void WritableStreamInternalController::setHighWaterMark(uint64_t highWaterMark) {
maybeHighWaterMark = highWaterMark;
}

jsg::Promise<void> WritableStreamInternalController::close(
jsg::Lock& js,
bool markAsHandled) {
@@ -1010,7 +1050,12 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) { return uint(0); }
KJ_CASE_ONEOF(errored, StreamStates::Errored) { return nullptr; }
KJ_CASE_ONEOF(writable, Writable) { return 1; }
KJ_CASE_ONEOF(writable, Writable) {
KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) {
return (*highWaterMark) - currentWriteBufferSize;
}
return 1;
}
}

KJ_UNREACHABLE;
@@ -1024,24 +1069,27 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
auto closedPrp = js.newPromiseAndResolver<void>();
closedPrp.promise.markAsHandled();

auto readyPromise = js.resolvedPromise();
auto readyPrp = js.newPromiseAndResolver<void>();
readyPrp.promise.markAsHandled();

auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver));
auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
maybeResolvePromise(lock.getClosedFulfiller());
maybeResolvePromise(lock.getReadyFulfiller());
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
maybeRejectPromise<void>(lock.getClosedFulfiller(), errored.getHandle(js));
maybeRejectPromise<void>(lock.getReadyFulfiller(), errored.getHandle(js));
}
KJ_CASE_ONEOF(writable, Writable) {
// Nothing to do.
maybeResolvePromise(lock.getReadyFulfiller());
}
}

writeState = kj::mv(lock);
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPromise));
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
return true;
}

@@ -1076,6 +1124,7 @@ void WritableStreamInternalController::doClose() {
state.init<StreamStates::Closed>();
KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) {
maybeResolvePromise(locked->getClosedFulfiller());
maybeResolvePromise(locked->getReadyFulfiller());
writeState.init<Locked>();
} else KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) {
writeState.init<Unlocked>();
@@ -1087,6 +1136,7 @@ void WritableStreamInternalController::doError(jsg::Lock& js, v8::Local<v8::Valu
state.init<StreamStates::Errored>(js.v8Ref(reason));
KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) {
maybeRejectPromise<void>(locked->getClosedFulfiller(), reason);
maybeResolvePromise(locked->getReadyFulfiller());
writeState.init<Locked>();
} else KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) {
writeState.init<Unlocked>();
@@ -1197,6 +1247,8 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
auto& writable = state.get<Writable>();
auto check = makeChecker(request);

auto amountToWrite = request.bytes.size();

auto promise = writable->write(request.bytes.begin(), request.bytes.size())
.attach(kj::mv(request.ownBytes));

@@ -1210,18 +1262,20 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
// That's a larger refactor, though.
return ioContext.awaitIoLegacy(kj::mv(promise)).then(js,
ioContext.addFunctor(
[this, check, maybeAbort](jsg::Lock& js) -> jsg::Promise<void> {
[this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise<void> {
auto& request = check();
maybeResolvePromise(request.promise);
decreaseCurrentWriteBufferSize(js, amountToWrite);
queue.pop_front();
maybeAbort(js, request);
return writeLoop(js, IoContext::current());
}), ioContext.addFunctor(
[this, check, maybeAbort](jsg::Lock& js, jsg::Value reason)
[this, check, maybeAbort, amountToWrite](jsg::Lock& js, jsg::Value reason)
-> jsg::Promise<void> {
auto handle = reason.getHandle(js);
auto& request = check();
auto& writable = state.get<Writable>();
decreaseCurrentWriteBufferSize(js, amountToWrite);
maybeRejectPromise<void>(request.promise, handle);
queue.pop_front();
if (!maybeAbort(js, request)) {
20 changes: 18 additions & 2 deletions src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
@@ -226,8 +226,11 @@ class WritableStreamInternalController: public WritableStreamController {
: state(closed) {}
explicit WritableStreamInternalController(StreamStates::Errored errored)
: state(kj::mv(errored)) {}
explicit WritableStreamInternalController(Writable writable)
: state(kj::mv(writable)) {}
explicit WritableStreamInternalController(Writable writable,
kj::Maybe<uint64_t> maybeHighWaterMark = nullptr)
: state(kj::mv(writable)),
maybeHighWaterMark(maybeHighWaterMark) {
}

WritableStreamInternalController(WritableStreamInternalController&& other) = default;
WritableStreamInternalController& operator=(WritableStreamInternalController&& other) = default;
@@ -270,6 +273,8 @@ class WritableStreamInternalController: public WritableStreamController {

void visitForGc(jsg::GcVisitor& visitor) override;

void setHighWaterMark(uint64_t highWaterMark);

private:
bool isClosedOrClosing();

@@ -304,6 +309,17 @@ class WritableStreamInternalController: public WritableStreamController {

kj::Maybe<PendingAbort> maybePendingAbort;

uint64_t currentWriteBufferSize = 0;
kj::Maybe<uint64_t> maybeHighWaterMark;
// The highWaterMark is the total amount of data currently buffered in
// the controller waiting to be flushed out to the underlying WritableStreamSink.
// It is used to implement backpressure signaling using desiredSize and the ready
// promise on the writer.

void increaseCurrentWriteBufferSize(jsg::Lock& js, uint64_t amount);
void decreaseCurrentWriteBufferSize(jsg::Lock& js, uint64_t amount);
void updateBackpressure(jsg::Lock& js, bool backpressure);

struct Write {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
std::shared_ptr<v8::BackingStore> ownBytes;
24 changes: 20 additions & 4 deletions src/workerd/api/streams/transform.c++
Original file line number Diff line number Diff line change
@@ -122,20 +122,28 @@ jsg::Ref<TransformStream> TransformStream::constructor(
return IdentityTransformStream::constructor(js);
}

jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor(jsg::Lock& js) {
jsg::Ref<IdentityTransformStream> IdentityTransformStream::constructor(
jsg::Lock& js,
jsg::Optional<IdentityTransformStream::QueuingStrategy> maybeQueuingStrategy) {
auto readableSide = kj::refcounted<IdentityTransformStreamImpl>();
auto writableSide = kj::addRef(*readableSide);

auto& ioContext = IoContext::current();

kj::Maybe<uint64_t> maybeHighWaterMark = nullptr;
KJ_IF_MAYBE(queuingStrategy, maybeQueuingStrategy) {
maybeHighWaterMark = queuingStrategy->highWaterMark;
}

return jsg::alloc<IdentityTransformStream>(
jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)),
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide)));
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide), maybeHighWaterMark));
}

jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(
jsg::Lock& js,
uint64_t expectedLength) {
uint64_t expectedLength,
jsg::Optional<IdentityTransformStream::QueuingStrategy> maybeQueuingStrategy) {
constexpr uint64_t MAX_SAFE_INTEGER = (1ull << 53) - 1;

JSG_REQUIRE(expectedLength <= MAX_SAFE_INTEGER, TypeError,
@@ -146,9 +154,17 @@ jsg::Ref<FixedLengthStream> FixedLengthStream::constructor(

auto& ioContext = IoContext::current();

kj::Maybe<uint64_t> maybeHighWaterMark = nullptr;
// For a FixedLengthStream we do not want a highWaterMark higher than the expectedLength.
KJ_IF_MAYBE(queuingStrategy, maybeQueuingStrategy) {
maybeHighWaterMark = queuingStrategy->highWaterMark.map([&](uint64_t highWaterMark) {
return kj::min(expectedLength, highWaterMark);
});
}

return jsg::alloc<FixedLengthStream>(
jsg::alloc<ReadableStream>(ioContext, kj::mv(readableSide)),
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide)));
jsg::alloc<WritableStream>(ioContext, kj::mv(writableSide), maybeHighWaterMark));
}

} // namespace workerd::api
15 changes: 13 additions & 2 deletions src/workerd/api/streams/transform.h
Original file line number Diff line number Diff line change
@@ -81,7 +81,15 @@ class IdentityTransformStream: public TransformStream {
public:
using TransformStream::TransformStream;

static jsg::Ref<IdentityTransformStream> constructor(jsg::Lock& js);
struct QueuingStrategy {
jsg::Optional<uint64_t> highWaterMark;

JSG_STRUCT(highWaterMark);
};

static jsg::Ref<IdentityTransformStream> constructor(
jsg::Lock& js,
jsg::Optional<QueuingStrategy> queuingStrategy = nullptr);

JSG_RESOURCE_TYPE(IdentityTransformStream) {
JSG_INHERIT(TransformStream);
@@ -98,7 +106,10 @@ class FixedLengthStream: public IdentityTransformStream {
public:
using IdentityTransformStream::IdentityTransformStream;

static jsg::Ref<FixedLengthStream> constructor(jsg::Lock& js, uint64_t expectedLength);
static jsg::Ref<FixedLengthStream> constructor(
jsg::Lock& js,
uint64_t expectedLength,
jsg::Optional<QueuingStrategy> queuingStrategy = nullptr);

JSG_RESOURCE_TYPE(FixedLengthStream) {
JSG_INHERIT(IdentityTransformStream);
6 changes: 4 additions & 2 deletions src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
@@ -188,9 +188,11 @@ void WritableStreamDefaultWriter::visitForGc(jsg::GcVisitor& visitor) {

WritableStream::WritableStream(
IoContext& ioContext,
kj::Own<WritableStreamSink> sink)
kj::Own<WritableStreamSink> sink,
kj::Maybe<uint64_t> maybeHighWaterMark)
: ioContext(ioContext),
controller(kj::heap<WritableStreamInternalController>(ioContext.addObject(kj::mv(sink)))) {
controller(kj::heap<WritableStreamInternalController>(ioContext.addObject(kj::mv(sink)),
maybeHighWaterMark)) {
getController().setOwnerRef(*this);
}

3 changes: 2 additions & 1 deletion src/workerd/api/streams/writable.h
Original file line number Diff line number Diff line change
@@ -102,7 +102,8 @@ class WritableStream: public jsg::Object {
kj::Own<WritableStreamJsController>>;

explicit WritableStream(IoContext& ioContext,
kj::Own<WritableStreamSink> sink);
kj::Own<WritableStreamSink> sink,
kj::Maybe<uint64_t> maybeHighWaterMark = nullptr);

explicit WritableStream(Controller controller);