[SPARK-35730][SQL][TESTS] Check all day-time interval types in UDF

### What changes were proposed in this pull request?
Check all day-time interval types in UDF.

### Why are the changes needed?
New checks should improve test coverage.

### Does this PR introduce _any_ user-facing change?
Yes but `DayTimeIntervalType` has not been released yet.

### How was this patch tested?
Existed UT.

Closes #33047 from AngersZhuuuu/SPARK-35730.

Lead-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Co-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
AngersZhuuuu 2021-06-24 10:42:20 +03:00 committed by Max Gekk
parent 92ddef7cfb
commit 7d0786f535

View file

@ -42,6 +42,7 @@ import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.test.SQLTestData._
import org.apache.spark.sql.types._
import org.apache.spark.sql.types.DayTimeIntervalType._
import org.apache.spark.sql.types.YearMonthIntervalType._
import org.apache.spark.sql.util.QueryExecutionListener
@ -874,27 +875,112 @@ class UDFSuite extends QueryTest with SharedSparkSession {
assert(e.isInstanceOf[java.lang.ArithmeticException])
}
test("SPARK-34663: using java.time.Duration in UDF") {
test("SPARK-34663, SPARK-35730: using java.time.Duration in UDF") {
// Regular case
val input = Seq(java.time.Duration.ofHours(23)).toDF("d")
.select($"d",
$"d".cast(DayTimeIntervalType(DAY)).as("d_dd"),
$"d".cast(DayTimeIntervalType(DAY, HOUR)).as("d_dh"),
$"d".cast(DayTimeIntervalType(DAY, MINUTE)).as("d_dm"),
$"d".cast(DayTimeIntervalType(HOUR)).as("d_hh"),
$"d".cast(DayTimeIntervalType(HOUR, MINUTE)).as("d_hm"),
$"d".cast(DayTimeIntervalType(HOUR, SECOND)).as("d_hs"),
$"d".cast(DayTimeIntervalType(MINUTE)).as("d_mm"),
$"d".cast(DayTimeIntervalType(MINUTE, SECOND)).as("d_ms"),
$"d".cast(DayTimeIntervalType(SECOND)).as("d_ss"))
val plusHour = udf((d: java.time.Duration) => d.plusHours(1))
val result = input.select(plusHour($"d").as("new_d"))
checkAnswer(result, Row(java.time.Duration.ofDays(1)) :: Nil)
// TODO(SPARK-35730): Check all day-time interval types in UDF
assert(result.schema === new StructType().add("new_d", DayTimeIntervalType()))
val result = input.select(
plusHour($"d").as("new_d"),
plusHour($"d_dd").as("new_d_dd"),
plusHour($"d_dh").as("new_d_dh"),
plusHour($"d_dm").as("new_d_dm"),
plusHour($"d_hh").as("new_d_hh"),
plusHour($"d_hm").as("new_d_hm"),
plusHour($"d_hs").as("new_d_hs"),
plusHour($"d_mm").as("new_d_mm"),
plusHour($"d_ms").as("new_d_ms"),
plusHour($"d_ss").as("new_d_ss"))
checkAnswer(result, Row(java.time.Duration.ofDays(1), java.time.Duration.ofHours(1),
java.time.Duration.ofDays(1), java.time.Duration.ofDays(1),
java.time.Duration.ofDays(1), java.time.Duration.ofDays(1),
java.time.Duration.ofDays(1), java.time.Duration.ofDays(1),
java.time.Duration.ofDays(1), java.time.Duration.ofDays(1)) :: Nil)
assert(result.schema === new StructType()
.add("new_d", DayTimeIntervalType())
.add("new_d_dd", DayTimeIntervalType())
.add("new_d_dh", DayTimeIntervalType())
.add("new_d_dm", DayTimeIntervalType())
.add("new_d_hh", DayTimeIntervalType())
.add("new_d_hm", DayTimeIntervalType())
.add("new_d_hs", DayTimeIntervalType())
.add("new_d_mm", DayTimeIntervalType())
.add("new_d_ms", DayTimeIntervalType())
.add("new_d_ss", DayTimeIntervalType()))
// UDF produces `null`
val nullFunc = udf((_: java.time.Duration) => null.asInstanceOf[java.time.Duration])
val nullResult = input.select(nullFunc($"d").as("null_d"))
checkAnswer(nullResult, Row(null) :: Nil)
// TODO(SPARK-35730): Check all day-time interval types in UDF
assert(nullResult.schema === new StructType().add("null_d", DayTimeIntervalType()))
val nullResult = input.select(nullFunc($"d").as("null_d"),
nullFunc($"d_dd").as("null_d_dd"),
nullFunc($"d_dh").as("null_d_dh"),
nullFunc($"d_dm").as("null_d_dm"),
nullFunc($"d_hh").as("null_d_hh"),
nullFunc($"d_hm").as("null_d_hm"),
nullFunc($"d_hs").as("null_d_hs"),
nullFunc($"d_mm").as("null_d_mm"),
nullFunc($"d_ms").as("null_d_ms"),
nullFunc($"d_ss").as("null_d_ss"))
checkAnswer(nullResult, Row(null, null, null, null, null, null, null, null, null, null) :: Nil)
assert(nullResult.schema === new StructType()
.add("null_d", DayTimeIntervalType())
.add("null_d_dd", DayTimeIntervalType())
.add("null_d_dh", DayTimeIntervalType())
.add("null_d_dm", DayTimeIntervalType())
.add("null_d_hh", DayTimeIntervalType())
.add("null_d_hm", DayTimeIntervalType())
.add("null_d_hs", DayTimeIntervalType())
.add("null_d_mm", DayTimeIntervalType())
.add("null_d_ms", DayTimeIntervalType())
.add("null_d_ss", DayTimeIntervalType()))
// Input parameter of UDF is null
val nullInput = Seq(null.asInstanceOf[java.time.Duration]).toDF("null_d")
.select($"null_d",
$"null_d".cast(DayTimeIntervalType(DAY)).as("null_d_dd"),
$"null_d".cast(DayTimeIntervalType(DAY, HOUR)).as("null_d_dh"),
$"null_d".cast(DayTimeIntervalType(DAY, MINUTE)).as("null_d_dm"),
$"null_d".cast(DayTimeIntervalType(HOUR)).as("null_d_hh"),
$"null_d".cast(DayTimeIntervalType(HOUR, MINUTE)).as("null_d_hm"),
$"null_d".cast(DayTimeIntervalType(HOUR, SECOND)).as("null_d_hs"),
$"null_d".cast(DayTimeIntervalType(MINUTE)).as("null_d_mm"),
$"null_d".cast(DayTimeIntervalType(MINUTE, SECOND)).as("null_d_ms"),
$"null_d".cast(DayTimeIntervalType(SECOND)).as("null_d_ss"))
val constDuration = udf((_: java.time.Duration) => java.time.Duration.ofMinutes(10))
val constResult = nullInput.select(constDuration($"null_d").as("10_min"))
checkAnswer(constResult, Row(java.time.Duration.ofMinutes(10)) :: Nil)
// TODO(SPARK-35730): Check all day-time interval types in UDF
assert(constResult.schema === new StructType().add("10_min", DayTimeIntervalType()))
val constResult = nullInput.select(
constDuration($"null_d").as("10_min"),
constDuration($"null_d_dd").as("10_min_dd"),
constDuration($"null_d_dh").as("10_min_dh"),
constDuration($"null_d_dm").as("10_min_dm"),
constDuration($"null_d_hh").as("10_min_hh"),
constDuration($"null_d_hm").as("10_min_hm"),
constDuration($"null_d_hs").as("10_min_hs"),
constDuration($"null_d_mm").as("10_min_mm"),
constDuration($"null_d_ms").as("10_min_ms"),
constDuration($"null_d_ss").as("10_min_ss"))
checkAnswer(constResult, Row(
java.time.Duration.ofMinutes(10), java.time.Duration.ofMinutes(10),
java.time.Duration.ofMinutes(10), java.time.Duration.ofMinutes(10),
java.time.Duration.ofMinutes(10), java.time.Duration.ofMinutes(10),
java.time.Duration.ofMinutes(10), java.time.Duration.ofMinutes(10),
java.time.Duration.ofMinutes(10), java.time.Duration.ofMinutes(10)) :: Nil)
assert(constResult.schema === new StructType()
.add("10_min", DayTimeIntervalType())
.add("10_min_dd", DayTimeIntervalType())
.add("10_min_dh", DayTimeIntervalType())
.add("10_min_dm", DayTimeIntervalType())
.add("10_min_hh", DayTimeIntervalType())
.add("10_min_hm", DayTimeIntervalType())
.add("10_min_hs", DayTimeIntervalType())
.add("10_min_mm", DayTimeIntervalType())
.add("10_min_ms", DayTimeIntervalType())
.add("10_min_ss", DayTimeIntervalType()))
// Error in the conversion of UDF result to the internal representation of day-time interval
val overflowFunc = udf((d: java.time.Duration) => d.plusDays(Long.MaxValue))
val e = intercept[SparkException] {