From 3b7d9c67e9bb8fd6bb0b238fe669a2bf5e8c5ae7 Mon Sep 17 00:00:00 2001 From: Niklas Keller Date: Sat, 12 Nov 2022 21:37:44 +0100 Subject: [PATCH] Improve performance by only deferring reads if needed --- src/ReadableResourceStream.php | 32 +++++++++++++++++++++++++------- 1 file changed, 25 insertions(+), 7 deletions(-) diff --git a/src/ReadableResourceStream.php b/src/ReadableResourceStream.php index b83ed9d..567b7c6 100644 --- a/src/ReadableResourceStream.php +++ b/src/ReadableResourceStream.php @@ -38,6 +38,12 @@ final class ReadableResourceStream implements ReadableStream, ResourceStream private readonly DeferredFuture $onClose; + private int $continuousReads = 0; + + private readonly \Closure $resumeSuspension; + + private readonly \Closure $resetContinuousReads; + /** * @param resource $stream Stream resource. * @param positive-int $chunkSize Default chunk size per read operation. @@ -129,6 +135,16 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE) EventLoop::disable($callbackId); }; + + $this->resumeSuspension = static function () use (&$suspension): void { + $suspension?->resume(); + $suspension = null; + }; + + $continuousReads = &$this->continuousReads; + $this->resetContinuousReads = static function () use (&$continuousReads): void { + $continuousReads = 0; + }; } /** @@ -191,14 +207,16 @@ public function read(?Cancellation $cancellation = null, ?int $limit = null): ?s } } - // Use a deferred suspension so other events are not starved by a stream that always has data available. - $this->suspension = EventLoop::getSuspension(); - EventLoop::defer(function () use ($data): void { - $this->suspension?->resume($data); - $this->suspension = null; - }); + if ($this->continuousReads > 10) { + // Use a deferred suspension so other events are not starved by a stream that always has data available. + $this->suspension = EventLoop::getSuspension(); + EventLoop::defer($this->resumeSuspension); + $this->suspension->suspend(); + } elseif ($this->continuousReads++ === 0) { + EventLoop::queue($this->resetContinuousReads); + } - return $this->suspension->suspend(); + return $data; } public function isReadable(): bool