Skip to content

Commit

Permalink
Add evalMaps and evalTaps which operate on chunks
Browse files Browse the repository at this point in the history
  • Loading branch information
CremboC committed Feb 14, 2020
1 parent 3fa7e24 commit bbd1e9e
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 0 deletions.
8 changes: 8 additions & 0 deletions benchmark/src/main/scala/fs2/benchmark/StreamBenchmark.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,4 +92,12 @@ class StreamBenchmark {
.compile
.drain
.unsafeRunSync

@Benchmark
def evalMap() =
Stream.emits(0 until n).evalMap(x => IO(x * 5)).compile.drain.unsafeRunSync

@Benchmark
def evalMaps() =
Stream.emits(0 until n).evalMaps(x => IO(x * 5)).compile.drain.unsafeRunSync
}
34 changes: 34 additions & 0 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -968,10 +968,38 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
* scala> Stream(1,2,3,4).evalMap(i => IO(println(i))).compile.drain.unsafeRunSync
* res0: Unit = ()
* }}}
*
* Note this operator will de-chunk the stream back into chunks of size 1, which has performance
* implications. For maximum performance, `evalMaps` is available, however, with caveats.
*/
def evalMap[F2[x] >: F[x], O2](f: O => F2[O2]): Stream[F2, O2] =
flatMap(o => Stream.eval(f(o)))

/**
* Like `evalMap`, but operates on chunks for performance. This means this operator
* is not lazy on every single element, rather on the chunks.
*
* For instance, `evalMap` would only print twice in the follow example (note the `take(2)`):
* @example {{{
* scala> import cats.effect.IO
* scala> Stream(1,2,3,4).evalMap(i => IO(println(i))).take(2).compile.drain.unsafeRunSync
* 1
* 2
* }}}
*
* But with `evalMaps`, it will print 4 times:
* @example {{{
* scala> import cats.effect.IO
* scala> Stream(1,2,3,4).evalMaps(i => IO(println(i))).take(2).compile.drain.unsafeRunSync
* 1
* 2
* 3
* 4
* }}}
*/
def evalMaps[F2[x] >: F[x]: Applicative, O2](f: O => F2[O2]): Stream[F2, O2] =
chunks.flatMap(o => Stream.evalUnChunk(o.traverse(f)))

/**
* Like `[[Stream#mapAccumulate]]`, but accepts a function returning an `F[_]`.
*
Expand Down Expand Up @@ -1032,6 +1060,12 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit])
def evalTap[F2[x] >: F[x]: Functor](f: O => F2[_]): Stream[F2, O] =
evalMap(o => f(o).as(o))

/**
* Alias for `evalMaps(o => f(o).as(o))`.
*/
def evalTaps[F2[x] >: F[x]: Functor: Applicative](f: O => F2[_]): Stream[F2, O] =
evalMaps(o => f(o).as(o))

/**
* Emits `true` as soon as a matching element is received, else `false` if no input matches.
* '''Pure''': this operation maps to `List.exists`
Expand Down

0 comments on commit bbd1e9e

Please # to comment.