Support passing Hadoop configurations via DeltaTable
This PR makes `DeltaTable` support passing custom Hadoop configurations when accessing a table. It adds a new public API to `DeltaTable` in both Scala and Python:
```
    def forPath(
      sparkSession: SparkSession,
      path: String,
      hadoopConf: scala.collection.Map[String, String])
```
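A minimal Scala sketch of calling the new overload (the path and credential values are placeholders, not part of this PR); a Java-friendly variant taking a `java.util.Map` is added as well:
```
import io.delta.tables.DeltaTable

// `spark` is the active SparkSession.
val fsOptions = Map(
  "fs.s3a.access.key" -> "<access-key>",   // placeholder
  "fs.s3a.secret.key" -> "<secret-key>")   // placeholder

val table = DeltaTable.forPath(spark, "s3a://bucket/delta/events", fsOptions)
```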

Along with the API change, it adds the changes needed to make the following operations on `DeltaTable` honor the passed configuration:
```
    def as()
    def alias()
    def toDF()
    def optimize()
    def upgradeTableProtocol()
    def vacuum(...)
    def history()
    def generate(...)
    def update(...)
    def updateExpr(...)
    def delete(...)
    def merge(...)
    def clone(...)
    def cloneAtVersion(...)
    def restoreToVersion(...)
```
With the change in this PR, the functions above work and are verified in a new unit test. Before this change, commands such as Merge, Vacuum, and restoreToVersion did not pick up the Hadoop configurations even though they were passed to `DeltaTableV2` through the new `forPath(..., options)` API. Note that the unit test was written first, verifying that it fails without the change and passes with it.
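For example, the per-table options travel with the handle into the operations above. A self-contained sketch (path, credentials, and the retention window are placeholders):
```
import io.delta.tables.DeltaTable

val table = DeltaTable.forPath(
  spark,
  "s3a://bucket/delta/events",                // placeholder path
  Map(
    "fs.s3a.access.key" -> "<access-key>",    // placeholder
    "fs.s3a.secret.key" -> "<secret-key>"))   // placeholder

table.history(10).show()              // reads the _delta_log using the passed options
table.optimize().executeCompaction()  // OPTIMIZE picks up the same options
table.vacuum(168)                     // VACUUM accesses data files with them as well
```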

New unit tests.

AFFECTED VERSIONS: Delta 2.2

PROBLEM DESCRIPTION:
Similar to the DataFrame API, the DeltaTable API in both Scala and Python now supports custom Hadoop file system options for accessing the underlying storage system. Example:
```
  val myCredential = Map(
  "fs.azure.account.auth.type" -> "OAuth",
  "fs.azure.account.oauth.provider.type" -> "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
  "fs.azure.account.oauth2.client.id" -> "...",
  "fs.azure.account.oauth2.client.secret" -> "...",
  "fs.azure.account.oauth2.client.endpoint" -> "..."
  )
  val deltaTable = DeltaTable.forPath(spark, "/path/to/table", myCredential)
```
Before this PR, there was no way to pass these Hadoop configurations through `DeltaTable`. With this change, `DeltaTable` picks up only options starting with `fs.` or `dfs.` when building the Hadoop `Configuration` object used to access storage, the same way DataFrame options are handled for Delta. We avoid picking up other options because:

- We don't want unrelated options to be passed into underlying Delta constructs such as `DeltaLog` (see the sketch below).
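In code, the guard looks roughly like the following (a paraphrased sketch of the prefix filter added in this PR; a generic exception stands in for `DeltaErrors.unsupportedDeltaTableForPathHadoopConf`):
```
// Only keys starting with "fs." or "dfs." may be passed to DeltaTable.forPath;
// anything else fails with DELTA_TABLE_FOR_PATH_UNSUPPORTED_HADOOP_CONF.
val validPrefixes = List("fs.", "dfs.")

def validateHadoopConf(hadoopConf: Map[String, String]): Map[String, String] = {
  val badOptions = hadoopConf.filterKeys(k => !validPrefixes.exists(k.startsWith)).toMap
  if (badOptions.nonEmpty) {
    throw new IllegalArgumentException(
      s"Unsupported Hadoop configuration keys: ${badOptions.keys.mkString(", ")}")
  }
  hadoopConf
}
```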

GitOrigin-RevId: 89cfb1a3465d30081a14f74ae6aa80a4c48f9e56
dabao521 authored and zsxwing committed Aug 22, 2022
1 parent 8e7e109 commit ee3917f
Showing 19 changed files with 572 additions and 52 deletions.
6 changes: 6 additions & 0 deletions core/src/main/resources/error/delta-error-classes.json
@@ -1233,6 +1233,12 @@
],
"sqlState" : "42000"
},
"DELTA_TABLE_FOR_PATH_UNSUPPORTED_HADOOP_CONF" : {
"message" : [
"Currently DeltaTable.forPath only supports hadoop configuration keys starting with <allowedPrefixes> but got <unsupportedOptions>"
],
"sqlState" : "0A000"
},
"DELTA_TABLE_FOUND_IN_EXECUTOR" : {
"message" : [
"DeltaTable cannot be used in executors"
11 changes: 7 additions & 4 deletions core/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala
@@ -199,28 +199,31 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] {
OptimizeTableCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.partitionPredicate).map(extractRawText(_)))(interleaveBy)
Option(ctx.partitionPredicate).map(extractRawText(_)), Map.empty)(interleaveBy)
}

override def visitDescribeDeltaDetail(
ctx: DescribeDeltaDetailContext): LogicalPlan = withOrigin(ctx) {
DescribeDeltaDetailCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier))
Option(ctx.table).map(visitTableIdentifier),
Map.empty)
}

override def visitDescribeDeltaHistory(
ctx: DescribeDeltaHistoryContext): LogicalPlan = withOrigin(ctx) {
DescribeDeltaHistoryCommand(
Option(ctx.path).map(string),
Option(ctx.table).map(visitTableIdentifier),
Option(ctx.limit).map(_.getText.toInt))
Option(ctx.limit).map(_.getText.toInt),
Map.empty)
}

override def visitGenerate(ctx: GenerateContext): LogicalPlan = withOrigin(ctx) {
DeltaGenerateCommand(
modeName = ctx.modeName.getText,
tableId = visitTableIdentifier(ctx.table))
tableId = visitTableIdentifier(ctx.table),
Map.empty)
}

override def visitConvert(ctx: ConvertContext): LogicalPlan = withOrigin(ctx) {
12 changes: 8 additions & 4 deletions core/src/main/scala/io/delta/tables/DeltaOptimizeBuilder.scala
@@ -32,11 +32,13 @@ import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
* @param sparkSession SparkSession to use for execution
* @param tableIdentifier Id of the table on which to
* execute the optimize
* @param options Hadoop file system options for read and write.
* @since 2.0.0
*/
class DeltaOptimizeBuilder private(
sparkSession: SparkSession,
tableIdentifier: String) extends AnalysisHelper {
tableIdentifier: String,
options: Map[String, String]) extends AnalysisHelper {
@volatile private var partitionFilter: Option[String] = None

/**
@@ -77,7 +79,8 @@ class DeltaOptimizeBuilder private(
.sessionState
.sqlParser
.parseTableIdentifier(tableIdentifier)
val optimize = OptimizeTableCommand(None, Some(tableId), partitionFilter)(zOrderBy = zOrderBy)
val optimize =
OptimizeTableCommand(None, Some(tableId), partitionFilter, options)(zOrderBy = zOrderBy)
toDataset(sparkSession, optimize)
}
}
@@ -91,7 +94,8 @@ private[delta] object DeltaOptimizeBuilder {
@Unstable
private[delta] def apply(
sparkSession: SparkSession,
tableIdentifier: String): DeltaOptimizeBuilder = {
new DeltaOptimizeBuilder(sparkSession, tableIdentifier)
tableIdentifier: String,
options: Map[String, String]): DeltaOptimizeBuilder = {
new DeltaOptimizeBuilder(sparkSession, tableIdentifier, options)
}
}
70 changes: 66 additions & 4 deletions core/src/main/scala/io/delta/tables/DeltaTable.scala
@@ -212,7 +212,7 @@ class DeltaTable private[tables](
*/
def optimize(): DeltaOptimizeBuilder = {
DeltaOptimizeBuilder(sparkSession,
table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`"))
table.tableIdentifier.getOrElse(s"delta.`${deltaLog.dataPath.toString}`"), table.options)
}

/**
@@ -650,15 +650,77 @@ object DeltaTable {
* @since 0.3.0
*/
def forPath(sparkSession: SparkSession, path: String): DeltaTable = {
forPath(sparkSession, path, Map.empty[String, String])
}

/**
* Instantiate a [[DeltaTable]] object representing the data at the given path, If the given
* path is invalid (i.e. either no table exists or an existing table is not a Delta table),
* it throws a `not a Delta table` error.
*
* @param hadoopConf: Hadoop configuration starting with "fs." or "dfs." will be picked up
* by `DeltaTable` to access the file system when executing queries.
* Other configurations will not be allowed.
*
* {{{
* val hadoopConf = Map(
* "fs.s3a.access.key" -> "<access-key>",
* "fs.s3a.secret.key" -> "<secret-key>"
* )
* DeltaTable.forPath(spark, "/path/to/table", hadoopConf)
* }}}
* @since 2.1.0
*/
def forPath(
sparkSession: SparkSession,
path: String,
hadoopConf: scala.collection.Map[String, String]): DeltaTable = {
// We only pass hadoopConf so that we won't pass any unsafe options to Delta.
val badOptions = hadoopConf.filterKeys { k =>
!DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}.toMap
if (!badOptions.isEmpty) {
throw DeltaErrors.unsupportedDeltaTableForPathHadoopConf(badOptions)
}
val fileSystemOptions: Map[String, String] = hadoopConf.toMap
val hdpPath = new Path(path)
if (DeltaTableUtils.isDeltaTable(sparkSession, hdpPath)) {
new DeltaTable(sparkSession.read.format("delta").load(path),
DeltaTableV2(sparkSession, hdpPath))
if (DeltaTableUtils.isDeltaTable(sparkSession, hdpPath, fileSystemOptions)) {
new DeltaTable(sparkSession.read.format("delta").options(fileSystemOptions).load(path),
DeltaTableV2(
spark = sparkSession,
path = hdpPath,
options = fileSystemOptions))
} else {
throw DeltaErrors.notADeltaTableException(DeltaTableIdentifier(path = Some(path)))
}
}

/**
* Java friendly API to instantiate a [[DeltaTable]] object representing the data at the given
* path, If the given path is invalid (i.e. either no table exists or an existing table is not a
* Delta table), it throws a `not a Delta table` error.
*
* @param hadoopConf: Hadoop configuration starting with "fs." or "dfs." will be picked up
* by `DeltaTable` to access the file system when executing queries.
* Other configurations will be ignored.
*
* {{{
* val hadoopConf = Map(
* "fs.s3a.access.key" -> "<access-key>",
* "fs.s3a.secret.key", "<secret-key>"
* )
* DeltaTable.forPath(spark, "/path/to/table", hadoopConf)
* }}}
* @since 2.1.0
*/
def forPath(
sparkSession: SparkSession,
path: String,
hadoopConf: java.util.Map[String, String]): DeltaTable = {
val fsOptions = hadoopConf.asScala.toMap
forPath(sparkSession, path, fsOptions)
}

/**
* Instantiate a [[DeltaTable]] object using the given table or view name. If the given
* tableOrViewName is invalid (i.e. either no table exists or an existing table is not a
@@ -57,7 +57,7 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
protected def executeDetails(
path: String,
tableIdentifier: Option[TableIdentifier]): DataFrame = {
val details = DescribeDeltaDetailCommand(Option(path), tableIdentifier)
val details = DescribeDeltaDetailCommand(Option(path), tableIdentifier, self.deltaLog.options)
toDataset(sparkSession, details)
}

@@ -66,7 +66,7 @@ trait DeltaTableOperations extends AnalysisHelper { self: DeltaTable =>
.sessionState
.sqlParser
.parseTableIdentifier(tblIdentifier)
val generate = DeltaGenerateCommand(mode, tableId)
val generate = DeltaGenerateCommand(mode, tableId, self.deltaLog.options)
toDataset(sparkSession, generate)
}

@@ -2275,6 +2275,15 @@ trait DeltaErrorsBase
new AnalysisException(
s"SHOW COLUMNS with conflicting databases: '$db' != '${tableID.database.get}'")
}

def unsupportedDeltaTableForPathHadoopConf(unsupportedOptions: Map[String, String]): Throwable = {
new DeltaIllegalArgumentException(
errorClass = "DELTA_TABLE_FOR_PATH_UNSUPPORTED_HADOOP_CONF",
messageParameters = Array(
DeltaTableUtils.validDeltaTableHadoopPrefixes.mkString("[", ",", "]"),
unsupportedOptions.mkString(","))
)
}
}

object DeltaErrors extends DeltaErrorsBase
11 changes: 8 additions & 3 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaLog.scala
@@ -374,6 +374,8 @@ class DeltaLog private(
val actionType = actionTypeOpt.getOrElse(if (isStreaming) "streaming" else "batch")
val fileIndex = new TahoeBatchFileIndex(spark, actionType, addFiles, this, dataPath, snapshot)

val hadoopOptions = snapshot.metadata.format.options ++ options

val relation = HadoopFsRelation(
fileIndex,
partitionSchema =
@@ -386,7 +388,7 @@
ColumnWithDefaultExprUtils.removeDefaultExpressions(snapshot.metadata.schema)),
bucketSpec = None,
snapshot.deltaLog.fileFormat(snapshot.metadata),
snapshot.metadata.format.options)(spark)
hadoopOptions)(spark)

Dataset.ofRows(spark, LogicalRelation(relation, isStreaming = isStreaming))
}
@@ -457,7 +459,8 @@ object DeltaLog extends DeltaLogging {

/**
* The key type of `DeltaLog` cache. It's a pair of the canonicalized table path and the file
* system options (options starting with "fs." prefix) passed into `DataFrameReader/Writer`
* system options (options starting with "fs." or "dfs." prefix) passed into
* `DataFrameReader/Writer`
*/
private type DeltaLogCacheKey = (Path, Map[String, String])

@@ -571,7 +574,9 @@ object DeltaLog extends DeltaLogging {
DeltaSQLConf.LOAD_FILE_SYSTEM_CONFIGS_FROM_DATAFRAME_OPTIONS)) {
// We pick up only file system options so that we don't pass any parquet or json options to
// the code that reads Delta transaction logs.
options.filterKeys(_.startsWith("fs.")).toMap
options.filterKeys { k =>
DeltaTableUtils.validDeltaTableHadoopPrefixes.exists(k.startsWith)
}.toMap
} else {
Map.empty
}
10 changes: 8 additions & 2 deletions core/src/main/scala/org/apache/spark/sql/delta/DeltaTable.scala
@@ -86,6 +86,9 @@ object NodeWithOnlyDeterministicProjectAndFilter {
object DeltaTableUtils extends PredicateHelper
with DeltaLogging {

// The valid hadoop prefixes passed through `DeltaTable.forPath` or DataFrame APIs.
val validDeltaTableHadoopPrefixes: List[String] = List("fs.", "dfs.")

/** Check whether this table is a Delta table based on information from the Catalog. */
def isDeltaTable(table: CatalogTable): Boolean = DeltaSourceUtils.isDeltaTable(table.provider)

@@ -104,8 +107,11 @@ object DeltaTableUtils extends PredicateHelper
}

/** Check if the provided path is the root or the children of a Delta table. */
def isDeltaTable(spark: SparkSession, path: Path): Boolean = {
findDeltaTableRoot(spark, path).isDefined
def isDeltaTable(
spark: SparkSession,
path: Path,
options: Map[String, String] = Map.empty): Boolean = {
findDeltaTableRoot(spark, path, options).isDefined
}

/**
@@ -219,6 +219,7 @@ trait DeltaCommand extends DeltaLogging {
* @param path Table location. Expects a non-empty [[tableIdentifier]] or [[path]].
* @param tableIdentifier Table identifier. Expects a non-empty [[tableIdentifier]] or [[path]].
* @param operationName Operation that is getting the DeltaLog, used in error messages.
* @param hadoopConf Hadoop file system options used to build DeltaLog.
* @return DeltaLog of the table
* @throws AnalysisException If either no Delta table exists at the given path/identifier or
* there is neither [[path]] nor [[tableIdentifier]] is provided.
@@ -227,7 +228,8 @@
spark: SparkSession,
path: Option[String],
tableIdentifier: Option[TableIdentifier],
operationName: String): DeltaLog = {
operationName: String,
hadoopConf: Map[String, String] = Map.empty): DeltaLog = {
val tablePath =
if (path.nonEmpty) {
new Path(path.get)
Expand All @@ -250,7 +252,7 @@ trait DeltaCommand extends DeltaLogging {
throw DeltaErrors.missingTableIdentifierException(operationName)
}

val deltaLog = DeltaLog.forTable(spark, tablePath)
val deltaLog = DeltaLog.forTable(spark, tablePath, hadoopConf)
if (deltaLog.snapshot.version < 0) {
throw DeltaErrors.notADeltaTableException(
operationName,
@@ -26,7 +26,10 @@ import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
import org.apache.spark.sql.execution.command.LeafRunnableCommand

case class DeltaGenerateCommand(modeName: String, tableId: TableIdentifier)
case class DeltaGenerateCommand(
modeName: String,
tableId: TableIdentifier,
options: Map[String, String])
extends LeafRunnableCommand {

import DeltaGenerateCommand._
@@ -43,7 +46,7 @@ case class DeltaGenerateCommand(modeName: String, tableId: TableIdentifier)
new Path(sparkSession.sessionState.catalog.getTableMetadata(tableId).location)
}

val deltaLog = DeltaLog.forTable(sparkSession, tablePath)
val deltaLog = DeltaLog.forTable(sparkSession, tablePath, options)
if (!deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException("GENERATE")
}
@@ -66,14 +66,15 @@ object TableDetail {
*/
case class DescribeDeltaDetailCommand(
path: Option[String],
tableIdentifier: Option[TableIdentifier]) extends LeafRunnableCommand with DeltaLogging {
tableIdentifier: Option[TableIdentifier],
hadoopConf: Map[String, String]) extends LeafRunnableCommand with DeltaLogging {

override val output: Seq[Attribute] = TableDetail.schema.toAttributes

override def run(sparkSession: SparkSession): Seq[Row] = {
val (basePath, tableMetadata) = getPathAndTableMetadata(sparkSession, path, tableIdentifier)

val deltaLog = DeltaLog.forTable(sparkSession, basePath)
val deltaLog = DeltaLog.forTable(sparkSession, basePath, hadoopConf)
recordDeltaOperation(deltaLog, "delta.ddl.describeDetails") {
val snapshot = deltaLog.update()
if (snapshot.version == -1) {
@@ -33,11 +33,14 @@ import org.apache.spark.sql.execution.command.LeafRunnableCommand
/**
* A logical placeholder for describing a Delta table's history, so that the history can be
* leveraged in subqueries. Replaced with `DescribeDeltaHistoryCommand` during planning.
*
* @param options: Hadoop file system options used for read and write.
*/
case class DescribeDeltaHistory(
path: Option[String],
tableIdentifier: Option[TableIdentifier],
limit: Option[Int],
options: Map[String, String],
output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
extends LeafNode with MultiInstanceRelation {
override def computeStats(): Statistics = Statistics(sizeInBytes = conf.defaultSizeInBytes)
@@ -47,11 +50,14 @@ case class DescribeDeltaHistory(

/**
* A command for describing the history of a Delta table.
*
* @param options: Hadoop file system options used for read and write.
*/
case class DescribeDeltaHistoryCommand(
path: Option[String],
tableIdentifier: Option[TableIdentifier],
limit: Option[Int],
options: Map[String, String],
override val output: Seq[Attribute] = ExpressionEncoder[DeltaHistory]().schema.toAttributes)
extends LeafRunnableCommand with DeltaLogging {

@@ -83,7 +89,7 @@ case class DescribeDeltaHistoryCommand(
throw DeltaErrors.maxArraySizeExceeded()
}

val deltaLog = DeltaLog.forTable(sparkSession, basePath)
val deltaLog = DeltaLog.forTable(sparkSession, basePath, options)
recordDeltaOperation(deltaLog, "delta.ddl.describeHistory") {
if (!deltaLog.tableExists) {
throw DeltaErrors.notADeltaTableException("DESCRIBE HISTORY")
@@ -108,13 +108,14 @@ abstract class OptimizeTableCommandBase extends RunnableCommand with DeltaCommand
case class OptimizeTableCommand(
path: Option[String],
tableId: Option[TableIdentifier],
partitionPredicate: Option[String])(val zOrderBy: Seq[UnresolvedAttribute])
partitionPredicate: Option[String],
options: Map[String, String])(val zOrderBy: Seq[UnresolvedAttribute])
extends OptimizeTableCommandBase with LeafRunnableCommand {

override val otherCopyArgs: Seq[AnyRef] = zOrderBy :: Nil

override def run(sparkSession: SparkSession): Seq[Row] = {
val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE")
val deltaLog = getDeltaLog(sparkSession, path, tableId, "OPTIMIZE", options)

val txn = deltaLog.startTransaction()
if (txn.readVersion == -1) {