[SPARK-35695][SQL][FOLLOWUP] Use AQE helper to simplify the code in CollectMetricsExec

### What changes were proposed in this pull request?

This is a followup of https://github.com/apache/spark/pull/32862 , to simplify the code with AQE helper.

### Why are the changes needed?

code cleanup

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

Closes #33026 from cloud-fan/follow.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
This commit is contained in:
Wenchen Fan 2021-06-23 09:54:12 +09:00 committed by Hyukjin Kwon
parent 68b54b702c
commit a87ee5d8b9

View file

@ -22,7 +22,7 @@ import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow}
import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder} import org.apache.spark.sql.catalyst.expressions.{Attribute, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanExec, QueryStageExec} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
import org.apache.spark.sql.types.StructType import org.apache.spark.sql.types.StructType
@ -89,19 +89,16 @@ case class CollectMetricsExec(
copy(child = newChild) copy(child = newChild)
} }
object CollectMetricsExec { object CollectMetricsExec extends AdaptiveSparkPlanHelper {
/** /**
* Recursively collect all collected metrics from a query tree. * Recursively collect all collected metrics from a query tree.
*/ */
def collect(plan: SparkPlan): Map[String, Row] = { def collect(plan: SparkPlan): Map[String, Row] = {
val metrics = plan.collectWithSubqueries { val metrics = collectWithSubqueries(plan) {
case collector: CollectMetricsExec => Map(collector.name -> collector.collectedMetrics) case collector: CollectMetricsExec =>
Map(collector.name -> collector.collectedMetrics)
case tableScan: InMemoryTableScanExec => case tableScan: InMemoryTableScanExec =>
CollectMetricsExec.collect(tableScan.relation.cachedPlan) CollectMetricsExec.collect(tableScan.relation.cachedPlan)
case adaptivePlan: AdaptiveSparkPlanExec =>
CollectMetricsExec.collect(adaptivePlan.executedPlan)
case queryStageExec: QueryStageExec =>
CollectMetricsExec.collect(queryStageExec.plan)
} }
metrics.reduceOption(_ ++ _).getOrElse(Map.empty) metrics.reduceOption(_ ++ _).getOrElse(Map.empty)
} }