Skip to content

Commit

Permalink
Merge pull request #1355 from vpavkin/backport-1311
Browse files Browse the repository at this point in the history
Backport #1311 (fix for #1297) to 0.10
  • Loading branch information
mpilquist authored Dec 4, 2018
2 parents a7ee988 + 7817cde commit a23308b
Show file tree
Hide file tree
Showing 2 changed files with 48 additions and 4 deletions.
6 changes: 6 additions & 0 deletions core/jvm/src/test/scala/fs2/PipeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ class PipeSpec extends Fs2Spec {
runLog(r) shouldBe runLog(s.get).map(f)
}

"mapAsync.exception" in forAll { s: PureStream[Int] =>
val f = (_: Int) => IO.raiseError[Int](new RuntimeException)
val r = s.get.covary[IO].mapAsync(1)(f).attempt
runLog(r).size == 1
}

"mapAsyncUnordered" in forAll { s: PureStream[Int] =>
val f = (_: Int) + 1
val r = s.get.covary[IO].mapAsyncUnordered(16)(i => IO(f(i)))
Expand Down
46 changes: 42 additions & 4 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1828,13 +1828,51 @@ object Stream {
* the original stream.
*
* @example {{{
* scala> import cats.effect.IO, scala.concurrent.ExecutionContext.Implicits.global
* scala> Stream(1,2,3,4).mapAsync(2)(i => IO(println(i))).compile.drain.unsafeRunSync
* res0: Unit = ()
* }}}
* scala> import cats.effect.IO, scala.concurrent.ExecutionContext.Implicits.global
* scala> Stream(1,2,3,4).mapAsync(2)(i => IO(println(i))).compile.drain.unsafeRunSync
* res0: Unit = ()
* }}}
*/
def mapAsync[O2](parallelism: Int)(
f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] =
// backward compatibility workaround. see https://github.com/functional-streams-for-scala/fs2/pull/1355
F match {
case c: ConcurrentEffect[F] =>
Stream
.eval(async.mutable.Queue.bounded[F, Option[F[Either[Throwable, O2]]]](parallelism))
.flatMap { queue =>
Stream.eval(Promise.empty[F, Unit]).flatMap { dequeueDone =>
queue.dequeue.unNoneTerminate
.evalMap(identity)
.rethrow
.onFinalize(dequeueDone.complete(()))
.concurrently {
self
.evalMap { o =>
Promise.empty[F, Either[Throwable, O2]].flatMap { promise =>
val enqueue =
queue.enqueue1(Some(promise.get)).as {
Stream.eval(f(o).attempt).evalMap(promise.complete)
}

c.race(dequeueDone.get, enqueue).map {
case Left(()) => Stream.empty.covaryAll[F, Unit]
case Right(stream) => stream
}
}
}
.join(parallelism)
.drain
.onFinalize(c.race(dequeueDone.get, queue.enqueue1(None)).void)
}
}
}
case _ => mapAsyncCompat(parallelism)(f)
}

// this is a version of mapAsync without #1297 fixed
private def mapAsyncCompat[O2](parallelism: Int)(
f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] =
Stream
.eval(async.mutable.Queue.bounded[F, Option[F[Either[Throwable, O2]]]](parallelism))
.flatMap { queue =>
Expand Down

0 comments on commit a23308b

Please # to comment.