[SPARK-31650][SQL] Fix wrong UI when AdaptiveSparkPlanExec has unmanaged subqueries

### What changes were proposed in this pull request?

Make the non-subquery `AdaptiveSparkPlanExec` update the UI again after `execute`/`executeCollect`/`executeTake`/`executeTail` if it contains subqueries that do not belong to any query stage.

### Why are the changes needed?

If there are subqueries that do not belong to any query stage of the main query, the main query can reach its final physical plan and update the UI before those subqueries finish. As a result, the UI cannot reflect changes coming from the subqueries, e.g. new nodes generated while executing them.
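
For example, a scalar subquery in a filter condition hits this path: it is planned and executed by its own `AdaptiveSparkPlanExec` and is not part of any query stage of the main query. A minimal reproduction sketch (table and column names are made up for illustration):

```scala
// Sketch for spark-shell: with AQE on, the scalar subquery below runs outside
// the main query's stages, so it may finish after the main query has already
// posted its final plan to the UI.
import spark.implicits._

spark.conf.set("spark.sql.adaptive.enabled", "true")
Seq(1, 2, 3).toDF("c1").createOrReplaceTempView("t1")
Seq(2, 3, 4).toDF("c2").createOrReplaceTempView("t2")

spark.sql("SELECT * FROM t1 WHERE c1 > (SELECT max(c2) FROM t2)").collect()
// Before this patch, the SQL UI graph could miss the nodes generated while
// executing the subquery; after it, the UI is updated once more on completion.
```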

Before:

<img width="335" alt="before_aqe_ui" src="https://user-images.githubusercontent.com/16397174/81149758-671a9480-8fb1-11ea-84c4-9a4520e2b08e.png">

After:

<img width="546" alt="after_aqe_ui" src="https://user-images.githubusercontent.com/16397174/81149752-63870d80-8fb1-11ea-9852-f41e11afe216.png">

### Does this PR introduce _any_ user-facing change?

No (the AQE feature hasn't been released yet).

### How was this patch tested?

Tested manually.

Closes #28460 from Ngone51/fix_aqe_ui.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
commit b16ea8e1ab (parent f05560bf50)
yi.wu authored 2020-05-06 12:52:53 +00:00; committed by Wenchen Fan
2 changed files with 36 additions and 12 deletions

#### AdaptiveSparkPlanExec.scala

```diff
@@ -138,6 +138,13 @@ case class AdaptiveSparkPlanExec(
     executedPlan.resetMetrics()
   }
 
+  private def getExecutionId: Option[Long] = {
+    // If the `QueryExecution` does not match the current execution ID, it means the execution ID
+    // belongs to another (parent) query, and we should not call update UI in this query.
+    Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+      .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
+  }
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
@@ -145,11 +152,7 @@ case class AdaptiveSparkPlanExec(
     // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
     // created in the middle of the execution.
     context.session.withActive {
-      // If the `QueryExecution` does not match the current execution ID, it means the execution ID
-      // belongs to another (parent) query, and we should not call update UI in this query.
-      val executionId =
-        Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
-          .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
+      val executionId = getExecutionId
       var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
       var result = createQueryStages(currentPhysicalPlan)
       val events = new LinkedBlockingQueue[StageMaterializationEvent]()
@@ -230,25 +233,43 @@ case class AdaptiveSparkPlanExec(
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
-      logOnLevel(s"Final plan: $currentPhysicalPlan")
       currentPhysicalPlan
     }
   }
 
+  // Use a lazy val to avoid this being called more than once.
+  @transient private lazy val finalPlanUpdate: Unit = {
+    // Subqueries that don't belong to any query stage of the main query will execute after the
+    // last UI update in `getFinalPhysicalPlan`, so we need to update UI here again to make sure
+    // the newly generated nodes of those subqueries are updated.
+    if (!isSubquery && currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) {
+      getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
+    }
+    logOnLevel(s"Final plan: $currentPhysicalPlan")
+  }
+
   override def executeCollect(): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeCollect()
+    val rdd = getFinalPhysicalPlan().executeCollect()
+    finalPlanUpdate
+    rdd
   }
 
   override def executeTake(n: Int): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeTake(n)
+    val rdd = getFinalPhysicalPlan().executeTake(n)
+    finalPlanUpdate
+    rdd
   }
 
   override def executeTail(n: Int): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeTail(n)
+    val rdd = getFinalPhysicalPlan().executeTail(n)
+    finalPlanUpdate
+    rdd
   }
 
   override def doExecute(): RDD[InternalRow] = {
-    getFinalPhysicalPlan().execute()
+    val rdd = getFinalPhysicalPlan().execute()
+    finalPlanUpdate
+    rdd
   }
 
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")
```
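
The key trick in this change is `finalPlanUpdate`: the body of a Scala `lazy val` runs at most once, so it works as a cheap run-once guard that every execute path can reference. A standalone sketch of the idiom (class and method names here are illustrative, not from the patch):

```scala
class QueryRunner {
  // A lazy val body is evaluated at most once, on first access; later
  // accesses return the cached Unit value and do nothing. @transient mirrors
  // the patch: the initialization flag is not carried through serialization.
  @transient private lazy val finalUpdate: Unit = {
    println("posting final plan update") // side effect runs exactly once
  }

  def collect(): Unit = finalUpdate
  def take(): Unit = finalUpdate
}

object Demo extends App {
  val r = new QueryRunner
  r.collect() // prints "posting final plan update"
  r.take()    // no output: finalUpdate is already initialized
}
```

Lazy val initialization is also thread-safe in Scala 2 (it synchronizes on the enclosing instance), so concurrent calls to the execute methods still trigger the update only once.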

#### AdaptiveQueryExecSuite.scala

```diff
@@ -71,12 +71,15 @@ class AdaptiveQueryExecSuite
     }
 
     val planAfter = dfAdaptive.queryExecution.executedPlan
     assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+    val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
     spark.sparkContext.listenerBus.waitUntilEmpty()
-    assert(finalPlanCnt == 1)
+    // AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries that
+    // exist out of query stages.
+    val expectedFinalPlanCnt = adaptivePlan.find(_.subqueries.nonEmpty).map(_ => 2).getOrElse(1)
+    assert(finalPlanCnt == expectedFinalPlanCnt)
     spark.sparkContext.removeSparkListener(listener)
 
-    val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
     val exchanges = adaptivePlan.collect {
       case e: Exchange => e
     }
```
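
The test hunk references a `finalPlanCnt` counter and a `listener` defined earlier in the test, outside this hunk. A sketch of how such a listener can count the final-plan updates posted by AQE (an approximation for illustration, not the exact suite code):

```scala
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.execution.ui.SparkListenerSQLAdaptiveExecutionUpdate

var finalPlanCnt = 0
val listener = new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    // AQE posts this event on every plan change; count only the updates
    // that carry the final plan.
    case SparkListenerSQLAdaptiveExecutionUpdate(_, _, planInfo) =>
      if (planInfo.simpleString.startsWith("AdaptiveSparkPlan isFinalPlan=true")) {
        finalPlanCnt += 1
      }
    case _ => // ignore other events
  }
}
spark.sparkContext.addSparkListener(listener)
// ... run the adaptive query, then:
// spark.sparkContext.listenerBus.waitUntilEmpty()
// assert(finalPlanCnt == expectedFinalPlanCnt)
```

With the fix, a plan that contains subqueries outside any query stage produces two such events, one from `getFinalPhysicalPlan` and one from `finalPlanUpdate`, which is exactly what the adjusted assertion accounts for.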