diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 3693b55bf929a..c879c06ccb560 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -584,11 +584,11 @@ private boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
 
   public boolean savepoint(String user, String comment) {
     HoodieTable table = HoodieTable.getHoodieTable(
        new HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true), config, jsc);
-    if (table.getCompletedCommitTimeline().empty()) {
+    if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
-    String latestCommit = table.getCompletedCommitTimeline().lastInstant().get().getTimestamp();
+    String latestCommit = table.getCompletedCommitsTimeline().lastInstant().get().getTimestamp();
     logger.info("Savepointing latest commit " + latestCommit);
     return savepoint(latestCommit, user, comment);
   }
@@ -615,7 +615,7 @@ public boolean savepoint(String commitTime, String user, String comment) {
     HoodieInstant commitInstant = new HoodieInstant(false, HoodieTimeline.COMMIT_ACTION,
        commitTime);
 
-    if (!table.getCompletedCommitTimeline().containsInstant(commitInstant)) {
+    if (!table.getCompletedCommitsTimeline().containsInstant(commitInstant)) {
       throw new HoodieSavepointException(
          "Could not savepoint non-existing commit " + commitInstant);
     }
@@ -628,7 +628,7 @@ public boolean savepoint(String commitTime, String user, String comment) {
          table.getActiveTimeline().getInstantDetails(cleanInstant.get()).get());
       lastCommitRetained = cleanMetadata.getEarliestCommitToRetain();
     } else {
-      lastCommitRetained = table.getCompletedCommitTimeline().firstInstant().get().getTimestamp();
+      lastCommitRetained = table.getCompletedCommitsTimeline().firstInstant().get().getTimestamp();
     }
 
     // Cannot allow savepoint time on a commit that could have been cleaned
@@ -792,7 +792,7 @@ private void rollback(List<String> commits) {
        table.getActiveTimeline().filterPendingCompactionTimeline().getInstants()
            .map(HoodieInstant::getTimestamp).collect(Collectors.toSet());
     HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
-    HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
+    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
     // Check if any of the commits is a savepoint - do not allow rollback on those commits
     List<String> savepoints = table.getCompletedSavepointTimeline().getInstants()
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
index f8647e5cbd68a..eb836d0d41425 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCommitArchiveLog.java
@@ -150,7 +150,7 @@ private Stream<HoodieInstant> getInstantsToArchive(JavaSparkContext jsc) {
 
     //TODO (na) : Add a way to return actions associated with a timeline and then merge/unify
     // with logic above to avoid Stream.concats
-    HoodieTimeline commitTimeline = table.getCompletedCommitTimeline();
+    HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
 
     Optional<HoodieInstant> oldestPendingCompactionInstant =
        table.getActiveTimeline().filterPendingCompactionTimeline().firstInstant();
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
index 07bc368e44332..67aa6b2ef853c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
@@ -53,7 +53,7 @@ public HoodieIOHandle(HoodieWriteConfig config, String commitTime, HoodieTable
   protected List<SmallFile> getSmallFiles(String partitionPath) {
     // smallFiles only for partitionPath
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
-    HoodieTimeline commitTimeline = getCompletedCommitTimeline();
+    HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
     if (!commitTimeline.empty()) { // if we have some commits
       HoodieInstant latestCommitTime = commitTimeline.lastInstant().get();
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
index 849dbae45caf1..de1e563e9331e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieMergeOnReadTable.java
@@ -327,7 +327,7 @@ protected List<SmallFile> getSmallFiles(String partitionPath) {
     List<SmallFile> smallFileLocations = new ArrayList<>();
 
     // Init here since this class (and member variables) might not have been initialized
-    HoodieTimeline commitTimeline = getCompletedCommitTimeline();
+    HoodieTimeline commitTimeline = getCompletedCommitsTimeline();
 
     // Find out all eligible small file slices
     if (!commitTimeline.empty()) {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 10ef94ed25f3d..2f51cbc8c77e1 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -111,14 +111,14 @@ public Configuration getHadoopConf() {
    * Get the view of the file system for this table
    */
   public TableFileSystemView getFileSystemView() {
-    return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
+    return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
   }
 
   /**
    * Get the read optimized view of the file system for this table
    */
   public TableFileSystemView.ReadOptimizedView getROFileSystemView() {
-    return new HoodieTableFileSystemView(metaClient, getCompletedCommitTimeline());
+    return new HoodieTableFileSystemView(metaClient, getCompletedCommitsTimeline());
   }
 
   /**
@@ -136,11 +136,18 @@ public TableFileSystemView getCompletedFileSystemView() {
     return new HoodieTableFileSystemView(metaClient, metaClient.getCommitsTimeline());
   }
 
+  /**
+   * Get only the completed (no-inflights) commit + deltacommit timeline
+   */
+  public HoodieTimeline getCompletedCommitsTimeline() {
+    return metaClient.getCommitsTimeline().filterCompletedInstants();
+  }
+
   /**
    * Get only the completed (no-inflights) commit timeline
    */
   public HoodieTimeline getCompletedCommitTimeline() {
-    return metaClient.getCommitsTimeline().filterCompletedInstants();
+    return metaClient.getCommitTimeline().filterCompletedInstants();
   }
 
   /**
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
index c13c0c43e6364..dca6d9d69660a 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestAsyncCompaction.java
@@ -493,7 +493,7 @@ private List createNextDeltaCommit(String instantTime, List
 
   private List<HoodieDataFile> getCurrentLatestDataFiles(HoodieTable table, HoodieWriteConfig cfg) throws IOException {
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(table.getMetaClient().getFs(), cfg.getBasePath());
     HoodieTableFileSystemView
-        view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitTimeline(), allFiles);
+        view = new HoodieTableFileSystemView(table.getMetaClient(), table.getCompletedCommitsTimeline(), allFiles);
     List<HoodieDataFile> dataFilesToRead = view.getLatestDataFiles().collect(Collectors.toList());
     return dataFilesToRead;
   }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 03f895b27e09a..f3a9616dd5d91 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -126,8 +126,8 @@ private String insertFirstBigBatchForClientCleanerTest(
 
     // Should have 100 records in table (check using Index), all in locations marked at commit
     HoodieTable table = HoodieTable.getHoodieTable(metaClient, getConfig(), jsc);
-    assertFalse(table.getCompletedCommitTimeline().empty());
-    String commitTime = table.getCompletedCommitTimeline().getInstants().findFirst().get().getTimestamp();
+    assertFalse(table.getCompletedCommitsTimeline().empty());
+    String commitTime = table.getCompletedCommitsTimeline().getInstants().findFirst().get().getTimestamp();
     assertFalse(table.getCompletedCleanTimeline().empty());
     assertEquals("The clean instant should be the same as the commit instant", commitTime,
        table.getCompletedCleanTimeline().getInstants().findFirst().get().getTimestamp());
@@ -380,7 +380,7 @@ private void testInsertAndCleanByCommits(
 
     HoodieTableMetaClient metadata = new HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
     HoodieTable table1 = HoodieTable.getHoodieTable(metadata, cfg, jsc);
-    HoodieTimeline activeTimeline = table1.getCompletedCommitTimeline();
+    HoodieTimeline activeTimeline = table1.getCompletedCommitsTimeline();
     Optional<HoodieInstant> earliestRetainedCommit = activeTimeline.nthFromLastInstant(maxCommits - 1);
     Set<HoodieInstant> acceptableCommits = activeTimeline.getInstants().collect(Collectors.toSet());
     if (earliestRetainedCommit.isPresent()) {
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index ed68e5857d741..6f177db4978ed 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -176,7 +176,7 @@ public void testSimpleInsertAndUpdate() throws Exception {
     Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
     assertTrue(!dataFilesToRead.findAny().isPresent());
 
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestDataFiles();
     assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
        dataFilesToRead.findAny().isPresent());
@@ -210,7 +210,7 @@ public void testSimpleInsertAndUpdate() throws Exception {
 
     client.compact(compactionCommitTime);
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestDataFiles();
     assertTrue(dataFilesToRead.findAny().isPresent());
@@ -283,7 +283,7 @@ public void testSimpleInsertUpdateAndDelete() throws Exception {
     Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
     assertTrue(!dataFilesToRead.findAny().isPresent());
 
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestDataFiles();
     assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
        dataFilesToRead.findAny().isPresent());
@@ -320,7 +320,7 @@ public void testSimpleInsertUpdateAndDelete() throws Exception {
 
     assertFalse(commit.isPresent());
 
     allFiles = HoodieTestUtils.listAllDataFilesInPath(dfs, cfg.getBasePath());
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestDataFiles();
     assertTrue(dataFilesToRead.findAny().isPresent());
@@ -380,7 +380,7 @@ public void testCOWToMORConvertedDatasetRollback() throws Exception {
     HoodieTable hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
     FileStatus[] allFiles = HoodieTestUtils.listAllDataFilesInPath(metaClient.getFs(), cfg.getBasePath());
     HoodieTableFileSystemView roView = new HoodieTableFileSystemView(metaClient,
-        hoodieTable.getCompletedCommitTimeline(), allFiles);
+        hoodieTable.getCompletedCommitsTimeline(), allFiles);
 
     final String absentCommit = newCommitTime;
     assertFalse(roView.getLatestDataFiles().filter(file -> {
@@ -430,7 +430,7 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
     Stream<HoodieDataFile> dataFilesToRead = roView.getLatestDataFiles();
     assertTrue(!dataFilesToRead.findAny().isPresent());
 
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestDataFiles();
     assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
        dataFilesToRead.findAny().isPresent());
@@ -504,7 +504,7 @@ public void testRollbackWithDeltaAndCompactionCommit() throws Exception {
 
     metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), cfg.getBasePath());
     hoodieTable = HoodieTable.getHoodieTable(metaClient, cfg, jsc);
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFiles = roView.getLatestDataFiles().map(hf -> hf.getPath()).collect(Collectors.toList());
     recordsRead = HoodieMergeOnReadTestUtils.getRecordsUsingInputFormat(dataFiles, basePath);
     // check that the number of records read is still correct after rollback operation
@@ -599,7 +599,7 @@ public void testUpsertPartitioner() throws Exception {
     Map<String, Long> parquetFileIdToSize = dataFilesToRead.collect(
        Collectors.toMap(HoodieDataFile::getFileId, HoodieDataFile::getFileSize));
 
-    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitTimeline(), allFiles);
+    roView = new HoodieTableFileSystemView(metaClient, hoodieTable.getCompletedCommitsTimeline(), allFiles);
     dataFilesToRead = roView.getLatestDataFiles();
     List<HoodieDataFile> dataFilesList = dataFilesToRead.collect(Collectors.toList());
     assertTrue("RealtimeTableView should list the parquet files we wrote in the delta commit",
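
Note on the semantics of this change: every call site above moves from getCompletedCommitTimeline() to the new getCompletedCommitsTimeline(), which is backed by metaClient.getCommitsTimeline() (commit + deltacommit actions), while the old name is re-pointed at metaClient.getCommitTimeline() (commit actions only). On a merge-on-read table this is the difference between seeing every completed write, including delta commits, and seeing only the commits produced by compaction. The sketch below illustrates that distinction; it is a minimal stand-alone model, not Hudi code — the ToyInstant class and the action strings are hypothetical stand-ins for HoodieInstant and the COMMIT_ACTION / DELTA_COMMIT_ACTION constants.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class TimelineSplitSketch {

  // Hypothetical stand-in for HoodieInstant: an action type plus completion state.
  static final class ToyInstant {
    final String action;    // "commit" or "deltacommit"
    final boolean inflight; // true while the write is still in progress
    ToyInstant(String action, boolean inflight) {
      this.action = action;
      this.inflight = inflight;
    }
  }

  // Models getCompletedCommitsTimeline(): completed commit + deltacommit instants.
  static List<ToyInstant> completedCommitsTimeline(List<ToyInstant> timeline) {
    return timeline.stream()
        .filter(i -> !i.inflight)
        .filter(i -> i.action.equals("commit") || i.action.equals("deltacommit"))
        .collect(Collectors.toList());
  }

  // Models the repurposed getCompletedCommitTimeline(): completed "commit" instants only
  // (on a merge-on-read table these are the compaction commits).
  static List<ToyInstant> completedCommitTimeline(List<ToyInstant> timeline) {
    return timeline.stream()
        .filter(i -> !i.inflight)
        .filter(i -> i.action.equals("commit"))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<ToyInstant> timeline = Arrays.asList(
        new ToyInstant("deltacommit", false),
        new ToyInstant("deltacommit", false),
        new ToyInstant("commit", false),       // e.g. a completed compaction
        new ToyInstant("deltacommit", true));  // inflight, excluded by both views

    System.out.println(completedCommitsTimeline(timeline).size()); // 3
    System.out.println(completedCommitTimeline(timeline).size());  // 1
  }
}

This is why the small-file lookup, archiving, savepoint, and rollback paths all switch to the broader timeline: on merge-on-read tables they must account for delta commits as well, not just compaction commits.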