Skip to content

Commit

Permalink
Merge pull request #590 from cloudflare/jsnell/streams-cleanups-anoth…
Browse files Browse the repository at this point in the history
…er-round
  • Loading branch information
jasnell authored May 4, 2023
2 parents ecbffd8 + 8145a33 commit 91de01c
Show file tree
Hide file tree
Showing 6 changed files with 406 additions and 182 deletions.
96 changes: 48 additions & 48 deletions src/workerd/api/streams/queue-test.c++
Original file line number Diff line number Diff line change
Expand Up @@ -97,18 +97,18 @@ auto read(jsg::Lock& js, auto& consumer) {

auto byobRead(jsg::Lock& js, auto& consumer, int size) {
auto prp = js.newPromiseAndResolver<ReadResult>();
consumer.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto {
consumer.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, size),
.type = ByteQueue::ReadRequest::Type::BYOB,
},
});
}
));
return kj::mv(prp.promise);
};

auto getEntry(jsg::Lock& js, auto size) {
return kj::refcounted<ValueQueue::Entry>(js.v8Ref(v8::True(js.v8Isolate).As<v8::Value>()), size);
return kj::heap<ValueQueue::Entry>(js.v8Ref(v8::True(js.v8Isolate).As<v8::Value>()), size);
}

#pragma region ValueQueue Tests
Expand Down Expand Up @@ -388,7 +388,7 @@ KJ_TEST("ByteQueue basics work") {
KJ_ASSERT(queue.desiredSize() == 2);
KJ_ASSERT(queue.size() == 0);

auto entry = kj::refcounted<ByteQueue::Entry>(jsg::BackingStore::alloc(js, 4));
auto entry = kj::heap<ByteQueue::Entry>(jsg::BackingStore::alloc(js, 4));

queue.push(js, kj::mv(entry));

Expand All @@ -400,7 +400,7 @@ KJ_TEST("ByteQueue basics work") {
queue.close(js);

try {
auto entry = kj::refcounted<ByteQueue::Entry>(jsg::BackingStore::alloc(js, 4));
auto entry = kj::heap<ByteQueue::Entry>(jsg::BackingStore::alloc(js, 4));
queue.push(js, kj::mv(entry));
KJ_FAIL_ASSERT("The queue push after close should have failed.");
} catch (kj::Exception& ex) {
Expand All @@ -422,7 +422,7 @@ KJ_TEST("ByteQueue erroring works") {
KJ_ASSERT(queue.desiredSize() == 0);

try {
auto entry = kj::refcounted<ByteQueue::Entry>(jsg::BackingStore::alloc(js, 4));
auto entry = kj::heap<ByteQueue::Entry>(jsg::BackingStore::alloc(js, 4));
queue.push(js, kj::mv(entry));
KJ_FAIL_ASSERT("The queue push after close should have failed.");
} catch (kj::Exception& ex) {
Expand All @@ -443,7 +443,7 @@ KJ_TEST("ByteQueue with single consumer") {
auto store = jsg::BackingStore::alloc(js, 4);
memset(store.asArrayPtr().begin(), 'a', store.size());

auto entry = kj::refcounted<ByteQueue::Entry>(kj::mv(store));
auto entry = kj::heap<ByteQueue::Entry>(kj::mv(store));
queue.push(js, kj::mv(entry));

// The item was pushed into the consumer.
Expand All @@ -454,12 +454,12 @@ KJ_TEST("ByteQueue with single consumer") {
KJ_ASSERT(queue.desiredSize() == -2);

auto prp = js.newPromiseAndResolver<ReadResult>();
consumer.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto {
consumer.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, 4),
},
});
}
));

MustCall<ReadContinuation> readContinuation([&](jsg::Lock& js, auto&& result) -> auto {
KJ_ASSERT(!result.done);
Expand Down Expand Up @@ -493,13 +493,13 @@ KJ_TEST("ByteQueue with single byob consumer") {
ByteQueue::Consumer consumer(queue);

auto prp = js.newPromiseAndResolver<ReadResult>();
consumer.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto {
consumer.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, 4),
.type = ByteQueue::ReadRequest::Type::BYOB,
},
});
}
));

MustCall<ReadContinuation> readContinuation([&](jsg::Lock& js, auto&& result) -> auto {
KJ_ASSERT(!result.done);
Expand Down Expand Up @@ -549,13 +549,13 @@ KJ_TEST("ByteQueue with byob consumer and default consumer") {
ByteQueue::Consumer consumer2(queue);

auto prp = js.newPromiseAndResolver<ReadResult>();
consumer1.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto {
consumer1.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, 4),
.type = ByteQueue::ReadRequest::Type::BYOB,
},
});
}
));

MustCall<ReadContinuation> readContinuation([&](jsg::Lock& js, auto&& result) -> auto {
KJ_ASSERT(!result.done);
Expand Down Expand Up @@ -616,13 +616,13 @@ KJ_TEST("ByteQueue with byob consumer and default consumer") {
});

auto prp2 = js.newPromiseAndResolver<ReadResult>();
consumer2.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp2.resolver),
.pullInto {
consumer2.read(js, ByteQueue::ReadRequest(
kj::mv(prp2.resolver),
{
.store = jsg::BackingStore::alloc(js, 4),
.type = ByteQueue::ReadRequest::Type::DEFAULT,
},
});
}
));
prp2.promise.then(js, read2Continuation);

js.v8Isolate->PerformMicrotaskCheckpoint();
Expand Down Expand Up @@ -917,19 +917,19 @@ KJ_TEST("ByteQueue with default consumer with atLeast") {

const auto read = [&](jsg::Lock& js, uint atLeast) {
auto prp = js.newPromiseAndResolver<ReadResult>();
consumer.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto = {
consumer.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, 5),
.atLeast = atLeast,
},
});
}
));
return kj::mv(prp.promise);
};

const auto push = [&](auto store) {
try {
queue.push(js, kj::refcounted<ByteQueue::Entry>(kj::mv(store)));
queue.push(js, kj::heap<ByteQueue::Entry>(kj::mv(store)));
} catch (kj::Exception& ex) {
KJ_DBG(ex.getDescription());
}
Expand Down Expand Up @@ -1008,19 +1008,19 @@ KJ_TEST("ByteQueue with multiple default consumers with atLeast (same rate)") {

const auto read = [&](jsg::Lock& js, auto& consumer, uint atLeast = 1) {
auto prp = js.newPromiseAndResolver<ReadResult>();
consumer.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto = {
consumer.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, 5),
.atLeast = atLeast,
},
});
}
));
return kj::mv(prp.promise);
};

const auto push = [&](auto store) {
try {
queue.push(js, kj::refcounted<ByteQueue::Entry>(kj::mv(store)));
queue.push(js, kj::heap<ByteQueue::Entry>(kj::mv(store)));
} catch (kj::Exception& ex) {
KJ_DBG(ex.getDescription());
}
Expand Down Expand Up @@ -1118,19 +1118,19 @@ KJ_TEST("ByteQueue with multiple default consumers with atLeast (different rate)

const auto read = [&](jsg::Lock& js, auto& consumer, uint atLeast = 1) {
auto prp = js.newPromiseAndResolver<ReadResult>();
consumer.read(js, ByteQueue::ReadRequest {
.resolver = kj::mv(prp.resolver),
.pullInto = {
consumer.read(js, ByteQueue::ReadRequest(
kj::mv(prp.resolver),
{
.store = jsg::BackingStore::alloc(js, 5),
.atLeast = atLeast,
},
});
}
));
return kj::mv(prp.promise);
};

const auto push = [&](auto store) {
try {
queue.push(js, kj::refcounted<ByteQueue::Entry>(kj::mv(store)));
queue.push(js, kj::heap<ByteQueue::Entry>(kj::mv(store)));
} catch (kj::Exception& ex) {
KJ_DBG(ex.getDescription());
}
Expand Down
Loading

0 comments on commit 91de01c

Please # to comment.