[WIP] Fix for issue #333 #386

Closed
wants to merge 1 commit into from
@@ -390,17 +390,14 @@ private JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
     // Here as the recordRDD might have more data than rowKeyRDD (some rowKeys' fileId is null),
     // so we do left outer join.
     return rowKeyRecordPairRDD.leftOuterJoin(rowKeyFilenamePairRDD).values().map(v1 -> {
-      HoodieRecord<T> record = v1._1();
+      // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD will have 2
+      // entries with the same exact in memory copy of the HoodieRecord and the 2 separate filenames that the
+      // record is found in. This will result in setting currentLocation 2 times and it will fail the second time.
+      // So creating a new in memory copy of the hoodie record.
+      HoodieRecord<T> record = new HoodieRecord<>(v1._1());
Contributor Author

Instead of checking if the currentLocation is set, I think creating the record every time is cleaner.

Contributor

What is the implication of doing this? Will it trigger the creation of a huge number of objects?

       if (v1._2().isPresent()) {
         String filename = v1._2().get();
         if (filename != null && !filename.isEmpty()) {
-          // When you have a record in multiple files in the same partition, then rowKeyRecordPairRDD will have 2
-          // entries with the same exact in memory copy of the HoodieRecord and the 2 separate filenames that the
-          // record is found in. This will result in setting currentLocation 2 times and it will fail the second time.
-          // This check will create a new in memory copy of the hoodie record.
-          if (record.getCurrentLocation() != null) {
-            record = new HoodieRecord<T>(record.getKey(), record.getData());
-          }
           record.setCurrentLocation(new HoodieRecordLocation(FSUtils.getCommitTime(filename),
               FSUtils.getFileId(filename)));
         }
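
The failure described in the code comment above can be reproduced outside Spark. The following is a minimal sketch, not the project's code: `Record`, `Location`, `Joined` and `tag` are hypothetical stand-ins, and it assumes that a second call to `setCurrentLocation` is rejected with an exception (the diff only says it "will fail the second time"). It shows one record instance appearing in two joined rows, and how copying before tagging avoids the second assignment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified stand-ins for HoodieRecord and HoodieRecordLocation.
public class TagLocationSketch {

  static final class Location {
    final String commitTime;
    final String fileId;

    Location(String commitTime, String fileId) {
      this.commitTime = commitTime;
      this.fileId = fileId;
    }
  }

  static final class Record {
    final String key;
    final String data;
    private Location currentLocation;

    Record(String key, String data) {
      this.key = key;
      this.data = data;
    }

    // Mirrors the copy constructor added in this PR.
    Record(Record other) {
      this(other.key, other.data);
      this.currentLocation = other.currentLocation;
    }

    // Assumption: a second assignment is rejected with an exception.
    void setCurrentLocation(Location location) {
      if (currentLocation != null) {
        throw new IllegalStateException("currentLocation already set for " + key);
      }
      currentLocation = location;
    }

    Location getCurrentLocation() {
      return currentLocation;
    }
  }

  // One row of the join output: a record plus one file it was found in.
  static final class Joined {
    final Record record;
    final String fileId;

    Joined(Record record, String fileId) {
      this.record = record;
      this.fileId = fileId;
    }
  }

  // Tags every row; with copyFirst each row gets its own Record instance.
  static List<Record> tag(List<Joined> rows, boolean copyFirst) {
    List<Record> tagged = new ArrayList<>();
    for (Joined row : rows) {
      Record record = copyFirst ? new Record(row.record) : row.record;
      record.setCurrentLocation(new Location("001", row.fileId));
      tagged.add(record);
    }
    return tagged;
  }

  public static void main(String[] args) {
    Record shared = new Record("key1", "payload");
    // The same instance shows up twice, once per file containing the key.
    List<Joined> rows = Arrays.asList(new Joined(shared, "file-A"), new Joined(shared, "file-B"));

    System.out.println("tagged with copies: " + tag(rows, true).size());
    try {
      tag(rows, false);
    } catch (IllegalStateException e) {
      System.out.println("shared instance failed: " + e.getMessage());
    }
  }
}
```

Copying on every row costs one extra wrapper allocation per joined row, whereas the removed `getCurrentLocation() != null` check allocated only when a duplicate was actually seen.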
@@ -200,9 +200,11 @@ private boolean writeUpdateRecord(HoodieRecord<T> hoodieRecord,
    */
   public void write(GenericRecord oldRecord) {
     String key = oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
-    HoodieRecord<T> hoodieRecord = keyToNewRecords.get(key);
     boolean copyOldRecord = true;
     if (keyToNewRecords.containsKey(key)) {
+      // If we have duplicate records that we are updating, then the hoodie record will be deflated after
+      // writing the first record. So make a copy of the record to be merged
+      HoodieRecord<T> hoodieRecord = new HoodieRecord<>(keyToNewRecords.get(key));
Contributor Author

The test did not catch this because HoodieMergeHandle uses SpillableMap, and the get here is served by the DiskBasedMap, which returns a new HoodieRecord every time. If the get is served from the InMemoryMap instead, duplicate records will cause an issue.

Contributor

This piece of code is very specific to this use case and will make refactoring tricky. Also, again, what is the garbage collection implication of this?

       try {
         Optional<IndexedRecord> combinedAvroRecord = hoodieRecord.getData()
             .combineAndGetUpdateValue(oldRecord, schema);
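
The author's point about why the test passed can be illustrated with two toy maps. This is a sketch under assumptions, not the SpillableMap implementation: `InMemoryStore` and `SerializingStore` are hypothetical stand-ins, the latter using Java serialization to mimic a disk-backed lookup that rebuilds the value on each `get`, and `deflate()` here simply nulls the payload. `mergeTwice` simulates two old records with the same key being merged against one incoming record.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; none of these types are the project's classes.
public class MapLookupSketch {

  static final class Record implements Serializable {
    private static final long serialVersionUID = 1L;
    final String key;
    private String data;

    Record(String key, String data) {
      this.key = key;
      this.data = data;
    }

    Record(Record other) {
      this(other.key, other.data);
    }

    String getData() {
      return data;
    }

    // Assumption: deflating drops the payload once it has been written.
    void deflate() {
      data = null;
    }
  }

  interface RecordStore {
    void put(String key, Record record);

    Record get(String key);
  }

  // Returns the very same instance on every get.
  static final class InMemoryStore implements RecordStore {
    private final Map<String, Record> records = new HashMap<>();

    public void put(String key, Record record) {
      records.put(key, record);
    }

    public Record get(String key) {
      return records.get(key);
    }
  }

  // Stores bytes and deserializes a fresh instance on every get.
  static final class SerializingStore implements RecordStore {
    private final Map<String, byte[]> bytes = new HashMap<>();

    public void put(String key, Record record) {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
        out.writeObject(record);
      } catch (IOException e) {
        throw new IllegalStateException(e);
      }
      bytes.put(key, buffer.toByteArray());
    }

    public Record get(String key) {
      try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.get(key)))) {
        return (Record) in.readObject();
      } catch (IOException | ClassNotFoundException e) {
        throw new IllegalStateException(e);
      }
    }
  }

  // Merges the same key twice; returns true if both merges saw a payload.
  static boolean mergeTwice(RecordStore store, String key, boolean copy) {
    boolean bothHadData = true;
    for (int i = 0; i < 2; i++) {
      Record fetched = store.get(key);
      Record record = copy ? new Record(fetched) : fetched;
      bothHadData &= record.getData() != null;
      record.deflate();
    }
    return bothHadData;
  }

  public static void main(String[] args) {
    RecordStore serialized = new SerializingStore();
    serialized.put("key1", new Record("key1", "payload"));
    System.out.println("serialized, no copy: " + mergeTwice(serialized, "key1", false));

    RecordStore withCopy = new InMemoryStore();
    withCopy.put("key1", new Record("key1", "payload"));
    System.out.println("in-memory, copy: " + mergeTwice(withCopy, "key1", true));

    RecordStore noCopy = new InMemoryStore();
    noCopy.put("key1", new Record("key1", "payload"));
    System.out.println("in-memory, no copy: " + mergeTwice(noCopy, "key1", false));
  }
}
```

A test that only exercises the deserializing path would pass with or without the copy, which matches the author's observation.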
@@ -58,6 +58,12 @@ public HoodieRecord(HoodieKey key, T data) {
     this.newLocation = null;
   }

+  public HoodieRecord(HoodieRecord<T> record) {
+    this(record.key, record.data);
+    this.currentLocation = record.currentLocation;
+    this.newLocation = record.newLocation;
+  }
+
   public HoodieKey getKey() {
     return key;
   }
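
The copy constructor above passes `key` and `data` by reference, so it is a shallow copy. The sketch below uses hypothetical stand-in types (`Record`, `Payload`, and string-valued locations) to show what that means in practice: each copy allocates a new wrapper object but shares the payload with the original, and deflating the copy leaves the original untouched. It bears on the object-creation questions raised in review but does not measure garbage collection cost.

```java
// Hypothetical stand-ins for HoodieRecord and its payload type.
public class ShallowCopySketch {

  static final class Payload {
    final byte[] bytes;

    Payload(byte[] bytes) {
      this.bytes = bytes;
    }
  }

  static final class Record {
    final String key;
    Payload data;
    String currentLocation;
    String newLocation;

    Record(String key, Payload data) {
      this.key = key;
      this.data = data;
    }

    // Same shape as the constructor in the diff: references are copied, not contents.
    Record(Record other) {
      this(other.key, other.data);
      this.currentLocation = other.currentLocation;
      this.newLocation = other.newLocation;
    }

    // Assumption: deflating drops this wrapper's reference to the payload.
    void deflate() {
      this.data = null;
    }
  }

  public static void main(String[] args) {
    Payload payload = new Payload(new byte[1024]);
    Record original = new Record("key1", payload);
    Record copy = new Record(original);

    System.out.println("distinct wrappers: " + (copy != original));
    System.out.println("shared payload: " + (copy.data == original.data));

    copy.deflate();
    System.out.println("original intact: " + (original.data == payload));
  }
}
```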