diff --git a/build.sbt b/build.sbt index b2d185a520..5314baa516 100644 --- a/build.sbt +++ b/build.sbt @@ -5,8 +5,11 @@ import sbtcrossproject.crossProject val ReleaseTag = """^release/([\d\.]+a?)$""".r -addCommandAlias("fmt", "; compile:scalafmt; test:scalafmt; scalafmtSbt") -addCommandAlias("fmtCheck", "; compile:scalafmtCheck; test:scalafmtCheck; scalafmtSbtCheck") +addCommandAlias("fmt", "; compile:scalafmt; test:scalafmt; it:scalafmt; scalafmtSbt") +addCommandAlias( + "fmtCheck", + "; compile:scalafmtCheck; test:scalafmtCheck; it:scalafmtCheck; scalafmtSbtCheck" +) lazy val contributors = Seq( "pchiusano" -> "Paul Chiusano", @@ -257,8 +260,16 @@ lazy val root = project .settings(noPublish) .aggregate(coreJVM, coreJS, io, reactiveStreams, benchmark, experimental) +lazy val IntegrationTest = config("it").extend(Test) + lazy val core = crossProject(JVMPlatform, JSPlatform) .in(file("core")) + .configs(IntegrationTest) + .settings(Defaults.itSettings: _*) + .settings( + testOptions in IntegrationTest := Seq(Tests.Argument(TestFrameworks.ScalaTest, "-oDF")), + inConfig(IntegrationTest)(org.scalafmt.sbt.ScalafmtPlugin.scalafmtConfigSettings) + ) .settings(crossCommonSettings: _*) .settings( name := "fs2-core", diff --git a/core/jvm/src/it/scala/fs2/MemoryLeakSpec.scala b/core/jvm/src/it/scala/fs2/MemoryLeakSpec.scala new file mode 100644 index 0000000000..9480ed5f22 --- /dev/null +++ b/core/jvm/src/it/scala/fs2/MemoryLeakSpec.scala @@ -0,0 +1,255 @@ +package fs2 + +import scala.concurrent.ExecutionContext +import scala.concurrent.duration._ + +import java.lang.management.ManagementFactory +import java.nio.file.{Files, Path} + +import cats.effect.{ContextShift, IO, Timer} +import cats.implicits._ + +import org.scalatest.funsuite.AnyFunSuite + +import fs2.concurrent._ + +class MemoryLeakSpec extends AnyFunSuite { + + lazy protected implicit val ioContextShift: ContextShift[IO] = + IO.contextShift(ExecutionContext.Implicits.global) + lazy protected implicit val ioTimer: Timer[IO] = IO.timer(ExecutionContext.global) + + private def heapUsed: IO[Long] = + IO { + val runtime = Runtime.getRuntime + runtime.gc() + val total = runtime.totalMemory() + val free = runtime.freeMemory() + total - free + } + + protected def leakTest[O]( + name: String, + warmupIterations: Int = 3, + samplePeriod: FiniteDuration = 3.seconds, + monitorPeriod: FiniteDuration = 30.seconds, + limitBytesIncrease: Long = 10 * 1024 * 1024 + )(stream: => Stream[IO, O]): Unit = + test(name) { + IO.race( + stream.compile.drain, + IO.race( + monitorHeap(warmupIterations, samplePeriod, limitBytesIncrease), + IO.sleep(monitorPeriod) + ) + ) + .map { + case Left(_) => () + case Right(Right(_)) => () + case Right(Left(path)) => + fail(s"leak detected - heap dump: $path") + } + .unsafeRunSync() + } + + private def monitorHeap( + warmupIterations: Int, + samplePeriod: FiniteDuration, + limitBytesIncrease: Long + ): IO[Path] = { + def warmup(iterationsLeft: Int): IO[Path] = + if (iterationsLeft > 0) IO.sleep(samplePeriod) >> warmup(iterationsLeft - 1) + else heapUsed.flatMap(go) + + def go(initial: Long): IO[Path] = + IO.sleep(samplePeriod) >> + heapUsed.flatMap { bytes => + val delta = bytes - initial + if (delta > limitBytesIncrease) dumpHeap + else go(initial) + } + + warmup(warmupIterations) + } + + private def dumpHeap: IO[Path] = + IO { + val path = Files.createTempFile("fs2-leak-test-", ".hprof") + Files.delete(path) + val server = ManagementFactory.getPlatformMBeanServer + val mbean = ManagementFactory.newPlatformMXBeanProxy( + server, + "com.sun.management:type=HotSpotDiagnostic", + classOf[com.sun.management.HotSpotDiagnosticMXBean] + ) + mbean.dumpHeap(path.toString, true) + path + } + + leakTest("groupWithin") { + Stream + .eval(IO.never) + .covary[IO] + .groupWithin(Int.MaxValue, 1.millis) + } + + leakTest("groupWithin 2") { + def a: Stream[IO, Chunk[Int]] = + Stream + .eval(IO.never) + .covary[IO] + .groupWithin(Int.MaxValue, 1.second) + .interruptAfter(100.millis) ++ a + a + } + + leakTest("topic continuous publish") { + Stream + .eval(Topic[IO, Int](-1)) + .flatMap(topic => Stream.repeatEval(topic.publish1(1))) + } + + leakTest("brackets") { + Stream.constant(1).flatMap { _ => + Stream.bracket(IO.unit)(_ => IO.unit).flatMap(_ => Stream.emits(List(1, 2, 3))) + } + } + + leakTest("repeatPull") { + def id[F[_], A]: Pipe[F, A, A] = + _.repeatPull { + _.uncons1.flatMap { + case Some((h, t)) => Pull.output1(h).as(Some(t)) + case None => Pull.pure(None) + } + } + Stream.constant(1).covary[IO].through(id[IO, Int]) + } + + leakTest("repeatEval") { + def id[F[_], A]: Pipe[F, A, A] = { + def go(s: Stream[F, A]): Pull[F, A, Unit] = + s.pull.uncons1.flatMap { + case Some((h, t)) => Pull.output1(h) >> go(t); case None => Pull.done + } + in => go(in).stream + } + Stream.repeatEval(IO(1)).through(id[IO, Int]) + } + + leakTest("append") { + (Stream.constant(1).covary[IO] ++ Stream.empty).pull.echo.stream + } + + leakTest("drain onComplete") { + val s = Stream.repeatEval(IO(1)).pull.echo.stream.drain ++ Stream.eval_(IO(println("done"))) + Stream.empty.covary[IO].merge(s) + } + + leakTest("parJoin") { + Stream.constant(Stream.empty[IO]).parJoin(5) + } + + leakTest("dangling dequeue") { + Stream + .eval(Queue.unbounded[IO, Int]) + .flatMap(q => Stream.constant(1).flatMap(_ => Stream.empty.mergeHaltBoth(q.dequeue))) + } + + leakTest("awakeEvery") { + Stream.awakeEvery[IO](1.millis).flatMap(_ => Stream.eval(IO.unit)) + } + + leakTest("signal discrete") { + Stream + .eval(SignallingRef[IO, Unit](())) + .flatMap(signal => signal.discrete.evalMap(a => signal.set(a))) + } + + leakTest("signal continuous") { + Stream + .eval(SignallingRef[IO, Unit](())) + .flatMap(signal => signal.continuous.evalMap(a => signal.set(a))) + } + + leakTest("constant eval") { + var cnt = 0 + var start = System.currentTimeMillis + Stream + .constant(()) + .flatMap { _ => + Stream.eval(IO { + cnt = (cnt + 1) % 1000000 + if (cnt == 0) { + val now = System.currentTimeMillis + println("Elapsed: " + (now - start)) + start = now + } + }) + } + } + + leakTest("recursive flatMap") { + def loop: Stream[IO, Unit] = Stream(()).covary[IO].flatMap(_ => loop) + loop + } + + leakTest("eval + flatMap + map") { + Stream + .eval(IO.unit) + .flatMap(_ => Stream.emits(Seq())) + .map(x => x) + .repeat + } + + leakTest("queue") { + Stream + .eval(Queue.bounded[IO, Either[Throwable, Option[Int]]](10)) + .flatMap { queue => + queue + .dequeueChunk(Int.MaxValue) + .rethrow + .unNoneTerminate + .concurrently( + Stream + .constant(1, 128) + .covary[IO] + .noneTerminate + .attempt + .evalMap(queue.enqueue1(_)) + ) + .evalMap(_ => IO.unit) + } + + } + + leakTest("progress merge") { + val progress = Stream.constant(1, 128).covary[IO] + progress.merge(progress) + } + + leakTest("hung merge") { + val hung = Stream.eval(IO.async[Int](_ => ())) + val progress = Stream.constant(1, 128).covary[IO] + hung.merge(progress) + } + + leakTest("zip + flatMap + parJoin") { + val sources: Stream[IO, Stream[IO, Int]] = Stream(Stream.empty).repeat + Stream + .fixedDelay[IO](1.milliseconds) + .zip(sources) + .flatMap { + case (_, s) => + s.map(Stream.constant(_).covary[IO]).parJoinUnbounded + } + } + + leakTest("retry") { + Stream.retry(IO.unit, 1.second, _ * 2, 10).repeat + } + + leakTest("attempts + pull") { + Stream.eval(IO.unit).attempts(Stream.constant(1.second)).head.repeat + } +} diff --git a/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala b/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala deleted file mode 100644 index 2f9f4d7e5b..0000000000 --- a/core/jvm/src/test/scala/fs2/MemorySanityChecks.scala +++ /dev/null @@ -1,244 +0,0 @@ -package fs2 - -import scala.concurrent.ExecutionContext -import cats.effect.{ContextShift, IO, Timer} -import fs2.concurrent.{Queue, SignallingRef, Topic} - -// Sanity tests - not run as part of unit tests, but these should run forever -// at constant memory. -object GroupWithinSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - - import scala.concurrent.duration._ - - Stream - .eval(IO.never) - .covary[IO] - .groupWithin(Int.MaxValue, 1.millis) - .compile - .drain - .unsafeRunSync() -} - -object GroupWithinSanityTest2 extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - - import scala.concurrent.duration._ - - val a: Stream[IO, Chunk[Int]] = Stream - .eval(IO.never) - .covary[IO] - .groupWithin(Int.MaxValue, 1.second) - .interruptAfter(100.millis) ++ a - - a.compile.drain - .unsafeRunSync() -} - -object TopicContinuousPublishSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - Topic[IO, Int](-1) - .flatMap(topic => Stream.repeatEval(topic.publish1(1)).compile.drain) - .unsafeRunSync() -} - -object ResourceTrackerSanityTest extends App { - val big = Stream.constant(1).flatMap { _ => - Stream.bracket(IO(()))(_ => IO(())).flatMap(_ => Stream.emits(List(1, 2, 3))) - } - big.compile.drain.unsafeRunSync() -} - -object RepeatPullSanityTest extends App { - def id[F[_], A]: Pipe[F, A, A] = - _.repeatPull { - _.uncons1.flatMap { - case Some((h, t)) => Pull.output1(h).as(Some(t)); - case None => Pull.pure(None) - } - } - Stream.constant(1).covary[IO].through(id[IO, Int]).compile.drain.unsafeRunSync() -} - -object RepeatEvalSanityTest extends App { - def id[F[_], A]: Pipe[F, A, A] = { - def go(s: Stream[F, A]): Pull[F, A, Unit] = - s.pull.uncons1.flatMap { - case Some((h, t)) => Pull.output1(h) >> go(t); case None => Pull.done - } - in => go(in).stream - } - Stream.repeatEval(IO(1)).through(id[IO, Int]).compile.drain.unsafeRunSync() -} - -object AppendSanityTest extends App { - (Stream.constant(1).covary[IO] ++ Stream.empty).pull.echo.stream.compile.drain - .unsafeRunSync() -} - -object DrainOnCompleteSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - val s = Stream.repeatEval(IO(1)).pull.echo.stream.drain ++ Stream.eval_(IO(println("done"))) - Stream.empty.covary[IO].merge(s).compile.drain.unsafeRunSync() -} - -object ParJoinSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - Stream - .constant(Stream.empty[IO]) - .parJoin(5) - .compile - .drain - .unsafeRunSync -} - -object DanglingDequeueSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - Stream - .eval(Queue.unbounded[IO, Int]) - .flatMap(q => Stream.constant(1).flatMap(_ => Stream.empty.mergeHaltBoth(q.dequeue))) - .compile - .drain - .unsafeRunSync -} - -object AwakeEverySanityTest extends App { - import scala.concurrent.duration._ - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global) - Stream - .awakeEvery[IO](1.millis) - .flatMap(_ => Stream.eval(IO(()))) - .compile - .drain - .unsafeRunSync -} - -object SignalDiscreteSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - Stream - .eval(SignallingRef[IO, Unit](())) - .flatMap(signal => signal.discrete.evalMap(a => signal.set(a))) - .compile - .drain - .unsafeRunSync -} - -object SignalContinuousSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - Stream - .eval(SignallingRef[IO, Unit](())) - .flatMap(signal => signal.continuous.evalMap(a => signal.set(a))) - .compile - .drain - .unsafeRunSync -} - -object ConstantEvalSanityTest extends App { - var cnt = 0 - var start = System.currentTimeMillis - Stream - .constant(()) - .flatMap { _ => - Stream.eval(IO { - cnt = (cnt + 1) % 1000000 - if (cnt == 0) { - val now = System.currentTimeMillis - println("Elapsed: " + (now - start)) - start = now - } - }) - } - .compile - .drain - .unsafeRunSync -} - -object RecursiveFlatMapTest extends App { - def loop: Stream[IO, Unit] = Stream(()).covary[IO].flatMap(_ => loop) - loop.compile.drain.unsafeRunSync -} - -object EvalFlatMapMapTest extends App { - Stream - .eval(IO(())) - .flatMap(_ => Stream.emits(Seq())) - .map(x => x) - .repeat - .compile - .drain - .unsafeRunSync() -} - -object QueueTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - Stream - .eval(Queue.bounded[IO, Either[Throwable, Option[Int]]](10)) - .flatMap { queue => - queue - .dequeueChunk(Int.MaxValue) - .rethrow - .unNoneTerminate - .concurrently( - Stream - .constant(1, 128) - .covary[IO] - .noneTerminate - .attempt - .evalMap(queue.enqueue1(_)) - ) - .evalMap(_ => IO.unit) - } - .compile - .drain - .unsafeRunSync() -} - -object ProgressMerge extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - val progress = Stream.constant(1, 128).covary[IO] - progress.merge(progress).compile.drain.unsafeRunSync() -} - -object HungMerge extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - val hung = Stream.eval(IO.async[Int](_ => ())) - val progress = Stream.constant(1, 128).covary[IO] - hung.merge(progress).compile.drain.unsafeRunSync() -} - -object ZipThenBindThenParJoin extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.Implicits.global) - import scala.concurrent.duration._ - - val sources: Stream[IO, Stream[IO, Int]] = Stream(Stream.empty).repeat - - Stream - .fixedDelay[IO](1.milliseconds) - .zip(sources) - .flatMap { - case (_, s) => - s.map(Stream.constant(_).covary[IO]).parJoinUnbounded - } - .compile - .drain - .unsafeRunSync() -} - -object RetrySanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - import scala.concurrent.duration._ - - Stream.retry(IO.unit, 1.second, _ * 2, 10).repeat.compile.drain.unsafeRunSync() -} - -object AttemptsThenPullSanityTest extends App { - implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.Implicits.global) - implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global) - import scala.concurrent.duration._ - Stream.eval(IO.unit).attempts(Stream.constant(1.second)).head.repeat.compile.drain.unsafeRunSync() -} diff --git a/core/shared/src/main/scala/fs2/concurrent/Signal.scala b/core/shared/src/main/scala/fs2/concurrent/Signal.scala index ccbbc95c25..3185d2f563 100644 --- a/core/shared/src/main/scala/fs2/concurrent/Signal.scala +++ b/core/shared/src/main/scala/fs2/concurrent/Signal.scala @@ -4,7 +4,7 @@ package concurrent import cats.{Applicative, Functor, Invariant} import cats.data.{OptionT, State} import cats.effect.{Async, Concurrent, Sync} -import cats.effect.concurrent.Ref +import cats.effect.concurrent.{Deferred, Ref} import cats.implicits._ import fs2.internal.Token @@ -145,75 +145,112 @@ object SignallingRef { * Like [[apply]], but initializes state using another effect constructor. */ def in[G[_]: Sync, F[_]: Concurrent, A](initial: A): G[SignallingRef[F, A]] = - Ref.in[G, F, (Long, A)]((0, initial)).flatMap { ref => - PubSub.in[G].from(PubSub.Strategy.Discrete.strategy[A](0, initial)).map { pubSub => - def modify_[B](f: A => (A, B))(stamped: (Long, A)): ((Long, A), ((Long, (Long, A)), B)) = { - val (a1, b) = f(stamped._2) - val stamp = stamped._1 + 1 - ((stamp, a1), ((stamped._1, (stamp, a1)), b)) - } - - new SignallingRef[F, A] { - def get: F[A] = - ref.get.map(_._2) - - def continuous: Stream[F, A] = - Stream.repeatEval(get) - - def set(a: A): F[Unit] = - update(_ => a) - - override def getAndSet(a: A): F[A] = - modify(old => (a, old)) - - def access: F[(A, A => F[Boolean])] = - ref.access.map { - case (access, setter) => - ( - access._2, - { (a: A) => - setter((access._1 + 1, a)).flatMap { success => - if (success) - Concurrent[F] - .start(pubSub.publish((access._1, (access._1 + 1, a)))) - .as(true) - else Applicative[F].pure(false) - } - } - ) + Ref + .in[G, F, (A, Long, Map[Token, Deferred[F, (A, Long)]])]((initial, 0L, Map.empty)) + .map(state => new SignallingRefImpl[F, A](state)) + + private final class SignallingRefImpl[F[_], A]( + state: Ref[F, (A, Long, Map[Token, Deferred[F, (A, Long)]])] + )(implicit F: Concurrent[F]) + extends SignallingRef[F, A] { + + override def get: F[A] = state.get.map(_._1) + + override def continuous: Stream[F, A] = + Stream.repeatEval(get) + + override def discrete: Stream[F, A] = { + def go(id: Token, lastUpdate: Long): Stream[F, A] = { + def getNext: F[(A, Long)] = + Deferred[F, (A, Long)] + .flatMap { deferred => + state.modify { + case s @ (a, updates, listeners) => + if (updates != lastUpdate) s -> (a -> updates).pure[F] + else (a, updates, listeners + (id -> deferred)) -> deferred.get + }.flatten } - def tryUpdate(f: A => A): F[Boolean] = - tryModify(a => (f(a), ())).map(_.nonEmpty) + Stream.eval(getNext).flatMap { case (a, l) => Stream.emit(a) ++ go(id, l) } + } - def tryModify[B](f: A => (A, B)): F[Option[B]] = - ref.tryModify(modify_(f)).flatMap { - case None => Applicative[F].pure(None) - case Some((signal, b)) => Concurrent[F].start(pubSub.publish(signal)).as(Some(b)) - } + def cleanup(id: Token): F[Unit] = + state.update(s => s.copy(_3 = s._3 - id)) - def update(f: A => A): F[Unit] = - modify(a => (f(a), ())) + Stream.bracket(F.delay(new Token))(cleanup).flatMap { id => + Stream.eval(state.get).flatMap { + case (a, l, _) => Stream.emit(a) ++ go(id, l) + } + } + } + + override def set(a: A): F[Unit] = update(_ => a) + + override def getAndSet(a: A): F[A] = modify(old => (a, old)) + + override def access: F[(A, A => F[Boolean])] = + state.access.flatMap { + case (snapshot, set) => + F.delay { + val hasBeenCalled = new java.util.concurrent.atomic.AtomicBoolean(false) + val setter = + (a: A) => + F.delay(hasBeenCalled.compareAndSet(false, true)) + .ifM( + if (a == snapshot._1) set((a, snapshot._2, snapshot._3)) else F.pure(false), + F.pure(false) + ) + (snapshot._1, setter) + } + } - def modify[B](f: A => (A, B)): F[B] = - ref.modify(modify_(f)).flatMap { - case (signal, b) => - Concurrent[F].start(pubSub.publish(signal)).as(b) + override def tryUpdate(f: A => A): F[Boolean] = + F.map(tryModify(a => (f(a), ())))(_.isDefined) + + override def tryModify[B](f: A => (A, B)): F[Option[B]] = + state + .tryModify { + case (a, updates, listeners) => + val (newA, result) = f(a) + val newUpdates = updates + 1 + val newState = (newA, newUpdates, Map.empty[Token, Deferred[F, (A, Long)]]) + val action = listeners.toVector.traverse { + case (_, deferred) => + F.start(deferred.complete(newA -> newUpdates)) } + newState -> (action *> result.pure[F]) + } + .flatMap { + case None => F.pure(None) + case Some(fb) => fb.map(Some(_)) + } - def tryModifyState[B](state: State[A, B]): F[Option[B]] = - tryModify(a => state.run(a).value) + override def update(f: A => A): F[Unit] = + modify(a => (f(a), ())) + + override def modify[B](f: A => (A, B)): F[B] = + state.modify { + case (a, updates, listeners) => + val (newA, result) = f(a) + val newUpdates = updates + 1 + val newState = (newA, newUpdates, Map.empty[Token, Deferred[F, (A, Long)]]) + val action = listeners.toVector.traverse { + case (_, deferred) => + F.start(deferred.complete(newA -> newUpdates)) + } + newState -> (action *> result.pure[F]) + }.flatten - def modifyState[B](state: State[A, B]): F[B] = - modify(a => state.run(a).value) + override def tryModifyState[B](state: State[A, B]): F[Option[B]] = { + val f = state.runF.value + tryModify(a => f(a).value) + } - def discrete: Stream[F, A] = - Stream.bracket(Sync[F].delay(Some(new Token)))(pubSub.unsubscribe).flatMap { selector => - pubSub.getStream(selector) - } - } - } + override def modifyState[B](state: State[A, B]): F[B] = { + val f = state.runF.value + modify(a => f(a).value) } + } implicit def invariantInstance[F[_]: Functor]: Invariant[SignallingRef[F, ?]] = new Invariant[SignallingRef[F, ?]] {