[SPARK-29894][SQL][WEBUI] Add Codegen Stage Id to Spark plan graphs in Web UI SQL Tab

### What changes were proposed in this pull request?
The Web UI SQL Tab reports information on executed SQL both as plan graphs and as textual SQL execution plans. Both sources are useful. Physical execution plans already report Codegen Stage Ids; this PR adds Codegen Stage Ids to the plan graphs as well.

### Why are the changes needed?
It is useful to have Codegen Stage Id information reported in plan graphs as well; this makes it easier to match physical plans and graphs with metrics when troubleshooting SQL execution.
Example snippet to show the proposed change:

![](https://issues.apache.org/jira/secure/attachment/12985837/snippet__plan_graph_with_Codegen_Stage_Id_Annotated.png)

Example of the current state:
![](https://issues.apache.org/jira/secure/attachment/12985838/snippet_plan_graph_before_patch.png)

Physical plan:
![](https://issues.apache.org/jira/secure/attachment/12985932/Physical_plan_Annotated.png)
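For reference, a minimal spark-shell sketch (a hypothetical query, not part of this PR; exact plan shapes vary by Spark version and configuration) showing how codegen stage ids appear as `*(N)` prefixes in a physical plan:

```scala
// Any query with an aggregation spans multiple whole-stage-codegen subtrees;
// explain() prints the codegen stage id of each one as a '*(N)' prefix.
val df = spark.range(10).groupBy(($"id" % 2).as("parity")).count()
df.explain()
// == Physical Plan == (abbreviated, illustrative output)
// *(2) HashAggregate(keys=[parity#2L], functions=[count(1)])
// +- Exchange hashpartitioning(parity#2L, 200)
//    +- *(1) HashAggregate(keys=[parity#2L], functions=[partial_count(1)])
//       +- *(1) Range (0, 10, step=1, splits=8)
```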

### Does this PR introduce any user-facing change?
This PR adds Codegen Stage Id information to SQL plan graphs in the Web UI/SQL Tab.

### How was this patch tested?
Added a test + manually tested

Closes #26519 from LucaCanali/addCodegenStageIdtoWEBUIGraphs.

Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit: b5df40bd87 (parent: 56a65b971d)
Committed: 2019-11-20 23:20:33 +08:00 by Wenchen Fan
6 changed files with 27 additions and 9 deletions

docs/img/webui-sql-dag.png (binary image updated; before: 335 KiB, after: 72 KiB)

docs/web-ui.md

@@ -336,7 +336,7 @@ scala> spark.sql("select name,sum(count) from global_temp.df group by name").sho
 </p>
 
 Now the above three dataframe/SQL operators are shown in the list. If we click the
-'show at \<console\>: 24' link of the last query, we will see the DAG of the job.
+'show at \<console\>: 24' link of the last query, we will see the DAG and details of the query execution.
 
 <p style="text-align: center;">
   <img src="img/webui-sql-dag.png"
@@ -346,10 +346,12 @@ Now the above three dataframe/SQL operators are shown in the list. If we click t
   <!-- Images are downsized intentionally to improve quality on retina displays -->
 </p>
 
-We can see that details information of each stage. The first block 'WholeStageCodegen'
-compile multiple operator ('LocalTableScan' and 'HashAggregate') together into a single Java
-function to improve performance, and metrics like number of rows and spill size are listed in
-the block. The second block 'Exchange' shows the metrics on the shuffle exchange, including
+The query details page displays information about the query execution time, its duration,
+the list of associated jobs, and the query execution DAG.
+The first block 'WholeStageCodegen (1)' compiles multiple operators ('LocalTableScan' and 'HashAggregate') together into a single Java
+function to improve performance, and metrics like number of rows and spill size are listed in the block.
+The annotation '(1)' in the block name is the code generation id.
+The second block 'Exchange' shows the metrics on the shuffle exchange, including
 number of written shuffle records, total data size, etc.
@@ -362,6 +364,8 @@ number of written shuffle records, total data size, etc.
 </p>
 
 Clicking the 'Details' link on the bottom displays the logical plans and the physical plan, which
 illustrate how Spark parses, analyzes, optimizes and performs the query.
+Steps in the physical plan subject to whole stage code generation optimization, are prefixed by a star followed by
+the code generation id, for example: '*(1) LocalTableScan'
 
 ### SQL metrics
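For orientation, a hedged sketch of the query this documentation page walks through (assumes a spark-shell session; the view setup is paraphrased from the surrounding docs and is not part of this diff):

```scala
// Create the example dataframe, register it, and run the grouped query;
// the SQL tab then shows the plan graph with 'WholeStageCodegen (N)' blocks.
import spark.implicits._
val df = Seq(("apple", 1), ("apple", 2), ("orange", 3)).toDF("name", "count")
df.createGlobalTempView("df")
spark.sql("select name,sum(count) from global_temp.df group by name").show()
```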

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlanInfo.scala

@@ -62,15 +62,22 @@ private[execution] object SparkPlanInfo {
       new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
     }
 
+    val nodeName = plan match {
+      case physicalOperator: WholeStageCodegenExec =>
+        s"${plan.nodeName} (${physicalOperator.codegenStageId})"
+      case _ => plan.nodeName
+    }
+
     // dump the file scan metadata (e.g file path) to event log
     val metadata = plan match {
       case fileScan: FileSourceScanExec => fileScan.metadata
       case _ => Map[String, String]()
     }
 
     new SparkPlanInfo(
-      plan.nodeName,
+      nodeName,
       plan.simpleString(SQLConf.get.maxToStringFields),
       children.map(fromSparkPlan),
-      metadata, metrics)
+      metadata,
+      metrics)
   }
}
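In effect, fromSparkPlan now annotates WholeStageCodegenExec nodes with their codegen stage id. A hedged sketch of the observable change (callable only from within the org.apache.spark.sql.execution package, since the object is private[execution]; `df` stands for any DataFrame whose root plan is whole-stage generated):

```scala
// Before this patch the root node name was the bare "WholeStageCodegen";
// after it, the codegen stage id is appended to the name.
val info = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
println(info.nodeName)  // e.g. "WholeStageCodegen (2)"
```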

sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala

@@ -78,7 +78,7 @@ object SparkPlanGraph {
       subgraph: SparkPlanGraphCluster,
       exchanges: mutable.HashMap[SparkPlanInfo, SparkPlanGraphNode]): Unit = {
     planInfo.nodeName match {
-      case "WholeStageCodegen" =>
+      case name if name.startsWith("WholeStageCodegen") =>
         val metrics = planInfo.metrics.map { metric =>
           SQLPlanMetric(metric.name, metric.accumulatorId, metric.metricType)
         }
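The switch from an exact match to a prefix match is needed because node names now carry the stage-id suffix, so the bare string "WholeStageCodegen" would never match again. A self-contained illustration:

```scala
// An exact comparison misses the annotated names; startsWith catches them all.
val names = Seq("WholeStageCodegen (1)", "WholeStageCodegen (2)", "Exchange")
names.count(_ == "WholeStageCodegen")           // 0
names.count(_.startsWith("WholeStageCodegen"))  // 2
```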

sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsSuite.scala

@@ -83,7 +83,7 @@ class SQLMetricsSuite extends SharedSparkSession with SQLMetricsTestUtils {
     // TODO: update metrics in generated operators
     val ds = spark.range(10).filter('id < 5)
     testSparkPlanMetricsWithPredicates(ds.toDF(), 1, Map(
-      0L -> (("WholeStageCodegen", Map(
+      0L -> (("WholeStageCodegen (1)", Map(
        "duration total (min, med, max)" -> {_.toString.matches(timingMetricPattern)})))
     ), true)
   }

sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala

@@ -38,6 +38,7 @@ import org.apache.spark.sql.catalyst.plans.logical.LocalRelation
 import org.apache.spark.sql.catalyst.util.quietly
 import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
 import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
+import org.apache.spark.sql.functions.count
 import org.apache.spark.sql.internal.StaticSQLConf.UI_RETAINED_EXECUTIONS
 import org.apache.spark.sql.test.SharedSparkSession
 import org.apache.spark.status.ElementTrackingStore
@@ -616,6 +617,12 @@ class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTestUtils
     assert(statusStore.executionsCount === 2)
     assert(statusStore.execution(2) === None)
   }
+
+  test("SPARK-29894 test Codegen Stage Id in SparkPlanInfo") {
+    val df = createTestDataFrame.select(count("*"))
+    val sparkPlanInfo = SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan)
+    assert(sparkPlanInfo.nodeName === "WholeStageCodegen (2)")
+  }
 }
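For context on the expected "(2)": a global count() typically plans as a partial aggregate (codegen stage 1) below the exchange and a final aggregate (codegen stage 2) above it, so the root node carries id 2. A hedged spark-shell approximation (exact plan shape may differ across versions and settings):

```scala
import org.apache.spark.sql.functions.count

// Global count(): partial aggregate in codegen stage 1, final in stage 2.
val df = spark.range(5).select(count("*"))
df.explain()
// == Physical Plan == (abbreviated, illustrative output)
// *(2) HashAggregate(keys=[], functions=[count(1)])
// +- Exchange SinglePartition
//    +- *(1) HashAggregate(keys=[], functions=[partial_count(1)])
//       +- *(1) Range (0, 5, step=1, splits=8)
```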