spark-instrumented-optimizer/mllib
Peter Toth ab8a9a0ceb [SPARK-34545][SQL] Fix issues with valueCompare feature of pyrolite
### What changes were proposed in this pull request?

pyrolite 4.21 introduced and enabled value comparison by default (`valueCompare=true`) during object memoization and serialization: https://github.com/irmen/Pyrolite/blob/pyrolite-4.21/java/src/main/java/net/razorvine/pickle/Pickler.java#L112-L122
This change has undesired effect when we serialize a row (actually `GenericRowWithSchema`) to be passed to python: https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala#L60. A simple example is that
```
new GenericRowWithSchema(Array(1.0, 1.0), StructType(Seq(StructField("_1", DoubleType), StructField("_2", DoubleType))))
```
and
```
new GenericRowWithSchema(Array(1, 1), StructType(Seq(StructField("_1", IntegerType), StructField("_2", IntegerType))))
```
are currently equal and the second instance is replaced to the short code of the first one during serialization.

### Why are the changes needed?
The above can cause nasty issues like the one in https://issues.apache.org/jira/browse/SPARK-34545 description:

```
>>> from pyspark.sql.functions import udf
>>> from pyspark.sql.types import *
>>>
>>> def udf1(data_type):
        def u1(e):
            return e[0]
        return udf(u1, data_type)
>>>
>>> df = spark.createDataFrame([((1.0, 1.0), (1, 1))], ['c1', 'c2'])
>>>
>>> df = df.withColumn("c3", udf1(DoubleType())("c1"))
>>> df = df.withColumn("c4", udf1(IntegerType())("c2"))
>>>
>>> df.select("c3").show()
+---+
| c3|
+---+
|1.0|
+---+

>>> df.select("c4").show()
+---+
| c4|
+---+
|  1|
+---+

>>> df.select("c3", "c4").show()
+---+----+
| c3|  c4|
+---+----+
|1.0|null|
+---+----+
```
This is because during serialization from JVM to Python `GenericRowWithSchema(1.0, 1.0)` (`c1`) is memoized first and when `GenericRowWithSchema(1, 1)` (`c2`) comes next, it is replaced to some short code of the `c1` (instead of serializing `c2` out) as they are `equal()`. The python functions then runs but the return type of `c4` is expected to be `IntegerType` and if a different type (`DoubleType`) comes back from python then it is discarded: https://github.com/apache/spark/blob/branch-3.0/sql/core/src/main/scala/org/apache/spark/sql/execution/python/EvaluatePython.scala#L108-L113

After this PR:
```
>>> df.select("c3", "c4").show()
+---+---+
| c3| c4|
+---+---+
|1.0|  1|
+---+---+
```

### Does this PR introduce _any_ user-facing change?
Yes, fixes a correctness issue.

### How was this patch tested?
Added new UT + manual tests.

Closes #31682 from peter-toth/SPARK-34545-fix-row-comparison.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
2021-03-07 19:12:42 -06:00
..
benchmarks [SPARK-29297][TESTS] Compare core/mllib module benchmarks in JDK8/11 2019-09-29 21:43:58 -07:00
src [SPARK-34545][SQL] Fix issues with valueCompare feature of pyrolite 2021-03-07 19:12:42 -06:00
pom.xml [SPARK-33662][BUILD] Setting version to 3.2.0-SNAPSHOT 2020-12-04 14:10:42 -08:00