diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java index 8814a86602c..769a29ee48f 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/engine/FileSystemClient.java @@ -76,4 +76,13 @@ CloseableIterator readFiles(CloseableIterator { "needs to be a positive integer.", true); + /** + * The shortest duration we have to keep delta/checkpoint files around before deleting them. We + * can only delete delta files that are before a checkpoint. + */ + public static final TableConfig LOG_RETENTION = + new TableConfig<>( + "delta.logRetentionDuration", + "interval 30 days", + (engineOpt, v) -> IntervalParserUtils.safeParseIntervalAsMillis(v), + value -> true, + "needs to be provided as a calendar interval such as '2 weeks'. Months " + + "and years are not accepted. You may specify '365 days' for a year instead.", + true /* editable */); + + /** Whether to clean up expired checkpoints and delta logs. */ + public static final TableConfig EXPIRED_LOG_CLEANUP_ENABLED = + new TableConfig<>( + "delta.enableExpiredLogCleanup", + "true", + (engineOpt, v) -> Boolean.valueOf(v), + value -> true, + "needs to be a boolean.", + true /* editable */); + /** * This table property is used to track the enablement of the {@code inCommitTimestamps}. * diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java index 1f32099daa7..d71ef4c30be 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/TableImpl.java @@ -108,7 +108,7 @@ public Snapshot getSnapshotAsOfTimestamp(Engine engine, long millisSinceEpochUTC @Override public void checkpoint(Engine engine, long version) throws TableNotFoundException, CheckpointAlreadyExistsException, IOException { - snapshotManager.checkpoint(engine, version); + snapshotManager.checkpoint(engine, clock, version); } @Override diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java index 78c6970db29..06ba6cab887 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/lang/ListUtils.java @@ -18,6 +18,7 @@ import io.delta.kernel.internal.util.Tuple2; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -34,4 +35,22 @@ public static Tuple2, List> partition( public static T last(List list) { return list.get(list.size() - 1); } + + /** Remove once supported JDK (build) version is 21 or above */ + public static T getFirst(List list) { + if (list.isEmpty()) { + throw new NoSuchElementException(); + } else { + return list.get(0); + } + } + + /** Remove once supported JDK (build) version is 21 or above */ + public static T getLast(List list) { + if (list.isEmpty()) { + throw new NoSuchElementException(); + } else { + return list.get(list.size() - 1); + } + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java new file mode 100644 index 00000000000..0b405426751 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/MetadataCleanup.java @@ -0,0 +1,200 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.snapshot; + +import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; +import static io.delta.kernel.internal.checkpoints.Checkpointer.getLatestCompleteCheckpointFromList; +import static io.delta.kernel.internal.lang.ListUtils.getFirst; +import static io.delta.kernel.internal.lang.ListUtils.getLast; +import static io.delta.kernel.internal.util.Preconditions.checkArgument; +import static java.util.stream.Collectors.toList; + +import io.delta.kernel.engine.Engine; +import io.delta.kernel.internal.checkpoints.CheckpointInstance; +import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.util.Clock; +import io.delta.kernel.internal.util.FileNames; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MetadataCleanup { + private static final Logger logger = LoggerFactory.getLogger(MetadataCleanup.class); + + private MetadataCleanup() {} + + /** + * Delete the Delta log files (delta and checkpoint files) that are expired according to the table + * metadata retention settings. While deleting the log files, it makes sure the time travel + * continues to work for all unexpired table versions. + * + *

Here is algorithm: + * + *

    + *
  • Initial the potential delete file list: `potentialFilesToDelete` as an empty list + *
  • Initialize the last seen checkpoint file list: `lastSeenCheckpointFiles`. There could be + * one or more checkpoint files for a given version. + *
  • List the delta log files starting with prefix "00000000000000000000." (%020d). For each + * file: + *
      + *
    • Step 1: Check if the `lastSeenCheckpointFiles` contains a complete checkpoint, then + *
        + *
      • Step 1.1: delete all files in `potentialFilesToDelete`. Now we know there is + * a checkpoint that contains the compacted Delta log up to the checkpoint + * version and all commit/checkpoint files before this checkpoint version are + * not needed. + *
      • Step 1.2: add `lastCheckpointFiles` to `potentialFileStoDelete` list. This + * checkpoint is potential candidate to delete later if we find another + * checkpoint + *
      + *
    • Step 2: If the timestamp falls within the retention period, stop + *
    • Step 3: If the file is a delta log file, add it to the `potentialFilesToDelete` + * list + *
    • Step 4: If the file is a checkpoint file, add it to the `lastSeenCheckpointFiles` + *
    + *
+ * + * @param engine {@link Engine} instance to delete the expired log files + * @param clock {@link Clock} instance to get the current time. Useful in testing to mock the + * current time. + * @param tablePath Table location + * @param retentionMillis Log file retention period in milliseconds + * @return number of log files deleted + * @throws IOException if an error occurs while deleting the log files + */ + public static long cleanupExpiredLogs( + Engine engine, Clock clock, Path tablePath, long retentionMillis) throws IOException { + checkArgument(retentionMillis >= 0, "Retention period must be non-negative"); + + List potentialLogFilesToDelete = new ArrayList<>(); + long lastSeenCheckpointVersion = -1; // -1 indicates no checkpoint seen yet + List lastSeenCheckpointFiles = new ArrayList<>(); + + long fileCutOffTime = clock.getTimeMillis() - retentionMillis; + logger.info("{}: Starting the deletion of log files older than {}", tablePath, fileCutOffTime); + long numDeleted = 0; + try (CloseableIterator files = listDeltaLogs(engine, tablePath)) { + while (files.hasNext()) { + // Step 1: Check if the `lastSeenCheckpointFiles` contains a complete checkpoint + Optional lastCompleteCheckpoint = + getLatestCompleteCheckpointFromList( + lastSeenCheckpointFiles.stream().map(CheckpointInstance::new).collect(toList()), + CheckpointInstance.MAX_VALUE); + + if (lastCompleteCheckpoint.isPresent()) { + // Step 1.1: delete all files in `potentialFilesToDelete`. Now we know there is a + // checkpoint that contains the compacted Delta log up to the checkpoint version and all + // commit/checkpoint files before this checkpoint version are not needed. add + // `lastCheckpointFiles` to `potentialFileStoDelete` list. This checkpoint is potential + // candidate to delete later if we find another checkpoint + if (!potentialLogFilesToDelete.isEmpty()) { + logger.info( + "{}: Deleting log files (start = {}, end = {}) because a checkpoint at " + + "version {} indicates that these log files are no longer needed.", + tablePath, + getFirst(potentialLogFilesToDelete), + getLast(potentialLogFilesToDelete), + lastSeenCheckpointVersion); + + numDeleted += deleteLogFiles(engine, potentialLogFilesToDelete); + potentialLogFilesToDelete.clear(); + } + + // Step 1.2: add `lastCheckpointFiles` to `potentialFileStoDelete` list. This checkpoint + // is potential candidate to delete later if we find another checkpoint + potentialLogFilesToDelete.addAll(lastSeenCheckpointFiles); + lastSeenCheckpointFiles.clear(); + lastSeenCheckpointVersion = -1; + } + + FileStatus nextFile = files.next(); + + // Step 2: If the timestamp is earlier than the retention period, stop + if (nextFile.getModificationTime() > fileCutOffTime) { + if (!potentialLogFilesToDelete.isEmpty()) { + logger.info( + "{}: Skipping deletion of expired log files {}, because there is no checkpoint " + + "file that indicates that the log files are no longer needed. ", + tablePath, + potentialLogFilesToDelete.size()); + } + break; + } + + if (FileNames.isCommitFile(nextFile.getPath())) { + // Step 3: If the file is a delta log file, add it to the `potentialFilesToDelete` list + // We can't delete these files until we encounter a checkpoint later that indicates + // that the log files are no longer needed. + potentialLogFilesToDelete.add(nextFile.getPath()); + } else if (FileNames.isCheckpointFile(nextFile.getPath())) { + // Step 4: If the file is a checkpoint file, add it to the `lastSeenCheckpointFiles` + long newLastSeenCheckpointVersion = FileNames.checkpointVersion(nextFile.getPath()); + checkArgument( + lastSeenCheckpointVersion == -1 + || newLastSeenCheckpointVersion >= lastSeenCheckpointVersion); + + if (lastSeenCheckpointVersion != -1 + && newLastSeenCheckpointVersion > lastSeenCheckpointVersion) { + // We have found checkpoint file for a new version. This means the files gathered for + // the last checkpoint version are not complete (most likely an incomplete multipart + // checkpoint). We should delete the files gathered so far and start fresh + // last seen checkpoint state + logger.info( + "{}: Incomplete checkpoint files found at version {}, ignoring the checkpoint" + + " files and adding them to potential log file delete list", + tablePath, + lastSeenCheckpointVersion); + potentialLogFilesToDelete.addAll(lastSeenCheckpointFiles); + lastSeenCheckpointFiles.clear(); + } + + lastSeenCheckpointFiles.add(nextFile.getPath()); + lastSeenCheckpointVersion = newLastSeenCheckpointVersion; + } + // Ignore non-delta and non-checkpoint files. + } + } + logger.info("{}: Deleted {} log files older than {}", tablePath, numDeleted, fileCutOffTime); + return numDeleted; + } + + private static CloseableIterator listDeltaLogs(Engine engine, Path tablePath) + throws IOException { + Path logPath = new Path(tablePath, "_delta_log"); + // TODO: Currently we don't update the timestamps of files to be monotonically increasing. + // In future we can do something similar to Delta Spark to make the timestamps monotonically + // increasing. See `BufferingLogDeletionIterator` in Delta Spark. + return engine.getFileSystemClient().listFrom(FileNames.listingPrefix(logPath, 0)); + } + + private static int deleteLogFiles(Engine engine, List logFiles) throws IOException { + int numDeleted = 0; + for (String logFile : logFiles) { + if (wrapEngineExceptionThrowsIO( + () -> engine.getFileSystemClient().delete(logFile), + "Failed to delete the log file as part of the metadata cleanup %s", + logFile)) { + numDeleted++; + } + } + return numDeleted; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 82f00bd5e54..8c760dbaf0c 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -17,10 +17,13 @@ package io.delta.kernel.internal.snapshot; import static io.delta.kernel.internal.DeltaErrors.wrapEngineExceptionThrowsIO; +import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED; +import static io.delta.kernel.internal.TableConfig.LOG_RETENTION; import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable; import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore; import static io.delta.kernel.internal.fs.Path.getName; import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable; +import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs; import static io.delta.kernel.internal.util.Preconditions.checkArgument; import static java.lang.String.format; @@ -31,11 +34,13 @@ import io.delta.kernel.exceptions.InvalidTableException; import io.delta.kernel.exceptions.TableNotFoundException; import io.delta.kernel.internal.*; +import io.delta.kernel.internal.actions.Metadata; import io.delta.kernel.internal.checkpoints.*; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.ListUtils; import io.delta.kernel.internal.replay.CreateCheckpointIterator; import io.delta.kernel.internal.replay.LogReplay; +import io.delta.kernel.internal.util.Clock; import io.delta.kernel.internal.util.FileNames; import io.delta.kernel.internal.util.Tuple2; import io.delta.kernel.utils.CloseableIterator; @@ -186,7 +191,8 @@ public Snapshot getSnapshotForTimestamp(Engine engine, long millisSinceEpochUTC) return getSnapshotAt(engine, versionToRead); } - public void checkpoint(Engine engine, long version) throws TableNotFoundException, IOException { + public void checkpoint(Engine engine, Clock clock, long version) + throws TableNotFoundException, IOException { logger.info("{}: Starting checkpoint for version: {}", tablePath, version); // Get the snapshot corresponding the version SnapshotImpl snapshot = (SnapshotImpl) getSnapshotAt(engine, version); @@ -231,6 +237,15 @@ public void checkpoint(Engine engine, long version) throws TableNotFoundExceptio logger.info("{}: Last checkpoint metadata file is written for version: {}", tablePath, version); logger.info("{}: Finished checkpoint for version: {}", tablePath, version); + + // Clean up delta log files if enabled. + Metadata metadata = snapshot.getMetadata(); + if (EXPIRED_LOG_CLEANUP_ENABLED.fromMetadata(engine, metadata)) { + cleanupExpiredLogs(engine, clock, tablePath, LOG_RETENTION.fromMetadata(engine, metadata)); + } else { + logger.info( + "{}: Log cleanup is disabled. Skipping the deletion of expired log files", tablePath); + } } //////////////////// diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/MetadataCleanupSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/MetadataCleanupSuite.scala new file mode 100644 index 00000000000..04afb67bd68 --- /dev/null +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/snapshot/MetadataCleanupSuite.scala @@ -0,0 +1,308 @@ +/* + * Copyright (2023) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.snapshot + +import io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs +import io.delta.kernel.internal.util.ManualClock +import io.delta.kernel.test.{MockFileSystemClientUtils, MockListFromDeleteFileSystemClient} +import io.delta.kernel.utils.FileStatus +import org.scalatest.funsuite.AnyFunSuite + +/** + * Test suite for the metadata cleanup logic in the Delta log directory. It mocks the + * `FileSystemClient` to test the cleanup logic for various combinations of delta files and + * checkpoint files. Utility methods in `MockFileSystemClientUtils` are used to generate the + * log file statuses which usually have modification time as the `version * 10`. + */ +class MetadataCleanupSuite extends AnyFunSuite with MockFileSystemClientUtils { + + import MetadataCleanupSuite._ + + /* ------------------- TESTS ------------------ */ + + // Simple case where the Delta log directory contains only delta files and no checkpoint files + Seq( + ( + "no files should be deleted even some of them are expired", + DeletedFileList(), // expected deleted files - none of them should be deleted + 70, // current time + 30 // retention period + ), + ( + "no files should be deleted as none of them are expired", + DeletedFileList(), // expected deleted files - none of them should be deleted + 200, // current time + 200 // retention period + ), + ( + "no files should be deleted as none of them are expired", + DeletedFileList(), // expected deleted files - none of them should be deleted + 200, // current time + 0 // retention period + ) + ).foreach { + case (testName, expectedDeletedFiles, currentTime, retentionPeriod) => + // _deltalog directory contents - contains only delta files + val logFiles = deltaFileStatuses(Seq(0, 1, 2, 3, 4, 5, 6)) + test(s"metadataCleanup: $testName: $currentTime, $retentionPeriod") { + cleanupAndVerify(logFiles, expectedDeletedFiles.fileList(), currentTime, retentionPeriod) + } + } + + // with various checkpoint types + Seq("classic", "multi-part", "v2", "hybrid").foreach { checkpointType => + // _deltalog directory contains a combination of delta files and checkpoint files + + val logFiles = deltaFileStatuses(Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) ++ + (checkpointType match { + case "classic" => + singularCheckpointFileStatuses(Seq(3, 6, 9, 12)) + case "multi-part" => + multiCheckpointFileStatuses(Seq(3, 6, 9, 12), multiPartCheckpointPartsSize) + case "v2" => + v2CPFileStatuses(Seq[Long](3, 6, 9, 12)) + case "hybrid" => + singularCheckpointFileStatuses(Seq(3)) ++ + multiCheckpointFileStatuses(Seq(6), numParts = multiPartCheckpointPartsSize) ++ + v2CPFileStatuses(Seq[Long](9)) ++ + singularCheckpointFileStatuses(Seq(12)) + }) + + // test cases + Seq( + ( + "delete expired delta files up to the checkpoint version, " + + "not all expired delta files are deleted", + Seq(0L, 1L, 2L), // expDeletedDeltaVersions, + Seq(), // expDeletedCheckpointVersions, + 130, // current time + 80 // retention period + ), + ( + "expired delta files + expired checkpoint should be deleted", + Seq(0L, 1L, 2L, 3L, 4L, 5L), // expDeletedDeltaVersions, + Seq(3L), // expDeletedCheckpointVersions, + 130, // current time + 60 // retention period + ), + ( + "expired delta files + expired checkpoints should be deleted", + Seq(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L), // expDeletedDeltaVersions, + Seq(3L, 6L), // expDeletedCheckpointVersions, + 130, // current time + 40 // retention period + ), + ( + "all delta/checkpoint files should be except the last checkpoint file", + Seq(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L, 11L), // expDeletedDeltaVersions, + Seq(3L, 6L, 9L), // expDeletedCheckpointVersions, + 130, // current time + 0 // retention period + ), + ( + "no delta/checkpoint files should be deleted as none expired", + Seq(), // expDeletedDeltaVersions + Seq(), // expDeletedDeltaVersions + 200, // current time + 200 // retention period + ) + ).foreach { + case (testName, expDeletedDeltaVersions, expDeletedCheckpointVersions, + currentTime, retentionPeriod) => + + val expectedDeletedFiles = DeletedFileList( + deltaVersions = expDeletedDeltaVersions, + classicCheckpointVersions = checkpointType match { + case "classic" => expDeletedCheckpointVersions + case "hybrid" => expDeletedCheckpointVersions.filter(Seq(3, 12).contains(_)) + case _ => Seq.empty + }, + multipartCheckpointVersions = checkpointType match { + case "multi-part" => expDeletedCheckpointVersions + case "hybrid" => expDeletedCheckpointVersions.filter(_ == 6) + case _ => Seq.empty + }, + v2CheckpointVersions = checkpointType match { + case "v2" => expDeletedCheckpointVersions + case "hybrid" => expDeletedCheckpointVersions.filter(_ == 9) + case _ => Seq.empty + } + ) + + test(s"metadataCleanup: $checkpointType: $testName: $currentTime, $retentionPeriod") { + cleanupAndVerify(logFiles, expectedDeletedFiles.fileList(), currentTime, retentionPeriod) + } + } + } + + test("first log entry is a checkpoint") { + val logFiles = multiCheckpointFileStatuses(Seq(25), multiPartCheckpointPartsSize) ++ + singularCheckpointFileStatuses(Seq(29)) ++ + deltaFileStatuses(Seq(25, 26, 27, 28, 29, 30, 31, 32)) + + Seq( + ( + 330, // current time + 50, // retention period + DeletedFileList() // expected deleted files - none of them should be deleted + ), + ( + 330, // current time + 30, // retention period + DeletedFileList( + deltaVersions = Seq(25, 26, 27, 28), + multipartCheckpointVersions = Seq(25) + ) + ), + ( + 330, // current time + 10, // retention period + DeletedFileList( + deltaVersions = Seq(25, 26, 27, 28), + multipartCheckpointVersions = Seq(25) + ) + ) + ).foreach { + case (currentTime, retentionPeriod, expectedDeletedFiles) => + cleanupAndVerify(logFiles, expectedDeletedFiles.fileList(), currentTime, retentionPeriod) + } + } + + /* ------------------- NEGATIVE TESTS ------------------ */ + test("metadataCleanup: invalid retention period") { + val e = intercept[IllegalArgumentException] { + cleanupExpiredLogs( + mockEngine(mockFsClient(Seq.empty)), + new ManualClock(100), + logPath, + -1 /* retentionPeriod */ + ) + } + + assert(e.getMessage.contains("Retention period must be non-negative")) + } + + test("incomplete checkpoints should not be considered") { + val logFiles = deltaFileStatuses(Seq(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)) ++ + multiCheckpointFileStatuses(Seq(3), multiPartCheckpointPartsSize) + // delete the third part of the checkpoint + .filterNot(_.getPath.contains(s"%010d.%010d".format(2, 4))) ++ + multiCheckpointFileStatuses(Seq(6), multiPartCheckpointPartsSize) ++ + v2CPFileStatuses(Seq(9)) + + // test cases + Seq( + ( + Seq[Long](), // expDeletedDeltaVersions, + Seq[Long](), // expDeletedCheckpointVersions, + 130, // current time + 80 // retention period + ), + ( + Seq(0L, 1L, 2L, 3L, 4L, 5L), // expDeletedDeltaVersions, + Seq(3L), // expDeletedCheckpointVersions, + 130, // current time + 60 // retention period + ), + ( + Seq(0L, 1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L), // expDeletedDeltaVersions, + Seq(3L, 6L), // expDeletedCheckpointVersions, + 130, // current time + 20 // retention period + ) + ).foreach { + case (expDeletedDeltaVersions, expDeletedCheckpointVersions, + currentTime, retentionPeriod) => + + val expectedDeletedFiles = (deltaFileStatuses(expDeletedDeltaVersions) ++ + expDeletedCheckpointVersions.flatMap { + case v@3 => multiCheckpointFileStatuses(Seq(v), multiPartCheckpointPartsSize) + .filterNot(_.getPath.contains(s"%010d.%010d".format(2, 4))) + case v@6 => multiCheckpointFileStatuses(Seq(v), multiPartCheckpointPartsSize) + case v@9 => v2CPFileStatuses(Seq(v)) + }).map(_.getPath) + + cleanupAndVerify(logFiles, expectedDeletedFiles, currentTime, retentionPeriod) + } + } + + /* ------------------- HELPER UTILITIES/CONSTANTS ------------------ */ + /** + * Cleanup the metadata log files and verify the expected deleted files. + * + * @param logFiles List of log files in the _delta_log directory + * @param expectedDeletedFiles List of expected deleted file paths + * @param currentTimeMillis Current time in millis + * @param retentionPeriodMillis Retention period in millis + */ + def cleanupAndVerify( + logFiles: Seq[FileStatus], + expectedDeletedFiles: Seq[String], + currentTimeMillis: Long, + retentionPeriodMillis: Long): Unit = { + val fsClient = mockFsClient(logFiles) + val resultDeletedCount = cleanupExpiredLogs( + mockEngine(fsClient), + new ManualClock(currentTimeMillis), + logPath, + retentionPeriodMillis + ) + + assert(resultDeletedCount === expectedDeletedFiles.size) + assert(fsClient.getDeleteCalls.toSet === expectedDeletedFiles.toSet) + } +} + +object MetadataCleanupSuite extends MockFileSystemClientUtils { + /* ------------------- HELPER UTILITIES/CONSTANTS ------------------ */ + private val multiPartCheckpointPartsSize = 4 + + /** Case class containing the list of expected files in the deleted metadata log file list */ + case class DeletedFileList( + deltaVersions: Seq[Long] = Seq.empty, + classicCheckpointVersions: Seq[Long] = Seq.empty, + multipartCheckpointVersions: Seq[Long] = Seq.empty, + v2CheckpointVersions: Seq[Long] = Seq.empty) { + + def fileList(): Seq[String] = { + (deltaFileStatuses(deltaVersions) ++ + singularCheckpointFileStatuses(classicCheckpointVersions) ++ + multiCheckpointFileStatuses(multipartCheckpointVersions, multiPartCheckpointPartsSize) ++ + v2CPFileStatuses(v2CheckpointVersions) + ).sortBy(_.getPath).map(_.getPath) + } + } + + def mockFsClient(logFiles: Seq[FileStatus]): MockListFromDeleteFileSystemClient = { + new MockListFromDeleteFileSystemClient(logFiles) + } + + def v2CPFileStatuses(versions: Seq[Long]): Seq[FileStatus] = { + // Replace the UUID with a standard UUID to make the test deterministic + val standardUUID = "123e4567-e89b-12d3-a456-426614174000" + val uuidPattern = + "[0-9a-fA-F]{8}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{4}-[0-9a-fA-F]{12}".r + + v2CheckpointFileStatuses( + versions.map(v => (v, true, 20)), // to (version, useUUID, numSidecars) + "parquet" + ).map(_._1) + .map(f => FileStatus.of( + uuidPattern.replaceAllIn(f.getPath, standardUUID), + f.getSize, + f.getModificationTime)) + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala index 671272bdabb..c7eb9caf7f9 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockEngineUtils.scala @@ -159,4 +159,7 @@ trait BaseMockFileSystemClient extends FileSystemClient { override def mkdirs(path: String): Boolean = throw new UnsupportedOperationException("not supported in this test suite") + + override def delete(path: String): Boolean = + throw new UnsupportedOperationException("not supported in this test suite") } diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala index c3bbda3b117..0e8ed2c56b9 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/test/MockFileSystemClientUtils.scala @@ -161,3 +161,28 @@ class MockListFromResolvePathFileSystemClient(listFromProvider: String => Seq[Fi def getListFromCalls: Seq[String] = listFromCalls } + +/** + * A mock [[FileSystemClient]] that answers `listFrom` call from the given list of file statuses + * and tracks the delete calls. + * @param listContents List of file statuses to be returned by `listFrom` call. + */ +class MockListFromDeleteFileSystemClient(listContents: Seq[FileStatus]) + extends BaseMockFileSystemClient { + private val listOfFiles: Seq[String] = listContents.map(_.getPath).toSeq + private var isListFromAlreadyCalled = false + private var deleteCalls: Seq[String] = Seq.empty + + override def listFrom(filePath: String): CloseableIterator[FileStatus] = { + assert(!isListFromAlreadyCalled, "listFrom should be called only once") + isListFromAlreadyCalled = true + toCloseableIterator(listContents.sortBy(_.getPath).asJava.iterator()) + } + + override def delete(path: String): Boolean = { + deleteCalls = deleteCalls :+ path + listOfFiles.contains(path) + } + + def getDeleteCalls: Seq[String] = deleteCalls +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java index e80923554a1..c226e61a5ad 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/DefaultFileSystemClient.java @@ -99,6 +99,13 @@ public boolean mkdirs(String path) throws IOException { return fs.mkdirs(pathObject); } + @Override + public boolean delete(String path) throws IOException { + Path pathObject = new Path(path); + FileSystem fs = pathObject.getFileSystem(hadoopConf); + return fs.delete(pathObject, false); + } + private ByteArrayInputStream getStream(String filePath, int offset, int size) { Path path = new Path(filePath); try {