Skip to content

Commit

Permalink
Backport typelevel#1311 to 0.10
Browse files Browse the repository at this point in the history
  • Loading branch information
vpavkin committed Dec 3, 2018
1 parent a7ee988 commit 063580c
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 20 deletions.
6 changes: 6 additions & 0 deletions core/jvm/src/test/scala/fs2/PipeSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -183,6 +183,12 @@ class PipeSpec extends Fs2Spec {
runLog(r) shouldBe runLog(s.get).map(f)
}

"mapAsync.exception" in forAll { s: PureStream[Int] =>
val f = (_: Int) => IO.raiseError[Int](new RuntimeException)
val r = s.get.covary[IO].mapAsync(1)(f).attempt
runLog(r).size == 1
}

"mapAsyncUnordered" in forAll { s: PureStream[Int] =>
val f = (_: Int) + 1
val r = s.get.covary[IO].mapAsyncUnordered(16)(i => IO(f(i)))
Expand Down
52 changes: 32 additions & 20 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -1833,27 +1833,37 @@ object Stream {
* res0: Unit = ()
* }}}
*/
def mapAsync[O2](parallelism: Int)(
f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] =
def mapAsync[O2](parallelism: Int)(f: O => F[O2])(
implicit F: ConcurrentEffect[F],
executionContext: ExecutionContext): Stream[F, O2] =
Stream
.eval(async.mutable.Queue.bounded[F, Option[F[Either[Throwable, O2]]]](parallelism))
.flatMap { queue =>
queue.dequeue.unNoneTerminate
.evalMap(identity)
.rethrow
.concurrently {
self
.evalMap { o =>
Promise.empty[F, Either[Throwable, O2]].flatMap { promise =>
queue.enqueue1(Some(promise.get)).as {
Stream.eval(f(o).attempt).evalMap(promise.complete)
Stream.eval(Promise.empty[F, Unit]).flatMap { dequeueDone =>
queue.dequeue.unNoneTerminate
.evalMap(identity)
.rethrow
.onFinalize(dequeueDone.complete(()))
.concurrently {
self
.evalMap { o =>
Promise.empty[F, Either[Throwable, O2]].flatMap { promise =>
val enqueue =
queue.enqueue1(Some(promise.get)).as {
Stream.eval(f(o).attempt).evalMap(promise.complete)
}

F.race(dequeueDone.get, enqueue).map {
case Left(()) => Stream.empty.covaryAll[F, Unit]
case Right(stream) => stream
}
}
}
}
.join(parallelism)
.drain
.onFinalize(queue.enqueue1(None))
}
.join(parallelism)
.drain
.onFinalize(F.race(dequeueDone.get, queue.enqueue1(None)).void)
}
}
}

/**
Expand Down Expand Up @@ -2736,12 +2746,14 @@ object Stream {
def evalScan[F[_], O2](z: O2)(f: (O2, O) => F[O2]): Stream[F, O2] =
covary[F].evalScan(z)(f)

def mapAsync[F[_], O2](parallelism: Int)(
f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] =
def mapAsync[F[_], O2](parallelism: Int)(f: O => F[O2])(
implicit F: ConcurrentEffect[F],
executionContext: ExecutionContext): Stream[F, O2] =
covary[F].mapAsync(parallelism)(f)

def mapAsyncUnordered[F[_], O2](parallelism: Int)(
f: O => F[O2])(implicit F: Effect[F], executionContext: ExecutionContext): Stream[F, O2] =
def mapAsyncUnordered[F[_], O2](parallelism: Int)(f: O => F[O2])(
implicit F: ConcurrentEffect[F],
executionContext: ExecutionContext): Stream[F, O2] =
covary[F].mapAsyncUnordered(parallelism)(f)

def flatMap[F[_], O2](f: O => Stream[F, O2]): Stream[F, O2] =
Expand Down

0 comments on commit 063580c

Please # to comment.