DeduplicateRecords based on recordKey if global index is used #345
959e120 to 1c1184f
.mapToPair(record -> {
  HoodieKey hoodieKey = record.getKey();
  // If index used is global, then records are expected to differ in their partitionPath
  boolean isGlobal = getIndex() != null && getIndex().isGlobal();
why would the index be null?
Corrected this. The check had to be outside of this code block.
@@ -151,7 +152,7 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
  config.getUpsertShuffleParallelism());

  // perform index loop up to get existing location of records
- JavaRDD<HoodieRecord<T>> taggedRecords = index.tagLocation(dedupedRecords, table);
+ JavaRDD<HoodieRecord<T>> taggedRecords = getIndex().tagLocation(dedupedRecords, table);
Let's remove the index => getIndex() refactors. It's still a private variable that can be accessed within the same class.
Sure, made a package private constructor for testing purpose to inject a mock index.
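For readers following along, the idea of a package-private constructor that lets a test inject its own index might look roughly like the sketch below. `SketchIndex` and `SketchWriteClient` are hypothetical stand-ins, not the real Hoodie classes; only `isGlobal()` is taken from this thread.

```java
// Hypothetical stand-in for the index abstraction; only isGlobal() appears in this thread.
interface SketchIndex {
    boolean isGlobal();
}

// Hypothetical, heavily reduced stand-in for the write client.
class SketchWriteClient {
    private final SketchIndex index;

    // Package-private: reachable from tests in the same package,
    // but not part of the public API.
    SketchWriteClient(SketchIndex index) {
        this.index = index;
    }

    // Reports which key deduplication would use for the injected index.
    String dedupKeyKind() {
        return index.isGlobal() ? "recordKey" : "recordKey+partitionPath";
    }
}
```

A test in the same package can then pass a stub such as `() -> true` in place of a real index and check the behaviour without building one.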
2db725c to 76aa3a1
@@ -122,6 +123,20 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
  }
  }

  @VisibleForTesting
The existing HoodieWriteClient constructor may use the new constructor to save duplicate code.
public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
    boolean rollbackInFlight) {
Good point.. @kaushikd49 can we fix that as well..
Defining a new package private constructor is much cleaner.. Nicely done!
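The suggestion above amounts to constructor chaining: the public constructor builds the default index and hands off to the package-private one, so the shared initialization lives in one place. A minimal sketch, assuming reduced stand-in types (`ChainIndex`, `ChainedWriteClient`, and `defaultIndex()` are hypothetical; the real client derives its index from the write config):

```java
// Hypothetical stand-in for the index abstraction.
interface ChainIndex {
    boolean isGlobal();
}

// Hypothetical, reduced stand-in for the write client.
class ChainedWriteClient {
    private final boolean rollbackInFlight;
    private final ChainIndex index;

    // Public entry point: picks the default index, then delegates.
    public ChainedWriteClient(boolean rollbackInFlight) {
        this(rollbackInFlight, defaultIndex());
    }

    // Package-private constructor: holds the shared initialization
    // and accepts an injected index (for example, a mock in tests).
    ChainedWriteClient(boolean rollbackInFlight, ChainIndex index) {
        this.rollbackInFlight = rollbackInFlight;
        this.index = index;
    }

    // Assumption for illustration only: a non-global default index.
    private static ChainIndex defaultIndex() {
        return () -> false;
    }

    boolean usesGlobalIndex() {
        return index.isGlobal();
    }

    boolean rollsBackInFlight() {
        return rollbackInFlight;
    }
}
```

With this shape, any later change to initialization only has to be made in the package-private constructor.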
Please squash your commits into 1 as well..
.mapToPair(record -> {
  HoodieKey hoodieKey = record.getKey();
  // If index used is global, then records are expected to differ in their partitionPath
  Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey;
nit: replace isIndexingGlobal with index.isGlobal()
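To illustrate the key choice in the hunk above without Spark: with a global index the dedup key is the record key alone, otherwise it is the full key (record key plus partition path). The sketch below uses plain Java collections, and `SketchKey` and `DedupSketch` are hypothetical stand-ins. How duplicates are merged is not shown in this thread, so the sketch simply keeps the first record seen per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for HoodieKey: a record key plus a partition path.
final class SketchKey {
    final String recordKey;
    final String partitionPath;

    SketchKey(String recordKey, String partitionPath) {
        this.recordKey = recordKey;
        this.partitionPath = partitionPath;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchKey)) return false;
        SketchKey other = (SketchKey) o;
        return recordKey.equals(other.recordKey)
            && partitionPath.equals(other.partitionPath);
    }

    @Override
    public int hashCode() {
        return Objects.hash(recordKey, partitionPath);
    }
}

final class DedupSketch {
    // Keeps the first record per dedup key. This is a simplification:
    // the merge step of the real client is not part of this sketch.
    static List<SketchKey> dedupe(List<SketchKey> records, boolean indexIsGlobal) {
        Map<Object, SketchKey> firstSeen = new LinkedHashMap<>();
        for (SketchKey key : records) {
            // Global index: partition path is ignored when comparing records.
            Object dedupKey = indexIsGlobal ? key.recordKey : key;
            firstSeen.putIfAbsent(dedupKey, key);
        }
        return new ArrayList<>(firstSeen.values());
    }
}
```

Two records that share a record key but sit in different partitions collapse into one when `indexIsGlobal` is true and stay separate when it is false.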
cb1fe33 to b0f8b71
Thanks, squashed commits @vinothchandar
Fixes issue #344