[SPARK-33248][SQL] Add a configuration to control the legacy behavior of whether to pad null values when the value size is less than the schema size

### What changes were proposed in this pull request?
Adds a configuration to control the legacy behavior of whether to pad null values when the output value size is less than the schema size.
We can't decide whether the existing behavior is a bug, and some users need the behavior to match Hive.
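
For illustration, a minimal sketch of the affected case (not taken from this PR): the table `src`, its columns, and the script `cut -f1` are made-up placeholders. The script emits fewer columns than the `TRANSFORM` clause declares, which is the situation the new configuration governs in default-serde mode.

```scala
// Hypothetical spark-shell sketch; assumes a table src(key STRING, value STRING) exists.
// `cut -f1` emits only one field per line, while the TRANSFORM clause declares
// two output columns (a, b), so the produced row is shorter than the schema.
spark.sql(
  """
    |SELECT TRANSFORM(key, value)
    |  ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    |  USING 'cut -f1'
    |  AS (a STRING, b STRING)
    |FROM src
  """.stripMargin).show()
// With spark.sql.legacy.transformationPadNullWhenValueLessThenSchema=true (the
// default after this PR), column `b` comes back as NULL, matching Hive; with it
// set to false, the pre-existing behavior (ArrayIndexOutOfBoundsException) is kept.
```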

### Why are the changes needed?
Provides a compatibility choice between Spark's historical behavior and Hive's behavior.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs.

Closes #30156 from AngersZhuuuu/SPARK-33284.

Lead-authored-by: angerszhu <angers.zhu@gmail.com>
Co-authored-by: AngersZhuuuu <angers.zhu@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
Commit 0c943cd2fb (parent 343e0bb3ad), authored by angerszhu on 2020-10-30 14:11:25 +09:00, committed by HyukjinKwon.
3 changed files with 25 additions and 2 deletions.


@@ -51,6 +51,8 @@ license: |
 - In Spark 3.1, loading and saving of timestamps from/to parquet files fails if the timestamps are before 1900-01-01 00:00:00Z, and loaded (saved) as the INT96 type. In Spark 3.0, the actions don't fail but might lead to shifting of the input timestamps due to rebasing from/to Julian to/from Proleptic Gregorian calendar. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.parquet.int96RebaseModeInRead` or/and `spark.sql.legacy.parquet.int96RebaseModeInWrite` to `LEGACY`.
 - In Spark 3.1, the `schema_of_json` and `schema_of_csv` functions return the schema in the SQL format in which field names are quoted. In Spark 3.0, the function returns a catalog string without field quoting and in lower case.
+- In Spark 3.1, when `spark.sql.legacy.transformationPadNullWhenValueLessThenSchema` is true, Spark will pad NULL value when script transformation's output value size less then schema size in default-serde mode(script transformation with row format of `ROW FORMAT DELIMITED`). If false, Spark will keep original behavior to throw `ArrayIndexOutOfBoundsException`.
+
 ## Upgrading from Spark SQL 3.0 to 3.0.1
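
A hedged sketch of what that migration-guide entry means in practice; the config key is the one added by this PR, everything else is illustrative:

```scala
// Keep the Hive-compatible padding (the default introduced by this PR):
spark.conf.set("spark.sql.legacy.transformationPadNullWhenValueLessThenSchema", "true")

// Or restore the original behavior, where a script-transformation row with fewer
// values than the declared output schema leads to ArrayIndexOutOfBoundsException:
spark.conf.set("spark.sql.legacy.transformationPadNullWhenValueLessThenSchema", "false")
```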


@@ -2765,6 +2765,18 @@ object SQLConf {
     .checkValue(_ > 0, "The timeout value must be positive")
     .createWithDefault(10L)

+  val LEGACY_SCRIPT_TRANSFORM_PAD_NULL =
+    buildConf("spark.sql.legacy.transformationPadNullWhenValueLessThenSchema")
+      .internal()
+      .doc("Whether pad null value when transformation output's value size less then " +
+        "schema size in default-serde mode(script transformation with row format of " +
+        "`ROW FORMAT DELIMITED`)." +
+        "When true, Spark will pad NULL value to keep same behavior with hive." +
+        "When false, Spark keep original behavior to throw `ArrayIndexOutOfBoundsException`")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP =
     buildConf("spark.sql.legacy.allowCastNumericToTimestamp")
       .internal()
@@ -3493,6 +3505,9 @@ class SQLConf extends Serializable with Logging {
   def legacyAllowModifyActiveSession: Boolean =
     getConf(StaticSQLConf.LEGACY_ALLOW_MODIFY_ACTIVE_SESSION)

+  def legacyPadNullWhenValueLessThenSchema: Boolean =
+    getConf(SQLConf.LEGACY_SCRIPT_TRANSFORM_PAD_NULL)
+
   def legacyAllowCastNumericToTimestamp: Boolean =
     getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)


@@ -104,10 +104,16 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
     val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
     val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")
+    val padNull = if (conf.legacyPadNullWhenValueLessThenSchema) {
+      (arr: Array[String], size: Int) => arr.padTo(size, null)
+    } else {
+      (arr: Array[String], size: Int) => arr
+    }
+
     val processRowWithoutSerde = if (!ioschema.schemaLess) {
       prevLine: String =>
         new GenericInternalRow(
-          prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
+          padNull(prevLine.split(outputRowFormat), outputFieldWriters.size)
             .zip(outputFieldWriters)
             .map { case (data, writer) => writer(data) })
     } else {
@@ -118,7 +124,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
       prevLine: String =>
         new GenericInternalRow(
-          prevLine.split(outputRowFormat).slice(0, 2).padTo(2, null)
+          padNull(prevLine.split(outputRowFormat).slice(0, 2), 2)
           .map(kvWriter))
     }
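
To make the effect of the `padNull` closure concrete, a standalone sketch in plain Scala (no Spark dependencies; the field count of 3 and the input line are made up for illustration):

```scala
object PadNullSketch {
  def main(args: Array[String]): Unit = {
    val numOutputFields = 3   // pretend the declared output schema has 3 columns
    val line = "a\tb"         // the script emitted only 2 fields

    // conf = true: missing trailing fields are padded with null, as in Hive.
    val padded: Array[String] = line.split("\t").padTo(numOutputFields, null)
    println(padded.mkString("[", ", ", "]"))   // [a, b, null]

    // conf = false: the short array is kept as-is, so the resulting row has fewer
    // values than the schema and, per this PR's description, reading the missing
    // column ends in ArrayIndexOutOfBoundsException.
    val unpadded: Array[String] = line.split("\t")
    println(unpadded.mkString("[", ", ", "]")) // [a, b]
  }
}
```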