From 3efd8bb6cf139ce094ff631c7a9c1eb93fdcd566 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Mon, 2 Mar 2015 23:18:07 +0800 Subject: [PATCH] [SPARK-6052][SQL]In JSON schema inference, we should always set containsNull of an ArrayType to true Always set `containsNull = true` when infer the schema of JSON datasets. If we set `containsNull` based on records we scanned, we may miss arrays with null values when we do sampling. Also, because future data can have arrays with null values, if we convert JSON data to parquet, always setting `containsNull = true` is a more robust way to go. JIRA: https://issues.apache.org/jira/browse/SPARK-6052 Author: Yin Huai Closes #4806 from yhuai/jsonArrayContainsNull and squashes the following commits: 05eab9d [Yin Huai] Change containsNull to true. --- .../org/apache/spark/sql/json/JsonRDD.scala | 9 ++--- .../org/apache/spark/sql/json/JsonSuite.scala | 38 +++++++++---------- 2 files changed, 23 insertions(+), 24 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala index d83bdc2f7f..e54a2a3679 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/json/JsonRDD.scala @@ -199,13 +199,12 @@ private[sql] object JsonRDD extends Logging { * type conflicts. */ private def typeOfArray(l: Seq[Any]): ArrayType = { - val containsNull = l.exists(v => v == null) val elements = l.flatMap(v => Option(v)) if (elements.isEmpty) { // If this JSON array is empty, we use NullType as a placeholder. // If this array is not empty in other JSON objects, we can resolve // the type after we have passed through all JSON objects. - ArrayType(NullType, containsNull) + ArrayType(NullType, containsNull = true) } else { val elementType = elements.map { e => e match { @@ -217,7 +216,7 @@ private[sql] object JsonRDD extends Logging { } }.reduce((type1: DataType, type2: DataType) => compatibleType(type1, type2)) - ArrayType(elementType, containsNull) + ArrayType(elementType, containsNull = true) } } @@ -245,7 +244,7 @@ private[sql] object JsonRDD extends Logging { // The value associated with the key is an array. // Handle inner structs of an array. def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match { - case ArrayType(e: StructType, containsNull) => { + case ArrayType(e: StructType, _) => { // The elements of this arrays are structs. v.asInstanceOf[Seq[Map[String, Any]]].flatMap(Option(_)).flatMap { element => allKeysWithValueTypes(element) @@ -253,7 +252,7 @@ private[sql] object JsonRDD extends Logging { case (k, t) => (s"$key.$k", t) } } - case ArrayType(t1, containsNull) => + case ArrayType(t1, _) => v.asInstanceOf[Seq[Any]].flatMap(Option(_)).flatMap { element => buildKeyPathForInnerStructs(element, t1) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala index 005f20b96d..9d94d3406a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/json/JsonSuite.scala @@ -248,26 +248,26 @@ class JsonSuite extends QueryTest { val jsonDF = jsonRDD(complexFieldAndType1) val expectedSchema = StructType( - StructField("arrayOfArray1", ArrayType(ArrayType(StringType, false), false), true) :: - StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, false), false), true) :: - StructField("arrayOfBigInteger", ArrayType(DecimalType.Unlimited, false), true) :: - StructField("arrayOfBoolean", ArrayType(BooleanType, false), true) :: - StructField("arrayOfDouble", ArrayType(DoubleType, false), true) :: - StructField("arrayOfInteger", ArrayType(LongType, false), true) :: - StructField("arrayOfLong", ArrayType(LongType, false), true) :: + StructField("arrayOfArray1", ArrayType(ArrayType(StringType, true), true), true) :: + StructField("arrayOfArray2", ArrayType(ArrayType(DoubleType, true), true), true) :: + StructField("arrayOfBigInteger", ArrayType(DecimalType.Unlimited, true), true) :: + StructField("arrayOfBoolean", ArrayType(BooleanType, true), true) :: + StructField("arrayOfDouble", ArrayType(DoubleType, true), true) :: + StructField("arrayOfInteger", ArrayType(LongType, true), true) :: + StructField("arrayOfLong", ArrayType(LongType, true), true) :: StructField("arrayOfNull", ArrayType(StringType, true), true) :: - StructField("arrayOfString", ArrayType(StringType, false), true) :: + StructField("arrayOfString", ArrayType(StringType, true), true) :: StructField("arrayOfStruct", ArrayType( StructType( StructField("field1", BooleanType, true) :: StructField("field2", StringType, true) :: - StructField("field3", StringType, true) :: Nil), false), true) :: + StructField("field3", StringType, true) :: Nil), true), true) :: StructField("struct", StructType( StructField("field1", BooleanType, true) :: StructField("field2", DecimalType.Unlimited, true) :: Nil), true) :: StructField("structWithArrayFields", StructType( - StructField("field1", ArrayType(LongType, false), true) :: - StructField("field2", ArrayType(StringType, false), true) :: Nil), true) :: Nil) + StructField("field1", ArrayType(LongType, true), true) :: + StructField("field2", ArrayType(StringType, true), true) :: Nil), true) :: Nil) assert(expectedSchema === jsonDF.schema) @@ -487,7 +487,7 @@ class JsonSuite extends QueryTest { val jsonDF = jsonRDD(complexFieldValueTypeConflict) val expectedSchema = StructType( - StructField("array", ArrayType(LongType, false), true) :: + StructField("array", ArrayType(LongType, true), true) :: StructField("num_struct", StringType, true) :: StructField("str_array", StringType, true) :: StructField("struct", StructType( @@ -513,8 +513,8 @@ class JsonSuite extends QueryTest { val expectedSchema = StructType( StructField("array1", ArrayType(StringType, true), true) :: StructField("array2", ArrayType(StructType( - StructField("field", LongType, true) :: Nil), false), true) :: - StructField("array3", ArrayType(StringType, false), true) :: Nil) + StructField("field", LongType, true) :: Nil), true), true) :: + StructField("array3", ArrayType(StringType, true), true) :: Nil) assert(expectedSchema === jsonDF.schema) @@ -541,7 +541,7 @@ class JsonSuite extends QueryTest { val expectedSchema = StructType( StructField("a", BooleanType, true) :: StructField("b", LongType, true) :: - StructField("c", ArrayType(LongType, false), true) :: + StructField("c", ArrayType(LongType, true), true) :: StructField("d", StructType( StructField("field", BooleanType, true) :: Nil), true) :: StructField("e", StringType, true) :: Nil) @@ -835,15 +835,15 @@ class JsonSuite extends QueryTest { val schema = StructType( StructField("field1", - ArrayType(ArrayType(ArrayType(ArrayType(StringType, false), false), true), false), true) :: + ArrayType(ArrayType(ArrayType(ArrayType(StringType, true), true), true), true), true) :: StructField("field2", ArrayType(ArrayType( - StructType(StructField("Test", LongType, true) :: Nil), false), true), true) :: + StructType(StructField("Test", LongType, true) :: Nil), true), true), true) :: StructField("field3", ArrayType(ArrayType( - StructType(StructField("Test", StringType, true) :: Nil), true), false), true) :: + StructType(StructField("Test", StringType, true) :: Nil), true), true), true) :: StructField("field4", - ArrayType(ArrayType(ArrayType(LongType, false), true), false), true) :: Nil) + ArrayType(ArrayType(ArrayType(LongType, true), true), true), true) :: Nil) assert(schema === jsonDF.schema)