[SPARK-31440][SQL] Improve SQL Rest API

### What changes were proposed in this pull request?
SQL Rest API exposes query execution metrics as Public API. This PR aims to apply following improvements on SQL Rest API by aligning Spark-UI.

**Proposed Improvements:**
1- Support Physical Operations and group metrics per physical operation by aligning Spark UI.
2- Support `wholeStageCodegenId` for Physical Operations
3- `nodeId` can be useful for grouping metrics and sorting physical operations (according to execution order) to differentiate same operators (if used multiple times during the same query execution) and their metrics.
4- Filter `empty` metrics by aligning with Spark UI - SQL Tab. Currently, Spark UI does not show empty metrics.
5- Remove line breakers(`\n`) from `metricValue`.
6- `planDescription` can be `optional` Http parameter to avoid network cost where there is specially complex jobs creating big-plans.
7- `metrics` attribute needs to be exposed at the bottom order as `nodes`. Specially, this can be useful for the user where `nodes` array size is high.
8- `edges` attribute is being exposed to show relationship between `nodes`.
9- Reverse order on `metricDetails` aims to match with Spark UI by supporting Physical Operators' execution order.

### Why are the changes needed?
Proposed improvements provides more useful (e.g: physical operations and metrics correlation, grouping) and clear (e.g: filtering blank metrics, removing line breakers) result for the end-user.

### Does this PR introduce any user-facing change?
Yes. Please find both current and improved versions of the results as attached for following SQL Rest Endpoint:
```
curl -X GET http://localhost:4040/api/v1/applications/$appId/sql/$executionId?details=true
```
**Current version:**
https://issues.apache.org/jira/secure/attachment/12999821/current_version.json

**Improved version:**
https://issues.apache.org/jira/secure/attachment/13000621/improved_version.json

### Backward Compatibility
SQL Rest API will be started to expose with `Spark 3.0` and `3.0.0-preview2` (released on 12/23/19) does not cover this API so if PR can catch 3.0 release, this will not have any backward compatibility issue.

### How was this patch tested?
1. New Unit tests are added.
2. Also, patch has been tested manually through both **Spark Core** and **History Server** Rest APIs.

Closes #28208 from erenavsarogullari/SPARK-31440.

Authored-by: Eren Avsarogullari <eren.avsarogullari@gmail.com>
Signed-off-by: Gengliang Wang <gengliang.wang@databricks.com>
This commit is contained in:
Eren Avsarogullari 2020-05-18 23:21:32 -07:00 committed by Gengliang Wang
parent c560428fe0
commit ab4cf49a1c
4 changed files with 293 additions and 23 deletions

View file

@ -153,7 +153,7 @@ object SparkPlanGraph {
* @param name the name of this SparkPlan node
* @param metrics metrics that this SparkPlan node will track
*/
private[ui] class SparkPlanGraphNode(
class SparkPlanGraphNode(
val id: Long,
val name: String,
val desc: String,
@ -193,7 +193,7 @@ private[ui] class SparkPlanGraphNode(
/**
* Represent a tree of SparkPlan for WholeStageCodegen.
*/
private[ui] class SparkPlanGraphCluster(
class SparkPlanGraphCluster(
id: Long,
name: String,
desc: String,
@ -229,7 +229,7 @@ private[ui] class SparkPlanGraphCluster(
* Represent an edge in the SparkPlan tree. `fromId` is the child node id, and `toId` is the parent
* node id.
*/
private[ui] case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
case class SparkPlanGraphEdge(fromId: Long, toId: Long) {
def makeDotEdge: String = s""" $fromId->$toId;\n"""
}

View file

@ -21,21 +21,29 @@ import java.util.Date
import javax.ws.rs._
import javax.ws.rs.core.MediaType
import scala.util.{Failure, Success, Try}
import org.apache.spark.JobExecutionStatus
import org.apache.spark.sql.execution.ui.{SQLAppStatusStore, SQLExecutionUIData, SQLPlanMetric}
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphNode, SQLAppStatusStore, SQLExecutionUIData}
import org.apache.spark.status.api.v1.{BaseAppResource, NotFoundException}
@Produces(Array(MediaType.APPLICATION_JSON))
private[v1] class SqlResource extends BaseAppResource {
val WHOLE_STAGE_CODEGEN = "WholeStageCodegen"
@GET
def sqlList(
@DefaultValue("false") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("planDescription") planDescription: Boolean,
@DefaultValue("0") @QueryParam("offset") offset: Int,
@DefaultValue("20") @QueryParam("length") length: Int): Seq[ExecutionData] = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
sqlStore.executionsList(offset, length).map(prepareExecutionData(_, details))
sqlStore.executionsList(offset, length).map { exec =>
val graph = sqlStore.planGraph(exec.executionId)
prepareExecutionData(exec, graph, details, planDescription)
}
}
}
@ -43,24 +51,25 @@ private[v1] class SqlResource extends BaseAppResource {
@Path("{executionId:\\d+}")
def sql(
@PathParam("executionId") execId: Long,
@DefaultValue("false") @QueryParam("details") details: Boolean): ExecutionData = {
@DefaultValue("true") @QueryParam("details") details: Boolean,
@DefaultValue("true") @QueryParam("planDescription")
planDescription: Boolean): ExecutionData = {
withUI { ui =>
val sqlStore = new SQLAppStatusStore(ui.store.store)
val graph = sqlStore.planGraph(execId)
sqlStore
.execution(execId)
.map(prepareExecutionData(_, details))
.getOrElse(throw new NotFoundException("unknown id: " + execId))
.map(prepareExecutionData(_, graph, details, planDescription))
.getOrElse(throw new NotFoundException("unknown query execution id: " + execId))
}
}
private def printableMetrics(
metrics: Seq[SQLPlanMetric],
metricValues: Map[Long, String]): Seq[Metrics] = {
metrics.map(metric =>
Metrics(metric.name, metricValues.get(metric.accumulatorId).getOrElse("")))
}
private def prepareExecutionData(
exec: SQLExecutionUIData,
graph: SparkPlanGraph,
details: Boolean,
planDescription: Boolean): ExecutionData = {
private def prepareExecutionData(exec: SQLExecutionUIData, details: Boolean): ExecutionData = {
var running = Seq[Int]()
var completed = Seq[Int]()
var failed = Seq[Int]()
@ -84,18 +93,65 @@ private[v1] class SqlResource extends BaseAppResource {
}
val duration = exec.completionTime.getOrElse(new Date()).getTime - exec.submissionTime
val planDetails = if (details) exec.physicalPlanDescription else ""
val metrics = if (details) printableMetrics(exec.metrics, exec.metricValues) else Seq.empty
val planDetails = if (planDescription) exec.physicalPlanDescription else ""
val nodes = if (details) printableMetrics(graph.allNodes, exec.metricValues) else Seq.empty
val edges = if (details) graph.edges else Seq.empty
new ExecutionData(
exec.executionId,
status,
exec.description,
planDetails,
metrics,
new Date(exec.submissionTime),
duration,
running,
completed,
failed)
failed,
nodes,
edges)
}
private def printableMetrics(allNodes: Seq[SparkPlanGraphNode],
metricValues: Map[Long, String]): Seq[Node] = {
def getMetric(metricValues: Map[Long, String], accumulatorId: Long,
metricName: String): Option[Metric] = {
metricValues.get(accumulatorId).map( mv => {
val metricValue = if (mv.startsWith("\n")) mv.substring(1, mv.length) else mv
Metric(metricName, metricValue)
})
}
val nodeIdAndWSCGIdMap = getNodeIdAndWSCGIdMap(allNodes)
val nodes = allNodes.map { node =>
val wholeStageCodegenId = nodeIdAndWSCGIdMap.get(node.id).flatten
val metrics =
node.metrics.flatMap(m => getMetric(metricValues, m.accumulatorId, m.name.trim))
Node(nodeId = node.id, nodeName = node.name.trim, wholeStageCodegenId, metrics)
}
nodes.sortBy(_.nodeId).reverse
}
private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
val wscgNodes = allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap {
_ match {
case x: SparkPlanGraphCluster => x.nodes.map(_.id -> getWholeStageCodegenId(x.name.trim))
case _ => Seq.empty
}
}.toMap
nodeIdAndWSCGIdMap
}
private def getWholeStageCodegenId(wscgNodeName: String): Option[Long] = {
Try(wscgNodeName.substring(
s"$WHOLE_STAGE_CODEGEN (".length, wscgNodeName.length - 1).toLong) match {
case Success(wscgId) => Some(wscgId)
case Failure(t) => None
}
}
}

View file

@ -18,16 +18,25 @@ package org.apache.spark.status.api.v1.sql
import java.util.Date
import org.apache.spark.sql.execution.ui.SparkPlanGraphEdge
class ExecutionData private[spark] (
val id: Long,
val status: String,
val description: String,
val planDescription: String,
val metrics: Seq[Metrics],
val submissionTime: Date,
val duration: Long,
val runningJobIds: Seq[Int],
val successJobIds: Seq[Int],
val failedJobIds: Seq[Int])
val failedJobIds: Seq[Int],
val nodes: Seq[Node],
val edges: Seq[SparkPlanGraphEdge])
case class Metrics private[spark] (metricName: String, metricValue: String)
case class Node private[spark](
nodeId: Long,
nodeName: String,
wholeStageCodegenId: Option[Long] = None,
metrics: Seq[Metric])
case class Metric private[spark] (name: String, value: String)

View file

@ -0,0 +1,205 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status.api.v1.sql
import java.util.Date
import scala.collection.mutable.ArrayBuffer
import org.scalatest.PrivateMethodTester
import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
import org.apache.spark.sql.execution.ui.{SparkPlanGraph, SparkPlanGraphCluster, SparkPlanGraphEdge, SparkPlanGraphNode, SQLExecutionUIData, SQLPlanMetric}
object SqlResourceSuite {
val SCAN_TEXT = "Scan text"
val FILTER = "Filter"
val WHOLE_STAGE_CODEGEN_1 = "WholeStageCodegen (1)"
val DURATION = "duration"
val NUMBER_OF_OUTPUT_ROWS = "number of output rows"
val METADATA_TIME = "metadata time"
val NUMBER_OF_FILES_READ = "number of files read"
val SIZE_OF_FILES_READ = "size of files read"
val PLAN_DESCRIPTION = "== Physical Plan ==\nCollectLimit (3)\n+- * Filter (2)\n +- Scan text..."
val DESCRIPTION = "csv at MyDataFrames.scala:57"
val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = Map(1L -> Some(1L))
val filterNode = new SparkPlanGraphNode(1, FILTER, "",
metrics = Seq(SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, "")))
val nodes: Seq[SparkPlanGraphNode] = Seq(
new SparkPlanGraphCluster(0, WHOLE_STAGE_CODEGEN_1, "",
nodes = ArrayBuffer(filterNode),
metrics = Seq(SQLPlanMetric(DURATION, 0, ""))),
new SparkPlanGraphNode(2, SCAN_TEXT, "",
metrics = Seq(
SQLPlanMetric(METADATA_TIME, 2, ""),
SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))))
val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)
val edges: Seq[SparkPlanGraphEdge] =
Seq(SparkPlanGraphEdge(3, 2))
val metrics: Seq[SQLPlanMetric] = {
Seq(SQLPlanMetric(DURATION, 0, ""),
SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 1, ""),
SQLPlanMetric(METADATA_TIME, 2, ""),
SQLPlanMetric(NUMBER_OF_FILES_READ, 3, ""),
SQLPlanMetric(NUMBER_OF_OUTPUT_ROWS, 4, ""),
SQLPlanMetric(SIZE_OF_FILES_READ, 5, ""))
}
val sqlExecutionUIData: SQLExecutionUIData = {
def getMetricValues() = {
Map[Long, String](
0L -> "0 ms",
1L -> "1",
2L -> "2 ms",
3L -> "1",
4L -> "1",
5L -> "330.0 B"
)
}
new SQLExecutionUIData(
executionId = 0,
description = DESCRIPTION,
details = "",
physicalPlanDescription = PLAN_DESCRIPTION,
metrics = metrics,
submissionTime = 1586768888233L,
completionTime = Some(new Date(1586768888999L)),
jobs = Map[Int, JobExecutionStatus](
0 -> JobExecutionStatus.SUCCEEDED,
1 -> JobExecutionStatus.SUCCEEDED),
stages = Set[Int](),
metricValues = getMetricValues()
)
}
private def getNodes(): Seq[Node] = {
val node = Node(0, WHOLE_STAGE_CODEGEN_1,
wholeStageCodegenId = None, metrics = Seq(Metric(DURATION, "0 ms")))
val node2 = Node(1, FILTER,
wholeStageCodegenId = Some(1), metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
val node3 = Node(2, SCAN_TEXT, wholeStageCodegenId = None,
metrics = Seq(Metric(METADATA_TIME, "2 ms"),
Metric(NUMBER_OF_FILES_READ, "1"),
Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
Metric(SIZE_OF_FILES_READ, "330.0 B")))
// reverse order because of supporting execution order by aligning with Spark-UI
Seq(node3, node2, node)
}
private def getExpectedNodesWhenWholeStageCodegenIsOff(): Seq[Node] = {
val node = Node(1, FILTER, metrics = Seq(Metric(NUMBER_OF_OUTPUT_ROWS, "1")))
val node2 = Node(2, SCAN_TEXT,
metrics = Seq(Metric(METADATA_TIME, "2 ms"),
Metric(NUMBER_OF_FILES_READ, "1"),
Metric(NUMBER_OF_OUTPUT_ROWS, "1"),
Metric(SIZE_OF_FILES_READ, "330.0 B")))
// reverse order because of supporting execution order by aligning with Spark-UI
Seq(node2, node)
}
private def verifyExpectedExecutionData(executionData: ExecutionData,
nodes: Seq[Node],
edges: Seq[SparkPlanGraphEdge],
planDescription: String): Unit = {
assert(executionData.id == 0)
assert(executionData.status == "COMPLETED")
assert(executionData.description == DESCRIPTION)
assert(executionData.planDescription == planDescription)
assert(executionData.submissionTime == new Date(1586768888233L))
assert(executionData.duration == 766L)
assert(executionData.successJobIds == Seq[Int](0, 1))
assert(executionData.runningJobIds == Seq[Int]())
assert(executionData.failedJobIds == Seq.empty)
assert(executionData.nodes == nodes)
assert(executionData.edges == edges)
}
}
/**
* Sql Resource Public API Unit Tests.
*/
class SqlResourceSuite extends SparkFunSuite with PrivateMethodTester {
import SqlResourceSuite._
val sqlResource = new SqlResource()
val prepareExecutionData = PrivateMethod[ExecutionData]('prepareExecutionData)
test("Prepare ExecutionData when details = false and planDescription = false") {
val executionData =
sqlResource invokePrivate prepareExecutionData(
sqlExecutionUIData, SparkPlanGraph(Seq.empty, Seq.empty), false, false)
verifyExpectedExecutionData(executionData, edges = Seq.empty,
nodes = Seq.empty, planDescription = "")
}
test("Prepare ExecutionData when details = true and planDescription = false") {
val executionData =
sqlResource invokePrivate prepareExecutionData(
sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, false)
verifyExpectedExecutionData(
executionData,
nodes = getNodes(),
edges,
planDescription = "")
}
test("Prepare ExecutionData when details = true and planDescription = true") {
val executionData =
sqlResource invokePrivate prepareExecutionData(
sqlExecutionUIData, SparkPlanGraph(nodes, edges), true, true)
verifyExpectedExecutionData(
executionData,
nodes = getNodes(),
edges = edges,
planDescription = PLAN_DESCRIPTION)
}
test("Prepare ExecutionData when details = true and planDescription = false and WSCG = off") {
val executionData =
sqlResource invokePrivate prepareExecutionData(
sqlExecutionUIData, SparkPlanGraph(nodesWhenCodegenIsOff, edges), true, false)
verifyExpectedExecutionData(
executionData,
nodes = getExpectedNodesWhenWholeStageCodegenIsOff(),
edges = edges,
planDescription = "")
}
test("Parse wholeStageCodegenId from nodeName") {
val getWholeStageCodegenId = PrivateMethod[Option[Long]]('getWholeStageCodegenId)
val wholeStageCodegenId =
sqlResource invokePrivate getWholeStageCodegenId(WHOLE_STAGE_CODEGEN_1)
assert(wholeStageCodegenId == Some(1))
}
}