[SPARK-36208][SQL] SparkScriptTransformation should support ANSI interval types

### What changes were proposed in this pull request?

This PR changes `BaseScriptTransformationExec` so that `SparkScriptTransformationExec` supports ANSI interval types.

### Why are the changes needed?

`SparkScriptTransformationExec` supports `CalendarIntervalType`, so it's better to support ANSI interval types as well.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

Closes #33419 from sarutak/script-transformation-interval.

Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
Kousuke Saruta 2021-07-21 15:13:01 +03:00 committed by Max Gekk
parent 94aece4325
commit f56c7b71ff
2 changed files with 29 additions and 0 deletions

View file

@@ -223,6 +223,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
case CalendarIntervalType => wrapperConvertException(
data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
converter)
case YearMonthIntervalType(start, end) => wrapperConvertException(
data => IntervalUtils.monthsToPeriod(
IntervalUtils.castStringToYMInterval(UTF8String.fromString(data), start, end)),
converter)
case DayTimeIntervalType(start, end) => wrapperConvertException(
data => IntervalUtils.microsToDuration(
IntervalUtils.castStringToDTInterval(UTF8String.fromString(data), start, end)),
converter)
case _: ArrayType | _: MapType | _: StructType =>
val complexTypeFactory = JsonToStructs(attr.dataType,
ioschema.outputSerdeProps.toMap, Literal(null), Some(conf.sessionLocalTimeZone))

View file

@@ -633,6 +633,27 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
}
}
}
test("SPARK-36208: TRANSFORM should support ANSI interval (no serde)") {
  // The no-serde script transform path shells out to an external command.
  assume(TestUtils.testCommandAvailable("python"))
  withTempView("v") {
    // One row carrying a year-month interval and a day-time interval.
    val intervalDF = Seq(
      (Period.of(1, 2, 0), Duration.ofDays(1).plusHours(2).plusMinutes(3).plusSeconds(4))
    ).toDF("ym", "dt")
    // Output attributes declaring the ANSI interval types the script must round-trip.
    val outputAttrs = Seq(
      AttributeReference("ym", YearMonthIntervalType())(),
      AttributeReference("dt", DayTimeIntervalType())())
    // Pipe the rows through `cat`; the values must survive the text round-trip unchanged.
    checkAnswer(
      intervalDF,
      (child: SparkPlan) => createScriptTransformationExec(
        script = "cat",
        output = outputAttrs,
        child = child,
        ioschema = defaultIOSchema),
      intervalDF.select($"ym", $"dt").collect())
  }
}
}
case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {