[SPARK-35653][SQL] Fix CatalystToExternalMap interpreted path fails for Map with case classes as keys or values

### What changes were proposed in this pull request?
Use the key/value LambdaFunction to convert the elements instead of
using CatalystTypeConverters.createToScalaConverter. This is how it is
done in MapObjects and that correctly handles Arrays with case classes.

### Why are the changes needed?
Before these changes the added test cases would fail with the following:
```
[info] - encode/decode for map with case class as value: Map(1 -> IntAndString(1,a)) (interpreted path) *** FAILED *** (64 milliseconds)
[info]   Encoded/Decoded data does not match input data
[info]
[info]   in:  Map(1 -> IntAndString(1,a))
[info]   out: Map(1 -> [1,a])
[info]   types: scala.collection.immutable.Map$Map1 [info]
[info]   Encoded Data: [org.apache.spark.sql.catalyst.expressions.UnsafeMapData5ecf5d9e]
[info]   Schema: value#823
[info]   root
[info]   -- value: map (nullable = true)
[info]       |-- key: integer
[info]       |-- value: struct (valueContainsNull = true)
[info]       |    |-- i: integer (nullable = false)
[info]       |    |-- s: string (nullable = true)
[info]
[info]
[info]   fromRow Expressions:
[info]   catalysttoexternalmap(lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178), lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179), if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString), input[0, map<int,struct<i:int,s:string>>, true], interface scala.collection.immutable.Map
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_key, IntegerType, false, 178)
[info]   :- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :- if (isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))) null else newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :  :- isnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179))
[info]   :  :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :  :- null
[info]   :  +- newInstance(class org.apache.spark.sql.catalyst.encoders.IntAndString)
[info]   :     :- assertnotnull(lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i)
[info]   :     :  +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).i
[info]   :     :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   :     +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s.toString
[info]   :        +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179).s
[info]   :           +- lambdavariable(CatalystToExternalMap_value, StructField(i,IntegerType,false), StructField(s,StringType,true), true, 179)
[info]   +- input[0, map<int,struct<i:int,s:string>>, true] (ExpressionEncoderSuite.scala:627)
```
So using a map with cases classes for keys or values and using the interpreted path would incorrect deserialize data from the catalyst representation.

### Does this PR introduce _any_ user-facing change?
Yes, it fixes the bug.

### How was this patch tested?
Existing and new unit tests in the ExpressionEncoderSuite

Closes #32783 from eejbyfeldt/fix-interpreted-path-for-map-with-case-classes.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
This commit is contained in:
Emil Ejbyfeldt 2021-06-10 09:37:27 -07:00 committed by Liang-Chi Hsieh
parent 4180692135
commit e2e3fe7782
2 changed files with 11 additions and 8 deletions

View file

@ -29,7 +29,7 @@ import org.apache.commons.lang3.reflect.MethodUtils
import org.apache.spark.{SparkConf, SparkEnv}
import org.apache.spark.serializer._
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection}
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.codegen._
@ -1189,11 +1189,6 @@ case class CatalystToExternalMap private(
private lazy val inputMapType = inputData.dataType.asInstanceOf[MapType]
private lazy val keyConverter =
CatalystTypeConverters.createToScalaConverter(inputMapType.keyType)
private lazy val valueConverter =
CatalystTypeConverters.createToScalaConverter(inputMapType.valueType)
private lazy val (newMapBuilderMethod, moduleField) = {
val clazz = Utils.classForName(collClass.getCanonicalName + "$")
(clazz.getMethod("newBuilder"), clazz.getField("MODULE$").get(null))
@ -1210,10 +1205,13 @@ case class CatalystToExternalMap private(
builder.sizeHint(result.numElements())
val keyArray = result.keyArray()
val valueArray = result.valueArray()
val row = new GenericInternalRow(1)
var i = 0
while (i < result.numElements()) {
val key = keyConverter(keyArray.get(i, inputMapType.keyType))
val value = valueConverter(valueArray.get(i, inputMapType.valueType))
row.update(0, keyArray.get(i, inputMapType.keyType))
val key = keyLambdaFunction.eval(row)
row.update(0, valueArray.get(i, inputMapType.valueType))
val value = valueLambdaFunction.eval(row)
builder += Tuple2(key, value)
i += 1
}

View file

@ -114,6 +114,7 @@ case class ReferenceValueClass(wrapped: ReferenceValueClass.Container) extends A
object ReferenceValueClass {
case class Container(data: Int)
}
case class IntAndString(i: Int, s: String)
class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTest {
OuterScopes.addOuterScope(this)
@ -174,6 +175,10 @@ class ExpressionEncoderSuite extends CodegenInterpretedPlanTest with AnalysisTes
encodeDecodeTest(Map(1 -> "a", 2 -> "b"), "map")
encodeDecodeTest(Map(1 -> "a", 2 -> null), "map with null")
encodeDecodeTest(Map(1 -> Map("a" -> 1), 2 -> Map("b" -> 2)), "map of map")
encodeDecodeTest(Map(1 -> IntAndString(1, "a")), "map with case class as value")
encodeDecodeTest(Map(IntAndString(1, "a") -> 1), "map with case class as key")
encodeDecodeTest(Map(IntAndString(1, "a") -> IntAndString(2, "b")),
"map with case class as key and value")
encodeDecodeTest(Tuple1[Seq[Int]](null), "null seq in tuple")
encodeDecodeTest(Tuple1[Map[String, String]](null), "null map in tuple")