OSS cherry picks 2023-04-17 to 2023-04-27 (apache#268)
This commit cherry-picks the following commits from OSS Hudi (range 64bf871..77039ae):

apache#8380
apache#8352
apache#8422
apache#8494
apache#8496
apache#8510
apache#8495
apache#8384
apache#8484
apache#6799
apache#8553
apache#8376

Co-authored-by: voonhous <voonhousu@gmail.com>
Co-authored-by: huangxiaoping <1754789345@qq.com>
Co-authored-by: Sivabalan Narayanan <n.siva.b@gmail.com>
Co-authored-by: Prashant Wason <pwason@uber.com>
Co-authored-by: Danny Chan <yuzhao.cyz@gmail.com>
Co-authored-by: 冯健 <fengjian428@gmail.com>
Co-authored-by: fengjian <fengjian@dipeak.com>
Co-authored-by: kongwei <kongwei@pku.edu.cn>
Co-authored-by: wei.kong <wei.kong@shopee.com>
10 people authored Apr 28, 2023
1 parent c5491c7 commit 936591a
Showing 32 changed files with 867 additions and 169 deletions.
15 changes: 15 additions & 0 deletions doap_HUDI.rdf
@@ -106,6 +106,21 @@
<created>2022-10-18</created>
<revision>0.12.1</revision>
</Version>
+ <Version>
+ <name>Apache Hudi 0.12.2</name>
+ <created>2022-12-28</created>
+ <revision>0.12.2</revision>
+ </Version>
+ <Version>
+ <name>Apache Hudi 0.13.0</name>
+ <created>2023-02-25</created>
+ <revision>0.13.0</revision>
+ </Version>
+ <Version>
+ <name>Apache Hudi 0.12.3</name>
+ <created>2023-04-23</created>
+ <revision>0.12.3</revision>
+ </Version>
</release>
<repository>
<GitRepository>
@@ -317,7 +317,7 @@ public Builder hfileBlockSize(int blockSize) {
return this;
}

- public Builder logFileDataBlockMaxSize(int dataBlockSize) {
+ public Builder logFileDataBlockMaxSize(long dataBlockSize) {
storageConfig.setValue(LOGFILE_DATA_BLOCK_MAX_SIZE, String.valueOf(dataBlockSize));
return this;
}
@@ -48,7 +48,6 @@
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ReflectionUtils;
import org.apache.hudi.common.util.StringUtils;
- import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.queue.ExecutorType;
import org.apache.hudi.config.metrics.HoodieMetricsCloudWatchConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
@@ -93,6 +92,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

+ import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
import static org.apache.hudi.common.util.queue.ExecutorType.BOUNDED_IN_MEMORY;
import static org.apache.hudi.config.HoodieCleanConfig.CLEANER_POLICY;

@@ -1794,8 +1794,8 @@ public int getParquetPageSize() {
return getInt(HoodieStorageConfig.PARQUET_PAGE_SIZE);
}

- public int getLogFileDataBlockMaxSize() {
- return getInt(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE);
+ public long getLogFileDataBlockMaxSize() {
+ return getLong(HoodieStorageConfig.LOGFILE_DATA_BLOCK_MAX_SIZE);
}

public double getParquetCompressionRatio() {
@@ -2812,37 +2812,36 @@ private void validate() {
Objects.requireNonNull(writeConfig.getString(BASE_PATH));
if (writeConfig.getString(WRITE_CONCURRENCY_MODE)
.equalsIgnoreCase(WriteConcurrencyMode.OPTIMISTIC_CONCURRENCY_CONTROL.value())) {
- ValidationUtils.checkArgument(!writeConfig.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY)
+ checkArgument(!writeConfig.getString(HoodieCleanConfig.FAILED_WRITES_CLEANER_POLICY)
.equals(HoodieFailedWritesCleaningPolicy.EAGER.name()), "To enable optimistic concurrency control, set hoodie.cleaner.policy.failed.writes=LAZY");
}


HoodieCleaningPolicy cleaningPolicy = HoodieCleaningPolicy.valueOf(writeConfig.getString(CLEANER_POLICY));
- if (writeConfig.getCleanerPolicy() == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
+ if (cleaningPolicy == HoodieCleaningPolicy.KEEP_LATEST_COMMITS) {
// Ensure minInstantsToKeep > cleanerCommitsRetained, otherwise we will archive some
// commit instant on timeline, that still has not been cleaned. Could miss some data via incr pull
int minInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MIN_COMMITS_TO_KEEP));
int maxInstantsToKeep = Integer.parseInt(writeConfig.getStringOrDefault(HoodieArchivalConfig.MAX_COMMITS_TO_KEEP));
int cleanerCommitsRetained =
Integer.parseInt(writeConfig.getStringOrDefault(HoodieCleanConfig.CLEANER_COMMITS_RETAINED));
- ValidationUtils.checkArgument(maxInstantsToKeep > minInstantsToKeep,
+ checkArgument(maxInstantsToKeep > minInstantsToKeep,
String.format(
"Increase %s=%d to be greater than %s=%d.",
HoodieArchivalConfig.MAX_COMMITS_TO_KEEP.key(), maxInstantsToKeep,
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep));
- ValidationUtils.checkArgument(minInstantsToKeep > cleanerCommitsRetained,
+ checkArgument(minInstantsToKeep > cleanerCommitsRetained,
String.format(
"Increase %s=%d to be greater than %s=%d. Otherwise, there is risk of incremental pull "
+ "missing data from few instants.",
HoodieArchivalConfig.MIN_COMMITS_TO_KEEP.key(), minInstantsToKeep,
HoodieCleanConfig.CLEANER_COMMITS_RETAINED.key(), cleanerCommitsRetained));

- boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
- boolean inlineCompactSchedule = writeConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
- ValidationUtils.checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Either of inline compaction (%s) or "
- + "schedule inline compaction (%s) can be enabled. Both can't be set to true at the same time. %s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(),
- HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule));
}

+ boolean inlineCompact = writeConfig.getBoolean(HoodieCompactionConfig.INLINE_COMPACT);
+ boolean inlineCompactSchedule = writeConfig.getBoolean(HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT);
+ checkArgument(!(inlineCompact && inlineCompactSchedule), String.format("Either of inline compaction (%s) or "
+ + "schedule inline compaction (%s) can be enabled. Both can't be set to true at the same time. %s, %s", HoodieCompactionConfig.INLINE_COMPACT.key(),
+ HoodieCompactionConfig.SCHEDULE_INLINE_COMPACT.key(), inlineCompact, inlineCompactSchedule));
}

public HoodieWriteConfig build() {
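For context on the archival/cleaner constraints validated above: the builder requires HoodieArchivalConfig.MAX_COMMITS_TO_KEEP > HoodieArchivalConfig.MIN_COMMITS_TO_KEEP > HoodieCleanConfig.CLEANER_COMMITS_RETAINED, so that no instant still needed by the cleaner or by incremental pulls is archived off the timeline. A minimal standalone sketch of that ordering with hypothetical values (plain Java, not part of this commit):

// Standalone sketch of the ordering enforced in validate() above (hypothetical numbers).
public class ArchivalCleanerConstraintSketch {
  public static void main(String[] args) {
    int cleanerCommitsRetained = 10; // HoodieCleanConfig.CLEANER_COMMITS_RETAINED (hypothetical value)
    int minInstantsToKeep = 20;      // HoodieArchivalConfig.MIN_COMMITS_TO_KEEP (hypothetical value)
    int maxInstantsToKeep = 30;      // HoodieArchivalConfig.MAX_COMMITS_TO_KEEP (hypothetical value)

    if (maxInstantsToKeep <= minInstantsToKeep) {
      throw new IllegalArgumentException("MAX_COMMITS_TO_KEEP must be greater than MIN_COMMITS_TO_KEEP");
    }
    if (minInstantsToKeep <= cleanerCommitsRetained) {
      // Otherwise uncleaned instants could be archived and incremental pulls could miss data.
      throw new IllegalArgumentException("MIN_COMMITS_TO_KEEP must be greater than CLEANER_COMMITS_RETAINED");
    }
    System.out.println("Archival and cleaner settings are consistent: 10 < 20 < 30");
  }
}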
@@ -25,6 +25,7 @@
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieInstant;
+ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.HoodieTimer;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.ValidationUtils;
@@ -168,4 +169,10 @@ public static List<String> filterKeysFromFile(Path filePath, List<String> candid
}
return foundRecordKeys;
}

+ public static boolean checkIfValidCommit(HoodieTimeline commitTimeline, String commitTs) {
+ // Check if the last commit ts for this row is 1) present in the timeline or
+ // 2) is less than the first commit ts in the timeline
+ return !commitTimeline.empty() && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
+ }
}
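The helper added above centralizes the commit-validity check that was previously private to the HBase index (see its removal further down in this diff). A hypothetical sketch of how an index implementation might use it when deciding whether a cached record location is still trustworthy, assuming only the Hudi classes already shown in this diff:

import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.index.HoodieIndexUtils;

// Hypothetical helper mirroring the tagging pattern used by the index changes in this commit.
class LocationTaggingSketch {
  static <R> void tagIfValid(HoodieRecord<R> record,
                             HoodieRecordLocation location,
                             HoodieTimeline completedCommitsTimeline) {
    // checkIfValidCommit returns true when the instant that wrote the location is still on the
    // timeline, or predates the earliest retained instant (i.e. it was archived, not rolled back).
    if (location != null
        && HoodieIndexUtils.checkIfValidCommit(completedCommitsTimeline, location.getInstantTime())) {
      record.unseal();
      record.setCurrentLocation(location);
      record.seal();
    }
    // Otherwise the record is left untagged and is treated as a new insert.
  }
}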
@@ -25,9 +25,11 @@
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
+ import org.apache.hudi.common.table.timeline.HoodieTimeline;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.index.HoodieIndex;
+ import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;

import java.util.ArrayList;
@@ -60,11 +62,13 @@ public <R> HoodieData<HoodieRecord<R>> tagLocation(
HoodieTable hoodieTable) {
return records.mapPartitions(hoodieRecordIterator -> {
List<HoodieRecord<R>> taggedRecords = new ArrayList<>();
+ HoodieTimeline commitsTimeline = hoodieTable.getMetaClient().getCommitsTimeline().filterCompletedInstants();
while (hoodieRecordIterator.hasNext()) {
HoodieRecord<R> record = hoodieRecordIterator.next();
- if (recordLocationMap.containsKey(record.getKey())) {
+ HoodieRecordLocation location = recordLocationMap.get(record.getKey());
+ if ((location != null) && HoodieIndexUtils.checkIfValidCommit(commitsTimeline, location.getInstantTime())) {
record.unseal();
- record.setCurrentLocation(recordLocationMap.get(record.getKey()));
+ record.setCurrentLocation(location);
record.seal();
}
taggedRecords.add(record);
@@ -113,9 +113,9 @@ public class HoodieAppendHandle<T extends HoodieRecordPayload, I, K, O> extends
// Total number of bytes written during this append phase (an estimation)
protected long estimatedNumberOfBytesWritten;
// Number of records that must be written to meet the max block size for a log block
- private int numberOfRecords = 0;
+ private long numberOfRecords = 0;
// Max block size to limit to for a log block
- private final int maxBlockSize = config.getLogFileDataBlockMaxSize();
+ private final long maxBlockSize = config.getLogFileDataBlockMaxSize();
// Header metadata for a log block
protected final Map<HeaderMetadataType, String> header = new HashMap<>();
private SizeEstimator<HoodieRecord> sizeEstimator;
@@ -566,7 +566,7 @@ private void flushToDiskIfRequired(HoodieRecord record, boolean appendDeleteBloc
}

// Append if max number of records reached to achieve block size
- if (numberOfRecords >= (int) (maxBlockSize / averageRecordSize)) {
+ if (numberOfRecords >= (long) (maxBlockSize / averageRecordSize)) {
// Recompute averageRecordSize before writing a new block and update existing value with
// avg of new and old
LOG.info("Flush log block to disk, the current avgRecordSize => " + averageRecordSize);
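The int-to-long widening above matters because the flush threshold is derived from the configured max block size; with a 32-bit field, a limit above Integer.MAX_VALUE (about 2 GiB) cannot even be represented. A standalone sketch of the arithmetic with hypothetical numbers (plain Java, not part of this commit):

// Sketch of the records-per-block calculation behind flushToDiskIfRequired, done in 64-bit math.
public class LogBlockThresholdSketch {
  public static void main(String[] args) {
    long maxBlockSize = 4L * 1024 * 1024 * 1024; // 4 GiB block limit (hypothetical), exceeds Integer.MAX_VALUE
    long averageRecordSize = 512;                // estimated bytes per record (hypothetical)

    long recordsPerBlock = maxBlockSize / averageRecordSize;
    System.out.println("Flush after ~" + recordsPerBlock + " records"); // 8388608

    // The previous int-based config could not hold such a limit: the low 32 bits of 4 GiB are zero.
    System.out.println("Same limit truncated to int: " + (int) maxBlockSize); // 0
  }
}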
@@ -88,7 +88,7 @@ public class HoodieCDCLogger implements Closeable {
private final CDCTransformer transformer;

// Max block size to limit to for a log block
- private final int maxBlockSize;
+ private final long maxBlockSize;

// Average cdc record size. This size is updated at the end of every log block flushed to disk
private long averageCDCRecordSize = 0;
@@ -71,7 +71,7 @@
import org.apache.hudi.exception.HoodieMetadataException;
import org.apache.hudi.hadoop.CachingPath;
import org.apache.hudi.hadoop.SerializablePath;

+ import org.apache.hudi.table.action.compact.strategy.UnBoundedCompactionStrategy;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -284,6 +284,10 @@ private HoodieWriteConfig createMetadataWriteConfig(HoodieWriteConfig writeConfi
// to always use the metadata of the new record.
.withPreserveCommitMetadata(false)
.withLogRecordReaderScanV2(String.valueOf(writeConfig.useScanV2ForLogRecordReader()))
+ // Compaction on metadata table is used as a barrier for archiving on main dataset and for validating the
+ // deltacommits having corresponding completed commits. Therefore, we need to compact all fileslices of all
+ // partitions together requiring UnBoundedCompactionStrategy.
+ .withCompactionStrategy(new UnBoundedCompactionStrategy())
.build())
.withParallelism(parallelism, parallelism)
.withDeleteParallelism(parallelism)
@@ -21,6 +21,7 @@
import org.apache.hudi.avro.model.HoodieCleanMetadata;
import org.apache.hudi.avro.model.HoodieSavepointMetadata;
import org.apache.hudi.common.engine.HoodieEngineContext;
+ import org.apache.hudi.common.fs.FSUtils;
import org.apache.hudi.common.model.CleanFileInfo;
import org.apache.hudi.common.model.CompactionOperation;
import org.apache.hudi.common.model.FileSlice;
@@ -46,7 +47,6 @@
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieSavepointException;
- import org.apache.hudi.metadata.FileSystemBackedTableMetadata;
import org.apache.hudi.table.HoodieTable;

import org.apache.log4j.LogManager;
@@ -215,15 +215,7 @@ private List<String> getPartitionPathsForIncrementalCleaning(HoodieCleanMetadata
*/
private List<String> getPartitionPathsForFullCleaning() {
// Go to brute force mode of scanning all partitions
- try {
- // Because the partition of BaseTableMetadata has been deleted,
- // all partition information can only be obtained from FileSystemBackedTableMetadata.
- FileSystemBackedTableMetadata fsBackedTableMetadata = new FileSystemBackedTableMetadata(context,
- context.getHadoopConf(), config.getBasePath(), config.shouldAssumeDatePartitioning());
- return fsBackedTableMetadata.getAllPartitionPaths();
- } catch (IOException e) {
- return Collections.emptyList();
- }
+ return FSUtils.getAllPartitionPaths(context, config.getMetadataConfig(), config.getBasePath());
}

/**
@@ -112,7 +112,7 @@ protected HoodieFileReader<GenericRecord> createReader(
protected HoodieFileReader<GenericRecord> createReader(
Configuration conf) throws Exception {
CacheConfig cacheConfig = new CacheConfig(conf);
- return new HoodieHFileReader<>(conf, getFilePath(), cacheConfig, getFilePath().getFileSystem(conf));
+ return new HoodieHFileReader<>(conf, getFilePath(), cacheConfig, getFilePath().getFileSystem(conf), Option.empty());
}

@Override
@@ -41,6 +41,7 @@
import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
import org.apache.hudi.exception.HoodieIndexException;
import org.apache.hudi.index.HoodieIndex;
+ import org.apache.hudi.index.HoodieIndexUtils;
import org.apache.hudi.table.HoodieTable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
@@ -226,14 +227,6 @@ protected String getHBaseKey(String key) {
return key;
}

- private boolean checkIfValidCommit(HoodieTableMetaClient metaClient, String commitTs) {
- HoodieTimeline commitTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
- // Check if the last commit ts for this row is 1) present in the timeline or
- // 2) is less than the first commit ts in the timeline
- return !commitTimeline.empty()
- && commitTimeline.containsOrBeforeTimelineStarts(commitTs);
- }

/**
* Function that tags each HoodieRecord with an existing location, if known.
*/
@@ -257,6 +250,7 @@ private <R> Function2<Integer, Iterator<HoodieRecord<R>>, Iterator<HoodieRecord<
List<Get> statements = new ArrayList<>();
List<HoodieRecord> currentBatchOfRecords = new LinkedList<>();
// Do the tagging.
+ HoodieTimeline completedCommitsTimeline = metaClient.getCommitsTimeline().filterCompletedInstants();
while (hoodieRecordIterator.hasNext()) {
HoodieRecord rec = hoodieRecordIterator.next();
statements.add(generateStatement(rec.getRecordKey()));
Expand All @@ -280,7 +274,7 @@ private <R> Function2<Integer, Iterator<HoodieRecord<R>>, Iterator<HoodieRecord<
String commitTs = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN));
String fileId = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN));
String partitionPath = Bytes.toString(result.getValue(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN));
- if (!checkIfValidCommit(metaClient, commitTs)) {
+ if (!HoodieIndexUtils.checkIfValidCommit(completedCommitsTimeline, commitTs)) {
// if commit is invalid, treat this as a new taggedRecord
taggedRecords.add(currentRecord);
continue;
@@ -63,6 +63,7 @@
import java.io.IOException;
import java.math.BigDecimal;
import java.math.BigInteger;
+ import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
@@ -1115,15 +1116,8 @@ private static Object rewritePrimaryTypeWithDiffSchemaType(Object oldValue, Sche
|| oldSchema.getType() == Schema.Type.LONG
|| oldSchema.getType() == Schema.Type.FLOAT) {
LogicalTypes.Decimal decimal = (LogicalTypes.Decimal) newSchema.getLogicalType();
- BigDecimal bigDecimal = null;
- if (oldSchema.getType() == Schema.Type.STRING) {
- bigDecimal = new java.math.BigDecimal(oldValue.toString())
- .setScale(decimal.getScale());
- } else {
- // Due to Java, there will be precision problems in direct conversion, we should use string instead of use double
- bigDecimal = new java.math.BigDecimal(oldValue.toString())
- .setScale(decimal.getScale());
- }
+ // due to Java, there will be precision problems in direct conversion, we should use string instead of use double
+ BigDecimal bigDecimal = new java.math.BigDecimal(oldValue.toString()).setScale(decimal.getScale(), RoundingMode.HALF_UP);
return DECIMAL_CONVERSION.toFixed(bigDecimal, newSchema, newSchema.getLogicalType());
}
}
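Two Java details motivate the BigDecimal rewrite in the hunk above: constructing a BigDecimal from a double reflects the binary floating-point representation rather than the decimal literal, and setScale without an explicit RoundingMode throws whenever rounding is required. A small standalone illustration (not part of this commit):

import java.math.BigDecimal;
import java.math.RoundingMode;

public class DecimalConversionSketch {
  public static void main(String[] args) {
    // String-based construction preserves the intended decimal value exactly.
    System.out.println(new BigDecimal(0.1));   // 0.1000000000000000055511151231257827021181583404541015625
    System.out.println(new BigDecimal("0.1")); // 0.1

    // Rescaling needs an explicit rounding mode whenever digits must be dropped.
    BigDecimal viaString = new BigDecimal("123.456");
    System.out.println(viaString.setScale(2, RoundingMode.HALF_UP)); // 123.46
    // viaString.setScale(2) alone would throw ArithmeticException: Rounding necessary
  }
}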
@@ -1175,7 +1169,7 @@ private static Schema getActualSchemaFromUnion(Schema schema, Object data) {
} else if (schema.getTypes().size() == 1) {
actualSchema = schema.getTypes().get(0);
} else {
- // deal complex union. this should not happened in hoodie,
+ // deal complex union. this should not happen in hoodie,
// since flink/spark do not write this type.
int i = GenericData.get().resolveUnion(schema, data);
actualSchema = schema.getTypes().get(i);
@@ -1186,10 +1180,10 @@
/**
* Given avro records, rewrites them with new schema.
*
- * @param oldRecords oldRecords to be rewrite
+ * @param oldRecords oldRecords to be rewritten
* @param newSchema newSchema used to rewrite oldRecord
* @param renameCols a map store all rename cols, (k, v)-> (colNameFromNewSchema, colNameFromOldSchema)
- * @return a iterator of rewrote GeneriRcords
+ * @return a iterator of rewritten GenericRecords
*/
public static Iterator<GenericRecord> rewriteRecordWithNewSchema(Iterator<GenericRecord> oldRecords, Schema newSchema, Map<String, String> renameCols, boolean validate) {
if (oldRecords == null || newSchema == null) {
(Remaining changed files from this commit are not shown.)