[SPARK-19104][SQL] Lambda variables in ExternalMapToCatalyst should be global
## What changes were proposed in this pull request? The issue happens in `ExternalMapToCatalyst`. For example, the following codes create `ExternalMapToCatalyst` to convert Scala Map to catalyst map format. val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100)))) val ds = spark.createDataset(data) The `valueConverter` in `ExternalMapToCatalyst` looks like: if (isnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true))) null else named_struct(name, staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).name, true), value, assertnotnull(lambdavariable(ExternalMapToCatalyst_value52, ExternalMapToCatalyst_value_isNull52, ObjectType(class org.apache.spark.sql.InnerData), true)).value) There is a `CreateNamedStruct` expression (`named_struct`) to create a row of `InnerData.name` and `InnerData.value` that are referred by `ExternalMapToCatalyst_value52`. Because `ExternalMapToCatalyst_value52` are local variable, when `CreateNamedStruct` splits expressions to individual functions, the local variable can't be accessed anymore. ## How was this patch tested? Jenkins tests. Author: Liang-Chi Hsieh <viirya@gmail.com> Closes #18418 from viirya/SPARK-19104.
This commit is contained in:
parent
b32bd005e4
commit
fd8c931a30
|
@ -911,6 +911,12 @@ case class ExternalMapToCatalyst private(
|
|||
val entry = ctx.freshName("entry")
|
||||
val entries = ctx.freshName("entries")
|
||||
|
||||
val keyElementJavaType = ctx.javaType(keyType)
|
||||
val valueElementJavaType = ctx.javaType(valueType)
|
||||
ctx.addMutableState(keyElementJavaType, key, "")
|
||||
ctx.addMutableState("boolean", valueIsNull, "")
|
||||
ctx.addMutableState(valueElementJavaType, value, "")
|
||||
|
||||
val (defineEntries, defineKeyValue) = child.dataType match {
|
||||
case ObjectType(cls) if classOf[java.util.Map[_, _]].isAssignableFrom(cls) =>
|
||||
val javaIteratorCls = classOf[java.util.Iterator[_]].getName
|
||||
|
@ -922,8 +928,8 @@ case class ExternalMapToCatalyst private(
|
|||
val defineKeyValue =
|
||||
s"""
|
||||
final $javaMapEntryCls $entry = ($javaMapEntryCls) $entries.next();
|
||||
${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry.getKey();
|
||||
${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry.getValue();
|
||||
$key = (${ctx.boxedType(keyType)}) $entry.getKey();
|
||||
$value = (${ctx.boxedType(valueType)}) $entry.getValue();
|
||||
"""
|
||||
|
||||
defineEntries -> defineKeyValue
|
||||
|
@ -937,17 +943,17 @@ case class ExternalMapToCatalyst private(
|
|||
val defineKeyValue =
|
||||
s"""
|
||||
final $scalaMapEntryCls $entry = ($scalaMapEntryCls) $entries.next();
|
||||
${ctx.javaType(keyType)} $key = (${ctx.boxedType(keyType)}) $entry._1();
|
||||
${ctx.javaType(valueType)} $value = (${ctx.boxedType(valueType)}) $entry._2();
|
||||
$key = (${ctx.boxedType(keyType)}) $entry._1();
|
||||
$value = (${ctx.boxedType(valueType)}) $entry._2();
|
||||
"""
|
||||
|
||||
defineEntries -> defineKeyValue
|
||||
}
|
||||
|
||||
val valueNullCheck = if (ctx.isPrimitiveType(valueType)) {
|
||||
s"boolean $valueIsNull = false;"
|
||||
s"$valueIsNull = false;"
|
||||
} else {
|
||||
s"boolean $valueIsNull = $value == null;"
|
||||
s"$valueIsNull = $value == null;"
|
||||
}
|
||||
|
||||
val arrayCls = classOf[GenericArrayData].getName
|
||||
|
|
|
@ -39,6 +39,9 @@ case class ComplexClass(seq: SeqClass, list: ListClass, queue: QueueClass)
|
|||
|
||||
case class ComplexMapClass(map: MapClass, lhmap: LHMapClass)
|
||||
|
||||
case class InnerData(name: String, value: Int)
|
||||
case class NestedData(id: Int, param: Map[String, InnerData])
|
||||
|
||||
package object packageobject {
|
||||
case class PackageClass(value: Int)
|
||||
}
|
||||
|
@ -354,4 +357,9 @@ class DatasetPrimitiveSuite extends QueryTest with SharedSQLContext {
|
|||
checkDataset(Seq(PackageClass(1)).toDS(), PackageClass(1))
|
||||
}
|
||||
|
||||
test("SPARK-19104: Lambda variables in ExternalMapToCatalyst should be global") {
|
||||
val data = Seq.tabulate(10)(i => NestedData(1, Map("key" -> InnerData("name", i + 100))))
|
||||
val ds = spark.createDataset(data)
|
||||
checkDataset(ds, data: _*)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue