Skip to content

Commit 857d9b1

Browse files
committedMar 27, 2023
Implement backpressure signaling for IdentityTransformStream writable
Enables optional use of writer.desiredSize/writer.ready for backpressure signaling with IdentityTransformStream and FixedLengthStream.
1 parent 26dab81 commit 857d9b1

9 files changed

+275
-18
lines changed
 

‎src/workerd/api/streams.h

+1
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ namespace workerd::api {
3737
api::TransformStream, \
3838
api::FixedLengthStream, \
3939
api::IdentityTransformStream, \
40+
api::IdentityTransformStream::QueuingStrategy, \
4041
api::ReadableStream::ValuesOptions, \
4142
api::ReadableStream::ReadableStreamAsyncIterator, \
4243
api::ReadableStream::ReadableStreamAsyncIterator::Next, \
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
import {
2+
notStrictEqual,
3+
strictEqual,
4+
} from 'node:assert';
5+
6+
export const identityTransformStream = {
7+
async test(ctrl, env, ctx) {
8+
const ts = new IdentityTransformStream({ highWaterMark: 10 });
9+
const writer = ts.writable.getWriter();
10+
const reader = ts.readable.getReader();
11+
12+
strictEqual(writer.desiredSize, 10);
13+
14+
// We shouldn't have to wait here.
15+
const firstReady = writer.ready;
16+
await writer.ready;
17+
18+
writer.write(new Uint8Array(1));
19+
strictEqual(writer.desiredSize, 9);
20+
21+
// Let's write a second chunk that will be buffered. This one
22+
// should impact the desiredSize and the backpressure signal.
23+
writer.write(new Uint8Array(9));
24+
strictEqual(writer.desiredSize, 0);
25+
26+
// The ready promise should have been replaced
27+
notStrictEqual(firstReady, writer.ready);
28+
29+
async function waitForReady() {
30+
strictEqual(writer.desiredSize, 0);
31+
await writer.ready;
32+
// The backpressure should have been relieved a bit,
33+
// but only by the amount of what we've currently read.
34+
strictEqual(writer.desiredSize, 1);
35+
}
36+
37+
await Promise.all([
38+
// We call the waitForReady first to ensure that we set up waiting on
39+
// the ready promise before we relieve the backpressure using the read.
40+
// If the backpressure signal is not working correctly, the test will
41+
// fail with an error indicating that a hanging promise was canceled.
42+
waitForReady(),
43+
reader.read(),
44+
]);
45+
46+
// If we read again, the backpressure should be fully resolved.
47+
await reader.read();
48+
strictEqual(writer.desiredSize, 10);
49+
}
50+
};
51+
52+
export const identityTransformStreamNoHWM = {
53+
async test(ctrl, env, ctx) {
54+
// Test that the original default behavior still works as expected.
55+
56+
const ts = new IdentityTransformStream();
57+
const writer = ts.writable.getWriter();
58+
const reader = ts.readable.getReader();
59+
60+
strictEqual(writer.desiredSize, 1);
61+
62+
// We shouldn't have to wait here.
63+
const firstReady = writer.ready;
64+
await writer.ready;
65+
66+
writer.write(new Uint8Array(1));
67+
strictEqual(writer.desiredSize, 1);
68+
69+
// Let's write a second chunk that will be buffered. There should
70+
// be no indication that the desired size has changed.
71+
writer.write(new Uint8Array(9));
72+
strictEqual(writer.desiredSize, 1);
73+
74+
// The ready promise should be exactly the same...
75+
strictEqual(firstReady, writer.ready);
76+
77+
async function waitForReady() {
78+
strictEqual(writer.desiredSize, 1);
79+
await writer.ready;
80+
strictEqual(writer.desiredSize, 1);
81+
}
82+
83+
await Promise.all([
84+
// We call the waitForReady first to ensure that we set up waiting on
85+
// the ready promise before we relieve the backpressure using the read.
86+
// If the backpressure signal is not working correctly, the test will
87+
// fail with an error indicating that a hanging promise was canceled.
88+
waitForReady(),
89+
reader.read(),
90+
]);
91+
92+
// If we read again, the backpressure should be fully resolved.
93+
await reader.read();
94+
strictEqual(writer.desiredSize, 1);
95+
}
96+
};
97+
98+
export const fixedLengthStream = {
99+
async test(ctrl, env, ctx) {
100+
const ts = new FixedLengthStream(10, { highWaterMark: 100 });
101+
const writer = ts.writable.getWriter();
102+
const reader = ts.readable.getReader();
103+
104+
// Even tho we specified 100 as our highWaterMark, we only expect 10
105+
// bytes total, so we'll make that our highWaterMark instead.
106+
strictEqual(writer.desiredSize, 10);
107+
108+
// We shouldn't have to wait here.
109+
const firstReady = writer.ready;
110+
await writer.ready;
111+
112+
writer.write(new Uint8Array(1));
113+
strictEqual(writer.desiredSize, 9);
114+
115+
// Let's write a second chunk that will be buffered. This one
116+
// should impact the desiredSize and the backpressure signal.
117+
writer.write(new Uint8Array(9));
118+
strictEqual(writer.desiredSize, 0);
119+
120+
// The ready promise should have been replaced
121+
notStrictEqual(firstReady, writer.ready);
122+
123+
async function waitForReady() {
124+
await writer.ready;
125+
// The backpressure should have been relieved a bit,
126+
// but only by the amount of what we've currently read.
127+
strictEqual(writer.desiredSize, 1);
128+
}
129+
130+
await Promise.all([
131+
// We call the waitForReady first to ensure that we set up waiting on
132+
// the ready promise before we relieve the backpressure using the read.
133+
waitForReady(),
134+
reader.read(),
135+
]);
136+
137+
// If we read again, the backpressure should be fully resolved.
138+
await reader.read();
139+
strictEqual(writer.desiredSize, 10);
140+
}
141+
};
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
using Workerd = import "/workerd/workerd.capnp";
2+
3+
const unitTests :Workerd.Config = (
4+
services = [
5+
( name = "identitytransformstream-backpressure-test",
6+
worker = (
7+
modules = [
8+
(name = "worker", esModule = embed "identitytransformstream-backpressure-test.js")
9+
],
10+
compatibilityDate = "2023-01-15",
11+
compatibilityFlags = ["nodejs_compat"]
12+
)
13+
),
14+
],
15+
);

‎src/workerd/api/streams/internal.c++

+61-7
Original file line numberDiff line numberDiff line change
@@ -763,6 +763,7 @@ jsg::Promise<void> WritableStreamInternalController::write(
763763
}
764764

765765
auto prp = js.newPromiseAndResolver<void>();
766+
increaseCurrentWriteBufferSize(js, byteLength);
766767
queue.push_back(WriteEvent {
767768
.outputLock = IoContext::current().waitForOutputLocksIfNecessaryIoOwn(),
768769
.event = Write {
@@ -780,6 +781,45 @@ jsg::Promise<void> WritableStreamInternalController::write(
780781
KJ_UNREACHABLE;
781782
}
782783

784+
void WritableStreamInternalController::increaseCurrentWriteBufferSize(
785+
jsg::Lock& js,
786+
uint64_t amount) {
787+
currentWriteBufferSize += amount;
788+
KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) {
789+
updateBackpressure(js, (*highWaterMark) - currentWriteBufferSize <= 0);
790+
}
791+
}
792+
793+
void WritableStreamInternalController::decreaseCurrentWriteBufferSize(
794+
jsg::Lock& js,
795+
uint64_t amount) {
796+
currentWriteBufferSize -= amount;
797+
KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) {
798+
updateBackpressure(js, (*highWaterMark) - currentWriteBufferSize <= 0);
799+
}
800+
}
801+
802+
void WritableStreamInternalController::updateBackpressure(jsg::Lock& js, bool backpressure) {
803+
KJ_IF_MAYBE(writerLock, writeState.tryGet<WriterLocked>()) {
804+
if (backpressure) {
805+
// Per the spec, when backpressure is updated and is true, we replace the existing
806+
// ready promise on the writer with a new pending promise, regardless of whether
807+
// the existing one is resolved or not.
808+
auto prp = js.newPromiseAndResolver<void>();
809+
prp.promise.markAsHandled();
810+
writerLock->setReadyFulfiller(prp);
811+
return;
812+
}
813+
814+
// When backpressure is updated and is false, we resolve the ready promise on the writer
815+
maybeResolvePromise(writerLock->getReadyFulfiller());
816+
}
817+
}
818+
819+
void WritableStreamInternalController::setHighWaterMark(uint64_t highWaterMark) {
820+
maybeHighWaterMark = highWaterMark;
821+
}
822+
783823
jsg::Promise<void> WritableStreamInternalController::close(
784824
jsg::Lock& js,
785825
bool markAsHandled) {
@@ -1010,7 +1050,12 @@ kj::Maybe<int> WritableStreamInternalController::getDesiredSize() {
10101050
KJ_SWITCH_ONEOF(state) {
10111051
KJ_CASE_ONEOF(closed, StreamStates::Closed) { return uint(0); }
10121052
KJ_CASE_ONEOF(errored, StreamStates::Errored) { return nullptr; }
1013-
KJ_CASE_ONEOF(writable, Writable) { return 1; }
1053+
KJ_CASE_ONEOF(writable, Writable) {
1054+
KJ_IF_MAYBE(highWaterMark, maybeHighWaterMark) {
1055+
return (*highWaterMark) - currentWriteBufferSize;
1056+
}
1057+
return 1;
1058+
}
10141059
}
10151060

10161061
KJ_UNREACHABLE;
@@ -1024,24 +1069,27 @@ bool WritableStreamInternalController::lockWriter(jsg::Lock& js, Writer& writer)
10241069
auto closedPrp = js.newPromiseAndResolver<void>();
10251070
closedPrp.promise.markAsHandled();
10261071

1027-
auto readyPromise = js.resolvedPromise();
1072+
auto readyPrp = js.newPromiseAndResolver<void>();
1073+
readyPrp.promise.markAsHandled();
10281074

1029-
auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver));
1075+
auto lock = WriterLocked(writer, kj::mv(closedPrp.resolver), kj::mv(readyPrp.resolver));
10301076

10311077
KJ_SWITCH_ONEOF(state) {
10321078
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
10331079
maybeResolvePromise(lock.getClosedFulfiller());
1080+
maybeResolvePromise(lock.getReadyFulfiller());
10341081
}
10351082
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
10361083
maybeRejectPromise<void>(lock.getClosedFulfiller(), errored.getHandle(js));
1084+
maybeRejectPromise<void>(lock.getReadyFulfiller(), errored.getHandle(js));
10371085
}
10381086
KJ_CASE_ONEOF(writable, Writable) {
1039-
// Nothing to do.
1087+
maybeResolvePromise(lock.getReadyFulfiller());
10401088
}
10411089
}
10421090

10431091
writeState = kj::mv(lock);
1044-
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPromise));
1092+
writer.attach(*this, kj::mv(closedPrp.promise), kj::mv(readyPrp.promise));
10451093
return true;
10461094
}
10471095

@@ -1076,6 +1124,7 @@ void WritableStreamInternalController::doClose() {
10761124
state.init<StreamStates::Closed>();
10771125
KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) {
10781126
maybeResolvePromise(locked->getClosedFulfiller());
1127+
maybeResolvePromise(locked->getReadyFulfiller());
10791128
writeState.init<Locked>();
10801129
} else KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) {
10811130
writeState.init<Unlocked>();
@@ -1087,6 +1136,7 @@ void WritableStreamInternalController::doError(jsg::Lock& js, v8::Local<v8::Valu
10871136
state.init<StreamStates::Errored>(js.v8Ref(reason));
10881137
KJ_IF_MAYBE(locked, writeState.tryGet<WriterLocked>()) {
10891138
maybeRejectPromise<void>(locked->getClosedFulfiller(), reason);
1139+
maybeResolvePromise(locked->getReadyFulfiller());
10901140
writeState.init<Locked>();
10911141
} else KJ_IF_MAYBE(locked, writeState.tryGet<PipeLocked>()) {
10921142
writeState.init<Unlocked>();
@@ -1197,6 +1247,8 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
11971247
auto& writable = state.get<Writable>();
11981248
auto check = makeChecker(request);
11991249

1250+
auto amountToWrite = request.bytes.size();
1251+
12001252
auto promise = writable->write(request.bytes.begin(), request.bytes.size())
12011253
.attach(kj::mv(request.ownBytes));
12021254

@@ -1210,18 +1262,20 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
12101262
// That's a larger refactor, though.
12111263
return ioContext.awaitIoLegacy(kj::mv(promise)).then(js,
12121264
ioContext.addFunctor(
1213-
[this, check, maybeAbort](jsg::Lock& js) -> jsg::Promise<void> {
1265+
[this, check, maybeAbort, amountToWrite](jsg::Lock& js) -> jsg::Promise<void> {
12141266
auto& request = check();
12151267
maybeResolvePromise(request.promise);
1268+
decreaseCurrentWriteBufferSize(js, amountToWrite);
12161269
queue.pop_front();
12171270
maybeAbort(js, request);
12181271
return writeLoop(js, IoContext::current());
12191272
}), ioContext.addFunctor(
1220-
[this, check, maybeAbort](jsg::Lock& js, jsg::Value reason)
1273+
[this, check, maybeAbort, amountToWrite](jsg::Lock& js, jsg::Value reason)
12211274
-> jsg::Promise<void> {
12221275
auto handle = reason.getHandle(js);
12231276
auto& request = check();
12241277
auto& writable = state.get<Writable>();
1278+
decreaseCurrentWriteBufferSize(js, amountToWrite);
12251279
maybeRejectPromise<void>(request.promise, handle);
12261280
queue.pop_front();
12271281
if (!maybeAbort(js, request)) {

‎src/workerd/api/streams/internal.h

+18-2
Original file line numberDiff line numberDiff line change
@@ -226,8 +226,11 @@ class WritableStreamInternalController: public WritableStreamController {
226226
: state(closed) {}
227227
explicit WritableStreamInternalController(StreamStates::Errored errored)
228228
: state(kj::mv(errored)) {}
229-
explicit WritableStreamInternalController(Writable writable)
230-
: state(kj::mv(writable)) {}
229+
explicit WritableStreamInternalController(Writable writable,
230+
kj::Maybe<uint64_t> maybeHighWaterMark = nullptr)
231+
: state(kj::mv(writable)),
232+
maybeHighWaterMark(maybeHighWaterMark) {
233+
}
231234

232235
WritableStreamInternalController(WritableStreamInternalController&& other) = default;
233236
WritableStreamInternalController& operator=(WritableStreamInternalController&& other) = default;
@@ -270,6 +273,8 @@ class WritableStreamInternalController: public WritableStreamController {
270273

271274
void visitForGc(jsg::GcVisitor& visitor) override;
272275

276+
void setHighWaterMark(uint64_t highWaterMark);
277+
273278
private:
274279
bool isClosedOrClosing();
275280

@@ -304,6 +309,17 @@ class WritableStreamInternalController: public WritableStreamController {
304309

305310
kj::Maybe<PendingAbort> maybePendingAbort;
306311

312+
uint64_t currentWriteBufferSize = 0;
313+
kj::Maybe<uint64_t> maybeHighWaterMark;
314+
// The highWaterMark is the total amount of data currently buffered in
315+
// the controller waiting to be flushed out to the underlying WritableStreamSink.
316+
// It is used to implement backpressure signaling using desiredSize and the ready
317+
// promise on the writer.
318+
319+
void increaseCurrentWriteBufferSize(jsg::Lock& js, uint64_t amount);
320+
void decreaseCurrentWriteBufferSize(jsg::Lock& js, uint64_t amount);
321+
void updateBackpressure(jsg::Lock& js, bool backpressure);
322+
307323
struct Write {
308324
kj::Maybe<jsg::Promise<void>::Resolver> promise;
309325
std::shared_ptr<v8::BackingStore> ownBytes;

0 commit comments

Comments
 (0)