[SPARK-36675][SQL] Support ScriptTransformation for timestamp_ntz
### What changes were proposed in this pull request? This PR aims to support `ScriptTransformation` for `timestamp_ntz`. In the current master, it doesn't work. ``` spark.sql("SELECT transform(col1) USING 'cat' AS (col1 timestamp_ntz) FROM VALUES timestamp_ntz'2021-09-06 20:19:13' t").show(false) 21/09/06 22:03:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: SparkScriptTransformation without serde does not support TimestampNTZType$ as output data type at org.apache.spark.sql.errors.QueryExecutionErrors$.outputDataTypeUnsupportedByNodeWithoutSerdeError(QueryExecutionErrors.scala:1740) at org.apache.spark.sql.execution.BaseScriptTransformationExec.$anonfun$outputFieldWriters$1(BaseScriptTransformationExec.scala:245) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.execution.BaseScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters(BaseScriptTransformationExec.scala:194) at org.apache.spark.sql.execution.BaseScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters$(BaseScriptTransformationExec.scala:194) at org.apache.spark.sql.execution.SparkScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters$lzycompute(SparkScriptTransformationExec.scala:38) at org.apache.spark.sql.execution.SparkScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters(SparkScriptTransformationExec.scala:38) at org.apache.spark.sql.execution.BaseScriptTransformationExec$$anon$1.$anonfun$processRowWithoutSerde$1(BaseScriptTransformationExec.scala:121) at org.apache.spark.sql.execution.BaseScriptTransformationExec$$anon$1.next(BaseScriptTransformationExec.scala:162) at org.apache.spark.sql.execution.BaseScriptTransformationExec$$anon$1.next(BaseScriptTransformationExec.scala:113) ``` ### Why are the changes needed? For better usability. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test. Closes #33920 from sarutak/script-transformation-timestamp-ntz. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Max Gekk <max.gekk@gmail.com>
This commit is contained in:
parent
c6f3a13087
commit
0ab0cb108d
|
@ -220,6 +220,9 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
|
||||||
UTF8String.fromString(data),
|
UTF8String.fromString(data),
|
||||||
DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
|
DateTimeUtils.getZoneId(conf.sessionLocalTimeZone))
|
||||||
.map(DateTimeUtils.toJavaTimestamp).orNull, converter)
|
.map(DateTimeUtils.toJavaTimestamp).orNull, converter)
|
||||||
|
case TimestampNTZType =>
|
||||||
|
wrapperConvertException(data => DateTimeUtils.stringToTimestampWithoutTimeZone(
|
||||||
|
UTF8String.fromString(data)).map(DateTimeUtils.microsToLocalDateTime).orNull, converter)
|
||||||
case CalendarIntervalType => wrapperConvertException(
|
case CalendarIntervalType => wrapperConvertException(
|
||||||
data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
|
data => IntervalUtils.stringToInterval(UTF8String.fromString(data)),
|
||||||
converter)
|
converter)
|
||||||
|
|
|
@ -654,6 +654,20 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
|
||||||
df.select($"ym", $"dt").collect())
|
df.select($"ym", $"dt").collect())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("SPARK-36675: TRANSFORM should support timestamp_ntz (no serde)") {
|
||||||
|
val df = spark.sql("SELECT timestamp_ntz'2021-09-06 20:19:13' col")
|
||||||
|
checkAnswer(
|
||||||
|
df,
|
||||||
|
(child: SparkPlan) => createScriptTransformationExec(
|
||||||
|
script = "cat",
|
||||||
|
output = Seq(
|
||||||
|
AttributeReference("col", TimestampNTZType)()),
|
||||||
|
child = child,
|
||||||
|
ioschema = defaultIOSchema
|
||||||
|
),
|
||||||
|
df.select($"col").collect())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
|
case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
|
||||||
|
|
Loading…
Reference in a new issue