From 0ab0cb108d64c95c0d46075c9c30d735d74a3b0d Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 6 Sep 2021 20:58:07 +0200 Subject: [PATCH] [SPARK-36675][SQL] Support ScriptTransformation for timestamp_ntz ### What changes were proposed in this pull request? This PR aims to support `ScriptTransformation` for `timestamp_ntz`. On the current master, a `TRANSFORM` query without serde that declares a `timestamp_ntz` output column fails with a `SparkException`: ``` spark.sql("SELECT transform(col1) USING 'cat' AS (col1 timestamp_ntz) FROM VALUES timestamp_ntz'2021-09-06 20:19:13' t").show(false) 21/09/06 22:03:55 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) org.apache.spark.SparkException: SparkScriptTransformation without serde does not support TimestampNTZType$ as output data type at org.apache.spark.sql.errors.QueryExecutionErrors$.outputDataTypeUnsupportedByNodeWithoutSerdeError(QueryExecutionErrors.scala:1740) at org.apache.spark.sql.execution.BaseScriptTransformationExec.$anonfun$outputFieldWriters$1(BaseScriptTransformationExec.scala:245) at scala.collection.immutable.List.map(List.scala:293) at org.apache.spark.sql.execution.BaseScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters(BaseScriptTransformationExec.scala:194) at org.apache.spark.sql.execution.BaseScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters$(BaseScriptTransformationExec.scala:194) at org.apache.spark.sql.execution.SparkScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters$lzycompute(SparkScriptTransformationExec.scala:38) at org.apache.spark.sql.execution.SparkScriptTransformationExec.org$apache$spark$sql$execution$BaseScriptTransformationExec$$outputFieldWriters(SparkScriptTransformationExec.scala:38) at org.apache.spark.sql.execution.BaseScriptTransformationExec$$anon$1.$anonfun$processRowWithoutSerde$1(BaseScriptTransformationExec.scala:121) at 
org.apache.spark.sql.execution.BaseScriptTransformationExec$$anon$1.next(BaseScriptTransformationExec.scala:162) at org.apache.spark.sql.execution.BaseScriptTransformationExec$$anon$1.next(BaseScriptTransformationExec.scala:113) ``` ### Why are the changes needed? For better usability. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New test. Closes #33920 from sarutak/script-transformation-timestamp-ntz. Authored-by: Kousuke Saruta Signed-off-by: Max Gekk --- .../execution/BaseScriptTransformationExec.scala | 3 +++ .../execution/BaseScriptTransformationSuite.scala | 14 ++++++++++++++ 2 files changed, 17 insertions(+) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala index fc3a124e7f..60400506f9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala @@ -220,6 +220,9 @@ trait BaseScriptTransformationExec extends UnaryExecNode { UTF8String.fromString(data), DateTimeUtils.getZoneId(conf.sessionLocalTimeZone)) .map(DateTimeUtils.toJavaTimestamp).orNull, converter) + case TimestampNTZType => + wrapperConvertException(data => DateTimeUtils.stringToTimestampWithoutTimeZone( + UTF8String.fromString(data)).map(DateTimeUtils.microsToLocalDateTime).orNull, converter) case CalendarIntervalType => wrapperConvertException( data => IntervalUtils.stringToInterval(UTF8String.fromString(data)), converter) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala index 9d8fcdac3f..488a0fdce2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala @@ -654,6 +654,20 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU df.select($"ym", $"dt").collect()) } } + + test("SPARK-36675: TRANSFORM should support timestamp_ntz (no serde)") { + val df = spark.sql("SELECT timestamp_ntz'2021-09-06 20:19:13' col") + checkAnswer( + df, + (child: SparkPlan) => createScriptTransformationExec( + script = "cat", + output = Seq( + AttributeReference("col", TimestampNTZType)()), + child = child, + ioschema = defaultIOSchema + ), + df.select($"col").collect()) + } } case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {