[SPARK-35585][SQL] Support propagate empty relation through project/filter

### What changes were proposed in this pull request?

Add the rule `ConvertToLocalRelation` to the AQE Optimizer.
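
For context, `ConvertToLocalRelation` is the existing optimizer rule that folds `Project`/`Filter` nodes sitting directly on top of a `LocalRelation` into the `LocalRelation` itself. A minimal, self-contained sketch of that behavior (not from this PR; the attributes `k`/`v` are invented for illustration):
```
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LocalRelation, Project}
import org.apache.spark.sql.types.IntegerType

val k = AttributeReference("k", IntegerType)()
val v = AttributeReference("v", IntegerType)()

// An empty LocalRelation, like the one AQEPropagateEmptyRelation produces for an
// empty query stage, with a Filter and a Project stacked on top of it.
val plan = Project(Seq(k), Filter(GreaterThan(k, Literal(0)), LocalRelation(Seq(k, v))))

// The rule folds the Filter and Project into the (still empty) LocalRelation,
// so downstream operators see an empty local scan instead of a real subtree.
val collapsed = ConvertToLocalRelation(plan)
```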

### Why are the changes needed?

Support propagating an empty local relation through `Project` and `Filter`, e.g. for a plan shape like the following, where the shuffle stages turn out to be empty at runtime:
```
Aggregate
  Project
    Join
      ShuffleStage
      ShuffleStage
```
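
As a concrete (hypothetical) reproduction: once AQE materializes the shuffle stages and finds them empty, `AQEPropagateEmptyRelation` rewrites the join into an empty `LocalRelation`; with `ConvertToLocalRelation` now in the AQE optimizer, the `Project`/`Filter` above it are folded away as well, so the `Aggregate` reads an empty local scan. The table names and data below are invented for illustration:
```
// spark-shell sketch; disable broadcast joins so the join is planned with
// shuffle stages, as in the test added by this PR.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

// t1 ends up empty at runtime, which AQE only discovers after its shuffle
// stage has been materialized.
spark.range(0, 10).toDF("k").where("k > 100").createOrReplaceTempView("t1")
spark.range(0, 10).toDF("k").createOrReplaceTempView("t2")

spark.sql("""
  SELECT k, count(*) AS cnt
  FROM (SELECT t1.k FROM t1 JOIN t2 ON t1.k = t2.k WHERE t1.k > 0)
  GROUP BY k
""").collect()
```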

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Added a new test in `AdaptiveQueryExecSuite`; the expected explain output in `ExplainSuite` is updated accordingly.

Closes #32724 from ulysses-you/SPARK-35585.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit daf9d198dc (parent 54e9999d39)
Authored: 2021-06-02 07:49:56 +00:00
3 changed files with 49 additions and 21 deletions

AQEOptimizer.scala

@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.adaptive
 
 import org.apache.spark.sql.catalyst.analysis.UpdateAttributeNullability
+import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, LogicalPlanIntegrity, PlanHelper}
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.internal.SQLConf
@@ -27,9 +28,15 @@ import org.apache.spark.util.Utils
  * The optimizer for re-optimizing the logical plan used by AdaptiveSparkPlanExec.
  */
 class AQEOptimizer(conf: SQLConf) extends RuleExecutor[LogicalPlan] {
+  private def fixedPoint =
+    FixedPoint(
+      conf.optimizerMaxIterations,
+      maxIterationsSetting = SQLConf.OPTIMIZER_MAX_ITERATIONS.key)
+
   private val defaultBatches = Seq(
-    Batch("Propagate Empty Relations", Once,
+    Batch("Propagate Empty Relations", fixedPoint,
       AQEPropagateEmptyRelation,
+      ConvertToLocalRelation,
       UpdateAttributeNullability),
     Batch("Dynamic Join Selection", Once, DynamicJoinSelection)
   )

ExplainSuite.scala

@@ -525,41 +525,40 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
     val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))
     // trigger the final plan for AQE
     testDf.collect()
-    // == Physical Plan ==
-    // AdaptiveSparkPlan (14)
-    // +- * HashAggregate (13)
-    //    +- CustomShuffleReader (12)
-    //       +- ShuffleQueryStage (11)
-    //          +- Exchange (10)
-    //             +- * HashAggregate (9)
-    //                +- * Project (8)
-    //                   +- * BroadcastHashJoin Inner BuildRight (7)
-    //                      :- * Project (2)
-    //                      :  +- * LocalTableScan (1)
-    //                      +- BroadcastQueryStage (6)
-    //                         +- BroadcastExchange (5)
-    //                            +- * Project (4)
-    //                               +- * LocalTableScan (3)
+    // AdaptiveSparkPlan (13)
+    // +- == Final Plan ==
+    //    * HashAggregate (12)
+    //    +- CustomShuffleReader (11)
+    //       +- ShuffleQueryStage (10)
+    //          +- Exchange (9)
+    //             +- * HashAggregate (8)
+    //                +- * Project (7)
+    //                   +- * BroadcastHashJoin Inner BuildRight (6)
+    //                      :- * LocalTableScan (1)
+    //                      +- BroadcastQueryStage (5)
+    //                         +- BroadcastExchange (4)
+    //                            +- * Project (3)
+    //                               +- * LocalTableScan (2)
     checkKeywordsExistsInExplain(
       testDf,
       FormattedMode,
       s"""
-         |(6) BroadcastQueryStage
+         |(5) BroadcastQueryStage
          |Output [2]: [k#x, v2#x]
          |Arguments: 0
          |""".stripMargin,
       s"""
-         |(11) ShuffleQueryStage
+         |(10) ShuffleQueryStage
          |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
          |Arguments: 1
          |""".stripMargin,
       s"""
-         |(12) CustomShuffleReader
+         |(11) CustomShuffleReader
          |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
          |Arguments: coalesced
          |""".stripMargin,
       s"""
-         |(14) AdaptiveSparkPlan
+         |(13) AdaptiveSparkPlan
          |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
          |Arguments: isFinalPlan=true
          |""".stripMargin

AdaptiveQueryExecSuite.scala

@@ -27,7 +27,7 @@ import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent, SparkListe
 import org.apache.spark.sql.{Dataset, QueryTest, Row, SparkSession, Strategy}
 import org.apache.spark.sql.catalyst.optimizer.{BuildLeft, BuildRight}
 import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, LogicalPlan}
-import org.apache.spark.sql.execution.{PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.{LocalTableScanExec, PartialReducerPartitionSpec, QueryExecution, ReusedSubqueryExec, ShuffledRowRDD, SortExec, SparkPlan, UnaryExecNode}
 import org.apache.spark.sql.execution.command.DataWritingCommandExec
 import org.apache.spark.sql.execution.datasources.noop.NoopDataSource
 import org.apache.spark.sql.execution.datasources.v2.V2TableWriteExec
@@ -120,6 +120,12 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  private def findTopLevelSort(plan: SparkPlan): Seq[SortExec] = {
+    collect(plan) {
+      case s: SortExec => s
+    }
+  }
+
   private def findReusedExchange(plan: SparkPlan): Seq[ReusedExchangeExec] = {
     collectWithSubqueries(plan) {
       case ShuffleQueryStageExec(_, e: ReusedExchangeExec, _) => e
@@ -1373,6 +1379,22 @@ class AdaptiveQueryExecSuite
     }
   }
 
+  test("SPARK-35585: Support propagate empty relation through project/filter") {
+    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
+      SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1") {
+      val (plan1, adaptivePlan1) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM testData WHERE key = 0 ORDER BY key, value")
+      assert(findTopLevelSort(plan1).size == 1)
+      assert(stripAQEPlan(adaptivePlan1).isInstanceOf[LocalTableScanExec])
+
+      val (plan2, adaptivePlan2) = runAdaptiveAndVerifyResult(
+        "SELECT key FROM (SELECT * FROM testData WHERE value = 'no_match' ORDER BY key)" +
+          " WHERE key > rand()")
+      assert(findTopLevelSort(plan2).size == 1)
+      assert(stripAQEPlan(adaptivePlan2).isInstanceOf[LocalTableScanExec])
+    }
+  }
+
   test("SPARK-32753: Only copy tags to node with no tags") {
     withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
       withTempView("v1") {