From c75a82794fc6a0f35697f8e1258562d43e860f68 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 21 Aug 2020 07:37:11 +0000
Subject: [PATCH] [SPARK-32667][SQL] Script transform 'default-serde' mode
 should pad null values to fill missing columns

### What changes were proposed in this pull request?
In Hive's no-serde mode, when the script emits fewer columns than the output schema specifies, Hive pads the missing columns with null values. Spark should do the same.
```
hive> SELECT TRANSFORM(a, b)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY '|'
    > LINES TERMINATED BY '\n'
    > NULL DEFINED AS 'NULL'
    > USING 'cat' as (a string, b string, c string, d string)
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY '|'
    > LINES TERMINATED BY '\n'
    > NULL DEFINED AS 'NULL'
    > FROM (
    >   select 1 as a, 2 as b
    > ) tmp ;
OK
1	2	NULL	NULL
Time taken: 24.626 seconds, Fetched: 1 row(s)
```

### Why are the changes needed?
Keep the same behavior as Hive.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #29500 from AngersZhuuuu/SPARK-32667.

Authored-by: angerszhu
Signed-off-by: Wenchen Fan
---
 .../BaseScriptTransformationExec.scala        |  2 +-
 .../BaseScriptTransformationSuite.scala       | 29 +++++++++++++++++++
 2 files changed, 30 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 9fb12c614e..c5107645f4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -107,7 +107,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     val processRowWithoutSerde = if (!ioschema.schemaLess) {
       prevLine: String =>
         new GenericInternalRow(
-          prevLine.split(outputRowFormat)
+          prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
             .zip(outputFieldWriters)
             .map { case (data, writer) => writer(data) })
     } else {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
index 3a190840c0..a82d87c682 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/BaseScriptTransformationSuite.scala
@@ -372,6 +372,35 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
         'e.cast("string"))).collect())
     }
   }
+
+  test("SPARK-32667: SCRIPT TRANSFORM pad null value to fill column" +
+    " when without schema less (no-serde)") {
+    val df = Seq(
+      (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)),
+      (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)),
+      (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3))
+    ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18)
+
+    checkAnswer(
+      df,
+      (child: SparkPlan) => createScriptTransformationExec(
+        input = Seq(
+          df.col("a").expr,
+          df.col("b").expr),
+        script = "cat",
+        output = Seq(
+          AttributeReference("a", StringType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", StringType)(),
+          AttributeReference("d", StringType)()),
+        child = child,
+        ioschema = defaultIOSchema
+      ),
+      df.select(
+        'a.cast("string").as("a"),
+        'b.cast("string").as("b"),
+        lit(null), lit(null)).collect())
+  }
 }
 
 case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
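
For readers skimming the diff above, here is a minimal, standalone Scala sketch (not part of the patch) of what the `padTo` change accomplishes: a script output line is split on the field delimiter and padded with nulls up to the expected number of output columns before the per-column writers run. The delimiter, column count, and pass-through "writers" below are illustrative assumptions and do not reproduce Spark's actual `outputFieldWriters`.

```scala
// Standalone illustration only; assumes a '|' delimiter and 4 expected columns.
object PadToNullSketch {
  def main(args: Array[String]): Unit = {
    val fieldDelimiter = "\\|"   // regex for '|', mirroring FIELDS TERMINATED BY '|'
    val expectedColumns = 4      // as (a string, b string, c string, d string)

    // The script emitted only two fields for this row, e.g. "1|2" from `cat`.
    val line = "1|2"

    // Split, then pad the array with nulls up to the expected column count,
    // which is the essence of padTo(outputFieldWriters.size, null) in the patch.
    val fields: Array[String] = line.split(fieldDelimiter).padTo(expectedColumns, null)

    // Toy per-column "writers" that just pass values through; Spark's real
    // outputFieldWriters also convert each field to its output data type.
    val writers: Seq[String => Any] = Seq.fill(expectedColumns)((s: String) => s)

    val row: Array[Any] = fields.zip(writers).map {
      case (null, _)      => null           // padded (missing) column stays null
      case (data, writer) => writer(data)   // present column goes through its writer
    }

    println(row.mkString("[", ", ", "]"))   // prints: [1, 2, null, null]
  }
}
```

Design-wise, padding with null rather than failing on short rows matches the Hive behavior shown in the commit message above.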