Skip to content

Commit

Permalink
Merge pull request #1683 from nigredo-tori/stream-resourceweak
Browse files Browse the repository at this point in the history
Add Stream.resourceWeak
  • Loading branch information
mpilquist authored Nov 21, 2019
2 parents 2df1cab + df31156 commit 98de720
Show file tree
Hide file tree
Showing 2 changed files with 105 additions and 4 deletions.
17 changes: 13 additions & 4 deletions core/shared/src/main/scala/fs2/Stream.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3432,11 +3432,20 @@ object Stream extends StreamLowPriority {
def repeatEval[F[_], O](fo: F[O]): Stream[F, O] = eval(fo).repeat

/** Converts the supplied resource in to a singleton stream. */
def resource[F[_], O](r: Resource[F, O]): Stream[F, O] = r match {
def resource[F[_], O](r: Resource[F, O]): Stream[F, O] =
resourceWeak(r).scope

/**
* Like [[resource]] but does not introduce a scope, allowing finalization to occur after
* subsequent appends or other scope-preserving transformations.
*
* Scopes can be manually introduced via [[scope]] if desired.
*/
def resourceWeak[F[_], O](r: Resource[F, O]): Stream[F, O] = r match {
case Resource.Allocate(a) =>
Stream.bracketCase(a) { case ((_, release), e) => release(e) }.map(_._1)
case Resource.Bind(r, f) => resource(r).flatMap(o => resource(f(o)))
case Resource.Suspend(r) => Stream.eval(r).flatMap(resource)
Stream.bracketCaseWeak(a) { case ((_, release), e) => release(e) }.map(_._1)
case Resource.Bind(r, f) => resourceWeak(r).flatMap(o => resourceWeak(f(o)))
case Resource.Suspend(r) => Stream.eval(r).flatMap(resourceWeak)
}

/**
Expand Down
92 changes: 92 additions & 0 deletions core/shared/src/test/scala/fs2/StreamSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2710,6 +2710,98 @@ class StreamSpec extends Fs2Spec {
}
}

"resource" in {
Ref[IO]
.of(List.empty[String])
.flatMap { st =>
def record(s: String): IO[Unit] = st.update(_ :+ s)
def mkRes(s: String): Resource[IO, Unit] =
Resource.make(record(s"acquire $s"))(_ => record(s"release $s"))

// We aim to trigger all the possible cases, and make sure all of them
// introduce scopes.

// Allocate
val res1 = mkRes("1")
// Bind
val res2 = mkRes("21") *> mkRes("22")
// Suspend
val res3 = Resource.suspend(
record("suspend").as(mkRes("3"))
)

List(res1, res2, res3)
.foldMap(Stream.resource)
.evalTap(_ => record("use"))
.append(Stream.eval_(record("done")))
.compile
.drain *> st.get
}
.asserting(
_ shouldBe List(
"acquire 1",
"use",
"release 1",
"acquire 21",
"acquire 22",
"use",
"release 22",
"release 21",
"suspend",
"acquire 3",
"use",
"release 3",
"done"
)
)
}

"resourceWeak" in {
Ref[IO]
.of(List.empty[String])
.flatMap { st =>
def record(s: String): IO[Unit] = st.update(_ :+ s)
def mkRes(s: String): Resource[IO, Unit] =
Resource.make(record(s"acquire $s"))(_ => record(s"release $s"))

// We aim to trigger all the possible cases, and make sure none of them
// introduce scopes.

// Allocate
val res1 = mkRes("1")
// Bind
val res2 = mkRes("21") *> mkRes("22")
// Suspend
val res3 = Resource.suspend(
record("suspend").as(mkRes("3"))
)

List(res1, res2, res3)
.foldMap(Stream.resourceWeak)
.evalTap(_ => record("use"))
.append(Stream.eval_(record("done")))
.compile
.drain *> st.get
}
.asserting(
_ shouldBe List(
"acquire 1",
"use",
"acquire 21",
"acquire 22",
"use",
"suspend",
"acquire 3",
"use",
"done",
"release 3",
"release 22",
"release 21",
"release 1"
)
)
}

"resource safety" - {
"1" in {
forAll { (s1: Stream[Pure, Int]) =>
Expand Down

0 comments on commit 98de720

Please # to comment.