[SPARK-35884][SQL] EXPLAIN FORMATTED for AQE

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/29137, which left some issues in the output of `EXPLAIN FORMATTED` under AQE:
```
AdaptiveSparkPlan (13)
+- == Final Plan ==
   * HashAggregate (12)
   +- CustomShuffleReader (11)
      +- ShuffleQueryStage (10)
         +- Exchange (9)
            +- * HashAggregate (8)
               +- * Project (7)
                  +- * BroadcastHashJoin Inner BuildRight (6)
                     :- * LocalTableScan (1)
                     +- BroadcastQueryStage (5)
                        +- BroadcastExchange (4)
                           +- * Project (3)
                              +- * LocalTableScan (2)
+- == Initial Plan ==
   HashAggregate (unknown)
   +- Exchange (unknown)
      +- HashAggregate (unknown)
         +- Project (unknown)
            +- BroadcastHashJoin Inner BuildRight (unknown)
               :- Project (unknown)
               :  +- LocalTableScan (unknown)
               +- BroadcastExchange (unknown)
                  +- Project (3)
                     +- LocalTableScan (2)
```

Nodes that appear only in the initial plan do not get an ID and show `unknown` instead. This PR fixes the issue by generating operator IDs for the initial plan as well, and by tracking already-collected IDs so that operators shared by both plans are printed only once.
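For reference, the behavior can be reproduced in a few lines of `spark-shell`. This is a minimal sketch, assuming a local session with AQE explicitly enabled; the query mirrors the one used in the test below:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{avg, count, sum}

// A local session with adaptive query execution turned on.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.adaptive.enabled", "true")
  .getOrCreate()
import spark.implicits._

val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))

testDf.collect()            // trigger the final plan for AQE
testDf.explain("formatted") // prints both "Final Plan" and "Initial Plan" sections
```

Before this fix, the `== Initial Plan ==` section of that output printed `unknown` in place of most operator IDs.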

### Why are the changes needed?

Bug fix: without it, `EXPLAIN FORMATTED` on an AQE query prints `unknown` instead of operator IDs for nodes in the initial plan.

### Does this PR introduce _any_ user-facing change?

Yes. `EXPLAIN FORMATTED` with AQE now displays both the final and the initial plan correctly, with no `unknown` operator IDs.

### How was this patch tested?

New tests in `ExplainSuiteAE` (see below).

Closes #33067 from cloud-fan/explain.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com>
Commit c0cfbb1743 (parent f1ad34558c), committed 2021-06-25 00:18:26 -07:00
3 changed files with 125 additions and 35 deletions

`sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala`:

```diff
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.mutable.{ArrayBuffer, BitSet}
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.{Expression, PlanExpression}
@@ -38,10 +38,13 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    *
    * @param plan Input query plan to process
    * @param append function used to append the explain output
+   * @param collectedOperators The IDs of the operators that are already collected and we shouldn't
+   *                           collect again.
    */
   private def processPlanSkippingSubqueries[T <: QueryPlan[T]](
       plan: T,
-      append: String => Unit): Unit = {
+      append: String => Unit,
+      collectedOperators: BitSet): Unit = {
     try {
       generateWholeStageCodegenIds(plan)
 
@@ -55,7 +58,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       append("\n")
 
       val operationsWithID = ArrayBuffer.empty[QueryPlan[_]]
-      collectOperatorsWithID(plan, operationsWithID)
+      collectOperatorsWithID(plan, operationsWithID, collectedOperators)
       operationsWithID.foreach(p => append(p.verboseStringWithOperatorId()))
 
     } catch {
@@ -80,7 +83,8 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
        (curId, plan) => generateOperatorIDs(plan._3.child, curId)
     }
 
-    processPlanSkippingSubqueries(plan, append)
+    val collectedOperators = BitSet.empty
+    processPlanSkippingSubqueries(plan, append, collectedOperators)
 
     var i = 0
     for (sub <- subqueries) {
@@ -95,7 +99,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       // the explain output. In case of subquery reuse, we don't print subquery plan more
       // than once. So we skip [[ReusedSubqueryExec]] here.
       if (!sub._3.isInstanceOf[ReusedSubqueryExec]) {
-        processPlanSkippingSubqueries(sub._3.child, append)
+        processPlanSkippingSubqueries(sub._3.child, append, collectedOperators)
       }
       append("\n")
     }
@@ -135,6 +139,9 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       case _: InputAdapter =>
       case p: AdaptiveSparkPlanExec =>
         currentOperationID = generateOperatorIDs(p.executedPlan, currentOperationID)
+        if (!p.executedPlan.fastEquals(p.initialPlan)) {
+          currentOperationID = generateOperatorIDs(p.initialPlan, currentOperationID)
+        }
         setOpId(p)
       case p: QueryStageExec =>
         currentOperationID = generateOperatorIDs(p.plan, currentOperationID)
@@ -154,18 +161,21 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
    *
    * @param plan Input query plan to process
    * @param operators An output parameter that contains the operators.
+   * @param collectedOperators The IDs of the operators that are already collected and we shouldn't
+   *                           collect again.
    */
   private def collectOperatorsWithID(
       plan: QueryPlan[_],
-      operators: ArrayBuffer[QueryPlan[_]]): Unit = {
+      operators: ArrayBuffer[QueryPlan[_]],
+      collectedOperators: BitSet): Unit = {
     // Skip the subqueries as they are not printed as part of main query block.
     if (plan.isInstanceOf[BaseSubqueryExec]) {
       return
     }
 
     def collectOperatorWithID(plan: QueryPlan[_]): Unit = {
-      if (plan.getTagValue(QueryPlan.OP_ID_TAG).isDefined) {
-        operators += plan
+      plan.getTagValue(QueryPlan.OP_ID_TAG).foreach { id =>
+        if (collectedOperators.add(id)) operators += plan
       }
     }
 
@@ -173,14 +183,17 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
       case _: WholeStageCodegenExec =>
       case _: InputAdapter =>
       case p: AdaptiveSparkPlanExec =>
-        collectOperatorsWithID(p.executedPlan, operators)
+        collectOperatorsWithID(p.executedPlan, operators, collectedOperators)
+        if (!p.executedPlan.fastEquals(p.initialPlan)) {
+          collectOperatorsWithID(p.initialPlan, operators, collectedOperators)
+        }
         collectOperatorWithID(p)
       case p: QueryStageExec =>
-        collectOperatorsWithID(p.plan, operators)
+        collectOperatorsWithID(p.plan, operators, collectedOperators)
         collectOperatorWithID(p)
       case other: QueryPlan[_] =>
         collectOperatorWithID(other)
-        other.innerChildren.foreach(collectOperatorsWithID(_, operators))
+        other.innerChildren.foreach(collectOperatorsWithID(_, operators, collectedOperators))
     }
   }
@@ -263,7 +276,7 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
     }
 
     plan foreach {
-      case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan))
+      case p: AdaptiveSparkPlanExec => remove(p, Seq(p.executedPlan, p.initialPlan))
      case p: QueryStageExec => remove(p, Seq(p.plan))
      case plan: QueryPlan[_] => remove(plan, plan.innerChildren)
    }
```
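The deduplication hinges on `collectedOperators.add(id)`: `mutable.BitSet.add` returns `true` only when the ID was not yet present, so an operator reused by both the initial and the final plan (such as `Project (3)` and `LocalTableScan (2)` in the example above) is collected and printed exactly once. A standalone sketch of the idea, with illustrative names not taken from the patch:

```scala
import scala.collection.mutable.{ArrayBuffer, BitSet}

// Collect each operator at most once per ID, in first-seen order.
val collectedOperators = BitSet.empty
val operators = ArrayBuffer.empty[String]
val visited = Seq(2 -> "LocalTableScan", 3 -> "Project", 2 -> "LocalTableScan")
for ((id, op) <- visited) {
  if (collectedOperators.add(id)) operators += op // add() returns false on repeats
}
assert(operators == Seq("LocalTableScan", "Project"))
```

A `BitSet` is a natural fit here because operator IDs are small, dense non-negative integers, so membership checks are constant-time with minimal memory overhead.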

`sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala`:

```diff
@@ -130,7 +130,7 @@ case class AdaptiveSparkPlanExec(
   @transient private val costEvaluator = SimpleCostEvaluator
 
-  @transient private val initialPlan = context.session.withActive {
+  @transient val initialPlan = context.session.withActive {
     applyPhysicalRules(
       inputPlan, queryStagePreparationRules, Some((planChangeLogger, "AQE Preparations")))
   }
```
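Note that `initialPlan` loses its `private` modifier so that `ExplainUtils` can traverse the initial plan when generating and collecting operator IDs.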

`sql/core/src/test/scala/org/apache/spark/sql/ExplainSuite.scala`:

```diff
@@ -527,13 +527,13 @@ class ExplainSuite extends ExplainSuiteHelper with DisableAdaptiveExecutionSuite
 class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuite {
   import testImplicits._
 
-  test("Explain formatted") {
+  test("SPARK-35884: Explain Formatted") {
     val df1 = Seq((1, 2), (2, 3)).toDF("k", "v1")
     val df2 = Seq((2, 3), (1, 1)).toDF("k", "v2")
     val testDf = df1.join(df2, "k").groupBy("k").agg(count("v1"), sum("v1"), avg("v2"))
     // trigger the final plan for AQE
     testDf.collect()
-    // AdaptiveSparkPlan (13)
+    // AdaptiveSparkPlan (21)
     // +- == Final Plan ==
     //    * HashAggregate (12)
     //    +- CustomShuffleReader (11)
@@ -547,30 +547,107 @@ class ExplainSuiteAE extends ExplainSuiteHelper with EnableAdaptiveExecutionSuit
     //             +- BroadcastExchange (4)
     //                +- * Project (3)
     //                   +- * LocalTableScan (2)
+    // +- == Initial Plan ==
+    //    HashAggregate (20)
+    //    +- Exchange (19)
+    //       +- HashAggregate (18)
+    //          +- Project (17)
+    //             +- BroadcastHashJoin Inner BuildRight (16)
+    //                :- Project (14)
+    //                :  +- LocalTableScan (13)
+    //                +- BroadcastExchange (15)
+    //                   +- Project (3)
+    //                      +- LocalTableScan (2)
     checkKeywordsExistsInExplain(
       testDf,
       FormattedMode,
-      s"""
+      """
         |(5) BroadcastQueryStage
         |Output [2]: [k#x, v2#x]
-        |Arguments: 0
-        |""".stripMargin,
-      s"""
+        |Arguments: 0""".stripMargin,
+      """
         |(10) ShuffleQueryStage
         |Output [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
-        |Arguments: 1
-        |""".stripMargin,
-      s"""
+        |Arguments: 1""".stripMargin,
+      """
         |(11) CustomShuffleReader
         |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
         |Arguments: coalesced
         |""".stripMargin,
-      s"""
-        |(13) AdaptiveSparkPlan
+      """
+        |(16) BroadcastHashJoin
+        |Left keys [1]: [k#x]
+        |Right keys [1]: [k#x]
+        |Join condition: None
+        |""".stripMargin,
+      """
+        |(19) Exchange
+        |Input [5]: [k#x, count#xL, sum#xL, sum#x, count#xL]
+        |""".stripMargin,
+      """
+        |(21) AdaptiveSparkPlan
         |Output [4]: [k#x, count(v1)#xL, sum(v1)#xL, avg(v2)#x]
         |Arguments: isFinalPlan=true
         |""".stripMargin
     )
+    checkKeywordsNotExistsInExplain(testDf, FormattedMode, "unknown")
   }
 
+  test("SPARK-35884: Explain should only display one plan before AQE takes effect") {
+    val df = (0 to 10).toDF("id").where('id > 5)
+    val modes = Seq(SimpleMode, ExtendedMode, CostMode, FormattedMode)
+    modes.foreach { mode =>
+      checkKeywordsExistsInExplain(df, mode, "AdaptiveSparkPlan")
+      checkKeywordsNotExistsInExplain(df, mode, "Initial Plan", "Current Plan")
+    }
+    df.collect()
+    modes.foreach { mode =>
+      checkKeywordsExistsInExplain(df, mode, "Initial Plan", "Final Plan")
+      checkKeywordsNotExistsInExplain(df, mode, "unknown")
+    }
+  }
+
+  test("SPARK-35884: Explain formatted with subquery") {
+    withTempView("t1", "t2") {
+      spark.range(100).select('id % 10 as "key", 'id as "value").createOrReplaceTempView("t1")
+      spark.range(10).createOrReplaceTempView("t2")
+      val query =
+        """
+          |SELECT key, value FROM t1
+          |JOIN t2 ON t1.key = t2.id
+          |WHERE value > (SELECT MAX(id) FROM t2)
+          |""".stripMargin
+      val df = sql(query).toDF()
+      df.collect()
+      checkKeywordsExistsInExplain(df, FormattedMode,
+        """
+          |(2) Filter [codegen id : 2]
+          |Input [1]: [id#xL]
+          |Condition : ((id#xL > Subquery subquery#x, [id=#x]) AND isnotnull((id#xL % 10)))
+          |""".stripMargin,
+        """
+          |(6) BroadcastQueryStage
+          |Output [1]: [id#xL]
+          |Arguments: 0""".stripMargin,
+        """
+          |(12) AdaptiveSparkPlan
+          |Output [2]: [key#xL, value#xL]
+          |Arguments: isFinalPlan=true
+          |""".stripMargin,
+        """
+          |Subquery:1 Hosting operator id = 2 Hosting Expression = Subquery subquery#x, [id=#x]
+          |""".stripMargin,
+        """
+          |(16) ShuffleQueryStage
+          |Output [1]: [max#xL]
+          |Arguments: 0""".stripMargin,
+        """
+          |(20) AdaptiveSparkPlan
+          |Output [1]: [max(id)#xL]
+          |Arguments: isFinalPlan=true
+          |""".stripMargin
+      )
+      checkKeywordsNotExistsInExplain(df, FormattedMode, "unknown")
+    }
+  }
+
   test("SPARK-35133: explain codegen should work with AQE") {
```