[SPARK-35552][SQL] Make query stage materialized more readable

### What changes were proposed in this pull request?

Add a new method `isMaterialized` in `QueryStageExec`.

### Why are the changes needed?

Currently, we use `resultOption().get.isDefined` to check whether a query stage has been materialized. That code is not readable at a glance. It is clearer to introduce a new method, `isMaterialized`, to express this check.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass CI.

Closes #32689 from ulysses-you/SPARK-35552.

Authored-by: ulysses-you <ulyssesyou18@gmail.com>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
This commit is contained in:
ulysses-you 2021-05-28 20:42:11 +08:00 committed by Gengliang Wang
parent 7eb74482a7
commit 3b94aad5e7
4 changed files with 11 additions and 9 deletions

View file

@@ -37,14 +37,13 @@ object AQEPropagateEmptyRelation extends PropagateEmptyRelationBase {
super.nonEmpty(plan) || getRowCount(plan).exists(_ > 0)
private def getRowCount(plan: LogicalPlan): Option[BigInt] = plan match {
- case LogicalQueryStage(_, stage: QueryStageExec) if stage.resultOption.get().isDefined =>
+ case LogicalQueryStage(_, stage: QueryStageExec) if stage.isMaterialized =>
stage.getRuntimeStatistics.rowCount
case _ => None
}
private def isRelationWithAllNullKeys(plan: LogicalPlan): Boolean = plan match {
- case LogicalQueryStage(_, stage: BroadcastQueryStageExec)
- if stage.resultOption.get().isDefined =>
+ case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.isMaterialized =>
stage.broadcast.relationFuture.get().value == HashedRelationWithAllNullKeys
case _ => false
}

View file

@@ -420,7 +420,7 @@ case class AdaptiveSparkPlanExec(
context.stageCache.get(e.canonicalized) match {
case Some(existingStage) if conf.exchangeReuseEnabled =>
val stage = reuseQueryStage(existingStage, e)
- val isMaterialized = stage.resultOption.get().isDefined
+ val isMaterialized = stage.isMaterialized
CreateStageResult(
newPlan = stage,
allChildStagesMaterialized = isMaterialized,
@@ -442,7 +442,7 @@ case class AdaptiveSparkPlanExec(
newStage = reuseQueryStage(queryStage, e)
}
}
- val isMaterialized = newStage.resultOption.get().isDefined
+ val isMaterialized = newStage.isMaterialized
CreateStageResult(
newPlan = newStage,
allChildStagesMaterialized = isMaterialized,
@@ -455,7 +455,7 @@ case class AdaptiveSparkPlanExec(
case q: QueryStageExec =>
CreateStageResult(newPlan = q,
- allChildStagesMaterialized = q.resultOption.get().isDefined, newStages = Seq.empty)
+ allChildStagesMaterialized = q.isMaterialized, newStages = Seq.empty)
case _ =>
if (plan.children.isEmpty) {

View file

@@ -53,7 +53,7 @@ object DynamicJoinSelection extends Rule[LogicalPlan] {
}
private def selectJoinStrategy(plan: LogicalPlan): Option[JoinStrategyHint] = plan match {
- case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.resultOption.get().isDefined
+ case LogicalQueryStage(_, stage: ShuffleQueryStageExec) if stage.isMaterialized
&& stage.mapStats.isDefined =>
val demoteBroadcastHash = shouldDemoteBroadcastHashJoin(stage.mapStats.get)
val preferShuffleHash = preferShuffledHashJoin(stage.mapStats.get)

View file

@@ -95,11 +95,13 @@ abstract class QueryStageExec extends LeafExecNode {
/**
* Compute the statistics of the query stage if executed, otherwise None.
*/
- def computeStats(): Option[Statistics] = resultOption.get().map { _ =>
+ def computeStats(): Option[Statistics] = if (isMaterialized) {
val runtimeStats = getRuntimeStatistics
val dataSize = runtimeStats.sizeInBytes.max(0)
val numOutputRows = runtimeStats.rowCount.map(_.max(0))
- Statistics(dataSize, numOutputRows, isRuntime = true)
+ Some(Statistics(dataSize, numOutputRows, isRuntime = true))
+ } else {
+ None
+ }
@transient
@@ -107,6 +109,7 @@ abstract class QueryStageExec extends LeafExecNode {
protected var _resultOption = new AtomicReference[Option[Any]](None)
private[adaptive] def resultOption: AtomicReference[Option[Any]] = _resultOption
+ def isMaterialized: Boolean = resultOption.get().isDefined
override def output: Seq[Attribute] = plan.output
override def outputPartitioning: Partitioning = plan.outputPartitioning