[SPARK-35999][SQL] Make from_csv/to_csv handle day-time intervals properly

### What changes were proposed in this pull request?

This PR fixes an issue where `from_csv`/`to_csv` don't handle day-time intervals properly.
`from_csv` throws an exception if a day-time interval type is given:
```
spark-sql> select from_csv("interval '1 2:3:4' day to second", "a interval day to second");
21/07/03 04:39:13 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1 2:3:4' day to second", "a interval day to second")]
java.lang.Exception: Unsupported type: interval day to second
 at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
 at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
 at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
```

`to_csv` also mishandles day-time interval types, though no exception is thrown: the result is not in the ANSI interval format.

```
spark-sql> select to_csv(named_struct("a", interval '1 2:3:4' day to second));
93784000000
```
The result above should be `INTERVAL '1 02:03:04' DAY TO SECOND`.
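
For context on where that number comes from: Spark stores a day-time interval internally as a `Long` count of microseconds (hence the `row.getLong(ordinal)` in the fix below), and the unfixed `to_csv` wrote that raw value out. A quick arithmetic sketch:

```
// 1 day 2:03:04 expressed as Spark's internal Long microsecond count.
val seconds = ((1L * 24 + 2) * 60 + 3) * 60 + 4 // 93784 seconds
val micros = seconds * 1000000L                 // 93784000000, the buggy output
```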

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33226 from sarutak/csv-dtinterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit def8bc5c96 (parent c8ff613c3c), committed by Wenchen Fan on 2021-07-06 17:37:38 +08:00.
3 changed files with 56 additions and 1 deletion.

`UnivocityGenerator.scala`

```
@@ -66,6 +66,11 @@ class UnivocityGenerator(
         IntervalUtils.toYearMonthIntervalString(
           row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
+    case DayTimeIntervalType(start, end) =>
+      (row: InternalRow, ordinal: Int) =>
+        IntervalUtils.toDayTimeIntervalString(
+          row.getLong(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
     case dt: DataType =>
```
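
For intuition, here is a simplified sketch of the formatting the new generator case delegates to. This is not Spark's actual `toDayTimeIntervalString` implementation (which also handles negative intervals, fractional seconds, and narrower start/end fields); it only covers the positive, full `DAY TO SECOND` case:

```
// Minimal illustrative sketch: render a positive microsecond count
// as an ANSI DAY TO SECOND interval string.
def toAnsiDayTimeString(micros: Long): String = {
  val totalSeconds = micros / 1000000L
  val days = totalSeconds / 86400
  val hours = (totalSeconds % 86400) / 3600
  val minutes = (totalSeconds % 3600) / 60
  val seconds = totalSeconds % 60
  f"INTERVAL '$days $hours%02d:$minutes%02d:$seconds%02d' DAY TO SECOND"
}

// toAnsiDayTimeString(93784000000L) == "INTERVAL '1 02:03:04' DAY TO SECOND"
```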

`UnivocityParser.scala`

```
@@ -222,6 +222,11 @@ class UnivocityParser(
        Cast(Literal(datum), ym).eval(EmptyRow)
      }
+    case dt: DayTimeIntervalType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        Cast(Literal(datum), dt).eval(EmptyRow)
+      }
     case udt: UserDefinedType[_] =>
       makeConverter(name, udt.sqlType, nullable)
```
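
The parsing side simply delegates to `Cast` from the string to the requested interval type. A hedged usage sketch of the now-working path (assumes a live `SparkSession` named `spark`):

```
import org.apache.spark.sql.functions.{from_csv, lit}
import org.apache.spark.sql.types._

// Before this fix, building this converter threw
// "Unsupported type: interval day to second".
val parsed = spark.range(1).select(
  from_csv(
    lit("INTERVAL '1 02:03:04' DAY TO SECOND"),
    StructType(StructField("a", DayTimeIntervalType()) :: Nil),
    Map.empty[String, String]) as "value")

parsed.selectExpr("value.a").show(truncate = false)
```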

`CsvFunctionsSuite.scala`

```
@@ -18,7 +18,7 @@
 package org.apache.spark.sql

 import java.text.SimpleDateFormat
-import java.time.Period
+import java.time.{Duration, Period}
 import java.util.Locale

 import scala.collection.JavaConverters._
@@ -28,6 +28,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.DayTimeIntervalType.{DAY, HOUR, MINUTE, SECOND}
 import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}

 class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
@@ -308,4 +309,48 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-35999: Make from_csv/to_csv to handle day-time intervals properly") {
+    val dtDF = Seq(Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)).toDF
+    Seq(
+      (DayTimeIntervalType(), "INTERVAL '1 02:03:04' DAY TO SECOND",
+        Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4)),
+      (DayTimeIntervalType(DAY, MINUTE), "INTERVAL '1 02:03' DAY TO MINUTE",
+        Duration.ofDays(1).plusHours(2).plusMinutes(3)),
+      (DayTimeIntervalType(DAY, HOUR), "INTERVAL '1 02' DAY TO HOUR",
+        Duration.ofDays(1).plusHours(2)),
+      (DayTimeIntervalType(DAY), "INTERVAL '1' DAY",
+        Duration.ofDays(1)),
+      (DayTimeIntervalType(HOUR, SECOND), "INTERVAL '26:03:04' HOUR TO SECOND",
+        Duration.ofHours(26).plusMinutes(3).plusSeconds(4)),
+      (DayTimeIntervalType(HOUR, MINUTE), "INTERVAL '26:03' HOUR TO MINUTE",
+        Duration.ofHours(26).plusMinutes(3)),
+      (DayTimeIntervalType(HOUR), "INTERVAL '26' HOUR",
+        Duration.ofHours(26)),
+      (DayTimeIntervalType(MINUTE, SECOND), "INTERVAL '1563:04' MINUTE TO SECOND",
+        Duration.ofMinutes(1563).plusSeconds(4)),
+      (DayTimeIntervalType(MINUTE), "INTERVAL '1563' MINUTE",
+        Duration.ofMinutes(1563)),
+      (DayTimeIntervalType(SECOND), "INTERVAL '93784' SECOND",
+        Duration.ofSeconds(93784))
+    ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) =>
+      val toCsvDF = dtDF.select(to_csv(struct($"value" cast toCsvDtype)) as "csv")
+      checkAnswer(toCsvDF, Row(toCsvExpected))
+      DataTypeTestUtils.dayTimeIntervalTypes.foreach { fromCsvDtype =>
+        val fromCsvDF = toCsvDF
+          .select(
+            from_csv(
+              $"csv",
+              StructType(StructField("a", fromCsvDtype) :: Nil),
+              Map.empty[String, String]) as "value")
+          .selectExpr("value.a")
+        if (toCsvDtype == fromCsvDtype) {
+          checkAnswer(fromCsvDF, Row(fromCsvExpected))
+        } else {
+          checkAnswer(fromCsvDF, Row(null))
+        }
+      }
+    }
+  }
 }
```
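
One behavior the test pins down: reading a string back with a different day-time interval type than it was written with is expected to yield `Row(null)` rather than an error, which matches CSV's default `PERMISSIVE` parse mode nulling out fields it cannot convert. An isolated sketch of that case (again assuming a `spark` session):

```
import org.apache.spark.sql.functions.{from_csv, lit}
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType.HOUR

// A DAY TO SECOND string cannot be cast to an HOUR-only interval,
// so the parsed field comes back null instead of failing the query.
spark.range(1)
  .select(
    from_csv(
      lit("INTERVAL '1 02:03:04' DAY TO SECOND"),
      StructType(StructField("a", DayTimeIntervalType(HOUR)) :: Nil),
      Map.empty[String, String]) as "v")
  .selectExpr("v.a")
  .show() // null
```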