From c1b3f86c58ff9a54d2d422a8332ed3e1080cc86c Mon Sep 17 00:00:00 2001 From: Ke Jia Date: Fri, 16 Jul 2021 16:01:07 +0800 Subject: [PATCH] [SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange ### What changes were proposed in this pull request? This PR adds DPP + AQE support for the case where Spark can't reuse the broadcast exchange but executing the DPP subquery is still cheaper. ### Why are the changes needed? Improve AQE + DPP ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added new unit tests. Closes #32861 from JkSelf/supportDPP3. Lead-authored-by: Ke Jia Co-authored-by: Wenchen Fan Signed-off-by: Wenchen Fan --- .../SubqueryAdaptiveBroadcastExec.scala | 3 ++ .../adaptive/InsertAdaptiveSparkPlan.scala | 3 +- .../PlanAdaptiveDynamicPruningFilters.scala | 22 ++++++++++-- .../sql/DynamicPartitionPruningSuite.scala | 36 ++++++++----------- 4 files changed, 39 insertions(+), 25 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala index b21bfef8e3..e7092ee91d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SubqueryAdaptiveBroadcastExec.scala @@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ import org.apache.spark.sql.catalyst.plans.QueryPlan +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.errors.QueryExecutionErrors /** @@ -34,6 +35,8 @@ import org.apache.spark.sql.errors.QueryExecutionErrors case class SubqueryAdaptiveBroadcastExec( name: String, index: Int, + onlyInBroadcast: Boolean, + @transient buildPlan: LogicalPlan, buildKeys: Seq[Expression], child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode { diff
--git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala index 2bcfa1b108..c92878c514 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/InsertAdaptiveSparkPlan.scala @@ -140,7 +140,8 @@ case class InsertAdaptiveSparkPlan( val name = s"dynamicpruning#${exprId.id}" val subquery = SubqueryAdaptiveBroadcastExec( - name, broadcastKeyIndex, buildKeys, executedPlan) + name, broadcastKeyIndex, onlyInBroadcast, + buildPlan, buildKeys, executedPlan) subqueryMap.put(exprId.id, subquery) case _ => })) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala index 68d4a8cc00..f9c1bbe70d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/PlanAdaptiveDynamicPruningFilters.scala @@ -17,8 +17,9 @@ package org.apache.spark.sql.execution.adaptive -import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal} +import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, Literal} import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight} +import org.apache.spark.sql.catalyst.plans.logical.Aggregate import org.apache.spark.sql.catalyst.rules.Rule import org.apache.spark.sql.catalyst.trees.TreePattern._ import org.apache.spark.sql.execution._ @@ -38,7 +39,7 @@ case class PlanAdaptiveDynamicPruningFilters( plan.transformAllExpressionsWithPruning( _.containsAllPatterns(DYNAMIC_PRUNING_EXPRESSION, IN_SUBQUERY_EXEC)) { case 
DynamicPruningExpression(InSubqueryExec( - value, SubqueryAdaptiveBroadcastExec(name, index, buildKeys, + value, SubqueryAdaptiveBroadcastExec(name, index, onlyInBroadcast, buildPlan, buildKeys, adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) => val packedKeys = BindReferences.bindReferences( HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output) @@ -62,8 +63,23 @@ case class PlanAdaptiveDynamicPruningFilters( val broadcastValues = SubqueryBroadcastExec( name, index, buildKeys, newAdaptivePlan) DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId)) - } else { + } else if (onlyInBroadcast) { DynamicPruningExpression(Literal.TrueLiteral) + } else { + // we need to apply an aggregate on the buildPlan in order to be column pruned + val alias = Alias(buildKeys(index), buildKeys(index).toString)() + val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan) + + val session = adaptivePlan.context.session + val planner = session.sessionState.planner + // Here we can't call the QueryExecution.prepareExecutedPlan() method to + // get the sparkPlan as Non-AQE use case, which will cause the physical + // plan optimization rules be inserted twice, once in AQE framework and + // another in prepareExecutedPlan() method. 
+ val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate) + val newAdaptivePlan = adaptivePlan.copy(inputPlan = sparkPlan) + val values = SubqueryExec(name, newAdaptivePlan) + DynamicPruningExpression(InSubqueryExec(value, values, exprId)) } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala index b175701ac8..38527fb6be 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DynamicPartitionPruningSuite.scala @@ -277,8 +277,7 @@ abstract class DynamicPartitionPruningSuiteBase /** * Test the result of a simple join on mock-up tables */ - test("simple inner join triggers DPP with mock-up tables", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("simple inner join triggers DPP with mock-up tables") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { @@ -353,8 +352,7 @@ abstract class DynamicPartitionPruningSuiteBase * (2) DPP should be triggered only for certain join types * (3) DPP should trigger only when we have attributes on both sides of the join condition */ - test("DPP triggers only for certain types of query", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("DPP triggers only for certain types of query") { withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") { @@ -449,11 +447,11 @@ abstract class DynamicPartitionPruningSuiteBase /** * The filtering policy has a fallback when the stats are unavailable */ - test("filtering ratio policy fallback", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("filtering ratio policy 
fallback") { withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", - SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { + SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) { Given("no stats and selective predicate") withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true", SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") { @@ -520,8 +518,7 @@ abstract class DynamicPartitionPruningSuiteBase /** * The filtering ratio policy performs best when it uses cardinality estimates */ - test("filtering ratio policy with stats when the broadcast pruning is disabled", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("filtering ratio policy with stats when the broadcast pruning is disabled") { withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") { @@ -714,8 +711,7 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("partition pruning in broadcast hash joins", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("partition pruning in broadcast hash joins") { Given("disable broadcast pruning and disable subquery duplication") withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", @@ -1022,8 +1018,7 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("avoid reordering broadcast join keys to match input hash partitioning", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("avoid reordering broadcast join keys to match input hash partitioning") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { withTable("large", "dimTwo", "dimThree") { @@ -1147,9 +1142,10 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("join key with multiple references on the filtering plan", - 
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { - withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { + test("join key with multiple references on the filtering plan") { + withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true", + SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName + ) { // when enable AQE, the reusedExchange is inserted when executed. withTable("fact", "dim") { spark.range(100).select( @@ -1209,8 +1205,7 @@ abstract class DynamicPartitionPruningSuiteBase } test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " + - "canonicalization and exchange reuse", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + "canonicalization and exchange reuse") { withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") { withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") { val df = sql( @@ -1222,7 +1217,7 @@ abstract class DynamicPartitionPruningSuiteBase """.stripMargin) checkPartitionPruningPredicate(df, false, false) - val reuseExchangeNodes = df.queryExecution.executedPlan.collect { + val reuseExchangeNodes = collect(df.queryExecution.executedPlan) { case se: ReusedExchangeExec => se } assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " + @@ -1418,8 +1413,7 @@ abstract class DynamicPartitionPruningSuiteBase } } - test("SPARK-32855: Filtering side can not broadcast by join type", - DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) { + test("SPARK-32855: Filtering side can not broadcast by join type") { withSQLConf( SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false", SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",