
How can I deal with this problem when the partition value changes for the same row_key? #1021

Closed
simonqin opened this issue Nov 18, 2019 · 15 comments

Comments

@simonqin

// Create the write client to write some records in
HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
    .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2)
    .forTable(tableName)
    .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.GLOBAL_BLOOM).build())
    .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(11, 12).build())
    .build();

First, I insert one record into the table:

    String partitionPath = "2016/03/15";
    HoodieKey key = new HoodieKey("1", partitionPath);
    HoodieRecord record = new HoodieRecord(key, HoodieTestDataGenerator.generateRandomValue(key, commitTime));

Second, I upsert a record with the same row_key but a different partition path:

    String partitionPath = "2016/04/15";
    HoodieKey key = new HoodieKey("1", partitionPath);
    HoodieRecord record = new HoodieRecord(key, HoodieTestDataGenerator.generateRandomValue(key, commitTime));

error log:
14738 [Executor task launch worker-0] INFO com.uber.hoodie.common.table.timeline.HoodieActiveTimeline - Loaded instants java.util.stream.ReferencePipeline$Head@d02b1c7
14738 [Executor task launch worker-0] INFO com.uber.hoodie.common.table.view.AbstractTableFileSystemView - Building file system view for partition (2016/04/15)
14738 [Executor task launch worker-0] INFO com.uber.hoodie.common.table.view.AbstractTableFileSystemView - #files found in partition (2016/04/15) =0, Time taken =0
14738 [Executor task launch worker-0] INFO com.uber.hoodie.common.table.view.AbstractTableFileSystemView - addFilesToView: NumFiles=0, FileGroupsCreationTime=0, StoreTimeTaken=0
14738 [Executor task launch worker-0] INFO com.uber.hoodie.common.table.view.HoodieTableFileSystemView - Adding file-groups for partition :2016/04/15, #FileGroups=0
14738 [Executor task launch worker-0] INFO com.uber.hoodie.common.table.view.AbstractTableFileSystemView - Time to load partition (2016/04/15) =0
14754 [Executor task launch worker-0] ERROR com.uber.hoodie.table.HoodieCopyOnWriteTable - Error upserting bucketType UPDATE for partition :0
java.util.NoSuchElementException: No value present
at com.uber.hoodie.common.util.Option.get(Option.java:112)
at com.uber.hoodie.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:71)
at com.uber.hoodie.table.HoodieCopyOnWriteTable.getUpdateHandle(HoodieCopyOnWriteTable.java:226)
at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:180)
at com.uber.hoodie.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:263)
at com.uber.hoodie.HoodieWriteClient.lambda$upsertRecordsInternal$7ef77fd$1(HoodieWriteClient.java:442)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:973)
at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)

@bhasudha
Contributor

bhasudha commented Nov 18, 2019

"String partitionPath = "2016/04/15";" seems to be different from the insert partitionPath (2016/03/15) which is what is causing an issue here. HoodieKey acts as a primary key for a record in a specific partition path. Notion of update assumes updates sent to the key in the same partition path. That assumption is conflicted here. That why you see these errors. Fixing the partitionPath string should help.

@simonqin
Author

@bhasudha thanks for your reply. I set the index type to GLOBAL_BLOOM. My understanding is that with this index, when the row_key stays the same but the partition value changes, Hudi will delete the record under the original partition and insert the new data into the new partition. Is that right?

@bhasudha
Contributor

You are right. I noticed that you are using the com.uber.hoodie version. Can you give it a shot with the latest master? I believe a similar bug was fixed. Meanwhile, I'll try to trace that bug and update here.

@simonqin
Author

simonqin commented Nov 20, 2019

@bhasudha I tested three versions (0.4.7, 0.5.0, and master); all of them have errors.
Here is my test code. It is modified from the run method in HoodieClientExample.java in hudi-client:

public void runTest() throws Exception {

    SparkConf sparkConf = new SparkConf().setAppName("hoodie-client-example");
    sparkConf.setMaster("local[1]");
    sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
    sparkConf.set("spark.kryoserializer.buffer.max", "512m");
    JavaSparkContext jsc = new JavaSparkContext(sparkConf);

    // Generator of some records to be loaded in.
    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();

    // initialize the table, if not done already
    Path path = new Path(tablePath);
    FileSystem fs = FSUtils.getFs(tablePath, jsc.hadoopConfiguration());
    if (!fs.exists(path)) {
      HoodieTableMetaClient.initTableType(jsc.hadoopConfiguration(), tablePath, HoodieTableType.valueOf(tableType),
              tableName, HoodieAvroPayload.class.getName());
    }

    // Create the write client to write some records in
    HoodieWriteConfig cfg = HoodieWriteConfig.newBuilder().withPath(tablePath)
            .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA).withParallelism(2, 2).forTable(tableName)
            .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.GLOBAL_BLOOM).build())
            .withCompactionConfig(HoodieCompactionConfig.newBuilder().archiveCommitsWith(11, 12).build()).build();
    HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

    /**
     * Write 1 (only inserts)
     */
    String newCommitTime = client.startCommit();
    logger.info("Starting commit " + newCommitTime);
//    List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 100);
    List<HoodieRecord> records = generateInserts(newCommitTime);
    JavaRDD<HoodieRecord> writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
    client.upsert(writeRecords, newCommitTime);

    /**
     * Write 2 (updates)
     */
    newCommitTime = client.startCommit();
    logger.info("Starting commit " + newCommitTime);
//    records.addAll(dataGen.generateUpdates(newCommitTime, 100));
    records.clear();
    records.addAll(generateUpdates(newCommitTime));
    writeRecords = jsc.<HoodieRecord>parallelize(records, 1);
    client.upsert(writeRecords, newCommitTime);

    /**
     * Schedule a compaction and also perform compaction on a MOR dataset
     */
    if (HoodieTableType.valueOf(tableType) == HoodieTableType.MERGE_ON_READ) {
      Option<String> instant = client.scheduleCompaction(Option.empty());
      JavaRDD<WriteStatus> writeStatues = client.compact(instant.get());
      client.commitCompaction(instant.get(), writeStatues, Option.empty());
    }
  }

  public List<HoodieRecord> generateInserts(String commitTime) throws IOException {
    List<HoodieRecord> inserts = new ArrayList<>();

    String partitionPath = "2016/03/15";
    HoodieKey key = new HoodieKey("1", partitionPath);
    HoodieRecord record = new HoodieRecord(key, HoodieTestDataGenerator.generateRandomValue(key, commitTime));
    inserts.add(record);

    return inserts;
  }

  public List<HoodieRecord> generateUpdates(String commitTime) throws IOException {
    List<HoodieRecord> updates = new ArrayList<>();

    String partitionPath = "2016/04/15";
    HoodieKey key = new HoodieKey("1", partitionPath);
    HoodieRecord record = new HoodieRecord(key, HoodieTestDataGenerator.generateRandomValue(key, commitTime));
    updates.add(record);

    return updates;
  }

error log:
16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Building file system view for partition (2016/04/15)
16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - #files found in partition (2016/04/15) =0, Time taken =0
16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - addFilesToView: NumFiles=0, FileGroupsCreationTime=0, StoreTimeTaken=0
16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.HoodieTableFileSystemView  - Adding file-groups for partition :2016/04/15, #FileGroups=0
16214 [Executor task launch worker-0] INFO  org.apache.hudi.common.table.view.AbstractTableFileSystemView  - Time to load partition (2016/04/15) =0
16214 [Executor task launch worker-0] ERROR org.apache.hudi.table.HoodieCopyOnWriteTable  - Error upserting bucketType UPDATE for partition :0
java.util.NoSuchElementException: No value present in Option
	at org.apache.hudi.common.util.Option.get(Option.java:88)
	at org.apache.hudi.io.HoodieMergeHandle.<init>(HoodieMergeHandle.java:74)
	at org.apache.hudi.table.HoodieCopyOnWriteTable.getUpdateHandle(HoodieCopyOnWriteTable.java:220)
	at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpdate(HoodieCopyOnWriteTable.java:177)
	at org.apache.hudi.table.HoodieCopyOnWriteTable.handleUpsertPartition(HoodieCopyOnWriteTable.java:257)
	at org.apache.hudi.HoodieWriteClient.lambda$upsertRecordsInternal$507693af$1(HoodieWriteClient.java:428)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
	at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
	at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$26.apply(RDD.scala:843)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:336)
	at org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:334)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:973)
	at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:948)
	at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:888)
	at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:948)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:694)
	at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:334)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:285)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
	at org.apache.spark.scheduler.Task.run(Task.scala:99)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

@vinothchandar
Member

This seems similar to https://issues.apache.org/jira/browse/HUDI-116, which we fixed already.
@bhasudha #687 should help you step through this as well. @nsivabalan do you also want to take a shot at this, since you are working closely with the indexing part at the moment?

@nsivabalan
Contributor

Sure. I can take a look.

@nsivabalan
Contributor

Here is the issue. Along the way, records are partitioned according to the new partition path, whereas the index lookup returns a fileId pertaining to the old partition path. So, while trying to do the actual update, the fileId is not found in the new partition path.

This is not an issue with regular bloom, because with regular bloom the index lookup does not return any location, so these records are simply treated as inserts. (With global bloom, the index lookup does return a fileId, but that fileId belongs to the old partition path, while the record keeps the new partition path.)
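
A minimal, self-contained illustration of that mismatch (plain Java with toy data structures from java.util, not the actual Hudi internals):

    // Toy stand-in for the file system view: partition path -> base file group for record key "1".
    Map<String, String> partitionToBaseFile = new HashMap<>();
    partitionToBaseFile.put("2016/03/15", "file-group-abc"); // where record "1" was originally written

    // Global bloom finds "file-group-abc", but the incoming record still carries the new partition path.
    String taggedPartitionPath = "2016/04/15";

    // The update handle then looks for the file group under the new partition and finds nothing,
    // which surfaces as the Option.get() -> "No value present" error in the logs above.
    Optional<String> baseFile = Optional.ofNullable(partitionToBaseFile.get(taggedPartitionPath));
    System.out.println(baseFile.orElse("No value present"));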

I will let @bhasudha / @vinothchandar / @bvaradar respond on what the fix should be here.

@vinothchandar
Member

@nsivabalan Could you debug why the second write is not getting tagged to the first partition? That's the crux of it.

@nsivabalan
Contributor

@vinothchandar : I found the root cause.

Within HoodieGlobalIndex#explodeRecordRDDWithFileComparisons:

JavaRDD<Tuple2<String, HoodieKey>> explodeRecordRDDWithFileComparisons(
    final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
    JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
  ...
  return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
    String recordKey = partitionRecordKeyPair._2();
    String partitionPath = partitionRecordKeyPair._1();

    return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
        .map(file -> new Tuple2<>(file, new HoodieKey(recordKey, indexToPartitionMap.get(file))))
        .collect(Collectors.toList());
  }).flatMap(List::iterator);

Here, indexFileFilter.getMatchingFiles(partitionPath, recordKey) returns a fileId from Partition1, whereas the incoming record is tagged with Partition2.

So, this is what I am thinking as the fix. As of now, IndexFileFilter.getMatchingFiles(String partitionPath, String recordKey) returns a Set of fileIds. Instead, it should return Set<Pair<partitionPath, fileId>>, and we should attach the original partition path as below.

return partitionRecordKeyPairRDD.map(partitionRecordKeyPair -> {
  String recordKey = partitionRecordKeyPair._2();
  String partitionPath = partitionRecordKeyPair._1();

  return indexFileFilter.getMatchingFiles(partitionPath, recordKey).stream()
      .map(partitionFilePair -> new Tuple2<>(partitionFilePair.getRight(),
          new HoodieKey(recordKey, partitionFilePair.getLeft())))
      .collect(Collectors.toList());
}).flatMap(List::iterator);

But how do we inform the user that these records are updated in Partition1, and not in Partition2 as specified in the incoming records of the upsert call?

@vinothchandar
Member

vinothchandar commented Dec 3, 2019

@nsivabalan Good catch! Let's file a JIRA and tag it against the next release. Feel free to grab the ticket yourself to make the fix, if you want. :) We can close this issue here, since we triaged it. Thanks @simonqin for reporting this.

how do we intimate the user that these records are updated with Partition1 and not Partition2 as per incoming records in upsert call?

I think that's the expected behavior anyway: with a global index, the partition path you pass in is ignored.
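
To spell that out against the reproduction above (a sketch of the expected behavior once this is fixed; the partition values are just the ones from the test code):

    // Record "1" was originally inserted under 2016/03/15.
    HoodieKey insertKey = new HoodieKey("1", "2016/03/15");

    // A later upsert arrives for the same record key with a different partition path ...
    HoodieKey updateKey = new HoodieKey("1", "2016/04/15");

    // ... but with a global index the partition path on the incoming key is ignored,
    // and the update is expected to be applied to the record under 2016/03/15.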

@nsivabalan
Contributor

Sure, I will work on the fix. Just to clarify the expected behavior in this case, where the original record was inserted with Partition1 and the update was sent with Partition2: Partition2 should be ignored and the update should be applied to Partition1, right?

@nsivabalan
Contributor

I am trying to figure something out. Let me know if it's doable.

Is it possible to change the partition path alone for a given HoodieRecord? Or, to be precise, to create another HoodieRecord based on an existing (base) HoodieRecord but with a different partition path?
Specifically, I am stuck on getting the type of the HoodieRecord.
I see we could do something like this:
new HoodieRecord(new HoodieKey(baseRecord.getHoodieKey().getRecordKey(), newPartitionPath), baseRecord.getData());
but I am not sure how to get the type from the base HoodieRecord.

If this is not doable, then I am not sure how we can fix this, because the passed-in HoodieRecord's HoodieKey has Partition2 and we need to change that to Partition1.

@vinothchandar
Member

Partition2 should be ignored and updates should be sent to Partition1, right?
Yes. And let's please move this to a JIRA asap.

I guess you can create a new object with a different partition path using the constructor?

new HoodieRecord(new HoodieKey(baseRecord.getRecordKey(), newPartitionPath), baseRecord.getData()); should do (there is a method on HoodieRecord itself to get the record key). Why do you need the type of the data? Just passing in a HoodieRecordPayload should be sufficient.
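
A minimal sketch of that re-keying, using only the constructor and getters shown above (the helper name here is illustrative, not an existing Hudi method):

    // Build a new record that carries the original partition path but the incoming record's payload.
    public static HoodieRecord rekeyToOriginalPartition(HoodieRecord baseRecord, String originalPartitionPath) {
      HoodieKey rekeyedKey = new HoodieKey(baseRecord.getRecordKey(), originalPartitionPath);
      return new HoodieRecord(rekeyedKey, baseRecord.getData());
    }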

@nsivabalan
Contributor

@vinothchandar
Member

Added a link to this issue and tagged it with the right fix version. Closing this.
