From df311566fa19380017ad533ab18283d2c9910e34 Mon Sep 17 00:00:00 2001 From: Dmitry Polienko Date: Thu, 14 Nov 2019 22:52:38 +0700 Subject: [PATCH] Add Stream.resourceWeak --- core/shared/src/main/scala/fs2/Stream.scala | 17 +++- .../src/test/scala/fs2/StreamSpec.scala | 92 +++++++++++++++++++ 2 files changed, 105 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 8e2f5c5ebb..c49fb742d9 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3432,11 +3432,20 @@ object Stream extends StreamLowPriority { def repeatEval[F[_], O](fo: F[O]): Stream[F, O] = eval(fo).repeat /** Converts the supplied resource in to a singleton stream. */ - def resource[F[_], O](r: Resource[F, O]): Stream[F, O] = r match { + def resource[F[_], O](r: Resource[F, O]): Stream[F, O] = + resourceWeak(r).scope + + /** + * Like [[resource]] but does not introduce a scope, allowing finalization to occur after + * subsequent appends or other scope-preserving transformations. + * + * Scopes can be manually introduced via [[scope]] if desired. + */ + def resourceWeak[F[_], O](r: Resource[F, O]): Stream[F, O] = r match { case Resource.Allocate(a) => - Stream.bracketCase(a) { case ((_, release), e) => release(e) }.map(_._1) - case Resource.Bind(r, f) => resource(r).flatMap(o => resource(f(o))) - case Resource.Suspend(r) => Stream.eval(r).flatMap(resource) + Stream.bracketCaseWeak(a) { case ((_, release), e) => release(e) }.map(_._1) + case Resource.Bind(r, f) => resourceWeak(r).flatMap(o => resourceWeak(f(o))) + case Resource.Suspend(r) => Stream.eval(r).flatMap(resourceWeak) } /** diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index f398f443e0..c3bc0c2618 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -2710,6 +2710,98 @@ class StreamSpec extends Fs2Spec { } } + "resource" in { + Ref[IO] + .of(List.empty[String]) + .flatMap { st => + def record(s: String): IO[Unit] = st.update(_ :+ s) + def mkRes(s: String): Resource[IO, Unit] = + Resource.make(record(s"acquire $s"))(_ => record(s"release $s")) + + // We aim to trigger all the possible cases, and make sure all of them + // introduce scopes. + + // Allocate + val res1 = mkRes("1") + // Bind + val res2 = mkRes("21") *> mkRes("22") + // Suspend + val res3 = Resource.suspend( + record("suspend").as(mkRes("3")) + ) + + List(res1, res2, res3) + .foldMap(Stream.resource) + .evalTap(_ => record("use")) + .append(Stream.eval_(record("done"))) + .compile + .drain *> st.get + } + .asserting( + _ shouldBe List( + "acquire 1", + "use", + "release 1", + "acquire 21", + "acquire 22", + "use", + "release 22", + "release 21", + "suspend", + "acquire 3", + "use", + "release 3", + "done" + ) + ) + } + + "resourceWeak" in { + Ref[IO] + .of(List.empty[String]) + .flatMap { st => + def record(s: String): IO[Unit] = st.update(_ :+ s) + def mkRes(s: String): Resource[IO, Unit] = + Resource.make(record(s"acquire $s"))(_ => record(s"release $s")) + + // We aim to trigger all the possible cases, and make sure none of them + // introduce scopes. + + // Allocate + val res1 = mkRes("1") + // Bind + val res2 = mkRes("21") *> mkRes("22") + // Suspend + val res3 = Resource.suspend( + record("suspend").as(mkRes("3")) + ) + + List(res1, res2, res3) + .foldMap(Stream.resourceWeak) + .evalTap(_ => record("use")) + .append(Stream.eval_(record("done"))) + .compile + .drain *> st.get + } + .asserting( + _ shouldBe List( + "acquire 1", + "use", + "acquire 21", + "acquire 22", + "use", + "suspend", + "acquire 3", + "use", + "done", + "release 3", + "release 22", + "release 21", + "release 1" + ) + ) + } + "resource safety" - { "1" in { forAll { (s1: Stream[Pure, Int]) =>