From 38f146b3e9e319cb6817eb0cccf367f3ac2dc370 Mon Sep 17 00:00:00 2001
From: lzlfred
Date: Tue, 15 Nov 2022 21:55:25 -0800
Subject: [PATCH] make Delta checkpoint and state reconstruction visible in Spark UI

Adding "withNewExecutionId" to state cache and checkpoint logic so those jobs
are visible in Spark/notebook UI.

GitOrigin-RevId: 9b606b57e1158b02e923a8b586f00fdcd52c15df
---
 .../apache/spark/sql/delta/Checkpoints.scala | 10 +++++++--
 .../spark/sql/delta/util/StateCache.scala    | 22 +++++++++----------
 2 files changed, 19 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
index 8ea47279722..13bcf58df59 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/Checkpoints.scala
@@ -47,6 +47,7 @@ import org.apache.hadoop.mapreduce.{Job, TaskType}
 import org.apache.spark.sql.{Column, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.expressions.{Cast, ElementAt, Literal}
+import org.apache.spark.sql.execution.SQLExecution
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.functions.{coalesce, col, struct, when}
 import org.apache.spark.sql.internal.SQLConf
@@ -556,8 +557,9 @@ object Checkpoints extends DeltaLogging {
         new SerializableConfiguration(job.getConfiguration))
     }
 
-    val finalCheckpointFiles = chk
-      .queryExecution // This is a hack to get spark to write directly to a file.
+    // This is a hack to get spark to write directly to a file.
+    val qe = chk.queryExecution
+    def executeFinalCheckpointFiles(): Array[SerializableFileStatus] = qe
       .executedPlan
       .execute()
       .mapPartitionsWithIndex { case (index, iter) =>
@@ -626,6 +628,10 @@ object Checkpoints extends DeltaLogging {
         Iterator(SerializableFileStatus.fromStatus(finalPathFileStatus))
       }.collect()
 
+    val finalCheckpointFiles = SQLExecution.withNewExecutionId(qe, Some("Delta checkpoint")) {
+      executeFinalCheckpointFiles()
+    }
+
     val checkpointSizeInBytes = finalCheckpointFiles.map(_.length).sum
     if (numOfFiles.value != snapshot.numOfFiles) {
       throw DeltaErrors.checkpointMismatchWithSnapshot
diff --git a/core/src/main/scala/org/apache/spark/sql/delta/util/StateCache.scala b/core/src/main/scala/org/apache/spark/sql/delta/util/StateCache.scala
index 2c0ab3666f7..391cf56710d 100644
--- a/core/src/main/scala/org/apache/spark/sql/delta/util/StateCache.scala
+++ b/core/src/main/scala/org/apache/spark/sql/delta/util/StateCache.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.delta.sources.DeltaSQLConf
 
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
-import org.apache.spark.sql.execution.LogicalRDD
+import org.apache.spark.sql.execution.{LogicalRDD, SQLExecution}
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -54,19 +54,19 @@ trait StateCache extends DeltaLogging {
     // single-session scenarios to avoid the overhead of `Dataset` creation which can take 100ms.
     private val cachedDs = cached.synchronized {
       if (isCached) {
-        val rdd = recordFrameProfile("Delta", "CachedDS.toRdd") {
-          ds.queryExecution.toRdd.map(_.copy())
+        val qe = ds.queryExecution
+        val rdd = SQLExecution.withNewExecutionId(qe, Some(s"Cache $name")) {
+          val rdd = recordFrameProfile("Delta", "CachedDS.toRdd") {
+            // toRdd should always trigger execution
+            qe.toRdd.map(_.copy())
+          }
+          rdd.setName(name)
+          rdd.persist(storageLevel)
         }
-        rdd.setName(name)
-        rdd.persist(storageLevel)
         cached += rdd
         val dsCache = new DatasetRefCache(() => {
-          Dataset.ofRows(
-            spark,
-            LogicalRDD(
-              ds.queryExecution.analyzed.output,
-              rdd)(
-              spark))
+          val logicalRdd = LogicalRDD(qe.analyzed.output, rdd)(spark)
+          Dataset.ofRows(spark, logicalRdd)
         })
         Some(dsCache)
       } else {
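
Background on the pattern (illustrative sketch, not part of the patch): the change wraps the physical execution in SQLExecution.withNewExecutionId, which registers a named SQL execution with the SQL listener so the jobs it triggers show up in the SQL/DataFrame tab of the Spark UI. The standalone Scala sketch below shows the same pattern in isolation. The package and object names are hypothetical, and it assumes a package under org.apache.spark.sql (as in Delta's own sources), since SQLExecution is an internal Spark API.

    // Illustrative sketch only: demonstrates the withNewExecutionId pattern used in the patch.
    // Package and object names are hypothetical; SQLExecution is internal to Spark, so this is
    // assumed to compile inside an org.apache.spark.sql.* package, the same way Delta's sources do.
    package org.apache.spark.sql.delta.examples

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.execution.SQLExecution

    object NamedExecutionSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("named-execution-sketch").getOrCreate()

        val df = spark.range(0, 1000).toDF("id")
        val qe = df.queryExecution

        // Calling qe.toRdd (or executedPlan.execute()) and then running an RDD action bypasses
        // the usual Dataset action path, so no SQL execution is registered and nothing appears
        // in the UI's SQL tab. Wrapping the action in withNewExecutionId ties the triggered
        // jobs to a named SQL execution, which is what the patch does for checkpoint writes
        // and state reconstruction.
        val rowCount = SQLExecution.withNewExecutionId(qe, Some("example: count rows")) {
          qe.toRdd.count()
        }

        println(s"Counted $rowCount rows under a named SQL execution")
        spark.stop()
      }
    }

In the patch itself the same wrapper names the executions "Delta checkpoint" and s"Cache $name", so checkpoint writes and cached state reconstruction each appear as their own named entries in the UI.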