diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
index 95e0c9b9ebf7e..dfd69c509da36 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieCompactionConfig.java
@@ -45,6 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
+  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
@@ -104,6 +105,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
   private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
+  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -240,6 +242,11 @@ public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPer
       return this;
     }
 
+    public Builder withCommitsArchivalBatchSize(int batchSize) {
+      props.setProperty(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize));
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
@@ -281,6 +288,8 @@ public HoodieCompactionConfig build() {
           COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
       setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP),
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
+      setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
+          COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);
 
       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 7156623f60dcc..115dd51b3b6e4 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -249,6 +249,11 @@ public int getTargetPartitionsPerDayBasedCompaction() {
         .parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP));
   }
 
+  public int getCommitArchivalBatchSize() {
+    return Integer
+        .parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
+  }
+
   /**
    * index properties
    **/
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
index eb836d0d41425..ccf303a01c659 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
@@ -245,11 +245,11 @@ public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
+        if (records.size() >= this.config.getCommitArchivalBatchSize()) {
+          writeToFile(wrapperSchema, records);
+        }
       }
-      Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
-      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
-      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
-      this.writer = writer.appendBlock(block);
+      writeToFile(wrapperSchema, records);
     } catch (Exception e) {
       throw new HoodieCommitException("Failed to archive commits", e);
     }
@@ -259,6 +259,16 @@ public Path getArchiveFilePath() {
     return archiveFilePath;
   }
 
+  private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
+    if (records.size() > 0) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
+      this.writer = writer.appendBlock(block);
+      records.clear();
+    }
+  }
+
   private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline, HoodieInstant hoodieInstant)
       throws IOException {
     HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
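
For context, a minimal usage sketch (not part of the patch) of how a writer client could tune the new knob through the builder method added above. `basePath` is a placeholder for the caller's dataset path; everything else is the existing HoodieWriteConfig/HoodieCompactionConfig builder API:

    // Buffer at most 5 archived instants per avro data block instead of the default 10.
    HoodieWriteConfig writeConfig = HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withCompactionConfig(HoodieCompactionConfig.newBuilder()
            .withCommitsArchivalBatchSize(5)
            .build())
        .build();

Smaller batches flush log blocks more often during archival, trading a little write amplification for a lower peak memory footprint while converting instants to avro records.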