[HUDI-7523] Add HOODIE_SPARK_DATASOURCE_OPTIONS to be used in HoodieIncrSource (apache#606)
vinishjail97 authored Mar 21, 2024
1 parent 2ace7cb commit 8bf2b68
Showing 3 changed files with 59 additions and 5 deletions.
HoodieIncrSourceConfig.java
@@ -101,4 +101,11 @@ public class HoodieIncrSourceConfig extends HoodieConfig {
.withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.hoodieincr.partition.extractor.class")
.markAdvanced()
.withDocumentation("PartitionValueExtractor class to extract partition fields from _hoodie_partition_path");

public static final ConfigProperty<String> HOODIE_SPARK_DATASOURCE_OPTIONS = ConfigProperty
.key(STREAMER_CONFIG_PREFIX + "source.hoodieincr.data.datasource.options")
.noDefaultValue()
.markAdvanced()
.withDocumentation("A comma separate list of options that can be passed to the spark dataframe reader of a hudi table, "
+ "eg: hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
}
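
As a usage sketch (not part of this commit), the new property can be supplied through the same TypedProperties object that is handed to the incremental source; the key is resolved via the ConfigProperty rather than hardcoding the streamer prefix. The class name and option values below are illustrative only, and TypedProperties is assumed to be the hudi-common class:

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;

public class DatasourceOptionsConfigSketch { // illustrative class name
  public static void main(String[] args) {
    TypedProperties props = new TypedProperties();
    // Resolve the key from the ConfigProperty instead of hardcoding the prefix.
    props.setProperty(
        HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS.key(),
        "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
    System.out.println(props.getProperty(HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS.key()));
  }
}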
HoodieIncrSource.java
@@ -23,20 +23,25 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.timeline.TimelineUtils.HollowCommitHandling;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.QueryInfo;
import org.apache.hudi.utilities.streamer.StreamContext;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import java.util.stream.Collectors;

import static org.apache.hudi.DataSourceReadOptions.BEGIN_INSTANTTIME;
import static org.apache.hudi.DataSourceReadOptions.END_INSTANTTIME;
@@ -171,10 +176,18 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
return Pair.of(Option.empty(), queryInfo.getEndInstant());
}

DataFrameReader reader = sparkSession.read().format("org.apache.hudi");
String datasourceOpts = getStringWithAltKeys(props, HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS, true);
if (!StringUtils.isNullOrEmpty(datasourceOpts)) {
Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
.map(option -> Pair.of(option.split("=")[0], option.split("=")[1]))
.collect(Collectors.toMap(Pair::getLeft, Pair::getRight));
reader = reader.options(optionsMap);
}
Dataset<Row> source;
// Do Incr pull. Set end instant if available
if (queryInfo.isIncremental()) {
source = sparkSession.read().format("org.apache.hudi")
source = reader
.option(QUERY_TYPE().key(), QUERY_TYPE_INCREMENTAL_OPT_VAL())
.option(BEGIN_INSTANTTIME().key(), queryInfo.getStartInstant())
.option(END_INSTANTTIME().key(), queryInfo.getEndInstant())
@@ -185,7 +198,7 @@ public Pair<Option<Dataset<Row>>, String> fetchNextBatch(Option<String> lastCkpt
.load(srcPath);
} else {
// if checkpoint is missing from source table, and if strategy is set to READ_UPTO_LATEST_COMMIT, we have to issue snapshot query
Dataset<Row> snapshot = sparkSession.read().format("org.apache.hudi")
Dataset<Row> snapshot = reader
.option(DataSourceReadOptions.QUERY_TYPE().key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL())
.load(srcPath);
if (snapshotLoadQuerySplitter.isPresent()) {
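
For reference, a standalone sketch (not part of the commit) of the parsing done in fetchNextBatch above: the configured string is split on commas into entries, each entry is split on '=' into a key and a value, and the resulting map is applied to the DataFrameReader via reader.options(optionsMap). The class name and sample value are illustrative:

import java.util.Arrays;
import java.util.Map;
import java.util.stream.Collectors;

public class DatasourceOptionsParseSketch { // illustrative class name
  public static void main(String[] args) {
    String datasourceOpts = "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true";
    // Split "k1=v1,k2=v2" into a Map<String, String>, mirroring the logic above.
    Map<String, String> optionsMap = Arrays.stream(datasourceOpts.split(","))
        .map(option -> option.split("="))
        .collect(Collectors.toMap(kv -> kv[0], kv -> kv[1]));
    optionsMap.forEach((k, v) -> System.out.println(k + " -> " + v));
  }
}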
TestHoodieIncrSource.java
@@ -39,6 +39,7 @@
import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.testutils.SparkClientFunctionalTestHarness;
import org.apache.hudi.utilities.config.HoodieIncrSourceConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
import org.apache.hudi.utilities.sources.helpers.IncrSourceHelper;
import org.apache.hudi.utilities.sources.helpers.TestSnapshotQuerySplitterImpl;
@@ -294,7 +295,7 @@ public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableTy
Option.empty(),
100,
dataBatches.get(0).getKey(),
Option.of(TestSnapshotQuerySplitterImpl.class.getName()));
Option.of(TestSnapshotQuerySplitterImpl.class.getName()), new TypedProperties());

// The pending tables services should not block the incremental pulls
// Reads everything up to latest
@@ -327,12 +328,45 @@ public void testHoodieIncrSourceWithPendingTableServices(HoodieTableType tableTy
}
}

@ParameterizedTest
@EnumSource(HoodieTableType.class)
public void testHoodieIncrSourceWithDataSourceOptions(HoodieTableType tableType) throws IOException {
this.tableType = tableType;
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
HoodieWriteConfig writeConfig = getConfigBuilder(basePath(), metaClient)
.withArchivalConfig(HoodieArchivalConfig.newBuilder().archiveCommitsWith(10, 12).build())
.withCleanConfig(HoodieCleanConfig.newBuilder().retainCommits(9).build())
.withCompactionConfig(
HoodieCompactionConfig.newBuilder()
.withScheduleInlineCompaction(true)
.withMaxNumDeltaCommitsBeforeCompaction(1)
.build())
.withMetadataConfig(HoodieMetadataConfig.newBuilder().enable(true)
.withMetadataIndexColumnStats(true)
.withColumnStatsIndexForColumns("_hoodie_commit_time")
.build())
.build();

TypedProperties extraProps = new TypedProperties();
extraProps.setProperty(HoodieIncrSourceConfig.HOODIE_SPARK_DATASOURCE_OPTIONS.key(), "hoodie.metadata.enable=true,hoodie.enable.data.skipping=true");
try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
Pair<String, List<HoodieRecord>> inserts = writeRecords(writeClient, INSERT, null, "100");
Pair<String, List<HoodieRecord>> inserts2 = writeRecords(writeClient, INSERT, null, "200");
readAndAssert(IncrSourceHelper.MissingCheckpointStrategy.READ_UPTO_LATEST_COMMIT,
Option.empty(),
100,
inserts.getKey(),
Option.of(TestSnapshotQuerySplitterImpl.class.getName()), extraProps);
}
}

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull, int expectedCount,
String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt) {
String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt, TypedProperties extraProps) {

Properties properties = new Properties();
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
properties.putAll(extraProps);
snapshotCheckPointImplClassOpt.map(className ->
properties.setProperty(SnapshotLoadQuerySplitter.Config.SNAPSHOT_LOAD_QUERY_SPLITTER_CLASS_NAME, className));
TypedProperties typedProperties = new TypedProperties(properties);
@@ -351,7 +385,7 @@ private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingChe

private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy, Option<String> checkpointToPull,
int expectedCount, String expectedCheckpoint) {
readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty());
readAndAssert(missingCheckpointStrategy, checkpointToPull, expectedCount, expectedCheckpoint, Option.empty(), new TypedProperties());
}

private Pair<String, List<HoodieRecord>> writeRecords(SparkRDDWriteClient writeClient,
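
Putting the pieces together, an illustrative sketch of what the incremental source effectively does once the configured options are parsed: the map is applied to the Hudi DataFrameReader before the load. The table path, Spark master, and class name below are placeholders, and the hudi-spark bundle is assumed to be on the classpath:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.DataFrameReader;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class HudiReaderOptionsSketch { // illustrative class name
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hudi-reader-options-sketch")
        .master("local[2]") // placeholder master for a local run
        .getOrCreate();

    // Options as they would look after parsing the new source config value.
    Map<String, String> optionsMap = new HashMap<>();
    optionsMap.put("hoodie.metadata.enable", "true");
    optionsMap.put("hoodie.enable.data.skipping", "true");

    // Apply the extra options to the Hudi reader, then issue a snapshot read.
    DataFrameReader reader = spark.read().format("org.apache.hudi").options(optionsMap);
    Dataset<Row> df = reader.load("/tmp/hudi_table"); // placeholder base path
    df.show();

    spark.stop();
  }
}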
