Performing commit archiving in batches to avoid keeping a huge chunk in memory
n3nash committed Apr 10, 2019
1 parent b07110b commit 9efbc55
Showing 3 changed files with 28 additions and 4 deletions.
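
For context, the new knob is exposed through HoodieCompactionConfig's builder and defaults to 10 commits per archival batch. A minimal caller-side sketch follows; only withCommitsArchivalBatchSize(...) comes from this diff, while newBuilder() and the surrounding wiring are assumed Hudi API shown here for illustration:

// Hypothetical usage sketch: tune the archival batch size introduced by this commit.
// newBuilder() and the variable names are assumed surrounding API, not part of the diff below.
HoodieCompactionConfig compactionConfig = HoodieCompactionConfig.newBuilder()
    .withCommitsArchivalBatchSize(5) // flush archived commits every 5 records instead of the default 10
    .build();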
@@ -45,6 +45,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   public static final String CLEANER_COMMITS_RETAINED_PROP = "hoodie.cleaner.commits.retained";
   public static final String MAX_COMMITS_TO_KEEP_PROP = "hoodie.keep.max.commits";
   public static final String MIN_COMMITS_TO_KEEP_PROP = "hoodie.keep.min.commits";
+  public static final String COMMITS_ARCHIVAL_BATCH_SIZE_PROP = "hoodie.commits.archival.batch";
   // Upsert uses this file size to compact new data onto existing files..
   public static final String PARQUET_SMALL_FILE_LIMIT_BYTES = "hoodie.parquet.small.file.limit";
   // By default, treat any file <= 100MB as a small file.
@@ -104,6 +105,7 @@ public class HoodieCompactionConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_CLEANER_COMMITS_RETAINED = "10";
   private static final String DEFAULT_MAX_COMMITS_TO_KEEP = "30";
   private static final String DEFAULT_MIN_COMMITS_TO_KEEP = "20";
+  private static final String DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE = String.valueOf(10);
   public static final String TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP = "hoodie.compaction.daybased.target"
       + ".partitions";
   // 500GB of target IO per compaction (both read and write)
@@ -240,6 +242,11 @@ public Builder withTargetPartitionsPerDayBasedCompaction(int targetPartitionsPer
       return this;
     }

+    public Builder withCommitsArchivalBatchSize(int batchSize) {
+      props.setProperty(COMMITS_ARCHIVAL_BATCH_SIZE_PROP, String.valueOf(batchSize));
+      return this;
+    }
+
     public HoodieCompactionConfig build() {
       HoodieCompactionConfig config = new HoodieCompactionConfig(props);
       setDefaultOnCondition(props, !props.containsKey(AUTO_CLEAN_PROP), AUTO_CLEAN_PROP,
@@ -281,6 +288,8 @@ public HoodieCompactionConfig build() {
           COMPACTION_REVERSE_LOG_READ_ENABLED_PROP, DEFAULT_COMPACTION_REVERSE_LOG_READ_ENABLED);
       setDefaultOnCondition(props, !props.containsKey(TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP),
           TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP, DEFAULT_TARGET_PARTITIONS_PER_DAYBASED_COMPACTION);
+      setDefaultOnCondition(props, !props.containsKey(COMMITS_ARCHIVAL_BATCH_SIZE_PROP),
+          COMMITS_ARCHIVAL_BATCH_SIZE_PROP, DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE);

       HoodieCleaningPolicy.valueOf(props.getProperty(CLEANER_POLICY_PROP));

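The build() wiring above means getCommitArchivalBatchSize() (added in the next file) always finds a value: if the caller never sets hoodie.commits.archival.batch, the default of 10 is seeded into the properties first. A rough sketch of what that setDefaultOnCondition call boils down to, inferred only from how it is invoked here rather than from the helper's actual implementation:

// Assumed behaviour of setDefaultOnCondition for the new key, based solely on the call site above.
if (!props.containsKey("hoodie.commits.archival.batch")) {
  props.setProperty("hoodie.commits.archival.batch", "10"); // DEFAULT_COMMITS_ARCHIVAL_BATCH_SIZE
}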
@@ -249,6 +249,11 @@ public int getTargetPartitionsPerDayBasedCompaction() {
         .parseInt(props.getProperty(HoodieCompactionConfig.TARGET_PARTITIONS_PER_DAYBASED_COMPACTION_PROP));
   }

+  public int getCommitArchivalBatchSize() {
+    return Integer
+        .parseInt(props.getProperty(HoodieCompactionConfig.COMMITS_ARCHIVAL_BATCH_SIZE_PROP));
+  }
+
   /**
    * index properties
    **/
@@ -245,11 +245,11 @@ public void archive(List<HoodieInstant> instants) throws HoodieCommitException {
       List<IndexedRecord> records = new ArrayList<>();
       for (HoodieInstant hoodieInstant : instants) {
         records.add(convertToAvroRecord(commitTimeline, hoodieInstant));
+        if (records.size() >= this.config.getCommitArchivalBatchSize()) {
+          writeToFile(wrapperSchema, records);
+        }
       }
-      Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
-      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
-      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
-      this.writer = writer.appendBlock(block);
+      writeToFile(wrapperSchema, records);
     } catch (Exception e) {
       throw new HoodieCommitException("Failed to archive commits", e);
     }
@@ -259,6 +259,16 @@ public Path getArchiveFilePath() {
     return archiveFilePath;
   }

+  private void writeToFile(Schema wrapperSchema, List<IndexedRecord> records) throws Exception {
+    if (records.size() > 0) {
+      Map<HoodieLogBlock.HeaderMetadataType, String> header = Maps.newHashMap();
+      header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, wrapperSchema.toString());
+      HoodieAvroDataBlock block = new HoodieAvroDataBlock(records, header);
+      this.writer = writer.appendBlock(block);
+      records.clear();
+    }
+  }
+
   private IndexedRecord convertToAvroRecord(HoodieTimeline commitTimeline,
       HoodieInstant hoodieInstant) throws IOException {
     HoodieArchivedMetaEntry archivedMetaWrapper = new HoodieArchivedMetaEntry();
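Taken together, archive(...) now drains records in bounded batches instead of accumulating every instant before a single appendBlock call, and a trailing writeToFile flushes whatever is left. A simplified, standalone illustration of the same batch-and-flush pattern (not Hudi code; the Batcher class and all names in it are invented for illustration):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Illustrative only: generic batch-and-flush, mirroring the accumulate / flush-at-threshold /
// flush-remainder structure the archive loop now follows.
public class Batcher<T> {
  private final int batchSize;
  private final List<T> buffer = new ArrayList<>();
  private final Consumer<List<T>> sink; // stands in for writeToFile(...)

  public Batcher(int batchSize, Consumer<List<T>> sink) {
    this.batchSize = batchSize;
    this.sink = sink;
  }

  public void add(T item) {
    buffer.add(item);
    if (buffer.size() >= batchSize) {
      flush(); // bound memory: write out as soon as the batch is full
    }
  }

  public void flush() {
    if (!buffer.isEmpty()) {
      sink.accept(new ArrayList<>(buffer));
      buffer.clear(); // same idea as records.clear() in writeToFile above
    }
  }

  public static void main(String[] args) {
    Batcher<String> batcher = new Batcher<>(10, batch -> System.out.println("wrote " + batch.size()));
    for (int i = 0; i < 25; i++) {
      batcher.add("commit-" + i);
    }
    batcher.flush(); // final partial batch, like the trailing writeToFile call
  }
}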
