Skip to content

Commit 66c0956

Browse files
committed
Fix clearing waiting and back-pressure on completion
Fixes #22.
1 parent f1c2ce3 commit 66c0956

File tree

2 files changed

+26
-11
lines changed

2 files changed

+26
-11
lines changed

src/Internal/QueueState.php

+7-11
Original file line numberDiff line numberDiff line change
@@ -386,7 +386,7 @@ private function finalize(?\Throwable $exception = null, bool $disposed = false)
386386
private function relieveBackPressure(?\Throwable $exception): void
387387
{
388388
$backPressure = $this->backpressure;
389-
unset($this->backpressure);
389+
$this->backpressure = [];
390390

391391
foreach ($backPressure as $placeholder) {
392392
if ($exception) {
@@ -407,17 +407,13 @@ private function relieveBackPressure(?\Throwable $exception): void
407407
private function resolvePending(): void
408408
{
409409
$waiting = $this->waiting;
410-
unset($this->waiting);
410+
$this->waiting = [];
411411

412-
$exception = $this->exception ?? null;
413-
414-
if ($waiting) {
415-
foreach ($waiting as $suspension) {
416-
if ($exception) {
417-
$suspension->throw($exception);
418-
} else {
419-
$suspension->resume($this->currentPosition);
420-
}
412+
foreach ($waiting as $suspension) {
413+
if ($this->exception) {
414+
$suspension->throw($this->exception);
415+
} else {
416+
$suspension->resume($this->currentPosition);
421417
}
422418
}
423419
}

test/QueueTest.php

+19
Original file line numberDiff line numberDiff line change
@@ -566,4 +566,23 @@ public function provideBufferSize(): iterable
566566
yield 'buffer size 5' => [5];
567567
yield 'buffer size 10' => [10];
568568
}
569+
570+
public function testCompleteWhileContinueCancellationPending(): void
571+
{
572+
$queue = new Queue();
573+
$iterator = $queue->iterate();
574+
575+
$deferredCancellation = new DeferredCancellation();
576+
577+
$future1 = async(fn () => $iterator->continue($deferredCancellation->getCancellation()));
578+
579+
$future2 = async(function () use ($queue, $deferredCancellation): void {
580+
$deferredCancellation->cancel();
581+
$queue->complete();
582+
});
583+
584+
$future2->await();
585+
586+
self::assertFalse($future1->await());
587+
}
569588
}

0 commit comments

Comments
 (0)