[HUDI-7479] SQL confs don't propagate to spark row writer properly (apache#579)

Co-authored-by: Jonathan Vexler <=>
jonvex authored Mar 5, 2024
1 parent 84ebd5f commit 203be9d
Showing 5 changed files with 47 additions and 13 deletions.
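
In short: the bulk-insert row writer builds its output inside RDD.mapPartitions, where session-level SQL confs set on the driver (for example the Parquet/Avro datetime rebase modes exercised by the new test below) were not reaching the executor tasks. The fix wraps those RDDs in HoodieSparkUtils.injectSQLConf(..., SQLConf.get), i.e. a SQLConfInjectingRDD, so the SQLConf captured on the driver is re-installed inside each task. A minimal sketch of the wrapping pattern the diff applies, assuming a SparkSession and the Hudi Spark client on the classpath (readRebaseModePerPartition is an illustrative helper, not part of the change):

    import org.apache.hudi.HoodieSparkUtils.injectSQLConf
    import org.apache.spark.rdd.RDD
    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.internal.SQLConf

    // Wrap an RDD whose partition function depends on SQL confs so that the
    // driver-side SQLConf captured via SQLConf.get is visible inside executor tasks.
    def readRebaseModePerPartition(df: DataFrame): RDD[String] =
      injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
        // Without the wrapper, SQLConf.get here may not reflect the session confs set on the driver.
        iter.map(_ => SQLConf.get.getConfString("spark.sql.parquet.datetimeRebaseModeInWrite", "EXCEPTION"))
      }, SQLConf.get)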

@@ -17,6 +17,7 @@

package org.apache.hudi

+import org.apache.hudi.HoodieSparkUtils.injectSQLConf
import org.apache.hudi.client.WriteStatus
import org.apache.hudi.client.model.HoodieInternalRow
import org.apache.hudi.common.config.TypedProperties
@@ -40,6 +41,7 @@ import org.apache.spark.sql.HoodieUnsafeUtils.getNumPartitions
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Alias, Literal}
import org.apache.spark.sql.catalyst.plans.logical.Project
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Dataset, HoodieUnsafeUtils, Row}
import org.apache.spark.unsafe.types.UTF8String
@@ -81,7 +83,7 @@ object HoodieDatasetBulkInsertHelper
"Key-generator class name is required")

val prependedRdd: RDD[InternalRow] =
-df.queryExecution.toRdd.mapPartitions { iter =>
+injectSQLConf(df.queryExecution.toRdd.mapPartitions { iter =>
val typedProps = new TypedProperties(config.getProps)
if (autoGenerateRecordKeys) {
typedProps.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()))
@@ -107,7 +109,7 @@
// TODO use mutable row, avoid re-allocating
new HoodieInternalRow(commitTimestamp, commitSeqNo, recordKey, partitionPath, filename, row, false)
}
-}
+}, SQLConf.get)

val dedupedRdd = if (config.shouldCombineBeforeInsert) {
dedupeRows(prependedRdd, updatedSchema, config.getPreCombineField, SparkHoodieIndexFactory.isGlobalIndex(config))
@@ -144,7 +146,7 @@
arePartitionRecordsSorted: Boolean,
shouldPreserveHoodieMetadata: Boolean): HoodieData[WriteStatus] = {
val schema = dataset.schema
-val writeStatuses = dataset.queryExecution.toRdd.mapPartitions(iter => {
+val writeStatuses = injectSQLConf(dataset.queryExecution.toRdd.mapPartitions(iter => {
val taskContextSupplier: TaskContextSupplier = table.getTaskContextSupplier
val taskPartitionId = taskContextSupplier.getPartitionIdSupplier.get
val taskId = taskContextSupplier.getStageIdSupplier.get.toLong
@@ -189,7 +191,7 @@
}

writer.getWriteStatuses.asScala.iterator
-}).collect()
+}), SQLConf.get).collect()
table.getContext.parallelize(writeStatuses.toList.asJava)
}

@@ -125,7 +125,7 @@ object HoodieSparkUtils extends SparkAdapterSupport with SparkVersionsSupport wi
}, SQLConf.get)
}

-private def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
+def injectSQLConf[T: ClassTag](rdd: RDD[T], conf: SQLConf): RDD[T] =
new SQLConfInjectingRDD(rdd, conf)

def safeCreateRDD(df: DataFrame, structName: String, recordNamespace: String, reconcileToLatestSchema: Boolean,

@@ -90,6 +90,13 @@
*/
public class HoodieTestDataGenerator implements AutoCloseable {

+/**
+ * You may get a different result due to the upgrading of Spark 3.0: reading dates before 1582-10-15 or timestamps before 1900-01-01T00:00:00Z from Parquet INT96 files can be ambiguous,
+ * as the files may be written by Spark 2.x or legacy versions of Hive, which uses a legacy hybrid calendar that is different from Spark 3.0+'s Proleptic Gregorian calendar.
+ * See more details in SPARK-31404.
+ */
+private boolean makeDatesAmbiguous = false;

// based on examination of sample file, the schema produces the following per record size
public static final int BYTES_PER_RECORD = (int) (1.2 * 1024);
// with default bloom filter with 60,000 entries and 0.000000001 FPRate
@@ -207,6 +214,11 @@ public HoodieTestDataGenerator() {
this(DEFAULT_PARTITION_PATHS);
}

+public HoodieTestDataGenerator(boolean makeDatesAmbiguous) {
+this();
+this.makeDatesAmbiguous = makeDatesAmbiguous;
+}
+
@Deprecated
public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, KeyPartition> keyPartitionMap) {
// NOTE: This used as a workaround to make sure that new instantiations of the generator
@@ -391,7 +403,8 @@ private void generateExtraSchemaValues(GenericRecord rec) {
rec.put("nation", ByteBuffer.wrap(bytes));
long randomMillis = genRandomTimeMillis(rand);
Instant instant = Instant.ofEpochMilli(randomMillis);
rec.put("current_date", (int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
rec.put("current_date", makeDatesAmbiguous ? -1000000 :
(int) LocalDateTime.ofInstant(instant, ZoneOffset.UTC).toLocalDate().toEpochDay());
rec.put("current_ts", randomMillis);

BigDecimal bigDecimal = new BigDecimal(String.format(Locale.ENGLISH, "%5f", rand.nextFloat()));
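
The test-side counterpart: HoodieTestDataGenerator can now be asked to emit "ambiguous" dates, i.e. current_date values far before 1582-10-15, so that writing them through the row writer exercises the rebase confs. A brief usage sketch, assuming the Hudi common test utilities on the classpath (the instant time and record count are arbitrary):

    import org.apache.hudi.common.testutils.HoodieTestDataGenerator

    // makeDatesAmbiguous = true replaces the generated current_date with an epoch day far
    // before 1582-10-15, triggering Spark 3's calendar-rebase handling (see SPARK-31404).
    val dataGen = new HoodieTestDataGenerator(true)
    val inserts = dataGen.generateInserts("000", 10)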

@@ -310,9 +310,14 @@ protected static void prepareParquetDFSFiles(int numRecords, String baseParquetP
}

protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
-String schemaStr, Schema schema) throws IOException {
+String schemaStr, Schema schema) throws IOException {
+return prepareParquetDFSFiles(numRecords, baseParquetPath, fileName, useCustomSchema, schemaStr, schema, false);
+}
+
+protected static HoodieTestDataGenerator prepareParquetDFSFiles(int numRecords, String baseParquetPath, String fileName, boolean useCustomSchema,
+String schemaStr, Schema schema, boolean makeDatesAmbiguous) throws IOException {
String path = baseParquetPath + "/" + fileName;
-HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(makeDatesAmbiguous);
if (useCustomSchema) {
Helpers.saveParquetToDFS(Helpers.toGenericRecords(
dataGenerator.generateInsertsAsPerSchema("000", numRecords, schemaStr),

@@ -1406,20 +1406,34 @@ private void testBulkInsertRowWriterMultiBatches(Boolean useSchemaProvider, List
@Test
public void testBulkInsertRowWriterContinuousModeWithAsyncClustering() throws Exception {
testBulkInsertRowWriterContinuousMode(false, null, false,
-getTableServicesConfigs(2000, "false", "", "", "true", "3"));
+getTableServicesConfigs(2000, "false", "", "", "true", "3"), false);
}

@Test
public void testBulkInsertRowWriterContinuousModeWithInlineClustering() throws Exception {
testBulkInsertRowWriterContinuousMode(false, null, false,
-getTableServicesConfigs(2000, "false", "true", "3", "false", ""));
+getTableServicesConfigs(2000, "false", "true", "3", "false", ""), false);
}

-private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames, boolean testEmptyBatch, List<String> customConfigs) throws Exception {
+@Test
+public void testBulkInsertRowWriterContinuousModeWithInlineClusteringAmbiguousDates() throws Exception {
+sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInWrite", "LEGACY");
+sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInWrite", "LEGACY");
+sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInWrite", "LEGACY");
+sparkSession.sqlContext().setConf("spark.sql.parquet.datetimeRebaseModeInRead", "LEGACY");
+sparkSession.sqlContext().setConf("spark.sql.avro.datetimeRebaseModeInRead", "LEGACY");
+sparkSession.sqlContext().setConf("spark.sql.parquet.int96RebaseModeInRead", "LEGACY");
+testBulkInsertRowWriterContinuousMode(false, null, false,
+getTableServicesConfigs(2000, "false", "true", "3",
+"false", ""), true);
+}
+
+private void testBulkInsertRowWriterContinuousMode(Boolean useSchemaProvider, List<String> transformerClassNames,
+boolean testEmptyBatch, List<String> customConfigs, boolean makeDatesAmbiguous) throws Exception {
PARQUET_SOURCE_ROOT = basePath + "/parquetFilesDfs" + testNum;
int parquetRecordsCount = 100;
boolean hasTransformer = transformerClassNames != null && !transformerClassNames.isEmpty();
-prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null);
+prepareParquetDFSFiles(parquetRecordsCount, PARQUET_SOURCE_ROOT, FIRST_PARQUET_FILE_NAME, false, null, null, makeDatesAmbiguous);
prepareParquetDFSSource(useSchemaProvider, hasTransformer, "source.avsc", "target.avsc", PROPS_FILENAME_TEST_PARQUET,
PARQUET_SOURCE_ROOT, false, "partition_path", testEmptyBatch ? "1" : "");

@@ -1429,7 +1443,7 @@
int counter = 2;
while (counter < 100) { // lets keep going. if the test times out, we will cancel the future within finally. So, safe to generate 100 batches.
LOG.info("Generating data for batch " + counter);
-prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null);
+prepareParquetDFSFiles(100, PARQUET_SOURCE_ROOT, Integer.toString(counter) + ".parquet", false, null, null, makeDatesAmbiguous);
counter++;
Thread.sleep(2000);
}
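
The new test only exercises the fix if the LEGACY rebase settings it places on the SparkSession actually reach the executor-side Parquet writes performed by the row writer, which is what the injectSQLConf wrapping above provides. A hedged sketch of that session setup, using the same conf keys the test sets (the builder settings are illustrative):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("rebase-conf-demo")
      .getOrCreate()

    // LEGACY tells Spark 3 to rebase pre-1582 dates and pre-1900 timestamps to the legacy
    // hybrid Julian/Gregorian calendar instead of failing in the default EXCEPTION mode
    // (see SPARK-31404).
    Seq("spark.sql.parquet.datetimeRebaseModeInWrite",
        "spark.sql.avro.datetimeRebaseModeInWrite",
        "spark.sql.parquet.int96RebaseModeInWrite",
        "spark.sql.parquet.datetimeRebaseModeInRead",
        "spark.sql.avro.datetimeRebaseModeInRead",
        "spark.sql.parquet.int96RebaseModeInRead")
      .foreach(spark.conf.set(_, "LEGACY"))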
