From ee3c1c777ddb8034d62213a5d8e064b97cc067e5 Mon Sep 17 00:00:00 2001 From: Dilip Biswal Date: Tue, 30 Jul 2019 16:29:24 -0700 Subject: [PATCH] [SPARK-28375][SQL] Make pullupCorrelatedPredicate idempotent ## What changes were proposed in this pull request? This PR makes the optimizer rule PullupCorrelatedPredicates idempotent. ## How was this patch tested? A new test PullupCorrelatedPredicatesSuite Closes #25268 from dilipbiswal/pr-25164. Authored-by: Dilip Biswal Signed-off-by: gatorsmile --- .../sql/catalyst/optimizer/Optimizer.scala | 4 +- .../sql/catalyst/optimizer/subquery.scala | 20 ++++-- .../PullupCorrelatedPredicatesSuite.scala | 62 ++++++++++++++++--- 3 files changed, 72 insertions(+), 14 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 670fc92cb7..346b2e6184 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -48,9 +48,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog) } override protected val blacklistedOnceBatches: Set[String] = - Set("Pullup Correlated Expressions", - "Extract Python UDFs" - ) + Set("Extract Python UDFs") protected def fixedPoint = FixedPoint(SQLConf.get.optimizerMaxIterations) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala index 4f7333c387..32dbd389af 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala @@ -273,16 +273,28 @@ object PullupCorrelatedPredicates extends Rule[LogicalPlan] with PredicateHelper } private def rewriteSubQueries(plan: LogicalPlan, outerPlans: Seq[LogicalPlan]): LogicalPlan = { + /** + * This function is used as a aid to enforce idempotency of pullUpCorrelatedPredicate rule. + * In the first call to rewriteSubqueries, all the outer references from the subplan are + * pulled up and join predicates are recorded as children of the enclosing subquery expression. + * The subsequent call to rewriteSubqueries would simply re-records the `children` which would + * contains the pulled up correlated predicates (from the previous call) in the enclosing + * subquery expression. + */ + def getJoinCondition(newCond: Seq[Expression], oldCond: Seq[Expression]): Seq[Expression] = { + if (newCond.isEmpty) oldCond else newCond + } + plan transformExpressions { case ScalarSubquery(sub, children, exprId) if children.nonEmpty => val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans) - ScalarSubquery(newPlan, newCond, exprId) + ScalarSubquery(newPlan, getJoinCondition(newCond, children), exprId) case Exists(sub, children, exprId) if children.nonEmpty => val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans) - Exists(newPlan, newCond, exprId) - case ListQuery(sub, _, exprId, childOutputs) => + Exists(newPlan, getJoinCondition(newCond, children), exprId) + case ListQuery(sub, children, exprId, childOutputs) if children.nonEmpty => val (newPlan, newCond) = pullOutCorrelatedPredicates(sub, outerPlans) - ListQuery(newPlan, newCond, exprId, childOutputs) + ListQuery(newPlan, getJoinCondition(newCond, children), exprId, childOutputs) } } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala index 960162a5bf..2d86d5a97e 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/PullupCorrelatedPredicatesSuite.scala @@ -19,9 +19,9 @@ package org.apache.spark.sql.catalyst.optimizer import org.apache.spark.sql.catalyst.dsl.expressions._ import org.apache.spark.sql.catalyst.dsl.plans._ -import org.apache.spark.sql.catalyst.expressions.{InSubquery, ListQuery} +import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.PlanTest -import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan} +import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, LogicalPlan} import org.apache.spark.sql.catalyst.rules.RuleExecutor class PullupCorrelatedPredicatesSuite extends PlanTest { @@ -38,17 +38,65 @@ class PullupCorrelatedPredicatesSuite extends PlanTest { val testRelation2 = LocalRelation('c.int, 'd.double) test("PullupCorrelatedPredicates should not produce unresolved plan") { - val correlatedSubquery = + val subPlan = testRelation2 .where('b < 'd) .select('c) - val outerQuery = + val inSubquery = testRelation - .where(InSubquery(Seq('a), ListQuery(correlatedSubquery))) + .where(InSubquery(Seq('a), ListQuery(subPlan))) .select('a).analyze - assert(outerQuery.resolved) + assert(inSubquery.resolved) - val optimized = Optimize.execute(outerQuery) + val optimized = Optimize.execute(inSubquery) assert(optimized.resolved) } + + test("PullupCorrelatedPredicates in correlated subquery idempotency check") { + val subPlan = + testRelation2 + .where('b < 'd) + .select('c) + val inSubquery = + testRelation + .where(InSubquery(Seq('a), ListQuery(subPlan))) + .select('a).analyze + assert(inSubquery.resolved) + + val optimized = Optimize.execute(inSubquery) + val doubleOptimized = Optimize.execute(optimized) + comparePlans(optimized, doubleOptimized) + } + + test("PullupCorrelatedPredicates exists correlated subquery idempotency check") { + val subPlan = + testRelation2 + .where('b === 'd && 'd === 1) + .select(Literal(1)) + val existsSubquery = + testRelation + .where(Exists(subPlan)) + .select('a).analyze + assert(existsSubquery.resolved) + + val optimized = Optimize.execute(existsSubquery) + val doubleOptimized = Optimize.execute(optimized) + comparePlans(optimized, doubleOptimized) + } + + test("PullupCorrelatedPredicates scalar correlated subquery idempotency check") { + val subPlan = + testRelation2 + .where('b === 'd && 'd === 1) + .select(max('d)) + val scalarSubquery = + testRelation + .where(ScalarSubquery(subPlan)) + .select('a).analyze + assert(scalarSubquery.resolved) + + val optimized = Optimize.execute(scalarSubquery) + val doubleOptimized = Optimize.execute(optimized) + comparePlans(optimized, doubleOptimized, false) + } }