spark-instrumented-optimizer/python/pyspark/sql
gatorsmile 422a45cf04 [SPARK-18766][SQL] Push Down Filter Through BatchEvalPython (Python UDF)
### What changes were proposed in this pull request?
Currently, when a Python UDF is used in a Filter, `BatchEvalPython` is always generated directly below `FilterExec`, so every predicate is evaluated only after the Python UDF has run. However, not all predicates need to be evaluated after the Python UDF execution. Thus, this PR pushes the deterministic predicates down through `BatchEvalPython`.
```Python
>>> df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import BooleanType
>>> my_filter = udf(lambda a: a < 2, BooleanType())
>>> sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
>>> sel.explain(True)
```
Before the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]

== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter ((isnotnull(value#1) && pythonUDF0#9) && (value#1 < 2))
   +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
      +- Scan ExistingRDD[key#0L,value#1]
```

After the fix, the plan looks like
```
== Optimized Logical Plan ==
Filter ((isnotnull(value#1) && <lambda>(key#0L)) && (value#1 < 2))
+- LogicalRDD [key#0L, value#1]

== Physical Plan ==
*Project [key#0L, value#1]
+- *Filter pythonUDF0#9: boolean
   +- BatchEvalPython [<lambda>(key#0L)], [key#0L, value#1, pythonUDF0#9]
      +- *Filter (isnotnull(value#1) && (value#1 < 2))
         +- Scan ExistingRDD[key#0L,value#1]
```
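Continuing the same session, one rough way to confirm the new ordering from PySpark is to inspect the rendered physical plan. This is a sketch only: `_jdf.queryExecution()` is an internal handle rather than a public API, and the check depends on the plan's string rendering, which prints operators top-down.
```Python
>>> # Internal (non-public) handle to the executed physical plan. After the fix,
>>> # the pushed-down filter (containing isnotnull) appears below BatchEvalPython,
>>> # so its position in the top-down plan string is larger.
>>> plan = sel._jdf.queryExecution().executedPlan().toString()
>>> plan.index("BatchEvalPython") < plan.index("isnotnull")
True
```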

### How was this patch tested?
Added unit test cases for `BatchEvalPythonExec` and an end-to-end test case in the Python test suite.
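
For illustration, a minimal, self-contained sketch of what such an end-to-end test could look like (an approximation with a hypothetical test name, not necessarily the exact test added to `tests.py`):
```Python
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType


def test_filter_pushdown_through_python_udf():
    # Rebuild the query from the example above and check that pushing the
    # deterministic predicate below BatchEvalPython does not change the result.
    spark = SparkSession.builder.master("local[1]").getOrCreate()
    df = spark.createDataFrame([(1, "1"), (2, "2"), (1, "2"), (1, "2")], ["key", "value"])
    my_filter = udf(lambda a: a < 2, BooleanType())
    sel = df.select(col("key"), col("value")).filter((my_filter(col("key"))) & (df.value < "2"))
    # Only the (1, "1") row satisfies both key < 2 and value < "2".
    assert sel.collect() == [Row(key=1, value="1")]
```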

Author: gatorsmile <gatorsmile@gmail.com>

Closes #16193 from gatorsmile/pythonUDFPredicatePushDown.
2016-12-10 08:47:45 -08:00
__init__.py [SPARK-16772][PYTHON][DOCS] Fix API doc references to UDFRegistration + Update "important classes" 2016-08-06 05:02:59 +01:00
catalog.py [SPARK-17338][SQL][FOLLOW-UP] add global temp view 2016-10-11 15:21:28 +08:00
column.py [SPARK-17215][SQL] Method SQLContext.parseDataType(dataTypeString: String) could be removed. 2016-08-24 23:36:04 -07:00
conf.py [SPARK-15464][ML][MLLIB][SQL][TESTS] Replace SQLContext and SparkContext with SparkSession using builder pattern in python test code 2016-05-23 18:14:48 -07:00
context.py [SPARK-11775][PYSPARK][SQL] Allow PySpark to register Java UDF 2016-10-14 15:50:35 -07:00
dataframe.py [SPARK-18447][DOCS] Fix the markdown for Note:/NOTE:/Note that across Python API documentation 2016-11-22 11:40:18 +00:00
functions.py [SPARK-18447][DOCS] Fix the markdown for Note:/NOTE:/Note that across Python API documentation 2016-11-22 11:40:18 +00:00
group.py [MINOR][PYSPARK][DOC] Fix wrongly formatted examples in PySpark documentation 2016-07-06 10:45:51 -07:00
readwriter.py [SPARK-17764][SQL] Add to_json supporting to convert nested struct column to JSON string 2016-11-01 12:46:41 -07:00
session.py [SPARK-17720][SQL] introduce static SQL conf 2016-10-11 20:27:08 -07:00
streaming.py [SPARK-18754][SS] Rename recentProgresses to recentProgress 2016-12-07 15:36:29 -08:00
tests.py [SPARK-18766][SQL] Push Down Filter Through BatchEvalPython (Python UDF) 2016-12-10 08:47:45 -08:00
types.py [SPARK-17215][SQL] Method SQLContext.parseDataType(dataTypeString: String) could be removed. 2016-08-24 23:36:04 -07:00
utils.py [SPARK-15953][WIP][STREAMING] Renamed ContinuousQuery to StreamingQuery 2016-06-15 10:46:07 -07:00
window.py [SPARK-18690][PYTHON][SQL] Backward compatibility of unbounded frames 2016-12-02 17:39:28 -08:00