Skip to content

Commit

Permalink
WritableStream is now flushed prior to startTls.
Browse files Browse the repository at this point in the history
  • Loading branch information
dom96 committed Apr 27, 2023
1 parent 575eba6 commit 74fa56a
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 16 deletions.
39 changes: 25 additions & 14 deletions src/workerd/api/sockets.c++
Original file line number Diff line number Diff line change
Expand Up @@ -164,23 +164,34 @@ jsg::Ref<Socket> Socket::startTls(jsg::Lock& js, jsg::Optional<TlsOptions> tlsOp

// The current socket's writable buffers need to be flushed. The socket's WritableStream is backed
// by an AsyncIoStream which doesn't implement any buffering, so we don't need to worry about
// flushing. But this is something to keep in mind in case this assumption no longer holds in
// the future.
// flushing. But the JS WritableStream holds a queue so some data may still be buffered. This
// means we need to flush the WritableStream.
//
// Detach the AsyncIoStream from the Writable/Readable streams and make them unusable.
writable->removeSink(js);
readable = readable->detach(js, true);
closeFulfiller.resolver.resolve();

auto acceptedHostname = domain.asPtr();
KJ_IF_MAYBE(s, tlsOptions) {
KJ_IF_MAYBE(expectedHost, s->expectedServerHostname) {
acceptedHostname = *expectedHost;
auto& context = IoContext::current();
auto secureStreamPromise = context.awaitJs(writable->flush(js).then(js,
[this, domain = kj::heapString(domain), tlsOptions = kj::mv(tlsOptions),
tlsStarter = kj::mv(tlsStarter)](jsg::Lock& js) mutable {
writable->removeSink(js);
readable = readable->detach(js, true);
closeFulfiller.resolver.resolve();

auto acceptedHostname = domain.asPtr();
KJ_IF_MAYBE(s, tlsOptions) {
KJ_IF_MAYBE(expectedHost, s->expectedServerHostname) {
acceptedHostname = *expectedHost;
}
}
}
// All non-secure sockets should have a tlsStarter.
kj::Own<kj::AsyncIoStream> secure = KJ_ASSERT_NONNULL(*tlsStarter)(acceptedHostname);
return setupSocket(js, kj::mv(secure), kj::mv(options), kj::mv(tlsStarter), true, kj::mv(domain));
// All non-secure sockets should have a tlsStarter.
kj::Own<kj::AsyncIoStream> secure = KJ_ASSERT_NONNULL(*tlsStarter)(acceptedHostname);
return secure;
}));

// The existing tlsStarter gets consumed and we won't need it again. Pass in an empty tlsStarter
// to `setupSocket`.
auto newTlsStarter = kj::heap<kj::TlsStarterCallback>();
return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), kj::mv(options),
kj::mv(newTlsStarter), true, kj::mv(domain));
}

void Socket::handleProxyStatus(
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/streams/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -420,6 +420,10 @@ class WritableStreamController {
// Indicates that no additional data will be written to the controller. All
// existing pending writes should be allowed to complete.

virtual jsg::Promise<void> flush(jsg::Lock& js, bool markAsHandler = false) = 0;
// Requests all pending data to the written. The returned promise is resolved when all
// pending writes have completed.

virtual jsg::Promise<void> abort(
jsg::Lock& js,
jsg::Optional<v8::Local<v8::Value>> reason) = 0;
Expand Down
55 changes: 54 additions & 1 deletion src/workerd/api/streams/internal.c++
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,40 @@ jsg::Promise<void> WritableStreamInternalController::close(
KJ_UNREACHABLE;
}

jsg::Promise<void> WritableStreamInternalController::flush(
jsg::Lock& js,
bool markAsHandled) {
if (isClosedOrClosing()) {
auto reason = js.v8TypeError("This WritableStream has been closed."_kj);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}

KJ_SWITCH_ONEOF(state) {
KJ_CASE_ONEOF(closed, StreamStates::Closed) {
// Handled by isClosedOrClosing().
KJ_UNREACHABLE;
}
KJ_CASE_ONEOF(errored, StreamStates::Errored) {
auto reason = errored.getHandle(js);
return rejectedMaybeHandledPromise<void>(js, reason, markAsHandled);
}
KJ_CASE_ONEOF(writable, Writable) {
auto prp = js.newPromiseAndResolver<void>();
if (markAsHandled) {
prp.promise.markAsHandled();
}
queue.push_back(WriteEvent {
.outputLock = IoContext::current().waitForOutputLocksIfNecessaryIoOwn(),
.event = Flush { .promise = kj::mv(prp.resolver) }
});
ensureWriting(js);
return kj::mv(prp.promise);
}
}

KJ_UNREACHABLE;
}

jsg::Promise<void> WritableStreamInternalController::abort(
jsg::Lock& js,
jsg::Optional<v8::Local<v8::Value>> maybeReason) {
Expand Down Expand Up @@ -1117,7 +1151,9 @@ void WritableStreamInternalController::releaseWriter(
}

bool WritableStreamInternalController::isClosedOrClosing() {
return state.is<StreamStates::Closed>() || (!queue.empty() && queue.back().event.is<Close>());
bool isClosing = !queue.empty() && queue.back().event.is<Close>();
bool isFlushing = !queue.empty() && queue.back().event.is<Flush>();
return state.is<StreamStates::Closed>() || isClosing || isFlushing;
}

void WritableStreamInternalController::doClose() {
Expand Down Expand Up @@ -1426,6 +1462,17 @@ jsg::Promise<void> WritableStreamInternalController::writeLoopAfterFrontOutputLo
finishError(js, handle);
}));
}
KJ_CASE_ONEOF(request, Flush) {
// Flushing is similar to closing the stream, the main difference is that `finishClose`
// and `writable->end()` are never called.
auto check = makeChecker(request);

auto& checkReq = check();
maybeResolvePromise(checkReq.promise);
queue.pop_front();

return js.resolvedPromise();
}
}

KJ_UNREACHABLE;
Expand Down Expand Up @@ -1610,6 +1657,9 @@ void WritableStreamInternalController::drain(jsg::Lock& js, v8::Local<v8::Value>
KJ_CASE_ONEOF(closeRequest, Close) {
maybeRejectPromise<void>(closeRequest.promise, reason);
}
KJ_CASE_ONEOF(flushRequest, Flush) {
maybeRejectPromise<void>(flushRequest.promise, reason);
}
}
queue.pop_front();
}
Expand All @@ -1624,6 +1674,9 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) {
KJ_CASE_ONEOF(close, Close) {
visitor.visit(close.promise);
}
KJ_CASE_ONEOF(flush, Flush) {
visitor.visit(flush.promise);
}
KJ_CASE_ONEOF(pipe, Pipe) {
visitor.visit(pipe.maybeSignal, pipe.promise);
}
Expand Down
7 changes: 6 additions & 1 deletion src/workerd/api/streams/internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,8 @@ class WritableStreamInternalController: public WritableStreamController {

jsg::Promise<void> close(jsg::Lock& js, bool markAsHandled = false) override;

jsg::Promise<void> flush(jsg::Lock& js, bool markAsHandled = false) override;

jsg::Promise<void> abort(jsg::Lock& js, jsg::Optional<v8::Local<v8::Value>> reason) override;

kj::Maybe<jsg::Promise<void>> tryPipeFrom(
Expand Down Expand Up @@ -329,6 +331,9 @@ class WritableStreamInternalController: public WritableStreamController {
struct Close {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
};
struct Flush {
kj::Maybe<jsg::Promise<void>::Resolver> promise;
};
struct Pipe {
WritableStreamInternalController& parent;
ReadableStreamController::PipeController& source;
Expand All @@ -344,7 +349,7 @@ class WritableStreamInternalController: public WritableStreamController {
};
struct WriteEvent {
kj::Maybe<IoOwn<kj::Promise<void>>> outputLock; // must wait for this before actually writing
kj::OneOf<Write, Pipe, Close> event;
kj::OneOf<Write, Pipe, Close, Flush> event;
};

std::deque<WriteEvent> queue;
Expand Down
4 changes: 4 additions & 0 deletions src/workerd/api/streams/standard.h
Original file line number Diff line number Diff line change
Expand Up @@ -1078,6 +1078,10 @@ class WritableStreamJsController: public WritableStreamController,

jsg::Promise<void> close(jsg::Lock& js, bool markAsHandled = false) override;

jsg::Promise<void> flush(jsg::Lock& js, bool markAsHandled = false) override {
KJ_UNIMPLEMENTED("expected WritableStreamInternalController implementation to be enough");
}

void doClose() override;

void doError(jsg::Lock& js, v8::Local<v8::Value> reason) override;
Expand Down
8 changes: 8 additions & 0 deletions src/workerd/api/streams/writable.c++
Original file line number Diff line number Diff line change
Expand Up @@ -239,6 +239,14 @@ jsg::Promise<void> WritableStream::close(jsg::Lock& js) {
return getController().close(js);
}

jsg::Promise<void> WritableStream::flush(jsg::Lock& js) {
if (isLocked()) {
return js.rejectedPromise<void>(
js.v8TypeError("This WritableStream is currently locked to a writer."_kj));
}
return getController().flush(js);
}

jsg::Ref<WritableStreamDefaultWriter> WritableStream::getWriter(jsg::Lock& js) {
return WritableStreamDefaultWriter::constructor(js, JSG_THIS);
}
Expand Down
1 change: 1 addition & 0 deletions src/workerd/api/streams/writable.h
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ class WritableStream: public jsg::Object {
// then its abort algorithm causes the transform's readable side to become errored with `reason`.

jsg::Promise<void> close(jsg::Lock& js);
jsg::Promise<void> flush(jsg::Lock& js);

jsg::Ref<WritableStreamDefaultWriter> getWriter(jsg::Lock& js);

Expand Down

0 comments on commit 74fa56a

Please # to comment.