[ENG-3705] Cherry pick GlobalSimpleIndex duplicate fix from OSS (apache#435)

Signed-off-by: Lokesh Jain <ljain@apache.org>
Co-authored-by: Shiyan Xu <2701446+xushiyan@users.noreply.github.com>
lokeshj1703 and xushiyan authored Nov 9, 2023
1 parent a044797 commit 3454e55
Showing 33 changed files with 2,153 additions and 433 deletions.
@@ -73,9 +73,9 @@ private static HoodieRecord<? extends HoodieRecordPayload> transform(
String preCombineField, Option<Pair<String, String>> simpleKeyGenFieldsOpt) {
return simpleKeyGenFieldsOpt.isPresent()
? SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty())
payloadClass, preCombineField, simpleKeyGenFieldsOpt.get(), scanner.isWithOperationField(), Option.empty(), Option.empty())
: SpillableMapUtils.convertToHoodieRecordPayload(record,
payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName());
payloadClass, preCombineField, scanner.isWithOperationField(), scanner.getPartitionName(), Option.empty());
}

private HoodieFileSliceReader(Iterator<HoodieRecord<T>> recordsItr) {
@@ -234,6 +234,12 @@ public class HoodieIndexConfig extends HoodieConfig {
.defaultValue("true")
.withDocumentation("Similar to " + BLOOM_INDEX_UPDATE_PARTITION_PATH_ENABLE + ", but for simple index.");

public static final ConfigProperty<String> GLOBAL_INDEX_RECONCILE_PARALLELISM = ConfigProperty
.key("hoodie.global.index.reconcile.parallelism")
.defaultValue("60")
.withDocumentation("Only applies if index type is GLOBAL_BLOOM or GLOBAL_SIMPLE. "
+ "This controls the parallelism for deduplication during indexing where more than 1 record could be tagged due to partition update.");

/**
* ***** Bucket Index Configs *****
* Bucket Index is targeted to locate the record fast by hash in big data scenarios.
@@ -625,6 +631,11 @@ public Builder withGlobalSimpleIndexUpdatePartitionPath(boolean updatePartitionP
return this;
}

public Builder withGlobalIndexReconcileParallelism(int parallelism) {
hoodieIndexConfig.setValue(GLOBAL_INDEX_RECONCILE_PARALLELISM, String.valueOf(parallelism));
return this;
}

public Builder withEngineType(EngineType engineType) {
this.engineType = engineType;
return this;
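For reference, a minimal usage sketch of the new setting (not part of this commit): only the withGlobalIndexReconcileParallelism(...) builder method and the hoodie.global.index.reconcile.parallelism key come from the diff above; the base path, table name, index type choice, and the value 120 are illustrative assumptions.

    // Hedged sketch: wiring the new reconcile parallelism into a writer config.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hudi/example_table")                      // hypothetical base path
        .forTable("example_table")                                // hypothetical table name
        .withIndexConfig(HoodieIndexConfig.newBuilder()
            .withIndexType(HoodieIndex.IndexType.GLOBAL_SIMPLE)   // setting only applies to GLOBAL_SIMPLE / GLOBAL_BLOOM
            .withGlobalIndexReconcileParallelism(120)             // overrides the declared default of 60
            .build())
        .build();
    // Equivalent raw-property form: hoodie.global.index.reconcile.parallelism=120
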
@@ -1743,6 +1743,10 @@ public boolean getGlobalSimpleIndexUpdatePartitionPath() {
return getBoolean(HoodieIndexConfig.SIMPLE_INDEX_UPDATE_PARTITION_PATH_ENABLE);
}

public int getGlobalIndexReconcileParallelism() {
return getInt(HoodieIndexConfig.GLOBAL_INDEX_RECONCILE_PARALLELISM);
}

public int getBucketIndexNumBuckets() {
return getIntOrDefault(HoodieIndexConfig.BUCKET_INDEX_NUM_BUCKETS);
}
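Downstream code would read the value back through the getter added in this hunk; a tiny hedged sketch, reusing the writeConfig built in the previous example:

    // Assumes `writeConfig` was fully built as above, so the declared default of 60
    // should apply when the key was never set explicitly.
    int reconcileParallelism = writeConfig.getGlobalIndexReconcileParallelism();
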
@@ -18,31 +18,53 @@

package org.apache.hudi.index;

import org.apache.hudi.avro.HoodieAvroUtils;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.EmptyHoodieRecordPayload;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieAvroRecord;
import org.apache.hudi.common.model.HoodieBaseFile;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodiePayloadProps;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.MetadataValues;
import org.apache.hudi.common.model.RewriteAvroPayload;
import org.apache.hudi.common.table.HoodieTableConfig;
import org.apache.hudi.common.table.timeline.HoodieInstant;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.io.HoodieMergedReadHandle;
import org.apache.hudi.io.storage.HoodieFileReader;
import org.apache.hudi.io.storage.HoodieFileReaderFactory;
import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
import org.apache.hudi.table.HoodieTable;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.TreeSet;

Expand Down Expand Up @@ -124,8 +146,8 @@ public static List<Pair<String, HoodieBaseFile>> getLatestBaseFilesForAllPartiti
* @param location {@link HoodieRecordLocation} for the passed in {@link HoodieRecord}
* @return the tagged {@link HoodieRecord}
*/
public static HoodieRecord getTaggedRecord(HoodieRecord inputRecord, Option<HoodieRecordLocation> location) {
HoodieRecord<?> record = inputRecord;
public static <R> HoodieRecord<R> getTaggedRecord(HoodieRecord<R> inputRecord, Option<HoodieRecordLocation> location) {
HoodieRecord<R> record = inputRecord;
if (location.isPresent()) {
// When you have a record in multiple files in the same partition, then <row key, record> collection
// will have 2 entries with the same exact in memory copy of the HoodieRecord and the 2
@@ -175,4 +197,217 @@ public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String c
// 2) is less than the first commit ts in the timeline
return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
}

/**
* Read existing records based on the given partition path and {@link HoodieRecordLocation} info.
* <p>
* This performs a merged read for a MOR table, in case a file group contains log files.
*
* @return {@link HoodieRecord}s that have the current location being set.
*/
private static <R> HoodieData<HoodieRecord<R>> getExistingRecords(
HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations, HoodieWriteConfig config, HoodieTable hoodieTable) {
final Option<String> instantTime = hoodieTable
.getMetaClient()
.getCommitsTimeline()
.filterCompletedInstants()
.lastInstant()
.map(HoodieInstant::getTimestamp);
return partitionLocations.flatMap(p -> {
String partitionPath = p.getLeft();
String fileId = p.getRight().getFileId();
return new HoodieMergedReadHandle(config, instantTime, hoodieTable, Pair.of(partitionPath, fileId))
.getMergedRecords().iterator();
});
}

/**
* Merge the incoming record with the matching existing record loaded via {@link HoodieMergedReadHandle}. The existing record is the latest version in the table.
*/
private static <R> Option<HoodieRecord<R>> mergeIncomingWithExistingRecord(HoodieRecord<R> incoming, HoodieRecord<R> existing, Schema writeSchema, HoodieWriteConfig config) throws IOException {
Schema existingSchema = HoodieAvroUtils.addMetadataFields(new Schema.Parser().parse(config.getSchema()), config.allowOperationMetadataField());
Schema writeSchemaWithMetaFields = HoodieAvroUtils.addMetadataFields(writeSchema, config.allowOperationMetadataField());
// prepend the hoodie meta fields as the incoming record does not have them
HoodieRecord incomingPrepended = prependMetaFields((HoodieAvroRecord) incoming, writeSchema, writeSchemaWithMetaFields,
new MetadataValues().setRecordKey(incoming.getRecordKey()).setPartitionPath(incoming.getPartitionPath()), config.getProps());
// after prepending the meta fields, convert the record back to the original payload
HoodieRecord incomingWithMetaFields = wrapIntoHoodieRecordPayloadWithParams((HoodieRecordPayload) incomingPrepended.getData(), writeSchema,
config.getProps(), Option.empty(), config.allowOperationMetadataField(), Option.empty(), false, Option.empty());

Option<IndexedRecord> mergedPayload = ((HoodieRecordPayload) incomingWithMetaFields.getData()).combineAndGetUpdateValue(
(IndexedRecord) (((HoodieRecordPayload) existing.getData()).getInsertValue(existingSchema, config.getProps()).get()), writeSchemaWithMetaFields, config.getPayloadConfig().getProps());
Option<HoodieAvroRecord> mergedRecord = mergedPayload.map(payload -> {
try {
return new HoodieAvroRecord(existing.getKey(), createPayload(getPayloadClass(config.getProps()), (GenericRecord) payload, getOrderingVal((GenericRecord) payload, config.getProps())));
} catch (IOException e) {
throw new RuntimeException(e);
}
});

if (mergedRecord.isPresent()) {
// the merged record needs to be converted back to the original payload
HoodieRecord<R> merged = wrapIntoHoodieRecordPayloadWithParams(mergedRecord.get().getData(),
writeSchemaWithMetaFields, config.getProps(), Option.empty(),
config.allowOperationMetadataField(), Option.empty(), false, Option.of(writeSchema));
return Option.of(merged);
} else {
return Option.empty();
}
}

/**
* Merge tagged incoming records with existing records when the partition path has been updated.
*/
public static <R> HoodieData<HoodieRecord<R>> mergeForPartitionUpdates(
HoodieData<Pair<HoodieRecord<R>, Option<Pair<String, HoodieRecordLocation>>>> taggedHoodieRecords, HoodieWriteConfig config, HoodieTable hoodieTable) {
// completely new records
HoodieData<HoodieRecord<R>> newRecords = taggedHoodieRecords.filter(p -> !p.getRight().isPresent()).map(Pair::getLeft);
// the records tagged to existing base files
HoodieData<HoodieRecord<R>> updatingRecords = taggedHoodieRecords.filter(p -> p.getRight().isPresent()).map(Pair::getLeft)
.distinctWithKey(HoodieRecord::getRecordKey, config.getGlobalIndexReconcileParallelism());
// the tagging partitions and locations
HoodieData<Pair<String, HoodieRecordLocation>> partitionLocations = taggedHoodieRecords
.filter(p -> p.getRight().isPresent())
.map(p -> p.getRight().get())
.distinct(config.getGlobalIndexReconcileParallelism());
// merged existing records with current locations being set
HoodieData<HoodieRecord<R>> existingRecords = getExistingRecords(partitionLocations, config, hoodieTable);

HoodieData<HoodieRecord<R>> taggedUpdatingRecords = updatingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r))
.leftOuterJoin(existingRecords.mapToPair(r -> Pair.of(r.getRecordKey(), r)))
.values().flatMap(entry -> {
HoodieRecord<R> incoming = entry.getLeft();
Option<HoodieRecord<R>> existingOpt = entry.getRight();
if (!existingOpt.isPresent()) {
// existing record not found (e.g., due to delete log not merged to base file): tag as a new record
return Collections.singletonList(getTaggedRecord(incoming, Option.empty())).iterator();
}
HoodieRecord<R> existing = existingOpt.get();
Schema writeSchema = new Schema.Parser().parse(config.getWriteSchema());
if (isDelete((HoodieRecord<HoodieRecordPayload>) incoming, writeSchema, config.getProps())) {
// incoming is a delete: force tag the incoming to the old partition
return Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()), Option.of(existing.getCurrentLocation()))).iterator();
}

Option<HoodieRecord<R>> mergedOpt = mergeIncomingWithExistingRecord(incoming, existing, writeSchema, config);
if (!mergedOpt.isPresent()) {
// merge resulted in delete: force tag the incoming to the old partition
return Collections.singletonList(getTaggedRecord(incoming.newInstance(existing.getKey()), Option.of(existing.getCurrentLocation()))).iterator();
}
HoodieRecord<R> merged = mergedOpt.get();
if (Objects.equals(merged.getPartitionPath(), existing.getPartitionPath())) {
// merged record has the same partition: route the merged result to the current location as an update
return Collections.singletonList(getTaggedRecord(merged, Option.of(existing.getCurrentLocation()))).iterator();
} else {
// merged record has a different partition: issue a delete to the old partition and insert the merged record to the new partition
HoodieRecord<R> deleteRecord = createDeleteRecord(existing.getKey());
deleteRecord.setCurrentLocation(existing.getCurrentLocation());
deleteRecord.seal();
return Arrays.asList(deleteRecord, getTaggedRecord(merged, Option.empty())).iterator();
}
});
return taggedUpdatingRecords.union(newRecords);
}

private static Comparable getOrderingVal(GenericRecord record, Properties properties) {
String orderField = properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
if (orderField == null) {
return true;
}
boolean consistentLogicalTimestampEnabled = Boolean.parseBoolean(properties.getProperty(
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()));
return (Comparable) HoodieAvroUtils.getNestedFieldVal((GenericRecord) record,
orderField,
true, consistentLogicalTimestampEnabled);
}

/**
* Create a payload class via reflection, passing in an ordering/precombine value.
*/
private static HoodieRecordPayload createPayload(String payloadClass, GenericRecord record, Comparable orderingVal)
throws IOException {
try {
return (HoodieRecordPayload) ReflectionUtils.loadClass(payloadClass,
new Class<?>[] {GenericRecord.class, Comparable.class}, record, orderingVal);
} catch (Throwable e) {
throw new IOException("Could not create payload for class: " + payloadClass, e);
}
}

/**
* Get ordering field.
*/
private static String getOrderingField(Properties properties) {
String orderField = null;
if (properties.containsKey(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY)) {
orderField = properties.getProperty(HoodiePayloadProps.PAYLOAD_ORDERING_FIELD_PROP_KEY);
} else if (properties.containsKey("hoodie.datasource.write.precombine.field")) {
orderField = properties.getProperty("hoodie.datasource.write.precombine.field");
} else if (properties.containsKey(HoodieTableConfig.PRECOMBINE_FIELD.key())) {
orderField = properties.getProperty(HoodieTableConfig.PRECOMBINE_FIELD.key());
}
return orderField;
}

/**
* Get payload class.
*/
private static String getPayloadClass(Properties properties) {
String payloadClass = null;
if (properties.containsKey(HoodieTableConfig.PAYLOAD_CLASS_NAME.key())) {
payloadClass = properties.getProperty(HoodieTableConfig.PAYLOAD_CLASS_NAME.key());
} else if (properties.containsKey("hoodie.datasource.write.payload.class")) {
payloadClass = properties.getProperty("hoodie.datasource.write.payload.class");
}
return payloadClass;
}

private static HoodieRecord wrapIntoHoodieRecordPayloadWithParams(
HoodieRecordPayload data,
Schema recordSchema, Properties props,
Option<Pair<String, String>> simpleKeyGenFieldsOpt,
Boolean withOperation,
Option<String> partitionNameOp,
Boolean populateMetaFields,
Option<Schema> schemaWithoutMetaFields) throws IOException {
IndexedRecord indexedRecord = (IndexedRecord) data.getInsertValue(recordSchema, props).get();
String payloadClass = getPayloadClass(props);
String preCombineField = getOrderingField(props);
return HoodieAvroUtils.createHoodieRecordFromAvro(indexedRecord, payloadClass, preCombineField, simpleKeyGenFieldsOpt, withOperation, partitionNameOp, populateMetaFields, schemaWithoutMetaFields);
}

private static HoodieRecord prependMetaFields(HoodieAvroRecord record, Schema recordSchema, Schema targetSchema, MetadataValues metadataValues, Properties props) {
try {
Option<IndexedRecord> avroRecordOpt = record.getData().getInsertValue(recordSchema, props);
GenericRecord newAvroRecord = HoodieAvroUtils.rewriteRecordWithNewSchema(avroRecordOpt.get(), targetSchema, Collections.emptyMap());
updateMetadataValuesInternal(newAvroRecord, metadataValues);
return new HoodieAvroRecord<RewriteAvroPayload>(record.getKey(), new RewriteAvroPayload(newAvroRecord), record.getOperation(), record.getCurrentLocation(),
(HoodieRecordLocation) record.getNewLocation().orElse(null));
} catch (IOException e) {
throw new HoodieIOException("Failed to deserialize record!", e);
}
}

static void updateMetadataValuesInternal(GenericRecord avroRecord, MetadataValues metadataValues) {
if (metadataValues.isEmpty()) {
return; // no-op
}

String[] values = metadataValues.getValues();
for (int pos = 0; pos < values.length; ++pos) {
String value = values[pos];
if (value != null) {
avroRecord.put(HoodieRecord.HoodieMetadataField.values()[pos].getFieldName(), value);
}
}
}

private static boolean isDelete(HoodieRecord<HoodieRecordPayload> record, Schema recordSchema, Properties props) throws IOException {
return !record.getData().getInsertValue(recordSchema, props).isPresent();
}

private static <T> HoodieRecord<T> createDeleteRecord(HoodieKey key) {
return new HoodieAvroRecord(key, new EmptyHoodieRecordPayload());
}
}
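
To make the control flow of mergeForPartitionUpdates easier to follow, here is a small didactic sketch (not part of the commit) of the per-record routing decision it implements; the class, enum, and parameter names are invented for illustration only.

    // Hedged summary of the branches inside the flatMap of mergeForPartitionUpdates.
    class PartitionUpdateRouting {
      enum Routing {
        INSERT_AS_NEW,                // no existing version found: keep the incoming record untagged
        UPDATE_IN_PLACE,              // merged record stays in the same partition: tag to the current location
        DELETE_OLD_ONLY,              // incoming or merge result is a delete: route it to the old partition
        DELETE_OLD_AND_INSERT_NEW     // partition changed: delete the old copy, insert the merged record untagged
      }

      static Routing route(boolean existingFound, boolean incomingIsDelete,
                           boolean mergeProducedRecord, boolean samePartitionAfterMerge) {
        if (!existingFound) {
          return Routing.INSERT_AS_NEW;
        }
        if (incomingIsDelete || !mergeProducedRecord) {
          return Routing.DELETE_OLD_ONLY;
        }
        return samePartitionAfterMerge ? Routing.UPDATE_IN_PLACE : Routing.DELETE_OLD_AND_INSERT_NEW;
      }
    }

The deduplication that gives the commit its name happens just before this routing: updatingRecords.distinctWithKey(HoodieRecord::getRecordKey, ...) and partitionLocations.distinct(...) collapse the multiple tags produced when a key matches files in more than one partition, with the parallelism taken from the new hoodie.global.index.reconcile.parallelism setting.
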
@@ -98,7 +98,7 @@ record -> new ImmutablePair<>(record.getPartitionPath(), record.getRecordKey()))
}

// Step 3: Tag the incoming records, as inserts or updates, by joining with existing record keys
HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records);
HoodieData<HoodieRecord<R>> taggedRecords = tagLocationBacktoRecords(keyFilenamePairs, records, hoodieTable);

if (config.getBloomIndexUseCaching()) {
records.unpersist();
@@ -300,7 +300,8 @@ HoodieData<Pair<String, HoodieKey>> explodeRecordsWithFileComparisons(
*/
protected <R> HoodieData<HoodieRecord<R>> tagLocationBacktoRecords(
HoodiePairData<HoodieKey, HoodieRecordLocation> keyFilenamePair,
HoodieData<HoodieRecord<R>> records) {
HoodieData<HoodieRecord<R>> records,
HoodieTable hoodieTable) {
HoodiePairData<HoodieKey, HoodieRecord<R>> keyRecordPairs =
records.mapToPair(record -> new ImmutablePair<>(record.getKey(), record));
// Here as the records might have more data than keyFilenamePairs (some row keys' fileId is null),