Add legacyColumnMode configuration #394

Merged
merged 8 commits on Feb 4, 2025
Changes from all commits
4 changes: 4 additions & 0 deletions config/config.azure.reference.hocon
@@ -111,6 +111,10 @@
"iglu:com.acme/legacy/jsonschema/2-*-*"
]

# -- Whether the loader should load all entities to legacy columns, irrespective of the `legacyColumns` configuration.
# -- Change to `true` so all events go to the legacy columns.
"legacyColumnMode": false

# -- Whether the loader should crash and exit if it fails to resolve an Iglu Schema.
# -- We recommend `true` because Snowplow enriched events have already passed validation, so a missing schema normally
# -- indicates an error that needs addressing.
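For clarity, here is a minimal sketch of how the new flag interacts with the existing `legacyColumns` list (the schema criterion is the illustrative one from the reference config):

```hocon
{
  # Only entities matching these criteria are loaded to legacy columns...
  "legacyColumns": [
    "iglu:com.acme/legacy/jsonschema/2-*-*"
  ]
  # ...unless legacyColumnMode is true, in which case every entity is loaded
  # to legacy columns and the list above is effectively ignored.
  "legacyColumnMode": true
}
```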
4 changes: 4 additions & 0 deletions config/config.kinesis.reference.hocon
@@ -145,6 +145,10 @@
"iglu:com.acme/legacy/jsonschema/2-*-*"
]

# -- Whether the loader should load all entities to legacy columns, irrespective of the `legacyColumns` configuration.
# -- Change to `true` so all events go to the legacy columns.
"legacyColumnMode": false

# -- Whether the loader should crash and exit if it fails to resolve an Iglu Schema.
# -- We recommend `true` because Snowplow enriched events have already passed validation, so a missing schema normally
# -- indicates an error that needs addressing.
4 changes: 4 additions & 0 deletions config/config.pubsub.reference.hocon
@@ -117,6 +117,10 @@
"iglu:com.acme/legacy/jsonschema/2-*-*"
]

# -- Whether the loader should load all entities to legacy columns, irrespective of the `legacyColumns` configuration.
# -- Change to `true` so all events go to the legacy columns.
"legacyColumnMode": false

# -- Whether the loader should crash and exit if it fails to resolve an Iglu Schema.
# -- We recommend `true` because Snowplow enriched events have already passed validation, so a missing schema normally
# -- indicates an error that needs addressing.
1 change: 1 addition & 0 deletions modules/core/src/main/resources/reference.conf
@@ -42,6 +42,7 @@

"skipSchemas": []
"legacyColumns": []
"legacyColumnMode": false
"exitOnMissingIgluSchema": true

"http": {
@@ -32,6 +32,7 @@ case class Config[+Source, +Sink](
license: AcceptedLicense,
skipSchemas: List[SchemaCriterion],
legacyColumns: List[SchemaCriterion],
legacyColumnMode: Boolean,
exitOnMissingIgluSchema: Boolean,
http: Config.Http
)
@@ -36,6 +36,7 @@ case class Environment[F[_]](
badRowMaxSize: Int,
schemasToSkip: List[SchemaCriterion],
legacyColumns: List[SchemaCriterion],
legacyColumnMode: Boolean,
exitOnMissingIgluSchema: Boolean
)

@@ -79,6 +80,7 @@ object Environment {
badRowMaxSize = config.main.output.bad.maxRecordSize,
schemasToSkip = config.main.skipSchemas,
legacyColumns = config.main.legacyColumns,
legacyColumnMode = config.main.legacyColumnMode,
exitOnMissingIgluSchema = config.main.exitOnMissingIgluSchema
)

@@ -105,7 +105,8 @@ object LegacyColumns {
def resolveTypes[F[_]: Sync: RegistryLookup](
resolver: Resolver[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]],
matchCriteria: List[SchemaCriterion]
matchCriteria: List[SchemaCriterion],
legacyColumnMode: Boolean
): F[Result] =
entities.toVector
.flatMap { case (tabledEntity, subVersions) =>
@@ -114,7 +115,8 @@
(tabledEntity.entityType, TabledEntity.toSchemaKey(tabledEntity, subVersion))
}
.filter { case (_, schemaKey) =>
matchCriteria.exists(_.matches(schemaKey))
if (legacyColumnMode) true
else matchCriteria.exists(_.matches(schemaKey))
}
}
.traverse { case (entityType, schemaKey) =>
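The behavioural change is confined to the `filter` step: with `legacyColumnMode` enabled every schema key is treated as legacy, otherwise only keys matching the configured criteria are. A minimal sketch of that selection rule, with the surrounding types stripped away:

```scala
import com.snowplowanalytics.iglu.core.{SchemaCriterion, SchemaKey}

// Simplified from LegacyColumns.resolveTypes: should this schema key be
// loaded to a legacy column?
def isLegacy(
  legacyColumnMode: Boolean,
  matchCriteria: List[SchemaCriterion],
  schemaKey: SchemaKey
): Boolean =
  if (legacyColumnMode) true // legacy mode: every entity is legacy
  else matchCriteria.exists(_.matches(schemaKey))
```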
@@ -36,6 +36,7 @@ import com.snowplowanalytics.snowplow.runtime.processing.BatchUp
import com.snowplowanalytics.snowplow.runtime.Retrying.showRetryDetails
import com.snowplowanalytics.snowplow.loaders.transform.{BadRowsSerializer, NonAtomicFields, SchemaSubVersion, TabledEntity, Transform}
import com.snowplowanalytics.snowplow.bigquery.{Environment, RuntimeService}
import com.snowplowanalytics.snowplow.loaders.transform.NonAtomicFields.Result

import java.time.Instant

@@ -156,8 +157,8 @@
now <- Sync[F].realTimeInstant
loadTstamp = BigQueryCaster.timestampValue(now)
_ <- Logger[F].debug(s"Processing batch of size ${events.size}")
v2NonAtomicFields <- NonAtomicFields.resolveTypes[F](env.resolver, entities, env.schemasToSkip ::: env.legacyColumns)
legacyFields <- LegacyColumns.resolveTypes[F](env.resolver, entities, env.legacyColumns)
v2NonAtomicFields <- resolveV2NonAtomicFields(env, entities)
legacyFields <- LegacyColumns.resolveTypes[F](env.resolver, entities, env.legacyColumns, env.legacyColumnMode)
_ <- possiblyExitOnMissingIgluSchema(env, v2NonAtomicFields, legacyFields)
(moreBad, rows) <- transformBatch[F](badProcessor, loadTstamp, events, v2NonAtomicFields, legacyFields)
fields = v2NonAtomicFields.fields.flatMap { tte =>
@@ -174,6 +175,13 @@
)
}

private def resolveV2NonAtomicFields[F[_]: Async: RegistryLookup](
env: Environment[F],
entities: Map[TabledEntity, Set[SchemaSubVersion]]
): F[Result] =
if (env.legacyColumnMode) Sync[F].delay(Result(Vector.empty, Nil))
else NonAtomicFields.resolveTypes[F](env.resolver, entities, env.schemasToSkip ::: env.legacyColumns)

private def transformBatch[F[_]: Sync](
badProcessor: BadRowProcessor,
loadTstamp: java.lang.Long,
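Taken together with the `LegacyColumns` change above, the effect of enabling `legacyColumnMode` is that the v2 resolution step short-circuits to an empty `Result`, so every entity in the batch flows through the legacy path. A sketch of the gate with stand-in types (not the loader's real signatures):

```scala
import cats.effect.IO

// Stand-in for NonAtomicFields.Result: resolved fields plus Iglu failures.
final case class Result(fields: Vector[String], igluFailures: List[String])

// When legacy mode is on, skip v2 resolution entirely; otherwise defer to
// the normal resolution (which already excludes schemas routed to legacy).
def resolveV2(legacyColumnMode: Boolean, resolveAll: IO[Result]): IO[Result] =
  if (legacyColumnMode) IO.pure(Result(Vector.empty, Nil))
  else resolveAll
```

Since the empty `Result` is a pure value, `pure` suffices in the sketch; the PR itself wraps it in `Sync[F].delay`, which is equivalent here.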
@@ -24,10 +24,11 @@ import com.snowplowanalytics.snowplow.runtime.processing.Coldswap
import com.snowplowanalytics.snowplow.sources.{EventProcessingConfig, EventProcessor, SourceAndAck, TokenedEvents}
import com.snowplowanalytics.snowplow.sinks.Sink
import com.snowplowanalytics.snowplow.bigquery.processing.{BigQueryRetrying, TableManager, Writer}
import com.snowplowanalytics.snowplow.bigquery.MockEnvironment.State

import scala.concurrent.duration.{DurationInt, FiniteDuration}

case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], environment: Environment[IO])
case class MockEnvironment(state: Ref[IO, State], environment: Environment[IO])

object MockEnvironment {

@@ -49,6 +50,8 @@ object MockEnvironment {
}
import Action._

case class State(actions: Vector[Action], writtenToBQ: Iterable[Map[String, AnyRef]])

/**
* Build a mock environment for testing
*
@@ -64,10 +67,11 @@
def build(
inputs: List[TokenedEvents],
mocks: Mocks,
legacyColumns: List[SchemaCriterion]
legacyColumns: List[SchemaCriterion],
legacyColumnMode: Boolean
): Resource[IO, MockEnvironment] =
for {
state <- Resource.eval(Ref[IO].of(Vector.empty[Action]))
state <- Resource.eval(Ref[IO].of(State(Vector.empty[Action], Nil)))
writerResource <- Resource.eval(testWriter(state, mocks.writerResponses, mocks.descriptors))
writerColdswap <- Coldswap.make(writerResource)
} yield {
@@ -90,6 +94,7 @@
badRowMaxSize = 1000000,
schemasToSkip = List.empty,
legacyColumns = legacyColumns,
legacyColumnMode = legacyColumnMode,
exitOnMissingIgluSchema = false
)
MockEnvironment(state, env)
@@ -124,29 +129,29 @@
def cloud = "OnPrem"
}

private def testTableManager(mockedResponse: Response[FieldList], state: Ref[IO, Vector[Action]]): TableManager.WithHandledErrors[IO] =
private def testTableManager(mockedResponse: Response[FieldList], state: Ref[IO, State]): TableManager.WithHandledErrors[IO] =
new TableManager.WithHandledErrors[IO] {
def addColumns(columns: Vector[Field]): IO[FieldList] =
mockedResponse match {
case Response.Success(fieldList) =>
state.update(_ :+ AlterTableAddedColumns(columns.map(_.name))).as(fieldList)
state.update(s => s.copy(actions = s.actions :+ AlterTableAddedColumns(columns.map(_.name)))).as(fieldList)
case Response.ExceptionThrown(value) =>
IO.raiseError(value)
}

def createTableIfNotExists: IO[Unit] =
state.update(_ :+ CreatedTable)
state.update(s => s.copy(actions = s.actions :+ CreatedTable))
}
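Every mock now threads the same update through `State`, repeating the pattern `s => s.copy(actions = s.actions :+ action)`. A hypothetical helper that names that pattern (not part of the PR):

```scala
import cats.effect.{IO, Ref}

// Hypothetical convenience for the test mocks: append one Action to the
// recorded history, leaving writtenToBQ untouched.
def logAction(state: Ref[IO, MockEnvironment.State])(action: MockEnvironment.Action): IO[Unit] =
  state.update(s => s.copy(actions = s.actions :+ action))
```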

private def testSourceAndAck(inputs: List[TokenedEvents], state: Ref[IO, Vector[Action]]): SourceAndAck[IO] =
private def testSourceAndAck(inputs: List[TokenedEvents], state: Ref[IO, State]): SourceAndAck[IO] =
new SourceAndAck[IO] {
def stream(config: EventProcessingConfig[IO], processor: EventProcessor[IO]): Stream[IO, Nothing] =
Stream
.emits(inputs)
.through(processor)
.chunks
.evalMap { chunk =>
state.update(_ :+ Checkpointed(chunk.toList))
state.update(s => s.copy(actions = s.actions :+ Checkpointed(chunk.toList)))
}
.drain

Expand All @@ -157,11 +162,11 @@ object MockEnvironment {
IO.pure(None)
}

private def testBadSink(mockedResponse: Response[Unit], state: Ref[IO, Vector[Action]]): Sink[IO] =
private def testBadSink(mockedResponse: Response[Unit], state: Ref[IO, State]): Sink[IO] =
Sink[IO] { batch =>
mockedResponse match {
case Response.Success(_) =>
state.update(_ :+ SentToBad(batch.size))
state.update(s => s.copy(actions = s.actions :+ SentToBad(batch.size)))
case Response.ExceptionThrown(value) =>
IO.raiseError(value)
}
Expand All @@ -181,15 +186,15 @@ object MockEnvironment {
* responses given, then it will return with a successful response.
*/
private def testWriter(
actionRef: Ref[IO, Vector[Action]],
stateRef: Ref[IO, State],
responses: List[Response[Writer.WriteResult]],
descriptors: List[Descriptors.Descriptor]
): IO[Resource[IO, Writer[IO]]] =
for {
responseRef <- Ref[IO].of(responses)
descriptorRef <- Ref[IO].of(descriptors)
} yield {
val make = actionRef.update(_ :+ OpenedWriter).as {
val make = stateRef.update(s => s.copy(actions = s.actions :+ OpenedWriter)).as {
new Writer[IO] {
def descriptor: IO[Descriptors.Descriptor] =
descriptorRef.modify {
@@ -205,55 +210,54 @@
}
writeResult <- response match {
case success: Response.Success[Writer.WriteResult] =>
updateActions(actionRef, rows, success) *> IO(success.value)
updateActions(stateRef, rows, success) *> IO(success.value)
case Response.ExceptionThrown(ex) =>
IO.raiseError(ex)
}
} yield writeResult

private def updateActions(
state: Ref[IO, Vector[Action]],
state: Ref[IO, State],
rows: Iterable[Map[String, AnyRef]],
success: Response.Success[Writer.WriteResult]
): IO[Unit] =
success.value match {
case Writer.WriteResult.Success =>
state.update(_ :+ WroteRowsToBigQuery(rows.size))
state.update(s => s.copy(actions = s.actions :+ WroteRowsToBigQuery(rows.size), writtenToBQ = s.writtenToBQ ++ rows))
case _ =>
IO.unit
}
}
}

Resource.make(make)(_ => actionRef.update(_ :+ ClosedWriter))
Resource.make(make)(_ => stateRef.update(s => s.copy(actions = s.actions :+ ClosedWriter)))
}

private def testMetrics(ref: Ref[IO, Vector[Action]]): Metrics[IO] = new Metrics[IO] {
private def testMetrics(ref: Ref[IO, State]): Metrics[IO] = new Metrics[IO] {
def addBad(count: Long): IO[Unit] =
ref.update(_ :+ AddedBadCountMetric(count))
ref.update(s => s.copy(actions = s.actions :+ AddedBadCountMetric(count)))

def addGood(count: Long): IO[Unit] =
ref.update(_ :+ AddedGoodCountMetric(count))
ref.update(s => s.copy(actions = s.actions :+ AddedGoodCountMetric(count)))

def setLatency(latency: FiniteDuration): IO[Unit] =
ref.update(_ :+ SetLatencyMetric(latency))
ref.update(s => s.copy(actions = s.actions :+ SetLatencyMetric(latency)))

def setE2ELatency(e2eLatency: FiniteDuration): IO[Unit] =
ref.update(_ :+ SetE2ELatencyMetric(e2eLatency))
ref.update(s => s.copy(actions = s.actions :+ SetE2ELatencyMetric(e2eLatency)))

def report: Stream[IO, Nothing] = Stream.never[IO]
}

private def testAppHealth(ref: Ref[IO, Vector[Action]]): AppHealth.Interface[IO, Alert, RuntimeService] =
private def testAppHealth(ref: Ref[IO, State]): AppHealth.Interface[IO, Alert, RuntimeService] =
new AppHealth.Interface[IO, Alert, RuntimeService] {
def beHealthyForSetup: IO[Unit] =
IO.unit
def beUnhealthyForSetup(alert: Alert): IO[Unit] =
IO.unit
def beHealthyForRuntimeService(service: RuntimeService): IO[Unit] =
ref.update(_ :+ BecameHealthy(service))
ref.update(s => s.copy(actions = s.actions :+ BecameHealthy(service)))
def beUnhealthyForRuntimeService(service: RuntimeService): IO[Unit] =
ref.update(_ :+ BecameUnhealthy(service))
ref.update(s => s.copy(actions = s.actions :+ BecameUnhealthy(service)))
}

private def retriesConfig = Config.Retries(
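Finally, a hypothetical spec snippet showing how the enriched `State` might be consumed; `runProcessing` and the assertions are illustrative, not part of the PR:

```scala
import cats.effect.IO

// Build the mock environment with the new flag enabled, run the code under
// test, then inspect both the recorded actions and the rows sent to BigQuery.
def example(
  inputs: List[TokenedEvents],
  mocks: MockEnvironment.Mocks,
  runProcessing: Environment[IO] => IO[Unit]
): IO[Unit] =
  MockEnvironment.build(inputs, mocks, legacyColumns = Nil, legacyColumnMode = true).use { mockEnv =>
    for {
      _     <- runProcessing(mockEnv.environment)
      state <- mockEnv.state.get
      _     <- IO(assert(state.actions.nonEmpty))     // behavioural assertions, as before
      _     <- IO(assert(state.writtenToBQ.nonEmpty)) // new: the exact rows written to BQ
    } yield ()
  }
```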