diff --git a/benchmark/src/main/scala/fs2/benchmark/StreamBenchmark.scala b/benchmark/src/main/scala/fs2/benchmark/StreamBenchmark.scala index 1fd015907f..4e14ffafb7 100644 --- a/benchmark/src/main/scala/fs2/benchmark/StreamBenchmark.scala +++ b/benchmark/src/main/scala/fs2/benchmark/StreamBenchmark.scala @@ -92,4 +92,12 @@ class StreamBenchmark { .compile .drain .unsafeRunSync + + @Benchmark + def evalMap() = + Stream.emits(0 until n).evalMap(x => IO(x * 5)).compile.drain.unsafeRunSync + + @Benchmark + def evalMaps() = + Stream.emits(0 until n).evalMaps(x => IO(x * 5)).compile.drain.unsafeRunSync } diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index a55c910a79..745bb7a114 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -968,10 +968,38 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) * scala> Stream(1,2,3,4).evalMap(i => IO(println(i))).compile.drain.unsafeRunSync * res0: Unit = () * }}} + * + * Note this operator will de-chunk the stream back into chunks of size 1, which has performance + * implications. For maximum performance, `evalMaps` is available, however, with caveats. */ def evalMap[F2[x] >: F[x], O2](f: O => F2[O2]): Stream[F2, O2] = flatMap(o => Stream.eval(f(o))) + /** + * Like `evalMap`, but operates on chunks for performance. This means this operator + * is not lazy on every single element, rather on the chunks. + * + * For instance, `evalMap` would only print twice in the follow example (note the `take(2)`): + * @example {{{ + * scala> import cats.effect.IO + * scala> Stream(1,2,3,4).evalMap(i => IO(println(i))).take(2).compile.drain.unsafeRunSync + * 1 + * 2 + * }}} + * + * But with `evalMaps`, it will print 4 times: + * @example {{{ + * scala> import cats.effect.IO + * scala> Stream(1,2,3,4).evalMaps(i => IO(println(i))).take(2).compile.drain.unsafeRunSync + * 1 + * 2 + * 3 + * 4 + * }}} + */ + def evalMaps[F2[x] >: F[x]: Applicative, O2](f: O => F2[O2]): Stream[F2, O2] = + chunks.flatMap(o => Stream.evalUnChunk(o.traverse(f))) + /** * Like `[[Stream#mapAccumulate]]`, but accepts a function returning an `F[_]`. * @@ -1032,6 +1060,12 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) def evalTap[F2[x] >: F[x]: Functor](f: O => F2[_]): Stream[F2, O] = evalMap(o => f(o).as(o)) + /** + * Alias for `evalMaps(o => f(o).as(o))`. + */ + def evalTaps[F2[x] >: F[x]: Functor: Applicative](f: O => F2[_]): Stream[F2, O] = + evalMaps(o => f(o).as(o)) + /** * Emits `true` as soon as a matching element is received, else `false` if no input matches. * '''Pure''': this operation maps to `List.exists`