[SPARK-35998][SQL] Make from_csv/to_csv to handle year-month intervals properly

### What changes were proposed in this pull request?

This PR fixes an issue where `from_csv`/`to_csv` don't handle year-month intervals properly.
`from_csv` throws an exception when a year-month interval type appears in the schema:
```
spark-sql> select from_csv("interval '1-2' year to month", "a interval year to month");
21/07/03 04:32:24 ERROR SparkSQLDriver: Failed in [select from_csv("interval '1-2' year to month", "a interval year to month")]
java.lang.Exception: Unsupported type: interval year to month
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedTypeError(QueryExecutionErrors.scala:775)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.makeConverter(UnivocityParser.scala:224)
	at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$valueConverters$1(UnivocityParser.scala:134)
```

Also, `to_csv` doesn't handle year-month interval types properly, though no exception is thrown.
The result of `to_csv` for a year-month interval type is not in the ANSI-compliant interval form.

```
spark-sql> select to_csv(named_struct("a", interval '1-2' year to month));
14
```
The result above should be `INTERVAL '1-2' YEAR TO MONTH`. The `14` is the interval's internal representation: the total month count (1 year × 12 + 2 months).
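As a rough sketch of the intended behavior after the fix, mirroring the new test below (the session setup and column naming here are illustrative, not part of the patch):

```scala
import java.time.Period
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{struct, to_csv}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Period.of(1, 2, 0) is 1 year and 2 months; Spark infers the column
// as a year-month interval backed by an Int month count (14).
val df = Seq(Period.of(1, 2, 0)).toDF("value")

// Before the fix the CSV cell held the raw month count, "14";
// with the fix it holds "INTERVAL '1-2' YEAR TO MONTH".
df.select(to_csv(struct($"value")) as "csv").show(false)
```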

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New tests.

Closes #33210 from sarutak/csv-yminterval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit f4237aff7e)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
3 changed files with 41 additions and 2 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityGenerator.scala:

```diff
@@ -22,7 +22,7 @@ import java.io.Writer
 import com.univocity.parsers.csv.CsvWriter
 
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.util.{DateFormatter, TimestampFormatter}
+import org.apache.spark.sql.catalyst.util.{DateFormatter, IntervalStringStyles, IntervalUtils, TimestampFormatter}
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.types._
@@ -61,6 +61,11 @@ class UnivocityGenerator(
     case TimestampType =>
       (row: InternalRow, ordinal: Int) => timestampFormatter.format(row.getLong(ordinal))
 
+    case YearMonthIntervalType(start, end) =>
+      (row: InternalRow, ordinal: Int) =>
+        IntervalUtils.toYearMonthIntervalString(
+          row.getInt(ordinal), IntervalStringStyles.ANSI_STYLE, start, end)
+
     case udt: UserDefinedType[_] => makeConverter(udt.sqlType)
 
     case dt: DataType =>
```
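The generator formats the internal `Int` month count into the ANSI string via `IntervalUtils.toYearMonthIntervalString`, honoring the type's start and end fields. A minimal sketch of that call in isolation (literal values substituted for the row access):

```scala
import org.apache.spark.sql.catalyst.util.{IntervalStringStyles, IntervalUtils}
import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}

// A year-month interval is stored as a single Int: the total month count.
// 14 months over fields YEAR..MONTH renders as "INTERVAL '1-2' YEAR TO MONTH".
val ansi = IntervalUtils.toYearMonthIntervalString(
  14, IntervalStringStyles.ANSI_STYLE, YEAR, MONTH)
```

This also explains the unfixed `14` output: year-month intervals used to fall through to the generic `case dt: DataType` branch, which writes the raw internal value.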

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/csv/UnivocityParser.scala:

```diff
@@ -26,7 +26,7 @@ import com.univocity.parsers.csv.CsvParser
 import org.apache.spark.SparkUpgradeException
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.{InternalRow, NoopFilters, OrderedFilters}
-import org.apache.spark.sql.catalyst.expressions.{ExprUtils, GenericInternalRow}
+import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, ExprUtils, GenericInternalRow, Literal}
 import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.catalyst.util.LegacyDateFormats.FAST_DATE_FORMAT
 import org.apache.spark.sql.errors.QueryExecutionErrors
@@ -217,6 +217,11 @@ class UnivocityParser(
           IntervalUtils.safeStringToInterval(UTF8String.fromString(datum))
         }
 
+    case ym: YearMonthIntervalType => (d: String) =>
+      nullSafeDatum(d, name, nullable, options) { datum =>
+        Cast(Literal(datum), ym).eval(EmptyRow)
+      }
+
     case udt: UserDefinedType[_] =>
       makeConverter(name, udt.sqlType, nullable)
```
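On the read side, the parser reuses `Cast`'s string-to-interval logic instead of reimplementing the ANSI grammar. A minimal sketch of the core conversion (standalone Catalyst expressions, outside the parser's `nullSafeDatum` wrapper):

```scala
import org.apache.spark.sql.catalyst.expressions.{Cast, EmptyRow, Literal}
import org.apache.spark.sql.types.YearMonthIntervalType

// Wrap the raw CSV field in a Literal and cast it to the target type;
// the result is the internal representation, the total month count (14).
val months = Cast(Literal("INTERVAL '1-2' YEAR TO MONTH"), YearMonthIntervalType())
  .eval(EmptyRow)
```

A string that doesn't match the target type's fields makes the cast fail, which `from_csv` surfaces according to its parse mode (`null` in the default permissive mode), matching the `Row(null)` branch in the test below.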

sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:

```diff
@@ -18,6 +18,7 @@
 package org.apache.spark.sql
 
 import java.text.SimpleDateFormat
+import java.time.Period
 import java.util.Locale
 
 import scala.collection.JavaConverters._
@@ -27,6 +28,7 @@ import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.YearMonthIntervalType.{MONTH, YEAR}
 
 class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
   import testImplicits._
@@ -279,4 +281,31 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       }
     }
   }
+
+  test("SPARK-35998: Make from_csv/to_csv to handle year-month intervals properly") {
+    val ymDF = Seq(Period.of(1, 2, 0)).toDF
+    Seq(
+      (YearMonthIntervalType(), "INTERVAL '1-2' YEAR TO MONTH", Period.of(1, 2, 0)),
+      (YearMonthIntervalType(YEAR), "INTERVAL '1' YEAR", Period.of(1, 0, 0)),
+      (YearMonthIntervalType(MONTH), "INTERVAL '14' MONTH", Period.of(1, 2, 0))
+    ).foreach { case (toCsvDtype, toCsvExpected, fromCsvExpected) =>
+      val toCsvDF = ymDF.select(to_csv(struct($"value" cast toCsvDtype)) as "csv")
+      checkAnswer(toCsvDF, Row(toCsvExpected))
+
+      DataTypeTestUtils.yearMonthIntervalTypes.foreach { fromCsvDtype =>
+        val fromCsvDF = toCsvDF
+          .select(
+            from_csv(
+              $"csv",
+              StructType(StructField("a", fromCsvDtype) :: Nil),
+              Map.empty[String, String]) as "value")
+          .selectExpr("value.a")
+        if (toCsvDtype == fromCsvDtype) {
+          checkAnswer(fromCsvDF, Row(fromCsvExpected))
+        } else {
+          checkAnswer(fromCsvDF, Row(null))
+        }
+      }
+    }
+  }
 }
```
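For context, `DataTypeTestUtils.yearMonthIntervalTypes` enumerates the year-month interval variants (`YEAR`, `MONTH`, and `YEAR TO MONTH`), so every serialized string is parsed back under each variant: the matching type must round-trip to the expected `Period`, while any mismatched type must yield `null`.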