[SPARK-36156][SQL] SCRIPT TRANSFORM ROW FORMAT DELIMITED should respect NULL DEFINED AS and default value should be \N

### What changes were proposed in this pull request?
Make `SCRIPT TRANSFORM` with `ROW FORMAT DELIMITED` respect the `NULL DEFINED AS` clause, and use `\N` as the default null marker when the clause is not specified, matching Hive's behavior:
![image](https://user-images.githubusercontent.com/46485123/125775377-611d4f06-f9e5-453a-990d-5a0018774f43.png)
![image](https://user-images.githubusercontent.com/46485123/125775387-6618bd0c-78d8-4457-bcc2-12dd70522946.png)
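
For illustration, a minimal sketch of the resulting behavior (assuming a `SparkSession` named `spark` and a table `t` with columns `a`, `b`, `c`, as in the tests added below):

```scala
// Hypothetical usage sketch: with this change, a null column piped through the
// script is serialized with the default null marker \N and read back as SQL NULL,
// instead of surviving as the literal string "null".
val df = spark.sql(
  """SELECT TRANSFORM(a, b, c, null)
    |USING 'cat' AS (a, b, c, d)
    |FROM t""".stripMargin)
df.show()  // column d is NULL for every row
```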

### Why are the changes needed?
Keep consistent with Hive's behavior.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT

Closes #33363 from AngersZhuuuu/SPARK-36156.

Authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Committed: 2021-07-22 17:28:37 +08:00
commit bb09bd2e2d (parent 2c35604044)
3 changed files with 147 additions and 14 deletions


@@ -118,7 +118,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     val processRowWithoutSerde = if (!ioschema.schemaLess) {
       prevLine: String =>
         new GenericInternalRow(
-          prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
+          prevLine.split(outputRowFormat, -1).padTo(outputFieldWriters.size, null)
             .zip(outputFieldWriters)
             .map { case (data, writer) => writer(data) })
     } else {
@@ -129,7 +129,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
       prevLine: String =>
         new GenericInternalRow(
-          prevLine.split(outputRowFormat).slice(0, 2).padTo(2, null)
+          prevLine.split(outputRowFormat, -1).slice(0, 2).padTo(2, null)
             .map(kvWriter))
     }
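
The `-1` limit added in both hunks above matters because Java/Scala `String.split` drops trailing empty strings by default, which would silently change the column count when the last field on a line is empty. A small illustration (not part of the patch):

```scala
// Trailing empty fields must survive the split so that padTo/zip see the
// right number of columns.
"1@true@".split("@").length       // 2 -- trailing empty field dropped
"1@true@".split("@", -1).length   // 3 -- trailing empty field preserved
```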
@@ -247,10 +247,14 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
   private val wrapperConvertException: (String => Any, Any => Any) => String => Any =
     (f: String => Any, converter: Any => Any) =>
       (data: String) => converter {
-        try {
-          f(data)
-        } catch {
-          case NonFatal(_) => null
+        if (data == ioschema.outputRowFormatMap("TOK_TABLEROWFORMATNULL")) {
+          null
+        } else {
+          try {
+            f(data)
+          } catch {
+            case NonFatal(_) => null
+          }
         }
       }
 }
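
A simplified, standalone sketch of the reader-side logic above (names are illustrative, not the actual API): a field equal to the configured null marker is mapped to `null` before any type conversion, so the marker round-trips to SQL NULL instead of being parsed as a literal value.

```scala
import scala.util.control.NonFatal

// Illustrative helper, not from the patch: build a per-field reader that
// treats `nullMarker` as SQL NULL and degrades unparsable values to null.
def readField(nullMarker: String)(parse: String => Any): String => Any = { data =>
  if (data == nullMarker) null
  else try parse(data) catch { case NonFatal(_) => null }
}

val toInt = readField("\\N")(_.toInt)
toInt("42")    // 42
toInt("\\N")   // null -- the null marker maps straight to null
toInt("oops")  // null -- conversion failures still fall back to null
```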
@@ -282,11 +286,18 @@ abstract class BaseScriptTransformationWriterThread extends Thread with Logging
         ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES")
       } else {
         val sb = new StringBuilder
-        sb.append(row.get(0, inputSchema(0)))
+        def appendToBuffer(s: AnyRef): Unit = {
+          if (s == null) {
+            sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATNULL"))
+          } else {
+            sb.append(s)
+          }
+        }
+
+        appendToBuffer(row.get(0, inputSchema(0)))
         var i = 1
         while (i < len) {
           sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATFIELD"))
-          sb.append(row.get(i, inputSchema(i)))
+          appendToBuffer(row.get(i, inputSchema(i)))
           i += 1
         }
         sb.append(ioSchema.inputRowFormatMap("TOK_TABLEROWFORMATLINES"))
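
A matching sketch for the writer side (again with illustrative names): when Spark serializes input rows for the script, null columns are now written as the configured null marker rather than the literal string "null" that appending a null reference used to produce.

```scala
// Illustrative helper, not from the patch: join fields with the field
// delimiter, substituting the null marker for null columns.
def serializeRow(fields: Seq[AnyRef], fieldDelim: String, nullMarker: String): String =
  fields.map(f => if (f == null) nullMarker else f.toString).mkString(fieldDelim)

serializeRow(Seq("1", "true", null), "@", "\\N")  // "1@true@\N"
```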
@@ -355,7 +366,8 @@ case class ScriptTransformationIOSchema(
 object ScriptTransformationIOSchema {
   val defaultFormat = Map(
     ("TOK_TABLEROWFORMATFIELD", "\u0001"),
-    ("TOK_TABLEROWFORMATLINES", "\n")
+    ("TOK_TABLEROWFORMATLINES", "\n"),
+    ("TOK_TABLEROWFORMATNULL", "\\N")
   )

   val defaultIOSchema = ScriptTransformationIOSchema(


@@ -121,6 +121,49 @@ USING 'cat' AS (d)
NULL DEFINED AS 'NULL'
FROM t;
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
FROM t;
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
FROM t;
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'XXXX'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
FROM t;
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
NULL DEFINED AS '\n'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
FROM t;
-- transform with defined row format delimit handle schema with correct type
SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)


@@ -1,5 +1,5 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 47
+-- Number of queries: 51

 -- !query
@@ -202,9 +202,9 @@ FROM t
 -- !query schema
 struct<a:string,b:string,c:string,d:string>
 -- !query output
-1 true Spark SQL null
-2 false Spark SQL null
-3 true Spark SQL null
+1 true Spark SQL NULL
+2 false Spark SQL NULL
+3 true Spark SQL NULL

 -- !query
@@ -227,6 +227,84 @@ struct<d:string>
3
-- !query
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'NULL'
FROM t
-- !query schema
struct<a:string,b:string,c:string,d:string>
-- !query output
1 true Spark SQL \N
2 false Spark SQL \N
3 true Spark SQL \N
-- !query
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
FROM t
-- !query schema
struct<a:string,b:string,c:string,d:string>
-- !query output
1 true Spark SQL NULL
2 false Spark SQL NULL
3 true Spark SQL NULL
-- !query
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
NULL DEFINED AS 'XXXX'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
FROM t
-- !query schema
struct<a:string,b:string,c:string,d:string>
-- !query output
1 true Spark SQL XXXX
2 false Spark SQL XXXX
3 true Spark SQL XXXX
-- !query
SELECT TRANSFORM(a, b, c, null)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
NULL DEFINED AS '\n'
USING 'cat' AS (a, b, c, d)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY '@'
LINES TERMINATED BY '\n'
FROM t
-- !query schema
struct<a:string,b:string,c:string,d:string>
-- !query output
NULL NULL NULL
NULL NULL NULL
NULL NULL NULL
1 true Spark SQL
2 false Spark SQL
3 true Spark SQL
-- !query
SELECT a, b, decode(c, 'UTF-8'), d, e, f, g, h, i, j, k, l FROM (
SELECT TRANSFORM(a, b, c, d, e, f, g, h, i, j, k, l)
@@ -455,7 +533,7 @@ GROUP BY b
 -- !query schema
 struct<a:string,b:string,c:string>
 -- !query output
-2 null 3
+2 NULL 3
 5 4 6