[SPARK-3390][SQL] sqlContext.jsonRDD fails on a complex structure of JSON array and JSON object nesting
This PR aims to correctly handle JSON arrays in the type of `ArrayType(...(ArrayType(StructType)))`. JIRA: https://issues.apache.org/jira/browse/SPARK-3390. Author: Yin Huai <huai@cse.ohio-state.edu> Closes #2364 from yhuai/SPARK-3390 and squashes the following commits: 46db418 [Yin Huai] Handle JSON arrays in the type of ArrayType(...(ArrayType(StructType))).
This commit is contained in:
parent
ca83f1e2c4
commit
4bc9e046cb
|
@ -68,8 +68,15 @@ private[sql] object JsonRDD extends Logging {
|
|||
val (topLevel, structLike) = values.partition(_.size == 1)
|
||||
val topLevelFields = topLevel.filter {
|
||||
name => resolved.get(prefix ++ name).get match {
|
||||
case ArrayType(StructType(Nil), _) => false
|
||||
case ArrayType(_, _) => true
|
||||
case ArrayType(elementType, _) => {
|
||||
def hasInnerStruct(t: DataType): Boolean = t match {
|
||||
case s: StructType => false
|
||||
case ArrayType(t1, _) => hasInnerStruct(t1)
|
||||
case o => true
|
||||
}
|
||||
|
||||
hasInnerStruct(elementType)
|
||||
}
|
||||
case struct: StructType => false
|
||||
case _ => true
|
||||
}
|
||||
|
@ -84,7 +91,18 @@ private[sql] object JsonRDD extends Logging {
|
|||
val dataType = resolved.get(prefix :+ name).get
|
||||
dataType match {
|
||||
case array: ArrayType =>
|
||||
Some(StructField(name, ArrayType(structType, array.containsNull), nullable = true))
|
||||
// The pattern of this array is ArrayType(...(ArrayType(StructType))).
|
||||
// Since the inner struct of array is a placeholder (StructType(Nil)),
|
||||
// we need to replace this placeholder with the actual StructType (structType).
|
||||
def getActualArrayType(
|
||||
innerStruct: StructType,
|
||||
currentArray: ArrayType): ArrayType = currentArray match {
|
||||
case ArrayType(s: StructType, containsNull) =>
|
||||
ArrayType(innerStruct, containsNull)
|
||||
case ArrayType(a: ArrayType, containsNull) =>
|
||||
ArrayType(getActualArrayType(innerStruct, a), containsNull)
|
||||
}
|
||||
Some(StructField(name, getActualArrayType(structType, array), nullable = true))
|
||||
case struct: StructType => Some(StructField(name, structType, nullable = true))
|
||||
// dataType is StringType means that we have resolved type conflicts involving
|
||||
// primitive types and complex types. So, the type of name has been relaxed to
|
||||
|
@ -168,8 +186,7 @@ private[sql] object JsonRDD extends Logging {
|
|||
/**
|
||||
* Returns the element type of an JSON array. We go through all elements of this array
|
||||
* to detect any possible type conflict. We use [[compatibleType]] to resolve
|
||||
* type conflicts. Right now, when the element of an array is another array, we
|
||||
* treat the element as String.
|
||||
* type conflicts.
|
||||
*/
|
||||
private def typeOfArray(l: Seq[Any]): ArrayType = {
|
||||
val containsNull = l.exists(v => v == null)
|
||||
|
@ -216,18 +233,24 @@ private[sql] object JsonRDD extends Logging {
|
|||
}
|
||||
case (key: String, array: Seq[_]) => {
|
||||
// The value associated with the key is an array.
|
||||
typeOfArray(array) match {
|
||||
// Handle inner structs of an array.
|
||||
def buildKeyPathForInnerStructs(v: Any, t: DataType): Seq[(String, DataType)] = t match {
|
||||
case ArrayType(StructType(Nil), containsNull) => {
|
||||
// The elements of this arrays are structs.
|
||||
array.asInstanceOf[Seq[Map[String, Any]]].flatMap {
|
||||
v.asInstanceOf[Seq[Map[String, Any]]].flatMap {
|
||||
element => allKeysWithValueTypes(element)
|
||||
}.map {
|
||||
case (k, dataType) => (s"$key.$k", dataType)
|
||||
} :+ (key, ArrayType(StructType(Nil), containsNull))
|
||||
case (k, t) => (s"$key.$k", t)
|
||||
}
|
||||
}
|
||||
case ArrayType(elementType, containsNull) =>
|
||||
(key, ArrayType(elementType, containsNull)) :: Nil
|
||||
case ArrayType(t1, containsNull) =>
|
||||
v.asInstanceOf[Seq[Any]].flatMap {
|
||||
element => buildKeyPathForInnerStructs(element, t1)
|
||||
}
|
||||
case other => Nil
|
||||
}
|
||||
val elementType = typeOfArray(array)
|
||||
buildKeyPathForInnerStructs(array, elementType) :+ (key, elementType)
|
||||
}
|
||||
case (key: String, value) => (key, typeOfPrimitiveValue(value)) :: Nil
|
||||
}
|
||||
|
@ -339,8 +362,6 @@ private[sql] object JsonRDD extends Logging {
|
|||
null
|
||||
} else {
|
||||
desiredType match {
|
||||
case ArrayType(elementType, _) =>
|
||||
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
|
||||
case StringType => toString(value)
|
||||
case IntegerType => value.asInstanceOf[IntegerType.JvmType]
|
||||
case LongType => toLong(value)
|
||||
|
@ -348,6 +369,10 @@ private[sql] object JsonRDD extends Logging {
|
|||
case DecimalType => toDecimal(value)
|
||||
case BooleanType => value.asInstanceOf[BooleanType.JvmType]
|
||||
case NullType => null
|
||||
|
||||
case ArrayType(elementType, _) =>
|
||||
value.asInstanceOf[Seq[Any]].map(enforceCorrectType(_, elementType))
|
||||
case struct: StructType => asRow(value.asInstanceOf[Map[String, Any]], struct)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -356,22 +381,9 @@ private[sql] object JsonRDD extends Logging {
|
|||
// TODO: Reuse the row instead of creating a new one for every record.
|
||||
val row = new GenericMutableRow(schema.fields.length)
|
||||
schema.fields.zipWithIndex.foreach {
|
||||
// StructType
|
||||
case (StructField(name, fields: StructType, _), i) =>
|
||||
row.update(i, json.get(name).flatMap(v => Option(v)).map(
|
||||
v => asRow(v.asInstanceOf[Map[String, Any]], fields)).orNull)
|
||||
|
||||
// ArrayType(StructType)
|
||||
case (StructField(name, ArrayType(structType: StructType, _), _), i) =>
|
||||
row.update(i,
|
||||
json.get(name).flatMap(v => Option(v)).map(
|
||||
v => v.asInstanceOf[Seq[Any]].map(
|
||||
e => asRow(e.asInstanceOf[Map[String, Any]], structType))).orNull)
|
||||
|
||||
// Other cases
|
||||
case (StructField(name, dataType, _), i) =>
|
||||
row.update(i, json.get(name).flatMap(v => Option(v)).map(
|
||||
enforceCorrectType(_, dataType)).getOrElse(null))
|
||||
enforceCorrectType(_, dataType)).orNull)
|
||||
}
|
||||
|
||||
row
|
||||
|
|
|
@ -591,8 +591,35 @@ class JsonSuite extends QueryTest {
|
|||
(true, "str1") :: Nil
|
||||
)
|
||||
checkAnswer(
|
||||
sql("select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1] from jsonTable"),
|
||||
sql(
|
||||
"""
|
||||
|select complexArrayOfStruct[0].field1[1].inner2[0], complexArrayOfStruct[1].field2[0][1]
|
||||
|from jsonTable
|
||||
""".stripMargin),
|
||||
("str2", 6) :: Nil
|
||||
)
|
||||
}
|
||||
|
||||
test("SPARK-3390 Complex arrays") {
|
||||
val jsonSchemaRDD = jsonRDD(complexFieldAndType2)
|
||||
jsonSchemaRDD.registerTempTable("jsonTable")
|
||||
|
||||
checkAnswer(
|
||||
sql(
|
||||
"""
|
||||
|select arrayOfArray1[0][0][0], arrayOfArray1[1][0][1], arrayOfArray1[1][1][0]
|
||||
|from jsonTable
|
||||
""".stripMargin),
|
||||
(5, 7, 8) :: Nil
|
||||
)
|
||||
checkAnswer(
|
||||
sql(
|
||||
"""
|
||||
|select arrayOfArray2[0][0][0].inner1, arrayOfArray2[1][0],
|
||||
|arrayOfArray2[1][1][1].inner2[0], arrayOfArray2[2][0][0].inner3[0][0].inner4
|
||||
|from jsonTable
|
||||
""".stripMargin),
|
||||
("str1", Nil, "str4", 2) :: Nil
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -106,6 +106,34 @@ object TestJsonData {
|
|||
"inner1": "str4"
|
||||
}],
|
||||
"field2": [[5, 6], [7, 8]]
|
||||
}]
|
||||
}],
|
||||
"arrayOfArray1": [
|
||||
[
|
||||
[5]
|
||||
],
|
||||
[
|
||||
[6, 7],
|
||||
[8]
|
||||
]],
|
||||
"arrayOfArray2": [
|
||||
[
|
||||
[
|
||||
{
|
||||
"inner1": "str1"
|
||||
}
|
||||
]
|
||||
],
|
||||
[
|
||||
[],
|
||||
[
|
||||
{"inner2": ["str3", "str33"]},
|
||||
{"inner2": ["str4"], "inner1": "str11"}
|
||||
]
|
||||
],
|
||||
[
|
||||
[
|
||||
{"inner3": [[{"inner4": 2}]]}
|
||||
]
|
||||
]]
|
||||
}""" :: Nil)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue