[SPARK-2875] [PySpark] [SQL] handle null in schemaRDD()

Handle null in schemaRDD during converting them into Python.

Author: Davies Liu <davies.liu@gmail.com>

Closes #1802 from davies/json and squashes the following commits:

88e6b1f [Davies Liu] handle null in schemaRDD()
This commit is contained in:
Davies Liu 2014-08-06 11:08:12 -07:00 committed by Michael Armbrust
parent 09f7e4587b
commit 48789117c2
2 changed files with 23 additions and 11 deletions

View file

@ -1231,6 +1231,13 @@ class SQLContext:
... "field3.field5[0] as f3 from table3")
>>> srdd6.collect()
[Row(f1=u'row1', f2=None,...Row(f1=u'row3', f2=[], f3=None)]
>>> sqlCtx.jsonRDD(sc.parallelize(['{}',
... '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=None), Row(key0=Row(key1=u'value1'))]
>>> sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}',
... '{"key0": {"key1": "value1"}}'])).collect()
[Row(key0=None), Row(key0=Row(key1=u'value1'))]
"""
def func(iterator):

View file

@ -382,21 +382,26 @@ class SchemaRDD(
private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
import scala.collection.Map
def toJava(obj: Any, dataType: DataType): Any = dataType match {
case struct: StructType => rowToArray(obj.asInstanceOf[Row], struct)
case array: ArrayType => obj match {
case seq: Seq[Any] => seq.map(x => toJava(x, array.elementType)).asJava
case list: JList[_] => list.map(x => toJava(x, array.elementType)).asJava
case arr if arr != null && arr.getClass.isArray =>
def toJava(obj: Any, dataType: DataType): Any = (obj, dataType) match {
case (null, _) => null
case (obj: Row, struct: StructType) => rowToArray(obj, struct)
case (seq: Seq[Any], array: ArrayType) =>
seq.map(x => toJava(x, array.elementType)).asJava
case (list: JList[_], array: ArrayType) =>
list.map(x => toJava(x, array.elementType)).asJava
case (arr, array: ArrayType) if arr.getClass.isArray =>
arr.asInstanceOf[Array[Any]].map(x => toJava(x, array.elementType))
case other => other
}
case mt: MapType => obj.asInstanceOf[Map[_, _]].map {
case (obj: Map[_, _], mt: MapType) => obj.map {
case (k, v) => (k, toJava(v, mt.valueType)) // key should be primitive type
}.asJava
// Pyrolite can handle Timestamp
case other => obj
case (other, _) => other
}
def rowToArray(row: Row, structType: StructType): Array[Any] = {
val fields = structType.fields.map(field => field.dataType)
row.zip(fields).map {