From d8fa508e53042dfdbfef49a7b9c3b25a82c68677 Mon Sep 17 00:00:00 2001
From: Daniel Tenedorio
Date: Wed, 1 Mar 2023 20:33:53 -0800
Subject: [PATCH] [SPARK-42521][SQL] Add NULLs for INSERTs with user-specified lists of fewer columns than the target table

### What changes were proposed in this pull request?

Add NULLs for INSERTs with user-specified lists of fewer columns than the target table. This is done by updating the semantics of the `USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES` SQLConf to only apply to INSERTs with explicit user-specified column lists, and changing it to true by default.

### Why are the changes needed?

This behavior is consistent with other query engines.

### Does this PR introduce _any_ user-facing change?

Yes, per above.

### How was this patch tested?

Unit test coverage in `InsertSuite`.

Closes #40229 from dtenedor/defaults-insert-nulls.

Authored-by: Daniel Tenedorio
Signed-off-by: Gengliang Wang
(cherry picked from commit d2a527a545c57c7fe1a736016b4e26666d2e571b)
Signed-off-by: Gengliang Wang
---
 docs/sql-migration-guide.md                   |   1 +
 .../analysis/ResolveDefaultColumns.scala      |  35 +++--
 .../apache/spark/sql/internal/SQLConf.scala   |   9 +-
 .../apache/spark/sql/SQLInsertTestSuite.scala |   4 +-
 .../sql/connector/DataSourceV2SQLSuite.scala  |  16 +-
 .../spark/sql/sources/InsertSuite.scala       | 142 ++++++------------
 6 files changed, 85 insertions(+), 122 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index 7eda9c8de92ac..0181ff95cd659 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -24,6 +24,7 @@ license: |
 ## Upgrading from Spark SQL 3.3 to 3.4
+- Since Spark 3.4, INSERT INTO commands with explicit column lists comprising fewer columns than the target table will automatically add the corresponding default values for the remaining columns (or NULL for any column lacking an explicitly-assigned default value). In Spark 3.3 or earlier, these commands would have failed, reporting that the number of provided columns does not match the number of columns in the target table. Note that disabling `spark.sql.defaultColumn.useNullsForMissingDefaultValues` will restore the previous behavior.
 - Since Spark 3.4, Number or Number(\*) from Teradata will be treated as Decimal(38,18). In Spark 3.3 or earlier, Number or Number(\*) from Teradata will be treated as Decimal(38, 0), in which case the fractional part will be removed.
 - Since Spark 3.4, v1 database, table, permanent view and function identifier will include 'spark_catalog' as the catalog name if database is defined, e.g. a table identifier will be: `spark_catalog.default.t`. To restore the legacy behavior, set `spark.sql.legacy.v1IdentifierNoCatalog` to `true`.
 - Since Spark 3.4, when ANSI SQL mode(configuration `spark.sql.ansi.enabled`) is on, Spark SQL always returns NULL result on getting a map value with a non-existing key. In Spark 3.3 or earlier, there will be an error.
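For readers skimming the migration-guide entry above, here is a minimal, self-contained sketch of the new behavior. It is illustrative only: the table name `t_demo` and its columns are invented here rather than taken from the patch, and the commented-out statements describe behavior documented by the guide and the updated tests, not output captured from a real run.

```scala
import org.apache.spark.sql.SparkSession

object InsertColumnListDefaults {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("insert-column-list-defaults")
      .master("local[*]")
      .getOrCreate()

    // A three-column table where s has a declared default and x does not.
    spark.sql("CREATE TABLE t_demo(i INT, s BIGINT DEFAULT 42, x BIGINT) USING parquet")

    // Explicit column list with fewer columns than the table: the remaining columns
    // receive their declared DEFAULT (s -> 42) or NULL when no default exists (x -> NULL).
    spark.sql("INSERT INTO t_demo (i) VALUES (1)")
    spark.sql("SELECT * FROM t_demo").show()  // expected row: 1, 42, null

    // Without a column list, supplying too few values is still an analysis error
    // (a column-count mismatch), as exercised by the updated tests in this patch.
    // spark.sql("INSERT INTO t_demo VALUES (2)")

    // Per the migration-guide entry above, this restores the previous (error) behavior
    // for short explicit column lists:
    // spark.sql("SET spark.sql.defaultColumn.useNullsForMissingDefaultValues=false")

    spark.stop()
  }
}
```

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala continues below.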
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala index a04844c6526db..6afb51ba81e10 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveDefaultColumns.scala @@ -108,7 +108,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema) val expanded: LogicalPlan = - addMissingDefaultValuesForInsertFromInlineTable(node, schema) + addMissingDefaultValuesForInsertFromInlineTable(node, schema, i.userSpecifiedCols.size) val replaced: Option[LogicalPlan] = replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded) replaced.map { r: LogicalPlan => @@ -132,7 +132,7 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl val regenerated: InsertIntoStatement = regenerateUserSpecifiedCols(i, schema) val project: Project = i.query.asInstanceOf[Project] val expanded: Project = - addMissingDefaultValuesForInsertFromProject(project, schema) + addMissingDefaultValuesForInsertFromProject(project, schema, i.userSpecifiedCols.size) val replaced: Option[LogicalPlan] = replaceExplicitDefaultValuesForInputOfInsertInto(schema, expanded) replaced.map { r => @@ -273,15 +273,16 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl */ private def addMissingDefaultValuesForInsertFromInlineTable( node: LogicalPlan, - insertTableSchemaWithoutPartitionColumns: StructType): LogicalPlan = { - val numQueryOutputs: Int = node match { - case table: UnresolvedInlineTable => table.rows(0).size - case local: LocalRelation => local.data(0).numFields - } + insertTableSchemaWithoutPartitionColumns: StructType, + numUserSpecifiedColumns: Int): LogicalPlan = { val schema = insertTableSchemaWithoutPartitionColumns val newDefaultExpressions: Seq[Expression] = - getDefaultExpressionsForInsert(numQueryOutputs, schema) - val newNames: Seq[String] = schema.fields.drop(numQueryOutputs).map { _.name } + getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns) + val newNames: Seq[String] = if (numUserSpecifiedColumns > 0) { + schema.fields.drop(numUserSpecifiedColumns).map(_.name) + } else { + schema.fields.map(_.name) + } node match { case _ if newDefaultExpressions.isEmpty => node case table: UnresolvedInlineTable => @@ -306,11 +307,11 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl */ private def addMissingDefaultValuesForInsertFromProject( project: Project, - insertTableSchemaWithoutPartitionColumns: StructType): Project = { - val numQueryOutputs: Int = project.projectList.size + insertTableSchemaWithoutPartitionColumns: StructType, + numUserSpecifiedColumns: Int): Project = { val schema = insertTableSchemaWithoutPartitionColumns val newDefaultExpressions: Seq[Expression] = - getDefaultExpressionsForInsert(numQueryOutputs, schema) + getDefaultExpressionsForInsert(schema, numUserSpecifiedColumns) val newAliases: Seq[NamedExpression] = newDefaultExpressions.zip(schema.fields).map { case (expr, field) => Alias(expr, field.name)() @@ -322,9 +323,13 @@ case class ResolveDefaultColumns(catalog: SessionCatalog) extends Rule[LogicalPl * This is a helper for the addMissingDefaultValuesForInsertFromInlineTable methods above. 
*/ private def getDefaultExpressionsForInsert( - numQueryOutputs: Int, - schema: StructType): Seq[Expression] = { - val remainingFields: Seq[StructField] = schema.fields.drop(numQueryOutputs) + schema: StructType, + numUserSpecifiedColumns: Int): Seq[Expression] = { + val remainingFields: Seq[StructField] = if (numUserSpecifiedColumns > 0) { + schema.fields.drop(numUserSpecifiedColumns) + } else { + Seq.empty + } val numDefaultExpressionsToAdd = getStructFieldsForDefaultExpressions(remainingFields).size Seq.fill(numDefaultExpressionsToAdd)(UnresolvedAttribute(CURRENT_DEFAULT_COLUMN_NAME)) } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 2328817a2d45b..21bf571c49429 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -3115,13 +3115,12 @@ object SQLConf { val USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES = buildConf("spark.sql.defaultColumn.useNullsForMissingDefaultValues") .internal() - .doc("When true, and DEFAULT columns are enabled, allow column definitions lacking " + - "explicit default values to behave as if they had specified DEFAULT NULL instead. " + - "For example, this allows most INSERT INTO statements to specify only a prefix of the " + - "columns in the target table, and the remaining columns will receive NULL values.") + .doc("When true, and DEFAULT columns are enabled, allow INSERT INTO commands with user-" + + "specified lists of fewer columns than the target table to behave as if they had " + + "specified DEFAULT for all remaining columns instead, in order.") .version("3.4.0") .booleanConf - .createWithDefault(false) + .createWithDefault(true) val SKIP_TYPE_VALIDATION_ON_ALTER_PARTITION = buildConf("spark.sql.legacy.skipTypeValidationOnAlterPartition") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala index 24ebfd75bd56a..1997fce0f5cfd 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLInsertTestSuite.scala @@ -207,7 +207,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { withTable("t1") { createTable("t1", cols, Seq.fill(4)("int")) - val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 (c1) values(1)")) + val e1 = intercept[AnalysisException](sql(s"INSERT INTO t1 values(1)")) assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 1") || e1.getMessage.contains("expected 4 columns but found 1") || e1.getMessage.contains("not enough data columns") || @@ -217,7 +217,7 @@ trait SQLInsertTestSuite extends QueryTest with SQLTestUtils { withTable("t1") { createTable("t1", cols, Seq.fill(4)("int"), cols.takeRight(2)) val e1 = intercept[AnalysisException] { - sql(s"INSERT INTO t1 partition(c3=3, c4=4) (c1) values(1)") + sql(s"INSERT INTO t1 partition(c3=3, c4=4) values(1)") } assert(e1.getMessage.contains("target table has 4 column(s) but the inserted data has 3") || e1.getMessage.contains("not enough data columns") || diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala index 73bfeafdcdf5f..5b7acda6fc3e0 100644 --- 
a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala @@ -1093,8 +1093,8 @@ class DataSourceV2SQLSuiteV1Filter verifyTable(t1, df) // Missing columns assert(intercept[AnalysisException] { - sql(s"INSERT INTO $t1(data) VALUES(4)") - }.getMessage.contains("Cannot find data for output column 'id'")) + sql(s"INSERT INTO $t1 VALUES(4)") + }.getMessage.contains("not enough data columns")) // Duplicate columns checkError( exception = intercept[AnalysisException] { @@ -1121,9 +1121,9 @@ class DataSourceV2SQLSuiteV1Filter verifyTable(t1, Seq((3L, "c")).toDF("id", "data")) // Missing columns assert(intercept[AnalysisException] { - sql(s"INSERT OVERWRITE $t1(data) VALUES(4)") - }.getMessage.contains("Cannot find data for output column 'id'")) - // Duplicate columns + sql(s"INSERT OVERWRITE $t1 VALUES(4)") + }.getMessage.contains("not enough data columns")) + // Duplicate columns checkError( exception = intercept[AnalysisException] { sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)") @@ -1150,9 +1150,9 @@ class DataSourceV2SQLSuiteV1Filter verifyTable(t1, Seq((1L, "c", "e"), (2L, "b", "d")).toDF("id", "data", "data2")) // Missing columns assert(intercept[AnalysisException] { - sql(s"INSERT OVERWRITE $t1(data, id) VALUES('a', 4)") - }.getMessage.contains("Cannot find data for output column 'data2'")) - // Duplicate columns + sql(s"INSERT OVERWRITE $t1 VALUES('a', 4)") + }.getMessage.contains("not enough data columns")) + // Duplicate columns checkError( exception = intercept[AnalysisException] { sql(s"INSERT OVERWRITE $t1(data, data) VALUES(5)") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala index cc1d4ab3fcdf5..c24b77f014c95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala @@ -864,7 +864,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { test("Allow user to insert specified columns into insertable view") { withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") { - sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") + sql("INSERT OVERWRITE TABLE jsonTable SELECT a, DEFAULT FROM jt") checkAnswer( sql("SELECT a, b FROM jsonTable"), (1 to 10).map(i => Row(i, null)) @@ -884,7 +884,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { } val message = intercept[AnalysisException] { - sql("INSERT OVERWRITE TABLE jsonTable(a) SELECT a FROM jt") + sql("INSERT OVERWRITE TABLE jsonTable SELECT a FROM jt") }.getMessage assert(message.contains("target table has 2 column(s) but the inserted data has 1 column(s)")) } @@ -896,7 +896,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") { withTable("t") { sql("create table t(i boolean, s bigint) using parquet") - sql("insert into t values(true)") + sql("insert into t(i) values(true)") checkAnswer(spark.table("t"), Row(true, null)) } } @@ -915,13 +915,13 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // The default value parses correctly and the provided value type is different but coercible. 
withTable("t") { sql("create table t(i boolean, s bigint default 42) using parquet") - sql("insert into t values(false)") + sql("insert into t(i) values(false)") checkAnswer(spark.table("t"), Row(false, 42L)) } // There are two trailing default values referenced implicitly by the INSERT INTO statement. withTable("t") { sql("create table t(i int, s bigint default 42, x bigint default 43) using parquet") - sql("insert into t values(1)") + sql("insert into t(i) values(1)") checkAnswer(sql("select s + x from t where i = 1"), Seq(85L).map(i => Row(i))) } // The table has a partitioning column and a default value is injected. @@ -933,7 +933,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // The table has a partitioning column and a default value is added per an explicit reference. withTable("t") { sql("create table t(i boolean, s bigint default 42) using parquet partitioned by (i)") - sql("insert into t partition(i='true') values(default)") + sql("insert into t partition(i='true') (s) values(default)") checkAnswer(spark.table("t"), Row(42L, true)) } // The default value parses correctly as a constant but non-literal expression. @@ -946,7 +946,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { // to the INSERT INTO statement. withTable("t") { sql("create table t(i boolean default false, s bigint default 42) using parquet") - sql("insert into t values(false, default), (default, 42)") + sql("insert into t(i, s) values(false, default), (default, 42)") checkAnswer(spark.table("t"), Seq(Row(false, 42L), Row(false, 42L))) } // There is an explicit default value provided in the INSERT INTO statement in the VALUES, @@ -1003,31 +1003,31 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t1", "t2") { sql("create table t1(j int, s bigint default 42, x bigint default 43) using parquet") if (useDataFrames) { - Seq((1)).toDF.write.insertInto("t1") - Seq((2)).toDF.write.insertInto("t1") - Seq((3)).toDF.write.insertInto("t1") - Seq((4, 44)).toDF.write.insertInto("t1") - Seq((5, 44, 45)).toDF.write.insertInto("t1") + Seq((1, 42, 43)).toDF.write.insertInto("t1") + Seq((2, 42, 43)).toDF.write.insertInto("t1") + Seq((3, 42, 43)).toDF.write.insertInto("t1") + Seq((4, 44, 43)).toDF.write.insertInto("t1") + Seq((5, 44, 43)).toDF.write.insertInto("t1") } else { - sql("insert into t1 values(1)") - sql("insert into t1 values(2, default)") - sql("insert into t1 values(3, default, default)") - sql("insert into t1 values(4, 44)") - sql("insert into t1 values(5, 44, 45)") + sql("insert into t1(j) values(1)") + sql("insert into t1(j, s) values(2, default)") + sql("insert into t1(j, s, x) values(3, default, default)") + sql("insert into t1(j, s) values(4, 44)") + sql("insert into t1(j, s, x) values(5, 44, 45)") } sql("create table t2(j int, s bigint default 42, x bigint default 43) using parquet") if (useDataFrames) { - spark.table("t1").where("j = 1").select("j").write.insertInto("t2") - spark.table("t1").where("j = 2").select("j").write.insertInto("t2") - spark.table("t1").where("j = 3").select("j").write.insertInto("t2") - spark.table("t1").where("j = 4").select("j", "s").write.insertInto("t2") - spark.table("t1").where("j = 5").select("j", "s").write.insertInto("t2") + spark.table("t1").where("j = 1").write.insertInto("t2") + spark.table("t1").where("j = 2").write.insertInto("t2") + spark.table("t1").where("j = 3").write.insertInto("t2") + spark.table("t1").where("j = 4").write.insertInto("t2") + spark.table("t1").where("j = 
5").write.insertInto("t2") } else { - sql("insert into t2 select j from t1 where j = 1") - sql("insert into t2 select j, default from t1 where j = 2") - sql("insert into t2 select j, default, default from t1 where j = 3") - sql("insert into t2 select j, s from t1 where j = 4") - sql("insert into t2 select j, s, default from t1 where j = 5") + sql("insert into t2(j) select j from t1 where j = 1") + sql("insert into t2(j, s) select j, default from t1 where j = 2") + sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3") + sql("insert into t2(j, s) select j, s from t1 where j = 4") + sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5") } checkAnswer( spark.table("t2"), @@ -1128,7 +1128,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("create table t(i int, s bigint default 42, x bigint) using parquet") assert(intercept[AnalysisException] { sql("insert into t values(1)") - }.getMessage.contains("expected 3 columns but found")) + }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)")) } // The table has a partitioning column with a default value; this is not allowed. withTable("t") { @@ -1185,15 +1185,6 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { Row(4, 43, false), Row(4, 42, false))) } - // When the CASE_SENSITIVE configuration is disabled, then using different cases for the - // required and provided column names is successful. - withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") { - withTable("t") { - sql("create table t(i boolean, s bigint default 42, q int default 43) using parquet") - sql("insert into t (I, Q) select true from (select 1)") - checkAnswer(spark.table("t"), Row(true, 42L, 43)) - } - } // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is enabled, and no // explicit DEFAULT value is available when the INSERT INTO statement provides fewer // values than expected, NULL values are appended in their place. @@ -1230,29 +1221,12 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { val addOneColButExpectedTwo = "target table has 2 column(s) but the inserted data has 1 col" val addTwoColButExpectedThree = "target table has 3 column(s) but the inserted data has 2 col" // The missing columns in these INSERT INTO commands do not have explicit default values. 
- withTable("t") { - sql("create table t(i boolean, s bigint) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (i) values (true)") - }.getMessage.contains(addOneColButExpectedTwo)) - } - withTable("t") { - sql("create table t(i boolean default true, s bigint) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (i) values (default)") - }.getMessage.contains(addOneColButExpectedTwo)) - } - withTable("t") { - sql("create table t(i boolean, s bigint default 42) using parquet") - assert(intercept[AnalysisException] { - sql("insert into t (s) values (default)") - }.getMessage.contains(addOneColButExpectedTwo)) - } withTable("t") { sql("create table t(i boolean, s bigint, q int default 43) using parquet") assert(intercept[AnalysisException] { sql("insert into t (i, q) select true from (select 1)") - }.getMessage.contains(addTwoColButExpectedThree)) + }.getMessage.contains("Cannot write to table due to mismatched user specified column " + + "size(3) and data column size(2)")) } // When the USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES configuration is disabled, and no // explicit DEFAULT value is available when the INSERT INTO statement provides fewer @@ -1327,14 +1301,14 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql(createTableIntCol) sql("alter table t add column s bigint default 42") sql("alter table t add column x bigint default 43") - sql("insert into t values(1)") + sql("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42, 43)) } // There are two trailing default values referenced implicitly by the INSERT INTO statement. withTable("t") { sql(createTableIntCol) sql("alter table t add columns s bigint default 42, x bigint default 43") - sql("insert into t values(1)") + sql("insert into t(i) values(1)") checkAnswer(spark.table("t"), Row(1, 42, 43)) } // The table has a partitioning column and a default value is injected. @@ -1348,8 +1322,8 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { withTable("t") { sql(createTableBooleanCol) sql("alter table t add column s bigint default 41 + 1") - sql("insert into t values(false, default)") - checkAnswer(spark.table("t"), Row(false, 42)) + sql("insert into t(i) values(default)") + checkAnswer(spark.table("t"), Row(null, 42)) } // Explicit defaults may appear in different positions within the inline table provided as input // to the INSERT INTO statement. 
@@ -1399,18 +1373,18 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("create table t1(j int) using parquet") sql("alter table t1 add column s bigint default 42") sql("alter table t1 add column x bigint default 43") - sql("insert into t1 values(1)") - sql("insert into t1 values(2, default)") - sql("insert into t1 values(3, default, default)") - sql("insert into t1 values(4, 44)") - sql("insert into t1 values(5, 44, 45)") + sql("insert into t1(j) values(1)") + sql("insert into t1(j, s) values(2, default)") + sql("insert into t1(j, s, x) values(3, default, default)") + sql("insert into t1(j, s) values(4, 44)") + sql("insert into t1(j, s, x) values(5, 44, 45)") sql("create table t2(j int) using parquet") sql("alter table t2 add columns s bigint default 42, x bigint default 43") - sql("insert into t2 select j from t1 where j = 1") - sql("insert into t2 select j, default from t1 where j = 2") - sql("insert into t2 select j, default, default from t1 where j = 3") - sql("insert into t2 select j, s from t1 where j = 4") - sql("insert into t2 select j, s, default from t1 where j = 5") + sql("insert into t2(j) select j from t1 where j = 1") + sql("insert into t2(j, s) select j, default from t1 where j = 2") + sql("insert into t2(j, s, x) select j, default, default from t1 where j = 3") + sql("insert into t2(j, s) select j, s from t1 where j = 4") + sql("insert into t2(j, s, x) select j, s, default from t1 where j = 5") checkAnswer( spark.table("t2"), Row(1, 42L, 43L) :: @@ -1472,7 +1446,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { sql("alter table t add column x bigint") assert(intercept[AnalysisException] { sql("insert into t values(1)") - }.getMessage.contains("expected 3 columns but found")) + }.getMessage.contains("target table has 3 column(s) but the inserted data has 1 column(s)")) } } @@ -1555,11 +1529,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { useDataFrames: Boolean = false) def runTest(dataSource: String, config: Config): Unit = { def insertIntoT(): Unit = { - if (config.useDataFrames) { - Seq(("xyz", 42)).toDF.write.insertInto("t") - } else { - sql("insert into t values('xyz', 42)") - } + sql("insert into t(a, i) values('xyz', 42)") } def withTableT(f: => Unit): Unit = { sql(s"create table t(a string, i int) using $dataSource") @@ -1897,11 +1867,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { array( map(false, 'def', true, 'jkl')))) using ${config.dataSource}""") - if (config.useDataFrames) { - Seq((1)).toDF.write.insertInto("t") - } else { - sql("insert into t select 1, default") - } + sql("insert into t select 1, default") sql("alter table t alter column s drop default") if (config.useDataFrames) { Seq((2, null)).toDF.write.insertInto("t") @@ -1916,11 +1882,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { struct(3, 4)), array( map(false, 'mno', true, 'pqr')))""") - if (config.useDataFrames) { - Seq((3)).toDF.write.insertInto("t") - } else { - sql("insert into t select 3, default") - } + sql("insert into t select 3, default") sql( """ alter table t @@ -1928,11 +1890,7 @@ class InsertSuite extends DataSourceTest with SharedSparkSession { map> default array( map(true, 'xyz'))""") - if (config.useDataFrames) { - Seq((4)).toDF.write.insertInto("t") - } else { - sql("insert into t select 4, default") - } + sql("insert into t(i, s) select 4, default") checkAnswer(spark.table("t"), Seq( Row(1, @@ -2275,7 +2233,7 @@ class InsertSuite extends DataSourceTest with 
SharedSparkSession { withSQLConf(SQLConf.USE_NULLS_FOR_MISSING_DEFAULT_COLUMN_VALUES.key -> "true") { withTable("t1") { sql("CREATE TABLE t1(c1 int, c2 string, c3 int) using parquet") - sql("INSERT INTO TABLE t1 select * from jt where a=1") + sql("INSERT INTO TABLE t1(c1, c2) select * from jt where a=1") checkAnswer(spark.table("t1"), Row(1, "str1", null)) sql("INSERT INTO TABLE t1 select *, 2 from jt where a=2") checkAnswer(spark.table("t1"), Seq(Row(1, "str1", null), Row(2, "str2", 2)))
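As a closing aside for this excerpt, here is a simplified, standalone sketch of the idea behind the reworked `getDefaultExpressionsForInsert` helper earlier in the patch: the columns to pad with DEFAULT are now derived from the user-specified column list rather than from the width of the query output. The object and method names below are invented for illustration, and the sketch returns field names; the real rule additionally filters the remaining fields through `getStructFieldsForDefaultExpressions` and emits `DEFAULT` placeholder expressions.

```scala
import org.apache.spark.sql.types.{IntegerType, LongType, StructField, StructType}

object PadWithDefaultsSketch {
  // Names of target-table fields that would be padded with DEFAULT for an INSERT.
  def fieldsToPad(schema: StructType, numUserSpecifiedColumns: Int): Seq[String] = {
    val remaining =
      if (numUserSpecifiedColumns > 0) schema.fields.drop(numUserSpecifiedColumns)
      else Array.empty[StructField]  // no explicit column list: nothing is padded here
    remaining.map(_.name).toList
  }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(
      StructField("i", IntegerType),
      StructField("s", LongType),
      StructField("x", LongType)))
    // INSERT INTO t (i) ...: pad s and x (with their defaults, or NULL if none).
    println(fieldsToPad(schema, numUserSpecifiedColumns = 1))  // List(s, x)
    // INSERT INTO t VALUES ... with no column list: no padding from this helper.
    println(fieldsToPad(schema, numUserSpecifiedColumns = 0))  // List()
  }
}
```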