Add legacyColumnMode configuration #394

Merged
merged 8 commits into v2 from full-legacy on Feb 4, 2025
Conversation

oguzhanunlu
Contributor

@oguzhanunlu oguzhanunlu commented Jan 28, 2025

This commit introduces a new feature flag `legacyColumnMode`, which changes loading behavior so that all events are loaded to legacy columns regardless of the `legacyColumns` configuration. Set it to `true` to enable the legacy behavior.
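A configuration sketch, assuming the loader reads HOCON and the new flag sits alongside the existing `legacyColumns` setting (the exact path and surrounding keys are not shown in this thread, so the structure below is illustrative only):

```hocon
{
  # Hypothetical placement: both key names come from this PR,
  # but the surrounding structure is a guess.
  "legacyColumns": []        # per-schema legacy behavior (ignored when the flag below is true)
  "legacyColumnMode": true   # load ALL events to legacy columns
}
```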

ref: PDP-1489

@oguzhanunlu oguzhanunlu self-assigned this Jan 28, 2025
```scala
  map1 <- v2Transform
  map2 <- legacyTransform
} yield event -> (map1 ++ map2)
if (legacyColumnMode) LegacyColumns.transformEvent(badProcessor, event, legacyEntities).map(event -> _)
```
Contributor

This line doesn't look right. I might be wrong... but are you sure you're loading the atomic fields, like `event_id` and `collector_tstamp` etc.?

Contributor Author

I'll add a test for that

This commit introduces a new feature flag `legacyColumnMode`, changing loading behavior such that all events are loaded to legacy columns regardless of `legacyColumns` configuration. Set it to true to enable legacy behavior.
Comment on lines 202 to 203
```scala
def alter1_legacy      = alter1_base(legacyColumns = true, timeout = true, legacyColumnMode = false)
def alter1_full_legacy = alter1_base(legacyColumns = true, timeout = true, legacyColumnMode = true)
```
Contributor

What is the difference between `alter1_legacy` and `alter1_full_legacy`? It looks like you are testing exactly the same thing twice.

Contributor Author

Yeah, it doesn't have a meaning as it is. I'll remove it.

Comment on lines 278 to 279
```scala
def alter2_legacy      = alter2_base(legacyColumns = true, timeout = true, legacyColumnMode = false)
def alter2_full_legacy = alter2_base(legacyColumns = true, timeout = true, legacyColumnMode = true)
```
Contributor

Same as above. `alter2_legacy` and `alter2_full_legacy` test exactly the same thing twice.

Comment on lines 42 to 43
```scala
case class WroteNRowsToBigQuery(n: Int) extends Action
case class WroteRowsToBigQuery(rows: Iterable[Map[String, AnyRef]]) extends Action
```
Contributor

I see why you did this. But it doesn't feel right to have two different `Action` classes representing the same action. And you introduced lots of boolean flags (`recordRows = false`) throughout the specs.

Here's an alternative idea. Don't change the Action classes. Leave it like it was before, like:

```scala
case class WroteRowsToBigQuery(rowCount: Int) extends Action
```

And instead, change the class `MockEnvironment` so it captures the actions AND the event content:

```scala
case class State(actions: Vector[Action], writtenToBQ: Iterable[Map[String, AnyRef]])
case class MockEnvironment(state: Ref[IO, State], environment: Environment[IO])
```

Then any of the specs can (if they want to) check the event content.

You might even find other existing specs that benefit from doing a check of the event content.
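A minimal, dependency-free sketch of this suggestion. The real code keeps the state in cats-effect's `Ref[IO, State]`; a plain mutable field stands in for it here so the example is self-contained, and all names other than `State`, `MockEnvironment`, and `WroteRowsToBigQuery` are hypothetical:

```scala
sealed trait Action
case class WroteRowsToBigQuery(rowCount: Int) extends Action

// Capture the actions AND the written row content in one state value.
case class State(actions: Vector[Action], writtenToBQ: Vector[Map[String, AnyRef]])

// Stand-in for the real MockEnvironment, which wraps the state in Ref[IO, State].
final class MockEnvironment(var state: State) {
  def writeRows(rows: Vector[Map[String, AnyRef]]): Unit =
    state = State(
      state.actions :+ WroteRowsToBigQuery(rows.size),
      state.writtenToBQ ++ rows
    )
}

object Demo {
  def main(args: Array[String]): Unit = {
    val env = new MockEnvironment(State(Vector.empty, Vector.empty))
    env.writeRows(Vector(Map("event_id" -> "abc-123")))
    // Specs can now assert on the recorded actions and, when useful, the row content:
    assert(env.state.actions == Vector(WroteRowsToBigQuery(1)))
    assert(env.state.writtenToBQ.head.get("event_id").contains("abc-123"))
    println("ok")
  }
}
```

The point of the design is that every spec records the same single `Action`, and only the specs that care about event content need to look at `writtenToBQ`.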

@oguzhanunlu oguzhanunlu requested a review from istreeter February 3, 2025 14:17

```scala
import scala.concurrent.duration.{DurationInt, FiniteDuration}

case class MockEnvironment(state: Ref[IO, Vector[MockEnvironment.Action]], environment: Environment[IO])
case class State(actions: Vector[Action], writtenToBQ: Iterable[Map[String, AnyRef]])
```
Contributor

My personal style preference is to define the `State` class within the `MockEnvironment` object, just because I don't like having a top-level class defined in a file whose name does not match the class name.

This is nitpicking though, sorry!

Comment on lines 257 to 258
```scala
(failures must beEmpty) and
  (fields must contain(expected))
```
Contributor

In all of the other "ue" tests, we have an assertion like `fields must haveSize(1)`. Should we have the same here?

Same comment too for the `c6` test you added in this file.

```scala
} yield {
  val rows = state.writtenToBQ
  (rows.size shouldEqual inputs.head.events.size) and
    (rows.head.get("event_id") should beEqualTo(Option(eventID.toString))) and
```
Contributor

specs2 has some nice syntax for working with options:

```scala
rows.head.get("event_id") should beSome(eventID.toString)
```

It also has nice syntax for working with maps:

```scala
rows.head should havePair("event_id" -> eventID.toString)
```

```scala
  TestControl.executeEmbed(io)
}

def e14_base(legacyColumnMode: Boolean) = {
```
Contributor

My criticism of this test is that it is too different from all the other tests in this file, and I don't see why it needs to be different. In all other tests, the thing we test is: what is the result of calling `Processing.stream(environment)` with different inputs?

`Processing.stream()` is the public method, and we require that it has the correct end-to-end behaviour. Whereas the method `Processing.resolveV2NonAtomicFields` is private, and therefore an implementation detail of `Processing.stream`.

The two behaviours you are trying to test here are:

- If `legacyColumnMode` is enabled, do we load those legacy columns into BigQuery?
- If `legacyColumnMode` is not enabled, do we load v2-style columns into BigQuery?

I am fairly sure you can test those two behaviours, while being consistent with the pattern of other tests in this file.
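The principle the reviewer is describing, in miniature: exercise the public entry point under both flag values instead of reaching into private helpers. A toy sketch with hypothetical names (this is not the loader's real API):

```scala
object Pipeline {
  // Private helpers: implementation details, analogous to resolveV2NonAtomicFields.
  private def legacyName(field: String): String = "legacy_" + field
  private def v2Name(field: String): String     = "v2_" + field

  // Public entry point, standing in for Processing.stream(environment).
  def transform(field: String, legacyColumnMode: Boolean): String =
    if (legacyColumnMode) legacyName(field) else v2Name(field)
}

object PipelineDemo {
  def main(args: Array[String]): Unit = {
    // Both behaviours are observable through the public method alone,
    // so no test ever needs to widen the visibility of the helpers.
    assert(Pipeline.transform("geo", legacyColumnMode = true) == "legacy_geo")
    assert(Pipeline.transform("geo", legacyColumnMode = false) == "v2_geo")
    println("ok")
  }
}
```

Testing only through the public surface keeps the private helpers free to change without breaking specs.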

Contributor Author

Thanks for the feedback! I wanted to test the change in isolation, fearing that testing the whole pipeline could miss it in the future, if not today. I guess it was unnecessary. I can certainly stay consistent with the other tests.

@oguzhanunlu oguzhanunlu requested a review from istreeter February 4, 2025 13:23
```scala
  _ <- Processing.stream(control.environment).compile.drain
  state <- control.state.get
} yield state.actions should beEqualTo(
  Vector(
```
Contributor

This is a nice improvement compared to when I reviewed it yesterday.

I think there is one tiny space for improvement... in this spec, this is currently the important line covering the new feature you added:

```scala
Action.AlterTableAddedColumns(Vector(expectedColumnName)),
```

But that's it. In terms of testing the feature you added, currently you are just testing that it tried to alter the table with the expected column.

There is an opportunity to test more: you could also check `state.writtenToBQ`, which will tell you whether it transformed the data in the expected way.

(Separately, I would like to amend some of the older tests to also check `state.writtenToBQ`, but that is beyond the scope of this PR.)

@@ -621,19 +632,117 @@ class ProcessingSpec extends Specification with CatsEffect {

```scala
def e12 = e12Base(legacyColumns = false)
def e12Legacy = e12Base(legacyColumns = true)

def e13 = {
```
Contributor

The title of this test, taken from above, is:

> Use legacy columns for all fields when `legacyColumnMode` is enabled $e13

Do you think that is a good description of what is actually tested in the implementation? It looks like you are just testing that various atomic fields get set. It's a nice check... but the loader should pass this test whether or not `legacyColumnMode` is enabled.

Contributor Author

Yeah, I'll get rid of `e13`, as `e14` and `e15` test the feature.

@@ -174,6 +175,13 @@ object Processing {

```scala
  )
}

private[processing] def resolveV2NonAtomicFields[F[_]: Async: RegistryLookup](
```
Contributor

I think this can be fully `private`, not `private[processing]`.

@istreeter istreeter self-requested a review February 4, 2025 13:45
@istreeter
Contributor

I hit "approve" by accident.

Comment on lines +695 to +697
```scala
)) and
  (state.writtenToBQ.head should haveKey(expectedColumnName))
}
```
Contributor

Excellent. Really nice improvement to the machinery of this test suite.

@oguzhanunlu oguzhanunlu merged commit 09d7255 into v2 Feb 4, 2025
2 checks passed
@oguzhanunlu oguzhanunlu deleted the full-legacy branch February 4, 2025 15:15
istreeter added a commit that referenced this pull request Feb 6, 2025
A few of the tests in `ProcessingSpec` had recently been disabled by
replacing the real test with `TestControl.executeEmbed(io.timeout(10.seconds))`.
This PR fixes and re-enables the test, so we have better coverage of
when legacy modes are enabled.

Also, I make more use of the new ability to test `writtenToBQ` which was
added in #394. This means we have more test coverage that the expected
fields get loaded.
istreeter added a commit that referenced this pull request Feb 7, 2025
benjben pushed a commit that referenced this pull request Feb 14, 2025
benjben pushed a commit that referenced this pull request Feb 14, 2025
benjben pushed a commit that referenced this pull request Feb 14, 2025
benjben pushed a commit that referenced this pull request Feb 14, 2025
benjben pushed a commit that referenced this pull request Feb 17, 2025
benjben pushed a commit that referenced this pull request Feb 17, 2025
benjben pushed a commit that referenced this pull request Feb 17, 2025
- Update license to SLULA 1.1
- Cluster by event_name when creating new table (#402)
- Add parallelism to parseBytes and transform (#400)
- Decrease default batching.maxBytes to 10 MB (#398)
- Fix and improve ProcessingSpec for legacy column mode (#396)
- Add legacyColumnMode configuration (#394)
- Add e2e_latency_millis metric (#391)
- Fix startup on missing existing table (#384)
- Add option to exit on missing Iglu schemas (#382)
- Refactor health monitoring (#381)
- Feature flag to support the legacy column style -- bug fixes (#379 #380)
- Require alter table when schema is evolved for contexts
- Allow for delay in Writer discovering new columns
- Stay healthy if BigQuery table exceeds column limit (#372)
- Recover from server-side schema mismatch exceptions
- Improve exception handling immediately after altering the table
- Manage Writer resource to be consistent with Snowflake Loader
benjben pushed a commit that referenced this pull request Feb 17, 2025