diff --git a/standalone/src/main/scala/io/delta/standalone/internal/OptimisticTransactionImpl.scala b/standalone/src/main/scala/io/delta/standalone/internal/OptimisticTransactionImpl.scala index 3346981b4e8..4bb40f785f9 100644 --- a/standalone/src/main/scala/io/delta/standalone/internal/OptimisticTransactionImpl.scala +++ b/standalone/src/main/scala/io/delta/standalone/internal/OptimisticTransactionImpl.scala @@ -33,9 +33,11 @@ import io.delta.standalone.types.StructType import io.delta.standalone.internal.actions.{Action, AddFile, CommitInfo, FileAction, Metadata, Protocol, RemoveFile} import io.delta.standalone.internal.exception.DeltaErrors import io.delta.standalone.internal.logging.Logging +import io.delta.standalone.internal.sources.StandaloneHadoopConf import io.delta.standalone.internal.util.{ConversionUtils, FileNames, SchemaMergingUtils, SchemaUtils} import io.delta.standalone.internal.util.DeltaFileOperations + private[internal] class OptimisticTransactionImpl( deltaLog: DeltaLogImpl, snapshot: SnapshotImpl) extends OptimisticTransaction with Logging { @@ -243,6 +245,13 @@ private[internal] class OptimisticTransactionImpl( val customCommitInfo = actions.exists(_.isInstanceOf[CommitInfo]) assert(!customCommitInfo, "Cannot commit a custom CommitInfo in a transaction.") + // When enabled (disabled by default), errors raised while relativizing a path are ignored. + // This is specifically for files living in a file system different from the base table path, + // so that one can enable shallow clones across file systems. + val relativizeIgnoreError = deltaLog + .hadoopConf + .getBoolean(StandaloneHadoopConf.RELATIVE_PATH_IGNORE, false) + // Convert AddFile paths to relative paths if they're in the table path var finalActions = actions.map { case addFile: AddFile => @@ -250,7 +259,8 @@ private[internal] class OptimisticTransactionImpl( DeltaFileOperations.tryRelativizePath( deltaLog.fs, deltaLog.getPath, - new Path(addFile.path) + new Path(addFile.path), + relativizeIgnoreError 
).toString) case a: Action => a } diff --git a/standalone/src/main/scala/io/delta/standalone/internal/SnapshotImpl.scala b/standalone/src/main/scala/io/delta/standalone/internal/SnapshotImpl.scala index ff41b7eebc4..f5377b00719 100644 --- a/standalone/src/main/scala/io/delta/standalone/internal/SnapshotImpl.scala +++ b/standalone/src/main/scala/io/delta/standalone/internal/SnapshotImpl.scala @@ -353,7 +353,10 @@ private[internal] object SnapshotImpl { val fs = FileSystem.get(hadoopConf) fs.makeQualified(hadoopPath).toUri.toString } else { - // return untouched if it is a relative path or is already fully qualified + // return untouched if + // - path is a relative path + // - or path is already fully qualified + // - or path points to external file systems (authority is not null) hadoopPath.toUri.toString } } diff --git a/standalone/src/main/scala/io/delta/standalone/internal/sources/StandaloneHadoopConf.scala b/standalone/src/main/scala/io/delta/standalone/internal/sources/StandaloneHadoopConf.scala index e258ed69593..9855cfeedfa 100644 --- a/standalone/src/main/scala/io/delta/standalone/internal/sources/StandaloneHadoopConf.scala +++ b/standalone/src/main/scala/io/delta/standalone/internal/sources/StandaloneHadoopConf.scala @@ -21,6 +21,15 @@ package io.delta.standalone.internal.sources */ private[internal] object StandaloneHadoopConf { + /** + * If enabled, this ignores errors when trying to relativize an absolute path of an + * [[io.delta.standalone.actions.AddFile]] across file systems. + * This allows users to define shallow-clone Delta tables whose data resides in + * external file systems such as s3://, wasbs:// or adls://. + * By default, this feature is disabled. Set to `true` to enable. + */ + val RELATIVE_PATH_IGNORE = "io.delta.vacuum.relativize.ignoreError" + /** Time zone as which time-based parquet values will be encoded and decoded. 
*/ val PARQUET_DATA_TIME_ZONE_ID = "io.delta.standalone.PARQUET_DATA_TIME_ZONE_ID" diff --git a/standalone/src/main/scala/io/delta/standalone/internal/util/DeltaFileOperations.scala b/standalone/src/main/scala/io/delta/standalone/internal/util/DeltaFileOperations.scala index 439ab2f7ecb..bde3b04da6d 100644 --- a/standalone/src/main/scala/io/delta/standalone/internal/util/DeltaFileOperations.scala +++ b/standalone/src/main/scala/io/delta/standalone/internal/util/DeltaFileOperations.scala @@ -60,14 +60,12 @@ private[internal] object DeltaFileOperations extends Logging { throw new IllegalStateException( s"""Failed to relativize the path ($child). This can happen when absolute paths make |it into the transaction log, which start with the scheme - |s3://, wasbs:// or adls://. This is a bug that has existed before DBR 5.0. - |To fix this issue, please upgrade your writer jobs to DBR 5.0 and please run: - |%scala com.databricks.delta.Delta.fixAbsolutePathsInLog("$child"). + |s3://, wasbs:// or adls://. | |If this table was created with a shallow clone across file systems |(different buckets/containers) and this table is NOT USED IN PRODUCTION, you can - |set the SQL configuration spark.databricks.delta.vacuum.relativize.ignoreError - |to true. Using this SQL configuration could lead to accidental data loss, + |set the hadoop configuration io.delta.vacuum.relativize.ignoreError + |to true. Using this configuration could lead to accidental data loss, |therefore we do not recommend the use of this flag unless |this is a shallow clone for testing purposes. 
""".stripMargin) diff --git a/standalone/src/test/scala/io/delta/standalone/internal/DeltaScanSuite.scala b/standalone/src/test/scala/io/delta/standalone/internal/DeltaScanSuite.scala index 8f565891877..57b77415189 100644 --- a/standalone/src/test/scala/io/delta/standalone/internal/DeltaScanSuite.scala +++ b/standalone/src/test/scala/io/delta/standalone/internal/DeltaScanSuite.scala @@ -27,6 +27,7 @@ import io.delta.standalone.expressions.{And, EqualTo, LessThan, Literal} import io.delta.standalone.types.{IntegerType, StructField, StructType} import io.delta.standalone.internal.actions.{Action, AddFile, Metadata} +import io.delta.standalone.internal.sources.StandaloneHadoopConf import io.delta.standalone.internal.util.{ConversionUtils, FileNames} import io.delta.standalone.internal.util.TestUtils._ @@ -54,14 +55,25 @@ class DeltaScanSuite extends FunSuite { AddFile(i.toString, partitionValues, 1L, 1L, dataChange = true) } + private val externalFileSystems = Seq("s3://", "wasbs://", "adls://") + + private val externalFiles = (1 to 10).map { i => + val partitionValues = Map("col1" -> (i % 3).toString, "col2" -> (i % 2).toString) + val schema = externalFileSystems(i % 3) + AddFile(s"${schema}path/to/$i.parquet", partitionValues, 1L, 1L, dataChange = true) + } + private val filesDataChangeFalse = files.map(_.copy(dataChange = false)) private val metadataConjunct = new EqualTo(schema.column("col1"), Literal.of(0)) private val dataConjunct = new EqualTo(schema.column("col3"), Literal.of(5)) - def withLog(actions: Seq[Action])(test: DeltaLog => Unit): Unit = { + def withLog( + actions: Seq[Action], + configuration: Configuration = new Configuration() + )(test: DeltaLog => Unit): Unit = { withTempDir { dir => - val log = DeltaLog.forTable(new Configuration(), dir.getCanonicalPath) + val log = DeltaLog.forTable(configuration, dir.getCanonicalPath) log.startTransaction().commit(metadata :: Nil, op, "engineInfo") log.startTransaction().commit(actions, op, "engineInfo") @@ 
-104,6 +116,19 @@ class DeltaScanSuite extends FunSuite { } } + test("filtered scan with files stored in external file systems") { + val configuration = new Configuration() + configuration.setBoolean(StandaloneHadoopConf.RELATIVE_PATH_IGNORE, true) + withLog(externalFiles, configuration) { log => + val filter = dataConjunct + val scan = log.update().scan(filter) + val scannedFiles = scan.getFiles.asScala.map(_.getPath).toSet + val expectedFiles = externalFiles.map(_.path).toSet + assert(scannedFiles == expectedFiles, + "paths should not have been made qualified") + } + } + /** * This tests the following DeltaScan MemoryOptimized functionalities: * - skipping AddFiles that don't match the given filter diff --git a/standalone/src/test/scala/io/delta/standalone/internal/OptimisticTransactionSuite.scala b/standalone/src/test/scala/io/delta/standalone/internal/OptimisticTransactionSuite.scala index b9b57efad00..9e485ca3b5c 100644 --- a/standalone/src/test/scala/io/delta/standalone/internal/OptimisticTransactionSuite.scala +++ b/standalone/src/test/scala/io/delta/standalone/internal/OptimisticTransactionSuite.scala @@ -27,9 +27,11 @@ import io.delta.standalone.DeltaLog import io.delta.standalone.actions.{Action => ActionJ, AddFile => AddFileJ, CommitInfo, Metadata => MetadataJ, Protocol, SetTransaction => SetTransactionJ} import io.delta.standalone.types.{IntegerType, StringType, StructField, StructType} -import io.delta.standalone.internal.actions.{AddFile, Metadata} +import io.delta.standalone.internal.actions.AddFile +import io.delta.standalone.internal.sources.StandaloneHadoopConf import io.delta.standalone.internal.util.TestUtils._ + class OptimisticTransactionSuite extends OptimisticTransactionSuiteBase { /////////////////////////////////////////////////////////////////////////// @@ -316,4 +318,36 @@ class OptimisticTransactionSuite extends OptimisticTransactionSuiteBase { assert(committedAddFile.getPath === "file:/absolute/path/to/file/test.parquet") } } + + 
test("Can't create table with external files") { + val extFile = AddFile("s3://snip/snip.parquet", Map(), 0, 0, true) + val conf = new Configuration() + withTempDir { dir => + val log = DeltaLog.forTable(conf, dir.getCanonicalPath) + val txn = log.startTransaction() + val e = intercept[IllegalStateException] { + txn.updateMetadata(metadata_colXY) + txn.commit(List(extFile), op, engineInfo) + } + assert(e.getMessage.contains("Failed to relativize the path")) + } + } + + test("Create table with external files override") { + val extFile = AddFile("s3://snip/snip.parquet", Map(), 0, 0, true) + val conf = new Configuration() + conf.setBoolean(StandaloneHadoopConf.RELATIVE_PATH_IGNORE, true) + withTempDir { dir => + val log = DeltaLog.forTable(conf, dir.getCanonicalPath) + val txn = log.startTransaction() + txn.updateMetadata(metadata_colXY) + txn.commit(List(extFile), op, engineInfo) + val committedAddFile = log.update().getAllFiles.asScala.head + val committedPath = new Path(committedAddFile.getPath) + // Path is preserved + assert(committedPath.isAbsolute && !committedPath.isAbsoluteAndSchemeAuthorityNull) + assert(committedPath.toString == "s3://snip/snip.parquet") + } + } + }