From daf9d198dc9b71dbc79d7efa24c9336ed700e80c Mon Sep 17 00:00:00 2001
From: ulysses-you
Date: Wed, 2 Jun 2021 07:49:56 +0000
Subject: [PATCH] [SPARK-35585][SQL] Support propagate empty relation through
 project/filter

### What changes were proposed in this pull request?

Add the rule `ConvertToLocalRelation` to the AQE Optimizer's `Propagate Empty Relations` batch, and run that batch to fixed point instead of once.

### Why are the changes needed?

This lets AQE propagate an empty local relation through `Project` and `Filter`. Consider a plan such as:

```
Aggregate
  Project
    Join
      ShuffleStage
      ShuffleStage
```

If a shuffle stage turns out to be empty at runtime, AQE replaces it with an empty `LocalRelation`; `ConvertToLocalRelation` then folds any `Project` or `Filter` sitting on top of that empty relation into the relation itself, so the empty result can keep propagating up through the `Join` and `Aggregate`.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a test in `AdaptiveQueryExecSuite`.
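As an illustration (not part of the patch), here is a minimal spark-shell-style sketch of the behavior, mirroring the first query in the new `AdaptiveQueryExecSuite` test. The view name `t` and the data are invented; `spark.range` is used so the source is a real scan rather than a `LocalRelation` that the compile-time optimizer would already fold on its own:

```
import org.apache.spark.sql.SparkSession

// Hypothetical demo session (names and data are invented).
val spark = SparkSession.builder()
  .master("local[2]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()

// range() gives a real scan, so the empty result is only discovered at runtime.
spark.range(0, 1000)
  .selectExpr("id AS key", "CAST(id AS STRING) AS value")
  .createOrReplaceTempView("t")

// No row satisfies key = -1, so the shuffle introduced by ORDER BY is empty at
// runtime; with ConvertToLocalRelation in the AQE batch, the Sort and Project
// above the empty stage fold away.
val q = spark.sql("SELECT key FROM t WHERE key = -1 ORDER BY key, value")
q.collect()
// The final adaptive plan should now show an empty LocalTableScan.
println(q.queryExecution.executedPlan)
```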
Closes #32724 from ulysses-you/SPARK-35585.

Authored-by: ulysses-you
Signed-off-by: Wenchen Fan
---
 .../sql/execution/adaptive/AQEOptimizer.scala |  9 ++++-
 .../org/apache/spark/sql/ExplainSuite.scala   | 37 +++++++++----------
 .../adaptive/AdaptiveQueryExecSuite.scala     | 24 +++++++++++-
 3 files changed, 49 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
index 95dc7cc1cc..0767039d10 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AQEOptimizer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.internal.SQLConf
@@ -27,9 +28,15 @@ import org.apache.spark.util.Utils
  * The optimizer for re-optimizing the logical plan used by AdaptiveSparkPlanExec.
  */
 class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
+  private def fixedPoint =
+    FixedPoint(
+      conf.optimizerMaxIterations,
+      maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
+
   private val defaultBatches = Seq(
-    Batch("Propagate Empty Relations", Once,
+    Batch("Propagate Empty Relations", fixedPoint,
       AQEPropagateEmptyRelation,
+      ConvertToLocalRelation,
       UpdateAttributeNullability),
     Batch("Dynamic Join Selection", Once, DynamicJoinSelection)
   )

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
index 3edc40133c..22636484cd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala
@@ -525,41 +525,40 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
     val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))
     // trigger the final plan for AQE
     testDf.collect()
-    // == Physical Plan ==
-    // AdaptiveSparkPlan (14)
-    // +- * HashAggregate (13)
-    //    +- CustomShuffleReader (12)
-    //       +- ShuffleQueryStage (11)
-    //          +- Exchange (10)
-    //             +- * HashAggregate (9)
-    //                +- * Project (8)
-    //                   +- * BroadcastHashJoin Inner BuildRight (7)
-    //                      :- * Project (2)
-    //                      :  +- * LocalTableScan (1)
-    //                      +- BroadcastQueryStage (6)
-    //                         +- BroadcastExchange (5)
-    //                            +- * Project (4)
-    //                               +- * LocalTableScan (3)
+    // AdaptiveSparkPlan (13)
+    // +- == Final Plan ==
+    //    * HashAggregate (12)
+    //    +- CustomShuffleReader (11)
+    //       +- ShuffleQueryStage (10)
+    //          +- Exchange (9)
+    //             +- * HashAggregate (8)
+    //                +- * Project (7)
+    //                   +- * BroadcastHashJoin Inner BuildRight (6)
+    //                      :- * LocalTableScan (1)
+    //                      +- BroadcastQueryStage (5)
+    //                         +- BroadcastExchange (4)
+    //                            +- * Project (3)
+    //                               +- * LocalTableScan (2)
     checkKeywordsExistsInExplain(
       testDf,
       FormattedMode,
       s"""
-        |(6) BroadcastQueryStage
+        |(5) BroadcastQueryStage
         |Output [2]: [k#x, v2#x]
         |Arguments: 0
         |""".stripMargin,
       s"""
-        |(11) ShuffleQueryStage
+        |(10) ShuffleQueryStage
         |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
         |Arguments: 1
         |""".stripMargin,
       s"""
-        |(12) CustomShuffleReader
+        |(11) CustomShuffleReader
         |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
         |Arguments: coalesced
         |""".stripMargin,
       s"""
-        |(14) AdaptiveSparkPlan
+        |(13) AdaptiveSparkPlan
         |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
         |Arguments: isFinalPlan=true
         |""".stripMargin
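For reference, a hedged sketch of how the renumbered `FormattedMode` output checked above can be reproduced interactively. The fixture data is invented to match the column names used in the suite, and an active `spark: SparkSession` with AQE enabled is assumed:

```
import org.apache.spark.sql.functions.{avg, count, sum}
import spark.implicits._ // assumes an active `spark` session

// Invented fixtures shaped like the suite's df1/df2.
val df1 = Seq((1, 10), (2, 20), (3, 30)).toDF("k", "v1")
val df2 = Seq((1, 1.0), (2, 2.0), (3, 3.0)).toDF("k", "v2")
val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))

// AQE only finalizes the plan once the query actually runs, so collect first;
// the operator IDs printed afterwards match the `== Final Plan ==` numbering.
testDf.collect()
testDf.explain("formatted")
```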
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index 8fc5dd85f6..90ac62c39f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -27,7 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
@@ -120,6 +120,12 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  private def findTopLevelSort(plan: SparkPlan): Seq[SortExec] = {
+    collect(plan) {
+      case s: SortExec => s
+    }
+  }
+
   private def findReusedExchange(plan: SparkPlan): Seq[ReusedExchangeExec] = {
     collectWithSubqueries(plan) {
       case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e
@@ -1373,6 +1379,22 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-35585: Support propagate empty relation through project/filter") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM testData WHERE key = 0 ORDER BY key, value")
+      assert(findTopLevelSort(plan1).size == 1)
+      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+
+      val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" +
+          " WHERE key > rand()")
+      assert(findTopLevelSort(plan2).size == 1)
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+    }
+  }
+
   test("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("v1") {
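Finally, a sketch of the second scenario exercised by the new test, under the same assumed session and view `t` as the first snippet. The outer predicate uses `rand()`, which cannot be evaluated at planning time, but once the inner subtree is known to be empty there are no rows left to filter, so the whole query still collapses:

```
// Non-deterministic predicate over a provably empty input: still foldable,
// because an empty relation stays empty under any filter.
val q2 = spark.sql(
  """SELECT key FROM
    |  (SELECT * FROM t WHERE value = 'no_match' ORDER BY key)
    |WHERE key > rand()""".stripMargin)
q2.collect()
// Expect the final adaptive plan to be an empty LocalTableScan here as well.
println(q2.queryExecution.executedPlan)
```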