diff --git a/core/shared/src/main/scala/fs2/Stream.scala b/core/shared/src/main/scala/fs2/Stream.scala index 0131a46178..840b084a5e 100644 --- a/core/shared/src/main/scala/fs2/Stream.scala +++ b/core/shared/src/main/scala/fs2/Stream.scala @@ -13,6 +13,7 @@ import fs2.internal.{Resource => _, _} import java.io.PrintStream import scala.annotation.tailrec +import scala.concurrent.TimeoutException import scala.concurrent.duration._ /** @@ -2713,6 +2714,17 @@ final class Stream[+F[_], +O] private[fs2] (private val free: FreeC[F, O, Unit]) )(f: (Stream[F, O], Stream[F2, O2]) => Stream[F2, O3]): Stream[F2, O3] = f(this, s2) + /** Fails this stream with a [[TimeoutException]] if it does not complete within given `timeout`. */ + def timeout[F2[x] >: F[x]: Concurrent: Timer]( + timeout: FiniteDuration + ): Stream[F2, O] = + this.interruptWhen( + Timer[F2] + .sleep(timeout) + .as(Left(new TimeoutException(s"Timed out after $timeout"))) + .widen[Either[Throwable, Unit]] + ) + /** * Translates effect type from `F` to `G` using the supplied `FunctionK`. * diff --git a/core/shared/src/test/scala/fs2/StreamSpec.scala b/core/shared/src/test/scala/fs2/StreamSpec.scala index f63598009e..4aeb62668e 100644 --- a/core/shared/src/test/scala/fs2/StreamSpec.scala +++ b/core/shared/src/test/scala/fs2/StreamSpec.scala @@ -6,6 +6,7 @@ import cats.effect._ import cats.effect.concurrent.{Deferred, Ref, Semaphore} import cats.implicits._ import scala.concurrent.duration._ +import scala.concurrent.TimeoutException import org.scalactic.anyvals._ import org.scalatest.{Assertion, Succeeded} import fs2.concurrent.{Queue, SignallingRef} @@ -3786,4 +3787,34 @@ class StreamSpec extends Fs2Spec { } } } + + "withTimeout" - { + "timeout never-ending stream" in { + Stream.never[IO].timeout(100.millis).compile.drain.assertThrows[TimeoutException] + } + + "not trigger timeout on successfully completed stream" in { + Stream.sleep(10.millis).timeout(1.second).compile.drain.assertNoException + } + + "compose timeouts d1 and d2 when d1 < d2" in { + val d1 = 20.millis + val d2 = 30.millis + (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(30.millis)) + .timeout(d2) + .compile + .drain + .assertThrows[TimeoutException] + } + + "compose timeouts d1 and d2 when d1 > d2" in { + val d1 = 40.millis + val d2 = 30.millis + (Stream.sleep(10.millis).timeout(d1) ++ Stream.sleep(25.millis)) + .timeout(d2) + .compile + .drain + .assertThrows[TimeoutException] + } + } }