From 7c155806ed6e1a2488d4ec2fa8da620318fea8dd Mon Sep 17 00:00:00 2001
From: Kousuke Saruta
Date: Thu, 30 Sep 2021 20:22:06 +0300
Subject: [PATCH] [SPARK-36830][SQL] Support reading and writing ANSI intervals from/to JSON datasources

### What changes were proposed in this pull request?

This PR aims to support reading and writing ANSI intervals from/to JSON datasources.
With this change, interval data is written in literal form, like `{"col":"INTERVAL '1-2' YEAR TO MONTH"}`.
For the reading part, we need to specify the schema explicitly:
```
val readDF = spark.read.schema("col INTERVAL YEAR TO MONTH").json(...)
```
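
As a concrete round trip, here is a minimal sketch (not part of the patch; it assumes an existing `SparkSession` named `spark`, e.g. in spark-shell, and the output path and column name are placeholders):
```
import java.time.Period

import spark.implicits._

// Each row is written as a JSON object with the interval rendered as an ANSI literal,
// e.g. {"col":"INTERVAL '1-2' YEAR TO MONTH"}.
val writeDF = Seq(Period.of(1, 2, 0)).toDF("col")
writeDF.write.json("/tmp/ansi-interval-json")

// Reading the files back requires the interval type in the schema;
// without it the column is inferred as a plain string.
val readDF = spark.read
  .schema("col INTERVAL YEAR TO MONTH")
  .json("/tmp/ansi-interval-json")
```
The new test added to `JsonSuite.scala` in this patch exercises the same round trip for both year-month and day-time intervals, with and without an explicit read schema.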

### Why are the changes needed?

For better usability. There should be no reason to prohibit reading/writing ANSI intervals from/to JSON datasources.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test. It covers both V1 and V2 sources.

Closes #34155 from sarutak/ansi-interval-json-source.

Authored-by: Kousuke Saruta
Signed-off-by: Max Gekk
---
 .../execution/datasources/DataSource.scala    |  2 +-
 .../datasources/json/JsonFileFormat.scala     |  2 --
 .../datasources/v2/json/JsonTable.scala       |  2 --
 .../org/apache/spark/sql/SQLQuerySuite.scala  | 19 -----------------
 .../CommonFileDataSourceSuite.scala           |  2 +-
 .../datasources/json/JsonSuite.scala          | 21 ++++++++++++++++++-
 6 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index be9a9121ebab1..32913c6299f5d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -581,7 +581,7 @@ case class DataSource(
 
   // TODO: Remove the Set below once all the built-in datasources support ANSI interval types
   private val writeAllowedSources: Set[Class[_]] =
-    Set(classOf[ParquetFileFormat], classOf[CSVFileFormat])
+    Set(classOf[ParquetFileFormat], classOf[CSVFileFormat], classOf[JsonFileFormat])
 
   private def disallowWritingIntervals(
       dataTypes: Seq[DataType],
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
index 8357a411ad69e..9c6c77a8b9622 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/json/JsonFileFormat.scala
@@ -134,8 +134,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
   override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
 
   override def supportDataType(dataType: DataType): Boolean = dataType match {
-    case _: AnsiIntervalType => false
-
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportDataType(f.dataType) }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
index 244dd0f6a73a8..52168007aaa18 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/json/JsonTable.scala
@@ -55,8 +55,6 @@ case class JsonTable(
   }
 
   override def supportsDataType(dataType: DataType): Boolean = dataType match {
-    case _: AnsiIntervalType => false
-
     case _: AtomicType => true
 
     case st: StructType => st.forall { f => supportsDataType(f.dataType) }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 22e3e332ff381..7b2c0bb6d05a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1445,32 +1445,13 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
 
     val ymDF = sql("select interval 3 years -3 month")
     checkAnswer(ymDF, Row(Period.of(2, 9, 0)))
-    withTempPath(f => {
-      val e = intercept[AnalysisException] {
-        ymDF.write.json(f.getCanonicalPath)
-      }
-      e.message.contains("Cannot save interval data type into external storage")
-    })
 
     val dtDF = sql("select interval 5 days 8 hours 12 minutes 50 seconds")
     checkAnswer(dtDF, Row(Duration.ofDays(5).plusHours(8).plusMinutes(12).plusSeconds(50)))
-    withTempPath(f => {
-      val e = intercept[AnalysisException] {
-        dtDF.write.json(f.getCanonicalPath)
-      }
-      e.message.contains("Cannot save interval data type into external storage")
-    })
 
     withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
       val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
       checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123)))
-      withTempPath(f => {
-        // Currently we don't yet support saving out values of interval data type.
-        val e = intercept[AnalysisException] {
-          df.write.json(f.getCanonicalPath)
-        }
-        e.message.contains("Cannot save interval data type into external storage")
-      })
     }
   }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
index 61a4ccd593bea..28d0967e597b7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/CommonFileDataSourceSuite.scala
@@ -36,7 +36,7 @@ trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>
   protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)
 
   test(s"SPARK-36349: disallow saving of ANSI intervals to $dataSourceFormat") {
-    if (!Set("csv", "parquet").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
+    if (!Set("parquet", "csv", "json").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
       Seq("INTERVAL '1' DAY", "INTERVAL '1' YEAR").foreach { i =>
         withTempPath { dir =>
           val errMsg = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index f7f1d0b847cc1..e4d1104e898e4 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
 import java.nio.file.Files
 import java.sql.{Date, Timestamp}
-import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
+import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
 import java.util.Locale
 
 import com.fasterxml.jackson.core.JsonFactory
@@ -2991,6 +2991,25 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-36830: Support reading and writing ANSI intervals") {
+    Seq(
+      YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
+      DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
+    ).foreach { case (it, f) =>
+      val data = (1 to 10).map(i => Row(i, f(i)))
+      val schema = StructType(Array(StructField("d", IntegerType, false),
+        StructField("i", it, false)))
+      withTempPath { file =>
+        val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
+        df.write.json(file.getCanonicalPath)
+        val df2 = spark.read.json(file.getCanonicalPath)
+        checkAnswer(df2, df.select($"d".cast(LongType), $"i".cast(StringType)).collect().toSeq)
+        val df3 = spark.read.schema(schema).json(file.getCanonicalPath)
+        checkAnswer(df3, df.collect().toSeq)
+      }
+    }
+  }
 }
 
 class JsonV1Suite extends JsonSuite {