From 62a0c06a4a80809f6465749c269c8ce282aeea46 Mon Sep 17 00:00:00 2001
From: RexAn
Date: Thu, 17 Nov 2022 01:37:16 +0800
Subject: [PATCH] Use as.of.instant for IncrementalRelation (#6921)

---
 .../main/scala/org/apache/hudi/IncrementalRelation.scala  | 7 ++++---
 .../apache/hudi/functional/TestCOWDataSourceStorage.scala | 3 +++
 2 files changed, 7 insertions(+), 3 deletions(-)

diff --git a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
index e5497d030a755..260be7530bf64 100644
--- a/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
+++ b/hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/IncrementalRelation.scala
@@ -175,8 +175,6 @@ class IncrementalRelation(val sqlContext: SQLContext,
           (regularFileIdToFullPath.values, metaBootstrapFileIdToFullPath.values)
         }
       }
-      // unset the path filter, otherwise if end_instant_time is not the latest instant, path filter set for RO view
-      // will filter out all the files incorrectly.
       // pass internalSchema to hadoopConf, so it can be used in executors.
       val validCommits = metaClient
         .getCommitsAndCompactionTimeline.filterCompletedInstants.getInstants.toArray().map(_.asInstanceOf[HoodieInstant].getFileName).mkString(",")
@@ -187,7 +185,6 @@ class IncrementalRelation(val sqlContext: SQLContext,
         case HoodieFileFormat.PARQUET => HoodieParquetFileFormat.FILE_FORMAT_ID
         case HoodieFileFormat.ORC => "orc"
       }
-      sqlContext.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")

       // Fallback to full table scan if any of the following conditions matches:
       //   1. the start commit is archived
@@ -239,12 +236,16 @@ class IncrementalRelation(val sqlContext: SQLContext,
             .format("hudi_v1")
             .schema(usedSchema)
             .option(DataSourceReadOptions.READ_PATHS.key, filteredMetaBootstrapFullPaths.mkString(","))
+            // Pin the read to END_INSTANT_TIME so the path filter does not incorrectly filter out files.
+            .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
             .load()
         }

         if (regularFileIdToFullPath.nonEmpty) {
           df = df.union(sqlContext.read.options(sOpts)
             .schema(usedSchema).format(formatClassName)
+            // Pin the read to END_INSTANT_TIME so the path filter does not incorrectly filter out files.
+            .option(DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT.key(), endInstantTime)
             .load(filteredRegularFullPaths.toList: _*)
             .filter(String.format("%s >= '%s'", HoodieRecord.COMMIT_TIME_METADATA_FIELD,
               commitsToReturn.head.getTimestamp))
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
index 6f13dbc82f4d9..15b6751328cba 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSourceStorage.scala
@@ -167,12 +167,15 @@ class TestCOWDataSourceStorage extends SparkClientFunctionalTestHarness {
     // Read Incremental Query
     // we have 2 commits, try pulling the first commit (which is not the latest)
     val firstCommit = HoodieDataSourceHelpers.listCommitsSince(fs, basePath, "000").get(0)
+    // Set HoodieROTablePathFilter to verify that IncrementalRelation reads correctly when a path filter is configured.
+    spark.sparkContext.hadoopConfiguration.set("mapreduce.input.pathFilter.class", "org.apache.hudi.hadoop.HoodieROTablePathFilter")
     val hoodieIncViewDF1 = spark.read.format("org.apache.hudi")
       .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
       .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
       .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
       .load(basePath)
     assertEquals(100, hoodieIncViewDF1.count()) // 100 initial inserts must be pulled
+    spark.sparkContext.hadoopConfiguration.unset("mapreduce.input.pathFilter.class")
     var countsPerCommit = hoodieIncViewDF1.groupBy("_hoodie_commit_time").count().collect()
     assertEquals(1, countsPerCommit.length)
     assertEquals(firstCommit, countsPerCommit(0).get(0))
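
Reviewer note (not part of the patch): a minimal sketch of the query this change
fixes, reusing spark, basePath, and firstCommit as bound in the test above. Per
the removed comment, a globally configured HoodieROTablePathFilter resolves file
visibility against the latest instant, so an incremental pull ending at an older
instant would have all its files filtered out; the patch pins the read to the
end instant via DataSourceReadOptions.TIME_TRAVEL_AS_OF_INSTANT instead of
unsetting the filter for the whole SparkContext.

    // A user environment where the RO-view path filter is set globally.
    spark.sparkContext.hadoopConfiguration.set(
      "mapreduce.input.pathFilter.class",
      "org.apache.hudi.hadoop.HoodieROTablePathFilter")

    // Incremental pull bounded by a non-latest end instant; with this patch it
    // returns the 100 initial inserts instead of an empty result.
    val incrementalDF = spark.read.format("org.apache.hudi")
      .option(DataSourceReadOptions.QUERY_TYPE.key, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
      .option(DataSourceReadOptions.BEGIN_INSTANTTIME.key, "000")
      .option(DataSourceReadOptions.END_INSTANTTIME.key, firstCommit)
      .load(basePath)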