[SPARK-35710][SQL] Support DPP + AQE when there is no reused broadcast exchange

### What changes were proposed in this pull request?
This PR adds DPP + AQE support for the case where Spark can't reuse the broadcast exchange but executing the DPP subquery is cheaper.

### Why are the changes needed?
Improve AQE + DPP

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added new unit tests.

Closes #32861 from JkSelf/supportDPP3.

Lead-authored-by: Ke Jia <ke.a.jia@intel.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Ke Jia 2021-07-16 16:01:07 +08:00 committed by Wenchen Fan
parent f06aa4a3f3
commit c1b3f86c58
4 changed files with 39 additions and 25 deletions

View file

@ -21,6 +21,7 @@ import org.apache.spark.rdd.RDD
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.errors.QueryExecutionErrors
/**
@ -34,6 +35,8 @@ import org.apache.spark.sql.errors.QueryExecutionErrors
case class SubqueryAdaptiveBroadcastExec(
name: String,
index: Int,
onlyInBroadcast: Boolean,
@transient buildPlan: LogicalPlan,
buildKeys: Seq[Expression],
child: SparkPlan) extends BaseSubqueryExec with UnaryExecNode {

View file

@ -140,7 +140,8 @@ case class InsertAdaptiveSparkPlan(
val name = s"dynamicpruning#${exprId.id}"
val subquery = SubqueryAdaptiveBroadcastExec(
name, broadcastKeyIndex, buildKeys, executedPlan)
name, broadcastKeyIndex, onlyInBroadcast,
buildPlan, buildKeys, executedPlan)
subqueryMap.put(exprId.id, subquery)
case _ =>
}))

View file

@ -17,8 +17,9 @@
package org.apache.spark.sql.execution.adaptive
import org.apache.spark.sql.catalyst.expressions.{BindReferences, DynamicPruningExpression, Literal}
import org.apache.spark.sql.catalyst.expressions.{Alias, BindReferences, DynamicPruningExpression, Literal}
import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
import org.apache.spark.sql.catalyst.plans.logical.Aggregate
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.catalyst.trees.TreePattern._
import org.apache.spark.sql.execution._
@ -38,7 +39,7 @@ case class PlanAdaptiveDynamicPruningFilters(
plan.transformAllExpressionsWithPruning(
_.containsAllPatterns(DYNAMIC_PRUNING_EXPRESSION, IN_SUBQUERY_EXEC)) {
case DynamicPruningExpression(InSubqueryExec(
value, SubqueryAdaptiveBroadcastExec(name, index, buildKeys,
value, SubqueryAdaptiveBroadcastExec(name, index, onlyInBroadcast, buildPlan, buildKeys,
adaptivePlan: AdaptiveSparkPlanExec), exprId, _)) =>
val packedKeys = BindReferences.bindReferences(
HashJoin.rewriteKeyExpr(buildKeys), adaptivePlan.executedPlan.output)
@ -62,8 +63,23 @@ case class PlanAdaptiveDynamicPruningFilters(
val broadcastValues = SubqueryBroadcastExec(
name, index, buildKeys, newAdaptivePlan)
DynamicPruningExpression(InSubqueryExec(value, broadcastValues, exprId))
} else {
} else if (onlyInBroadcast) {
DynamicPruningExpression(Literal.TrueLiteral)
} else {
// We need to apply an aggregate on the buildPlan so that it can be column pruned.
val alias = Alias(buildKeys(index), buildKeys(index).toString)()
val aggregate = Aggregate(Seq(alias), Seq(alias), buildPlan)
val session = adaptivePlan.context.session
val planner = session.sessionState.planner
// Here we can't call QueryExecution.prepareExecutedPlan() to get the
// sparkPlan as in the non-AQE case, because that would cause the physical
// plan optimization rules to be applied twice: once by the AQE framework
// and once by prepareExecutedPlan().
val sparkPlan = QueryExecution.createSparkPlan(session, planner, aggregate)
val newAdaptivePlan = adaptivePlan.copy(inputPlan = sparkPlan)
val values = SubqueryExec(name, newAdaptivePlan)
DynamicPruningExpression(InSubqueryExec(value, values, exprId))
}
}
}

View file

@ -277,8 +277,7 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* Test the result of a simple join on mock-up tables
*/
test("simple inner join triggers DPP with mock-up tables",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("simple inner join triggers DPP with mock-up tables") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@ -353,8 +352,7 @@ abstract class DynamicPartitionPruningSuiteBase
* (2) DPP should be triggered only for certain join types
* (3) DPP should trigger only when we have attributes on both sides of the join condition
*/
test("DPP triggers only for certain types of query",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("DPP triggers only for certain types of query") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_PRUNING_SIDE_EXTRA_FILTER_RATIO.key -> "1") {
@ -449,11 +447,11 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* The filtering policy has a fallback when the stats are unavailable
*/
test("filtering ratio policy fallback",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("filtering ratio policy fallback") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName) {
Given("no stats and selective predicate")
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_ENABLED.key -> "true",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "true") {
@ -520,8 +518,7 @@ abstract class DynamicPartitionPruningSuiteBase
/**
* The filtering ratio policy performs best when it uses cardinality estimates
*/
test("filtering ratio policy with stats when the broadcast pruning is disabled",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("filtering ratio policy with stats when the broadcast pruning is disabled") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.EXCHANGE_REUSE_ENABLED.key -> "false") {
@ -714,8 +711,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
test("partition pruning in broadcast hash joins",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("partition pruning in broadcast hash joins") {
Given("disable broadcast pruning and disable subquery duplication")
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
@ -1022,8 +1018,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
test("avoid reordering broadcast join keys to match input hash partitioning",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("avoid reordering broadcast join keys to match input hash partitioning") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
withTable("large", "dimTwo", "dimThree") {
@ -1147,9 +1142,10 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
test("join key with multiple references on the filtering plan",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
test("join key with multiple references on the filtering plan") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true",
SQLConf.ADAPTIVE_OPTIMIZER_EXCLUDED_RULES.key -> AQEPropagateEmptyRelation.ruleName
) {
// when enable AQE, the reusedExchange is inserted when executed.
withTable("fact", "dim") {
spark.range(100).select(
@ -1209,8 +1205,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
test("SPARK-32509: Unused Dynamic Pruning filter shouldn't affect " +
"canonicalization and exchange reuse",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
"canonicalization and exchange reuse") {
withSQLConf(SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "true") {
withSQLConf(SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
val df = sql(
@ -1222,7 +1217,7 @@ abstract class DynamicPartitionPruningSuiteBase
""".stripMargin)
checkPartitionPruningPredicate(df, false, false)
val reuseExchangeNodes = df.queryExecution.executedPlan.collect {
val reuseExchangeNodes = collect(df.queryExecution.executedPlan) {
case se: ReusedExchangeExec => se
}
assert(reuseExchangeNodes.size == 1, "Expected plan to contain 1 ReusedExchangeExec " +
@ -1418,8 +1413,7 @@ abstract class DynamicPartitionPruningSuiteBase
}
}
test("SPARK-32855: Filtering side can not broadcast by join type",
DisableAdaptiveExecution("DPP in AQE must reuse broadcast")) {
test("SPARK-32855: Filtering side can not broadcast by join type") {
withSQLConf(
SQLConf.DYNAMIC_PARTITION_PRUNING_REUSE_BROADCAST_ONLY.key -> "false",
SQLConf.DYNAMIC_PARTITION_PRUNING_USE_STATS.key -> "false",