From 9eec11b9567c033b10a6d70340e63d6270b481e8 Mon Sep 17 00:00:00 2001
From: Hyukjin Kwon
Date: Mon, 2 Aug 2021 10:01:12 -0700
Subject: [PATCH] [SPARK-36379][SQL] Null at root level of a JSON array should not fail w/ permissive mode

This PR proposes to fail properly so the JSON parser can proceed and parse the
input with the permissive mode. Previously, we passed `null`s through as-is,
the root `InternalRow`s became `null`s, and the query failed even with
permissive mode on. Now, we fail explicitly when the input array contains a
`null` at the root level.

Note that this is consistent with non-array JSON input:

**Permissive mode:**

```scala
spark.read.json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
res0: Array[org.apache.spark.sql.Row] = Array([str], [null])
```

**Failfast mode:**

```scala
spark.read.option("mode", "failfast").json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```

This change makes the permissive mode proceed and parse the input without
throwing an exception:

**Permissive mode:**

```scala
spark.read.json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```

Before:
```
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```

After:
```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```

Note that this behaviour is consistent with how a malformed JSON object is
handled:

```scala
spark.read.schema("a int").json(Seq("""[{"a": 123}, {123123}, {"a": 123}]""").toDS).collect()
```
```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```

Since we're parsing _one_ JSON array, all of its records fail together.

**Failfast mode:**

```scala
spark.read.option("mode", "failfast").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```

Before:
```
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```

After:
```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```

Manually tested, and a unit test was added.

Closes #33608 from HyukjinKwon/SPARK-36379.
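As background on why the fix throws instead of returning `null`: each record
flows through Spark's failure-safe parsing layer, which can only apply the
per-mode handling (a null row for permissive, a `SparkException` for failfast)
when the converter throws; a `null` that is silently returned escapes that
layer and only blows up later in generated code. A simplified, self-contained
sketch of that dispatch pattern (the names `ParseMode`, `BadRecord`, and
`failureSafeParse` are illustrative stand-ins, not Spark's actual
`FailureSafeParser` internals):

```scala
// Illustrative stand-in for the failure-safe dispatch, not Spark's real code.
sealed trait ParseMode
case object Permissive extends ParseMode
case object FailFast extends ParseMode

// A thrown marker for a malformed record, analogous in spirit to Catalyst's
// malformed-record exception.
final case class BadRecord(raw: String) extends RuntimeException(raw)

def failureSafeParse[T](raw: String, mode: ParseMode)(parse: String => T): Option[T] =
  try Some(parse(raw))
  catch {
    case BadRecord(r) =>
      mode match {
        case Permissive => None // downstream turns this into a row of nulls
        case FailFast   => throw new RuntimeException(s"Malformed record: $r")
      }
  }

// failureSafeParse("""null""", Permissive) { raw =>
//   if (raw == "null") throw BadRecord(raw) else raw
// }  // => None, i.e. a null row rather than a crash
```

With the new guard in `convertArray`, a root-level `null` now takes this
thrown-exception path in both modes instead of surfacing as a `null` row.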
Authored-by: Hyukjin Kwon
Signed-off-by: Dongjoon Hyun
(cherry picked from commit 0bbcbc65080cd67a9997f49906d9d48fdf21db10)
Signed-off-by: Dongjoon Hyun
---
 .../spark/sql/catalyst/json/JacksonParser.scala     |  9 ++++++---
 .../sql/execution/datasources/json/JsonSuite.scala  | 14 ++++++++++++++
 2 files changed, 20 insertions(+), 3 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
index 8a1191c5b7..04a0f1a6fc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala
@@ -108,7 +108,7 @@ class JacksonParser(
       // List([str_a_2,null], [null,str_b_3])
       //
       case START_ARRAY if allowArrayAsStructs =>
-        val array = convertArray(parser, elementConverter)
+        val array = convertArray(parser, elementConverter, isRoot = true)
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
         if (array.numElements() == 0) {
@@ -450,10 +450,13 @@ class JacksonParser(
    */
   private def convertArray(
       parser: JsonParser,
-      fieldConverter: ValueConverter): ArrayData = {
+      fieldConverter: ValueConverter,
+      isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += fieldConverter.apply(parser)
+      val v = fieldConverter.apply(parser)
+      if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
+      values += v
     }
 
     new GenericArrayData(values.toArray)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index dab1255eea..58921485b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2919,6 +2919,20 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
+    assert(intercept[SparkException] {
+      spark.read.option("mode", "failfast")
+        .schema("a string").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
+    }.getMessage.contains("Malformed records are detected"))
+
+    // Permissive modes should proceed parsing malformed records (null).
+    // Here, since an array fails to parse in the middle, we will return one row.
+    checkAnswer(
+      spark.read.option("mode", "permissive")
+        .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS),
+      Row(null) :: Nil)
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
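For completeness, the patched behavior can be exercised end to end with a small
driver like the following, a sketch assuming a local `SparkSession` and this
patch applied; the object name and session bootstrap are illustrative, not part
of the patch:

```scala
import org.apache.spark.sql.SparkSession

// Illustrative reproduction of the behavior pinned down by the new test;
// assumes a Spark build containing this patch is on the classpath.
object Spark36379Repro extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  import spark.implicits._

  // PERMISSIVE: the single malformed root array collapses into one null row.
  val permissive = spark.read
    .option("mode", "permissive")
    .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS)
  println(permissive.collect().mkString(", ")) // [null]

  // FAILFAST: the same input now raises SparkException instead of an NPE.
  try {
    spark.read.option("mode", "failfast")
      .schema("a string")
      .json(Seq("""[{"a": "str"}, null]""").toDS)
      .collect()
  } catch {
    case e: org.apache.spark.SparkException =>
      println(e.getMessage) // "Malformed records are detected in record parsing. ..."
  }

  spark.stop()
}
```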