From 089e51c23c1bae9068cd4df3361a00f607bff199 Mon Sep 17 00:00:00 2001 From: Michael Pilquist Date: Mon, 14 Mar 2022 16:37:29 -0400 Subject: [PATCH] Fix slowdown in throttle due to improper use of stepLeg (#2839) * Update core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala Co-authored-by: Diego E. Alonso Blas --- .../scala/fs2/timeseries/TimeStamped.scala | 55 +++++++++++-------- 1 file changed, 32 insertions(+), 23 deletions(-) diff --git a/core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala b/core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala index 0cbf1dbbda..b7bc6ab263 100644 --- a/core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala +++ b/core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala @@ -185,7 +185,10 @@ object TimeStamped { def doThrottle: Pipe2[F, TimeStamped[A], Unit, TimeStamped[A]] = { type PullFromSourceOrTicks = - (Stream[F, TimeStamped[A]], Stream[F, Unit]) => Pull[F, TimeStamped[A], Unit] + ( + Stream.StepLeg[F, TimeStamped[A]], + Stream.StepLeg[F, Unit] + ) => Pull[F, TimeStamped[A], Unit] def takeUpto( chunk: Chunk[TimeStamped[A]], @@ -196,32 +199,33 @@ object TimeStamped { } def read(upto: FiniteDuration): PullFromSourceOrTicks = { (src, ticks) => - src.pull.stepLeg.flatMap { - case Some(leg) => - val chunk = leg.head - if (chunk.isEmpty) read(upto)(leg.stream, ticks) - else { - val (toOutput, pending) = takeUpto(chunk, upto) - if (pending.isEmpty) Pull.output(toOutput) >> read(upto)(leg.stream, ticks) - else Pull.output(toOutput) >> awaitTick(upto, pending)(leg.stream, ticks) - } - case None => Pull.done + if (src.head.isEmpty) { + src.stepLeg.flatMap { + case Some(l) => read(upto)(l, ticks) + case None => Pull.done + } + } else { + val (toOutput, pending) = takeUpto(src.head, upto) + if (pending.isEmpty) Pull.output(toOutput) >> read(upto)(src.setHead(Chunk.empty), ticks) + else Pull.output(toOutput) >> awaitTick(upto, pending)(src.setHead(Chunk.empty), ticks) } } def awaitTick(upto: FiniteDuration, pending: Chunk[TimeStamped[A]]): PullFromSourceOrTicks = { (src, ticks) => - ticks.pull.stepLeg.flatMap { - case Some(leg) => - val tl = leg.stream.cons(leg.head.drop(1)) - val newUpto = upto + ((1000 / ticksPerSecond) * throttlingFactor).toLong.millis - val (toOutput, stillPending) = takeUpto(pending, newUpto) - if (stillPending.isEmpty) { - Pull.output(toOutput) >> read(newUpto)(src, tl) - } else { - Pull.output(toOutput) >> awaitTick(newUpto, stillPending)(src, tl) - } - case None => Pull.done + if (ticks.head.isEmpty) { + ticks.stepLeg.flatMap { + case Some(leg) => awaitTick(upto, pending)(src, leg) + case None => Pull.done + } + } else { + val tl = ticks.setHead(ticks.head.drop(1)) + val newUpto = upto + ((1000 / ticksPerSecond) * throttlingFactor).toLong.millis + val (toOutput, stillPending) = takeUpto(pending, newUpto) + Pull.output(toOutput) >> { + if (stillPending.isEmpty) read(newUpto)(src, tl) + else awaitTick(newUpto, stillPending)(src, tl) + } } } @@ -230,7 +234,12 @@ object TimeStamped { case Some(leg) => val head = leg.head(0) val tl = leg.head.drop(1) - Pull.output1(head) >> read(head.time)(leg.stream.cons(tl), ticks) + Pull.output1(head) >> + ticks.pull.stepLeg.flatMap { + case Some(ticksLeg) => + read(head.time)(leg.setHead(tl), ticksLeg) + case None => Pull.done + } case None => Pull.done }.stream }