From b29df23253508efd4c74892d7e22aff7b47987d4 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Sat, 10 Aug 2024 11:16:02 +0100 Subject: [PATCH 1/7] Refactor application health monitoring Application health is needed for two distinct reasons: 1. Sending alerts to a monitoring webhook for setup errors. Relatedly, sending heartbeat messages to the webhook when healthy. 2. The HTTP health probe. Needed so that the orchestration environment (kubernetes or whatever) can kill the pod when unhealthy Several Snowplow apps each implement their own logic of when to toggle the health probe and when to send an alert. This PR consolidates that logic into one place. After this PR, the application code just needs to call methods on the AppHealth class. This lib then manages webhook events and health probe, based on the current status of the AppHealth. --- .../src/main/resources/reference.conf | 5 + .../snowplow/runtime/AppHealth.scala | 204 ++++++++++++++ .../snowplow/runtime/HealthProbe.scala | 62 +++-- .../snowplow/runtime/Webhook.scala | 132 ++++++--- .../snowplow/runtime/AppHealthSpec.scala | 120 +++++++++ .../snowplow/runtime/HealthProbeSpec.scala | 79 ++++++ .../snowplow/runtime/WebhookConfigSpec.scala | 134 ++++++++++ .../snowplow/runtime/WebhookSpec.scala | 252 +++++++++++++----- 8 files changed, 882 insertions(+), 106 deletions(-) create mode 100644 modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala create mode 100644 modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala create mode 100644 modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala create mode 100644 modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala diff --git a/modules/runtime-common/src/main/resources/reference.conf b/modules/runtime-common/src/main/resources/reference.conf index 675be4f9..e218bcc0 100644 --- a/modules/runtime-common/src/main/resources/reference.conf +++ b/modules/runtime-common/src/main/resources/reference.conf @@ -5,6 +5,11 @@ snowplow.defaults: { period: "60 seconds" } + webhook: { + tags: {} + heartbeat: "60 minutes" + } + telemetry: { disable: false interval: "15 minutes" diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala new file mode 100644 index 00000000..06fb1e2b --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -0,0 +1,204 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package com.snowplowanalytics.snowplow.runtime + +import cats.Eq +import cats.{Applicative, Monad} +import cats.implicits._ +import cats.effect.{Async, Ref} +import fs2.concurrent.SignallingRef + +/** + * Class to collect, store and provide the health statuses of services required by the App + * + * @tparam SetupAlert + * Sealed trait of the alerts this app is allowed to send to the webhook for setup errors + * @tparam RuntimeService + * Sealed trait of the services that this app requires to be healthy + * + * ==App Boilerplate== + * + * To use this class in a Snowplow app, first enumerate the runtime services required by this app: + * + * {{{ + * trait MyAppRuntimeService + * case object BadRowsOutputStream extends MyAppRuntimeService + * case object EventsInputStream extends MyAppRuntimeService + * }}} + * + * Next, define the alerts this app is allowed to send to the webhook for setup errors + * + * {{{ + * trait MyAppAlert + * case object BadPassword extends MyAppAlert + * case class MissingPermissions(moreDetails: String) extends MyAppAlert + * }}} + * + * Implement a `cats.Show` for the runtime services and for the alerts + * + * {{{ + * val runtimeShow = Show[MyAppRuntimeService] { + * case BadRowsOutputStream => "Bad rows output stream" + * case EventsInputStream => "Events input stream" + * } + * + * val alertShow = Show[MyAppAlert] { + * case BadPassword => "Bad password" + * case MissingPermissions(moreDetails) => s"Missing permissions $moreDetails" + * } + * }}} + * + * Implement a `cats.Eq` for the alerts, so we can alert on anything uniquely new + * {{{ + * val alertEq = Eq.fromUniversalEquals[Alert] // Do it better than this + * }}} + * + * ==Environment initialization== + * + * Initialize the AppHealth as part of App Environment initialization: + * + * {{{ + * appHealth <- AppHealth.init[F, MyAppAlert, MyAppRuntimeService] + * }}} + * + * Initialize a health probe, so the app reports itself as unhealthy whenever a required service + * becomes unhealthy + * + * {{{ + * _ <- HealthProbe.resource(port, appHealthy) + * }}} + * + * Initialize the webhook, so the app reports new setup errors or whenever the setup configuration + * becomes healthy + * + * {{{ + * _ <- Webhook.resource(config, appInfo, httpClient, appHealth) + * }}} + * + * And finally, register any runtime service that provides its own health reporter. Not all services + * fall into this category, but the source stream does fall into this category. + * + * {{{ + * _ <- appHealth.addRuntimeHealthReporter(EventsInputStream, sourceAndAckIsHealthy) + * }}} + * + * ==Application processing== + * + * Once the application enters its processing phase, you can set the health status for any runtime + * service or setup configuration + * + * {{{ + * // After catching an exception in the bad events stream + * _ <- Logger[F].error(e)("Problem with bad rows output stream") + * _ <- appHealth.becomeUnhealthyForRuntimeService(BadRowsOutputStream) + * + * // After bad rows stream becomes healthy again + * _ <- Logger[F].debug("Bad rows output stream is ok") + * _ <- appHealth.becomeHealthyForRuntimeService(BadRowsOutputStream) + * + * // After catching an exception with the external setup configuration + * // Note this will send an alert webhook + * _ <- Logger[F].error(e)("Problem with the provided password") + * _ <- appHealth.becomeUnhealthyForSetup(BadPassword) + * + * // After successful connection to the externally configured services + * // Note this will send the first hearbeat webhook + * _ <- Logger[F].error(e)("Everything ok with the provided setup configuration") + * _ <- appHealth.becomeHealthyForSetup + * }}} + * + * The application processing code does not need to explicitly send any monitoring alert or adjust + * the health probe return code. + */ +class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( + private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]], + runtimeHealth: Ref[F, Map[RuntimeService, F[AppHealth.RuntimeServiceStatus]]] +) extends AppHealth.Interface[F, SetupAlert, RuntimeService] { + import AppHealth._ + + def becomeHealthyForSetup: F[Unit] = + setupHealth.set(SetupStatus.Healthy) + + def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit] = + setupHealth.set(SetupStatus.Unhealthy(alert)) + + def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit] = + runtimeHealth.update(_ - service) + + def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = + runtimeHealth.update(_ + (service -> Applicative[F].pure(RuntimeServiceStatus.Unhealthy))) + + def addRuntimeHealthReporter(service: RuntimeService, reporter: F[RuntimeServiceStatus]): F[Unit] = + runtimeHealth.update(_ + (service -> reporter)) + + private[runtime] def unhealthyRuntimeServices: F[List[RuntimeService]] = + for { + asMap <- runtimeHealth.get + pairs <- asMap.toList.traverse { case (service, statusF) => + statusF.map((_, service)) + } + } yield pairs.collect { case (RuntimeServiceStatus.Unhealthy, service) => service } +} + +object AppHealth { + + /** + * Subset of AppHealth interface required during app's event-processing phase + * + * This interface should be part of the App's Environment. Processing Specs can provide a custom + * implementation. + */ + trait Interface[F[_], -SetupAlert, -RuntimeService] { + def becomeHealthyForSetup: F[Unit] + + def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit] + + def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit] + + def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] + } + + private[runtime] sealed trait SetupStatus[+Alert] + private[runtime] object SetupStatus { + case object AwaitingHealth extends SetupStatus[Nothing] + case object Healthy extends SetupStatus[Nothing] + case class Unhealthy[Alert](alert: Alert) extends SetupStatus[Alert] + + implicit def eq[Alert: Eq]: Eq[SetupStatus[Alert]] = Eq.instance { + case (Healthy, Healthy) => true + case (Healthy, _) => false + case (AwaitingHealth, AwaitingHealth) => true + case (AwaitingHealth, _) => false + case (Unhealthy(a1), Unhealthy(a2)) => Eq[Alert].eqv(a1, a2) + case (Unhealthy(_), _) => false + } + } + + sealed trait RuntimeServiceStatus + object RuntimeServiceStatus { + case object Healthy extends RuntimeServiceStatus + case object Unhealthy extends RuntimeServiceStatus + } + + /** + * Initialize the AppHealth + * + * @tparam SetupAlert + * Sealed trait of the alerts this app is allowed to send to the webhook for setup errors + * @tparam RuntimeService + * Sealed trait of the services that this app requires to be healthy + */ + def init[F[_]: Async, SetupAlert, RuntimeService]: F[AppHealth[F, SetupAlert, RuntimeService]] = + for { + setupHealth <- SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth) + runtimeHealth <- Ref[F].of(Map.empty[RuntimeService, F[AppHealth.RuntimeServiceStatus]]) + } yield new AppHealth(setupHealth, runtimeHealth) +} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala index 418cca9b..c2f63f45 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala @@ -7,9 +7,10 @@ */ package com.snowplowanalytics.snowplow.runtime -import cats.effect.{Async, Resource, Sync} -import cats.data.Kleisli +import cats.Show import cats.implicits._ +import cats.data.Kleisli +import cats.effect.{Async, Resource, Sync} import com.comcast.ip4s.{Ipv4Address, Port} import io.circe.Decoder import org.http4s.ember.server.EmberServerBuilder @@ -22,18 +23,17 @@ object HealthProbe { private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - sealed trait Status - case object Healthy extends Status - case class Unhealthy(reason: String) extends Status - - def resource[F[_]: Async](port: Port, isHealthy: F[Status]): Resource[F, Unit] = { + def resource[F[_]: Async, RuntimeService: Show]( + port: Port, + appHealth: AppHealth[F, ?, RuntimeService] + ): Resource[F, Unit] = { implicit val network: Network[F] = Network.forAsync[F] EmberServerBuilder .default[F] .withHost(Ipv4Address.fromBytes(0, 0, 0, 0)) .withPort(port) .withMaxConnections(1) - .withHttpApp(httpApp(isHealthy)) + .withHttpApp(httpApp(appHealth)) .build .evalTap { _ => Logger[F].info(s"Health service listening on port $port") @@ -47,17 +47,49 @@ object HealthProbe { } } - private def httpApp[F[_]: Sync](isHealthy: F[Status]): HttpApp[F] = + private[runtime] def httpApp[F[_]: Sync, RuntimeService: Show]( + appHealth: AppHealth[F, ?, RuntimeService] + ): HttpApp[F] = Kleisli { _ => - isHealthy.flatMap { - case Healthy => + val problemsF = for { + runtimeUnhealthies <- appHealth.unhealthyRuntimeServices + setupHealth <- appHealth.setupHealth.get + } yield { + val allUnhealthy = runtimeUnhealthies.map(_.show) ++ (setupHealth match { + case AppHealth.SetupStatus.Unhealthy(_) => Some("External setup configuration") + case _ => None + }) + + val allAwaiting = setupHealth match { + case AppHealth.SetupStatus.AwaitingHealth => Some("External setup configuration") + case _ => None + } + + val unhealthyMsg = if (allUnhealthy.nonEmpty) { + val joined = allUnhealthy.mkString("Services are unhealthy [", ", ", "]") + Some(joined) + } else None + + val awaitingMsg = if (allAwaiting.nonEmpty) { + val joined = allUnhealthy.mkString("Services are awaiting a healthy status [", ", ", "]") + Some(joined) + } else None + + if (unhealthyMsg.isEmpty && awaitingMsg.isEmpty) + None + else + Some((unhealthyMsg ++ awaitingMsg).mkString(" AND ")) + } + + problemsF.flatMap { + case Some(errorMsg) => + Logger[F].warn(s"Health probe returning 503: $errorMsg").as { + Response(status = Status.ServiceUnavailable) + } + case None => Logger[F].debug("Health probe returning 200").as { Response(status = Status.Ok) } - case Unhealthy(reason) => - Logger[F].warn(s"Health probe returning 503: $reason").as { - Response(status = Status.ServiceUnavailable) - } } } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala index 2b6aeb23..b9af657d 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala @@ -10,12 +10,15 @@ package com.snowplowanalytics.snowplow.runtime -import cats.effect.{Async, Sync} +import cats.effect.{Async, Resource, Sync} +import cats.effect.implicits._ import cats.implicits._ -import cats.Show +import cats.{Eq, Show} +import fs2.{Pipe, Pull, Stream} import io.circe.{Decoder, Json} import io.circe.generic.semiauto._ import io.circe.syntax._ +import io.circe.config.syntax._ import org.http4s.circe.jsonEncoder import org.http4s.client.Client import org.http4s.{Method, Request} @@ -23,16 +26,18 @@ import org.http4s.{ParseFailure, Uri} import org.typelevel.log4cats.Logger import org.typelevel.log4cats.slf4j.Slf4jLogger +import scala.concurrent.duration.FiniteDuration + import com.snowplowanalytics.iglu.core.circe.implicits._ import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData} -trait Webhook[F[_], Alert] { - def alert(message: Alert): F[Unit] -} - object Webhook { - final case class Config(endpoint: Uri, tags: Map[String, String]) + final case class Config( + endpoint: Option[Uri], + tags: Map[String, String], + heartbeat: FiniteDuration + ) object Config { implicit def webhookConfigDecoder: Decoder[Config] = { @@ -42,32 +47,61 @@ object Webhook { } } - private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + def resource[F[_]: Async, SetupAlert: Show: Eq]( + config: Config, + appInfo: AppInfo, + httpClient: Client[F], + appHealth: AppHealth[F, SetupAlert, ?] + ): Resource[F, Unit] = + stream(config, appInfo, httpClient, appHealth).compile.drain.background.void - def create[F[_]: Async, Alert: Show]( - config: Option[Config], + def stream[F[_]: Async, SetupAlert: Show: Eq]( + config: Config, appInfo: AppInfo, - httpClient: Client[F] - ): Webhook[F, Alert] = new Webhook[F, Alert] { - - override def alert(message: Alert): F[Unit] = - config match { - case Some(webhookConfig) => - val request = buildHttpRequest[F, Alert](webhookConfig, appInfo, message) - Logger[F].info(show"Sending alert to ${webhookConfig.endpoint} with details of the setup error...") *> - executeHttpRequest[F](webhookConfig, httpClient, request) - case None => - Logger[F].debug(s"Webhook monitoring is not configured, skipping alert: $message") + httpClient: Client[F], + appHealth: AppHealth[F, SetupAlert, ?] + ): Stream[F, Nothing] = + appHealth.setupHealth.discrete.changes + .through(repeatPeriodically(config.heartbeat)) + .flatMap { + case AppHealth.SetupStatus.AwaitingHealth => + Stream.empty + case AppHealth.SetupStatus.Healthy => + buildHeartbeatHttpRequest[F](config, appInfo) + case AppHealth.SetupStatus.Unhealthy(alert) => + buildAlertHttpRequest[F, SetupAlert](config, appInfo, alert) } - } + .evalMap(executeHttpRequest(config, httpClient, _)) + .drain + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] - private def buildHttpRequest[F[_], Alert: Show]( + private def buildAlertHttpRequest[F[_]: Sync, SetupAlert: Show]( webhookConfig: Config, appInfo: AppInfo, - alert: Alert - ): Request[F] = - Request[F](Method.POST, webhookConfig.endpoint) - .withEntity(toSelfDescribingJson(alert, appInfo, webhookConfig.tags)) + reason: SetupAlert + ): Stream[F, Request[F]] = + webhookConfig.endpoint match { + case Some(endpoint) => + val request = Request[F](Method.POST, endpoint) + .withEntity(toAlertSdj(reason, appInfo, webhookConfig.tags)) + Stream.emit(request) + case None => + Stream.eval(Logger[F].info(show"Skipping setup alert because webhook is not configured: $reason")).drain + } + + private def buildHeartbeatHttpRequest[F[_]: Sync]( + webhookConfig: Config, + appInfo: AppInfo + ): Stream[F, Request[F]] = + webhookConfig.endpoint match { + case Some(endpoint) => + val request = Request[F](Method.POST, endpoint) + .withEntity(toHeartbeatSdj(appInfo, webhookConfig.tags)) + Stream.emit(request) + case None => + Stream.eval(Logger[F].info(s"Skipping heartbeat because webhook is not configured")).drain + } private def executeHttpRequest[F[_]: Async]( webhookConfig: Config, @@ -91,8 +125,8 @@ object Webhook { /** Restrict the length of an alert message to be compliant with alert iglu schema */ private val MaxAlertPayloadLength = 4096 - private def toSelfDescribingJson[Alert: Show]( - alert: Alert, + private def toAlertSdj[SetupAlert: Show]( + reason: SetupAlert, appInfo: AppInfo, tags: Map[String, String] ): Json = @@ -101,9 +135,47 @@ object Webhook { data = Json.obj( "appName" -> appInfo.name.asJson, "appVersion" -> appInfo.version.asJson, - "message" -> alert.show.take(MaxAlertPayloadLength).asJson, + "message" -> reason.show.take(MaxAlertPayloadLength).asJson, + "tags" -> tags.asJson + ) + ).normalize + + private def toHeartbeatSdj( + appInfo: AppInfo, + tags: Map[String, String] + ): Json = + SelfDescribingData( + schema = SchemaKey("com.snowplowanalytics.monitoring.loader", "heartbeat", "jsonschema", SchemaVer.Full(1, 0, 0)), + data = Json.obj( + "appName" -> appInfo.name.asJson, + "appVersion" -> appInfo.version.asJson, "tags" -> tags.asJson ) ).normalize + def repeatPeriodically[F[_]: Async, A](period: FiniteDuration): Pipe[F, A, A] = { + + def go(timedPull: Pull.Timed[F, A], toRepeat: Option[A]): Pull[F, A, Unit] = + timedPull.uncons.flatMap { + case None => + // Upstream finished + Pull.done + case Some((Left(_), next)) => + // Timer timed-out. + Pull.outputOption1[F, A](toRepeat) *> timedPull.timeout(period) *> go(next, toRepeat) + case Some((Right(chunk), next)) => + chunk.last match { + case Some(last) => + Pull.output[F, A](chunk) *> timedPull.timeout(period) *> go(next, Some(last)) + case None => + go(next, toRepeat) + } + } + + in => + in.pull.timed { timedPull => + go(timedPull, None) + }.stream + } + } diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala new file mode 100644 index 00000000..bd3a321d --- /dev/null +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.effect.testing.specs2.CatsEffect +import cats.effect.{IO, Ref} +import org.specs2.Specification + +class AppHealthSpec extends Specification with CatsEffect { + import AppHealthSpec._ + + def is = s2""" + The AppHealth should: + For Runtime health: + Start healthy $runtime1 + Report one unhealthy service if one service is unhealthy $runtime2 + Report two unhealthy services if two services are unhealthy $runtime3 + Become unhealthy after one service recovers $runtime4 + Report one unhealthy service if two services were unhealthy and one recovers $runtime5 + Report healthy status for an external reporter $runtime6 + For Setup health: + Start with status of awaiting health $setup1 + Report unhealthy after told of a setup problem $setup2 + Report healthy after told of a healthy setup $setup3 + Recover from an unhealthy status when told $setup4 + Return to an unhealthy status when told $setup5 + """ + + def runtime1 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + statuses <- appHealth.unhealthyRuntimeServices + } yield statuses should beEmpty + + def runtime2 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + statuses <- appHealth.unhealthyRuntimeServices + } yield statuses should beEqualTo(List(TestService1)) + + def runtime3 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) + statuses <- appHealth.unhealthyRuntimeServices + } yield statuses should containTheSameElementsAs(List(TestService1, TestService2)) + + def runtime4 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + _ <- appHealth.becomeHealthyForRuntimeService(TestService1) + statuses <- appHealth.unhealthyRuntimeServices + } yield statuses should beEmpty + + def runtime5 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) + _ <- appHealth.becomeHealthyForRuntimeService(TestService1) + statuses <- appHealth.unhealthyRuntimeServices + } yield statuses should beEqualTo(List(TestService2)) + + def runtime6 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + reporter <- Ref[IO].of[AppHealth.RuntimeServiceStatus](AppHealth.RuntimeServiceStatus.Healthy) + _ <- appHealth.addRuntimeHealthReporter(TestService1, reporter.get) + result1 <- appHealth.unhealthyRuntimeServices + _ <- reporter.set(AppHealth.RuntimeServiceStatus.Unhealthy) + result2 <- appHealth.unhealthyRuntimeServices + _ <- reporter.set(AppHealth.RuntimeServiceStatus.Healthy) + result3 <- appHealth.unhealthyRuntimeServices + } yield (result1 should beEmpty) and (result2 should beEqualTo(List(TestService1))) and (result3 should beEmpty) + + def setup1 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.AwaitingHealth) + + def setup2 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) + + def setup3 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeHealthyForSetup + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) + + def setup4 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + _ <- appHealth.becomeHealthyForSetup + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) + + def setup5 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + setupHealth <- appHealth.setupHealth.get + } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) + +} + +object AppHealthSpec { + + sealed trait TestAlert + case object TestAlert1 extends TestAlert + case object TestAlert2 extends TestAlert + + sealed trait TestService + case object TestService1 extends TestService + case object TestService2 extends TestService +} diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala new file mode 100644 index 00000000..072bfaf9 --- /dev/null +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.effect.testing.specs2.CatsEffect +import cats.effect.IO +import cats.Show +import org.http4s.{Request, Status} +import org.specs2.Specification + +class HealthProbeSpec extends Specification with CatsEffect { + import HealthProbeSpec._ + + def is = s2""" + The HealthProbe should: + Initially return 503 $probe1 + Return 200 after setup configuration is healthy $probe2 + Return 503 after setup configuration becomes unhealthy $probe3 + Return 503 if a runtime service is unhealthy $probe4 + Return 200 after a runtime service recovers $probe5 + """ + + def probe1 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + httpApp = HealthProbe.httpApp(appHealth) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.ServiceUnavailable) + + def probe2 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.becomeHealthyForSetup + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.Ok) + + def probe3 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.ServiceUnavailable) + + def probe4 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.ServiceUnavailable) + + def probe5 = for { + appHealth <- AppHealth.init[IO, TestAlert, TestService] + httpApp = HealthProbe.httpApp(appHealth) + _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + _ <- appHealth.becomeHealthyForRuntimeService(TestService1) + response <- httpApp.run(Request[IO]()) + } yield response.status must beEqualTo(Status.Ok) + +} + +object HealthProbeSpec { + + sealed trait TestAlert + case object TestAlert1 extends TestAlert + case object TestAlert2 extends TestAlert + + sealed trait TestService + case object TestService1 extends TestService + case object TestService2 extends TestService + + implicit def showTestAlert: Show[TestService] = Show[TestService](_.toString) +} diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala new file mode 100644 index 00000000..48b5bd8f --- /dev/null +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookConfigSpec.scala @@ -0,0 +1,134 @@ +/* + * Copyright (c) 2023-present Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Snowplow Community License Version 1.0, + * and you may not use this file except in compliance with the Snowplow Community License Version 1.0. + * You may obtain a copy of the Snowplow Community License Version 1.0 at https://docs.snowplow.io/community-license-1.0 + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.implicits._ +import com.typesafe.config.ConfigFactory +import io.circe.config.syntax.CirceConfigOps +import io.circe.DecodingFailure +import io.circe.Decoder +import io.circe.literal._ +import io.circe.generic.semiauto._ +import org.http4s.Uri +import org.specs2.Specification + +import scala.concurrent.duration.DurationLong + +class WebhookConfigSpec extends Specification { + import WebhookConfigSpec._ + + def is = s2""" + The webhook config decoder should: + Decode a valid JSON config when endpoint is set $e1 + Decode a valid JSON config when endpoint is missing $e2 + Not decode JSON if other required field is missing $e3 + The webhook defaults should: + Provide default values from reference.conf $e4 + Not provide default value for endpoint $e5 + + + """ + + def e1 = { + val json = json""" + { + "endpoint": "http://example.com/xyz?abc=123", + "tags": { + "abc": "xyz" + }, + "heartbeat": "42 seconds" + } + """ + + json.as[Webhook.Config] must beRight { (c: Webhook.Config) => + List( + c.endpoint must beSome(Uri.unsafeFromString("http://example.com/xyz?abc=123")), + c.tags must beEqualTo(Map("abc" -> "xyz")), + c.heartbeat must beEqualTo(42.seconds) + ).reduce(_ and _) + } + } + + def e2 = { + val json = json""" + { + "tags": { + "abc": "xyz" + }, + "heartbeat": "42 seconds" + } + """ + + json.as[Webhook.Config] must beRight { (c: Webhook.Config) => + List( + c.endpoint must beNone + ).reduce(_ and _) + } + } + + def e3 = { + + // missing heartbeat + val json = json""" + { + "endpoint": "http://example.com/xyz?abc=123", + "tags": { + "abc": "xyz" + } + } + """ + + json.as[Webhook.Config] must beLeft.like { case e: DecodingFailure => + e.show must beEqualTo("DecodingFailure at .heartbeat: Missing required field") + } + } + + def e4 = { + val input = s""" + |{ + | "xyz": $${snowplow.defaults.webhook} + | "xyz": { + | "endpoint": "http://example.com/xyz?abc=123" + | } + |} + |""".stripMargin + + val result = ConfigFactory.load(ConfigFactory.parseString(input)) + + val expected = Webhook.Config( + endpoint = Some(Uri.unsafeFromString("http://example.com/xyz?abc=123")), + tags = Map.empty, + heartbeat = 60.minutes + ) + + result.as[ConfigWrapper] must beRight { (w: ConfigWrapper) => + w.xyz must beEqualTo(expected) + } + } + + def e5 = { + val input = s""" + |{ + | "xyz": $${snowplow.defaults.webhook} + |} + |""".stripMargin + + val result = ConfigFactory.load(ConfigFactory.parseString(input)) + + result.as[ConfigWrapper] must beRight.like { case w: ConfigWrapper => + w.xyz.endpoint must beNone + } + } + +} + +object WebhookConfigSpec { + case class ConfigWrapper(xyz: Webhook.Config) + + implicit def wrapperDecoder: Decoder[ConfigWrapper] = deriveDecoder[ConfigWrapper] +} diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index 8e6e831b..05ed55a0 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -7,20 +7,18 @@ */ package com.snowplowanalytics.snowplow.runtime -import cats.{Applicative, Id, Show} -import cats.implicits._ +import cats.{Applicative, Eq, Id, Show} import cats.effect.testing.specs2.CatsEffect import cats.effect.{Clock, IO, Ref, Resource} +import cats.effect.testkit.TestControl import org.http4s.{Headers, Method, Response} import org.http4s.client.Client import io.circe.Json -import io.circe.literal.JsonStringContext import io.circe.parser.{parse => circeParse} -import io.circe.DecodingFailure import org.http4s.Uri import org.specs2.Specification -import scala.concurrent.duration.FiniteDuration +import scala.concurrent.duration.{DurationInt, FiniteDuration} import java.util.concurrent.TimeUnit import com.snowplowanalytics.iglu.core.SelfDescribingData @@ -33,78 +31,184 @@ class WebhookSpec extends Specification with CatsEffect { import WebhookSpec._ def is = s2""" - The webhook config decoder should: - Decode a valid JSON config $decode1 - Not decode JSON if a required field is missing $decode2 The webhook should: - Not send any payloads if config is empty $send1 - Send a valid payload if given valid config $send2 - Ignore any exception raised by sending webhook $send3 + Not send any payloads if app health never leaves awaiting status $send1 + Send a single hearbeat after app becomes healthy for setup $send2 + Send a second heartbeat after configured period of time $send3 + Send a single alert after app becomes unhealthy for setup $send4 + Send multiple alerts if app becomes unhealthy for setup with different alert messages $send5 + Send alternating hearbeat and alert if app health flip flops $send6 + Not send any payloads if endpoint is not set in the configuration $send7 + Ignore any exception raised by sending webhook $send8 """ - def decode1 = { - val json = json""" - { - "endpoint": "http://example.com/xyz?abc=123", - "tags": { - "abc": "xyz" - } + def send1 = { + val io = resources().use { case (getReportedRequests, appHealth) => + val _ = appHealth + for { + _ <- IO.sleep(60.minutes) + reportedRequests <- getReportedRequests + } yield reportedRequests should beEmpty } - """ + TestControl.executeEmbed(io) + } - json.as[Option[Webhook.Config]] must beRight.like { case Some(c: Webhook.Config) => - List( - c.endpoint must beEqualTo(Uri.unsafeFromString("http://example.com/xyz?abc=123")), - c.tags must beEqualTo(Map("abc" -> "xyz")) + def send2 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(1), + reportedRequests must contain { req: ReportedRequest => + List( + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + } ).reduce(_ and _) } + TestControl.executeEmbed(io) } - def decode2 = { - val json = json""" - { - "tags": { - "abc": "xyz" - } + def send3 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(45.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(2), + reportedRequests must contain { req: ReportedRequest => + List( + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + }.forall + ).reduce(_ and _) } - """ + TestControl.executeEmbed(io) + } - json.as[Option[Webhook.Config]] must beLeft.like { case e: DecodingFailure => - e.show must beEqualTo("DecodingFailure at .endpoint: Missing required field") + def send4 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(1), + reportedRequests must contain { req: ReportedRequest => + List( + mustHaveValidAlertBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + } + ).reduce(_ and _) } + TestControl.executeEmbed(io) } - def send1 = for { - ref <- Ref[IO].of(List.empty[ReportedRequest]) - httpClient = reportingHttpClient(ref) - webhook = Webhook.create[IO, TestAlert](None, testAppInfo, httpClient) - _ <- webhook.alert(TestAlert("this is a test")) - results <- ref.get - } yield results must beEmpty + def send5 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom 1")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom 2")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom 3")) + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(3), + reportedRequests must contain { req: ReportedRequest => + List( + mustHaveValidAlertBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + }.forall + ).reduce(_ and _) + } + TestControl.executeEmbed(io) + } - def send2 = for { - ref <- Ref[IO].of(List.empty[ReportedRequest]) - httpClient = reportingHttpClient(ref) - webhook = Webhook.create[IO, TestAlert](Some(testConfig), testAppInfo, httpClient) - _ <- webhook.alert(TestAlert("this is a test")) - results <- ref.get - } yield List( - results must haveSize(1), - results must contain { req: ReportedRequest => - List( - mustHaveValidAlertBody(req.body), - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testConfig.endpoint), - req.headers.toString must contain("Content-Type: application/json") + def send6 = { + val io = resources().use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield List( + reportedRequests should haveSize(4), + reportedRequests must contain { req: ReportedRequest => + List( + mustHaveValidAlertBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + }.exactly(2.times) + // TODO: Test for the heartbeat events here once schema exists in Iglu Central ).reduce(_ and _) } - ).reduce(_ and _) + TestControl.executeEmbed(io) + } - def send3 = { - val webhook = Webhook.create[IO, TestAlert](Some(testConfig), testAppInfo, errorRaisingHttpClient) - for { - _ <- webhook.alert(TestAlert("this is a test")) - } yield ok + def send7 = { + val config = testConfig.copy(endpoint = None) + val io = resources(config).use { case (getReportedRequests, appHealth) => + for { + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(5.minutes) + reportedRequests <- getReportedRequests + } yield reportedRequests should beEmpty + } + TestControl.executeEmbed(io) + } + + def send8 = { + val resources = for { + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService]) + _ <- Webhook.resource(testConfig, testAppInfo, errorRaisingHttpClient, appHealth) + } yield appHealth + + val io = resources.use { appHealth => + for { + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(30.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(30.minutes) + _ <- appHealth.becomeHealthyForSetup + _ <- IO.sleep(30.minutes) + } yield ok + } + + TestControl.executeEmbed(io) } private def mustHaveValidAlertBody(body: String) = @@ -126,6 +230,13 @@ object WebhookSpec { implicit def testAlertShow: Show[TestAlert] = Show(_.msg) + implicit def testAlertEq: Eq[TestAlert] = + Eq.by(_.msg) + + sealed trait TestService + case object TestService1 extends TestService + case object TestService2 extends TestService + val testAppInfo: AppInfo = new AppInfo { def name: String = "testName" def version: String = "testVersion" @@ -133,9 +244,12 @@ object WebhookSpec { def cloud: String = "testCloud" } + def testEndpoint = Uri.unsafeFromString("http://example.com/xyz?abc=123") + def testConfig: Webhook.Config = Webhook.Config( - endpoint = Uri.unsafeFromString("http://example.com/xyz?abc=123"), - tags = Map("myTag" -> "myValue") + endpoint = Some(testEndpoint), + tags = Map("myTag" -> "myValue"), + heartbeat = 42.minutes ) // Used in tests to report the request that was sent to the webhook @@ -157,6 +271,22 @@ object WebhookSpec { } } + /** + * Resources for running a Spec + * + * @return + * a IO that records the requests sent to the webhook, and the AppHealth on which the spec can + * set healthy/unhealthy services + */ + def resources( + config: Webhook.Config = testConfig + ): Resource[IO, (IO[List[ReportedRequest]], AppHealth.Interface[IO, TestAlert, TestService])] = for { + ref <- Resource.eval(Ref[IO].of(List.empty[ReportedRequest])) + httpClient = reportingHttpClient(ref) + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService]) + _ <- Webhook.resource(config, testAppInfo, httpClient, appHealth) + } yield (ref.get, appHealth) + // A http4s Client that raises exceptions def errorRaisingHttpClient: Client[IO] = Client[IO] { _ => From d8b7bb07196b7c8e614c3d6c31b1251c7d2e4b6b Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 14 Aug 2024 13:20:19 +0100 Subject: [PATCH 2/7] [amendment] tests to validate the heartbeat schema --- .../snowplow/runtime/AppHealth.scala | 2 +- .../snowplow/runtime/WebhookSpec.scala | 76 +++++++++---------- project/BuildSettings.scala | 5 +- 3 files changed, 39 insertions(+), 44 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala index 06fb1e2b..8ae466f4 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -52,7 +52,7 @@ import fs2.concurrent.SignallingRef * * val alertShow = Show[MyAppAlert] { * case BadPassword => "Bad password" - * case MissingPermissions(moreDetails) => s"Missing permissions $moreDetails" + * case MissingPermissions(moreDetails) => "Missing permissions " + moreDetails * } * }}} * diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index 05ed55a0..d439815b 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -17,6 +17,7 @@ import io.circe.Json import io.circe.parser.{parse => circeParse} import org.http4s.Uri import org.specs2.Specification +import org.specs2.matcher.Matcher import scala.concurrent.duration.{DurationInt, FiniteDuration} import java.util.concurrent.TimeUnit @@ -33,7 +34,7 @@ class WebhookSpec extends Specification with CatsEffect { def is = s2""" The webhook should: Not send any payloads if app health never leaves awaiting status $send1 - Send a single hearbeat after app becomes healthy for setup $send2 + Send a single heartbeat after app becomes healthy for setup $send2 Send a second heartbeat after configured period of time $send3 Send a single alert after app becomes unhealthy for setup $send4 Send multiple alerts if app becomes unhealthy for setup with different alert messages $send5 @@ -65,13 +66,7 @@ class WebhookSpec extends Specification with CatsEffect { reportedRequests <- getReportedRequests } yield List( reportedRequests should haveSize(1), - reportedRequests must contain { req: ReportedRequest => - List( - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testEndpoint), - req.headers.toString must contain("Content-Type: application/json") - ).reduce(_ and _) - } + reportedRequests must contain(beValidHeartbeatRequest) ).reduce(_ and _) } TestControl.executeEmbed(io) @@ -85,13 +80,7 @@ class WebhookSpec extends Specification with CatsEffect { reportedRequests <- getReportedRequests } yield List( reportedRequests should haveSize(2), - reportedRequests must contain { req: ReportedRequest => - List( - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testEndpoint), - req.headers.toString must contain("Content-Type: application/json") - ).reduce(_ and _) - }.forall + reportedRequests must contain(beValidHeartbeatRequest).forall ).reduce(_ and _) } TestControl.executeEmbed(io) @@ -109,14 +98,7 @@ class WebhookSpec extends Specification with CatsEffect { reportedRequests <- getReportedRequests } yield List( reportedRequests should haveSize(1), - reportedRequests must contain { req: ReportedRequest => - List( - mustHaveValidAlertBody(req.body), - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testEndpoint), - req.headers.toString must contain("Content-Type: application/json") - ).reduce(_ and _) - } + reportedRequests must contain(beValidAlertRequest) ).reduce(_ and _) } TestControl.executeEmbed(io) @@ -134,14 +116,7 @@ class WebhookSpec extends Specification with CatsEffect { reportedRequests <- getReportedRequests } yield List( reportedRequests should haveSize(3), - reportedRequests must contain { req: ReportedRequest => - List( - mustHaveValidAlertBody(req.body), - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testEndpoint), - req.headers.toString must contain("Content-Type: application/json") - ).reduce(_ and _) - }.forall + reportedRequests must contain(beValidAlertRequest).forall ).reduce(_ and _) } TestControl.executeEmbed(io) @@ -161,15 +136,8 @@ class WebhookSpec extends Specification with CatsEffect { reportedRequests <- getReportedRequests } yield List( reportedRequests should haveSize(4), - reportedRequests must contain { req: ReportedRequest => - List( - mustHaveValidAlertBody(req.body), - req.method must beEqualTo(Method.POST), - req.uri must beEqualTo(testEndpoint), - req.headers.toString must contain("Content-Type: application/json") - ).reduce(_ and _) - }.exactly(2.times) - // TODO: Test for the heartbeat events here once schema exists in Iglu Central + reportedRequests must contain(beValidAlertRequest).exactly(2.times), + reportedRequests must contain(beValidHeartbeatRequest).exactly(2.times) ).reduce(_ and _) } TestControl.executeEmbed(io) @@ -211,11 +179,35 @@ class WebhookSpec extends Specification with CatsEffect { TestControl.executeEmbed(io) } - private def mustHaveValidAlertBody(body: String) = + private def beValidAlertRequest: Matcher[ReportedRequest] = { (req: ReportedRequest) => + List( + mustHaveValidAlertBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + } + + private def beValidHeartbeatRequest: Matcher[ReportedRequest] = { (req: ReportedRequest) => + List( + mustHaveValidHeartbeatBody(req.body), + req.method must beEqualTo(Method.POST), + req.uri must beEqualTo(testEndpoint), + req.headers.toString must contain("Content-Type: application/json") + ).reduce(_ and _) + } + + private def mustHaveValidAlertBody = + mustHaveValidSdjBody("iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0") _ + + private def mustHaveValidHeartbeatBody = + mustHaveValidSdjBody("iglu:com.snowplowanalytics.monitoring.loader/heartbeat/jsonschema/1-0-0") _ + + private def mustHaveValidSdjBody(igluUri: String)(body: String) = circeParse(body) must beRight.like { case json: Json => json.as[SelfDescribingData[Json]] must beRight.like { case sdj: SelfDescribingData[Json] => List( - sdj.schema.toSchemaUri must beEqualTo("iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0"), + sdj.schema.toSchemaUri must beEqualTo(igluUri), igluCirceClient.check(sdj).value must beRight ).reduce(_ and _) } diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala index eebc8d59..00dc2d4f 100644 --- a/project/BuildSettings.scala +++ b/project/BuildSettings.scala @@ -112,11 +112,14 @@ object BuildSettings { ) val igluTestSettings = Seq( + // TODO: Remove this dev repository after heartbeat schema is published + Test / igluRepository := "http://iglucentral-dev.com.s3-website-us-east-1.amazonaws.com/add-loader-heartbeat", Test / igluUris := Seq( // Iglu Central schemas used in tests will get pre-fetched by sbt "iglu:com.snowplowanalytics.iglu/anything-a/jsonschema/1-0-0", "iglu:com.snowplowanalytics.snowplow.media/ad_break_end_event/jsonschema/1-0-0", - "iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0" + "iglu:com.snowplowanalytics.monitoring.loader/alert/jsonschema/1-0-0", + "iglu:com.snowplowanalytics.monitoring.loader/heartbeat/jsonschema/1-0-0" ) ) } From f261713d14f7bce3ae2b632450cb3d970967cb00 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 14 Aug 2024 14:21:43 +0100 Subject: [PATCH 3/7] [amendment] Allow health reporters to set their own description --- .../snowplow/runtime/AppHealth.scala | 48 +++++++++---------- .../snowplow/runtime/HealthProbe.scala | 4 +- .../snowplow/runtime/AppHealthSpec.scala | 42 +++++++++------- 3 files changed, 52 insertions(+), 42 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala index 8ae466f4..64840550 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -10,8 +10,8 @@ package com.snowplowanalytics.snowplow.runtime -import cats.Eq -import cats.{Applicative, Monad} +import cats.{Eq, Show} +import cats.Monad import cats.implicits._ import cats.effect.{Async, Ref} import fs2.concurrent.SignallingRef @@ -31,7 +31,7 @@ import fs2.concurrent.SignallingRef * {{{ * trait MyAppRuntimeService * case object BadRowsOutputStream extends MyAppRuntimeService - * case object EventsInputStream extends MyAppRuntimeService + * case object IgluClient extends MyAppRuntimeService * }}} * * Next, define the alerts this app is allowed to send to the webhook for setup errors @@ -47,7 +47,7 @@ import fs2.concurrent.SignallingRef * {{{ * val runtimeShow = Show[MyAppRuntimeService] { * case BadRowsOutputStream => "Bad rows output stream" - * case EventsInputStream => "Events input stream" + * case IgluClient => "Iglu client" * } * * val alertShow = Show[MyAppAlert] { @@ -87,7 +87,7 @@ import fs2.concurrent.SignallingRef * fall into this category, but the source stream does fall into this category. * * {{{ - * _ <- appHealth.addRuntimeHealthReporter(EventsInputStream, sourceAndAckIsHealthy) + * _ <- appHealth.addRuntimeHealthReporter(sourceAndAckIsHealthy) * }}} * * ==Application processing== @@ -120,7 +120,8 @@ import fs2.concurrent.SignallingRef */ class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]], - runtimeHealth: Ref[F, Map[RuntimeService, F[AppHealth.RuntimeServiceStatus]]] + unhealthyRuntimeServices: Ref[F, Set[RuntimeService]], + runtimeServiceReporters: Ref[F, List[F[Option[String]]]] ) extends AppHealth.Interface[F, SetupAlert, RuntimeService] { import AppHealth._ @@ -131,21 +132,25 @@ class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( setupHealth.set(SetupStatus.Unhealthy(alert)) def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit] = - runtimeHealth.update(_ - service) + unhealthyRuntimeServices.update(_ - service) def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = - runtimeHealth.update(_ + (service -> Applicative[F].pure(RuntimeServiceStatus.Unhealthy))) + unhealthyRuntimeServices.update(_ + service) - def addRuntimeHealthReporter(service: RuntimeService, reporter: F[RuntimeServiceStatus]): F[Unit] = - runtimeHealth.update(_ + (service -> reporter)) + /** + * Add a reporter which is counted as an unhealthy runtime service if it returns a `Some`. + * + * The returned string must be a short description of why the service is unhealthy. + */ + def addRuntimeHealthReporter(reporter: F[Option[String]]): F[Unit] = + runtimeServiceReporters.update(reporter :: _) - private[runtime] def unhealthyRuntimeServices: F[List[RuntimeService]] = + private[runtime] def unhealthyRuntimeServiceMessages(implicit show: Show[RuntimeService]): F[List[String]] = for { - asMap <- runtimeHealth.get - pairs <- asMap.toList.traverse { case (service, statusF) => - statusF.map((_, service)) - } - } yield pairs.collect { case (RuntimeServiceStatus.Unhealthy, service) => service } + services <- unhealthyRuntimeServices.get + reporters <- runtimeServiceReporters.get + extras <- reporters.sequence + } yield services.toList.map(_.show) ::: extras.flatten } object AppHealth { @@ -182,12 +187,6 @@ object AppHealth { } } - sealed trait RuntimeServiceStatus - object RuntimeServiceStatus { - case object Healthy extends RuntimeServiceStatus - case object Unhealthy extends RuntimeServiceStatus - } - /** * Initialize the AppHealth * @@ -199,6 +198,7 @@ object AppHealth { def init[F[_]: Async, SetupAlert, RuntimeService]: F[AppHealth[F, SetupAlert, RuntimeService]] = for { setupHealth <- SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth) - runtimeHealth <- Ref[F].of(Map.empty[RuntimeService, F[AppHealth.RuntimeServiceStatus]]) - } yield new AppHealth(setupHealth, runtimeHealth) + unhealthyRuntimeServices <- Ref[F].of(Set.empty[RuntimeService]) + reporters <- Ref[F].of(List.empty[F[Option[String]]]) + } yield new AppHealth(setupHealth, unhealthyRuntimeServices, reporters) } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala index c2f63f45..1563a782 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala @@ -52,10 +52,10 @@ object HealthProbe { ): HttpApp[F] = Kleisli { _ => val problemsF = for { - runtimeUnhealthies <- appHealth.unhealthyRuntimeServices + runtimeUnhealthies <- appHealth.unhealthyRuntimeServiceMessages setupHealth <- appHealth.setupHealth.get } yield { - val allUnhealthy = runtimeUnhealthies.map(_.show) ++ (setupHealth match { + val allUnhealthy = runtimeUnhealthies ++ (setupHealth match { case AppHealth.SetupStatus.Unhealthy(_) => Some("External setup configuration") case _ => None }) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala index bd3a321d..a53661cf 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala @@ -7,6 +7,7 @@ */ package com.snowplowanalytics.snowplow.runtime +import cats.Show import cats.effect.testing.specs2.CatsEffect import cats.effect.{IO, Ref} import org.specs2.Specification @@ -33,27 +34,27 @@ class AppHealthSpec extends Specification with CatsEffect { def runtime1 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService] - statuses <- appHealth.unhealthyRuntimeServices + statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime2 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService] _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) - statuses <- appHealth.unhealthyRuntimeServices - } yield statuses should beEqualTo(List(TestService1)) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should beEqualTo(List("test service 1")) def runtime3 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService] _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) - statuses <- appHealth.unhealthyRuntimeServices - } yield statuses should containTheSameElementsAs(List(TestService1, TestService2)) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should containTheSameElementsAs(List("test service 1", "test service 2")) def runtime4 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService] _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeHealthyForRuntimeService(TestService1) - statuses <- appHealth.unhealthyRuntimeServices + statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime5 = for { @@ -61,19 +62,23 @@ class AppHealthSpec extends Specification with CatsEffect { _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) _ <- appHealth.becomeHealthyForRuntimeService(TestService1) - statuses <- appHealth.unhealthyRuntimeServices - } yield statuses should beEqualTo(List(TestService2)) + statuses <- appHealth.unhealthyRuntimeServiceMessages + } yield statuses should beEqualTo(List("test service 2")) def runtime6 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService] - reporter <- Ref[IO].of[AppHealth.RuntimeServiceStatus](AppHealth.RuntimeServiceStatus.Healthy) - _ <- appHealth.addRuntimeHealthReporter(TestService1, reporter.get) - result1 <- appHealth.unhealthyRuntimeServices - _ <- reporter.set(AppHealth.RuntimeServiceStatus.Unhealthy) - result2 <- appHealth.unhealthyRuntimeServices - _ <- reporter.set(AppHealth.RuntimeServiceStatus.Healthy) - result3 <- appHealth.unhealthyRuntimeServices - } yield (result1 should beEmpty) and (result2 should beEqualTo(List(TestService1))) and (result3 should beEmpty) + reporter <- Ref[IO].of(Option.empty[String]) + _ <- appHealth.addRuntimeHealthReporter(reporter.get) + result1 <- appHealth.unhealthyRuntimeServiceMessages + _ <- reporter.set(Some("test reporter unhealthy 1")) + result2 <- appHealth.unhealthyRuntimeServiceMessages + _ <- reporter.set(None) + result3 <- appHealth.unhealthyRuntimeServiceMessages + } yield List( + result1 should beEmpty, + result2 should beEqualTo(List("test reporter unhealthy 1")), + result3 should beEmpty + ).reduce(_ and _) def setup1 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService] @@ -117,4 +122,9 @@ object AppHealthSpec { sealed trait TestService case object TestService1 extends TestService case object TestService2 extends TestService + + implicit def showTestService: Show[TestService] = Show { + case TestService1 => "test service 1" + case TestService2 => "test service 2" + } } From 0e727742a064fb93750e9df3305bb697aeb9cc8d Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 14 Aug 2024 16:20:46 +0100 Subject: [PATCH 4/7] [amendment] move reporters into init --- .../snowplow/runtime/AppHealth.scala | 25 ++++++++----------- .../snowplow/runtime/HealthProbe.scala | 2 +- .../snowplow/runtime/AppHealthSpec.scala | 23 ++++++++--------- .../snowplow/runtime/HealthProbeSpec.scala | 10 ++++---- .../snowplow/runtime/WebhookSpec.scala | 4 +-- .../sources/SourceAndAck.scala | 10 +++++++- 6 files changed, 39 insertions(+), 35 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala index 64840550..02f05219 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -121,7 +121,7 @@ import fs2.concurrent.SignallingRef class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( private[runtime] val setupHealth: SignallingRef[F, AppHealth.SetupStatus[SetupAlert]], unhealthyRuntimeServices: Ref[F, Set[RuntimeService]], - runtimeServiceReporters: Ref[F, List[F[Option[String]]]] + runtimeServiceReporters: List[F[Option[String]]] ) extends AppHealth.Interface[F, SetupAlert, RuntimeService] { import AppHealth._ @@ -137,19 +137,10 @@ class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = unhealthyRuntimeServices.update(_ + service) - /** - * Add a reporter which is counted as an unhealthy runtime service if it returns a `Some`. - * - * The returned string must be a short description of why the service is unhealthy. - */ - def addRuntimeHealthReporter(reporter: F[Option[String]]): F[Unit] = - runtimeServiceReporters.update(reporter :: _) - private[runtime] def unhealthyRuntimeServiceMessages(implicit show: Show[RuntimeService]): F[List[String]] = for { services <- unhealthyRuntimeServices.get - reporters <- runtimeServiceReporters.get - extras <- reporters.sequence + extras <- runtimeServiceReporters.sequence } yield services.toList.map(_.show) ::: extras.flatten } @@ -194,11 +185,17 @@ object AppHealth { * Sealed trait of the alerts this app is allowed to send to the webhook for setup errors * @tparam RuntimeService * Sealed trait of the services that this app requires to be healthy + * + * @param runtimeReporters + * Reporters for any additional service, not covered by `RuntimeService`. Reporters provide a + * String if a service is unhealthy. The String must be a short description of why the service + * is unhealthy. */ - def init[F[_]: Async, SetupAlert, RuntimeService]: F[AppHealth[F, SetupAlert, RuntimeService]] = + def init[F[_]: Async, SetupAlert, RuntimeService]( + runtimeReporters: List[F[Option[String]]] + ): F[AppHealth[F, SetupAlert, RuntimeService]] = for { setupHealth <- SignallingRef[F, SetupStatus[SetupAlert]](SetupStatus.AwaitingHealth) unhealthyRuntimeServices <- Ref[F].of(Set.empty[RuntimeService]) - reporters <- Ref[F].of(List.empty[F[Option[String]]]) - } yield new AppHealth(setupHealth, unhealthyRuntimeServices, reporters) + } yield new AppHealth(setupHealth, unhealthyRuntimeServices, runtimeReporters) } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala index 1563a782..f3e019e5 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/HealthProbe.scala @@ -71,7 +71,7 @@ object HealthProbe { } else None val awaitingMsg = if (allAwaiting.nonEmpty) { - val joined = allUnhealthy.mkString("Services are awaiting a healthy status [", ", ", "]") + val joined = allAwaiting.mkString("Services are awaiting a healthy status [", ", ", "]") Some(joined) } else None diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala index a53661cf..a2c4dc64 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala @@ -33,32 +33,32 @@ class AppHealthSpec extends Specification with CatsEffect { """ def runtime1 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime2 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEqualTo(List("test service 1")) def runtime3 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should containTheSameElementsAs(List("test service 1", "test service 2")) def runtime4 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeHealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime5 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) _ <- appHealth.becomeHealthyForRuntimeService(TestService1) @@ -66,9 +66,8 @@ class AppHealthSpec extends Specification with CatsEffect { } yield statuses should beEqualTo(List("test service 2")) def runtime6 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] reporter <- Ref[IO].of(Option.empty[String]) - _ <- appHealth.addRuntimeHealthReporter(reporter.get) + appHealth <- AppHealth.init[IO, TestAlert, TestService](List(reporter.get)) result1 <- appHealth.unhealthyRuntimeServiceMessages _ <- reporter.set(Some("test reporter unhealthy 1")) result2 <- appHealth.unhealthyRuntimeServiceMessages @@ -81,31 +80,31 @@ class AppHealthSpec extends Specification with CatsEffect { ).reduce(_ and _) def setup1 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.AwaitingHealth) def setup2 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) def setup3 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeHealthyForSetup setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) def setup4 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) _ <- appHealth.becomeHealthyForSetup setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) def setup5 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) setupHealth <- appHealth.setupHealth.get diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala index 072bfaf9..54812543 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala @@ -26,20 +26,20 @@ class HealthProbeSpec extends Specification with CatsEffect { """ def probe1 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe2 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.Ok) def probe3 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) @@ -47,7 +47,7 @@ class HealthProbeSpec extends Specification with CatsEffect { } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe4 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) @@ -55,7 +55,7 @@ class HealthProbeSpec extends Specification with CatsEffect { } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe5 = for { - appHealth <- AppHealth.init[IO, TestAlert, TestService] + appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) _ <- appHealth.becomeHealthyForSetup _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index d439815b..c63bfa2f 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -161,7 +161,7 @@ class WebhookSpec extends Specification with CatsEffect { def send8 = { val resources = for { - appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService]) + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService](Nil)) _ <- Webhook.resource(testConfig, testAppInfo, errorRaisingHttpClient, appHealth) } yield appHealth @@ -275,7 +275,7 @@ object WebhookSpec { ): Resource[IO, (IO[List[ReportedRequest]], AppHealth.Interface[IO, TestAlert, TestService])] = for { ref <- Resource.eval(Ref[IO].of(List.empty[ReportedRequest])) httpClient = reportingHttpClient(ref) - appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService]) + appHealth <- Resource.eval(AppHealth.init[IO, TestAlert, TestService](Nil)) _ <- Webhook.resource(config, testAppInfo, httpClient, appHealth) } yield (ref.get, appHealth) diff --git a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala index fbed0447..384daf76 100644 --- a/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala +++ b/modules/streams-core/src/main/scala/com.snowplowanalytics.snowplow/sources/SourceAndAck.scala @@ -52,7 +52,14 @@ trait SourceAndAck[F[_]] { object SourceAndAck { - sealed trait HealthStatus + sealed trait HealthStatus { self => + final def showIfUnhealthy: Option[String] = + self match { + case Healthy => None + case unhealthy: Unhealthy => Some(unhealthy.show) + } + } + case object Healthy extends HealthStatus sealed trait Unhealthy extends HealthStatus @@ -86,4 +93,5 @@ object SourceAndAck { case LaggingEventProcessor(latency) => show"Processing latency is $latency" case InactiveSource(duration) => show"Source of events has been inactive for $duration" } + } From 6bd798f534aa97900d017dab9c5987a92dc9ac6f Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Wed, 14 Aug 2024 23:02:36 +0100 Subject: [PATCH 5/7] [amendment] remove requirement to implement Eq --- .../snowplowanalytics/snowplow/runtime/AppHealth.scala | 9 ++------- .../com/snowplowanalytics/snowplow/runtime/Webhook.scala | 6 +++--- .../snowplowanalytics/snowplow/runtime/WebhookSpec.scala | 5 +---- 3 files changed, 6 insertions(+), 14 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala index 02f05219..4c9738e7 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -56,11 +56,6 @@ import fs2.concurrent.SignallingRef * } * }}} * - * Implement a `cats.Eq` for the alerts, so we can alert on anything uniquely new - * {{{ - * val alertEq = Eq.fromUniversalEquals[Alert] // Do it better than this - * }}} - * * ==Environment initialization== * * Initialize the AppHealth as part of App Environment initialization: @@ -168,12 +163,12 @@ object AppHealth { case object Healthy extends SetupStatus[Nothing] case class Unhealthy[Alert](alert: Alert) extends SetupStatus[Alert] - implicit def eq[Alert: Eq]: Eq[SetupStatus[Alert]] = Eq.instance { + implicit def eq[Alert: Show]: Eq[SetupStatus[Alert]] = Eq.instance { case (Healthy, Healthy) => true case (Healthy, _) => false case (AwaitingHealth, AwaitingHealth) => true case (AwaitingHealth, _) => false - case (Unhealthy(a1), Unhealthy(a2)) => Eq[Alert].eqv(a1, a2) + case (Unhealthy(a1), Unhealthy(a2)) => a1.show === a2.show case (Unhealthy(_), _) => false } } diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala index b9af657d..bd8d9d0d 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Webhook.scala @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.runtime import cats.effect.{Async, Resource, Sync} import cats.effect.implicits._ import cats.implicits._ -import cats.{Eq, Show} +import cats.Show import fs2.{Pipe, Pull, Stream} import io.circe.{Decoder, Json} import io.circe.generic.semiauto._ @@ -47,7 +47,7 @@ object Webhook { } } - def resource[F[_]: Async, SetupAlert: Show: Eq]( + def resource[F[_]: Async, SetupAlert: Show]( config: Config, appInfo: AppInfo, httpClient: Client[F], @@ -55,7 +55,7 @@ object Webhook { ): Resource[F, Unit] = stream(config, appInfo, httpClient, appHealth).compile.drain.background.void - def stream[F[_]: Async, SetupAlert: Show: Eq]( + def stream[F[_]: Async, SetupAlert: Show]( config: Config, appInfo: AppInfo, httpClient: Client[F], diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index c63bfa2f..ff0edeba 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -7,7 +7,7 @@ */ package com.snowplowanalytics.snowplow.runtime -import cats.{Applicative, Eq, Id, Show} +import cats.{Applicative, Id, Show} import cats.effect.testing.specs2.CatsEffect import cats.effect.{Clock, IO, Ref, Resource} import cats.effect.testkit.TestControl @@ -222,9 +222,6 @@ object WebhookSpec { implicit def testAlertShow: Show[TestAlert] = Show(_.msg) - implicit def testAlertEq: Eq[TestAlert] = - Eq.by(_.msg) - sealed trait TestService case object TestService1 extends TestService case object TestService2 extends TestService From c089cf0ad230c26cdefb6c1a9ec3277c487d1070 Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 15 Aug 2024 13:07:44 +0100 Subject: [PATCH 6/7] [amendment] change AppHealth method names from become* to be* --- .../snowplow/runtime/AppHealth.scala | 24 +++++------ .../snowplow/runtime/AppHealthSpec.scala | 28 ++++++------- .../snowplow/runtime/HealthProbeSpec.scala | 16 ++++---- .../snowplow/runtime/WebhookSpec.scala | 40 +++++++++---------- 4 files changed, 54 insertions(+), 54 deletions(-) diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala index 4c9738e7..90f03ed4 100644 --- a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/AppHealth.scala @@ -93,21 +93,21 @@ import fs2.concurrent.SignallingRef * {{{ * // After catching an exception in the bad events stream * _ <- Logger[F].error(e)("Problem with bad rows output stream") - * _ <- appHealth.becomeUnhealthyForRuntimeService(BadRowsOutputStream) + * _ <- appHealth.beUnhealthyForRuntimeService(BadRowsOutputStream) * * // After bad rows stream becomes healthy again * _ <- Logger[F].debug("Bad rows output stream is ok") - * _ <- appHealth.becomeHealthyForRuntimeService(BadRowsOutputStream) + * _ <- appHealth.beHealthyForRuntimeService(BadRowsOutputStream) * * // After catching an exception with the external setup configuration * // Note this will send an alert webhook * _ <- Logger[F].error(e)("Problem with the provided password") - * _ <- appHealth.becomeUnhealthyForSetup(BadPassword) + * _ <- appHealth.beUnhealthyForSetup(BadPassword) * * // After successful connection to the externally configured services * // Note this will send the first hearbeat webhook * _ <- Logger[F].error(e)("Everything ok with the provided setup configuration") - * _ <- appHealth.becomeHealthyForSetup + * _ <- appHealth.beHealthyForSetup * }}} * * The application processing code does not need to explicitly send any monitoring alert or adjust @@ -120,16 +120,16 @@ class AppHealth[F[_]: Monad, SetupAlert, RuntimeService] private ( ) extends AppHealth.Interface[F, SetupAlert, RuntimeService] { import AppHealth._ - def becomeHealthyForSetup: F[Unit] = + def beHealthyForSetup: F[Unit] = setupHealth.set(SetupStatus.Healthy) - def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit] = + def beUnhealthyForSetup(alert: SetupAlert): F[Unit] = setupHealth.set(SetupStatus.Unhealthy(alert)) - def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit] = + def beHealthyForRuntimeService(service: RuntimeService): F[Unit] = unhealthyRuntimeServices.update(_ - service) - def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = + def beUnhealthyForRuntimeService(service: RuntimeService): F[Unit] = unhealthyRuntimeServices.update(_ + service) private[runtime] def unhealthyRuntimeServiceMessages(implicit show: Show[RuntimeService]): F[List[String]] = @@ -148,13 +148,13 @@ object AppHealth { * implementation. */ trait Interface[F[_], -SetupAlert, -RuntimeService] { - def becomeHealthyForSetup: F[Unit] + def beHealthyForSetup: F[Unit] - def becomeUnhealthyForSetup(alert: SetupAlert): F[Unit] + def beUnhealthyForSetup(alert: SetupAlert): F[Unit] - def becomeHealthyForRuntimeService(service: RuntimeService): F[Unit] + def beHealthyForRuntimeService(service: RuntimeService): F[Unit] - def becomeUnhealthyForRuntimeService(service: RuntimeService): F[Unit] + def beUnhealthyForRuntimeService(service: RuntimeService): F[Unit] } private[runtime] sealed trait SetupStatus[+Alert] diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala index a2c4dc64..a614d84b 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/AppHealthSpec.scala @@ -39,29 +39,29 @@ class AppHealthSpec extends Specification with CatsEffect { def runtime2 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEqualTo(List("test service 1")) def runtime3 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService2) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should containTheSameElementsAs(List("test service 1", "test service 2")) def runtime4 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) - _ <- appHealth.becomeHealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beHealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEmpty def runtime5 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService2) - _ <- appHealth.becomeHealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beUnhealthyForRuntimeService(TestService2) + _ <- appHealth.beHealthyForRuntimeService(TestService1) statuses <- appHealth.unhealthyRuntimeServiceMessages } yield statuses should beEqualTo(List("test service 2")) @@ -86,27 +86,27 @@ class AppHealthSpec extends Specification with CatsEffect { def setup2 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + _ <- appHealth.beUnhealthyForSetup(TestAlert1) setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) def setup3 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) def setup4 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beUnhealthyForSetup(TestAlert1) + _ <- appHealth.beHealthyForSetup setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Healthy) def setup5 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) - _ <- appHealth.becomeHealthyForSetup - _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForSetup(TestAlert1) setupHealth <- appHealth.setupHealth.get } yield setupHealth should beEqualTo(AppHealth.SetupStatus.Unhealthy(TestAlert1)) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala index 54812543..108ed0ca 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/HealthProbeSpec.scala @@ -34,32 +34,32 @@ class HealthProbeSpec extends Specification with CatsEffect { def probe2 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.Ok) def probe3 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) - _ <- appHealth.becomeHealthyForSetup - _ <- appHealth.becomeUnhealthyForSetup(TestAlert1) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForSetup(TestAlert1) response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe4 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) - _ <- appHealth.becomeHealthyForSetup - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.ServiceUnavailable) def probe5 = for { appHealth <- AppHealth.init[IO, TestAlert, TestService](Nil) httpApp = HealthProbe.httpApp(appHealth) - _ <- appHealth.becomeHealthyForSetup - _ <- appHealth.becomeUnhealthyForRuntimeService(TestService1) - _ <- appHealth.becomeHealthyForRuntimeService(TestService1) + _ <- appHealth.beHealthyForSetup + _ <- appHealth.beUnhealthyForRuntimeService(TestService1) + _ <- appHealth.beHealthyForRuntimeService(TestService1) response <- httpApp.run(Request[IO]()) } yield response.status must beEqualTo(Status.Ok) diff --git a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala index ff0edeba..4e7344f9 100644 --- a/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala +++ b/modules/runtime-common/src/test/scala/com/snowplowanalytics/snowplow/runtime/WebhookSpec.scala @@ -57,11 +57,11 @@ class WebhookSpec extends Specification with CatsEffect { def send2 = { val io = resources().use { case (getReportedRequests, appHealth) => for { - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) reportedRequests <- getReportedRequests } yield List( @@ -75,7 +75,7 @@ class WebhookSpec extends Specification with CatsEffect { def send3 = { val io = resources().use { case (getReportedRequests, appHealth) => for { - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(45.minutes) reportedRequests <- getReportedRequests } yield List( @@ -89,11 +89,11 @@ class WebhookSpec extends Specification with CatsEffect { def send4 = { val io = resources().use { case (getReportedRequests, appHealth) => for { - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) _ <- IO.sleep(5.minutes) reportedRequests <- getReportedRequests } yield List( @@ -107,11 +107,11 @@ class WebhookSpec extends Specification with CatsEffect { def send5 = { val io = resources().use { case (getReportedRequests, appHealth) => for { - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom 1")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom 1")) _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom 2")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom 2")) _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom 3")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom 3")) _ <- IO.sleep(5.minutes) reportedRequests <- getReportedRequests } yield List( @@ -125,13 +125,13 @@ class WebhookSpec extends Specification with CatsEffect { def send6 = { val io = resources().use { case (getReportedRequests, appHealth) => for { - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeUnhealthyForSetup(TestAlert("boom!")) + _ <- appHealth.beUnhealthyForSetup(TestAlert("boom!")) _ <- IO.sleep(5.minutes) reportedRequests <- getReportedRequests } yield List( @@ -147,11 +147,11 @@ class WebhookSpec extends Specification with CatsEffect { val config = testConfig.copy(endpoint = None) val io = resources(config).use { case (getReportedRequests, appHealth) => for { - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(5.minutes) reportedRequests <- getReportedRequests } yield reportedRequests should beEmpty @@ -167,11 +167,11 @@ class WebhookSpec extends Specification with CatsEffect { val io = resources.use { appHealth => for { - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(30.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(30.minutes) - _ <- appHealth.becomeHealthyForSetup + _ <- appHealth.beHealthyForSetup _ <- IO.sleep(30.minutes) } yield ok } From a139a53a872843748a5294e46a4f9605819c015e Mon Sep 17 00:00:00 2001 From: Ian Streeter Date: Thu, 15 Aug 2024 16:30:17 +0100 Subject: [PATCH 7/7] [amendment] Move service retrying into common-streams --- .../snowplow/runtime/Retrying.scala | 119 ++++++++++++++++++ .../runtime/SetupExceptionMessages.scala | 38 ++++++ project/Dependencies.scala | 1 + 3 files changed, 158 insertions(+) create mode 100644 modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala create mode 100644 modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala new file mode 100644 index 00000000..3a872939 --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/Retrying.scala @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ + +package com.snowplowanalytics.snowplow.runtime + +import cats.{Applicative, Show} +import cats.effect.Sync +import cats.implicits._ +import retry._ +import io.circe.Decoder +import io.circe.generic.semiauto._ +import io.circe.config.syntax._ +import retry.implicits.retrySyntaxError +import org.typelevel.log4cats.Logger +import org.typelevel.log4cats.slf4j.Slf4jLogger + +import scala.concurrent.duration.FiniteDuration + +object Retrying { + + private implicit def logger[F[_]: Sync]: Logger[F] = Slf4jLogger.getLogger[F] + + // Publicly accessible to help Snowplow apps that manage their own retrying + implicit def showRetryDetails: Show[RetryDetails] = Show { + case RetryDetails.GivingUp(totalRetries, totalDelay) => + s"Giving up on retrying, total retries: $totalRetries, total delay: ${totalDelay.toSeconds} seconds" + case RetryDetails.WillDelayAndRetry(nextDelay, retriesSoFar, cumulativeDelay) => + s"Will retry in ${nextDelay.toMillis} milliseconds, retries so far: $retriesSoFar, total delay so far: ${cumulativeDelay.toMillis} milliseconds" + } + + object Config { + + case class ForSetup(delay: FiniteDuration) + object ForSetup { + implicit def decoder: Decoder[ForSetup] = + deriveDecoder + } + + case class ForTransient(delay: FiniteDuration, attempts: Int) + object ForTransient { + implicit def decoder: Decoder[ForTransient] = + deriveDecoder + } + } + + def withRetries[F[_]: Sync: Sleep, Alert, RuntimeService, A]( + appHealth: AppHealth.Interface[F, Alert, RuntimeService], + configForTransient: Config.ForTransient, + configForSetup: Config.ForSetup, + service: RuntimeService, + toAlert: SetupExceptionMessages => Alert, + setupErrorCheck: PartialFunction[Throwable, String] + )( + action: F[A] + ): F[A] = + action + .retryingOnSomeErrors( + isWorthRetrying = checkingNestedExceptions(setupErrorCheck, _).nonEmpty.pure[F], + policy = policyForSetupErrors[F](configForSetup), + onError = logErrorAndSendAlert(appHealth, setupErrorCheck, toAlert, _, _) + ) + .retryingOnAllErrors( + policy = policyForTransientErrors[F](configForTransient), + onError = logErrorAndReportUnhealthy(appHealth, service, _, _) + ) + .productL(appHealth.beHealthyForRuntimeService(service)) + + private def policyForSetupErrors[F[_]: Applicative](config: Config.ForSetup): RetryPolicy[F] = + RetryPolicies.exponentialBackoff[F](config.delay) + + private def policyForTransientErrors[F[_]: Applicative](config: Config.ForTransient): RetryPolicy[F] = + RetryPolicies.fullJitter[F](config.delay).join(RetryPolicies.limitRetries(config.attempts - 1)) + + private def logErrorAndSendAlert[F[_]: Sync, Alert, RuntimeService]( + appHealth: AppHealth.Interface[F, Alert, RuntimeService], + setupErrorCheck: PartialFunction[Throwable, String], + toAlert: SetupExceptionMessages => Alert, + error: Throwable, + details: RetryDetails + ): F[Unit] = + logError(error, details) *> appHealth.beUnhealthyForSetup( + toAlert(SetupExceptionMessages(checkingNestedExceptions(setupErrorCheck, error))) + ) + + private def logError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] = + Logger[F].error(error)(show"Executing command failed. $details") + + private def logErrorAndReportUnhealthy[F[_]: Sync, RuntimeService]( + appHealth: AppHealth.Interface[F, ?, RuntimeService], + service: RuntimeService, + error: Throwable, + details: RetryDetails + ): F[Unit] = + logError(error, details) *> appHealth.beUnhealthyForRuntimeService(service) + + // Returns a list of reasons of why this was a destination setup error. + // Or empty list if this was not caused by a destination setup error + private def checkingNestedExceptions( + setupErrorCheck: PartialFunction[Throwable, String], + t: Throwable + ): List[String] = + unnestThrowableCauses(t).map(setupErrorCheck.lift).flatten + + private def unnestThrowableCauses(t: Throwable): List[Throwable] = { + def go(t: Throwable, acc: List[Throwable]): List[Throwable] = + Option(t.getCause) match { + case Some(cause) => go(cause, cause :: acc) + case None => acc.reverse + } + go(t, List(t)) + } +} diff --git a/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala new file mode 100644 index 00000000..74d69daa --- /dev/null +++ b/modules/runtime-common/src/main/scala/com/snowplowanalytics/snowplow/runtime/SetupExceptionMessages.scala @@ -0,0 +1,38 @@ +/* + * Copyright (c) 2014-present Snowplow Analytics Ltd. All rights reserved. + * + * This software is made available by Snowplow Analytics, Ltd., + * under the terms of the Snowplow Limited Use License Agreement, Version 1.0 + * located at https://docs.snowplow.io/limited-use-license-1.0 + * BY INSTALLING, DOWNLOADING, ACCESSING, USING OR DISTRIBUTING ANY PORTION + * OF THE SOFTWARE, YOU AGREE TO THE TERMS OF SUCH LICENSE AGREEMENT. + */ +package com.snowplowanalytics.snowplow.runtime + +import cats.Show + +/** + * Represents all messages from an Exception associated with a setup error + * + * Messages are expected to be included in the webhook payload. This library assumes each message + * has been cleaned and sanitized by the Snowplow application. + */ +case class SetupExceptionMessages(value: List[String]) + +object SetupExceptionMessages { + + implicit def showSetupExceptionMessages: Show[SetupExceptionMessages] = { + def removeDuplicateMessages(in: List[String]): List[String] = + in match { + case h :: t :: rest => + if (h.contains(t)) removeDuplicateMessages(h :: rest) + else if (t.contains(h)) removeDuplicateMessages(t :: rest) + else h :: removeDuplicateMessages(t :: rest) + case fewer => fewer + } + + Show.show { exceptionMessages => + removeDuplicateMessages(exceptionMessages.value).mkString(": ") + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c6d4c11b..d7c1cb26 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -166,6 +166,7 @@ object Dependencies { val runtimeCommonDependencies = Seq( cats, catsEffectKernel, + catsRetry, circeConfig, circeGeneric, emberServer,