Skip to content

Commit

Permalink
Merge pull request #1727 from mpilquist/topic/debug
Browse files Browse the repository at this point in the history
Added debug and debugChunks combinators
  • Loading branch information
mpilquist authored Jan 7, 2020
2 parents b47eb6a + 403a348 commit 8629db0
Showing 1 changed file with 53 additions and 0 deletions.
53 changes: 53 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,59 @@ final class Stream[+F[_], +O] private (private[fs2] val free: FreeC[F, O, Unit])
def metered[F2[x] >: F[x]: Timer](rate: FiniteDuration): Stream[F2, O] =
Stream.fixedRate[F2](rate).zipRight(this)

/**
* Logs the elements of this stream as they are pulled.
*
* By default, `toString` is called on each element and the result is printed
* to standard out. To change formatting, supply a value for the `formatter`
* param. To change the destination, supply a value for the `logger` param.
*
* This method does not change the chunk structure of the stream. To debug the
* chunk structure, see [[debugChunks]].
*
* Logging is not done in `F` because this operation is intended for debugging,
* including pure streams.
*
* @example {{{
* scala> Stream(1, 2).append(Stream(3, 4)).debug(o => s"a: $o").toList
* a: 1
* a: 2
* a: 3
* a: 4
* res0: List[Int] = List(1, 2, 3, 4)
* }}}
*/
def debug(
formatter: O => String = _.toString,
logger: String => Unit = println(_)
): Stream[F, O] =
map { o =>
logger(formatter(o))
o
}

/**
* Like [[debug]] but logs chunks as they are pulled instead of individual elements.
*
* @example {{{
* scala> Stream(1, 2, 3).append(Stream(4, 5, 6)).debugChunks(c => s"a: $c").buffer(2).debugChunks(c => s"b: $c").toList
* a: Chunk(1, 2, 3)
* b: Chunk(1, 2)
* a: Chunk(4, 5, 6)
* b: Chunk(3, 4)
* b: Chunk(5, 6)
* res0: List[Int] = List(1, 2, 3, 4, 5, 6)
* }}}
*/
def debugChunks(
formatter: Chunk[O] => String = _.toString,
logger: String => Unit = println(_)
): Stream[F, O] =
chunks.flatMap { os =>
logger(formatter(os))
Stream.chunk(os)
}

/**
* Returns a stream that when run, sleeps for duration `d` and then pulls from this stream.
*
Expand Down

0 comments on commit 8629db0

Please # to comment.