diff --git a/src/workerd/api/sockets.c++ b/src/workerd/api/sockets.c++ index 13ce9251c82a..0a5e78562272 100644 --- a/src/workerd/api/sockets.c++ +++ b/src/workerd/api/sockets.c++ @@ -164,23 +164,34 @@ jsg::Ref Socket::startTls(jsg::Lock& js, jsg::Optional tlsOp // The current socket's writable buffers need to be flushed. The socket's WritableStream is backed // by an AsyncIoStream which doesn't implement any buffering, so we don't need to worry about - // flushing. But this is something to keep in mind in case this assumption no longer holds in - // the future. + // flushing. But the JS WritableStream holds a queue so some data may still be buffered. This + // means we need to flush the WritableStream. // // Detach the AsyncIoStream from the Writable/Readable streams and make them unusable. - writable->removeSink(js); - readable = readable->detach(js, true); - closeFulfiller.resolver.resolve(); - - auto acceptedHostname = domain.asPtr(); - KJ_IF_MAYBE(s, tlsOptions) { - KJ_IF_MAYBE(expectedHost, s->expectedServerHostname) { - acceptedHostname = *expectedHost; + auto& context = IoContext::current(); + auto secureStreamPromise = context.awaitJs(writable->flush(js).then(js, + [this, domain = kj::heapString(domain), tlsOptions = kj::mv(tlsOptions), + tlsStarter = kj::mv(tlsStarter)](jsg::Lock& js) mutable { + writable->removeSink(js); + readable = readable->detach(js, true); + closeFulfiller.resolver.resolve(); + + auto acceptedHostname = domain.asPtr(); + KJ_IF_MAYBE(s, tlsOptions) { + KJ_IF_MAYBE(expectedHost, s->expectedServerHostname) { + acceptedHostname = *expectedHost; + } } - } - // All non-secure sockets should have a tlsStarter. - kj::Own secure = KJ_ASSERT_NONNULL(*tlsStarter)(acceptedHostname); - return setupSocket(js, kj::mv(secure), kj::mv(options), kj::mv(tlsStarter), true, kj::mv(domain)); + // All non-secure sockets should have a tlsStarter. + kj::Own secure = KJ_ASSERT_NONNULL(*tlsStarter)(acceptedHostname); + return secure; + })); + + // The existing tlsStarter gets consumed and we won't need it again. Pass in an empty tlsStarter + // to `setupSocket`. + auto newTlsStarter = kj::heap(); + return setupSocket(js, kj::newPromisedStream(kj::mv(secureStreamPromise)), kj::mv(options), + kj::mv(newTlsStarter), true, kj::mv(domain)); } void Socket::handleProxyStatus( diff --git a/src/workerd/api/streams/common.h b/src/workerd/api/streams/common.h index 90082240ba11..2d07d8e1f960 100644 --- a/src/workerd/api/streams/common.h +++ b/src/workerd/api/streams/common.h @@ -420,6 +420,10 @@ class WritableStreamController { // Indicates that no additional data will be written to the controller. All // existing pending writes should be allowed to complete. + virtual jsg::Promise flush(jsg::Lock& js, bool markAsHandler = false) = 0; + // Requests all pending data to the written. The returned promise is resolved when all + // pending writes have completed. + virtual jsg::Promise abort( jsg::Lock& js, jsg::Optional> reason) = 0; diff --git a/src/workerd/api/streams/internal.c++ b/src/workerd/api/streams/internal.c++ index 67b8dcd7ebeb..9532a473aec0 100644 --- a/src/workerd/api/streams/internal.c++ +++ b/src/workerd/api/streams/internal.c++ @@ -854,6 +854,40 @@ jsg::Promise WritableStreamInternalController::close( KJ_UNREACHABLE; } +jsg::Promise WritableStreamInternalController::flush( + jsg::Lock& js, + bool markAsHandled) { + if (isClosedOrClosing()) { + auto reason = js.v8TypeError("This WritableStream has been closed."_kj); + return rejectedMaybeHandledPromise(js, reason, markAsHandled); + } + + KJ_SWITCH_ONEOF(state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + // Handled by isClosedOrClosing(). + KJ_UNREACHABLE; + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + auto reason = errored.getHandle(js); + return rejectedMaybeHandledPromise(js, reason, markAsHandled); + } + KJ_CASE_ONEOF(writable, Writable) { + auto prp = js.newPromiseAndResolver(); + if (markAsHandled) { + prp.promise.markAsHandled(); + } + queue.push_back(WriteEvent { + .outputLock = IoContext::current().waitForOutputLocksIfNecessaryIoOwn(), + .event = Flush { .promise = kj::mv(prp.resolver) } + }); + ensureWriting(js); + return kj::mv(prp.promise); + } + } + + KJ_UNREACHABLE; +} + jsg::Promise WritableStreamInternalController::abort( jsg::Lock& js, jsg::Optional> maybeReason) { @@ -1117,7 +1151,9 @@ void WritableStreamInternalController::releaseWriter( } bool WritableStreamInternalController::isClosedOrClosing() { - return state.is() || (!queue.empty() && queue.back().event.is()); + bool isClosing = !queue.empty() && queue.back().event.is(); + bool isFlushing = !queue.empty() && queue.back().event.is(); + return state.is() || isClosing || isFlushing; } void WritableStreamInternalController::doClose() { @@ -1426,6 +1462,17 @@ jsg::Promise WritableStreamInternalController::writeLoopAfterFrontOutputLo finishError(js, handle); })); } + KJ_CASE_ONEOF(request, Flush) { + // Flushing is similar to closing the stream, the main difference is that `finishClose` + // and `writable->end()` are never called. + auto check = makeChecker(request); + + auto& checkReq = check(); + maybeResolvePromise(checkReq.promise); + queue.pop_front(); + + return js.resolvedPromise(); + } } KJ_UNREACHABLE; @@ -1610,6 +1657,9 @@ void WritableStreamInternalController::drain(jsg::Lock& js, v8::Local KJ_CASE_ONEOF(closeRequest, Close) { maybeRejectPromise(closeRequest.promise, reason); } + KJ_CASE_ONEOF(flushRequest, Flush) { + maybeRejectPromise(flushRequest.promise, reason); + } } queue.pop_front(); } @@ -1624,6 +1674,9 @@ void WritableStreamInternalController::visitForGc(jsg::GcVisitor& visitor) { KJ_CASE_ONEOF(close, Close) { visitor.visit(close.promise); } + KJ_CASE_ONEOF(flush, Flush) { + visitor.visit(flush.promise); + } KJ_CASE_ONEOF(pipe, Pipe) { visitor.visit(pipe.maybeSignal, pipe.promise); } diff --git a/src/workerd/api/streams/internal.h b/src/workerd/api/streams/internal.h index 4c214ad62236..c4eb1b1127bc 100644 --- a/src/workerd/api/streams/internal.h +++ b/src/workerd/api/streams/internal.h @@ -247,6 +247,8 @@ class WritableStreamInternalController: public WritableStreamController { jsg::Promise close(jsg::Lock& js, bool markAsHandled = false) override; + jsg::Promise flush(jsg::Lock& js, bool markAsHandled = false) override; + jsg::Promise abort(jsg::Lock& js, jsg::Optional> reason) override; kj::Maybe> tryPipeFrom( @@ -329,6 +331,9 @@ class WritableStreamInternalController: public WritableStreamController { struct Close { kj::Maybe::Resolver> promise; }; + struct Flush { + kj::Maybe::Resolver> promise; + }; struct Pipe { WritableStreamInternalController& parent; ReadableStreamController::PipeController& source; @@ -344,7 +349,7 @@ class WritableStreamInternalController: public WritableStreamController { }; struct WriteEvent { kj::Maybe>> outputLock; // must wait for this before actually writing - kj::OneOf event; + kj::OneOf event; }; std::deque queue; diff --git a/src/workerd/api/streams/standard.h b/src/workerd/api/streams/standard.h index eed0e255e41a..abdb9bafed34 100644 --- a/src/workerd/api/streams/standard.h +++ b/src/workerd/api/streams/standard.h @@ -1078,6 +1078,10 @@ class WritableStreamJsController: public WritableStreamController, jsg::Promise close(jsg::Lock& js, bool markAsHandled = false) override; + jsg::Promise flush(jsg::Lock& js, bool markAsHandled = false) override { + KJ_UNIMPLEMENTED("expected WritableStreamInternalController implementation to be enough"); + } + void doClose() override; void doError(jsg::Lock& js, v8::Local reason) override; diff --git a/src/workerd/api/streams/writable.c++ b/src/workerd/api/streams/writable.c++ index cf5b2aa87dc9..eab138d502c7 100644 --- a/src/workerd/api/streams/writable.c++ +++ b/src/workerd/api/streams/writable.c++ @@ -239,6 +239,14 @@ jsg::Promise WritableStream::close(jsg::Lock& js) { return getController().close(js); } +jsg::Promise WritableStream::flush(jsg::Lock& js) { + if (isLocked()) { + return js.rejectedPromise( + js.v8TypeError("This WritableStream is currently locked to a writer."_kj)); + } + return getController().flush(js); +} + jsg::Ref WritableStream::getWriter(jsg::Lock& js) { return WritableStreamDefaultWriter::constructor(js, JSG_THIS); } diff --git a/src/workerd/api/streams/writable.h b/src/workerd/api/streams/writable.h index 9d544f76ca64..dee3d48c712e 100644 --- a/src/workerd/api/streams/writable.h +++ b/src/workerd/api/streams/writable.h @@ -134,6 +134,7 @@ class WritableStream: public jsg::Object { // then its abort algorithm causes the transform's readable side to become errored with `reason`. jsg::Promise close(jsg::Lock& js); + jsg::Promise flush(jsg::Lock& js); jsg::Ref getWriter(jsg::Lock& js);