From 0c943cd2fbc6f2d25588991613abf469ace0153e Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 30 Oct 2020 14:11:25 +0900
Subject: [PATCH] [SPARK-33248][SQL] Add a configuration to control the legacy
 behavior of whether to pad null values when the value size is less than the
 schema size

### What changes were proposed in this pull request?
Add a configuration to control the legacy behavior of whether to pad null values when a script transformation's output value size is less than the schema size. Since we can't decide whether the old behavior is a bug, and some users need the same behavior as Hive, this PR makes the behavior configurable.

### Why are the changes needed?
Provides a compatibility choice between the historical behavior and Hive's behavior.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Existing UTs

Closes #30156 from AngersZhuuuu/SPARK-33284.

Lead-authored-by: angerszhu
Co-authored-by: AngersZhuuuu
Signed-off-by: HyukjinKwon
---
 docs/sql-migration-guide.md                        |  2 ++
 .../org/apache/spark/sql/internal/SQLConf.scala    | 15 +++++++++++++++
 .../execution/BaseScriptTransformationExec.scala   | 10 ++++++++--
 3 files changed, 25 insertions(+), 2 deletions(-)

diff --git a/docs/sql-migration-guide.md b/docs/sql-migration-guide.md
index fdc764a934..319e72172d 100644
--- a/docs/sql-migration-guide.md
+++ b/docs/sql-migration-guide.md
@@ -51,6 +51,8 @@ license: |
 
   - In Spark 3.1, loading and saving of timestamps from/to parquet files fails if the timestamps are before 1900-01-01 00:00:00Z, and loaded (saved) as the INT96 type. In Spark 3.0, the actions don't fail but might lead to shifting of the input timestamps due to rebasing from/to Julian to/from Proleptic Gregorian calendar. To restore the behavior before Spark 3.1, you can set `spark.sql.legacy.parquet.int96RebaseModeInRead` or/and `spark.sql.legacy.parquet.int96RebaseModeInWrite` to `LEGACY`.
 
   - In Spark 3.1, the `schema_of_json` and `schema_of_csv` functions return the schema in the SQL format in which field names are quoted. In Spark 3.0, the function returns a catalog string without field quoting and in lower case.
+
+  - In Spark 3.1, when `spark.sql.legacy.transformationPadNullWhenValueLessThenSchema` is true, Spark pads NULL values when a script transformation's output value size is less than the schema size in default-serde mode (script transformation with a row format of `ROW FORMAT DELIMITED`). If it is false, Spark keeps the original behavior of throwing `ArrayIndexOutOfBoundsException`.
 
 ## Upgrading from Spark SQL 3.0 to 3.0.1

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 21357a492e..8825f4f963 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -2765,6 +2765,18 @@ object SQLConf {
       .checkValue(_ > 0, "The timeout value must be positive")
       .createWithDefault(10L)
 
+  val LEGACY_SCRIPT_TRANSFORM_PAD_NULL =
+    buildConf("spark.sql.legacy.transformationPadNullWhenValueLessThenSchema")
+      .internal()
+      .doc("Whether to pad null values when the transformation output's value size is less " +
+        "than the schema size in default-serde mode (script transformation with a row format " +
+        "of `ROW FORMAT DELIMITED`). When true, Spark pads NULL values to keep the same " +
+        "behavior as Hive. When false, Spark keeps the original behavior of throwing " +
+        "`ArrayIndexOutOfBoundsException`.")
+      .version("3.1.0")
+      .booleanConf
+      .createWithDefault(true)
+
   val LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP =
     buildConf("spark.sql.legacy.allowCastNumericToTimestamp")
       .internal()
@@ -3493,6 +3505,9 @@ class SQLConf extends Serializable with Logging {
   def legacyAllowModifyActiveSession: Boolean =
     getConf(StaticSQLConf.LEGACY_ALLOW_MODIFY_ACTIVE_SESSION)
 
+  def legacyPadNullWhenValueLessThenSchema: Boolean =
+    getConf(SQLConf.LEGACY_SCRIPT_TRANSFORM_PAD_NULL)
+
   def legacyAllowCastNumericToTimestamp: Boolean =
     getConf(SQLConf.LEGACY_ALLOW_CAST_NUMERIC_TO_TIMESTAMP)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
index 74e5aa716a..f2cddc7ba7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/BaseScriptTransformationExec.scala
@@ -104,10 +104,16 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
       val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))
 
       val outputRowFormat = ioschema.outputRowFormatMap("TOK_TABLEROWFORMATFIELD")
+
+      val padNull = if (conf.legacyPadNullWhenValueLessThenSchema) {
+        (arr: Array[String], size: Int) => arr.padTo(size, null)
+      } else {
+        (arr: Array[String], size: Int) => arr
+      }
       val processRowWithoutSerde = if (!ioschema.schemaLess) {
         prevLine: String =>
           new GenericInternalRow(
-            prevLine.split(outputRowFormat).padTo(outputFieldWriters.size, null)
+            padNull(prevLine.split(outputRowFormat), outputFieldWriters.size)
               .zip(outputFieldWriters)
               .map { case (data, writer) => writer(data) })
       } else {
@@ -118,7 +124,7 @@ trait BaseScriptTransformationExec extends UnaryExecNode {
         val kvWriter = CatalystTypeConverters.createToCatalystConverter(StringType)
         prevLine: String =>
           new GenericInternalRow(
-            prevLine.split(outputRowFormat).slice(0, 2).padTo(2, null)
+            padNull(prevLine.split(outputRowFormat).slice(0, 2), 2)
               .map(kvWriter))
       }
 