diff --git a/src/workerd/api/streams/queue-test.c++ b/src/workerd/api/streams/queue-test.c++ index e5ded89d910..887614c889c 100644 --- a/src/workerd/api/streams/queue-test.c++ +++ b/src/workerd/api/streams/queue-test.c++ @@ -97,18 +97,18 @@ auto read(jsg::Lock& js, auto& consumer) { auto byobRead(jsg::Lock& js, auto& consumer, int size) { auto prp = js.newPromiseAndResolver(); - consumer.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto { + consumer.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, size), .type = ByteQueue::ReadRequest::Type::BYOB, - }, - }); + } + )); return kj::mv(prp.promise); }; auto getEntry(jsg::Lock& js, auto size) { - return kj::refcounted(js.v8Ref(v8::True(js.v8Isolate).As()), size); + return kj::heap(js.v8Ref(v8::True(js.v8Isolate).As()), size); } #pragma region ValueQueue Tests @@ -388,7 +388,7 @@ KJ_TEST("ByteQueue basics work") { KJ_ASSERT(queue.desiredSize() == 2); KJ_ASSERT(queue.size() == 0); - auto entry = kj::refcounted(jsg::BackingStore::alloc(js, 4)); + auto entry = kj::heap(jsg::BackingStore::alloc(js, 4)); queue.push(js, kj::mv(entry)); @@ -400,7 +400,7 @@ KJ_TEST("ByteQueue basics work") { queue.close(js); try { - auto entry = kj::refcounted(jsg::BackingStore::alloc(js, 4)); + auto entry = kj::heap(jsg::BackingStore::alloc(js, 4)); queue.push(js, kj::mv(entry)); KJ_FAIL_ASSERT("The queue push after close should have failed."); } catch (kj::Exception& ex) { @@ -422,7 +422,7 @@ KJ_TEST("ByteQueue erroring works") { KJ_ASSERT(queue.desiredSize() == 0); try { - auto entry = kj::refcounted(jsg::BackingStore::alloc(js, 4)); + auto entry = kj::heap(jsg::BackingStore::alloc(js, 4)); queue.push(js, kj::mv(entry)); KJ_FAIL_ASSERT("The queue push after close should have failed."); } catch (kj::Exception& ex) { @@ -443,7 +443,7 @@ KJ_TEST("ByteQueue with single consumer") { auto store = jsg::BackingStore::alloc(js, 4); memset(store.asArrayPtr().begin(), 'a', store.size()); - auto entry = kj::refcounted(kj::mv(store)); + auto entry = kj::heap(kj::mv(store)); queue.push(js, kj::mv(entry)); // The item was pushed into the consumer. @@ -454,12 +454,12 @@ KJ_TEST("ByteQueue with single consumer") { KJ_ASSERT(queue.desiredSize() == -2); auto prp = js.newPromiseAndResolver(); - consumer.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto { + consumer.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, 4), - }, - }); + } + )); MustCall readContinuation([&](jsg::Lock& js, auto&& result) -> auto { KJ_ASSERT(!result.done); @@ -493,13 +493,13 @@ KJ_TEST("ByteQueue with single byob consumer") { ByteQueue::Consumer consumer(queue); auto prp = js.newPromiseAndResolver(); - consumer.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto { + consumer.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, 4), .type = ByteQueue::ReadRequest::Type::BYOB, - }, - }); + } + )); MustCall readContinuation([&](jsg::Lock& js, auto&& result) -> auto { KJ_ASSERT(!result.done); @@ -549,13 +549,13 @@ KJ_TEST("ByteQueue with byob consumer and default consumer") { ByteQueue::Consumer consumer2(queue); auto prp = js.newPromiseAndResolver(); - consumer1.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto { + consumer1.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, 4), .type = ByteQueue::ReadRequest::Type::BYOB, - }, - }); + } + )); MustCall readContinuation([&](jsg::Lock& js, auto&& result) -> auto { KJ_ASSERT(!result.done); @@ -616,13 +616,13 @@ KJ_TEST("ByteQueue with byob consumer and default consumer") { }); auto prp2 = js.newPromiseAndResolver(); - consumer2.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp2.resolver), - .pullInto { + consumer2.read(js, ByteQueue::ReadRequest( + kj::mv(prp2.resolver), + { .store = jsg::BackingStore::alloc(js, 4), .type = ByteQueue::ReadRequest::Type::DEFAULT, - }, - }); + } + )); prp2.promise.then(js, read2Continuation); js.v8Isolate->PerformMicrotaskCheckpoint(); @@ -917,19 +917,19 @@ KJ_TEST("ByteQueue with default consumer with atLeast") { const auto read = [&](jsg::Lock& js, uint atLeast) { auto prp = js.newPromiseAndResolver(); - consumer.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto = { + consumer.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, 5), .atLeast = atLeast, - }, - }); + } + )); return kj::mv(prp.promise); }; const auto push = [&](auto store) { try { - queue.push(js, kj::refcounted(kj::mv(store))); + queue.push(js, kj::heap(kj::mv(store))); } catch (kj::Exception& ex) { KJ_DBG(ex.getDescription()); } @@ -1008,19 +1008,19 @@ KJ_TEST("ByteQueue with multiple default consumers with atLeast (same rate)") { const auto read = [&](jsg::Lock& js, auto& consumer, uint atLeast = 1) { auto prp = js.newPromiseAndResolver(); - consumer.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto = { + consumer.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, 5), .atLeast = atLeast, - }, - }); + } + )); return kj::mv(prp.promise); }; const auto push = [&](auto store) { try { - queue.push(js, kj::refcounted(kj::mv(store))); + queue.push(js, kj::heap(kj::mv(store))); } catch (kj::Exception& ex) { KJ_DBG(ex.getDescription()); } @@ -1118,19 +1118,19 @@ KJ_TEST("ByteQueue with multiple default consumers with atLeast (different rate) const auto read = [&](jsg::Lock& js, auto& consumer, uint atLeast = 1) { auto prp = js.newPromiseAndResolver(); - consumer.read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto = { + consumer.read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, 5), .atLeast = atLeast, - }, - }); + } + )); return kj::mv(prp.promise); }; const auto push = [&](auto store) { try { - queue.push(js, kj::refcounted(kj::mv(store))); + queue.push(js, kj::heap(kj::mv(store))); } catch (kj::Exception& ex) { KJ_DBG(ex.getDescription()); } diff --git a/src/workerd/api/streams/queue.c++ b/src/workerd/api/streams/queue.c++ index 6651c9b43df..f1aea478d37 100644 --- a/src/workerd/api/streams/queue.c++ +++ b/src/workerd/api/streams/queue.c++ @@ -42,18 +42,20 @@ void ValueQueue::Entry::visitForGc(jsg::GcVisitor& visitor) { visitor.visit(value); } -ValueQueue::Entry ValueQueue::Entry::clone(jsg::Lock& js) { - return Entry(value.addRef(js), size); -} - #pragma endregion ValueQueue::Entry #pragma region ValueQueue::QueueEntry -ValueQueue::QueueEntry ValueQueue::QueueEntry::clone() { - return QueueEntry { - .entry = kj::addRef(*entry), - }; +kj::Own ValueQueue::Entry::clone(jsg::Lock& js) { + return kj::heap(getValue(js), getSize()); +} + +ValueQueue::QueueEntry ValueQueue::QueueEntry::clone(jsg::Lock& js) { + return QueueEntry { .entry = entry->clone(js) }; +} + +void ValueQueue::QueueEntry::visitForGc(jsg::GcVisitor& visitor) { + if (entry) visitor.visit(*entry); } #pragma endregion ValueQueue::QueueEntry @@ -108,6 +110,10 @@ bool ValueQueue::Consumer::hasReadRequests() { return impl.hasReadRequests(); } +void ValueQueue::Consumer::visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(impl); +} + #pragma endregion ValueQueue::Consumer ValueQueue::ValueQueue(size_t highWaterMark) : impl(highWaterMark) {} @@ -158,24 +164,30 @@ void ValueQueue::handleRead( // If there are no pending read requests and there is data in the buffer, // we will try to fulfill the read request immediately. if (state.readRequests.empty() && state.queueTotalSize > 0) { - auto entry = kj::mv(state.buffer.front()); - state.buffer.pop_front(); + auto& entry = state.buffer.front(); KJ_SWITCH_ONEOF(entry) { KJ_CASE_ONEOF(c, ConsumerImpl::Close) { - // The next item was a close sentinel! Resolve the read immediately with a close indicator. + // This case shouldn't actually happen. The queueTotalSize should be zero if the + // only item remaining in the queue is the close sentinel because we decrement the + // queueTotalSize every time we remove an item. If we get here, something is wrong. + // We'll handle it by resolving the read request and keep going but let's emit a log + // warning so we can investigate. + // Note that we do not want to remove the close sentinel here so that the next call to + // maybeDrainAndSetState will see it and handle the transition to the closed state. + KJ_LOG(ERROR, "ValueQueue::handleRead encountered a close sentinel in the queue " + "with queueTotalSize > 0. This should not happen.", state.queueTotalSize); request.resolveAsDone(js); } KJ_CASE_ONEOF(entry, QueueEntry) { request.resolve(js, entry.entry->getValue(js)); state.queueTotalSize -= entry.entry->getSize(); + state.buffer.pop_front(); } } } else if (state.queueTotalSize == 0 && consumer.isClosing()) { - // Otherwise, if state.queueTotalSize is zero and isClosing() is true, we should - // have already drained but let's take care of that now. Specifically, in this case - // there's no data in the queue and close() has already been called, so there won't - // be any more data coming. + // Otherwise, if state.queueTotalSize is zero and isClosing() is true there won't be any + // more data coming. Just resolve the read as done and move on. request.resolveAsDone(js); } else { // Otherwise, push the read request into the pending readRequests. It will be @@ -200,6 +212,17 @@ bool ValueQueue::handleMaybeClose( size_t ValueQueue::getConsumerCount() { return impl.getConsumerCount(); } +bool ValueQueue::wantsRead() const { + return impl.wantsRead(); +} + +bool ValueQueue::hasPartiallyFulfilledRead() { + // A ValueQueue can never have a partially fulfilled read. + return false; +} + +void ValueQueue::visitForGc(jsg::GcVisitor& visitor) {} + #pragma endregion ValueQueue // ====================================================================================== @@ -212,11 +235,22 @@ namespace { void maybeInvalidateByobRequest(kj::Maybe& req) { KJ_IF_MAYBE(byobRequest, req) { byobRequest->invalidate(); - req = nullptr; + // The call to byobRequest->invalidate() should have cleared the reference. + KJ_ASSERT(req == nullptr); } } } // namespace +ByteQueue::ReadRequest::ReadRequest( + jsg::Promise::Resolver resolver, + ByteQueue::ReadRequest::PullInto pullInto) + : resolver(kj::mv(resolver)), + pullInto(kj::mv(pullInto)) {} + +ByteQueue::ReadRequest::~ReadRequest() noexcept(false) { + maybeInvalidateByobRequest(byobReadRequest); +} + void ByteQueue::ReadRequest::resolveAsDone(jsg::Lock& js) { if (pullInto.filled > 0) { // There's been at least some data written, we need to respond but not @@ -255,10 +289,6 @@ void ByteQueue::ReadRequest::reject(jsg::Lock& js, jsg::Value& value) { kj::Own ByteQueue::ReadRequest::makeByobReadRequest( ConsumerImpl& consumer, QueueImpl& queue) { - // Why refcounted? One ByobReadRequest reference will be held (eventually) by - // an instance of ReadableStreamBYOBRequest and the other by this ReadRequest. - // Depending on how the read is actually fulfilled, the ByobReadRequest will - // be invalidated by one or the other. auto req = kj::heap(*this, consumer, queue); byobReadRequest = *req; return kj::mv(req); @@ -274,17 +304,25 @@ kj::ArrayPtr ByteQueue::Entry::toArrayPtr() { return store.asArrayPtr( size_t ByteQueue::Entry::getSize() const { return store.size(); } +kj::Own ByteQueue::Entry::clone(jsg::Lock& js) { + return kj::heap(store.clone()); +} + +void ByteQueue::Entry::visitForGc(jsg::GcVisitor& visitor) {} + #pragma endregion ByteQueue::Entry #pragma region ByteQueue::QueueEntry -ByteQueue::QueueEntry ByteQueue::QueueEntry::clone() { +ByteQueue::QueueEntry ByteQueue::QueueEntry::clone(jsg::Lock& js) { return QueueEntry { - .entry = kj::addRef(*entry), + .entry = entry->clone(js), .offset = offset, }; } +void ByteQueue::QueueEntry::visitForGc(jsg::GcVisitor& visitor) {} + #pragma endregion ByteQueue::QueueEntry #pragma region ByteQueue::Consumer @@ -337,6 +375,10 @@ bool ByteQueue::Consumer::hasReadRequests() { return impl.hasReadRequests(); } +void ByteQueue::Consumer::visitForGc(jsg::GcVisitor& visitor) { + visitor.visit(impl); +} + #pragma endregion ByteQueue::Consumer #pragma region ByteQueue::ByobRequest @@ -379,7 +421,7 @@ bool ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) { if (queue.getConsumerCount() > 1) { // Allocate the entry into which we will be copying the provided data for the // other consumers of the queue. - auto entry = kj::refcounted(jsg::BackingStore::alloc(js, amount)); + auto entry = kj::heap(jsg::BackingStore::alloc(js, amount)); auto start = sourcePtr.begin() + req.pullInto.filled; @@ -420,7 +462,7 @@ bool ByteQueue::ByobRequest::respond(jsg::Lock& js, size_t amount) { if (unaligned > 0) { auto start = sourcePtr.begin() + (amount - unaligned); - auto excess = kj::refcounted(jsg::BackingStore::alloc(js, unaligned)); + auto excess = kj::heap(jsg::BackingStore::alloc(js, unaligned)); std::copy(start, start + unaligned, excess->toArrayPtr().begin()); consumer.push(js, kj::mv(excess)); } @@ -474,6 +516,13 @@ v8::Local ByteQueue::ByobRequest::getView(jsg::Lock& js) { ByteQueue::ByteQueue(size_t highWaterMark) : impl(highWaterMark) {} void ByteQueue::close(jsg::Lock& js) { + KJ_IF_MAYBE(ready, impl.state.tryGet()) { + while (!ready->pendingByobReadRequests.empty()) { + auto& req = ready->pendingByobReadRequests.front(); + req->invalidate(); + ready->pendingByobReadRequests.pop_front(); + } + } impl.close(js); } @@ -973,8 +1022,14 @@ bool ByteQueue::hasPartiallyFulfilledRead() { return false; } +bool ByteQueue::wantsRead() const { + return impl.wantsRead(); +} + size_t ByteQueue::getConsumerCount() { return impl.getConsumerCount(); } +void ByteQueue::visitForGc(jsg::GcVisitor& visitor) {} + #pragma endregion ByteQueue } // namespace workerd::api diff --git a/src/workerd/api/streams/queue.h b/src/workerd/api/streams/queue.h index 1fef4c65142..7ab268deb34 100644 --- a/src/workerd/api/streams/queue.h +++ b/src/workerd/api/streams/queue.h @@ -10,6 +10,7 @@ #include #include #include +#include namespace workerd::api { @@ -28,18 +29,17 @@ namespace workerd::api { // is signaled. Additional data can always be pushed into the queue beyond // the high water mark, but it is not advisable to do so. // -// - All data stored in the queue is in the form of refcounted entries. The +// - All data stored in the queue is in the form of entries. The // specific type of entry depends on the queue type. Every entry has a -// calculated size, which is dependent on the type of entry. The reason -// these entries are refcounted is because...(see the next bullet point) +// calculated size, which is dependent on the type of entry. // // - Every queue has one or more consumers. Each consumer maintains its own -// internal buffer of refcounted entries that it has yet to consume. Because -// entries are refcounted, there is ever only one copy of any given chunk of -// data in memory, with each consumer possessing only a reference to it. -// Whenever data is pushed into the queue, references are pushed into each of -// the consumers. As data is consumed from the internal buffer, the reference -// counted entries are freed. The underlying data is freed once the last +// internal buffer of entries that it has yet to consume. Entries are +// structured such that there is ever only one copy of any given chunk of +// data in memory, with each entry in each consumer possessing only a reference +// to it. Whenever data is pushed into the queue, references are pushed into +// each of the consumers. As data is consumed from the internal buffer, the +// entries are freed. The underlying data is freed once the last // reference is released. // // - Every consumer has an remaining buffer size, which is the sum of the sizes @@ -58,7 +58,7 @@ namespace workerd::api { // in the same way but Byte Queue consumers have a number of unique details. // // - As mentioned above, every consumer maintains an internal data buffer -// consisting of refcounted pointers to the data that has been pushed into +// consisting of references to the data that has been pushed into // the queue. // // - Every consumer maintains a list of pending reads. A read is a request to @@ -87,8 +87,8 @@ namespace workerd::api { // // The bookkeeping for a value queue is fairly simple: // -// - A single refcounted value entry is created. -// - References to that single value entry are distributed to each of +// - A single value entry is created. +// - Clones of that single value entry are distributed to each of // the value queue consumers. // - If a consumer has a pending read, the read is fulfilled immediately // and the reference is never added to that consumer's internal buffer. @@ -162,8 +162,11 @@ class QueueImpl final { // Closes the queue. The close is forwarded on to all consumers. // If we are already closed or errored, do nothing here. KJ_IF_MAYBE(ready, state.template tryGet()) { - for (auto& consumer : ready->consumers) { - consumer.ref->close(js); + // We copy the list of consumers in case the consumers remove themselves + // from the queue during the close callback, invalidating the iterator. + auto consumers = ready->consumers; + for (auto consumer : consumers) { + consumer->close(js); } state.template init(); } @@ -183,8 +186,11 @@ class QueueImpl final { // all pending consume promises. // If we are already closed or errored, do nothing here. KJ_IF_MAYBE(ready, state.template tryGet()) { - for (auto& consumer : ready->consumers) { - consumer.ref->error(js, reason.addRef(js)); + // We copy the list of consumers in case the consumers remove themselves + // from the queue during the error callback, invalidating the iterator. + auto consumers = ready->consumers; + for (auto consumer : consumers) { + consumer->error(js, reason.addRef(js)); } state = kj::mv(reason); } @@ -196,8 +202,8 @@ class QueueImpl final { // If we are already closed or errored, set totalQueueSize to zero. totalQueueSize = 0; KJ_IF_MAYBE(ready, state.template tryGet()) { - for (auto& consumer : ready->consumers) { - totalQueueSize = kj::max(totalQueueSize, consumer.ref->size()); + for (auto consumer : ready->consumers) { + totalQueueSize = kj::max(totalQueueSize, consumer->size()); } } } @@ -213,14 +219,14 @@ class QueueImpl final { auto& ready = KJ_REQUIRE_NONNULL(state.template tryGet(), "The queue is closed or errored."); - for (auto& consumer : ready.consumers) { + for (auto consumer : ready.consumers) { KJ_IF_MAYBE(skip, skipConsumer) { - if (consumer.ref == &(*skip)) { + if (&(*skip) == consumer) { continue; } } - consumer.ref->push(js, kj::addRef(*entry)); + consumer->push(js, entry->clone(js)); } } @@ -241,8 +247,8 @@ class QueueImpl final { KJ_CASE_ONEOF(closed, Closed) { return false; } KJ_CASE_ONEOF(errored, Errored) { return false; } KJ_CASE_ONEOF(ready, Ready) { - for (auto& consumer : ready.consumers) { - if (consumer.ref->hasReadRequests()) return true; + for (auto consumer : ready.consumers) { + if (consumer->hasReadRequests()) return true; } return false; } @@ -263,18 +269,8 @@ class QueueImpl final { struct Closed {}; using Errored = jsg::Value; - struct ConsumerRef { - ConsumerImpl* ref; - bool operator==(ConsumerRef& other) const { - return hashCode() == other.hashCode(); - } - auto hashCode() const { - return kj::hashCode(ref); - } - }; - struct Ready final: public State { - kj::HashSet consumers; + std::set consumers; }; size_t highWaterMark; @@ -283,13 +279,13 @@ class QueueImpl final { void addConsumer(ConsumerImpl* consumer) { KJ_IF_MAYBE(ready, state.template tryGet()) { - ready->consumers.insert(ConsumerRef { .ref = consumer }); + ready->consumers.insert(consumer); } } void removeConsumer(ConsumerImpl* consumer) { KJ_IF_MAYBE(ready, state.template tryGet()) { - ready->consumers.eraseMatch(ConsumerRef { .ref = consumer }); + ready->consumers.erase(consumer); maybeUpdateBackpressure(); } } @@ -460,7 +456,7 @@ class ConsumerImpl final { otherReady.buffer.push_back(Close {}); } KJ_CASE_ONEOF(entry, QueueEntry) { - otherReady.buffer.push_back(entry.clone()); + otherReady.buffer.push_back(entry.clone(js)); } } } @@ -599,7 +595,7 @@ class ValueQueue final { void reject(jsg::Lock& js, jsg::Value& value); }; - class Entry final: public kj::Refcounted { + class Entry { // A value queue entry consists of an arbitrary JavaScript value and a size that is // calculated by the size algorithm function provided in the stream constructor. public: @@ -611,7 +607,7 @@ class ValueQueue final { void visitForGc(jsg::GcVisitor& visitor); - Entry clone(jsg::Lock& js); + kj::Own clone(jsg::Lock& js); private: jsg::Value value; @@ -620,11 +616,9 @@ class ValueQueue final { struct QueueEntry { kj::Own entry; - QueueEntry clone(); + QueueEntry clone(jsg::Lock& js); - void visitForGc(jsg::GcVisitor& visitor) { - if (entry) visitor.visit(*entry); - } + void visitForGc(jsg::GcVisitor& visitor); }; class Consumer final { @@ -657,9 +651,7 @@ class ValueQueue final { bool hasReadRequests(); - void visitForGc(jsg::GcVisitor& visitor) { - visitor.visit(impl); - } + void visitForGc(jsg::GcVisitor& visitor); private: ConsumerImpl impl; @@ -683,16 +675,11 @@ class ValueQueue final { size_t getConsumerCount(); - bool wantsRead() const { - return impl.wantsRead(); - } + bool wantsRead() const; - bool hasPartiallyFulfilledRead() { - // A ValueQueue can never have a partially fulfilled read. - return false; - } + bool hasPartiallyFulfilledRead(); - void visitForGc(jsg::GcVisitor& visitor) {} + void visitForGc(jsg::GcVisitor& visitor); private: QueueImpl impl; @@ -733,13 +720,18 @@ class ByteQueue final { // which happens either when respond(), respondWithNewView(), or invalidate() // is called, or when the ByobRequest is destroyed, whichever comes first. - struct { + struct PullInto { jsg::BackingStore store; size_t filled = 0; size_t atLeast = 1; Type type = Type::DEFAULT; } pullInto; + ReadRequest(jsg::Promise::Resolver resolver, + PullInto pullInto); + ReadRequest(ReadRequest&&) = default; + ReadRequest& operator=(ReadRequest&&) = default; + ~ReadRequest() noexcept(false); void resolveAsDone(jsg::Lock& js); void resolve(jsg::Lock& js); void reject(jsg::Lock& js, jsg::Value& value); @@ -767,7 +759,7 @@ class ByteQueue final { ~ByobRequest() noexcept(false); - ReadRequest& getRequest() { return KJ_ASSERT_NONNULL(request); } + inline ReadRequest& getRequest() { return KJ_ASSERT_NONNULL(request); } bool respond(jsg::Lock& js, size_t amount); @@ -795,7 +787,7 @@ class ByteQueue final { std::deque> pendingByobReadRequests; }; - class Entry final: public kj::Refcounted { + class Entry { // A byte queue entry consists of a jsg::BackingStore containing a non-zero-length // sequence of bytes. The size is determined by the number of bytes in the entry. public: @@ -805,7 +797,9 @@ class ByteQueue final { size_t getSize() const; - inline void visitForGc(jsg::GcVisitor& visitor) {} + void visitForGc(jsg::GcVisitor& visitor); + + kj::Own clone(jsg::Lock& js); private: jsg::BackingStore store; @@ -815,9 +809,9 @@ class ByteQueue final { kj::Own entry; size_t offset; - QueueEntry clone(); + QueueEntry clone(jsg::Lock& js); - void visitForGc(jsg::GcVisitor& visitor) {} + void visitForGc(jsg::GcVisitor& visitor); }; class Consumer { @@ -850,9 +844,7 @@ class ByteQueue final { bool hasReadRequests(); - void visitForGc(jsg::GcVisitor& visitor) { - visitor.visit(impl); - } + void visitForGc(jsg::GcVisitor& visitor); private: ConsumerImpl impl; @@ -874,9 +866,7 @@ class ByteQueue final { size_t getConsumerCount(); - bool wantsRead() const { - return impl.wantsRead(); - } + bool wantsRead() const; bool hasPartiallyFulfilledRead(); @@ -890,7 +880,7 @@ class ByteQueue final { // their lifespan to be attached to the ReadableStreamBYOBRequest object but internally they // will be disconnected as appropriate. - void visitForGc(jsg::GcVisitor& visitor) {} + void visitForGc(jsg::GcVisitor& visitor); private: QueueImpl impl; diff --git a/src/workerd/api/streams/standard.c++ b/src/workerd/api/streams/standard.c++ index 041ba04d36c..0670597d634 100644 --- a/src/workerd/api/streams/standard.c++ +++ b/src/workerd/api/streams/standard.c++ @@ -495,6 +495,40 @@ int getHighWaterMark(const UnderlyingSource& underlyingSource, } // namespace +template +struct ReadPendingScope { + // It is possible for the controller state to be released synchronously while + // we are in the middle of a read. When that happens we need to defer the actual + // close/error state change until the read call is complete. ReadPendingScope + // handles this for us. It is constructed when we start a read and destructed + // synchronously when the call to read completes. + jsg::Lock& js; + C& controller; + ReadPendingScope(jsg::Lock& js, C& controller) + : js(js), controller(controller) { + controller.pendingReadCount++; + } + KJ_DISALLOW_COPY_AND_MOVE(ReadPendingScope); + ~ReadPendingScope() noexcept(false) { + // When we have no pending reads left, we need to check to see if we have a + // pending close or errored state. + --controller.pendingReadCount; + if (!controller.isReadPending()) { + KJ_IF_MAYBE(state, controller.maybePendingState) { + KJ_SWITCH_ONEOF(*state) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + controller.doClose(); + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + controller.doError(js, errored.getHandle(js)); + } + } + } + controller.maybePendingState = nullptr; + } + } +}; + class ReadableStreamJsController: public ReadableStreamController { // The ReadableStreamJsController provides the implementation of custom // ReadableStreams backed by a user-code provided Underlying Source. The implementation @@ -625,9 +659,23 @@ private: // The lock state is separate because a closed or errored stream can still be locked. bool disturbed = false; + size_t pendingReadCount = 0; + kj::Maybe> maybePendingState = nullptr; + + void setPendingState(kj::OneOf pending) { + if (maybePendingState == nullptr) { + maybePendingState = kj::mv(pending); + } + } + + bool isReadPending() const { + return pendingReadCount > 0; + } friend ReadableLockImpl; friend ReadableLockImpl::PipeLocked; + + friend struct ReadPendingScope; }; class WritableStreamJsController: public WritableStreamController { @@ -830,7 +878,7 @@ jsg::Promise ReadableImpl::cancel( template bool ReadableImpl::canCloseOrEnqueue() { - return state.template is() && !closeRequested; + return state.template is(); } template @@ -1575,8 +1623,7 @@ struct ReadableState { }; } // namespace -struct ValueReadable final: public api::ValueQueue::ConsumerImpl::StateListener, - public kj::Refcounted { +struct ValueReadable final: public api::ValueQueue::ConsumerImpl::StateListener { using State = ReadableState; kj::Maybe state; @@ -1606,17 +1653,11 @@ struct ValueReadable final: public api::ValueQueue::ConsumerImpl::StateListener, // and starts to receive new data that becomes enqueued. When clone // is used, any state currently held by this consumer is copied to the // new consumer. - return kj::refcounted(js, owner, *this); + return kj::heap(js, owner, *this); } jsg::Promise read(jsg::Lock& js) { KJ_IF_MAYBE(s, state) { - // It's possible for the controller to be closed synchronously while the - // read operation is executing. In that case, we want to make sure we keep - // a reference so it'll survice at least long enough for the read method - // to complete. - auto self KJ_UNUSED = kj::addRef(*this); - auto prp = js.newPromiseAndResolver(); s->consumer->read(js, ValueQueue::ReadRequest { .resolver = kj::mv(prp.resolver), @@ -1679,8 +1720,7 @@ struct ValueReadable final: public api::ValueQueue::ConsumerImpl::StateListener, } }; -struct ByteReadable final: public api::ByteQueue::ConsumerImpl::StateListener, - public kj::Refcounted { +struct ByteReadable final: public api::ByteQueue::ConsumerImpl::StateListener { using State = ReadableState; kj::Maybe state; int autoAllocateChunkSize; @@ -1716,19 +1756,13 @@ struct ByteReadable final: public api::ByteQueue::ConsumerImpl::StateListener, // and starts to receive new data that becomes enqueued. When clone // is used, any state currently held by this consumer is copied to the // new consumer. - return kj::refcounted(js, owner, *this); + return kj::heap(js, owner, *this); } jsg::Promise read( jsg::Lock& js, kj::Maybe byobOptions) { KJ_IF_MAYBE(s, state) { - // It's possible for the controller to be closed synchronously while the - // read operation is executing. In that case, we want to make sure we keep - // a reference so it'll survice at least long enough for the read method - // to complete. - auto self KJ_UNUSED = kj::addRef(*this); - auto prp = js.newPromiseAndResolver(); KJ_IF_MAYBE(byob, byobOptions) { @@ -1738,22 +1772,22 @@ struct ByteReadable final: public api::ByteQueue::ConsumerImpl::StateListener, // with the element size. No matter what, atLeast cannot be less than 1. auto atLeast = kj::max(source.getElementSize(), byob->atLeast.orDefault(1)); atLeast = kj::max(1, atLeast - (atLeast % source.getElementSize())); - s->consumer->read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto { + s->consumer->read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = source.detach(js), .atLeast = atLeast, .type = ByteQueue::ReadRequest::Type::BYOB, - }, - }); + } + )); } else { - s->consumer->read(js, ByteQueue::ReadRequest { - .resolver = kj::mv(prp.resolver), - .pullInto { + s->consumer->read(js, ByteQueue::ReadRequest( + kj::mv(prp.resolver), + { .store = jsg::BackingStore::alloc(js, autoAllocateChunkSize), .type = ByteQueue::ReadRequest::Type::BYOB, - }, - }); + } + )); } return kj::mv(prp.promise); @@ -1897,7 +1931,7 @@ void ReadableStreamDefaultController::enqueue( } if (!errored) { - impl.enqueue(js, kj::refcounted(js.v8Ref(value), size), JSG_THIS); + impl.enqueue(js, kj::heap(js.v8Ref(value), size), JSG_THIS); } } @@ -1988,7 +2022,7 @@ void ReadableStreamBYOBRequest::respond(jsg::Lock& js, int bytesWritten) { // While this particular request may be invalidated, there are still // other branches we can push the data to. Let's do so. jsg::BufferSource source(js, impl.view.getHandle(js)); - auto entry = kj::refcounted(source.detach(js)); + auto entry = kj::heap(source.detach(js)); impl.controller->impl.enqueue(js, kj::mv(entry), impl.controller.addRef()); } else { JSG_REQUIRE(bytesWritten > 0, @@ -2026,7 +2060,7 @@ void ReadableStreamBYOBRequest::respondWithNewView(jsg::Lock& js, jsg::BufferSou if (impl.readRequest->isInvalidated() && impl.controller->impl.consumerCount() >= 1) { // While this particular request may be invalidated, there are still // other branches we can push the data to. Let's do so. - auto entry = kj::refcounted(view.detach(js)); + auto entry = kj::heap(view.detach(js)); impl.controller->impl.enqueue(js, kj::mv(entry), impl.controller.addRef()); } else { JSG_REQUIRE(view.size() > 0, @@ -2123,7 +2157,7 @@ void ReadableByteStreamController::enqueue(jsg::Lock& js, jsg::BufferSource chun (*byobRequest)->invalidate(js); } - impl.enqueue(js, kj::refcounted(chunk.detach(js)), JSG_THIS); + impl.enqueue(js, kj::heap(chunk.detach(js)), JSG_THIS); } void ReadableByteStreamController::error(jsg::Lock& js, v8::Local reason) { @@ -2205,6 +2239,17 @@ jsg::Promise ReadableStreamJsController::cancel( return consumer->cancel(js, reason.getHandle(js)); }; + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + return js.resolvedPromise(); + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + return js.rejectedPromise(errored.addRef(js)); + } + } + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return js.resolvedPromise(); @@ -2230,8 +2275,12 @@ void ReadableStreamJsController::doClose() { // We detach ourselves from the underlying controller by releasing the ValueReadable or // ByteReadable in the state and changing that to closed. // We also clean up other state here. - state.init(); - lock.onClose(); + if (isReadPending()) { + setPendingState(StreamStates::Closed()); + } else { + state.init(); + lock.onClose(); + } } void ReadableStreamJsController::doError(jsg::Lock& js, v8::Local reason) { @@ -2241,8 +2290,12 @@ void ReadableStreamJsController::doError(jsg::Lock& js, v8::Local rea // erroring. We detach ourselves from the underlying controller by releasing the ValueReadable // or ByteReadable in the state and changing that to errored. // We also clean up other state here. - state.init(js.v8Ref(reason)); - lock.onError(js, reason); + if (isReadPending()) { + setPendingState(js.v8Ref(reason)); + } else { + state.init(js.v8Ref(reason)); + lock.onError(js, reason); + } } bool ReadableStreamJsController::hasPendingReadRequests() { @@ -2264,6 +2317,9 @@ bool ReadableStreamJsController::isByteOriented() const { } bool ReadableStreamJsController::isClosedOrErrored() const { + KJ_IF_MAYBE(pending, maybePendingState) { + return true; + } return state.is() || state.is(); } @@ -2311,7 +2367,17 @@ kj::Maybe> ReadableStreamJsController::read( js.v8TypeError("Unable to use a zero-length ArrayBuffer."_kj)); } - if (state.is()) { + if (state.is() || maybePendingState != nullptr) { + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + // Fall through to the BYOB read case below. + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + return js.rejectedPromise(errored.addRef(js)); + } + } + } // If it is a BYOB read, then the spec requires that we return an empty // view of the same type provided, that uses the same backing memory // as that provided, but with zero-length. @@ -2325,6 +2391,19 @@ kj::Maybe> ReadableStreamJsController::read( } } + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + // The closed state for BYOB reads is handled in the maybeByobOptions check above. + KJ_ASSERT(maybeByobOptions == nullptr); + return js.resolvedPromise(ReadResult { .done = true }); + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + return js.rejectedPromise(errored.addRef(js)); + } + } + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { // The closed state for BYOB reads is handled in the maybeByobOptions check above. @@ -2338,9 +2417,11 @@ kj::Maybe> ReadableStreamJsController::read( // The ReadableStreamDefaultController does not support ByobOptions. // It should never happen, but let's make sure. KJ_ASSERT(maybeByobOptions == nullptr); + ReadPendingScope readPendingScope(js, *this); return consumer->read(js); } KJ_CASE_ONEOF(consumer, kj::Own) { + ReadPendingScope readPendingScope(js, *this); return consumer->read(js, kj::mv(maybeByobOptions)); } } @@ -2358,6 +2439,27 @@ ReadableStreamController::Tee ReadableStreamJsController::tee(jsg::Lock& js) { lock.state.init(); disturbed = true; + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + return Tee { + .branch1 = jsg::alloc( + kj::heap(StreamStates::Closed())), + .branch2 = jsg::alloc( + kj::heap(StreamStates::Closed())), + }; + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + return Tee { + .branch1 = jsg::alloc(kj::heap( + errored.addRef(js))), + .branch2 = jsg::alloc(kj::heap( + errored.addRef(js))), + }; + } + } + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return Tee { @@ -2420,7 +2522,7 @@ void ReadableStreamJsController::setup( TypeError, "The autoAllocateChunkSize option cannot be zero."); - state = kj::refcounted(controller.addRef(), this, autoAllocateChunkSize); + state = kj::heap(controller.addRef(), this, autoAllocateChunkSize); controller->start(js); } else { JSG_REQUIRE(type == "", TypeError, @@ -2428,7 +2530,7 @@ void ReadableStreamJsController::setup( auto controller = jsg::alloc( kj::mv(underlyingSource), kj::mv(queuingStrategy)); - state = kj::refcounted(controller.addRef(), this); + state = kj::heap(controller.addRef(), this); controller->start(js); } } @@ -2439,6 +2541,15 @@ kj::Maybe ReadableStreamJsController: } void ReadableStreamJsController::visitForGc(jsg::GcVisitor& visitor) { + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) {} + KJ_CASE_ONEOF(error, StreamStates::Errored) { + visitor.visit(error); + } + } + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) {} KJ_CASE_ONEOF(error, StreamStates::Errored) { @@ -2455,6 +2566,10 @@ void ReadableStreamJsController::visitForGc(jsg::GcVisitor& visitor) { }; kj::Maybe ReadableStreamJsController::getDesiredSize() { + KJ_IF_MAYBE(pendingState, maybePendingState) { + return nullptr; + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return nullptr; } KJ_CASE_ONEOF(errored, StreamStates::Errored) { return nullptr; } @@ -2469,12 +2584,24 @@ kj::Maybe ReadableStreamJsController::getDesiredSize() { } kj::Maybe> ReadableStreamJsController::isErrored(jsg::Lock& js) { + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { return nullptr; } + KJ_CASE_ONEOF(error, StreamStates::Errored) { + return error.getHandle(js); + } + } + } return state.tryGet().map([&](jsg::Value& reason) { return reason.getHandle(js); }); } bool ReadableStreamJsController::canCloseOrEnqueue() { + KJ_IF_MAYBE(pendingState, maybePendingState) { + return false; + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return false; } KJ_CASE_ONEOF(errored, StreamStates::Errored) { return false; } @@ -2496,6 +2623,9 @@ bool ReadableStreamJsController::hasBackpressure() { kj::Maybe, jsg::Ref>> ReadableStreamJsController::getController() { + KJ_IF_MAYBE(pendingState, maybePendingState) { + return nullptr; + } KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return nullptr; } KJ_CASE_ONEOF(errored, StreamStates::Errored) { return nullptr; } @@ -2526,11 +2656,19 @@ public: } void doClose() override { - state.template init(); + if (isReadPending()) { + setPendingState(StreamStates::Closed()); + } else { + state.template init(); + } } void doError(jsg::Lock& js, v8::Local reason) override { - state.template init(js.v8Ref(reason)); + if (isReadPending()) { + setPendingState(js.v8Ref(reason)); + } else { + state.template init(js.v8Ref(reason)); + } } jsg::Promise> allBytes(jsg::Lock& js) { @@ -2569,7 +2707,23 @@ private: kj::Vector parts; uint64_t runningTotal = 0; + size_t pendingReadCount = 0; + kj::Maybe> maybePendingState = nullptr; + jsg::Promise loop(jsg::Lock& js) { + // It really shouldn't be possible for this to be called here given how we are sequencing + // out reads, but let's handle it just to be safe. + KJ_IF_MAYBE(pendingState, maybePendingState) { + KJ_SWITCH_ONEOF(*pendingState) { + KJ_CASE_ONEOF(closed, StreamStates::Closed) { + return js.resolvedPromise(KJ_MAP(p, parts) { return p.asArrayPtr(); }); + } + KJ_CASE_ONEOF(errored, StreamStates::Errored) { + return js.template rejectedPromise(errored.getHandle(js)); + } + } + } + KJ_SWITCH_ONEOF(state) { KJ_CASE_ONEOF(closed, StreamStates::Closed) { return js.resolvedPromise(KJ_MAP(p, parts) { return p.asArrayPtr(); }); @@ -2579,6 +2733,7 @@ private: } KJ_CASE_ONEOF(readable, Readable) { const auto read = [&](auto& js) { + ReadPendingScope scope(js, *this); if constexpr (kj::isSameType()) { return readable->read(js, nullptr); } else { @@ -2626,6 +2781,12 @@ private: return loop(js); }; + // An important detail is that after calling read, the readable may be + // destroyed synchronously while read is being processed. Do not access + // it in either the onSuccess or onFailure callbacks above! Instead, call + // loop again to iterate to the next step which will check whether readable + // is still alive. + return maybeAddFunctor(js, read(js), kj::mv(onSuccess), kj::mv(onFailure)); } } @@ -2643,6 +2804,18 @@ private: dest += part.size(); } } + + void setPendingState(kj::OneOf pending) { + if (maybePendingState == nullptr) { + maybePendingState = kj::mv(pending); + } + } + + bool isReadPending() const { + return pendingReadCount > 0; + } + + friend struct ReadPendingScope>; }; template @@ -2740,6 +2913,9 @@ private: } KJ_CASE_ONEOF(pumping, Pumping) { const auto read = [&](auto& js) { + // There's no need to use a ReadPendingScope here because synchronous + // calls to doClose/doError will not impact the lifetime of the readable + // state. if constexpr (kj::isSameType()) { return readable->read(js, nullptr); } else { diff --git a/src/workerd/api/streams/standard.h b/src/workerd/api/streams/standard.h index 01d79912c12..c93cc3eceb0 100644 --- a/src/workerd/api/streams/standard.h +++ b/src/workerd/api/streams/standard.h @@ -238,7 +238,6 @@ class ReadableImpl { kj::OneOf state; Algorithms algorithms; - bool closeRequested = false; bool disturbed = false; bool pullAgain = false; bool pulling = false; diff --git a/src/workerd/jsg/buffersource.h b/src/workerd/jsg/buffersource.h index d9fd7368b73..09c44876c23 100644 --- a/src/workerd/jsg/buffersource.h +++ b/src/workerd/jsg/buffersource.h @@ -190,6 +190,10 @@ class BackingStore { byteLength -= bytes; } + inline BackingStore clone() { + return BackingStore(backingStore, byteLength, byteOffset, elementSize, ctor, integerType); + } + private: std::shared_ptr backingStore; size_t byteLength;