Skip to content

Commit

Permalink
Improve performance by only deferring reads if needed
Browse files Browse the repository at this point in the history
  • Loading branch information
kelunik committed Nov 12, 2022
1 parent a6b076d commit 3b7d9c6
Showing 1 changed file with 25 additions and 7 deletions.
32 changes: 25 additions & 7 deletions src/ReadableResourceStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ final class ReadableResourceStream implements ReadableStream, ResourceStream

private readonly DeferredFuture $onClose;

private int $continuousReads = 0;

private readonly \Closure $resumeSuspension;

private readonly \Closure $resetContinuousReads;

/**
* @param resource $stream Stream resource.
* @param positive-int $chunkSize Default chunk size per read operation.
Expand Down Expand Up @@ -129,6 +135,16 @@ public function __construct($stream, int $chunkSize = self::DEFAULT_CHUNK_SIZE)

EventLoop::disable($callbackId);
};

$this->resumeSuspension = static function () use (&$suspension): void {
$suspension?->resume();
$suspension = null;
};

$continuousReads = &$this->continuousReads;
$this->resetContinuousReads = static function () use (&$continuousReads): void {
$continuousReads = 0;
};
}

/**
Expand Down Expand Up @@ -191,14 +207,16 @@ public function read(?Cancellation $cancellation = null, ?int $limit = null): ?s
}
}

// Use a deferred suspension so other events are not starved by a stream that always has data available.
$this->suspension = EventLoop::getSuspension();
EventLoop::defer(function () use ($data): void {
$this->suspension?->resume($data);
$this->suspension = null;
});
if ($this->continuousReads > 10) {
// Use a deferred suspension so other events are not starved by a stream that always has data available.
$this->suspension = EventLoop::getSuspension();
EventLoop::defer($this->resumeSuspension);
$this->suspension->suspend();
} elseif ($this->continuousReads++ === 0) {
EventLoop::queue($this->resetContinuousReads);
}

return $this->suspension->suspend();
return $data;
}

public function isReadable(): bool
Expand Down

0 comments on commit 3b7d9c6

Please # to comment.