Skip to content

Commit

Permalink
Fix slowdown in throttle due to improper use of stepLeg (#2839)
Browse files Browse the repository at this point in the history
* Update core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala

Co-authored-by: Diego E. Alonso Blas <diesalbla@gmail.com>
  • Loading branch information
mpilquist and diesalbla authored Mar 14, 2022
1 parent 3f40b8c commit 089e51c
Showing 1 changed file with 32 additions and 23 deletions.
55 changes: 32 additions & 23 deletions core/shared/src/main/scala/fs2/timeseries/TimeStamped.scala
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ object TimeStamped {
def doThrottle: Pipe2[F, TimeStamped[A], Unit, TimeStamped[A]] = {

type PullFromSourceOrTicks =
(Stream[F, TimeStamped[A]], Stream[F, Unit]) => Pull[F, TimeStamped[A], Unit]
(
Stream.StepLeg[F, TimeStamped[A]],
Stream.StepLeg[F, Unit]
) => Pull[F, TimeStamped[A], Unit]

def takeUpto(
chunk: Chunk[TimeStamped[A]],
Expand All @@ -196,32 +199,33 @@ object TimeStamped {
}

def read(upto: FiniteDuration): PullFromSourceOrTicks = { (src, ticks) =>
src.pull.stepLeg.flatMap {
case Some(leg) =>
val chunk = leg.head
if (chunk.isEmpty) read(upto)(leg.stream, ticks)
else {
val (toOutput, pending) = takeUpto(chunk, upto)
if (pending.isEmpty) Pull.output(toOutput) >> read(upto)(leg.stream, ticks)
else Pull.output(toOutput) >> awaitTick(upto, pending)(leg.stream, ticks)
}
case None => Pull.done
if (src.head.isEmpty) {
src.stepLeg.flatMap {
case Some(l) => read(upto)(l, ticks)
case None => Pull.done
}
} else {
val (toOutput, pending) = takeUpto(src.head, upto)
if (pending.isEmpty) Pull.output(toOutput) >> read(upto)(src.setHead(Chunk.empty), ticks)
else Pull.output(toOutput) >> awaitTick(upto, pending)(src.setHead(Chunk.empty), ticks)
}
}

def awaitTick(upto: FiniteDuration, pending: Chunk[TimeStamped[A]]): PullFromSourceOrTicks = {
(src, ticks) =>
ticks.pull.stepLeg.flatMap {
case Some(leg) =>
val tl = leg.stream.cons(leg.head.drop(1))
val newUpto = upto + ((1000 / ticksPerSecond) * throttlingFactor).toLong.millis
val (toOutput, stillPending) = takeUpto(pending, newUpto)
if (stillPending.isEmpty) {
Pull.output(toOutput) >> read(newUpto)(src, tl)
} else {
Pull.output(toOutput) >> awaitTick(newUpto, stillPending)(src, tl)
}
case None => Pull.done
if (ticks.head.isEmpty) {
ticks.stepLeg.flatMap {
case Some(leg) => awaitTick(upto, pending)(src, leg)
case None => Pull.done
}
} else {
val tl = ticks.setHead(ticks.head.drop(1))
val newUpto = upto + ((1000 / ticksPerSecond) * throttlingFactor).toLong.millis
val (toOutput, stillPending) = takeUpto(pending, newUpto)
Pull.output(toOutput) >> {
if (stillPending.isEmpty) read(newUpto)(src, tl)
else awaitTick(newUpto, stillPending)(src, tl)
}
}
}

Expand All @@ -230,7 +234,12 @@ object TimeStamped {
case Some(leg) =>
val head = leg.head(0)
val tl = leg.head.drop(1)
Pull.output1(head) >> read(head.time)(leg.stream.cons(tl), ticks)
Pull.output1(head) >>
ticks.pull.stepLeg.flatMap {
case Some(ticksLeg) =>
read(head.time)(leg.setHead(tl), ticksLeg)
case None => Pull.done
}
case None => Pull.done
}.stream
}
Expand Down

0 comments on commit 089e51c

Please # to comment.