From b16ea8e1ab58bd24c50d31ce0dfc6c79c87fa3b2 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Wed, 6 May 2020 12:52:53 +0000
Subject: [PATCH] [SPARK-31650][SQL] Fix wrong UI when AdaptiveSparkPlanExec
 has unmanaged subqueries

### What changes were proposed in this pull request?

Make the non-subquery `AdaptiveSparkPlanExec` update the UI again after
`execute`/`executeCollect`/`executeTake`/`executeTail` if the
`AdaptiveSparkPlanExec` has subqueries that do not belong to any query stage.

### Why are the changes needed?

If there are subqueries that do not belong to any query stage of the main query,
the main query can reach its final physical plan and update the UI before those
subqueries finish. As a result, the UI cannot reflect the changes from those
subqueries, e.g. new nodes generated by the subqueries.
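For illustration, a sketch of the kind of query that can hit this path (an
editor's addition, not taken from the PR; the temp views `t1` and `t2` are
placeholders): a scalar subquery attached to a plan node that is not wrapped in
any shuffle query stage only materializes during `execute()`, after
`getFinalPhysicalPlan()` has already posted its last plan update.

```scala
// Hypothetical reproduction sketch. The main query below has no shuffle of its
// own, so its scalar subquery hangs off the root AdaptiveSparkPlanExec instead
// of a query stage and runs only after the final plan was reported to the UI.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.range(100).createOrReplaceTempView("t1") // placeholder data
spark.range(100).createOrReplaceTempView("t2")
spark.sql("SELECT * FROM t1 WHERE id > (SELECT avg(id) FROM t2)").collect()
```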
Before: before_aqe_ui (screenshot)
After: after_aqe_ui (screenshot)

### Does this PR introduce _any_ user-facing change?

No (the AQE feature hasn't been released yet).

### How was this patch tested?

Tested manually.

Closes #28460 from Ngone51/fix_aqe_ui.

Authored-by: yi.wu
Signed-off-by: Wenchen Fan
---
 .../adaptive/AdaptiveSparkPlanExec.scala      | 41 ++++++++++++++-----
 .../adaptive/AdaptiveQueryExecSuite.scala     |  7 +++-
 2 files changed, 36 insertions(+), 12 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
index 54d32c3f93..32308063a1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala
@@ -138,6 +138,13 @@ case class AdaptiveSparkPlanExec(
     executedPlan.resetMetrics()
   }
 
+  private def getExecutionId: Option[Long] = {
+    // If the `QueryExecution` does not match the current execution ID, it means the execution ID
+    // belongs to another (parent) query, and we should not update the UI in this query.
+    Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
+      .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
+  }
+
   private def getFinalPhysicalPlan(): SparkPlan = lock.synchronized {
     if (isFinalPlan) return currentPhysicalPlan
 
@@ -145,11 +152,7 @@ case class AdaptiveSparkPlanExec(
     // `plan.queryExecution.rdd`, we need to set active session here as new plan nodes can be
     // created in the middle of the execution.
     context.session.withActive {
-      // If the `QueryExecution` does not match the current execution ID, it means the execution ID
-      // belongs to another (parent) query, and we should not call update UI in this query.
-      val executionId =
-        Option(context.session.sparkContext.getLocalProperty(SQLExecution.EXECUTION_ID_KEY))
-          .map(_.toLong).filter(SQLExecution.getQueryExecution(_) eq context.qe)
+      val executionId = getExecutionId
       var currentLogicalPlan = currentPhysicalPlan.logicalLink.get
       var result = createQueryStages(currentPhysicalPlan)
       val events = new LinkedBlockingQueue[StageMaterializationEvent]()
@@ -230,25 +233,43 @@ case class AdaptiveSparkPlanExec(
       currentPhysicalPlan = applyPhysicalRules(result.newPlan, queryStageOptimizerRules)
       isFinalPlan = true
       executionId.foreach(onUpdatePlan(_, Seq(currentPhysicalPlan)))
-      logOnLevel(s"Final plan: $currentPhysicalPlan")
       currentPhysicalPlan
     }
   }
 
+  // Use a lazy val to avoid this being called more than once.
+  @transient private lazy val finalPlanUpdate: Unit = {
+    // Subqueries that don't belong to any query stage of the main query will execute after the
+    // last UI update in `getFinalPhysicalPlan`, so we need to update the UI here again to make
+    // sure the newly generated nodes of those subqueries are reflected.
+    if (!isSubquery && currentPhysicalPlan.find(_.subqueries.nonEmpty).isDefined) {
+      getExecutionId.foreach(onUpdatePlan(_, Seq.empty))
+    }
+    logOnLevel(s"Final plan: $currentPhysicalPlan")
+  }
+
   override def executeCollect(): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeCollect()
+    val result = getFinalPhysicalPlan().executeCollect()
+    finalPlanUpdate
+    result
   }
 
   override def executeTake(n: Int): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeTake(n)
+    val result = getFinalPhysicalPlan().executeTake(n)
+    finalPlanUpdate
+    result
   }
 
   override def executeTail(n: Int): Array[InternalRow] = {
-    getFinalPhysicalPlan().executeTail(n)
+    val result = getFinalPhysicalPlan().executeTail(n)
+    finalPlanUpdate
+    result
   }
 
   override def doExecute(): RDD[InternalRow] = {
-    getFinalPhysicalPlan().execute()
+    val rdd = getFinalPhysicalPlan().execute()
+    finalPlanUpdate
+    rdd
   }
 
   protected override def stringArgs: Iterator[Any] = Iterator(s"isFinalPlan=$isFinalPlan")

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index b85e89e7ba..7a23e048da 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -71,12 +71,15 @@ class AdaptiveQueryExecSuite
     }
     val planAfter = dfAdaptive.queryExecution.executedPlan
     assert(planAfter.toString.startsWith("AdaptiveSparkPlan isFinalPlan=true"))
+    val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
     spark.sparkContext.listenerBus.waitUntilEmpty()
-    assert(finalPlanCnt == 1)
+    // AQE will post `SparkListenerSQLAdaptiveExecutionUpdate` twice in case of subqueries that
+    // exist outside of query stages.
+    val expectedFinalPlanCnt = adaptivePlan.find(_.subqueries.nonEmpty).map(_ => 2).getOrElse(1)
+    assert(finalPlanCnt == expectedFinalPlanCnt)
     spark.sparkContext.removeSparkListener(listener)
 
-    val adaptivePlan = planAfter.asInstanceOf[AdaptiveSparkPlanExec].executedPlan
     val exchanges = adaptivePlan.collect {
       case e: Exchange => e
     }
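Editor's note on the pattern above (not part of the patch): `finalPlanUpdate` is
declared as a `lazy val` of type `Unit` because Scala runs a lazy val's
initializer at most once, which makes the extra UI update idempotent across
repeated `execute`/`executeCollect` calls. A minimal standalone sketch of that
idiom:

```scala
// Run-once side effect via a Unit-typed lazy val, mirroring `finalPlanUpdate`.
// The initializer executes the first time the val is referenced and never again.
object RunOnceDemo {
  private var updates = 0

  private lazy val finalPlanUpdate: Unit = {
    updates += 1
    println(s"UI update posted (total: $updates)")
  }

  def main(args: Array[String]): Unit = {
    finalPlanUpdate // first reference: initializer runs, prints once
    finalPlanUpdate // later references: no-op
    assert(updates == 1)
  }
}
```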