Skip to content

Commit

Permalink
Fix groupWithin to not emit empty groups, resolves #1404
Browse files Browse the repository at this point in the history
  • Loading branch information
Sebruck committed Jan 28, 2019
1 parent 9b0e472 commit b111c8f
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 1 deletion.
14 changes: 14 additions & 0 deletions core/jvm/src/test/scala/fs2/GroupWithinSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,20 @@ class GroupWithinSpec extends Fs2Spec {
runLog(action) shouldBe (result)
}

"groupWithin should never emit empty groups" in forAll {
(s: PureStream[VeryShortFiniteDuration], d: ShortFiniteDuration, maxGroupSize: SmallPositive) =>
whenever(s.get.toVector.nonEmpty) {
val action =
s.get
.covary[IO]
.evalTap(shortDuration => IO.sleep(shortDuration.get))
.groupWithin(maxGroupSize.get, d.get)
.map(_.toList)

runLog(action).foreach(group => group should not be empty)
}
}

"groupWithin should never have more elements than in its specified limit" in forAll {
(s: PureStream[VeryShortFiniteDuration], d: ShortFiniteDuration, maxGroupSize: SmallPositive) =>
val maxGroupSizeAsInt = maxGroupSize.get
Expand Down
2 changes: 1 addition & 1 deletion core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1268,7 +1268,7 @@ final class Stream[+F[_], +O] private (private val free: FreeC[Algebra[Nothing,
this.chunks.map(_.asRight.some).through(q.enqueue).onFinalize(q.enqueue1(None))

def emitNonEmpty(c: Chain[Chunk[O]]): Stream[F2, Chunk[O]] =
if (c.nonEmpty) Stream.emit(Chunk.concat(c.toList))
if (c.nonEmpty && !c.forall(_.isEmpty)) Stream.emit(Chunk.concat(c.toList))
else Stream.empty

def resize(c: Chunk[O], s: Stream[F2, Chunk[O]]): (Stream[F2, Chunk[O]], Chunk[O]) =
Expand Down

0 comments on commit b111c8f

Please # to comment.