spark-instrumented-optimizer/sql/catalyst
Liang-Chi Hsieh b94fa979ef [SPARK-28345][SQL][PYTHON] PythonUDF predicate should be able to pushdown to join
## What changes were proposed in this pull request?

A `Filter` predicate using `PythonUDF` can't be push down into join condition, currently. A predicate like that should be able to push down to join condition. For `PythonUDF`s that can't be evaluated in join condition, `PullOutPythonUDFInJoinCondition` will pull them out later.

An example like:

```scala
val pythonTestUDF = TestPythonUDF(name = "udf")

val left = Seq((1, 2), (2, 3)).toDF("a", "b")
val right = Seq((1, 2), (3, 4)).toDF("c", "d")
val df = left.crossJoin(right).where(pythonTestUDF($"a") === pythonTestUDF($"c"))
```

Query plan before the PR:
```
== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) Filter (pythonUDF0#2142 = pythonUDF1#2143)
   +- BatchEvalPython [udf(a#2121), udf(c#2132)], [pythonUDF0#2142, pythonUDF1#2143]
      +- BroadcastNestedLoopJoin BuildRight, Cross
         :- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
         :  +- LocalTableScan [_1#2116, _2#2117]
         +- BroadcastExchange IdentityBroadcastMode
            +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
               +- LocalTableScan [_1#2127, _2#2128]
```

Query plan after the PR:
```
== Physical Plan ==
*(3) Project [a#2121, b#2122, c#2132, d#2133]
+- *(3) BroadcastHashJoin [pythonUDF0#2142], [pythonUDF0#2143], Cross, BuildRight
   :- BatchEvalPython [udf(a#2121)], [pythonUDF0#2142]
   :  +- *(1) Project [_1#2116 AS a#2121, _2#2117 AS b#2122]
   :     +- LocalTableScan [_1#2116, _2#2117]
   +- BroadcastExchange HashedRelationBroadcastMode(List(input[2, string, true]))
      +- BatchEvalPython [udf(c#2132)], [pythonUDF0#2143]
         +- *(2) Project [_1#2127 AS c#2132, _2#2128 AS d#2133]
            +- LocalTableScan [_1#2127, _2#2128]
```

After this PR, the join can use `BroadcastHashJoin`, instead of `BroadcastNestedLoopJoin`.

## How was this patch tested?

Added tests.

Closes #25106 from viirya/pythonudf-join-condition.

Authored-by: Liang-Chi Hsieh <viirya@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
2019-07-16 16:15:49 +09:00
..
benchmarks [SPARK-25657][SQL][TEST] Refactor HashBenchmark to use main method 2018-10-07 09:49:37 -07:00
src [SPARK-28345][SQL][PYTHON] PythonUDF predicate should be able to pushdown to join 2019-07-16 16:15:49 +09:00
pom.xml [SPARK-27521][SQL] Move data source v2 to catalyst module 2019-06-05 09:55:55 -07:00