diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala
index cb3af0846ad..1ae919a4b0e 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/commands/DeleteWithDeletionVectorsHelper.scala
@@ -35,6 +35,7 @@ import org.apache.spark.sql.delta.util.DeltaFileOperations.absolutePath
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 
+import org.apache.spark.paths.SparkPath
 import org.apache.spark.sql.{Column, DataFrame, Dataset, Encoder, SparkSession}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, FileSourceMetadataAttribute}
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
@@ -341,11 +342,16 @@ object DeletionVectorBitmapGenerator {
     : Seq[DeletionVectorResult] = {
     // TODO: fix this to work regardless of whether Spark encodes or doesn't encode
     // _metadata.file_path. See https://github.com/delta-io/delta/issues/1725
-    val uriEncode = DeltaUDF.stringFromString(path => {
-      new Path(path).toUri.toString
-    })
+    // Build two maps, using Path or String as keys. The one with String keys is used in the UDF.
+    val canonicalizedPathMap = buildCanonicalizedPathMap(txn.deltaLog, candidateFiles)
+    val canonicalizedPathStringMap =
+      canonicalizedPathMap.map { case (k, v) => k.toString -> v }
+    val broadcastCanonicalizedPathStringMap =
+      sparkSession.sparkContext.broadcast(canonicalizedPathStringMap)
+
+    val lookupPathUdf = DeltaUDF.stringFromString(broadcastCanonicalizedPathStringMap.value(_))
     val matchedRowsDf = targetDf
-      .withColumn(FILE_NAME_COL, uriEncode(col(s"${METADATA_NAME}.${FILE_PATH}")))
+      .withColumn(FILE_NAME_COL, lookupPathUdf(col(s"${METADATA_NAME}.${FILE_PATH}")))
       // Filter after getting input file name as the filter might introduce a join and we
       // cannot get input file name on join's output.
       .filter(new Column(condition))
@@ -358,7 +364,7 @@ object DeletionVectorBitmapGenerator {
     val filePathToDV = candidateFiles.map { add =>
       val serializedDV = Option(add.deletionVector).map(dvd => JsonUtils.toJson(dvd))
       // Paths in the metadata column are canonicalized. Thus we must canonicalize the DV path.
-      FileToDvDescriptor(absolutePath(basePath, add.path).toUri.toString, serializedDV)
+      FileToDvDescriptor(canonicalizedPathMap(absolutePath(basePath, add.path)), serializedDV)
     }
 
     val filePathToDVDf = sparkSession.createDataset(filePathToDV)
@@ -379,6 +385,16 @@ object DeletionVectorBitmapGenerator {
       DeletionVectorBitmapGenerator.buildDeletionVectors(sparkSession, df, txn.deltaLog, txn)
   }
+
+  private def buildCanonicalizedPathMap(
+      log: DeltaLog,
+      addFiles: Seq[AddFile]): Map[Path, String] = {
+    val basePath = log.dataPath.toString
+    addFiles.map { add =>
+      val absPath = absolutePath(basePath, add.path)
+      absPath -> SparkPath.fromPath(absPath).urlEncoded
+    }.toMap
+  }
 }
 
 /**
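A minimal, self-contained sketch of the broadcast-lookup pattern this patch adopts: instead of URI-encoding `_metadata.file_path` per row with a UDF, a driver-side map from each canonical path string to its URL-encoded form is broadcast once, and executors do a constant-time lookup. All names here (`BroadcastPathLookupSketch`, `tableRoot`, `lookupPath`) are illustrative stand-ins for the Delta helpers; only `SparkPath.fromPath(...).urlEncoded` (Spark 3.4+) is taken from the patch itself.

```scala
import org.apache.hadoop.fs.Path
import org.apache.spark.paths.SparkPath
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}

object BroadcastPathLookupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("dv-path-lookup").getOrCreate()
    import spark.implicits._

    // Hypothetical table root and file paths; stand-ins for deltaLog.dataPath and AddFile.path.
    val tableRoot = new Path("file:/tmp/delta-table")
    val relativePaths = Seq("part-00000-with space.parquet", "part-00001.parquet")

    // Driver-side map from the canonical path string (the form _metadata.file_path reports)
    // to its URL-encoded form, used as the join key on both sides.
    val pathToEncoded: Map[String, String] = relativePaths.map { rel =>
      val abs = new Path(tableRoot, rel)
      abs.toString -> SparkPath.fromPath(abs).urlEncoded
    }.toMap

    // Broadcast once; each row then resolves via a map lookup rather than re-encoding
    // the path, so encoding differences across Spark versions cannot skew the keys.
    // Mirrors the patch: a missing key fails fast instead of silently passing through.
    val broadcastMap = spark.sparkContext.broadcast(pathToEncoded)
    val lookupPath = udf((p: String) => broadcastMap.value(p))

    // Demonstrate the lookup on a small DataFrame of path strings.
    val files = pathToEncoded.keys.toSeq.toDF("file_path")
    files.withColumn("join_key", lookupPath(col("file_path"))).show(truncate = false)

    spark.stop()
  }
}
```

The map is built from `candidateFiles` on the driver, so its size is bounded by the number of files touched by the DELETE, which keeps the broadcast cheap relative to shipping a per-row encoding function whose output depends on the Spark version.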