[SPARK-12902] [SQL] visualization for generated operators
This PR brings back visualization for generated operators, they looks like: ![sql](https://cloud.githubusercontent.com/assets/40902/12460920/0dc7956a-bf6b-11e5-9c3f-8389f452526e.png) ![stage](https://cloud.githubusercontent.com/assets/40902/12460923/11806ac4-bf6b-11e5-9c72-e84a62c5ea93.png) Note: SQL metrics are not supported right now, because they are very slow, will be supported once we have batch mode. Author: Davies Liu <davies@databricks.com> Closes #10828 from davies/viz_codegen.
This commit is contained in:
parent
c037d25482
commit
7d877c3439
|
@ -284,7 +284,7 @@ function renderDot(dot, container, forJob) {
|
|||
renderer(container, g);
|
||||
|
||||
// Find the stage cluster and mark it for styling and post-processing
|
||||
container.selectAll("g.cluster[name*=\"Stage\"]").classed("stage", true);
|
||||
container.selectAll("g.cluster[name^=\"Stage \"]").classed("stage", true);
|
||||
}
|
||||
|
||||
/* -------------------- *
|
||||
|
|
|
@ -130,7 +130,11 @@ private[ui] object RDDOperationGraph extends Logging {
|
|||
}
|
||||
}
|
||||
// Attach the outermost cluster to the root cluster, and the RDD to the innermost cluster
|
||||
rddClusters.headOption.foreach { cluster => rootCluster.attachChildCluster(cluster) }
|
||||
rddClusters.headOption.foreach { cluster =>
|
||||
if (!rootCluster.childClusters.contains(cluster)) {
|
||||
rootCluster.attachChildCluster(cluster)
|
||||
}
|
||||
}
|
||||
rddClusters.lastOption.foreach { cluster => cluster.attachChildNode(node) }
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,6 +20,12 @@
|
|||
text-shadow: none;
|
||||
}
|
||||
|
||||
#plan-viz-graph svg g.cluster rect {
|
||||
fill: #A0DFFF;
|
||||
stroke: #3EC0FF;
|
||||
stroke-width: 1px;
|
||||
}
|
||||
|
||||
#plan-viz-graph svg g.node rect {
|
||||
fill: #C3EBFF;
|
||||
stroke: #3EC0FF;
|
||||
|
|
|
@ -36,12 +36,17 @@ class SparkPlanInfo(
|
|||
private[sql] object SparkPlanInfo {
|
||||
|
||||
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
|
||||
val children = plan match {
|
||||
case WholeStageCodegen(child, _) => child :: Nil
|
||||
case InputAdapter(child) => child :: Nil
|
||||
case plan => plan.children
|
||||
}
|
||||
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
|
||||
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
|
||||
Utils.getFormattedClassName(metric.param))
|
||||
}
|
||||
val children = plan.children.map(fromSparkPlan)
|
||||
|
||||
new SparkPlanInfo(plan.nodeName, plan.simpleString, children, plan.metadata, metrics)
|
||||
new SparkPlanInfo(plan.nodeName, plan.simpleString, children.map(fromSparkPlan),
|
||||
plan.metadata, metrics)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
|
|||
}
|
||||
|
||||
private def planVisualization(metrics: Map[Long, String], graph: SparkPlanGraph): Seq[Node] = {
|
||||
val metadata = graph.nodes.flatMap { node =>
|
||||
val metadata = graph.allNodes.flatMap { node =>
|
||||
val nodeId = s"plan-meta-data-${node.id}"
|
||||
<div id={nodeId}>{node.desc}</div>
|
||||
}
|
||||
|
@ -110,7 +110,7 @@ private[sql] class ExecutionPage(parent: SQLTab) extends WebUIPage("execution")
|
|||
<div class="dot-file">
|
||||
{graph.makeDotFile(metrics)}
|
||||
</div>
|
||||
<div id="plan-viz-metadata-size">{graph.nodes.size.toString}</div>
|
||||
<div id="plan-viz-metadata-size">{graph.allNodes.size.toString}</div>
|
||||
{metadata}
|
||||
</div>
|
||||
{planVisualizationResources}
|
||||
|
|
|
@ -219,7 +219,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
|
|||
case SparkListenerSQLExecutionStart(executionId, description, details,
|
||||
physicalPlanDescription, sparkPlanInfo, time) =>
|
||||
val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
|
||||
val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
|
||||
val sqlPlanMetrics = physicalPlanGraph.allNodes.flatMap { node =>
|
||||
node.metrics.map(metric => metric.accumulatorId -> metric)
|
||||
}
|
||||
val executionUIData = new SQLExecutionUIData(
|
||||
|
|
|
@ -21,7 +21,7 @@ import java.util.concurrent.atomic.AtomicLong
|
|||
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.spark.sql.execution.SparkPlanInfo
|
||||
import org.apache.spark.sql.execution.{InputAdapter, SparkPlanInfo, WholeStageCodegen}
|
||||
import org.apache.spark.sql.execution.metric.SQLMetrics
|
||||
|
||||
/**
|
||||
|
@ -41,6 +41,16 @@ private[ui] case class SparkPlanGraph(
|
|||
dotFile.append("}")
|
||||
dotFile.toString()
|
||||
}
|
||||
|
||||
/**
|
||||
* All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
|
||||
*/
|
||||
val allNodes: Seq[SparkPlanGraphNode] = {
|
||||
nodes.flatMap {
|
||||
case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
|
||||
case node => Seq(node)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[sql] object SparkPlanGraph {
|
||||
|
@ -52,7 +62,7 @@ private[sql] object SparkPlanGraph {
|
|||
val nodeIdGenerator = new AtomicLong(0)
|
||||
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
|
||||
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
|
||||
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
|
||||
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges, null, null)
|
||||
new SparkPlanGraph(nodes, edges)
|
||||
}
|
||||
|
||||
|
@ -60,22 +70,40 @@ private[sql] object SparkPlanGraph {
|
|||
planInfo: SparkPlanInfo,
|
||||
nodeIdGenerator: AtomicLong,
|
||||
nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
|
||||
edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
|
||||
val metrics = planInfo.metrics.map { metric =>
|
||||
SQLPlanMetric(metric.name, metric.accumulatorId,
|
||||
SQLMetrics.getMetricParam(metric.metricParam))
|
||||
}
|
||||
val node = SparkPlanGraphNode(
|
||||
nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
|
||||
planInfo.simpleString, planInfo.metadata, metrics)
|
||||
edges: mutable.ArrayBuffer[SparkPlanGraphEdge],
|
||||
parent: SparkPlanGraphNode,
|
||||
subgraph: SparkPlanGraphCluster): Unit = {
|
||||
if (planInfo.nodeName == classOf[WholeStageCodegen].getSimpleName) {
|
||||
val cluster = new SparkPlanGraphCluster(
|
||||
nodeIdGenerator.getAndIncrement(),
|
||||
planInfo.nodeName,
|
||||
planInfo.simpleString,
|
||||
mutable.ArrayBuffer[SparkPlanGraphNode]())
|
||||
nodes += cluster
|
||||
buildSparkPlanGraphNode(
|
||||
planInfo.children.head, nodeIdGenerator, nodes, edges, parent, cluster)
|
||||
} else if (planInfo.nodeName == classOf[InputAdapter].getSimpleName) {
|
||||
buildSparkPlanGraphNode(planInfo.children.head, nodeIdGenerator, nodes, edges, parent, null)
|
||||
} else {
|
||||
val metrics = planInfo.metrics.map { metric =>
|
||||
SQLPlanMetric(metric.name, metric.accumulatorId,
|
||||
SQLMetrics.getMetricParam(metric.metricParam))
|
||||
}
|
||||
val node = new SparkPlanGraphNode(
|
||||
nodeIdGenerator.getAndIncrement(), planInfo.nodeName,
|
||||
planInfo.simpleString, planInfo.metadata, metrics)
|
||||
if (subgraph == null) {
|
||||
nodes += node
|
||||
} else {
|
||||
subgraph.nodes += node
|
||||
}
|
||||
|
||||
nodes += node
|
||||
val childrenNodes = planInfo.children.map(
|
||||
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
|
||||
for (child <- childrenNodes) {
|
||||
edges += SparkPlanGraphEdge(child.id, node.id)
|
||||
if (parent != null) {
|
||||
edges += SparkPlanGraphEdge(node.id, parent.id)
|
||||
}
|
||||
planInfo.children.foreach(
|
||||
buildSparkPlanGraphNode(_, nodeIdGenerator, nodes, edges, node, subgraph))
|
||||
}
|
||||
node
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -86,12 +114,12 @@ private[sql] object SparkPlanGraph {
|
|||
* @param name the name of this SparkPlan node
|
||||
* @param metrics metrics that this SparkPlan node will track
|
||||
*/
|
||||
private[ui] case class SparkPlanGraphNode(
|
||||
id: Long,
|
||||
name: String,
|
||||
desc: String,
|
||||
metadata: Map[String, String],
|
||||
metrics: Seq[SQLPlanMetric]) {
|
||||
private[ui] class SparkPlanGraphNode(
|
||||
val id: Long,
|
||||
val name: String,
|
||||
val desc: String,
|
||||
val metadata: Map[String, String],
|
||||
val metrics: Seq[SQLPlanMetric]) {
|
||||
|
||||
def makeDotNode(metricsValue: Map[Long, String]): String = {
|
||||
val builder = new mutable.StringBuilder(name)
|
||||
|
@ -117,6 +145,27 @@ private[ui] case class SparkPlanGraphNode(
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Represent a tree of SparkPlan for WholeStageCodegen.
|
||||
*/
|
||||
private[ui] class SparkPlanGraphCluster(
|
||||
id: Long,
|
||||
name: String,
|
||||
desc: String,
|
||||
val nodes: mutable.ArrayBuffer[SparkPlanGraphNode])
|
||||
extends SparkPlanGraphNode(id, name, desc, Map.empty, Nil) {
|
||||
|
||||
override def makeDotNode(metricsValue: Map[Long, String]): String = {
|
||||
s"""
|
||||
| subgraph cluster${id} {
|
||||
| label=${name};
|
||||
| ${nodes.map(_.makeDotNode(metricsValue)).mkString(" \n")}
|
||||
| }
|
||||
""".stripMargin
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Represent an edge in the SparkPlan tree. `fromId` is the parent node id, and `toId` is the child
|
||||
* node id.
|
||||
|
|
|
@ -86,7 +86,7 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
|
|||
// If we can track all jobs, check the metric values
|
||||
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
|
||||
val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
|
||||
df.queryExecution.executedPlan)).nodes.filter { node =>
|
||||
df.queryExecution.executedPlan)).allNodes.filter { node =>
|
||||
expectedMetrics.contains(node.id)
|
||||
}.map { node =>
|
||||
val nodeMetrics = node.metrics.map { metric =>
|
||||
|
@ -134,6 +134,14 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
|
|||
)
|
||||
}
|
||||
|
||||
test("WholeStageCodegen metrics") {
|
||||
// Assume the execution plan is
|
||||
// WholeStageCodegen(nodeId = 0, Range(nodeId = 2) -> Filter(nodeId = 1))
|
||||
// TODO: update metrics in generated operators
|
||||
val df = sqlContext.range(10).filter('id < 5)
|
||||
testSparkPlanMetrics(df, 1, Map.empty)
|
||||
}
|
||||
|
||||
test("TungstenAggregate metrics") {
|
||||
// Assume the execution plan is
|
||||
// ... -> TungstenAggregate(nodeId = 2) -> Exchange(nodeId = 1)
|
||||
|
|
|
@ -83,7 +83,7 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
|
|||
val df = createTestDataFrame
|
||||
val accumulatorIds =
|
||||
SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
|
||||
.nodes.flatMap(_.metrics.map(_.accumulatorId))
|
||||
.allNodes.flatMap(_.metrics.map(_.accumulatorId))
|
||||
// Assume all accumulators are long
|
||||
var accumulatorValue = 0L
|
||||
val accumulatorUpdates = accumulatorIds.map { id =>
|
||||
|
|
Loading…
Reference in a new issue