[SPARK-20094][SQL] Preventing push down of IN subquery to Join operator

## What changes were proposed in this pull request?

TPCDS q45 fails becuase:
`ReorderJoin` collects all predicates and try to put them into join condition when creating ordered join. If a predicate with an IN subquery (`ListQuery`) is in a join condition instead of a filter condition, `RewritePredicateSubquery.rewriteExistentialExpr` would fail to convert the subquery to an `ExistenceJoin`, and thus result in error.

We should prevent push down of IN subquery to Join operator.

## How was this patch tested?

Add a new test case in `FilterPushdownSuite`.

Author: wangzhenhua <wangzhenhua@huawei.com>

Closes #17428 from wzhfy/noSubqueryInJoinCond.
This commit is contained in:
wangzhenhua 2017-03-28 13:43:23 +02:00 committed by Herman van Hovell
parent a9abff281b
commit 91559d277f
2 changed files with 26 additions and 0 deletions

View file

@ -90,6 +90,12 @@ trait PredicateHelper {
* Returns true iff `expr` could be evaluated as a condition within join.
*/
protected def canEvaluateWithinJoin(expr: Expression): Boolean = expr match {
case l: ListQuery =>
// A ListQuery defines the query which we want to search in an IN subquery expression.
// Currently the only way to evaluate an IN subquery is to convert it to a
// LeftSemi/LeftAnti/ExistenceJoin by `RewritePredicateSubquery` rule.
// It cannot be evaluated as part of a Join operator.
false
case e: SubqueryExpression =>
// non-correlated subquery will be replaced as literal
e.children.isEmpty

View file

@ -836,6 +836,26 @@ class FilterPushdownSuite extends PlanTest {
comparePlans(optimized, answer)
}
test("SPARK-20094: don't push predicate with IN subquery into join condition") {
val x = testRelation.subquery('x)
val z = testRelation.subquery('z)
val w = testRelation1.subquery('w)
val queryPlan = x
.join(z)
.where(("x.b".attr === "z.b".attr) &&
("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))))
.analyze
val expectedPlan = x
.join(z, Inner, Some("x.b".attr === "z.b".attr))
.where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))
.analyze
val optimized = Optimize.execute(queryPlan)
comparePlans(optimized, expectedPlan)
}
test("Window: predicate push down -- basic") {
val winExpr = windowExpr(count('b), windowSpec('a :: Nil, 'b.asc :: Nil, UnspecifiedFrame))