[SPARK-36830][SQL] Support reading and writing ANSI intervals from/to JSON datasources
### What changes were proposed in this pull request? This PR aims to support reading and writing ANSI intervals from/to JSON datasources. With this change, interval data is written in literal form like `{"col":"INTERVAL '1-2' YEAR TO MONTH"}`. For the reading part, we need to specify the schema explicitly like: ``` val readDF = spark.read.schema("col INTERVAL YEAR TO MONTH").json(...) ``` ### Why are the changes needed? For better usability. There should be no reason to prohibit reading/writing ANSI intervals from/to JSON datasources. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test. It covers both V1 and V2 sources. Closes #34155 from sarutak/ansi-interval-json-source. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
parent
5a32e41e9c
commit
7c155806ed
|
@ -581,7 +581,7 @@ case class DataSource(
|
|||
|
||||
// TODO: Remove the Set below once all the built-in datasources support ANSI interval types
|
||||
private val writeAllowedSources: Set[Class[_]] =
|
||||
Set(classOf[ParquetFileFormat], classOf[CSVFileFormat])
|
||||
Set(classOf[ParquetFileFormat], classOf[CSVFileFormat], classOf[JsonFileFormat])
|
||||
|
||||
private def disallowWritingIntervals(
|
||||
dataTypes: Seq[DataType],
|
||||
|
|
|
@ -134,8 +134,6 @@ class JsonFileFormat extends TextBasedFileFormat with DataSourceRegister {
|
|||
override def equals(other: Any): Boolean = other.isInstanceOf[JsonFileFormat]
|
||||
|
||||
override def supportDataType(dataType: DataType): Boolean = dataType match {
|
||||
case _: AnsiIntervalType => false
|
||||
|
||||
case _: AtomicType => true
|
||||
|
||||
case st: StructType => st.forall { f => supportDataType(f.dataType) }
|
||||
|
|
|
@ -55,8 +55,6 @@ case class JsonTable(
|
|||
}
|
||||
|
||||
override def supportsDataType(dataType: DataType): Boolean = dataType match {
|
||||
case _: AnsiIntervalType => false
|
||||
|
||||
case _: AtomicType => true
|
||||
|
||||
case st: StructType => st.forall { f => supportsDataType(f.dataType) }
|
||||
|
|
|
@ -1445,32 +1445,13 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession with AdaptiveSpark
|
|||
|
||||
val ymDF = sql("select interval 3 years -3 month")
|
||||
checkAnswer(ymDF, Row(Period.of(2, 9, 0)))
|
||||
withTempPath(f => {
|
||||
val e = intercept[AnalysisException] {
|
||||
ymDF.write.json(f.getCanonicalPath)
|
||||
}
|
||||
e.message.contains("Cannot save interval data type into external storage")
|
||||
})
|
||||
|
||||
val dtDF = sql("select interval 5 days 8 hours 12 minutes 50 seconds")
|
||||
checkAnswer(dtDF, Row(Duration.ofDays(5).plusHours(8).plusMinutes(12).plusSeconds(50)))
|
||||
withTempPath(f => {
|
||||
val e = intercept[AnalysisException] {
|
||||
dtDF.write.json(f.getCanonicalPath)
|
||||
}
|
||||
e.message.contains("Cannot save interval data type into external storage")
|
||||
})
|
||||
|
||||
withSQLConf(SQLConf.LEGACY_INTERVAL_ENABLED.key -> "true") {
|
||||
val df = sql("select interval 3 years -3 month 7 week 123 microseconds")
|
||||
checkAnswer(df, Row(new CalendarInterval(12 * 3 - 3, 7 * 7, 123)))
|
||||
withTempPath(f => {
|
||||
// Currently we don't yet support saving out values of interval data type.
|
||||
val e = intercept[AnalysisException] {
|
||||
df.write.json(f.getCanonicalPath)
|
||||
}
|
||||
e.message.contains("Cannot save interval data type into external storage")
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -36,7 +36,7 @@ trait CommonFileDataSourceSuite extends SQLHelper { self: AnyFunSuite =>
|
|||
protected def inputDataset: Dataset[_] = spark.createDataset(Seq("abc"))(Encoders.STRING)
|
||||
|
||||
test(s"SPARK-36349: disallow saving of ANSI intervals to $dataSourceFormat") {
|
||||
if (!Set("csv", "parquet").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
|
||||
if (!Set("parquet", "csv", "json").contains(dataSourceFormat.toLowerCase(Locale.ROOT))) {
|
||||
Seq("INTERVAL '1' DAY", "INTERVAL '1' YEAR").foreach { i =>
|
||||
withTempPath { dir =>
|
||||
val errMsg = intercept[AnalysisException] {
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.io._
|
|||
import java.nio.charset.{Charset, StandardCharsets, UnsupportedCharsetException}
|
||||
import java.nio.file.Files
|
||||
import java.sql.{Date, Timestamp}
|
||||
import java.time.{Instant, LocalDate, LocalDateTime, ZoneId}
|
||||
import java.time.{Duration, Instant, LocalDate, LocalDateTime, Period, ZoneId}
|
||||
import java.util.Locale
|
||||
|
||||
import com.fasterxml.jackson.core.JsonFactory
|
||||
|
@ -2991,6 +2991,25 @@ abstract class JsonSuite
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
test("SPARK-36830: Support reading and writing ANSI intervals") {
|
||||
Seq(
|
||||
YearMonthIntervalType() -> ((i: Int) => Period.of(i, i, 0)),
|
||||
DayTimeIntervalType() -> ((i: Int) => Duration.ofDays(i).plusSeconds(i))
|
||||
).foreach { case (it, f) =>
|
||||
val data = (1 to 10).map(i => Row(i, f(i)))
|
||||
val schema = StructType(Array(StructField("d", IntegerType, false),
|
||||
StructField("i", it, false)))
|
||||
withTempPath { file =>
|
||||
val df = spark.createDataFrame(sparkContext.parallelize(data), schema)
|
||||
df.write.json(file.getCanonicalPath)
|
||||
val df2 = spark.read.json(file.getCanonicalPath)
|
||||
checkAnswer(df2, df.select($"d".cast(LongType), $"i".cast(StringType)).collect().toSeq)
|
||||
val df3 = spark.read.schema(schema).json(file.getCanonicalPath)
|
||||
checkAnswer(df3, df.collect().toSeq)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class JsonV1Suite extends JsonSuite {
|
||||
|
|
Loading…
Reference in a new issue