[SPARK-4068][SQL] NPE in jsonRDD schema inference

Please refer to added tests for cases that can trigger the bug.

JIRA: https://issues.apache.org/jira/browse/SPARK-4068

Author: Yin Huai <huai@cse.ohio-state.edu>

Closes #2918 from yhuai/SPARK-4068 and squashes the following commits:

d360eae [Yin Huai] Handle nulls when building key paths from elements of an array.
This commit is contained in:
Yin Huai 2014-10-26 16:32:02 -07:00 committed by Michael Armbrust
parent 05308426f0
commit 0481aaa8d7
3 changed files with 58 additions and 20 deletions

View file

@@ -242,14 +242,14 @@ private[sql] object JsonRDD extends Logging {
def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
case ArrayType(StructType(Nil), containsNull) => {
// The elements of this arrays are structs.
v.asInstanceOf[Seq[Map[String, Any]]].flatMap {
v.asInstanceOf[Seq[Map[String, Any]]].flatMap(Option(_)).flatMap {
element => allKeysWithValueTypes(element)
}.map {
case (k, t) => (s"$key.$k", t)
}
}
case ArrayType(t1, containsNull) =>
v.asInstanceOf[Seq[Any]].flatMap {
v.asInstanceOf[Seq[Any]].flatMap(Option(_)).flatMap {
element => buildKeyPathForInnerStructs(element, t1)
}
case other => Nil

View file

@@ -208,7 +208,7 @@ class JsonSuite extends QueryTest {
}
test("Complex field and type inferring") {
val jsonSchemaRDD = jsonRDD(complexFieldAndType)
val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
val expectedSchema = StructType(
StructField("arrayOfArray1", ArrayType(ArrayType(StringType, false), false), true) ::
@@ -305,7 +305,7 @@ class JsonSuite extends QueryTest {
}
ignore("Complex field and type inferring (Ignored)") {
val jsonSchemaRDD = jsonRDD(complexFieldAndType)
val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
jsonSchemaRDD.registerTempTable("jsonTable")
// Right now, "field1" and "field2" are treated as aliases. We should fix it.
@@ -707,4 +707,35 @@ class JsonSuite extends QueryTest {
TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
}
test("SPARK-4068: nulls in arrays") {
val jsonSchemaRDD = jsonRDD(nullsInArrays)
jsonSchemaRDD.registerTempTable("jsonTable")
val schema = StructType(
StructField("field1",
ArrayType(ArrayType(ArrayType(ArrayType(StringType, false), false), true), false), true) ::
StructField("field2",
ArrayType(ArrayType(
StructType(StructField("Test", IntegerType, true) :: Nil), false), true), true) ::
StructField("field3",
ArrayType(ArrayType(
StructType(StructField("Test", StringType, true) :: Nil), true), false), true) ::
StructField("field4",
ArrayType(ArrayType(ArrayType(IntegerType, false), true), false), true) :: Nil)
assert(schema === jsonSchemaRDD.schema)
checkAnswer(
sql(
"""
|SELECT field1, field2, field3, field4
|FROM jsonTable
""".stripMargin),
Seq(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
Seq(null, Seq(null, Seq(Seq(1))), null, null) ::
Seq(null, null, Seq(Seq(null), Seq(Seq("2"))), null) ::
Seq(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
)
}
}

View file

@@ -32,22 +32,6 @@ object TestJsonData {
"null":null
}""" :: Nil)
val complexFieldAndType =
TestSQLContext.sparkContext.parallelize(
"""{"struct":{"field1": true, "field2": 92233720368547758070},
"structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
"arrayOfString":["str1", "str2"],
"arrayOfInteger":[1, 2147483647, -2147483648],
"arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
"arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
"arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
"arrayOfBoolean":[true, false, true],
"arrayOfNull":[null, null, null, null],
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
}""" :: Nil)
val primitiveFieldValueTypeConflict =
TestSQLContext.sparkContext.parallelize(
"""{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
@@ -83,6 +67,22 @@ object TestJsonData {
"""{"d":{"field":true}}""" ::
"""{"e":"str"}""" :: Nil)
val complexFieldAndType1 =
TestSQLContext.sparkContext.parallelize(
"""{"struct":{"field1": true, "field2": 92233720368547758070},
"structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
"arrayOfString":["str1", "str2"],
"arrayOfInteger":[1, 2147483647, -2147483648],
"arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
"arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
"arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
"arrayOfBoolean":[true, false, true],
"arrayOfNull":[null, null, null, null],
"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
"arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
"arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
}""" :: Nil)
val complexFieldAndType2 =
TestSQLContext.sparkContext.parallelize(
"""{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
@@ -137,6 +137,13 @@ object TestJsonData {
]]
}""" :: Nil)
val nullsInArrays =
TestSQLContext.sparkContext.parallelize(
"""{"field1":[[null], [[["Test"]]]]}""" ::
"""{"field2":[null, [{"Test":1}]]}""" ::
"""{"field3":[[null], [{"Test":"2"}]]}""" ::
"""{"field4":[[null, [1,2,3]]]}""" :: Nil)
val jsonArray =
TestSQLContext.sparkContext.parallelize(
"""[{"a":"str_a_1"}]""" ::