[SPARK-28375][SQL] Make pullupCorrelatedPredicate idempotent

## What changes were proposed in this pull request?

This PR makes the optimizer rule PullupCorrelatedPredicates idempotent.
## How was this patch tested?

A new test PullupCorrelatedPredicatesSuite

Closes #25268 from dilipbiswal/pr-25164.

Authored-by: Dilip Biswal <dbiswal@us.ibm.com>
Signed-off-by: gatorsmile <gatorsmile@gmail.com>
This commit is contained in:
Dilip Biswal 2019-07-30 16:29:24 -07:00 committed by gatorsmile
parent abef84a868
commit ee3c1c777d
3 changed files with 72 additions and 14 deletions

View file

@ -48,9 +48,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog)
}
override protected val blacklistedOnceBatches: Set[String] =
Set("Pullup Correlated Expressions",
"Extract Python UDFs"
)
Set("Extract Python UDFs")
protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations)

View file

@ -273,16 +273,28 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper
}
private def rewriteSubQueries(plan: LogicalPlan, outerPlans: Seq[LogicalPlan]): LogicalPlan = {
/**
* This function is used as a aid to enforce idempotency of pullUpCorrelatedPredicate rule.
* In the first call to rewriteSubqueries, all the outer references from the subplan are
* pulled up and join predicates are recorded as children of the enclosing subquery expression.
* The subsequent call to rewriteSubqueries would simply re-records the `children` which would
* contains the pulled up correlated predicates (from the previous call) in the enclosing
* subquery expression.
*/
def getJoinCondition(newCond: Seq[Expression], oldCond: Seq[Expression]): Seq[Expression] = {
if (newCond.isEmpty) oldCond else newCond
}
plan transformExpressions {
case ScalarSubquery(sub, children, exprId) if children.nonEmpty =>
val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
ScalarSubquery(newPlan, newCond, exprId)
ScalarSubquery(newPlan, getJoinCondition(newCond, children), exprId)
case Exists(sub, children, exprId) if children.nonEmpty =>
val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
Exists(newPlan, newCond, exprId)
case ListQuery(sub, _, exprId, childOutputs) =>
Exists(newPlan, getJoinCondition(newCond, children), exprId)
case ListQuery(sub, children, exprId, childOutputs) if children.nonEmpty =>
val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans)
ListQuery(newPlan, newCond, exprId, childOutputs)
ListQuery(newPlan, getJoinCondition(newCond, children), exprId, childOutputs)
}
}

View file

@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.optimizer
import org.apache.spark.sql.catalyst.dsl.expressions._
import org.apache.spark.sql.catalyst.dsl.plans._
import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.PlanTest
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.RuleExecutor
class PullupCorrelatedPredicatesSuite extends PlanTest {
@ -38,17 +38,65 @@ class PullupCorrelatedPredicatesSuite extends PlanTest {
val testRelation2 = LocalRelation('c.int, 'd.double)
test("PullupCorrelatedPredicates should not produce unresolved plan") {
val correlatedSubquery =
val subPlan =
testRelation2
.where('b < 'd)
.select('c)
val outerQuery =
val inSubquery =
testRelation
.where(InSubquery(Seq('a), ListQuery(correlatedSubquery)))
.where(InSubquery(Seq('a), ListQuery(subPlan)))
.select('a).analyze
assert(outerQuery.resolved)
assert(inSubquery.resolved)
val optimized = Optimize.execute(outerQuery)
val optimized = Optimize.execute(inSubquery)
assert(optimized.resolved)
}
test("PullupCorrelatedPredicates in correlated subquery idempotency check") {
val subPlan =
testRelation2
.where('b < 'd)
.select('c)
val inSubquery =
testRelation
.where(InSubquery(Seq('a), ListQuery(subPlan)))
.select('a).analyze
assert(inSubquery.resolved)
val optimized = Optimize.execute(inSubquery)
val doubleOptimized = Optimize.execute(optimized)
comparePlans(optimized, doubleOptimized)
}
test("PullupCorrelatedPredicates exists correlated subquery idempotency check") {
val subPlan =
testRelation2
.where('b === 'd && 'd === 1)
.select(Literal(1))
val existsSubquery =
testRelation
.where(Exists(subPlan))
.select('a).analyze
assert(existsSubquery.resolved)
val optimized = Optimize.execute(existsSubquery)
val doubleOptimized = Optimize.execute(optimized)
comparePlans(optimized, doubleOptimized)
}
test("PullupCorrelatedPredicates scalar correlated subquery idempotency check") {
val subPlan =
testRelation2
.where('b === 'd && 'd === 1)
.select(max('d))
val scalarSubquery =
testRelation
.where(ScalarSubquery(subPlan))
.select('a).analyze
assert(scalarSubquery.resolved)
val optimized = Optimize.execute(scalarSubquery)
val doubleOptimized = Optimize.execute(optimized)
comparePlans(optimized, doubleOptimized, false)
}
}