From 0939d2d2a303844ffd30a6dd529813c3e559ef6b Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Fri, 28 Jan 2022 12:13:44 +0000 Subject: [PATCH 1/5] Add Stream.fromAutoCloseable and Stream.fromAutoCloseableWeak --- core/shared/src/main/scala/fs2/Stream.scala | 12 ++++++++++ .../src/test/scala/fs2/StreamSuite.scala | 24 +++++++++++++++++++ 2 files changed, 36 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 2c3f2e9b95..71c3491059 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3622,6 +3622,18 @@ object Stream extends StreamLowPriority { case Resource.Pure(o) => Stream.emit(o) } + /** Converts the supplied [[Autoclosable]] in to a singleton stream. */ + def fromAutoClosable[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = + Stream.resource(Resource.fromAutoCloseable(fo)) + + /** Like [[fromAutoClosable]] but does not introduce a scope, allowing finalization to occur after + * subsequent appends or other scope-preserving transformations. + * + * Scopes can be manually introduced via [[Stream#scope]] if desired. + */ + def fromAutoClosableWeak[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = + Stream.resourceWeak(Resource.fromAutoCloseable(fo)) + /** Retries `fo` on failure, returning a singleton stream with the * result of `fo` as soon as it succeeds. * diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index c8b9444c2d..b9216a30a6 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -602,6 +602,30 @@ class StreamSuite extends Fs2Suite { } } + test("fromAutoClosable") { + class Auto(var closed: Boolean = false) extends AutoCloseable { + override def close(): Unit = + closed = true + } + + Stream.fromAutoClosable(IO(new Auto())).compile.toList.flatMap { + case h :: Nil => IO(assert(h.closed)) + case _ => IO(fail("Did not close AutoClosable")) + } + } + + test("fromAutoClosableWeak") { + class Auto(var closed: Boolean = false) extends AutoCloseable { + override def close(): Unit = + closed = true + } + + Stream.fromAutoClosableWeak(IO(new Auto())).compile.toList.flatMap { + case h :: Nil => IO(assert(h.closed)) + case _ => IO(fail("Did not close AutoClosable")) + } + } + group("resource safety") { test("1") { forAllF { (s1: Stream[Pure, Int]) => From 8cfc1532d309db44da73ac8404c816f5de4bbe68 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Fri, 28 Jan 2022 12:14:53 +0000 Subject: [PATCH 2/5] Fix typos --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- core/shared/src/test/scala/fs2/StreamSuite.scala | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 71c3491059..4fa6f37972 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3623,7 +3623,7 @@ object Stream extends StreamLowPriority { } /** Converts the supplied [[Autoclosable]] in to a singleton stream. */ - def fromAutoClosable[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = + def fromAutoCloseable[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = Stream.resource(Resource.fromAutoCloseable(fo)) /** Like [[fromAutoClosable]] but does not introduce a scope, allowing finalization to occur after @@ -3631,7 +3631,7 @@ object Stream extends StreamLowPriority { * * Scopes can be manually introduced via [[Stream#scope]] if desired. */ - def fromAutoClosableWeak[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = + def fromAutoCloseableWeak[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = Stream.resourceWeak(Resource.fromAutoCloseable(fo)) /** Retries `fo` on failure, returning a singleton stream with the diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index b9216a30a6..e1d22c9119 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -608,7 +608,7 @@ class StreamSuite extends Fs2Suite { closed = true } - Stream.fromAutoClosable(IO(new Auto())).compile.toList.flatMap { + Stream.fromAutoCloseable(IO(new Auto())).compile.toList.flatMap { case h :: Nil => IO(assert(h.closed)) case _ => IO(fail("Did not close AutoClosable")) } @@ -620,7 +620,7 @@ class StreamSuite extends Fs2Suite { closed = true } - Stream.fromAutoClosableWeak(IO(new Auto())).compile.toList.flatMap { + Stream.fromAutoCloseableWeak(IO(new Auto())).compile.toList.flatMap { case h :: Nil => IO(assert(h.closed)) case _ => IO(fail("Did not close AutoClosable")) } From 14bcbcaf0c1364ee4f820fddae7ebb189bd72dac Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Fri, 28 Jan 2022 12:31:20 +0000 Subject: [PATCH 3/5] Modify fromAutoCloseableWeak test to assert scope is held open --- core/shared/src/test/scala/fs2/StreamSuite.scala | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index e1d22c9119..d009f8e830 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -32,6 +32,7 @@ import org.scalacheck.Arbitrary.arbitrary import org.scalacheck.Prop.forAll import org.scalacheck.effect.PropF.forAllF import fs2.concurrent.SignallingRef +import java.util.concurrent.atomic.AtomicInteger class StreamSuite extends Fs2Suite { @@ -615,14 +616,16 @@ class StreamSuite extends Fs2Suite { } test("fromAutoClosableWeak") { - class Auto(var closed: Boolean = false) extends AutoCloseable { + val counter = new AtomicInteger() + class Auto(var i: Int = 0) extends AutoCloseable { override def close(): Unit = - closed = true + i = counter.incrementAndGet() } - Stream.fromAutoCloseableWeak(IO(new Auto())).compile.toList.flatMap { - case h :: Nil => IO(assert(h.closed)) - case _ => IO(fail("Did not close AutoClosable")) + (Stream.fromAutoCloseableWeak(IO(new Auto())) + ++ Stream.fromAutoCloseable(IO(new Auto()))).compile.toList.flatMap { + case x :: y :: Nil => IO(assertEquals(x.i, 2)) >> IO(assertEquals(y.i, 1)) + case _ => IO(fail("Did not close AutoClosable")) } } From 22bf534e58058b9b7421b622fa47b7761ee7f73b Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Fri, 28 Jan 2022 12:32:13 +0000 Subject: [PATCH 4/5] AutoCloseable is a made-up word and thus hard to spell --- core/shared/src/test/scala/fs2/StreamSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamSuite.scala b/core/shared/src/test/scala/fs2/StreamSuite.scala index d009f8e830..d72d836e76 100644 --- a/core/shared/src/test/scala/fs2/StreamSuite.scala +++ b/core/shared/src/test/scala/fs2/StreamSuite.scala @@ -603,7 +603,7 @@ class StreamSuite extends Fs2Suite { } } - test("fromAutoClosable") { + test("fromAutoCloseable") { class Auto(var closed: Boolean = false) extends AutoCloseable { override def close(): Unit = closed = true @@ -615,7 +615,7 @@ class StreamSuite extends Fs2Suite { } } - test("fromAutoClosableWeak") { + test("fromAutoCloseableWeak") { val counter = new AtomicInteger() class Auto(var i: Int = 0) extends AutoCloseable { override def close(): Unit = From 98ec92e574a30079d66ab71a699891edb16ccc74 Mon Sep 17 00:00:00 2001 From: Tim Spence Date: Fri, 28 Jan 2022 12:35:12 +0000 Subject: [PATCH 5/5] Fix scaladoc --- core/shared/src/main/scala/fs2/Stream.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 4fa6f37972..dbe9ab12d2 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -3599,7 +3599,7 @@ object Stream extends StreamLowPriority { /** Alias for `eval(fo).repeat`. */ def repeatEval[F[_], O](fo: F[O]): Stream[F, O] = eval(fo).repeat - /** Converts the supplied resource in to a singleton stream. */ + /** Converts the supplied resource into a singleton stream. */ def resource[F[_], O](r: Resource[F, O])(implicit F: MonadCancel[F, _]): Stream[F, O] = resourceWeak(r).scope @@ -3622,7 +3622,7 @@ object Stream extends StreamLowPriority { case Resource.Pure(o) => Stream.emit(o) } - /** Converts the supplied [[Autoclosable]] in to a singleton stream. */ + /** Converts the supplied [[java.lang.Autoclosable]] into a singleton stream. */ def fromAutoCloseable[F[_]: Sync, O <: AutoCloseable](fo: F[O]): Stream[F, O] = Stream.resource(Resource.fromAutoCloseable(fo))