From 2d922660d6fbb8b65bc5d20b276969eedde74c8d Mon Sep 17 00:00:00 2001
From: Venki Korukanti
Date: Fri, 29 Sep 2023 11:16:17 -0700
Subject: [PATCH] [Spark] Fix resource leak when a deletion vector file is not
 found

`DeltaParquetFileFormat` adds glue on top of `ParquetFileFormat` to filter out
deleted rows (according to the deletion vector) from the iterator that
`ParquetFileFormat` returns. When the DV file is not found, the iterator
returned by `ParquetFileFormat` must still be closed; otherwise the underlying
Parquet reader and its open file handle leak. This change closes the iterator
on error and adds an integration test that simulates deletion of the DV file
and verifies there is no resource leak.
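For illustration, a minimal, self-contained sketch of the close-on-error
pattern the fix applies. `closeOnError` and `filter` are illustrative names,
not part of this change; the actual code closes the resource through a
quiet-close helper.

    import scala.util.control.NonFatal

    // `filter` stands in for iteratorWithAdditionalMetadataColumns and may
    // throw (e.g. FileNotFoundException for a missing DV file) before the
    // input iterator is ever consumed.
    def closeOnError[A](rows: Iterator[A])(filter: Iterator[A] => Iterator[A]): Iterator[A] = {
      try {
        filter(rows)
      } catch {
        case NonFatal(e) =>
          rows match {
            // ParquetFileFormat hands back a RecordReaderIterator, which is
            // AutoCloseable; closing it releases the open Parquet file.
            case closeable: AutoCloseable =>
              try closeable.close() catch { case NonFatal(_) => } // close quietly
            case _ => // nothing to release
          }
          throw e
      }
    }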
Closes delta-io/delta#2113

Signed-off-by: Venki Korukanti

GitOrigin-RevId: d378b495630da31ff2af062dd9124874fdb69e12
---
 .../sql/delta/DeltaParquetFileFormat.scala    | 28 +++++++++++++------
 .../sql/delta/DeletionVectorsTestUtils.scala  |  8 ++++++
 .../spark/sql/delta/DeltaTestUtils.scala      | 11 ++++++++
 .../DeletionVectorsSuite.scala                | 24 +++++++++++++++-
 4 files changed, 62 insertions(+), 9 deletions(-)

diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
index f1c3c61643d..f01aa031ff2 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/DeltaParquetFileFormat.scala
@@ -164,14 +164,26 @@ case class DeltaParquetFileFormat(
     val useOffHeapBuffers = sparkSession.sessionState.conf.offHeapColumnVectorEnabled
     (partitionedFile: PartitionedFile) => {
       val rowIteratorFromParquet = parquetDataReader(partitionedFile)
-      val iterToReturn =
-        iteratorWithAdditionalMetadataColumns(
-          partitionedFile,
-          rowIteratorFromParquet,
-          isRowDeletedColumn,
-          useOffHeapBuffers = useOffHeapBuffers,
-          rowIndexColumn = rowIndexColumn)
-      iterToReturn.asInstanceOf[Iterator[InternalRow]]
+      try {
+        val iterToReturn =
+          iteratorWithAdditionalMetadataColumns(
+            partitionedFile,
+            rowIteratorFromParquet,
+            isRowDeletedColumn,
+            useOffHeapBuffers = useOffHeapBuffers,
+            rowIndexColumn = rowIndexColumn)
+        iterToReturn.asInstanceOf[Iterator[InternalRow]]
+      } catch {
+        case NonFatal(e) =>
+          // Close the iterator if it is a closeable resource. `ParquetFileFormat` opens the
+          // file and returns a `RecordReaderIterator` instance (which implements both
+          // `AutoCloseable` and `Iterator`) typed as a plain `Iterator`.
+          rowIteratorFromParquet match {
+            case resource: AutoCloseable => closeQuietly(resource)
+            case _ => // do nothing
+          }
+          throw e
+      }
     }
   }

diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
index ba01c377289..ad19165ce45 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeletionVectorsTestUtils.scala
@@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBi
 import org.apache.spark.sql.delta.sources.DeltaSQLConf
 import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
 import org.apache.spark.sql.delta.util.PathWithFileSystem
+import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path

 import org.apache.spark.sql.{DataFrame, QueryTest, RuntimeConfig, SparkSession}
@@ -317,6 +318,13 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
     )
   }

+  /** Delete the DV file in the given [[AddFile]]. Assumes the [[AddFile]] has a valid DV. */
+  protected def deleteDVFile(tablePath: String, addFile: AddFile): Unit = {
+    assert(addFile.deletionVector != null)
+    val dvPath = addFile.deletionVector.absolutePath(new Path(tablePath))
+    FileUtils.delete(new File(dvPath.toString))
+  }
+
   /**
    * Creates a [[DeletionVectorDescriptor]] from an [[RoaringBitmapArray]]
    */
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
index d7be7f58fed..11b82d722ce 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/DeltaTestUtils.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

 import scala.collection.JavaConverters._
 import scala.collection.concurrent
+import scala.reflect.ClassTag
 import scala.util.matching.Regex

 import org.apache.spark.sql.delta.DeltaTestUtils.Plans
@@ -225,6 +226,16 @@ trait DeltaTestUtilsBase {
   protected def errorContains(errMsg: String, str: String): Unit = {
     assert(errMsg.toLowerCase(Locale.ROOT).contains(str.toLowerCase(Locale.ROOT)))
   }
+
+  /** Utility method to check whether exception `e`, or one of its causes or suppressed
+   *  exceptions, is of type `E`, returning the first match. */
+  def findIfResponsible[E <: Throwable: ClassTag](e: Throwable): Option[E] = e match {
+    case culprit: E => Some(culprit)
+    case _ =>
+      val children = Option(e.getCause).iterator ++ e.getSuppressed.iterator
+      children
+        .map(findIfResponsible[E](_))
+        .collectFirst { case Some(culprit) => culprit }
+  }
 }

 trait DeltaCheckpointTestUtils
diff --git a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
index 918f156bd94..9c9945cb200 100644
--- a/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
+++ b/spark/src/test/scala/org/apache/spark/sql/delta/deletionvectors/DeletionVectorsSuite.scala
@@ -16,7 +16,7 @@

 package org.apache.spark.sql.delta.deletionvectors

-import java.io.File
+import java.io.{File, FileNotFoundException}

 import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews}
 import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, BOOLEAN_DOMAIN}
@@ -32,6 +32,7 @@
 import io.delta.tables.DeltaTable
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.Path
+import org.apache.spark.SparkException
 import org.apache.spark.sql.{DataFrame, QueryTest, Row}
 import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Subquery}
 import org.apache.spark.sql.functions.col
@@ -663,6 +664,27 @@ class DeletionVectorsSuite extends QueryTest
     }
   }

+  test("Check no resource leak when DV files are missing (table corrupted)") {
+    withTempDir { tempDir =>
+      val source = new File(table2Path)
+      val target = new File(tempDir, "resourceLeakTest")
+      val targetPath = target.getAbsolutePath
+
+      // Copy the source DV table to a temporary directory
+      FileUtils.copyDirectory(source, target)
+
+      val filesWithDvs = getFilesWithDeletionVectors(DeltaLog.forTable(spark, target))
+      assert(filesWithDvs.size > 0)
+      deleteDVFile(targetPath, filesWithDvs(0))
+
+      val se = intercept[SparkException] {
+        spark.sql(s"SELECT * FROM delta.`$targetPath`").collect()
+      }
+      assert(findIfResponsible[FileNotFoundException](se).nonEmpty,
+        s"Expected a file not found exception as the cause, but got: [${se}]")
+    }
+  }
+
   private sealed case class DeleteUsingDVWithResults(
       scale: String,
       sqlRule: String,