From b0f8b71f9319c07da132c2e9c39c0092261989ba Mon Sep 17 00:00:00 2001 From: Kaushik Devarajaiah Date: Mon, 12 Mar 2018 19:06:52 -0700 Subject: [PATCH] DeduplicateRecords based on recordKey if global index is used --- .../com/uber/hoodie/HoodieWriteClient.java | 19 ++++++++-- .../TestHoodieClientOnCopyOnWriteStorage.java | 35 ++++++++++++++++++- 2 files changed, 50 insertions(+), 4 deletions(-) diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java index 42778e7dd6771..bf81a1faa4640 100644 --- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java +++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java @@ -17,6 +17,7 @@ package com.uber.hoodie; import com.codahale.metrics.Timer; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import com.uber.hoodie.avro.model.HoodieCleanMetadata; @@ -111,10 +112,16 @@ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig) */ public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, boolean rollbackInFlight) { + this(jsc, clientConfig, rollbackInFlight, HoodieIndex.createIndex(clientConfig, jsc)); + } + + @VisibleForTesting + HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig, + boolean rollbackInFlight, HoodieIndex index) { this.fs = FSUtils.getFs(clientConfig.getBasePath(), jsc.hadoopConfiguration()); this.jsc = jsc; this.config = clientConfig; - this.index = HoodieIndex.createIndex(config, jsc); + this.index = index; this.metrics = new HoodieMetrics(config, config.getTableName()); if (rollbackInFlight) { @@ -1051,10 +1058,16 @@ public static SparkConf registerClasses(SparkConf conf) { /** * Deduplicate Hoodie records, using the given deduplication funciton. 
*/ - private JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, + JavaRDD<HoodieRecord<T>> deduplicateRecords(JavaRDD<HoodieRecord<T>> records, int parallelism) { + boolean isIndexingGlobal = index.isGlobal(); return records - .mapToPair(record -> new Tuple2<>(record.getKey(), record)) + .mapToPair(record -> { + HoodieKey hoodieKey = record.getKey(); + // If index used is global, then records are expected to differ in their partitionPath + Object key = isIndexingGlobal ? hoodieKey.getRecordKey() : hoodieKey; + return new Tuple2<>(key, record); + }) .reduceByKey((rec1, rec2) -> { @SuppressWarnings("unchecked") T reducedData = (T) rec1.getData().preCombine(rec2.getData()); diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java index 60a6a1db2a313..c5544760d8625 100644 --- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java +++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java @@ -20,6 +20,8 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import com.google.common.collect.Iterables; import com.uber.hoodie.common.HoodieCleanStat; @@ -48,13 +50,13 @@ import com.uber.hoodie.config.HoodieWriteConfig; import com.uber.hoodie.exception.HoodieRollbackException; import com.uber.hoodie.index.HoodieIndex; -import com.uber.hoodie.table.HoodieCopyOnWriteTable; import com.uber.hoodie.table.HoodieTable; import java.io.File; import java.io.FileInputStream; import java.io.IOException; import java.io.Serializable; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -65,6 +67,7 @@ import java.util.Optional; import java.util.Set; import java.util.TreeSet; +import
java.util.UUID; import java.util.stream.Collectors; import org.apache.avro.generic.GenericRecord; import org.apache.commons.io.IOUtils; @@ -218,6 +221,36 @@ public void testUpserts() throws Exception { testUpsertsInternal(getConfig()); } + @Test + public void testDeduplication() throws Exception { + String newCommitTime = "001"; + + String recordKey = UUID.randomUUID().toString(); + HoodieKey keyOne = new HoodieKey(recordKey, "2018-01-01"); + HoodieRecord recordOne = new HoodieRecord(keyOne, + HoodieTestDataGenerator.generateRandomValue(keyOne, newCommitTime)); + + HoodieKey keyTwo = new HoodieKey(recordKey, "2018-02-01"); + HoodieRecord recordTwo = new HoodieRecord(keyTwo, + HoodieTestDataGenerator.generateRandomValue(keyTwo, newCommitTime)); + + JavaRDD records = jsc.parallelize(Arrays.asList(recordOne, recordTwo), 1); + + // dedup should be done based on recordKey only + HoodieWriteClient clientWithDummyGlobalIndex = getWriteClientWithDummyIndex(true); + assertEquals(1, clientWithDummyGlobalIndex.deduplicateRecords(records, 1).collect().size()); + + // dedup should be done based on both recordKey and partitionPath + HoodieWriteClient clientWithDummyNonGlobalIndex = getWriteClientWithDummyIndex(false); + assertEquals(2, clientWithDummyNonGlobalIndex.deduplicateRecords(records, 1).collect().size()); + } + + private HoodieWriteClient getWriteClientWithDummyIndex(final boolean isGlobal) throws Exception { + HoodieIndex index = mock(HoodieIndex.class); + when(index.isGlobal()).thenReturn(isGlobal); + return new HoodieWriteClient(jsc, getConfigBuilder().build(), false, index); + } + private void testUpsertsInternal(HoodieWriteConfig hoodieWriteConfig) throws Exception { HoodieWriteClient client = new HoodieWriteClient(jsc, hoodieWriteConfig);