diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index 2c2d132a120..ef331339278 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -7,6 +7,7 @@ package akka.http.scaladsl import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ ArrayBlockingQueue, TimeUnit } +import akka.Done import akka.actor.ActorSystem import akka.http.impl.util._ import akka.http.scaladsl.model.HttpEntity._ @@ -15,6 +16,8 @@ import akka.http.scaladsl.model.headers.Connection import akka.http.scaladsl.settings.ClientConnectionSettings import akka.http.scaladsl.settings.{ ConnectionPoolSettings, ServerSettings } import akka.stream.scaladsl._ +import akka.stream.testkit.TestSubscriber.OnError +import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ Server => _, _ } import akka.testkit._ import akka.util.ByteString @@ -41,6 +44,20 @@ class GracefulTerminationSpec implicit override val patience = PatienceConfig(5.seconds.dilated(system), 200.millis) + "Unbinding" should { + "not allow new connections" in new TestSetup { + Await.result(serverBinding.unbind(), 1.second) should ===(Done) + + // immediately trying a new connection should cause `Connection refused` since we unbind immediately: + val r = makeRequest(ensureNewConnection = true) + val ex = intercept[StreamTcpException] { + Await.result(r, 2.seconds) + } + ex.getMessage should include("Connection refused") + serverBinding.terminate(hardDeadline = 2.seconds) + } + } + "Graceful termination" should { "stop accepting new connections" in new TestSetup { @@ -60,6 +77,68 @@ class GracefulTerminationSpec ex.getMessage should include("Connection refused") } + "fail chunked response streams" in new TestSetup { + val clientSystem = ActorSystem("client") + val r1 = + Http()(clientSystem).singleRequest(nextRequest, connectionContext = clientConnectionContext, settings = basePoolSettings) + + // reply with an infinite entity stream + val chunks = Source + .fromIterator(() => Iterator.from(1).map(v => ChunkStreamPart(s"reply$v,"))) + .throttle(1, 300.millis) + reply(_ => HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, chunks))) + + // start reading the response + val response = r1.futureValue.entity.dataBytes + .via(Framing.delimiter(ByteString(","), 20)) + .runWith(TestSink.probe[ByteString]) + response.requestNext().utf8String should ===("reply1") + + try { + val termination = serverBinding.terminate(hardDeadline = 50.millis) + response.request(20) + // local testing shows the stream fails long after the 50 ms deadline + response.expectNext().utf8String should ===("reply2") + eventually { + response.expectEvent() shouldBe a[OnError] + } + termination.futureValue shouldBe Http.HttpServerTerminated + } finally { + TestKit.shutdownActorSystem(clientSystem) + } + } + + "fail close delimited response streams" ignore new TestSetup { + val clientSystem = ActorSystem("client") + val r1 = + Http()(clientSystem).singleRequest(nextRequest, connectionContext = clientConnectionContext, settings = basePoolSettings) + + // reply with an infinite entity stream + val chunks = Source + .fromIterator(() => Iterator.from(1).map(v => ByteString(s"reply$v,"))) + .throttle(1, 300.millis) + reply(_ => HttpResponse(entity = HttpEntity.CloseDelimited(ContentTypes.`text/plain(UTF-8)`, chunks))) + + // start reading the response + val response = r1.futureValue.entity.dataBytes + .via(Framing.delimiter(ByteString(","), 20)) + .runWith(TestSink.probe[ByteString]) + response.requestNext().utf8String should ===("reply1") + + try { + val termination = serverBinding.terminate(hardDeadline = 50.millis) + response.request(20) + // local testing shows the stream fails long after the 50 ms deadline + response.expectNext().utf8String should ===("reply2") + eventually { + response.expectEvent() shouldBe a[OnError] + } + termination.futureValue shouldBe Http.HttpServerTerminated + } finally { + TestKit.shutdownActorSystem(clientSystem) + } + } + "provide whenTerminated future that completes once server has completed termination (no connections)" in new TestSetup { val time: FiniteDuration = 2.seconds val deadline: Deadline = time.fromNow