From 3ff4075d6ccfc834223aeeb231831b7b8d5ec1b8 Mon Sep 17 00:00:00 2001
From: Gengliang Wang
Date: Mon, 21 Aug 2023 09:54:49 -0700
Subject: [PATCH] [Spark] Support 3-part naming in table identifier parsing

- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)

Support 3-part naming (`catalog.database.table`) in table identifier parsing.

Before this change, the following command

```
OPTIMIZE catalog_foo.db.tbl
```

throws the following error:

```
org.apache.spark.sql.delta.DeltaParseException: Illegal table name catalog_foo.db.tbl(line 1, pos 9)

== SQL ==
optimize catalog_foo.db.tbl
---------^^^

    at io.delta.sql.parser.DeltaSqlAstBuilder.$anonfun$visitTableIdentifier$1(DeltaSqlParser.scala:430)
    at org.apache.spark.sql.catalyst.parser.ParserUtils$.withOrigin(ParserUtils.scala:160)
    at io.delta.sql.parser.DeltaSqlAstBuilder.visitTableIdentifier(DeltaSqlParser.scala:427)
    at io.delta.sql.parser.DeltaSqlAstBuilder.$anonfun$visitOptimizeTable$5(DeltaSqlParser.scala:348)
    at scala.Option.map(Option.scala:230)
    at io.delta.sql.parser.DeltaSqlAstBuilder.$anonfun$visitOptimizeTable$1(DeltaSqlParser.scala:348)
```

After this change, the command parses successfully.

A new unit test No Closes delta-io/delta#1985 Signed-off-by: Venki Korukanti GitOrigin-RevId: dd297a9d8e77a6fdfafb834c74a915de1aeae737 --- .../io/delta/sql/parser/DeltaSqlParser.scala | 2 ++ .../sql/parser/DeltaSqlParserSuite.scala | 29 +++++++++++++------ 2 files changed, 22 insertions(+), 9 deletions(-) diff --git a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala index 3563bf3aebd..8afa78e726a 100644 --- a/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala +++ b/spark/src/main/scala/io/delta/sql/parser/DeltaSqlParser.scala @@ -427,6 +427,8 @@ class DeltaSqlAstBuilder extends DeltaSqlBaseBaseVisitor[AnyRef] { ctx.identifier.asScala.toSeq match { case Seq(tbl) => TableIdentifier(tbl.getText) case Seq(db, tbl) => TableIdentifier(tbl.getText, Some(db.getText)) + case Seq(catalog, db, tbl) => + TableIdentifier(tbl.getText, Some(db.getText), Some(catalog.getText)) case _ => throw new DeltaParseException(s"Illegal table name ${ctx.getText}", ctx) } } diff --git a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala index 06e0c9da79e..860065a9ec9 100644 --- a/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala +++ b/spark/src/test/scala/io/delta/sql/parser/DeltaSqlParserSuite.scala @@ -62,6 +62,12 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === UnresolvedTable(Seq("db", "tbl"), "OPTIMIZE", None)) + parsedCmd = parser.parsePlan("OPTIMIZE catalog_foo.db.tbl") + assert(parsedCmd === + OptimizeTableCommand(None, Some(tblId("tbl", "db", "catalog_foo")), Nil)(Nil)) + assert(parsedCmd.asInstanceOf[OptimizeTableCommand].child === + UnresolvedTable(Seq("catalog_foo", "db", "tbl"), "OPTIMIZE", None)) + assert(parser.parsePlan("OPTIMIZE tbl_${system:spark.testing}") === OptimizeTableCommand(None, 
Some(tblId("tbl_true")), Nil)(Nil)) @@ -230,7 +236,7 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { versionAsOf, Some("sql")) }, - UnresolvedRelation(tblId(target)), + new UnresolvedRelation(target.split('.')), ifNotExists = false, isReplaceCommand = isReplace, isCreateCommand = isCreate, @@ -260,11 +266,8 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { checkCloneStmt(parser, source = "t1", target = "t1", versionAsOf = Some(1L)) // Clone with 3L table (only useful for Iceberg table now) checkCloneStmt(parser, source = "local.iceberg.table", target = "t1", sourceIs3LTable = true) - // Yet target cannot be a 3L table yet - intercept[ParseException] { - checkCloneStmt(parser, source = "local.iceberg.table", target = "catalog.delta.table", - sourceIs3LTable = true) - } + checkCloneStmt(parser, source = "local.iceberg.table", target = "delta.table", + sourceIs3LTable = true) // Custom source format with path checkCloneStmt(parser, source = "/path/to/iceberg", target = "t1", sourceFormat = "iceberg", sourceIsTable = false) @@ -274,8 +277,16 @@ class DeltaSqlParserSuite extends SparkFunSuite with SQLHelper { new UnresolvedAttribute(colName) } - private def tblId(tblName: String, schema: String = null): TableIdentifier = { - if (schema == null) new TableIdentifier(tblName) - else new TableIdentifier(tblName, Some(schema)) + private def tblId( + tblName: String, + schema: String = null, + catalog: String = null): TableIdentifier = { + if (catalog == null) { + if (schema == null) new TableIdentifier(tblName) + else new TableIdentifier(tblName, Some(schema)) + } else { + assert(schema != null) + new TableIdentifier(tblName, Some(schema), Some(catalog)) + } } }