Skip to content

Commit

Permalink
[Spark] Fix resource leaking when a deletion vector file is not found
Browse files Browse the repository at this point in the history
The `DeltaParquetFileFormat` adds additional glue for filtering out the deleted rows (according to the deletion vector) from the iterator returned by the `ParquetFileFormat`. When the DV file is not found, the iterator returned from `ParquetFileFormat` should be closed.

Adds an integration test that simulates the DV file deletion and verifies that there is no resource leak.

Closes #2113

Signed-off-by: Venki Korukanti <venki.korukanti@databricks.com>
GitOrigin-RevId: d378b495630da31ff2af062dd9124874fdb69e12
  • Loading branch information
vkorukanti committed Oct 2, 2023
1 parent 9e3d4c2 commit 2d92266
Show file tree
Hide file tree
Showing 4 changed files with 62 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -164,14 +164,26 @@ case class DeltaParquetFileFormat(
val useOffHeapBuffers = sparkSession.sessionState.conf.offHeapColumnVectorEnabled
(partitionedFile: PartitionedFile) => {
val rowIteratorFromParquet = parquetDataReader(partitionedFile)
val iterToReturn =
iteratorWithAdditionalMetadataColumns(
partitionedFile,
rowIteratorFromParquet,
isRowDeletedColumn,
useOffHeapBuffers = useOffHeapBuffers,
rowIndexColumn = rowIndexColumn)
iterToReturn.asInstanceOf[Iterator[InternalRow]]
try {
val iterToReturn =
iteratorWithAdditionalMetadataColumns(
partitionedFile,
rowIteratorFromParquet,
isRowDeletedColumn,
useOffHeapBuffers = useOffHeapBuffers,
rowIndexColumn = rowIndexColumn)
iterToReturn.asInstanceOf[Iterator[InternalRow]]
} catch {
case NonFatal(e) =>
// Close the iterator if it is a closeable resource. The `ParquetFileFormat` opens
// the file and returns a `RecordReaderIterator` instance (which implements both
// `AutoCloseable` and `Iterator`) typed as an `Iterator`.
rowIteratorFromParquet match {
case resource: AutoCloseable => closeQuietly(resource)
case _ => // do nothing
}
throw e
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.spark.sql.delta.deletionvectors.{RoaringBitmapArray, RoaringBi
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.storage.dv.DeletionVectorStore
import org.apache.spark.sql.delta.util.PathWithFileSystem
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.sql.{DataFrame, QueryTest, RuntimeConfig, SparkSession}
Expand Down Expand Up @@ -317,6 +318,13 @@ trait DeletionVectorsTestUtils extends QueryTest with SharedSparkSession {
)
}

/**
 * Deletes the deletion vector (DV) file referenced by the given [[AddFile]].
 *
 * The DV location is resolved against `tablePath` and the resulting file is removed from the
 * local file system. The [[AddFile]] is assumed to carry a valid DV.
 *
 * @param tablePath root path of the table, used to resolve the absolute DV path
 * @param addFile   file entry whose deletion vector file should be deleted
 */
protected def deleteDVFile(tablePath: String, addFile: AddFile): Unit = {
  assert(addFile.deletionVector != null)
  val tableRoot = new Path(tablePath)
  val dvLocation = addFile.deletionVector.absolutePath(tableRoot).toString
  FileUtils.delete(new File(dvLocation))
}

/**
* Creates a [[DeletionVectorDescriptor]] from an [[RoaringBitmapArray]]
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.JavaConverters._
import scala.collection.concurrent
import scala.reflect.ClassTag
import scala.util.matching.Regex

import org.apache.spark.sql.delta.DeltaTestUtils.Plans
Expand Down Expand Up @@ -225,6 +226,16 @@ trait DeltaTestUtilsBase {
protected def errorContains(errMsg: String, str: String): Unit = {
assert(errMsg.toLowerCase(Locale.ROOT).contains(str.toLowerCase(Locale.ROOT)))
}

/**
 * Looks for a throwable of type `E` in `e` itself or, recursively, in its cause and its
 * suppressed exceptions.
 *
 * `e` is checked first; if it does not match, its cause (when present) is searched, followed
 * by each suppressed exception in order. The search stops at the first match.
 *
 * @tparam E type of the throwable being searched for
 * @param e  throwable at which the search starts
 * @return the first throwable of type `E` that was found, or `None` if there is none
 */
def findIfResponsible[E <: Throwable: ClassTag](e: Throwable): Option[E] = {
  val direct: Option[E] = e match {
    case culprit: E => Some(culprit)
    case _ => None
  }
  direct.orElse {
    // The iterator is lazy, so nested throwables are only visited until the first hit.
    val nested = Option(e.getCause).iterator ++ e.getSuppressed.iterator
    nested
      .map(child => findIfResponsible[E](child))
      .find(_.isDefined)
      .flatten
  }
}
}

trait DeltaCheckpointTestUtils
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package org.apache.spark.sql.delta.deletionvectors

import java.io.File
import java.io.{File, FileNotFoundException}

import org.apache.spark.sql.delta.{DeletionVectorsTableFeature, DeletionVectorsTestUtils, DeltaChecksumException, DeltaConfigs, DeltaLog, DeltaMetricsUtils, DeltaTestUtilsForTempViews}
import org.apache.spark.sql.delta.DeltaTestUtils.{createTestAddFile, BOOLEAN_DOMAIN}
Expand All @@ -32,6 +32,7 @@ import io.delta.tables.DeltaTable
import org.apache.commons.io.FileUtils
import org.apache.hadoop.fs.Path

import org.apache.spark.SparkException
import org.apache.spark.sql.{DataFrame, QueryTest, Row}
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, Subquery}
import org.apache.spark.sql.functions.col
Expand Down Expand Up @@ -663,6 +664,27 @@ class DeletionVectorsSuite extends QueryTest
}
}

// Reads a table whose DV file has been deleted and expects the query to fail with a
// `FileNotFoundException` somewhere in the failure chain.
// NOTE(review): the leak detection itself is presumably done by the shared test harness once
// the test finishes, since nothing here asserts on open resources - confirm.
test("Check no resource leak when DV files are missing (table corrupted)") {
  withTempDir { tempDir =>
    // Copy the source DV table to a temporary directory
    val corruptedTableDir = new File(tempDir, "resourceLeakTest")
    FileUtils.copyDirectory(new File(table2Path), corruptedTableDir)
    val corruptedTablePath = corruptedTableDir.getAbsolutePath

    // Remove the DV file of the first data file that has a deletion vector
    val dvBackedFiles = getFilesWithDeletionVectors(DeltaLog.forTable(spark, corruptedTableDir))
    assert(dvBackedFiles.size > 0)
    deleteDVFile(corruptedTablePath, dvBackedFiles(0))

    val se = intercept[SparkException] {
      spark.sql(s"SELECT * FROM delta.`$corruptedTablePath`").collect()
    }
    assert(findIfResponsible[FileNotFoundException](se).nonEmpty,
      s"Expected a file not found exception as the cause, but got: [${se}]")
  }
}

private sealed case class DeleteUsingDVWithResults(
scale: String,
sqlRule: String,
Expand Down

0 comments on commit 2d92266

Please # to comment.