
Commit 38f146b

make Delta checkpoint and state reconstruction visible in Spark UI
Adding "withNewExecutionId" to state cache and checkpoint logic so those jobs are visible in Spark/notebook UI.

GitOrigin-RevId: 9b606b57e1158b02e923a8b586f00fdcd52c15df
lzlfred authored and allisonport-db committed Nov 28, 2022
1 parent 0c349da commit 38f146b
Showing 2 changed files with 19 additions and 13 deletions.
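
For background: SQLExecution.withNewExecutionId(qe, Some(name)) is Spark's internal helper for registering a SQL execution with an optional display name and running a block of code under its execution id; any jobs launched inside the block are then grouped under that execution in the SQL tab of the Spark UI. Jobs triggered by calling executedPlan.execute() or toRdd directly bypass the usual Dataset action path and therefore carry no execution id, which is why the checkpoint and state-reconstruction jobs were previously hard to find in the UI. The sketch below illustrates the pattern on a toy query; the query, object name, and execution name are illustrative only, and it assumes the internal SQLExecution API is accessible to your build.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.SQLExecution

object WithNewExecutionIdSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()

    val df = spark.range(0, 1000000).selectExpr("id", "id % 7 AS bucket")
    val qe = df.queryExecution

    // Counting via the internal-row RDD would normally launch jobs with no SQL
    // execution attached; wrapping the action registers a named execution first.
    val rows = SQLExecution.withNewExecutionId(qe, Some("example: manual toRdd count")) {
      qe.toRdd.count()
    }
    println(s"counted $rows rows; the run appears as a named execution in the SQL tab")

    spark.stop()
  }
}
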
core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala (8 additions & 2 deletions)
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskType}
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{coalesce, col, struct, when}
 import org.apache.spark.sql.internal.SQLConf
@@ -556,8 +557,9 @@ object Checkpoints extends DeltaLogging {
        new SerializableConfiguration(job.getConfiguration))
    }
 
-    val finalCheckpointFiles = chk
-      .queryExecution // This is a hack to get spark to write directly to a file.
+    // This is a hack to get spark to write directly to a file.
+    val qe = chk.queryExecution
+    def executeFinalCheckpointFiles(): Array[SerializableFileStatus] = qe
       .executedPlan
       .execute()
       .mapPartitionsWithIndex { case (index, iter) =>
@@ -626,6 +628,10 @@
        Iterator(SerializableFileStatus.fromStatus(finalPathFileStatus))
      }.collect()
 
+    val finalCheckpointFiles = SQLExecution.withNewExecutionId(qe, Some("Delta checkpoint")) {
+      executeFinalCheckpointFiles()
+    }
+
     val checkpointSizeInBytes = finalCheckpointFiles.map(_.length).sum
     if (numOfFiles.value != snapshot.numOfFiles) {
       throw DeltaErrors.checkpointMismatchWithSnapshot
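
Restated outside the diff, the Checkpoints.scala change keeps the low-level plan execution but splits it into a local def, then triggers the final collect inside an execution named "Delta checkpoint". A simplified sketch of that shape follows; the object and method names and the per-partition body are placeholders, not the actual checkpoint-writing code.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.execution.SQLExecution

object CheckpointVisibilitySketch {
  // Hypothetical stand-in for the checkpoint write: execute the physical plan of
  // `chk` under a named SQL execution so its jobs are grouped in the Spark UI.
  def runCheckpointPlan(chk: DataFrame): Array[Long] = {
    val qe = chk.queryExecution

    // Defining the def runs nothing yet; collect() inside it is what launches jobs.
    def executePlan(): Array[Long] =
      qe.executedPlan
        .execute()                               // RDD[InternalRow], still lazy
        .mapPartitionsWithIndex { case (_, iter) =>
          // Placeholder work; the real code writes one checkpoint file per
          // partition and returns its file status.
          Iterator(iter.size.toLong)
        }
        .collect()

    // Everything triggered inside this block is attributed to "Delta checkpoint".
    SQLExecution.withNewExecutionId(qe, Some("Delta checkpoint")) {
      executePlan()
    }
  }
}
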
Second changed file (11 additions & 11 deletions)
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.{LogicalRDD, SQLExecution}
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -54,19 +54,19 @@ trait StateCache extends DeltaLogging {
   // single-session scenarios to avoid the overhead of `Dataset` creation which can take 100ms.
   private val cachedDs = cached.synchronized {
     if (isCached) {
-      val rdd = recordFrameProfile("Delta", "CachedDS.toRdd") {
-        ds.queryExecution.toRdd.map(_.copy())
+      val qe = ds.queryExecution
+      val rdd = SQLExecution.withNewExecutionId(qe, Some(s"Cache $name")) {
+        val rdd = recordFrameProfile("Delta", "CachedDS.toRdd") {
+          // toRdd should always trigger execution
+          qe.toRdd.map(_.copy())
+        }
+        rdd.setName(name)
+        rdd.persist(storageLevel)
      }
-      rdd.setName(name)
-      rdd.persist(storageLevel)
      cached += rdd
      val dsCache = new DatasetRefCache(() => {
-        Dataset.ofRows(
-          spark,
-          LogicalRDD(
-            ds.queryExecution.analyzed.output,
-            rdd)(
-            spark))
+        val logicalRdd = LogicalRDD(qe.analyzed.output, rdd)(spark)
+        Dataset.ofRows(spark, logicalRdd)
      })
      Some(dsCache)
    } else {
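
The StateCache change applies the same idea to state reconstruction: converting the Dataset to its internal-row RDD and persisting it now happens inside an execution named "Cache $name", so the caching work is attributable in the UI. Below is a minimal, generic sketch of that pattern; the helper name and storage level are illustrative, and this is not the Delta CachedDS implementation itself.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.storage.StorageLevel

object NamedCacheSketch {
  // Hypothetical helper mirroring the pattern above: build and persist a Dataset's
  // internal-row RDD inside a SQL execution named "Cache <name>", so any jobs run
  // while materializing it (e.g. under adaptive query execution) are grouped there.
  def persistUnderName(ds: Dataset[_], name: String): RDD[InternalRow] = {
    val qe = ds.queryExecution
    SQLExecution.withNewExecutionId(qe, Some(s"Cache $name")) {
      // InternalRow objects returned by toRdd may be reused, so copy before caching.
      val rdd = qe.toRdd.map(_.copy())
      rdd.setName(name)
      rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)
    }
  }
}
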
