From 4726e5afa3124104bd795128b2271c2ab08ee325 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 8 May 2020 17:08:27 +0200 Subject: [PATCH 1/6] Inspect chunked responses during termination --- .../scaladsl/GracefulTerminationSpec.scala | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index 2c2d132a120..5bb8ea88eec 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -7,6 +7,7 @@ package akka.http.scaladsl import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{ ArrayBlockingQueue, TimeUnit } +import akka.Done import akka.actor.ActorSystem import akka.http.impl.util._ import akka.http.scaladsl.model.HttpEntity._ @@ -15,6 +16,8 @@ import akka.http.scaladsl.model.headers.Connection import akka.http.scaladsl.settings.ClientConnectionSettings import akka.http.scaladsl.settings.{ ConnectionPoolSettings, ServerSettings } import akka.stream.scaladsl._ +import akka.stream.testkit.TestSubscriber.{ OnError, OnNext } +import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ Server => _, _ } import akka.testkit._ import akka.util.ByteString @@ -41,6 +44,20 @@ class GracefulTerminationSpec implicit override val patience = PatienceConfig(5.seconds.dilated(system), 200.millis) + "Unbinding" should { + "not allow new connections" in new TestSetup { + Await.result(serverBinding.unbind(), 1.second) should ===(Done) + + // immediately trying a new connection should cause `Connection refused` since we unbind immediately: + val r = makeRequest(ensureNewConnection = true) + val ex = intercept[StreamTcpException] { + Await.result(r, 2.seconds) + } + ex.getMessage should include("Connection refused") + serverBinding.terminate(hardDeadline = 2.seconds) + } + } + "Graceful termination" should { "stop accepting new connections" in new TestSetup { @@ -60,6 +77,47 @@ class GracefulTerminationSpec ex.getMessage should include("Connection refused") } + "fail chunked response streams" in new TestSetup { + val r1 = makeRequest() + + // reply with an infinite entity stream + val chunks = Source + .fromIterator(() => Iterator.from(1).map(v => ChunkStreamPart(s"reply$v,"))) + .throttle(1, 300.millis) + reply(_ => HttpResponse(entity = HttpEntity.Chunked(ContentTypes.`text/plain(UTF-8)`, chunks))) + + // start reading the response + val response = r1.futureValue.entity.dataBytes + .via(Framing.delimiter(ByteString(","), 20)) + .runWith(TestSink.probe[ByteString]) + response.requestNext().utf8String should ===("reply1") + + val termination = serverBinding.terminate(hardDeadline = 50.millis) + response.request(20) + // local testing shows the stream fails long after the 50 ms deadline + response.expectNext().utf8String should ===("reply2") + response.expectNext().utf8String should ===("reply3") + response.expectNext().utf8String should ===("reply4") + response.expectNext().utf8String should ===("reply5") + val e1 = response.expectEvent() + if (e1.isInstanceOf[OnNext[_]]) { + val e2 = response.expectEvent() + if (e2.isInstanceOf[OnNext[_]]) { + val e3 = response.expectEvent() + if (e3.isInstanceOf[OnNext[_]]) { + fail("the chunked entity stream is expected to fail") + } else if (!e3.isInstanceOf[OnError]) { + fail(s"the chunked entity stream is expected to fail, got $e3") + } + } else if (!e2.isInstanceOf[OnError]) { + fail(s"the chunked entity stream is expected to fail, got $e2") + } + } else if (!e1.isInstanceOf[OnError]) { + fail(s"the chunked entity stream is expected to fail, got $e1") + } + termination.futureValue shouldBe Http.HttpServerTerminated + } + "provide whenTerminated future that completes once server has completed termination (no connections)" in new TestSetup { val time: FiniteDuration = 2.seconds val deadline: Deadline = time.fromNow From dd1728266b69735d3ba32acae0ad6e14fb82f61e Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Mon, 11 May 2020 16:12:55 +0200 Subject: [PATCH 2/6] A separate actor system for the client request --- .../scaladsl/GracefulTerminationSpec.scala | 50 +++++++++++-------- 1 file changed, 28 insertions(+), 22 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index 5bb8ea88eec..1a83414010c 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -78,7 +78,9 @@ class GracefulTerminationSpec } "fail chunked response streams" in new TestSetup { - val r1 = makeRequest() + val clientSystem = ActorSystem("client") + val r1 = + Http()(clientSystem).singleRequest(nextRequest, connectionContext = clientConnectionContext, settings = basePoolSettings) // reply with an infinite entity stream val chunks = Source @@ -92,30 +94,34 @@ class GracefulTerminationSpec .runWith(TestSink.probe[ByteString]) response.requestNext().utf8String should ===("reply1") - val termination = serverBinding.terminate(hardDeadline = 50.millis) - response.request(20) - // local testing shows the stream fails long after the 50 ms deadline - response.expectNext().utf8String should ===("reply2") - response.expectNext().utf8String should ===("reply3") - response.expectNext().utf8String should ===("reply4") - response.expectNext().utf8String should ===("reply5") - val e1 = response.expectEvent() - if (e1.isInstanceOf[OnNext[_]]) { - val e2 = response.expectEvent() - if (e2.isInstanceOf[OnNext[_]]) { - val e3 = response.expectEvent() - if (e3.isInstanceOf[OnNext[_]]) { - fail("the chunked entity stream is expected to fail") - } else if (!e3.isInstanceOf[OnError]) { - fail(s"the chunked entity stream is expected to fail, got $e3") + try { + val termination = serverBinding.terminate(hardDeadline = 50.millis) + response.request(20) + // local testing shows the stream fails long after the 50 ms deadline + response.expectNext().utf8String should ===("reply2") + response.expectNext().utf8String should ===("reply3") + response.expectNext().utf8String should ===("reply4") + response.expectNext().utf8String should ===("reply5") + val e1 = response.expectEvent() + if (e1.isInstanceOf[OnNext[_]]) { + val e2 = response.expectEvent() + if (e2.isInstanceOf[OnNext[_]]) { + val e3 = response.expectEvent() + if (e3.isInstanceOf[OnNext[_]]) { + fail("the chunked entity stream is expected to fail") + } else if (!e3.isInstanceOf[OnError]) { + fail(s"the chunked entity stream is expected to fail, got $e3") + } + } else if (!e2.isInstanceOf[OnError]) { + fail(s"the chunked entity stream is expected to fail, got $e2") } - } else if (!e2.isInstanceOf[OnError]) { - fail(s"the chunked entity stream is expected to fail, got $e2") + } else if (!e1.isInstanceOf[OnError]) { + fail(s"the chunked entity stream is expected to fail, got $e1") } - } else if (!e1.isInstanceOf[OnError]) { - fail(s"the chunked entity stream is expected to fail, got $e1") + termination.futureValue shouldBe Http.HttpServerTerminated + } finally { + TestKit.shutdownActorSystem(clientSystem) } - termination.futureValue shouldBe Http.HttpServerTerminated } "provide whenTerminated future that completes once server has completed termination (no connections)" in new TestSetup { From ccf01b17e81494089474dbbe930085523a6631e6 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 15 May 2020 17:03:42 +0200 Subject: [PATCH 3/6] Use eventually --- .../scaladsl/GracefulTerminationSpec.scala | 18 +++--------------- 1 file changed, 3 insertions(+), 15 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index 1a83414010c..615d1581ffb 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -102,21 +102,9 @@ class GracefulTerminationSpec response.expectNext().utf8String should ===("reply3") response.expectNext().utf8String should ===("reply4") response.expectNext().utf8String should ===("reply5") - val e1 = response.expectEvent() - if (e1.isInstanceOf[OnNext[_]]) { - val e2 = response.expectEvent() - if (e2.isInstanceOf[OnNext[_]]) { - val e3 = response.expectEvent() - if (e3.isInstanceOf[OnNext[_]]) { - fail("the chunked entity stream is expected to fail") - } else if (!e3.isInstanceOf[OnError]) { - fail(s"the chunked entity stream is expected to fail, got $e3") - } - } else if (!e2.isInstanceOf[OnError]) { - fail(s"the chunked entity stream is expected to fail, got $e2") - } - } else if (!e1.isInstanceOf[OnError]) { - fail(s"the chunked entity stream is expected to fail, got $e1") + + eventually { + response.expectEvent() shouldBe a[OnError] } termination.futureValue shouldBe Http.HttpServerTerminated } finally { From 6c41cad9d590e797f975efd6a34304345d1be776 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Fri, 15 May 2020 17:09:10 +0200 Subject: [PATCH 4/6] Add ignored test for CloseDelimited --- .../scaladsl/GracefulTerminationSpec.scala | 37 ++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index 615d1581ffb..e3192be6f15 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -16,7 +16,7 @@ import akka.http.scaladsl.model.headers.Connection import akka.http.scaladsl.settings.ClientConnectionSettings import akka.http.scaladsl.settings.{ ConnectionPoolSettings, ServerSettings } import akka.stream.scaladsl._ -import akka.stream.testkit.TestSubscriber.{ OnError, OnNext } +import akka.stream.testkit.TestSubscriber.OnError import akka.stream.testkit.scaladsl.TestSink import akka.stream.{ Server => _, _ } import akka.testkit._ @@ -112,6 +112,41 @@ class GracefulTerminationSpec } } + "fail close delimited response streams" ignore new TestSetup { + val clientSystem = ActorSystem("client") + val r1 = + Http()(clientSystem).singleRequest(nextRequest, connectionContext = clientConnectionContext, settings = basePoolSettings) + + // reply with an infinite entity stream + val chunks = Source + .fromIterator(() => Iterator.from(1).map(v => ByteString(s"reply$v,"))) + .throttle(1, 300.millis) + reply(_ => HttpResponse(entity = HttpEntity.CloseDelimited(ContentTypes.`text/plain(UTF-8)`, chunks))) + + // start reading the response + val response = r1.futureValue.entity.dataBytes + .via(Framing.delimiter(ByteString(","), 20)) + .runWith(TestSink.probe[ByteString]) + response.requestNext().utf8String should ===("reply1") + + try { + val termination = serverBinding.terminate(hardDeadline = 50.millis) + response.request(20) + // local testing shows the stream fails long after the 50 ms deadline + response.expectNext().utf8String should ===("reply2") + response.expectNext().utf8String should ===("reply3") + response.expectNext().utf8String should ===("reply4") + response.expectNext().utf8String should ===("reply5") + + eventually { + response.expectEvent() shouldBe a[OnError] + } + termination.futureValue shouldBe Http.HttpServerTerminated + } finally { + TestKit.shutdownActorSystem(clientSystem) + } + } + "provide whenTerminated future that completes once server has completed termination (no connections)" in new TestSetup { val time: FiniteDuration = 2.seconds val deadline: Deadline = time.fromNow From b2d1fbb58808c87c69058fb3fc399293c0feb229 Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Wed, 3 Jun 2020 14:08:17 +0200 Subject: [PATCH 5/6] Don't expect mulitple replies before termination --- .../scala/akka/http/scaladsl/GracefulTerminationSpec.scala | 3 --- 1 file changed, 3 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index e3192be6f15..d4840714701 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -134,9 +134,6 @@ class GracefulTerminationSpec response.request(20) // local testing shows the stream fails long after the 50 ms deadline response.expectNext().utf8String should ===("reply2") - response.expectNext().utf8String should ===("reply3") - response.expectNext().utf8String should ===("reply4") - response.expectNext().utf8String should ===("reply5") eventually { response.expectEvent() shouldBe a[OnError] From ced41d4eed8832ebc43e92f98d5c789f2e95105e Mon Sep 17 00:00:00 2001 From: Enno Runne <458526+ennru@users.noreply.github.com> Date: Wed, 3 Jun 2020 14:18:39 +0200 Subject: [PATCH 6/6] Don't expect mulitple replies before termination --- .../scala/akka/http/scaladsl/GracefulTerminationSpec.scala | 5 ----- 1 file changed, 5 deletions(-) diff --git a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala index d4840714701..ef331339278 100644 --- a/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala +++ b/akka-http-core/src/test/scala/akka/http/scaladsl/GracefulTerminationSpec.scala @@ -99,10 +99,6 @@ class GracefulTerminationSpec response.request(20) // local testing shows the stream fails long after the 50 ms deadline response.expectNext().utf8String should ===("reply2") - response.expectNext().utf8String should ===("reply3") - response.expectNext().utf8String should ===("reply4") - response.expectNext().utf8String should ===("reply5") - eventually { response.expectEvent() shouldBe a[OnError] } @@ -134,7 +130,6 @@ class GracefulTerminationSpec response.request(20) // local testing shows the stream fails long after the 50 ms deadline response.expectNext().utf8String should ===("reply2") - eventually { response.expectEvent() shouldBe a[OnError] }