From c2d343750c384985b1ee6b623758a01772e4d624 Mon Sep 17 00:00:00 2001 From: richardc-db <87336575+richardc-db@users.noreply.github.com> Date: Wed, 21 Aug 2024 13:50:19 -0700 Subject: [PATCH] [SHARING][VARIANT] Add variant table feature to sharing client (#3549) ## Description adds variant table feature as a supported table feature. Because delta-spark already can read variants, support comes mostly for free cross tests with Spark 3.5 and 4.0 in the "sharing" package because we require spark 4.0 to create variants. ## How was this patch tested? added UTs for the streaming and non-streaming case tested ``` build/sbt -DsparkVersion=master sharing/'testOnly io.delta.sharing.spark.DeltaSharingDataSourceDeltaSuite -- -z "basic variant test"' ``` with both `-DsparkVersion=master` and `-DsparkVersoin=latest` --- build.sbt | 12 ++-- .../sharing/spark/DeltaSharingUtils.scala | 7 ++- .../spark/DeltaFormatSharingSourceSuite.scala | 55 ++++++++++++++++++- .../DeltaSharingDataSourceDeltaSuite.scala | 44 ++++++++++++++- .../TestClientForDeltaFormatSharing.scala | 6 +- 5 files changed, 111 insertions(+), 13 deletions(-) diff --git a/build.sbt b/build.sbt index a54837543d9..a9d3548de58 100644 --- a/build.sbt +++ b/build.sbt @@ -59,6 +59,7 @@ spark / sparkVersion := getSparkVersion() connectCommon / sparkVersion := getSparkVersion() connectClient / sparkVersion := getSparkVersion() connectServer / sparkVersion := getSparkVersion() +sharing / sparkVersion := getSparkVersion() // Dependent library versions val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION @@ -539,9 +540,10 @@ lazy val sharing = (project in file("sharing")) commonSettings, scalaStyleSettings, releaseSettings, + crossSparkSettings(), Test / javaOptions ++= Seq("-ea"), libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided", "io.delta" %% "delta-sharing-client" % "1.2.0", @@ -550,10 +552,10 @@ lazy val sharing = (project in file("sharing")) "org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test", "junit" % "junit" % "4.13.2" % "test", "com.novocode" % "junit-interface" % "0.11" % "test", - "org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests", - "org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests", + "org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests", + "org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests", ) ).configureUnidoc() diff --git a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala index 4e9e3360d72..e77290855b6 100644 --- a/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala +++ b/sharing/src/main/scala/io/delta/sharing/spark/DeltaSharingUtils.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.{ SnapshotDescriptor, TimestampNTZTableFeature } +import org.apache.spark.sql.delta.VariantTypeTableFeature import org.apache.spark.sql.delta.actions.{Metadata, Protocol} import com.google.common.hash.Hashing import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient} @@ -49,13 +50,15 @@ object DeltaSharingUtils extends Logging { Seq( DeletionVectorsTableFeature.name, ColumnMappingTableFeature.name, - TimestampNTZTableFeature.name + TimestampNTZTableFeature.name, + VariantTypeTableFeature.name ) val SUPPORTED_READER_FEATURES: Seq[String] = Seq( DeletionVectorsTableFeature.name, ColumnMappingTableFeature.name, - TimestampNTZTableFeature.name + TimestampNTZTableFeature.name, + VariantTypeTableFeature.name ) // The prefix will be used for block ids of all blocks that store the delta log in BlockManager. diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala index 1c8fa12f872..732dc140088 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaFormatSharingSourceSuite.scala @@ -18,8 +18,7 @@ package io.delta.sharing.spark import java.time.LocalDateTime -import org.apache.spark.sql.delta.DeltaIllegalStateException -import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.{DeltaExcludedBySparkVersionTestMixinShims, DeltaIllegalStateException, DeltaLog} import org.apache.spark.sql.delta.DeltaOptions.{ IGNORE_CHANGES_OPTION, IGNORE_DELETES_OPTION, @@ -50,7 +49,8 @@ class DeltaFormatSharingSourceSuite extends StreamTest with DeltaSQLCommandTest with DeltaSharingTestSparkUtils - with DeltaSharingDataSourceDeltaTestUtils { + with DeltaSharingDataSourceDeltaTestUtils + with DeltaExcludedBySparkVersionTestMixinShims { import testImplicits._ @@ -1196,4 +1196,53 @@ class DeltaFormatSharingSourceSuite } } } + + testSparkMasterOnly("streaming variant query works") { + withTempDirs { (inputDir, outputDir, checkpointDir) => + val deltaTableName = "variant_table" + withTable(deltaTableName) { + sql(s"create table $deltaTableName (v VARIANT) using delta") + + val sharedTableName = "shared_variant_table" + prepareMockedClientMetadata(deltaTableName, sharedTableName) + + val profileFile = prepareProfileFile(inputDir) + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val tablePath = profileFile.getCanonicalPath + s"#share1.default.$sharedTableName" + + sql(s"""insert into table $deltaTableName + select parse_json(format_string('{"key": %s}', id)) + from range(0, 10) + """) + + prepareMockedClientAndFileSystemResult( + deltaTableName, + sharedTableName, + versionAsOf = Some(1L) + ) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + val q = spark.readStream + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .writeStream + .format("delta") + .option("checkpointLocation", checkpointDir.toString) + .start(outputDir.toString) + + try { + q.processAllAvailable() + } finally { + q.stop() + } + + checkAnswer( + spark.read.format("delta").load(outputDir.getCanonicalPath), + spark.sql(s"select * from $deltaTableName") + ) + } + } + } + } } diff --git a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala index 2dd93c22aec..e55d1caa8b0 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/DeltaSharingDataSourceDeltaSuite.scala @@ -16,6 +16,7 @@ package io.delta.sharing.spark +import org.apache.spark.sql.delta.DeltaExcludedBySparkVersionTestMixinShims import org.apache.spark.sql.delta.sources.DeltaSQLConf import org.apache.spark.sql.delta.test.DeltaSQLCommandTest @@ -37,7 +38,8 @@ trait DeltaSharingDataSourceDeltaSuiteBase extends QueryTest with DeltaSQLCommandTest with DeltaSharingTestSparkUtils - with DeltaSharingDataSourceDeltaTestUtils { + with DeltaSharingDataSourceDeltaTestUtils + with DeltaExcludedBySparkVersionTestMixinShims { override def beforeEach(): Unit = { spark.sessionState.conf.setConfString( @@ -1466,6 +1468,46 @@ trait DeltaSharingDataSourceDeltaSuiteBase } } } + + testSparkMasterOnly("basic variant test") { + withTempDir { tempDir => + val deltaTableName = "variant_table" + withTable(deltaTableName) { + spark.range(0, 10) + .selectExpr("parse_json(cast(id as string)) v") + .write + .format("delta") + .mode("overwrite") + .saveAsTable(deltaTableName) + + val sharedTableName = "shared_table_variant" + prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName) + prepareMockedClientGetTableVersion(deltaTableName, sharedTableName) + + val expectedSchemaString = "StructType(StructField(v,VariantType,true))" + val expected = spark.read.format("delta").table(deltaTableName) + + def test(tablePath: String): Unit = { + assert( + expectedSchemaString == spark.read + .format("deltaSharing") + .option("responseFormat", "delta") + .load(tablePath) + .schema + .toString + ) + val df = + spark.read.format("deltaSharing").option("responseFormat", "delta").load(tablePath) + checkAnswer(df, expected) + } + + withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) { + val profileFile = prepareProfileFile(tempDir) + test(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName") + } + } + } + } } class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {} diff --git a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala index af89751a226..a3d3d52a85d 100644 --- a/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala +++ b/sharing/src/test/scala/io/delta/sharing/spark/TestClientForDeltaFormatSharing.scala @@ -64,9 +64,11 @@ private[spark] class TestClientForDeltaFormatSharing( ( readerFeatures.contains("deletionVectors") && readerFeatures.contains("columnMapping") && - readerFeatures.contains("timestampNtz") + readerFeatures.contains("timestampNtz") && + readerFeatures.contains("variantType-preview") ), - "deletionVectors, columnMapping, timestampNtz should be supported in all types of queries." + "deletionVectors, columnMapping, timestampNtz, variantType-preview should be supported in " + + "all types of queries." ) import TestClientForDeltaFormatSharing._