[SPARK-35983][SQL] Allow from_json/to_json for map types where value types are day-time intervals

### What changes were proposed in this pull request?

This PR fixes two issues. The first is that `to_json` doesn't support `map` types whose value types are day-time interval types:
```
spark-sql> select to_json(map('a', interval '1 2:3:4' day to second));
21/07/06 14:53:58 ERROR SparkSQLDriver: Failed in [select to_json(map('a', interval '1 2:3:4' day to second))]
java.lang.RuntimeException: Failed to convert value 93784000000 (class of class java.lang.Long) with the type of DayTimeIntervalType(0,3) to JSON.
```
The second is that, even with `to_json` fixed, `from_json` cannot convert a day-time interval string back into a day-time interval value, so the following query returns `null`:
```
spark-sql> select from_json(to_json(map('a', interval '1 2:3:4' day to second)), 'a interval day to second');
{"a":null}
```
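
For reference, here is the round trip this change enables, sketched with the DataFrame API. This is only an illustration (it assumes a `spark-shell`-style session with `spark.implicits._` in scope); the expected JSON string comes from the new test below.
```scala
import java.time.Duration

import org.apache.spark.sql.functions.{from_json, lit, map, to_json}
import org.apache.spark.sql.types.{DayTimeIntervalType, StructField, StructType}

// Assumes `spark: SparkSession` is available, e.g. in spark-shell.
import spark.implicits._

// A day-time interval of 1 day 2 hours 3 minutes 4 seconds used as a map value.
val df = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF("value")

// to_json now renders the interval as an ANSI day-time interval string:
// {"key":"INTERVAL '1 02:03:04' DAY TO SECOND"}
val json = df.select(to_json(map(lit("key"), $"value")).as("json"))

// from_json can read that string back into a day-time interval column.
val roundTrip = json.select(
  from_json($"json", StructType(StructField("key", DayTimeIntervalType()) :: Nil)).as("value"))
```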

### Why are the changes needed?

There is no reason why day-time intervals cannot be used as map value types;
`CalendarIntervalType` already supports this.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33225 from sarutak/json-dtinterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit c8ff613c3c)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
3 changed files with 59 additions and 1 deletion


`JacksonGenerator.scala`:
```diff
@@ -156,6 +156,15 @@ private[sql] class JacksonGenerator(
           end)
         gen.writeString(ymString)
 
+    case DayTimeIntervalType(start, end) =>
+      (row: SpecializedGetters, ordinal: Int) =>
+        val dtString = IntervalUtils.toDayTimeIntervalString(
+          row.getLong(ordinal),
+          IntervalStringStyles.ANSI_STYLE,
+          start,
+          end)
+        gen.writeString(dtString)
+
     case BinaryType =>
       (row: SpecializedGetters, ordinal: Int) =>
         gen.writeBinary(row.getBinary(ordinal))
```
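
For intuition: a day-time interval is stored internally as a `Long` count of microseconds (the `93784000000` from the error above), and the generator now renders it via `IntervalUtils.toDayTimeIntervalString` in ANSI style. Below is a rough standalone sketch of that conversion, not Spark's actual implementation; it ignores narrower start/end fields and fractional seconds.
```scala
// Minimal sketch (not Spark's IntervalUtils): render a day-time interval, stored as
// microseconds, in the ANSI DAY TO SECOND style that the generator now writes.
object DayTimeIntervalToJsonSketch {
  private val MicrosPerSecond = 1000000L

  def toAnsiDayToSecondString(micros: Long): String = {
    val sign = if (micros < 0) "-" else ""
    val totalSeconds = math.abs(micros) / MicrosPerSecond
    val days = totalSeconds / 86400
    val hours = (totalSeconds % 86400) / 3600
    val minutes = (totalSeconds % 3600) / 60
    val seconds = totalSeconds % 60
    f"INTERVAL '$sign$days $hours%02d:$minutes%02d:$seconds%02d' DAY TO SECOND"
  }

  def main(args: Array[String]): Unit = {
    // 93784000000 micros = 1 day 2 hours 3 minutes 4 seconds (the value from the error above).
    println(toAnsiDayToSecondString(93784000000L))  // INTERVAL '1 02:03:04' DAY TO SECOND
  }
}
```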


`JacksonParser.scala`:
```diff
@@ -302,6 +302,13 @@ class JacksonParser(
         Integer.valueOf(expr.eval(EmptyRow).asInstanceOf[Int])
       }
 
+    case dt: DayTimeIntervalType => (parser: JsonParser) =>
+      parseJsonToken[java.lang.Long](parser, dataType) {
+        case VALUE_STRING =>
+          val expr = Cast(Literal(parser.getText), dt)
+          java.lang.Long.valueOf(expr.eval(EmptyRow).asInstanceOf[Long])
+      }
+
     case st: StructType =>
       val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
       (parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) {
```
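
The parser goes the other way: it reads the ANSI interval string from the JSON token and relies on `Cast` to produce the microseconds `Long`. The hypothetical helper below only illustrates that direction; it is not Spark's parser and handles just the full DAY TO SECOND form with whole seconds.
```scala
// Hypothetical helper (not Spark code): parse an ANSI DAY TO SECOND string back to microseconds.
object DayTimeIntervalFromJsonSketch {
  private val Pattern = """INTERVAL '(-?)(\d+) (\d{2}):(\d{2}):(\d{2})' DAY TO SECOND""".r

  def toMicros(s: String): Long = s match {
    case Pattern(sign, d, h, m, sec) =>
      val totalSeconds = ((d.toLong * 24 + h.toLong) * 60 + m.toLong) * 60 + sec.toLong
      val micros = totalSeconds * 1000000L
      if (sign == "-") -micros else micros
    case _ => throw new IllegalArgumentException(s"Unsupported interval string: $s")
  }

  def main(args: Array[String]): Unit = {
    // 1 day 2 hours 3 minutes 4 seconds -> 93784000000 microseconds,
    // matching the value in the to_json error message above.
    println(toMicros("INTERVAL '1 02:03:04' DAY TO SECOND"))  // 93784000000
  }
}
```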


`JsonFunctionsSuite.scala`:
```diff
@@ -18,7 +18,7 @@
 package org.apache.spark.sql
 
 import java.text.SimpleDateFormat
-import java.time.Period
+import java.time.{Duration, Period}
 import java.util.Locale
 
 import collection.JavaConverters._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
 import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
 
 class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
@@ -847,4 +848,45 @@ class JsonFunctionsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-35983: from_json/to_json for map types where value types are day-time intervals") {
+    val dtDF = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF
+    Seq(
+      (DayTimeIntervalType(), """{"key":"INTERVAL '1 02:03:04' DAY TO SECOND"}""",
+        Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)),
+      (DayTimeIntervalType(DAY, MINUTE), """{"key":"INTERVAL '1 02:03' DAY TO MINUTE"}""",
+        Duration.ofDays(1).plusHours(2).plusMinutes(3)),
+      (DayTimeIntervalType(DAY, HOUR), """{"key":"INTERVAL '1 02' DAY TO HOUR"}""",
+        Duration.ofDays(1).plusHours(2)),
+      (DayTimeIntervalType(DAY), """{"key":"INTERVAL '1' DAY"}""",
+        Duration.ofDays(1)),
+      (DayTimeIntervalType(HOUR, SECOND), """{"key":"INTERVAL '26:03:04' HOUR TO SECOND"}""",
+        Duration.ofHours(26).plusMinutes(3).plusSeconds(4)),
+      (DayTimeIntervalType(HOUR, MINUTE), """{"key":"INTERVAL '26:03' HOUR TO MINUTE"}""",
+        Duration.ofHours(26).plusMinutes(3)),
+      (DayTimeIntervalType(HOUR), """{"key":"INTERVAL '26' HOUR"}""",
+        Duration.ofHours(26)),
+      (DayTimeIntervalType(MINUTE, SECOND), """{"key":"INTERVAL '1563:04' MINUTE TO SECOND"}""",
+        Duration.ofMinutes(1563).plusSeconds(4)),
+      (DayTimeIntervalType(MINUTE), """{"key":"INTERVAL '1563' MINUTE"}""",
+        Duration.ofMinutes(1563)),
+      (DayTimeIntervalType(SECOND), """{"key":"INTERVAL '93784' SECOND"}""",
+        Duration.ofSeconds(93784))
+    ).foreach { case (toJsonDtype, toJsonExpected, fromJsonExpected) =>
+      val toJsonDF = dtDF.select(to_json(map(lit("key"), $"value" cast toJsonDtype)) as "json")
+      checkAnswer(toJsonDF, Row(toJsonExpected))
+
+      DataTypeTestUtils.dayTimeIntervalTypes.foreach { fromJsonDtype =>
+        val fromJsonDF = toJsonDF
+          .select(
+            from_json($"json", StructType(StructField("key", fromJsonDtype) :: Nil)) as "value")
+          .selectExpr("value['key']")
+        if (toJsonDtype == fromJsonDtype) {
+          checkAnswer(fromJsonDF, Row(fromJsonExpected))
+        } else {
+          checkAnswer(fromJsonDF, Row(null))
+        }
+      }
+    }
+  }
 }
```