Skip to content

Commit

Permalink
[HUDI-6108] Alter table should not include metadata fields in the data schema (#8510)
Browse files Browse the repository at this point in the history
  • Loading branch information
danny0405 authored Apr 21, 2023
1 parent 6082e9c commit 79618f6
Show file tree
Hide file tree
Showing 2 changed files with 23 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
package org.apache.spark.sql.hudi.command

import java.nio.charset.StandardCharsets

import org.apache.avro.Schema
import org.apache.hudi.avro.HoodieAvroUtils
import org.apache.hudi.common.model.{HoodieCommitMetadata, WriteOperationType}
import org.apache.hudi.common.table.timeline.HoodieInstant.State
import org.apache.hudi.common.table.timeline.{HoodieActiveTimeline, HoodieInstant}
Expand Down Expand Up @@ -101,10 +101,11 @@ object AlterHoodieTableAddColumnsCommand {
def commitWithSchema(schema: Schema, hoodieCatalogTable: HoodieCatalogTable,
sparkSession: SparkSession): Unit = {

val writeSchema = HoodieAvroUtils.removeMetadataFields(schema);
val jsc = new JavaSparkContext(sparkSession.sparkContext)
val client = DataSourceUtils.createHoodieClient(
jsc,
schema.toString,
writeSchema.toString,
hoodieCatalogTable.tableLocation,
hoodieCatalogTable.tableName,
HoodieWriterUtils.parametersWithWriteDefaults(hoodieCatalogTable.catalogProperties).asJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@

package org.apache.spark.sql.hudi

import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.model.HoodieRecord
import org.apache.hudi.common.table.{HoodieTableMetaClient, TableSchemaResolver}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.junit.jupiter.api.Assertions.assertFalse

import scala.collection.JavaConverters._

class TestAlterTable extends HoodieSparkSqlTestBase {

Expand Down Expand Up @@ -50,6 +54,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
assertResult("primary id") (
catalogTable.schema(catalogTable.schema.fieldIndex("id")).getComment().get
)
validateTableSchema(tablePath)
spark.sql(s"alter table $tableName change column name name string comment 'name column'")
spark.sessionState.catalog.refreshTable(new TableIdentifier(tableName))
catalogTable = spark.sessionState.catalog.getTableMetadata(new TableIdentifier(tableName))
Expand All @@ -59,6 +64,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
assertResult("name column") (
catalogTable.schema(catalogTable.schema.fieldIndex("name")).getComment().get
)
validateTableSchema(tablePath)

// alter table name.
val newTableName = s"${tableName}_1"
Expand All @@ -74,6 +80,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
val metaClient = HoodieTableMetaClient.builder().setBasePath(tablePath)
.setConf(hadoopConf).build()
assertResult(newTableName) (metaClient.getTableConfig.getTableName)
validateTableSchema(tablePath)

// insert some data
spark.sql(s"insert into $newTableName values(1, 'a1', 10, 1000)")
Expand All @@ -87,6 +94,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
checkAnswer(s"select id, name, price, ts, ext0 from $newTableName")(
Seq(1, "a1", 10.0, 1000, null)
)
validateTableSchema(tablePath)

// change column's data type
checkExceptionContain(s"alter table $newTableName change column id id bigint") (
Expand Down Expand Up @@ -168,6 +176,16 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
}
}

/**
 * Asserts that the table's committed Avro schema (resolved from the commit
 * metadata at `tablePath`) contains none of the Hudi metadata columns.
 *
 * @param tablePath base path of the Hudi table to inspect
 */
def validateTableSchema(tablePath: String): Unit = {
  val conf = spark.sessionState.newHadoopConf()
  val client = HoodieTableMetaClient.builder()
    .setBasePath(tablePath)
    .setConf(conf)
    .build()

  // Resolve the latest table schema without the metadata fields flag enabled.
  val tableSchema = new TableSchemaResolver(client).getTableAvroSchema(false)
  val hasMetaColumn = tableSchema.getFields.asScala
    .exists(field => HoodieRecord.HOODIE_META_COLUMNS.contains(field.name()))
  assertFalse(hasMetaColumn, "Metadata fields should be excluded from the table schema")
}

test("Test Alter Rename Table") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down Expand Up @@ -240,6 +258,7 @@ class TestAlterTable extends HoodieSparkSqlTestBase {
}
}
}

test("Test Alter Table With OCC") {
withTempDir { tmp =>
Seq("cow", "mor").foreach { tableType =>
Expand Down

0 comments on commit 79618f6

Please sign in to comment.