From dead200f3d89961bb25bb690633ec09b0cbffdc9 Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Sun, 2 Feb 2020 12:25:45 +0300 Subject: [PATCH 1/4] Add withTimeout combinator --- core/shared/src/main/scala/fs2/Stream.scala | 24 +++++++++++++++++++ .../src/test/scala/fs2/StreamSpec.scala | 11 +++++++++ 2 files changed, 35 insertions(+) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 0131a46178..1e4ac1a6d2 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -13,6 +13,7 @@ import fs2.internal.{Resource => _, _} import java.io.PrintStream import scala.annotation.tailrec +import scala.concurrent.TimeoutException import scala.concurrent.duration._ /** @@ -2713,6 +2714,29 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) )(f: (Stream[F, O], Stream[F2, O2]) => Stream[F2, O3]): Stream[F2, O3] = f(this, s2) + /** Fails this stream with a [[TimeoutException]] if it does not complete within given `timeout`. */ + def withTimeout[F2[x] >: F[x]]( + timeout: FiniteDuration + )(implicit F2: Concurrent[F2], timer: Timer[F2]): Stream[F2, O] = + Stream.eval(Deferred.tryable[F2, Unit]).flatMap { complete => + Stream.eval(Deferred[F2, Either[Throwable, Unit]]).flatMap { interrupt => + val newStream = this.onFinalize(complete.complete(())).interruptWhen(interrupt) + val sleepAndInterrupt = + timer + .sleep(timeout) + .flatMap { _ => + complete.tryGet.flatMap { + case Some(_) => + F2.unit + case None => + interrupt.complete(Left(new TimeoutException(s"Timeout after $timeout"))) + } + } + + Stream.eval(sleepAndInterrupt.start).flatMap(_ => newStream) + } + } + /** * Translates effect type from `F` to `G` using the supplied `FunctionK`. * diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index f63598009e..3ab610592d 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -6,6 +6,7 @@ import cats.effect._ import cats.effect.concurrent.{Deferred, Ref, Semaphore} import cats.implicits._ import scala.concurrent.duration._ +import scala.concurrent.TimeoutException import org.scalactic.anyvals._ import org.scalatest.{Assertion, Succeeded} import fs2.concurrent.{Queue, SignallingRef} @@ -3786,4 +3787,14 @@ class StreamSpec extends Fs2Spec { } } } + + "withTimeout" - { + "timeout never-ending stream" in { + Stream.never[IO].withTimeout(100.millis).compile.drain.assertThrows[TimeoutException] + } + + "not trigger timeout on successfully completed stream" in { + Stream.sleep(10.millis).withTimeout(1.second).compile.drain.assertNoException + } + } } From 1086a617f21f79be167c461eaa4dfd31090ec5e4 Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Tue, 4 Feb 2020 00:13:07 +0300 Subject: [PATCH 2/4] Simplify the implementation --- core/shared/src/main/scala/fs2/Stream.scala | 26 +++++-------------- .../src/test/scala/fs2/StreamSpec.scala | 4 +-- 2 files changed, 9 insertions(+), 21 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 1e4ac1a6d2..79599e71c1 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2715,27 +2715,15 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) f(this, s2) /** Fails this stream with a [[TimeoutException]] if it does not complete within given `timeout`. */ - def withTimeout[F2[x] >: F[x]]( + def timeout[F2[x] >: F[x]]( timeout: FiniteDuration )(implicit F2: Concurrent[F2], timer: Timer[F2]): Stream[F2, O] = - Stream.eval(Deferred.tryable[F2, Unit]).flatMap { complete => - Stream.eval(Deferred[F2, Either[Throwable, Unit]]).flatMap { interrupt => - val newStream = this.onFinalize(complete.complete(())).interruptWhen(interrupt) - val sleepAndInterrupt = - timer - .sleep(timeout) - .flatMap { _ => - complete.tryGet.flatMap { - case Some(_) => - F2.unit - case None => - interrupt.complete(Left(new TimeoutException(s"Timeout after $timeout"))) - } - } - - Stream.eval(sleepAndInterrupt.start).flatMap(_ => newStream) - } - } + this.interruptWhen( + timer + .sleep(timeout) + .as(Left(new TimeoutException(s"Timed out after $timeout"))) + .widen[Either[Throwable, Unit]] + ) /** * Translates effect type from `F` to `G` using the supplied `FunctionK`. diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index 3ab610592d..3a69a7cb2b 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -3790,11 +3790,11 @@ class StreamSpec extends Fs2Spec { "withTimeout" - { "timeout never-ending stream" in { - Stream.never[IO].withTimeout(100.millis).compile.drain.assertThrows[TimeoutException] + Stream.never[IO].timeout(100.millis).compile.drain.assertThrows[TimeoutException] } "not trigger timeout on successfully completed stream" in { - Stream.sleep(10.millis).withTimeout(1.second).compile.drain.assertNoException + Stream.sleep(10.millis).timeout(1.second).compile.drain.assertNoException } } } From c23715c368690c67441bf9eb7d36e9c39ff25acb Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Tue, 4 Feb 2020 00:51:23 +0300 Subject: [PATCH 3/4] Add tests, use context bounds --- core/shared/src/main/scala/fs2/Stream.scala | 6 +++--- core/shared/src/test/scala/fs2/StreamSpec.scala | 12 ++++++++++++ 2 files changed, 15 insertions(+), 3 deletions(-) diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 79599e71c1..840b084a5e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -2715,11 +2715,11 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) f(this, s2) /** Fails this stream with a [[TimeoutException]] if it does not complete within given `timeout`. */ - def timeout[F2[x] >: F[x]]( + def timeout[F2[x] >: F[x]: Concurrent: Timer]( timeout: FiniteDuration - )(implicit F2: Concurrent[F2], timer: Timer[F2]): Stream[F2, O] = + ): Stream[F2, O] = this.interruptWhen( - timer + Timer[F2] .sleep(timeout) .as(Left(new TimeoutException(s"Timed out after $timeout"))) .widen[Either[Throwable, Unit]] diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index 3a69a7cb2b..bd51f3107f 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -3796,5 +3796,17 @@ class StreamSpec extends Fs2Spec { "not trigger timeout on successfully completed stream" in { Stream.sleep(10.millis).timeout(1.second).compile.drain.assertNoException } + + "compose timeouts d1 and d2 when d1 < d2" in { + val d1 = 20.millis + val d2 = 30.millis + (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(30.millis)).timeout(d2).compile.drain.assertThrows[TimeoutException] + } + + "compose timeouts d1 and d2 when d1 > d2" in { + val d1 = 40.millis + val d2 = 30.millis + (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(25.millis)).timeout(d2).compile.drain.assertThrows[TimeoutException] + } } } From 3c1a76e80925f3244d105e784e93e42c6a4346ac Mon Sep 17 00:00:00 2001 From: Z1kkurat Date: Tue, 4 Feb 2020 01:21:58 +0300 Subject: [PATCH 4/4] Format test --- core/shared/src/test/scala/fs2/StreamSpec.scala | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index bd51f3107f..4aeb62668e 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -3800,13 +3800,21 @@ class StreamSpec extends Fs2Spec { "compose timeouts d1 and d2 when d1 < d2" in { val d1 = 20.millis val d2 = 30.millis - (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(30.millis)).timeout(d2).compile.drain.assertThrows[TimeoutException] + (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(30.millis)) + .timeout(d2) + .compile + .drain + .assertThrows[TimeoutException] } "compose timeouts d1 and d2 when d1 > d2" in { val d1 = 40.millis val d2 = 30.millis - (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(25.millis)).timeout(d2).compile.drain.assertThrows[TimeoutException] + (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(25.millis)) + .timeout(d2) + .compile + .drain + .assertThrows[TimeoutException] } } }