[SPARK-30041][SQL][WEBUI] Add Codegen Stage Id to Stage DAG visualization in Web UI

### What changes were proposed in this pull request?
SPARK-29894 provides information on the Codegen Stage Id in WEBUI for SQL Plan graphs. Similarly, this proposes to add Codegen Stage Id in the DAG visualization for Stage execution. DAGs for Stage execution are available in the WEBUI under the Jobs and Stages tabs.

### Why are the changes needed?
This is proposed as an aid for drill-down analysis of complex SQL statement execution, as it is not always easy to match parts of the SQL Plan graph with the corresponding Stage DAG execution graph. Adding Codegen Stage Id for WholeStageCodegen operations makes this task easier.

### Does this PR introduce any user-facing change?
Stage DAG visualization in the WEBUI will show codegen stage id for WholeStageCodegen operations, as in the example snippet from the WEBUI, Jobs tab  (the query used in the example is TPCDS 2.4 q14a):
![](https://issues.apache.org/jira/secure/attachment/12987461/Snippet_StagesDags_with_CodegenId%20_annotated.png)

### How was this patch tested?
Manually tested, see also example snippet.

Closes #26675 from LucaCanali/addCodegenStageIdtoStageGraph.

Authored-by: Luca Canali <luca.canali@cern.ch>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This commit is contained in:
Luca Canali 2020-01-18 01:00:45 +08:00 committed by Wenchen Fan
parent f5f05d549e
commit fd308ade52
7 changed files with 9 additions and 9 deletions

Binary file not shown.

Before

Width:  |  Height:  |  Size: 26 KiB

After

Width:  |  Height:  |  Size: 18 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 45 KiB

After

Width:  |  Height:  |  Size: 32 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

After

Width:  |  Height:  |  Size: 24 KiB

View file

@ -73,7 +73,8 @@ This page displays the details of a specific job identified by its job ID.
</p>
* DAG visualization: Visual representation of the directed acyclic graph of this job where vertices represent the RDDs or DataFrames and the edges represent an operation to be applied on RDD.
* An example of DAG visualization for `sc.parallelize(1 to 100).toDF.count()`
<p style="text-align: center;">
<img src="img/JobPageDetail2.png" title="DAG" alt="DAG" width="40%">
</p>
@ -124,6 +125,8 @@ The stage detail page begins with information like total time across all tasks,
</p>
There is also a visual representation of the directed acyclic graph (DAG) of this stage, where vertices represent the RDDs or DataFrames and the edges represent an operation to be applied.
Nodes are grouped by operation scope in the DAG visualization and labelled with the operation scope name (BatchScan, WholeStageCodegen, Exchange, etc).
Notably, Whole Stage Code Generation operations are also annotated with the code generation id. For stages belonging to Spark DataFrame or SQL execution, this allows to cross-reference Stage execution details to the relevant details in the Web-UI SQL Tab page where SQL plan graphs and execution plans are reported.
<p style="text-align: center;">
<img src="img/AllStagesPageDetail5.png" title="Stage DAG" alt="Stage DAG" width="50%">

View file

@ -62,19 +62,13 @@ private[execution] object SparkPlanInfo {
new SQLMetricInfo(metric.name.getOrElse(key), metric.id, metric.metricType)
}
val nodeName = plan match {
case physicalOperator: WholeStageCodegenExec =>
s"${plan.nodeName} (${physicalOperator.codegenStageId})"
case _ => plan.nodeName
}
// dump the file scan metadata (e.g file path) to event log
val metadata = plan match {
case fileScan: FileSourceScanExec => fileScan.metadata
case _ => Map[String, String]()
}
new SparkPlanInfo(
nodeName,
plan.nodeName,
plan.simpleString(SQLConf.get.maxToStringFields),
children.map(fromSparkPlan),
metadata,

View file

@ -54,6 +54,7 @@ trait CodegenSupport extends SparkPlan {
case _: RDDScanExec => "rdd"
case _: DataSourceScanExec => "scan"
case _: InMemoryTableScanExec => "memoryScan"
case _: WholeStageCodegenExec => "wholestagecodegen"
case _ => nodeName.toLowerCase(Locale.ROOT)
}
@ -613,6 +614,8 @@ case class WholeStageCodegenExec(child: SparkPlan)(val codegenStageId: Int)
"pipelineTime" -> SQLMetrics.createTimingMetric(sparkContext,
WholeStageCodegenExec.PIPELINE_DURATION_METRIC))
override def nodeName: String = s"WholeStageCodegen (${codegenStageId})"
def generatedClassName(): String = if (conf.wholeStageUseIdInClassName) {
s"GeneratedIteratorForCodegenStage$codegenStageId"
} else {

View file

@ -86,7 +86,7 @@ class DebuggingSuite extends SharedSparkSession {
"""== BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, false])), [id=#x] ==
|Tuples output: 0
| id LongType: {}
|== WholeStageCodegen ==
|== WholeStageCodegen (1) ==
|Tuples output: 10
| id LongType: {java.lang.Long}
|== Range (0, 10, step=1, splits=2) ==