diff --git a/core/jvm/src/test/scala/fs2/PipeSpec.scala b/core/jvm/src/test/scala/fs2/PipeSpec.scala index 8adfbc81e8..771439358e 100644 --- a/core/jvm/src/test/scala/fs2/PipeSpec.scala +++ b/core/jvm/src/test/scala/fs2/PipeSpec.scala @@ -183,6 +183,12 @@ class PipeSpec extends Fs2Spec { runLog(r) shouldBe runLog(s.get).map(f) } + "mapAsync.exception" in forAll { s: PureStream[Int] => + val f = (_: Int) => IO.raiseError[Int](new RuntimeException) + val r = s.get.covary[IO].mapAsync(1)(f).attempt + runLog(r).size == 1 + } + "mapAsyncUnordered" in forAll { s: PureStream[Int] => val f = (_: Int) + 1 val r = s.get.covary[IO].mapAsyncUnordered(16)(i => IO(f(i))) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index f124fe8c80..a571e3f33c 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -1828,13 +1828,51 @@ object Stream { * the original stream. * * @example {{{ - * scala> import cats.effect.IO, scala.concurrent.ExecutionContext.Implicits.global - * scala> Stream(1,2,3,4).mapAsync(2)(i => IO(println(i))).compile.drain.unsafeRunSync - * res0: Unit = () - * }}} + * scala> import cats.effect.IO, scala.concurrent.ExecutionContext.Implicits.global + * scala> Stream(1,2,3,4).mapAsync(2)(i => IO(println(i))).compile.drain.unsafeRunSync + * res0: Unit = () + * }}} */ def mapAsync[O2](parallelism: Int)( f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] = + // backward compatibility workaround. see https://github.com/functional-streams-for-scala/fs2/pull/1355 + F match { + case c: ConcurrentEffect[F] => + Stream + .eval(async.mutable.Queue.bounded[F, Option[F[Either[Throwable, O2]]]](parallelism)) + .flatMap { queue => + Stream.eval(Promise.empty[F, Unit]).flatMap { dequeueDone => + queue.dequeue.unNoneTerminate + .evalMap(identity) + .rethrow + .onFinalize(dequeueDone.complete(())) + .concurrently { + self + .evalMap { o => + Promise.empty[F, Either[Throwable, O2]].flatMap { promise => + val enqueue = + queue.enqueue1(Some(promise.get)).as { + Stream.eval(f(o).attempt).evalMap(promise.complete) + } + + c.race(dequeueDone.get, enqueue).map { + case Left(()) => Stream.empty.covaryAll[F, Unit] + case Right(stream) => stream + } + } + } + .join(parallelism) + .drain + .onFinalize(c.race(dequeueDone.get, queue.enqueue1(None)).void) + } + } + } + case _ => mapAsyncCompat(parallelism)(f) + } + + // this is a version of mapAsync without #1297 fixed + private def mapAsyncCompat[O2](parallelism: Int)( + f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] = Stream .eval(async.mutable.Queue.bounded[F, Option[F[Either[Throwable, O2]]]](parallelism)) .flatMap { queue =>