[SPARK-32667][SQL] Script transform 'default-serde' mode should pad null value to filling column

### What changes were proposed in this pull request?
In Hive's no-serde ('default-serde') mode, when the script output has fewer columns than the specified output schema, Hive pads the missing columns with null. Spark should do the same:
```
hive> SELECT TRANSFORM(a, b)
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY '|'
    >   LINES TERMINATED BY '\n'
    >   NULL DEFINED AS 'NULL'
    > USING 'cat' as (a string, b string, c string, d string)
    >   ROW FORMAT DELIMITED
    >   FIELDS TERMINATED BY '|'
    >   LINES TERMINATED BY '\n'
    >   NULL DEFINED AS 'NULL'
    > FROM (
    > select 1 as a, 2 as b
    > ) tmp ;
OK
1	2	NULL	NULL
Time taken: 24.626 seconds, Fetched: 1 row(s)
```
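For reference, after this change the equivalent query through Spark produces the same padded row. A minimal sketch, assuming a `SparkSession` named `spark` built with this patch applied; the exact `show()` rendering is approximate:
```scala
// Sketch: the same TRANSFORM query run through Spark, assuming a
// SparkSession `spark` that includes this change.
val result = spark.sql(
  """SELECT TRANSFORM(a, b)
    |USING 'cat' AS (a string, b string, c string, d string)
    |FROM (SELECT 1 AS a, 2 AS b) tmp
    |""".stripMargin)
result.show()
// +---+---+----+----+
// |  a|  b|   c|   d|
// +---+---+----+----+
// |  1|  2|null|null|
// +---+---+----+----+
```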

### Why are the changes needed?
Keep the same behavior as Hive.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #29500 from AngersZhuuuu/SPARK-32667.

Authored-by: angerszhu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2 changed files with 30 additions and 1 deletion

```diff
@@ -107,7 +107,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
   val processRowWithoutSerde = if (!ioschema.schemaLess) {
     prevLine: String =>
       new GenericInternalRow(
-        prevLine.split(outputRowFormat)
+        prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
           .zip(outputFieldWriters)
           .map { case (data, writer) => writer(data) })
   } else {
```
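The core of the fix is the `padTo` call: when the script emits fewer fields than the output schema expects, the split array is padded with `null` before it is zipped with the output field writers. A minimal standalone sketch of the difference, assuming a tab (`\t`) field delimiter and a four-column output schema:
```scala
// Hypothetical illustration of the one-line fix above.
val line = "1\t2"          // the script emitted only two fields
val numOutputColumns = 4   // stand-in for outputFieldWriters.size

val before = line.split("\t")
// Array(1, 2) -- only two elements, so zipping with four field
// writers silently drops the writers for columns c and d

val after = line.split("\t").padTo(numOutputColumns, null)
// Array(1, 2, null, null) -- every writer gets a value, and the
// missing columns come out as null, matching Hive's behavior
```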

```diff
@@ -372,6 +372,35 @@ abstract class BaseScriptTransformationSuite extends SparkPlanTest with SQLTestU
         'e.cast("string"))).collect())
     }
   }
+
+  test("SPARK-32667: SCRIPT TRANSFORM pad null value to fill column" +
+    " when without schema less (no-serde)") {
+    val df = Seq(
+      (1, "1", 1.0, BigDecimal(1.0), new Timestamp(1)),
+      (2, "2", 2.0, BigDecimal(2.0), new Timestamp(2)),
+      (3, "3", 3.0, BigDecimal(3.0), new Timestamp(3))
+    ).toDF("a", "b", "c", "d", "e") // Note column d's data type is Decimal(38, 18)
+
+    checkAnswer(
+      df,
+      (child: SparkPlan) => createScriptTransformationExec(
+        input = Seq(
+          df.col("a").expr,
+          df.col("b").expr),
+        script = "cat",
+        output = Seq(
+          AttributeReference("a", StringType)(),
+          AttributeReference("b", StringType)(),
+          AttributeReference("c", StringType)(),
+          AttributeReference("d", StringType)()),
+        child = child,
+        ioschema = defaultIOSchema
+      ),
+      df.select(
+        'a.cast("string").as("a"),
+        'b.cast("string").as("b"),
+        lit(null), lit(null)).collect())
+  }
 }

 case class ExceptionInjectingOperator(child: SparkPlan) extends UnaryExecNode {
```