diff --git a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala
index 039ee4bfa6b..d1ca23ba494 100644
--- a/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala
+++ b/hudi/src/main/scala/org/apache/spark/sql/delta/hudi/HudiSchemaUtils.scala
@@ -45,12 +45,16 @@ object HudiSchemaUtils extends DeltaLogging {
         finalizeSchema(
           Schema.createRecord(currentPath, null, null, false, avroFields),
           isNullable)

-      // TODO: Add List and Map support: https://github.com/delta-io/delta/issues/2738
+
       case ArrayType(elementType, containsNull) =>
-        throw new UnsupportedOperationException("UniForm Hudi doesn't support Array columns")
+        finalizeSchema(
+          Schema.createArray(transform(elementType, containsNull, currentPath)),
+          isNullable)

       case MapType(keyType, valueType, valueContainsNull) =>
-        throw new UnsupportedOperationException("UniForm Hudi doesn't support Map columns")
+        finalizeSchema(
+          Schema.createMap(transform(valueType, valueContainsNull, currentPath)),
+          isNullable)

       case atomicType: AtomicType => convertAtomic(atomicType, isNullable)
diff --git a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
index fccbeb62172..b96915b7021 100644
--- a/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
+++ b/hudi/src/test/scala/org/apache/spark/sql/delta/hudi/ConvertToHudiSuite.scala
@@ -150,18 +150,55 @@ class ConvertToHudiSuite extends QueryTest with Eventually {
     }
   }

-  for (invalidFieldDef <- Seq("col3 ARRAY<INT>", "col3 MAP<STRING, INT>")) {
-    test(s"Table Throws Exception for Unsupported Type ($invalidFieldDef)") {
-      intercept[DeltaUnsupportedOperationException] {
-        _sparkSession.sql(
-          s"""CREATE TABLE `$testTableName` (col1 INT, col2 STRING, $invalidFieldDef) USING DELTA
-             |LOCATION '$testTablePath'
-             |TBLPROPERTIES (
-             |  'delta.universalFormat.enabledFormats' = 'hudi',
-             |  'delta.enableDeletionVectors' = false
-             |)""".stripMargin)
-      }
-    }
+  test(s"Conversion behavior for lists") {
+    _sparkSession.sql(
+      s"""CREATE TABLE `$testTableName` (col1 ARRAY<INT>) USING DELTA
+         |LOCATION '$testTablePath'
+         |TBLPROPERTIES (
+         |  'delta.universalFormat.enabledFormats' = 'hudi'
+         |)""".stripMargin)
+    _sparkSession.sql(s"INSERT INTO `$testTableName` VALUES (array(1, 2, 3))")
+    verifyFilesAndSchemaMatch()
+  }
+
+  test(s"Conversion behavior for lists of structs") {
+    _sparkSession.sql(
+      s"""CREATE TABLE `$testTableName`
+         |(col1 ARRAY<STRUCT<field1: INT, field2: STRING>>) USING DELTA
+         |LOCATION '$testTablePath'
+         |TBLPROPERTIES (
+         |  'delta.universalFormat.enabledFormats' = 'hudi'
+         |)""".stripMargin)
+    _sparkSession.sql(s"INSERT INTO `$testTableName` " +
+      s"VALUES (array(named_struct('field1', 1, 'field2', 'hello'), " +
+      s"named_struct('field1', 2, 'field2', 'world')))")
+    verifyFilesAndSchemaMatch()
+  }
+
+  test(s"Conversion behavior for lists of lists") {
+    _sparkSession.sql(
+      s"""CREATE TABLE `$testTableName`
+         |(col1 ARRAY<ARRAY<INT>>) USING DELTA
+         |LOCATION '$testTablePath'
+         |TBLPROPERTIES (
+         |  'delta.universalFormat.enabledFormats' = 'hudi'
+         |)""".stripMargin)
+    _sparkSession.sql(s"INSERT INTO `$testTableName` " +
+      s"VALUES (array(array(1, 2, 3), array(4, 5, 6)))")
+    verifyFilesAndSchemaMatch()
+  }
+
+  test(s"Conversion behavior for maps") {
+    _sparkSession.sql(
+      s"""CREATE TABLE `$testTableName` (col1 MAP<STRING, INT>) USING DELTA
+         |LOCATION '$testTablePath'
+         |TBLPROPERTIES (
+         |  'delta.universalFormat.enabledFormats' = 'hudi'
+         |)""".stripMargin)
+    _sparkSession.sql(
+      s"INSERT INTO `$testTableName` VALUES (map('a', 1, 'b', 2, 'c', 3))"
+    )
+    verifyFilesAndSchemaMatch()
   }

   test("validate Hudi timeline archival and cleaning") {
diff --git a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
index f126c4ae361..acad136e499 100644
--- a/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
+++ b/spark/src/main/scala/org/apache/spark/sql/delta/UniversalFormat.scala
@@ -25,7 +25,7 @@ import org.apache.spark.sql.delta.schema.SchemaUtils
 import org.apache.spark.internal.MDC
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.catalog.CatalogTable
-import org.apache.spark.sql.types.{ArrayType, MapType, NullType}
+import org.apache.spark.sql.types.NullType

 /**
  * Utils to validate the Universal Format (UniForm) Delta feature (NOT a table feature).
@@ -33,8 +33,9 @@ import org.apache.spark.sql.types.{ArrayType, MapType, NullType}
  * The UniForm Delta feature governs and implements the actual conversion of Delta metadata into
  * other formats.
  *
- * Currently, UniForm only supports Iceberg. When `delta.universalFormat.enabledFormats` contains
- * "iceberg", we say that Universal Format (Iceberg) is enabled.
+ * UniForm supports both Iceberg and Hudi. When `delta.universalFormat.enabledFormats` contains
+ * "iceberg", we say that Universal Format (Iceberg) is enabled. When it contains "hudi", we say
+ * that Universal Format (Hudi) is enabled.
  *
  * [[enforceInvariantsAndDependencies]] ensures that all of UniForm's requirements for the
  * specified format are met (e.g. for 'iceberg' that IcebergCompatV1 or V2 is enabled).
@@ -101,9 +102,8 @@ object UniversalFormat extends DeltaLogging {
     if (DeltaConfigs.ENABLE_DELETION_VECTORS_CREATION.fromMetaData(newestMetadata)) {
       throw DeltaErrors.uniFormHudiDeleteVectorCompat()
     }
-    // TODO: remove once map/list support is added https://github.com/delta-io/delta/issues/2738
     SchemaUtils.findAnyTypeRecursively(newestMetadata.schema) { f =>
-      f.isInstanceOf[MapType] || f.isInstanceOf[ArrayType] || f.isInstanceOf[NullType]
+      f.isInstanceOf[NullType]
     } match {
       case Some(unsupportedType) =>
        throw DeltaErrors.uniFormHudiSchemaCompat(unsupportedType)