diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
index 61ee960aad..bf32da1b71 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala
@@ -242,14 +242,14 @@ private[sql] object JsonRDD extends Logging {
     def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
       case ArrayType(StructType(Nil), containsNull) => {
         // The elements of this arrays are structs.
-        v.asInstanceOf[Seq[Map[String, Any]]].flatMap {
+        v.asInstanceOf[Seq[Map[String, Any]]].flatMap(Option(_)).flatMap {
           element => allKeysWithValueTypes(element)
         }.map {
           case (k, t) => (s"$key.$k", t)
         }
       }
       case ArrayType(t1, containsNull) =>
-        v.asInstanceOf[Seq[Any]].flatMap {
+        v.asInstanceOf[Seq[Any]].flatMap(Option(_)).flatMap {
           element => buildKeyPathForInnerStructs(element, t1)
         }
       case other => Nil
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
index 7bb08f1b51..41927e8369 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala
@@ -208,7 +208,7 @@ class JsonSuite extends QueryTest {
   }
 
   test("Complex field and type inferring") {
-    val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+    val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
 
     val expectedSchema = StructType(
       StructField("arrayOfArray1", ArrayType(ArrayType(StringType, false), false), true) ::
@@ -305,7 +305,7 @@ class JsonSuite extends QueryTest {
   }
 
   ignore("Complex field and type inferring (Ignored)") {
-    val jsonSchemaRDD = jsonRDD(complexFieldAndType)
+    val jsonSchemaRDD = jsonRDD(complexFieldAndType1)
    jsonSchemaRDD.registerTempTable("jsonTable")
 
    // Right now, "field1" and "field2" are treated as aliases. We should fix it.
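Note on the JsonRDD.scala hunk above: the entire fix is the added flatMap(Option(_)) step. Option(null) evaluates to None, so flattening over Option drops null array elements before schema inference recurses into them; previously a null element reached allKeysWithValueTypes (or the recursive call) and triggered the NullPointerException. A minimal sketch of the idiom in plain Scala (names and data here are hypothetical, not part of the patch):

    object NullFilterSketch extends App {
      // A JSON array of structs as JsonRDD sees it, with a null element.
      val elements: Seq[Map[String, Any]] =
        Seq(Map("Test" -> 1), null, Map("Test" -> 2))

      // Option(null) == None, so flatMap(Option(_)) silently drops nulls.
      val nonNull = elements.flatMap(Option(_))
      assert(nonNull == Seq(Map("Test" -> 1), Map("Test" -> 2)))

      // Without the filter, touching each element (e.g. element.keys)
      // would throw a NullPointerException on the null entry.
      nonNull.foreach(m => println(m.keys))
    }
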
@@ -707,4 +707,35 @@ class JsonSuite extends QueryTest {
 
     TestSQLContext.setConf(SQLConf.COLUMN_NAME_OF_CORRUPT_RECORD, oldColumnNameOfCorruptRecord)
   }
+
+  test("SPARK-4068: nulls in arrays") {
+    val jsonSchemaRDD = jsonRDD(nullsInArrays)
+    jsonSchemaRDD.registerTempTable("jsonTable")
+
+    val schema = StructType(
+      StructField("field1",
+        ArrayType(ArrayType(ArrayType(ArrayType(StringType, false), false), true), false), true) ::
+      StructField("field2",
+        ArrayType(ArrayType(
+          StructType(StructField("Test", IntegerType, true) :: Nil), false), true), true) ::
+      StructField("field3",
+        ArrayType(ArrayType(
+          StructType(StructField("Test", StringType, true) :: Nil), true), false), true) ::
+      StructField("field4",
+        ArrayType(ArrayType(ArrayType(IntegerType, false), true), false), true) :: Nil)
+
+    assert(schema === jsonSchemaRDD.schema)
+
+    checkAnswer(
+      sql(
+        """
+          |SELECT field1, field2, field3, field4
+          |FROM jsonTable
+        """.stripMargin),
+      Seq(Seq(Seq(null), Seq(Seq(Seq("Test")))), null, null, null) ::
+        Seq(null, Seq(null, Seq(Seq(1))), null, null) ::
+        Seq(null, null, Seq(Seq(null), Seq(Seq("2"))), null) ::
+        Seq(null, null, null, Seq(Seq(null, Seq(1, 2, 3)))) :: Nil
+    )
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
index eaca9f0508..c204162dd2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/json/TestJsonData.scala
@@ -32,22 +32,6 @@ object TestJsonData {
       "null":null
      }""" :: Nil)
 
-  val complexFieldAndType =
-    TestSQLContext.sparkContext.parallelize(
-      """{"struct":{"field1": true, "field2": 92233720368547758070},
-          "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
-          "arrayOfString":["str1", "str2"],
-          "arrayOfInteger":[1, 2147483647, -2147483648],
-          "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
-          "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
-          "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
-          "arrayOfBoolean":[true, false, true],
-          "arrayOfNull":[null, null, null, null],
-          "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
-          "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
-          "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
-         }""" :: Nil)
-
   val primitiveFieldValueTypeConflict =
     TestSQLContext.sparkContext.parallelize(
       """{"num_num_1":11, "num_num_2":null, "num_num_3": 1.1,
@@ -83,6 +67,22 @@ object TestJsonData {
       """{"d":{"field":true}}""" ::
       """{"e":"str"}""" :: Nil)
 
+  val complexFieldAndType1 =
+    TestSQLContext.sparkContext.parallelize(
+      """{"struct":{"field1": true, "field2": 92233720368547758070},
+          "structWithArrayFields":{"field1":[4, 5, 6], "field2":["str1", "str2"]},
+          "arrayOfString":["str1", "str2"],
+          "arrayOfInteger":[1, 2147483647, -2147483648],
+          "arrayOfLong":[21474836470, 9223372036854775807, -9223372036854775808],
+          "arrayOfBigInteger":[922337203685477580700, -922337203685477580800],
+          "arrayOfDouble":[1.2, 1.7976931348623157E308, 4.9E-324, 2.2250738585072014E-308],
+          "arrayOfBoolean":[true, false, true],
+          "arrayOfNull":[null, null, null, null],
+          "arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}],
+          "arrayOfArray1":[[1, 2, 3], ["str1", "str2"]],
+          "arrayOfArray2":[[1, 2, 3], [1.1, 2.1, 3.1]]
+         }""" :: Nil)
+
   val complexFieldAndType2 =
     TestSQLContext.sparkContext.parallelize(
"""{"arrayOfStruct":[{"field1": true, "field2": "str1"}, {"field1": false}, {"field3": null}], @@ -137,6 +137,13 @@ object TestJsonData { ]] }""" :: Nil) + val nullsInArrays = + TestSQLContext.sparkContext.parallelize( + """{"field1":[[null], [[["Test"]]]]}""" :: + """{"field2":[null, [{"Test":1}]]}""" :: + """{"field3":[[null], [{"Test":"2"}]]}""" :: + """{"field4":[[null, [1,2,3]]]}""" :: Nil) + val jsonArray = TestSQLContext.sparkContext.parallelize( """[{"a":"str_a_1"}]""" ::