spark-instrumented-optimizer/python/pyspark/sql/tests
Dilip Biswal 7f44c9a252 [SPARK-26864][SQL] Query may return incorrect result when python udf is used as a join condition and the udf uses attributes from both legs of left semi join.
## What changes were proposed in this pull request?
In SPARK-25314, we supported the scenario of having a python UDF that refers to attributes from both legs of a join condition by rewriting the plan to convert an inner join or left semi join to a filter over a cross join. In case of left semi join, this transformation may cause incorrect results when the right leg of join condition produces duplicate rows based on the join condition. This fix disallows the rewrite for left semi join and raises an error in the case like we do for other types of join. In future, we should have separate rule in optimizer to convert left semi join to inner join (I am aware of one case we could do it if we leverage informational constraint i.e when we know the right side does not produce duplicates).

**Python**

```SQL
>>> from pyspark import SparkContext
>>> from pyspark.sql import SparkSession, Column, Row
>>> from pyspark.sql.functions import UserDefinedFunction, udf
>>> from pyspark.sql.types import *
>>> from pyspark.sql.utils import AnalysisException
>>>
>>> spark.conf.set("spark.sql.crossJoin.enabled", "True")
>>> left = spark.createDataFrame([Row(lc1=1, lc2=1), Row(lc1=2, lc2=2)])
>>> right = spark.createDataFrame([Row(rc1=1, rc2=1), Row(rc1=1, rc2=1)])
>>> func = udf(lambda a, b: a == b, BooleanType())
>>> df = left.join(right, func("lc1", "rc1"), "leftsemi").show()
19/02/12 16:07:10 WARN PullOutPythonUDFInJoinCondition: The join condition:<lambda>(lc1#0L, rc1#4L) of the join plan contains PythonUDF only, it will be moved out and the join plan will be turned to cross join.
+---+---+
|lc1|lc2|
+---+---+
|  1|  1|
|  1|  1|
+---+---+
```

**Scala**

```SQL
scala> val left = Seq((1, 1), (2, 2)).toDF("lc1", "lc2")
left: org.apache.spark.sql.DataFrame = [lc1: int, lc2: int]

scala> val right = Seq((1, 1), (1, 1)).toDF("rc1", "rc2")
right: org.apache.spark.sql.DataFrame = [rc1: int, rc2: int]

scala> val equal = udf((p1: Integer, p2: Integer) => {
     |   p1 == p2
     | })
equal: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2141/11016292394666f1b5,BooleanType,List(Some(Schema(IntegerType,true)), Some(Schema(IntegerType,true))),None,false,true)

scala> val df = left.join(right, equal(col("lc1"), col("rc1")), "leftsemi")
df: org.apache.spark.sql.DataFrame = [lc1: int, lc2: int]

scala> df.show()
+---+---+
|lc1|lc2|
+---+---+
|  1|  1|
+---+---+

```

## How was this patch tested?
Modified existing tests.

Closes #23769 from dilipbiswal/dkb_python_udf_in_join.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
2019-02-13 21:14:19 +08:00
..
__init__.py
test_appsubmit.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_arrow.py [SPARK-26566][PYTHON][SQL] Upgrade Apache Arrow to version 0.12.0 2019-01-29 14:18:45 +08:00
test_catalog.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_column.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_conf.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_context.py [SPARK-26676][PYTHON] Make HiveContextSQLTests.test_unbounded_frames test compatible with Python 2 and PyPy 2019-01-21 14:27:17 -08:00
test_dataframe.py [SPARK-23647][PYTHON][SQL] Adds more types for hint in pyspark 2018-12-01 10:37:03 +08:00
test_datasources.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_functions.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_group.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_pandas_udf.py [SPARK-25811][PYSPARK] Raise a proper error when unsafe cast is detected by PyArrow 2019-01-22 14:54:41 +08:00
test_pandas_udf_grouped_agg.py [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf* 2018-12-14 10:45:24 +08:00
test_pandas_udf_grouped_map.py [SPARK-26566][PYTHON][SQL] Upgrade Apache Arrow to version 0.12.0 2019-01-29 14:18:45 +08:00
test_pandas_udf_scalar.py [SPARK-26364][PYTHON][TESTING] Clean up imports in test_pandas_udf* 2018-12-14 10:45:24 +08:00
test_pandas_udf_window.py [SPARK-24561][SQL][PYTHON] User-defined window aggregation functions with Pandas UDF (bounded window) 2018-12-18 09:15:21 +08:00
test_readwriter.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_serde.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_session.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_streaming.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00
test_types.py [SPARK-26645][PYTHON] Support decimals with negative scale when parsing datatype 2019-01-20 17:43:50 +08:00
test_udf.py [SPARK-26864][SQL] Query may return incorrect result when python udf is used as a join condition and the udf uses attributes from both legs of left semi join. 2019-02-13 21:14:19 +08:00
test_utils.py [SPARK-26036][PYTHON] Break large tests.py files into smaller files 2018-11-15 12:30:52 +08:00