[SPARK-36379][SQL] Null at root level of a JSON array should not fail w/ permissive mode

This PR proposes to fail properly so that the JSON parser can proceed and parse the input with permissive mode.
Previously, we passed `null`s through as-is, so the root `InternalRow`s became `null` and the query failed even with permissive mode on.
Now, we throw an explicit exception when an element of a root-level JSON array converts to `null`, so the failure can be handled according to the parse mode.
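
For background on why a root-level `null` is special: a bare `null` document is a single JSON token, so the value converter has no object to build a row from and naturally returns `null`. A minimal standalone sketch against jackson-core, the parser library `JacksonParser` is built on (illustrative only, not Spark code):

```scala
import com.fasterxml.jackson.core.{JsonFactory, JsonToken}

// The entire document "null" is one VALUE_NULL token; a converter has
// nothing to build a row from, so it yields `null` for the root value.
val parser = new JsonFactory().createParser("null")
assert(parser.nextToken() == JsonToken.VALUE_NULL)
```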

Note that this is consistent with non-array JSON input:

**Permissive mode:**

```scala
spark.read.json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
res0: Array[org.apache.spark.sql.Row] = Array([str], [null])
```

**Failfast mode:**

```scala
spark.read.option("mode", "failfast").json(Seq("""{"a": "str"}""", """null""").toDS).collect()
```
```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```

To make permissive mode proceed and parse the input without throwing an exception.

**Permissive mode:**

```scala
spark.read.json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```

Before:

```
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```

After:

```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```
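
Relatedly, when the user-supplied schema includes the corrupt-record column, permissive mode should capture the offending input there as well. A spark-shell sketch (an expectation based on the standard `_corrupt_record` behaviour, not an output taken from this PR):

```scala
spark.read
  .schema("a string, _corrupt_record string")
  .json(Seq("""[{"a": "str"}, null]""").toDS)
  .collect()
// Expected: one row with a = null and _corrupt_record holding the
// original array text, since the whole array is one malformed record.
```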

Note that this behaviour is consistent with the case where a JSON object inside the array is malformed:

```scala
spark.read.schema("a int").json(Seq("""[{"a": 123}, {123123}, {"a": 123}]""").toDS).collect()
```

```
res0: Array[org.apache.spark.sql.Row] = Array([null])
```

Since the whole input is parsed as _one_ JSON array, its records all fail together.
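
For contrast, when each input record is a separate JSON object, the records are independent, so permissive mode nulls out only the malformed one. A spark-shell sketch of that standard behaviour (expected output shown as a comment):

```scala
// Same records as above, but one JSON object per input record.
spark.read.schema("a int")
  .json(Seq("""{"a": 123}""", """{123123}""", """{"a": 123}""").toDS)
  .collect()
// Expected: Array([123], [null], [123]) -- only the malformed middle
// record becomes a null row; the valid records still parse.
```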

**Failfast mode:**

```scala
spark.read.option("mode", "failfast").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
```

Before:

```
java.lang.NullPointerException
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
```

After:

```
org.apache.spark.SparkException: Malformed records are detected in record parsing. Parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
	at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:70)
	at org.apache.spark.sql.DataFrameReader.$anonfun$json$7(DataFrameReader.scala:540)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
```

Manually tested, and a unit test was added.

Closes #33608 from HyukjinKwon/SPARK-36379.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 0bbcbc6508)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>

In `JacksonParser.scala`:

```diff
@@ -108,7 +108,7 @@ class JacksonParser(
       // List([str_a_2,null], [null,str_b_3])
       //
       case START_ARRAY if allowArrayAsStructs =>
-        val array = convertArray(parser, elementConverter)
+        val array = convertArray(parser, elementConverter, isRoot = true)
         // Here, as we support reading top level JSON arrays and take every element
         // in such an array as a row, this case is possible.
         if (array.numElements() == 0) {
@@ -450,10 +450,13 @@ class JacksonParser(
    */
   private def convertArray(
       parser: JsonParser,
-      fieldConverter: ValueConverter): ArrayData = {
+      fieldConverter: ValueConverter,
+      isRoot: Boolean = false): ArrayData = {
     val values = ArrayBuffer.empty[Any]
     while (nextUntil(parser, JsonToken.END_ARRAY)) {
-      values += fieldConverter.apply(parser)
+      val v = fieldConverter.apply(parser)
+      if (isRoot && v == null) throw QueryExecutionErrors.rootConverterReturnNullError()
+      values += v
     }
     new GenericArrayData(values.toArray)
   }
```
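
The exception thrown above is wrapped as a bad record inside `JacksonParser` and handed to `FailureSafeParser` (visible in the stack traces earlier), which applies the configured parse mode. A self-contained toy model of that dispatch, with simplified names and row shapes (an assumption-laden sketch, not Spark's implementation):

```scala
sealed trait ParseMode
case object Permissive extends ParseMode
case object FailFast extends ParseMode

// Stand-in for Spark's BadRecordException, carrying the raw record text.
final case class BadRecord(record: String) extends RuntimeException(record)

// Rows are modelled as Seq[Option[String]]; None plays the role of a null field.
def failureSafeParse(
    rawParse: String => Seq[Seq[Option[String]]],
    input: String,
    mode: ParseMode): Seq[Seq[Option[String]]] =
  try rawParse(input)
  catch {
    case e: BadRecord => mode match {
      // PERMISSIVE: replace the whole bad record with a single all-null row.
      case Permissive => Seq(Seq(None))
      // FAILFAST: surface the failure to the user, as in the stack trace above.
      case FailFast => throw new RuntimeException(
        "Malformed records are detected in record parsing. Parse Mode: FAILFAST.", e)
    }
  }
```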

In `JsonSuite.scala`:

```diff
@@ -2919,6 +2919,20 @@ abstract class JsonSuite
       }
     }
   }
+
+  test("SPARK-36379: proceed parsing with root nulls in permissive mode") {
+    assert(intercept[SparkException] {
+      spark.read.option("mode", "failfast")
+        .schema("a string").json(Seq("""[{"a": "str"}, null]""").toDS).collect()
+    }.getMessage.contains("Malformed records are detected"))
+
+    // Permissive modes should proceed parsing malformed records (null).
+    // Here, since an array fails to parse in the middle, we will return one row.
+    checkAnswer(
+      spark.read.option("mode", "permissive")
+        .json(Seq("""[{"a": "str"}, null, {"a": "str"}]""").toDS),
+      Row(null) :: Nil)
+  }
 }
 
 class JsonV1Suite extends JsonSuite {
```