Fixing bug introduced in rollback for MOR table type with inserts into log files #417

Merged
@@ -54,6 +54,7 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Path;
@@ -216,11 +217,14 @@ public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> comm
final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();
Map<FileStatus, Long> filesToNumBlocksRollback = new HashMap<>();

// In case all data was inserts and the commit failed, there are no partition stats
if (commitMetadata.getPartitionToWriteStats().size() == 0) {
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
}
// In case all data was inserts and the commit failed, delete the file belonging to that commit
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);

final Set<String> deletedFiles = filesToDeletedStatus.entrySet().stream()
.map(entry -> {
Path filePath = entry.getKey().getPath();
return FSUtils.getFileIdFromFilePath(filePath);
}).collect(Collectors.toSet());
// append rollback blocks for updates
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
// This needs to be done since GlobalIndex at the moment does not store the latest commit time
@@ -231,16 +235,9 @@ public List<HoodieRollbackStat> rollback(JavaSparkContext jsc, List<String> comm
.filter(wStat -> {
if (wStat != null
&& wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
&& wStat.getPrevCommit() != null) {
&& wStat.getPrevCommit() != null && !deletedFiles.contains(wStat.getFileId())) {
return true;
}
// we do not know fileIds for inserts (first inserts are either log files or parquet files),
// delete all files for the corresponding failed commit, if present (same as COW)
try {
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);
} catch (IOException io) {
throw new UncheckedIOException(io);
}
return false;
})
.forEach(wStat -> {
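Taken together, the two hunks above simplify the rollback flow for MOR tables: delete the failed commit's files first, remember which fileIds were deleted, then skip those fileIds when appending rollback blocks. A condensed, illustrative sketch of the resulting control flow (the variables come from the surrounding diff; appendRollbackBlock is a hypothetical stand-in for the log-append logic elided here):

final Map<FileStatus, Boolean> filesToDeletedStatus = new HashMap<>();

// Step 1: unconditionally delete files written by the failed commit. This now
// also covers pure-insert commits whose first writes landed in log files.
super.deleteCleanedFiles(filesToDeletedStatus, partitionPath, filter);

// Step 2: record the fileId of every file deleted in step 1.
final Set<String> deletedFiles = filesToDeletedStatus.keySet().stream()
    .map(status -> FSUtils.getFileIdFromFilePath(status.getPath()))
    .collect(Collectors.toSet());

// Step 3: append rollback blocks only for updated file slices that survived
// step 1; appending a rollback block to an already-deleted file is pointless.
if (commitMetadata.getPartitionToWriteStats().containsKey(partitionPath)) {
  commitMetadata.getPartitionToWriteStats().get(partitionPath).stream()
      .filter(wStat -> wStat != null
          && wStat.getPrevCommit() != HoodieWriteStat.NULL_COMMIT
          && wStat.getPrevCommit() != null
          && !deletedFiles.contains(wStat.getFileId()))
      .forEach(wStat -> appendRollbackBlock(wStat)); // hypothetical helper
}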
@@ -51,6 +51,8 @@
import com.uber.hoodie.index.bloom.HoodieBloomIndex;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -389,10 +391,10 @@ public void testCOWToMORConvertedDatasetRollback() throws Exception {
@Test
public void testRollbackWithDeltaAndCompactionCommit() throws Exception {

HoodieWriteConfig cfg = getConfig(true);
HoodieWriteConfig cfg = getConfig(false);
HoodieWriteClient client = new HoodieWriteClient(jsc, cfg);

// Test delta commit rollback (with all log files)
// Test delta commit rollback
/**
* Write 1 (only inserts)
*/
@@ -403,7 +405,9 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
List<HoodieRecord> records = dataGen.generateInserts(newCommitTime, 200);
JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 1);

List<WriteStatus> statuses = client.upsert(writeRecords, newCommitTime).collect();
JavaRDD<WriteStatus> writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
List<WriteStatus> statuses = writeStatusJavaRDD.collect();
assertNoWriteErrors(statuses);

HoodieTableMetaClient metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
@@ -428,56 +432,99 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
dataFilesToRead.findAny().isPresent());

/**
* Write 2 (updates)
* Write 2 (inserts + updates - testing failed delta commit)
*/
newCommitTime = "002";
client.startCommitWithTime(newCommitTime);
final String commitTime1 = "002";
// WriteClient with custom config (disable small file handling)
client = new HoodieWriteClient(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
.withSchema(TRIP_EXAMPLE_SCHEMA)
.withParallelism(2, 2)
.withAutoCommit(false).withAssumeDatePartitioning(true).withCompactionConfig(HoodieCompactionConfig.newBuilder()
.compactionSmallFileSize(1 * 1024).withInlineCompaction(false)
.withMaxNumDeltaCommitsBeforeCompaction(1).build())
.withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1 * 1024).build())
.forTable("test-trip-table").build());
client.startCommitWithTime(commitTime1);

List<HoodieRecord> copyOfRecords = new ArrayList<>(records);
copyOfRecords = dataGen.generateUpdates(commitTime1, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime1, 200));

records = dataGen.generateUpdates(newCommitTime, records);
List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);

statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
// Verify there are no errors
assertNoWriteErrors(statuses);
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
deltaCommit = metaClient.getActiveTimeline().getDeltaCommitTimeline().lastInstant();
assertTrue(deltaCommit.isPresent());
assertEquals("Latest Delta commit should be 002", "002", deltaCommit.get().getTimestamp());

commit = metaClient.getActiveTimeline().getCommitTimeline().firstInstant();
assertFalse(commit.isPresent());
// Test failed delta commit rollback
client.rollback(commitTime1);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
// After rollback, there should be no parquet file with the failed commit time
Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
.contains(commitTime1)).collect(Collectors.toList()).size(), 0);
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);

List<String> dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
List<GenericRecord> recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);

/**
* Write 3 (inserts + updates - testing successful delta commit)
*/
final String commitTime2 = "002";
client = new HoodieWriteClient(jsc, cfg);
client.startCommitWithTime(commitTime2);

copyOfRecords = new ArrayList<>(records);
copyOfRecords = dataGen.generateUpdates(commitTime2, copyOfRecords);
copyOfRecords.addAll(dataGen.generateInserts(commitTime2, 200));

dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
assertEquals(recordsRead.size(), 200);

// Test delta commit rollback
client.rollback(newCommitTime);
writeStatusJavaRDD = client.upsert(writeRecords, commitTime2);
client.commit(commitTime2, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

// Test successful delta commit rollback
client.rollback(commitTime2);
allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
// After rollback, there should be no parquet file with the rolled back commit time
Assert.assertEquals(Arrays.asList(allFiles).stream().filter(file -> file.getPath().getName()
.contains(commitTime2)).collect(Collectors.toList()).size(), 0);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);

// check that the number of records read is still correct after rollback operation
assertEquals(recordsRead.size(), 200);

//Test compaction commit rollback
// Test compaction commit rollback
/**
* Write 2 (updates)
* Write 4 (updates)
*/
newCommitTime = "003";
client.startCommitWithTime(newCommitTime);

records = dataGen.generateUpdates(newCommitTime, 400);
records = dataGen.generateUpdates(newCommitTime, records);

statuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
writeStatusJavaRDD = client.upsert(writeRecords, newCommitTime);
client.commit(newCommitTime, writeStatusJavaRDD);
statuses = writeStatusJavaRDD.collect();
// Verify there are no errors
assertNoWriteErrors(statuses);

metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());

String compactionCommit = client.startCompaction();
client.compact(compactionCommit);
JavaRDD<WriteStatus> writeStatus = client.compact(compactionCommit);
client.commitCompaction(compactionCommit, writeStatus);

allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
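The rewritten test now covers three rollback scenarios in sequence: a failed delta commit (auto-commit disabled, commit left inflight), a successful delta commit, and a compaction commit. The core of the new coverage is the failed-delta-commit case; a condensed sketch of that pattern, reusing the fixtures from the test above (client built with withAutoCommit(false), plus jsc, copyOfRecords, metaClient and cfg):

final String commitTime1 = "002";
client.startCommitWithTime(commitTime1);

// Upsert without calling client.commit(): with auto-commit disabled, the
// delta commit "002" stays inflight, simulating a failed delta commit.
List<WriteStatus> statuses = client.upsert(jsc.parallelize(copyOfRecords, 1), commitTime1).collect();
assertNoWriteErrors(statuses);

// Roll back the inflight commit; afterwards no data file in the dataset
// should carry the rolled back commit's timestamp in its name.
client.rollback(commitTime1);
FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
Assert.assertEquals(0, Arrays.stream(allFiles)
    .filter(file -> file.getPath().getName().contains(commitTime1))
    .count());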
@@ -213,6 +213,16 @@ public static String getFileIdFromLogPath(Path path) {
return matcher.group(1);
}

/**
* Check if the file is a parquet file or a log file, then extract the fileId appropriately.
*/
public static String getFileIdFromFilePath(Path filePath) {
if (FSUtils.isLogFile(filePath)) {
return FSUtils.getFileIdFromLogPath(filePath);
}
return FSUtils.getFileId(filePath.getName());
}

/**
* Get the first part of the file name in the log file. That will be the fileId. Log files do not
* have the commitTime in the file name.
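A likely call-site shape for the new helper, mirroring how the rollback change above builds its deletedFiles set (deletedStatuses and its type are assumptions for illustration, not part of this diff):

// Rollback deletes both parquet base files and log files, so a single entry
// point is needed to recover the fileId from either kind of path.
Set<String> deletedFileIds = deletedStatuses.stream() // assumed Collection<FileStatus>
    .map(status -> FSUtils.getFileIdFromFilePath(status.getPath()))
    .collect(Collectors.toSet());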