Skip to content

Commit

Permalink
Catch errors when last_checkpoint write fails
Browse files Browse the repository at this point in the history
Catch exceptions that are thrown when writing the _last_checkpoint file. These exceptions are already handled when writing the checkpoint files themselves, so extend the same logging/catching logic to the _last_checkpoint write.

Added unit test

GitOrigin-RevId: 95202c7c27c6c7875ae8e78a4261f7d236afe038
  • Loading branch information
husseinnagr-db authored and scottsand-db committed Sep 15, 2022
1 parent 1e8a1d2 commit e5a7cd0
Showing 1 changed file with 33 additions and 27 deletions.
60 changes: 33 additions & 27 deletions core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
Original file line number Diff line number Diff line change
Expand Up @@ -304,55 +304,61 @@ trait Checkpoints extends DeltaLogging {
*/
def checkpoint(): Unit = checkpoint(snapshot)

/**
 * Runs `thunk`, swallowing any non-fatal exception it throws. Checkpoint-related
 * writes happen after the commit has already completed, so from the user's
 * perspective the commit succeeded; a checkpointing failure is recorded and
 * logged rather than surfaced. The exception is still rethrown when running
 * under test (or when the escape-hatch conf is enabled) so that any breaking
 * changes are caught by unit tests.
 */
protected def withCheckpointExceptionHandling(
    deltaLog: DeltaLog, opType: String)(thunk: => Unit): Unit = {
  try thunk catch {
    case NonFatal(e) =>
      // Record the failure for telemetry before deciding whether to rethrow.
      recordDeltaEvent(
        deltaLog,
        opType,
        data = Map("exception" -> e.getMessage(), "stackTrace" -> e.getStackTrace())
      )
      logWarning(s"Error when writing checkpoint-related files", e)
      val rethrow =
        Utils.isTesting ||
          spark.sessionState.conf.getConf(
            DeltaSQLConf.DELTA_CHECKPOINT_THROW_EXCEPTION_WHEN_FAILED)
      if (rethrow) {
        throw e
      }
  }
}

/**
* Creates a checkpoint using snapshotToCheckpoint. By default it uses the current log version.
* Note that this function captures and logs all exceptions, since the checkpoint shouldn't fail
* the overall commit operation.
*/
def checkpoint(snapshotToCheckpoint: Snapshot): Unit = recordDeltaOperation(
    this, "delta.checkpoint") {
  // Non-fatal failures are logged/recorded (and rethrown only under test or
  // when the escape-hatch conf is set) by the shared handler, since the commit
  // itself has already succeeded by the time the checkpoint is written.
  withCheckpointExceptionHandling(snapshotToCheckpoint.deltaLog, "delta.checkpoint.sync.error") {
    // A negative version means no commit exists yet; there is nothing to checkpoint.
    if (snapshotToCheckpoint.version < 0) {
      throw DeltaErrors.checkpointNonExistTable(dataPath)
    }
    checkpointAndCleanUpDeltaLog(snapshotToCheckpoint)
  }
}

/**
 * Writes the checkpoint files for `snapshotToCheckpoint`, records the resulting
 * metadata in the _last_checkpoint file, and then cleans up expired delta log files.
 */
protected def checkpointAndCleanUpDeltaLog(
    snapshotToCheckpoint: Snapshot): Unit = {
  val checkpointMetaData = writeCheckpointFiles(snapshotToCheckpoint)
  writeLastCheckpointFile(
    snapshotToCheckpoint.deltaLog, checkpointMetaData, CheckpointMetaData.checksumEnabled(spark))
  doLogCleanup(snapshotToCheckpoint)
}

/**
 * Serializes `checkpointMetaData` to JSON (optionally with a checksum) and
 * overwrites the _last_checkpoint file with it. Failures are handled by
 * [[withCheckpointExceptionHandling]] — logged and recorded, and rethrown only
 * under test or when the conf is set — since a failure here must not fail the
 * already-completed commit.
 */
protected[delta] def writeLastCheckpointFile(
    deltaLog: DeltaLog,
    checkpointMetaData: CheckpointMetaData,
    addChecksum: Boolean): Unit = {
  withCheckpointExceptionHandling(deltaLog, "delta.lastCheckpoint.write.error") {
    val json = CheckpointMetaData.serializeToJson(checkpointMetaData, addChecksum)
    store.write(LAST_CHECKPOINT, Iterator(json), overwrite = true, newDeltaHadoopConf())
  }
}

protected def writeCheckpointFiles(
Expand Down

0 comments on commit e5a7cd0

Please sign in to comment.