diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
index 75ec069946d21..0499430223f6d 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieDatasetBulkInsertHelper.scala
@@ -17,6 +17,7 @@
 
 package org.apache.hudi
 
+import org.apache.hudi.HoodieSparkUtils.injectSQLConf
 import org.apache.hudi.client.WriteStatus
 import org.apache.hudi.client.model.HoodieInternalRow
 import org.apache.hudi.common.config.TypedProperties
@@ -40,6 +41,7 @@ import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
 import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{StringType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
 import org.apache.spark.unsafe.types.UTF8String
@@ -81,7 +83,7 @@ object HoodieDatasetBulkInsertHelper
       "Key-generator class name is required")
 
     val prependedRdd: RDD[InternalRow] =
-      df.queryExecution.toRdd.mapPartitions { iter =>
+      injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
         val typedProps = new TypedProperties(config.getProps)
         if (autoGenerateRecordKeys) {
           typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()))
@@ -107,7 +109,7 @@ object HoodieDatasetBulkInsertHelper
           // TODO use mutable row, avoid re-allocating
           new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
         }
-      }
+      }, SQLConf.get)
 
     val dedupedRdd = if (config.shouldCombineBeforeInsert) {
       dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
@@ -144,7 +146,7 @@ object HoodieDatasetBulkInsertHelper
                              arePartitionRecordsSorted: Boolean,
                              shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
     val schema = dataset.schema
-    val writeStatuses = dataset.queryExecution.toRdd.mapPartitions(iter => {
+    val writeStatuses = injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
       val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
       val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
       val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
@@ -189,7 +191,7 @@ object HoodieDatasetBulkInsertHelper
       }
 
       writer.getWriteStatuses.asScala.iterator
-    }).collect()
+    }), SQLConf.get).collect()
 
     table.getContext.parallelize(writeStatuses.toList.asJava)
   }
diff --git a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
index f360bd9278447..06b23b4800845 100644
--- a/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
+++ b/hudi-client/hudi-spark-client/src/main/scala/org/apache/hudi/HoodieSparkUtils.scala
@@ -125,7 +125,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
     }, SQLConf.get)
   }
 
-  private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
+  def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
     new SQLConfInjectingRDD(rdd, conf)
 
   def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,
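Note on the Scala change above: wrapping the `mapPartitions` RDDs with `injectSQLConf(..., SQLConf.get)` makes the driver's session `SQLConf` visible to `SQLConf.get` inside the partition functions, which is what Spark's Parquet/Avro datetime rebase checks consult. The wrapper below is an illustrative sketch of that pattern, not Hudi's actual `SQLConfInjectingRDD`, and it assumes Spark 3.x where `SQLConf.withExistingConf` is available:

```scala
import scala.reflect.ClassTag

import org.apache.spark.{Partition, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.internal.SQLConf

// Illustrative only: captures the driver-side SQLConf as plain key/value pairs and
// re-creates it on executors before computing each partition, mirroring what
// Spark's SQLExecutionRDD and Hudi's SQLConfInjectingRDD do.
class ConfPropagatingRDD[T: ClassTag](var prev: RDD[T], @transient private val sqlConf: SQLConf)
  extends RDD[T](prev) {

  // Serializable snapshot of the driver's SQL configuration.
  private val confEntries: Map[String, String] = sqlConf.getAllConfs

  // Rebuilt lazily on the executor from the captured entries.
  private lazy val executorSideConf: SQLConf = {
    val conf = new SQLConf()
    confEntries.foreach { case (k, v) => conf.setConfString(k, v) }
    conf
  }

  override protected def getPartitions: Array[Partition] = prev.partitions

  override def compute(split: Partition, context: TaskContext): Iterator[T] =
    // Make executorSideConf the value returned by SQLConf.get while the parent
    // partition function runs on this thread.
    SQLConf.withExistingConf(executorSideConf) {
      prev.iterator(split, context)
    }
}
```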
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
index cd3755d26c81f..f04bd73de58c0 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/testutils/HoodieTestDataGenerator.java
@@ -90,6 +90,13 @@
  */
 public class HoodieTestDataGenerator implements AutoCloseable {
 
+  /**
+   * When enabled, the generator emits dates that became ambiguous with the upgrade to Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet
+   * INT96 files may give different results, as the files may have been written by Spark 2.x or legacy versions of Hive, which use a legacy hybrid calendar that differs from the
+   * Proleptic Gregorian calendar used by Spark 3.0+. See SPARK-31404 for more details.
+   */
+  private boolean makeDatesAmbiguous = false;
+
   // based on examination of sample file, the schema produces the following per record size
   public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
   // with default bloom filter with 60,000 entries and 0.000000001 FPRate
@@ -207,6 +214,11 @@ public HoodieTestDataGenerator() {
     this(DEFAULT_PARTITION_PATHS);
   }
 
+  public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
+    this();
+    this.makeDatesAmbiguous = makeDatesAmbiguous;
+  }
+
   @Deprecated
   public HoodieTestDataGenerator(String[] partitionPaths, Map keyPartitionMap) {
     // NOTE: This used as a workaround to make sure that new instantiations of the generator
@@ -391,7 +403,8 @@ private void generateExtraSchemaValues(GenericRecord rec) {
     rec.put("nation", ByteBuffer.wrap(bytes));
     long randomMillis = genRandomTimeMillis(rand);
     Instant instant = Instant.ofEpochMilli(randomMillis);
-    rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
+    rec.put("current_date", makeDatesAmbiguous ? -1000000 :
+        (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
     rec.put("current_ts", randomMillis);
     BigDecimal bigDecimal = new BigDecimal(String.format(Locale.ENGLISH, "%5f", rand.nextFloat()));
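The `-1000000` sentinel used above is an epoch day that falls many centuries before the 1582-10-15 Julian/Gregorian cutover, which is exactly what makes the generated `current_date` values ambiguous for Spark 3.x readers and writers. A quick, self-contained check of that assumption (Scala, shell-style):

```scala
import java.time.LocalDate

// Epoch day -1000000 resolves to a date far before the Gregorian cutover, so Spark 3.x
// must apply its datetime rebase handling when writing or reading it.
val ambiguousDate = LocalDate.ofEpochDay(-1000000L)
val gregorianCutover = LocalDate.of(1582, 10, 15)

assert(ambiguousDate.isBefore(gregorianCutover))
println(s"epoch day -1000000 -> $ambiguousDate (proleptic Gregorian)")
```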
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 11c4656ccc2f1..90fef76c117d3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -310,9 +310,14 @@ protected static void prepareParquetDFSFiles(int numRecords, String baseParquetP
   }
 
   protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
-                                                                  String schemaStr, Schema schema) throws IOException {
+                                                                  String schemaStr, Schema schema) throws IOException {
+    return prepareParquetDFSFiles(numRecords, baseParquetPath, fileName, useCustomSchema, schemaStr, schema, false);
+  }
+
+  protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
+                                                                  String schemaStr, Schema schema, boolean makeDatesAmbiguous) throws IOException {
     String path = baseParquetPath + "/" + fileName;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(makeDatesAmbiguous);
     if (useCustomSchema) {
       Helpers.saveParquetToDFS(Helpers.toGenericRecords(
           dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),
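The new delta streamer test that follows pairs this generator flag with LEGACY datetime rebase modes. The snippet below is an illustrative, standalone Spark 3.x sketch (spark-shell style, with a made-up output path, not code from this PR) of what the write-side setting changes:

```scala
import java.sql.Date

import org.apache.spark.sql.SparkSession

// Illustrative only: with the default EXCEPTION rebase mode, writing a date before
// 1582-10-15 to Parquet fails with SparkUpgradeException; flipping the mode to LEGACY
// (as the new test does for Parquet, Avro and INT96, on both read and write) rebases
// such values to the legacy hybrid calendar instead.
val spark = SparkSession.builder().master("local[1]").appName("rebase-demo").getOrCreate()
import spark.implicits._

val df = Seq(Date.valueOf("1500-01-01")).toDF("current_date")

// spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
// df.write.parquet("/tmp/rebase-demo")  // would throw SparkUpgradeException

spark.conf.set("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY")
df.write.mode("overwrite").parquet("/tmp/rebase-demo")  // value is rebased on write

spark.stop()
```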
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 5bc267bd2b78d..6c865dd7f87ac 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -1406,20 +1406,34 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List
 
   @Test
   public void testBulkInsertRowWriterContinuousModeWithAsyncClustering() throws Exception {
     testBulkInsertRowWriterContinuousMode(false, null, false,
-        getTableServicesConfigs(2000, "false", "", "", "true", "3"));
+        getTableServicesConfigs(2000, "false", "", "", "true", "3"), false);
   }
 
   @Test
   public void testBulkInsertRowWriterContinuousModeWithInlineClustering() throws Exception {
     testBulkInsertRowWriterContinuousMode(false, null, false,
-        getTableServicesConfigs(2000, "false", "true", "3", "false", ""));
+        getTableServicesConfigs(2000, "false", "true", "3", "false", ""), false);
   }
 
-  private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, List<String> customConfigs) throws Exception {
+  @Test
+  public void testBulkInsertRowWriterContinuousModeWithInlineClusteringAmbiguousDates() throws Exception {
+    sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY");
+    sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY");
+    sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY");
+    sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY");
+    sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInRead", "LEGACY");
+    sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInRead", "LEGACY");
+    testBulkInsertRowWriterContinuousMode(false, null, false,
+        getTableServicesConfigs(2000, "false", "true", "3",
+            "false", ""), true);
+  }
+
+  private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames,
+                                                     boolean testEmptyBatch, List<String> customConfigs, boolean makeDatesAmbiguous) throws Exception {
     PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
     int parquetRecordsCount = 100;
     boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
-    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+    prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null, makeDatesAmbiguous);
     prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
         PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");
@@ -1429,7 +1443,7 @@ private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, Li
     int counter = 2;
     while (counter < 100) { // lets keep going. if the test times out, we will cancel the future within finally. So, safe to generate 100 batches.
       LOG.info("Generating data for batch " + counter);
-      prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null);
+      prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
       counter++;
       Thread.sleep(2000);
     }
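The Scala change at the top and this test meet through the now-public `HoodieSparkUtils.injectSQLConf`: the LEGACY settings above reach the executor-side row writer because the bulk-insert RDDs are wrapped with it. A rough spark-shell style sketch of that usage, with a placeholder partition function and an assumed `df` (neither is code from this PR):

```scala
import org.apache.hudi.HoodieSparkUtils.injectSQLConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.internal.SQLConf

// Hypothetical helper: wraps an executor-side computation over a DataFrame's internal
// rows so that the driver's SQLConf (captured via SQLConf.get) is re-installed before
// the partition function runs, matching the pattern used in HoodieDatasetBulkInsertHelper.
def writeWithSessionConf(df: DataFrame): RDD[String] = {
  val rdd = df.queryExecution.toRdd.mapPartitions { iter =>
    // Runs on executors: with the injectSQLConf wrapper below, this read reflects the
    // session-level setting (for example LEGACY) captured on the driver.
    val rebaseMode = SQLConf.get.getConfString(
      "spark.sql.parquet.datetimeRebaseModeInWrite", "EXCEPTION")
    iter.map(_ => rebaseMode) // placeholder for the actual row-writing logic
  }
  injectSQLConf(rdd, SQLConf.get) // conf captured on the driver, applied per partition
}
```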