Skip to content

Commit

Permalink
[Spark] Handle concurrent CREATE TABLE IF NOT EXISTS ... LIKE ... tab…
Browse files Browse the repository at this point in the history
…le commands (#3306)

#### Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

## Description

When 2 or more CREATE TABLE IF NOT EXISTS table commands are run
concurrently, they both think the table doesn't exist yet and the second
command fails with TABLE_ALREADY_EXISTS error.

With this PR, we aim to make sure the second command end up in a no-op
instead of a failure.

## How was this patch tested?

UTs

## Does this PR introduce _any_ user-facing changes?

No
  • Loading branch information
sumeet-db authored Jun 26, 2024
1 parent 4482ee3 commit 0dc0722
Show file tree
Hide file tree
Showing 2 changed files with 44 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -603,7 +603,7 @@ case class CreateDeltaTableCommand(
case TableCreationModes.Create =>
spark.sessionState.catalog.createTable(
cleaned,
ignoreIfExists = existingTableOpt.isDefined,
ignoreIfExists = existingTableOpt.isDefined || mode == SaveMode.Ignore,
validateLocation = false)
case TableCreationModes.Replace | TableCreationModes.CreateOrReplace
if existingTableOpt.isDefined =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,21 @@
package org.apache.spark.sql.delta

import java.io.File
import java.net.URI
import java.util.UUID

import org.apache.spark.sql.delta.catalog.DeltaCatalog
import org.apache.spark.sql.delta.commands.{CreateDeltaTableCommand, TableCreationModes}
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest
import org.scalatest.exceptions.TestFailedException

import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
import org.apache.spark.sql.functions.lit
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.StructType

class DeltaCreateTableLikeSuite extends QueryTest
with SharedSparkSession
Expand Down Expand Up @@ -277,6 +284,42 @@ class DeltaCreateTableLikeSuite extends QueryTest
}
}

test("concurrent create Managed Catalog table commands should not fail") {
withTempDir { dir =>
withTable("t") {
def getCatalogTable: CatalogTable = {
val storage = CatalogStorageFormat.empty.copy(
locationUri = Some(new URI(s"$dir/${UUID.randomUUID().toString}")))
val catalogTableTarget = CatalogTable(
identifier = TableIdentifier("t"),
tableType = CatalogTableType.MANAGED,
storage = storage,
provider = Some("delta"),
schema = new StructType().add("id", "long"))
new DeltaCatalog()
.verifyTableAndSolidify(
tableDesc = catalogTableTarget,
query = None,
maybeClusterBySpec = None)
}
CreateDeltaTableCommand(
getCatalogTable,
existingTableOpt = None,
mode = SaveMode.Ignore,
query = None,
operation = TableCreationModes.Create).run(spark)
assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
CreateDeltaTableCommand(
getCatalogTable,
existingTableOpt = None, // Set to None to simulate concurrent table creation commands.
mode = SaveMode.Ignore,
query = None,
operation = TableCreationModes.Create).run(spark)
assert(spark.sessionState.catalog.tableExists(TableIdentifier("t")))
}
}
}

test("CREATE TABLE LIKE where sourceTable is a json table") {
val srcTbl = "srcTbl"
val targetTbl = "targetTbl"
Expand Down

0 comments on commit 0dc0722

Please # to comment.