Skip to content

Commit

Permalink
Improve DV path canonicalization
Browse files Browse the repository at this point in the history
## Description

This PR improves the FILE_PATH canonicalization logic by avoiding calling expensive `Path.toUri.toString` calls for each row in a table. Canonicalized paths are now cached and the UDF just needs to look it up.

Future improvement is possible for handling huge logs: build `canonicalizedPathMap` in a distributed way.

Related PR target the 2.4 branch: #1829.

Existing tests.

Closes #1836

Signed-off-by: Paddy Xu <xupaddy@gmail.com>
GitOrigin-RevId: c4810852f9136c36ec21f3519620ca26ed12bb04
  • Loading branch information
xupefei authored and vkorukanti committed Jun 20, 2023
1 parent 6583b2e commit fc39f78
Showing 1 changed file with 21 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute}
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
Expand Down Expand Up @@ -341,11 +342,16 @@ object DeletionVectorBitmapGenerator {
: Seq[DeletionVectorResult] = {
// TODO: fix this to work regardless of whether Spark encodes or doesn't encode
// _metadata.file_path. See https://github.com/delta-io/delta/issues/1725
val uriEncode = DeltaUDF.stringFromString(path => {
new Path(path).toUri.toString
})
// Build two maps, using Path or String as keys. The one with String keys is used in UDF.
val canonicalizedPathMap = buildCanonicalizedPathMap(txn.deltaLog, candidateFiles)
val canonicalizedPathStringMap =
canonicalizedPathMap.map { case (k, v) => k.toString -> v }
val broadcastCanonicalizedPathStringMap =
sparkSession.sparkContext.broadcast(canonicalizedPathStringMap)

val lookupPathUdf = DeltaUDF.stringFromString(broadcastCanonicalizedPathStringMap.value(_))
val matchedRowsDf = targetDf
.withColumn(FILE_NAME_COL, uriEncode(col(s"${METADATA_NAME}.${FILE_PATH}")))
.withColumn(FILE_NAME_COL, lookupPathUdf(col(s"${METADATA_NAME}.${FILE_PATH}")))
// Filter after getting input file name as the filter might introduce a join and we
// cannot get input file name on join's output.
.filter(new Column(condition))
Expand All @@ -358,7 +364,7 @@ object DeletionVectorBitmapGenerator {
val filePathToDV = candidateFiles.map { add =>
val serializedDV = Option(add.deletionVector).map(dvd => JsonUtils.toJson(dvd))
// Paths in the metadata column are canonicalized. Thus we must canonicalize the DV path.
FileToDvDescriptor(absolutePath(basePath, add.path).toUri.toString, serializedDV)
FileToDvDescriptor(canonicalizedPathMap(absolutePath(basePath, add.path)), serializedDV)
}
val filePathToDVDf = sparkSession.createDataset(filePathToDV)

Expand All @@ -379,6 +385,16 @@ object DeletionVectorBitmapGenerator {

DeletionVectorBitmapGenerator.buildDeletionVectors(sparkSession, df, txn.deltaLog, txn)
}

private def buildCanonicalizedPathMap(
log: DeltaLog,
addFiles: Seq[AddFile]): Map[Path, String] = {
val basePath = log.dataPath.toString
addFiles.map { add =>
val absPath = absolutePath(basePath, add.path)
absPath -> SparkPath.fromPath(absPath).urlEncoded
}.toMap
}
}

/**
Expand Down

0 comments on commit fc39f78

Please # to comment.