Skip to content

Commit

Permalink
[SHARING][VARIANT] Add variant table feature to sharing client (#3549)
Browse files Browse the repository at this point in the history
## Description
Adds the variant type table feature as a supported table feature in the sharing client. Because
delta-spark can already read variants, support comes mostly for free.

Cross-tests with Spark 3.5 and 4.0 in the "sharing" package, because Spark 4.0 is
required to create variants.

## How was this patch tested?
added UTs for the streaming and non-streaming case

tested

```
build/sbt -DsparkVersion=master sharing/'testOnly io.delta.sharing.spark.DeltaSharingDataSourceDeltaSuite -- -z "basic variant test"'
```

with both `-DsparkVersion=master` and `-DsparkVersion=latest`
  • Loading branch information
richardc-db authored Aug 21, 2024
1 parent 56d057c commit c2d3437
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 13 deletions.
12 changes: 7 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ spark / sparkVersion := getSparkVersion()
connectCommon / sparkVersion := getSparkVersion()
connectClient / sparkVersion := getSparkVersion()
connectServer / sparkVersion := getSparkVersion()
sharing / sparkVersion := getSparkVersion()

// Dependent library versions
val defaultSparkVersion = LATEST_RELEASED_SPARK_VERSION
Expand Down Expand Up @@ -539,9 +540,10 @@ lazy val sharing = (project in file("sharing"))
commonSettings,
scalaStyleSettings,
releaseSettings,
crossSparkSettings(),
Test / javaOptions ++= Seq("-ea"),
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "provided",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "provided",

"io.delta" %% "delta-sharing-client" % "1.2.0",

Expand All @@ -550,10 +552,10 @@ lazy val sharing = (project in file("sharing"))
"org.scalatestplus" %% "scalacheck-1-15" % "3.2.9.0" % "test",
"junit" % "junit" % "4.13.2" % "test",
"com.novocode" % "junit-interface" % "0.11" % "test",
"org.apache.spark" %% "spark-catalyst" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % defaultSparkVersion % "test" classifier "tests",
"org.apache.spark" %% "spark-catalyst" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-core" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-sql" % sparkVersion.value % "test" classifier "tests",
"org.apache.spark" %% "spark-hive" % sparkVersion.value % "test" classifier "tests",
)
).configureUnidoc()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import org.apache.spark.sql.delta.{
SnapshotDescriptor,
TimestampNTZTableFeature
}
import org.apache.spark.sql.delta.VariantTypeTableFeature
import org.apache.spark.sql.delta.actions.{Metadata, Protocol}
import com.google.common.hash.Hashing
import io.delta.sharing.client.{DeltaSharingClient, DeltaSharingRestClient}
Expand All @@ -49,13 +50,15 @@ object DeltaSharingUtils extends Logging {
Seq(
DeletionVectorsTableFeature.name,
ColumnMappingTableFeature.name,
TimestampNTZTableFeature.name
TimestampNTZTableFeature.name,
VariantTypeTableFeature.name
)
val SUPPORTED_READER_FEATURES: Seq[String] =
Seq(
DeletionVectorsTableFeature.name,
ColumnMappingTableFeature.name,
TimestampNTZTableFeature.name
TimestampNTZTableFeature.name,
VariantTypeTableFeature.name
)

// The prefix will be used for block ids of all blocks that store the delta log in BlockManager.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@ package io.delta.sharing.spark

import java.time.LocalDateTime

import org.apache.spark.sql.delta.DeltaIllegalStateException
import org.apache.spark.sql.delta.DeltaLog
import org.apache.spark.sql.delta.{DeltaExcludedBySparkVersionTestMixinShims, DeltaIllegalStateException, DeltaLog}
import org.apache.spark.sql.delta.DeltaOptions.{
IGNORE_CHANGES_OPTION,
IGNORE_DELETES_OPTION,
Expand Down Expand Up @@ -50,7 +49,8 @@ class DeltaFormatSharingSourceSuite
extends StreamTest
with DeltaSQLCommandTest
with DeltaSharingTestSparkUtils
with DeltaSharingDataSourceDeltaTestUtils {
with DeltaSharingDataSourceDeltaTestUtils
with DeltaExcludedBySparkVersionTestMixinShims {

import testImplicits._

Expand Down Expand Up @@ -1196,4 +1196,53 @@ class DeltaFormatSharingSourceSuite
}
}
}

// Verifies that a Delta Sharing streaming read of a table with a VARIANT column round-trips
// correctly: rows streamed through a deltaSharing source into a delta sink must match the
// source table. Runs only against Spark master because Spark 4.0+ is required to create
// variants (testSparkMasterOnly — presumably gates on Spark version; confirm in the mixin).
testSparkMasterOnly("streaming variant query works") {
withTempDirs { (inputDir, outputDir, checkpointDir) =>
val deltaTableName = "variant_table"
withTable(deltaTableName) {
// Version 0 of the table: empty, single VARIANT column.
sql(s"create table $deltaTableName (v VARIANT) using delta")

val sharedTableName = "shared_variant_table"
// Register the shared table's metadata with the mocked sharing client.
prepareMockedClientMetadata(deltaTableName, sharedTableName)

// Sharing profile file lives in inputDir; the table path fragment after '#'
// selects share1.default.<sharedTableName> from that profile.
val profileFile = prepareProfileFile(inputDir)
withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
val tablePath = profileFile.getCanonicalPath + s"#share1.default.$sharedTableName"

// Version 1 of the table: ten rows of parse_json('{"key": <id>}').
sql(s"""insert into table $deltaTableName
select parse_json(format_string('{"key": %s}', id))
from range(0, 10)
""")

// Mock the file-system/client responses for the snapshot at version 1,
// i.e. the state right after the insert above.
prepareMockedClientAndFileSystemResult(
deltaTableName,
sharedTableName,
versionAsOf = Some(1L)
)
prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)

// Stream from the shared table into a local delta table at outputDir.
val q = spark.readStream
.format("deltaSharing")
.option("responseFormat", "delta")
.load(tablePath)
.writeStream
.format("delta")
.option("checkpointLocation", checkpointDir.toString)
.start(outputDir.toString)

// Drain all available data, then always stop the query so the test
// does not leak a running stream on failure.
try {
q.processAllAvailable()
} finally {
q.stop()
}

// The sink must contain exactly the rows of the original delta table.
checkAnswer(
spark.read.format("delta").load(outputDir.getCanonicalPath),
spark.sql(s"select * from $deltaTableName")
)
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.delta.sharing.spark

import org.apache.spark.sql.delta.DeltaExcludedBySparkVersionTestMixinShims
import org.apache.spark.sql.delta.sources.DeltaSQLConf
import org.apache.spark.sql.delta.test.DeltaSQLCommandTest

Expand All @@ -37,7 +38,8 @@ trait DeltaSharingDataSourceDeltaSuiteBase
extends QueryTest
with DeltaSQLCommandTest
with DeltaSharingTestSparkUtils
with DeltaSharingDataSourceDeltaTestUtils {
with DeltaSharingDataSourceDeltaTestUtils
with DeltaExcludedBySparkVersionTestMixinShims {

override def beforeEach(): Unit = {
spark.sessionState.conf.setConfString(
Expand Down Expand Up @@ -1466,6 +1468,46 @@ trait DeltaSharingDataSourceDeltaSuiteBase
}
}
}

testSparkMasterOnly("basic variant test") {
  withTempDir { tempDir =>
    val deltaTableName = "variant_table"
    withTable(deltaTableName) {
      // Seed a delta table with a single VARIANT column built from the row id.
      spark.range(0, 10)
        .selectExpr("parse_json(cast(id as string)) v")
        .write
        .format("delta")
        .mode("overwrite")
        .saveAsTable(deltaTableName)

      val sharedTableName = "shared_table_variant"
      // Wire up the mocked sharing client/file system for this shared table.
      prepareMockedClientAndFileSystemResult(deltaTableName, sharedTableName)
      prepareMockedClientGetTableVersion(deltaTableName, sharedTableName)

      val expectedSchemaString = "StructType(StructField(v,VariantType,true))"
      val expectedRows = spark.read.format("delta").table(deltaTableName)

      // Reads the shared table twice (schema resolution, then data scan),
      // matching the mocked client's expected call pattern.
      def assertSharedRead(tablePath: String): Unit = {
        def loadShared() = spark.read
          .format("deltaSharing")
          .option("responseFormat", "delta")
          .load(tablePath)

        // The variant column must surface as VariantType in the shared schema.
        val actualSchemaString = loadShared().schema.toString
        assert(actualSchemaString == expectedSchemaString)

        // And the shared rows must match the original delta table exactly.
        checkAnswer(loadShared(), expectedRows)
      }

      withSQLConf(getDeltaSharingClassesSQLConf.toSeq: _*) {
        val profileFile = prepareProfileFile(tempDir)
        assertSharedRead(s"${profileFile.getCanonicalPath}#share1.default.$sharedTableName")
      }
    }
  }
}
}

class DeltaSharingDataSourceDeltaSuite extends DeltaSharingDataSourceDeltaSuiteBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ private[spark] class TestClientForDeltaFormatSharing(
(
readerFeatures.contains("deletionVectors") &&
readerFeatures.contains("columnMapping") &&
readerFeatures.contains("timestampNtz")
readerFeatures.contains("timestampNtz") &&
readerFeatures.contains("variantType-preview")
),
"deletionVectors, columnMapping, timestampNtz should be supported in all types of queries."
"deletionVectors, columnMapping, timestampNtz, variantType-preview should be supported in " +
"all types of queries."
)

import TestClientForDeltaFormatSharing._
Expand Down

0 comments on commit c2d3437

Please # to comment.