Skip to content

Commit

Permalink
Support partition directory starting with underscore and dot
Browse files Browse the repository at this point in the history
This PR enables "CONVERT TO DELTA" to support partition column name starting with underscore and dot for completeness.

Unit test is added to reproduce the gap and fixed by code chang in this PR

GitOrigin-RevId: 168a84d946ddb2d3575070fb3152b0e492946b0b
  • Loading branch information
mingdai-db authored and allisonport-db committed Sep 6, 2022
1 parent ce466a7 commit 3e8d2d1
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,10 @@ class ManualListingFileManifest(

protected def doList(): Dataset[SerializableFileStatus] = {
val conf = spark.sparkContext.broadcast(serializableConf)
DeltaFileOperations.recursiveListDirs(spark, Seq(basePath), conf).where("!isDir")
DeltaFileOperations
.recursiveListDirs(
spark, Seq(basePath), conf, hiddenDirNameFilter = ConvertToDeltaCommand.hiddenDirNameFilter)
.where("!isDir")
}

private lazy val list: Dataset[SerializableFileStatus] = {
Expand Down Expand Up @@ -814,4 +817,9 @@ object ConvertToDeltaCommand {

AddFile(pathStrForAddFile, partition, file.length, file.modificationTime, dataChange = true)
}

def hiddenDirNameFilter(fileName: String): Boolean = {
// Allow partition column name starting with underscore and dot
DeltaFileOperations.defaultHiddenFileFilter(fileName) && !fileName.contains("=")
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ object VacuumCommand extends VacuumCommandImpl with Serializable {
spark,
Seq(basePath),
hadoopConf,
hiddenDirNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
hiddenFileNameFilter = DeltaTableUtils.isHiddenDirectory(partitionColumns, _),
fileListingParallelism = Option(parallelism)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,17 +138,19 @@ object DeltaFileOperations extends DeltaLogging {
hadoopConf: Configuration,
subDirs: Iterator[String],
recurse: Boolean,
hiddenDirNameFilter: String => Boolean,
hiddenFileNameFilter: String => Boolean,
listAsDirectories: Boolean = true): Iterator[SerializableFileStatus] = {

def list(dir: String, tries: Int): Iterator[SerializableFileStatus] = {
logInfo(s"Listing $dir")
try {

val path = if (listAsDirectories) new Path(dir, "\u0000") else new Path(dir + "\u0000")
logStore.listFrom(path, hadoopConf)
.filterNot(f => hiddenFileNameFilter(f.getPath.getName))
.map(SerializableFileStatus.fromStatus)
.filterNot{ f =>
val name = f.getPath.getName
if (f.isDirectory) hiddenDirNameFilter(name) else hiddenFileNameFilter(name)
}.map(SerializableFileStatus.fromStatus)
} catch {
case NonFatal(e) if isThrottlingError(e) && tries > 0 =>
randomBackoff("listing", e)
Expand All @@ -164,7 +166,8 @@ object DeltaFileOperations extends DeltaLogging {
}

if (recurse) {
recurseDirectories(logStore, hadoopConf, filesAndDirs, hiddenFileNameFilter)
recurseDirectories(
logStore, hadoopConf, filesAndDirs, hiddenDirNameFilter, hiddenFileNameFilter)
} else {
filesAndDirs
}
Expand All @@ -175,11 +178,18 @@ object DeltaFileOperations extends DeltaLogging {
logStore: LogStore,
hadoopConf: Configuration,
filesAndDirs: Iterator[SerializableFileStatus],
hiddenDirNameFilter: String => Boolean,
hiddenFileNameFilter: String => Boolean): Iterator[SerializableFileStatus] = {
filesAndDirs.flatMap {
case dir: SerializableFileStatus if dir.isDir =>
Iterator.single(dir) ++ listUsingLogStore(
logStore, hadoopConf, Iterator.single(dir.path), recurse = true, hiddenFileNameFilter)
Iterator.single(dir) ++
listUsingLogStore(
logStore,
hadoopConf,
Iterator.single(dir.path),
recurse = true,
hiddenDirNameFilter,
hiddenFileNameFilter)
case file =>
Iterator.single(file)
}
Expand All @@ -200,6 +210,9 @@ object DeltaFileOperations extends DeltaLogging {
* @param spark The SparkSession
* @param subDirs Absolute path of the subdirectories to list
* @param hadoopConf The Hadoop Configuration to get a FileSystem instance
* @param hiddenDirNameFilter A function that returns true when the directory should be considered
* hidden and excluded from results. Defaults to checking for prefixes
* of "." or "_".
* @param hiddenFileNameFilter A function that returns true when the file should be considered
* hidden and excluded from results. Defaults to checking for prefixes
* of "." or "_".
Expand All @@ -212,6 +225,7 @@ object DeltaFileOperations extends DeltaLogging {
spark: SparkSession,
subDirs: Seq[String],
hadoopConf: Broadcast[SerializableConfiguration],
hiddenDirNameFilter: String => Boolean = defaultHiddenFileFilter,
hiddenFileNameFilter: String => Boolean = defaultHiddenFileFilter,
fileListingParallelism: Option[Int] = None,
listAsDirectories: Boolean = true): Dataset[SerializableFileStatus] = {
Expand All @@ -225,7 +239,7 @@ object DeltaFileOperations extends DeltaLogging {
hadoopConf.value.value,
dirs,
recurse = false,
hiddenFileNameFilter, listAsDirectories)
hiddenDirNameFilter, hiddenFileNameFilter, listAsDirectories)
}.repartition(listParallelism) // Initial list of subDirs may be small

val allDirsAndFiles = dirsAndFiles.mapPartitions { firstLevelDirsAndFiles =>
Expand All @@ -234,6 +248,7 @@ object DeltaFileOperations extends DeltaLogging {
logStore,
hadoopConf.value.value,
firstLevelDirsAndFiles,
hiddenDirNameFilter,
hiddenFileNameFilter)
}
spark.createDataset(allDirsAndFiles)
Expand All @@ -250,6 +265,9 @@ object DeltaFileOperations extends DeltaLogging {
* @param listFilename Absolute path to a filename from which new files are listed (exclusive)
* @param topDir Absolute path to the original starting directory
* @param hadoopConf The Hadoop Configuration to get a FileSystem instance
* @param hiddenDirNameFilter A function that returns true when the directory should be considered
* hidden and excluded from results. Defaults to checking for prefixes
* of "." or "_".
* @param hiddenFileNameFilter A function that returns true when the file should be considered
* hidden and excluded from results. Defaults to checking for prefixes
* of "." or "_".
Expand All @@ -259,6 +277,7 @@ object DeltaFileOperations extends DeltaLogging {
listFilename: String,
topDir: String,
hadoopConf: Broadcast[SerializableConfiguration],
hiddenDirNameFilter: String => Boolean = defaultHiddenFileFilter,
hiddenFileNameFilter: String => Boolean = defaultHiddenFileFilter,
fileListingParallelism: Option[Int] = None): Dataset[SerializableFileStatus] = {

Expand All @@ -267,7 +286,7 @@ object DeltaFileOperations extends DeltaLogging {
// If there are no new files, listing from parent directories are expected to be constant time.
val subDirs = getAllTopComponents(new Path(listFilename), new Path(topDir))

recursiveListDirs(spark, subDirs, hadoopConf, hiddenFileNameFilter,
recursiveListDirs(spark, subDirs, hadoopConf, hiddenDirNameFilter, hiddenFileNameFilter,
fileListingParallelism, listAsDirectories = false)
}

Expand All @@ -279,9 +298,11 @@ object DeltaFileOperations extends DeltaLogging {
hadoopConf: Configuration,
dirs: Seq[String],
recursive: Boolean = true,
dirFilter: String => Boolean = defaultHiddenFileFilter,
fileFilter: String => Boolean = defaultHiddenFileFilter): Iterator[SerializableFileStatus] = {
val logStore = LogStore(SparkEnv.get.conf, hadoopConf)
listUsingLogStore(logStore, hadoopConf, dirs.toIterator, recurse = recursive, fileFilter)
listUsingLogStore(
logStore, hadoopConf, dirs.toIterator, recurse = recursive, dirFilter, fileFilter)
}

/**
Expand All @@ -294,11 +315,12 @@ object DeltaFileOperations extends DeltaLogging {
listFilename: String,
topDir: String,
recursive: Boolean = true,
dirFilter: String => Boolean = defaultHiddenFileFilter,
fileFilter: String => Boolean = defaultHiddenFileFilter): Iterator[SerializableFileStatus] = {
val logStore = LogStore(SparkEnv.get.conf, hadoopConf)
val listDirs = getAllTopComponents(new Path(listFilename), new Path(topDir))
listUsingLogStore(logStore, hadoopConf, listDirs.toIterator, recurse = recursive, fileFilter,
listAsDirectories = false)
listUsingLogStore(logStore, hadoopConf, listDirs.toIterator, recurse = recursive,
dirFilter, fileFilter, listAsDirectories = false)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
}
assert(realCause.getMessage.contains("Failed to merge"))
assert(exception.isInstanceOf[AnalysisException] ||
realCause.getCause.getMessage.contains("/part="),
realCause.getMessage.contains("/part="),
"Error message should contain the file name")
}
}
Expand Down Expand Up @@ -682,6 +682,21 @@ trait ConvertToDeltaSuiteBase extends ConvertToDeltaSuiteBaseCommons
simpleDF.filter("id % 2 == 1").select("id"))
}
}

test("partition column name starting with underscore and dot") {
withTempDir { dir =>
val df = spark.range(100)
.withColumn("_key1", col("id") % 2)
.withColumn(".key2", col("id") % 7 cast "String")

val tempDir = dir.getCanonicalPath
writeFiles(tempDir, df, partCols = Seq("_key1", ".key2"))

convertToDelta(s"parquet.`$tempDir`", Some("_key1 long, `.key2` string"))

checkAnswer(sql(s"SELECT * FROM delta.`$tempDir`"), df)
}
}
}

/**
Expand Down

0 comments on commit 3e8d2d1

Please # to comment.