[SPARK-11206] Support SQL UI on the history server (resubmit)

Resubmit #9297 and #9991
On the live web UI there is a SQL tab that provides valuable information about SQL queries, but once the workload finishes that tab is not available on the history server. It would be helpful to support the SQL UI on the history server so queries can still be analyzed after they have finished.

To support the SQL UI on the history server:
1. I added an onOtherEvent method to the SparkListener trait and now post all SQL-related events to the same event bus.
2. Two SQL events, SparkListenerSQLExecutionStart and SparkListenerSQLExecutionEnd, are defined in the sql module.
3. The new SQL events are written to the event log using Jackson.
4. A new trait, SparkHistoryListenerFactory, is added so the history server can feed events to the SQL history listener; the SQL implementation is loaded at runtime using java.util.ServiceLoader (see the sketch after this list).
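To make the flow concrete, here is a minimal sketch of how a module can plug into these hooks. It is illustrative only: MyModuleEvent, MyModuleListener and MyModuleHistoryListenerFactory are hypothetical names, the package placement assumes SparkHistoryListenerFactory stays private[spark] (so implementations must live under org.apache.spark), and the API shapes are taken from the hunks below.

    package org.apache.spark.mymodule  // hypothetical module, kept under org.apache.spark

    import org.apache.spark.SparkConf
    import org.apache.spark.scheduler.{SparkHistoryListenerFactory, SparkListener, SparkListenerEvent}
    import org.apache.spark.ui.SparkUI

    // A module-specific event. Because SparkListenerEvent now carries @JsonTypeInfo,
    // EventLoggingListener can write it to the event log with Jackson, and JsonProtocol
    // can rebuild it from the log by its class name.
    case class MyModuleEvent(executionId: Long, time: Long) extends SparkListenerEvent

    // A listener that receives such events through the new onOtherEvent hook,
    // both on the live bus and when the history server replays the event log.
    class MyModuleListener(conf: SparkConf) extends SparkListener {
      override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
        case MyModuleEvent(executionId, time) => // rebuild module-specific UI state here
        case _ => // ignore events posted by other modules
      }
    }

    // The factory the history server discovers at runtime. Its fully qualified name must be
    // listed in META-INF/services/org.apache.spark.scheduler.SparkHistoryListenerFactory.
    class MyModuleHistoryListenerFactory extends SparkHistoryListenerFactory {
      override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] =
        Seq(new MyModuleListener(conf))
    }

On the history-server side, the SparkUI factory method changed below loads every registered factory with java.util.ServiceLoader and attaches the returned listeners to the bus before the event log is replayed, so replayed events reach onOtherEvent just as they do on a live application.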

Author: Carson Wang <carson.wang@intel.com>

Closes #10061 from carsonwang/SqlHistoryUI.
Carson Wang 2015-12-03 16:39:12 -08:00 committed by Marcelo Vanzin
parent f434f36d50
commit b6e9963ee4
21 changed files with 329 additions and 135 deletions

View file

@@ -82,4 +82,5 @@ INDEX
gen-java.*
.*avpr
org.apache.spark.sql.sources.DataSourceRegister
org.apache.spark.scheduler.SparkHistoryListenerFactory
.*parquet

View file

@@ -82,4 +82,7 @@ public class JavaSparkListener implements SparkListener {
@Override
public void onBlockUpdated(SparkListenerBlockUpdated blockUpdated) { }
@Override
public void onOtherEvent(SparkListenerEvent event) { }
}

View file

@@ -118,4 +118,8 @@ public class SparkFirehoseListener implements SparkListener {
onEvent(blockUpdated);
}
@Override
public void onOtherEvent(SparkListenerEvent event) {
onEvent(event);
}
}

View file

@@ -207,6 +207,10 @@ private[spark] class EventLoggingListener(
// No-op because logging every update would be overkill
override def onExecutorMetricsUpdate(event: SparkListenerExecutorMetricsUpdate): Unit = { }
override def onOtherEvent(event: SparkListenerEvent): Unit = {
logEvent(event, flushLogger = true)
}
/**
* Stop logging events. The event log file will be renamed so that it loses the
* ".inprogress" suffix.

View file

@@ -22,15 +22,19 @@ import java.util.Properties
import scala.collection.Map
import scala.collection.mutable
import org.apache.spark.{Logging, TaskEndReason}
import com.fasterxml.jackson.annotation.JsonTypeInfo
import org.apache.spark.{Logging, SparkConf, TaskEndReason}
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage.{BlockManagerId, BlockUpdatedInfo}
import org.apache.spark.util.{Distribution, Utils}
import org.apache.spark.ui.SparkUI
@DeveloperApi
sealed trait SparkListenerEvent
@JsonTypeInfo(use = JsonTypeInfo.Id.CLASS, include = JsonTypeInfo.As.PROPERTY, property = "Event")
trait SparkListenerEvent
@DeveloperApi
case class SparkListenerStageSubmitted(stageInfo: StageInfo, properties: Properties = null)
@@ -130,6 +134,17 @@ case class SparkListenerApplicationEnd(time: Long) extends SparkListenerEvent
*/
private[spark] case class SparkListenerLogStart(sparkVersion: String) extends SparkListenerEvent
/**
* Interface for creating history listeners defined in other modules like SQL, which are used to
* rebuild the history UI.
*/
private[spark] trait SparkHistoryListenerFactory {
/**
* Create listeners used to rebuild the history UI.
*/
def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener]
}
/**
* :: DeveloperApi ::
* Interface for listening to events from the Spark scheduler. Note that this is an internal
@@ -223,6 +238,11 @@ trait SparkListener {
* Called when the driver receives a block update info.
*/
def onBlockUpdated(blockUpdated: SparkListenerBlockUpdated) { }
/**
* Called when other events like SQL-specific events are posted.
*/
def onOtherEvent(event: SparkListenerEvent) { }
}
/**

View file

@@ -61,6 +61,7 @@ private[spark] trait SparkListenerBus extends ListenerBus[SparkListener, SparkLi
case blockUpdated: SparkListenerBlockUpdated =>
listener.onBlockUpdated(blockUpdated)
case logStart: SparkListenerLogStart => // ignore event log metadata
case _ => listener.onOtherEvent(event)
}
}

View file

@@ -17,10 +17,13 @@
package org.apache.spark.ui
import java.util.Date
import java.util.{Date, ServiceLoader}
import scala.collection.JavaConverters._
import org.apache.spark.status.api.v1.{ApiRootResource, ApplicationAttemptInfo, ApplicationInfo,
UIRoot}
import org.apache.spark.util.Utils
import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
@@ -154,7 +157,16 @@ private[spark] object SparkUI {
appName: String,
basePath: String,
startTime: Long): SparkUI = {
create(None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
val sparkUI = create(
None, conf, listenerBus, securityManager, appName, basePath, startTime = startTime)
val listenerFactories = ServiceLoader.load(classOf[SparkHistoryListenerFactory],
Utils.getContextOrSparkClassLoader).asScala
listenerFactories.foreach { listenerFactory =>
val listeners = listenerFactory.createListeners(conf, sparkUI)
listeners.foreach(listenerBus.addListener)
}
sparkUI
}
/**

View file

@@ -19,19 +19,21 @@ package org.apache.spark.util
import java.util.{Properties, UUID}
import org.apache.spark.scheduler.cluster.ExecutorInfo
import scala.collection.JavaConverters._
import scala.collection.Map
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.json4s.DefaultFormats
import org.json4s.JsonDSL._
import org.json4s.JsonAST._
import org.json4s.jackson.JsonMethods._
import org.apache.spark._
import org.apache.spark.executor._
import org.apache.spark.rdd.RDDOperationScope
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.cluster.ExecutorInfo
import org.apache.spark.storage._
/**
@@ -54,6 +56,8 @@ private[spark] object JsonProtocol {
private implicit val format = DefaultFormats
private val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
/** ------------------------------------------------- *
* JSON serialization methods for SparkListenerEvents |
* -------------------------------------------------- */
@@ -96,6 +100,7 @@ private[spark] object JsonProtocol {
executorMetricsUpdateToJson(metricsUpdate)
case blockUpdated: SparkListenerBlockUpdated =>
throw new MatchError(blockUpdated) // TODO(ekl) implement this
case _ => parse(mapper.writeValueAsString(event))
}
}
@@ -506,6 +511,8 @@ private[spark] object JsonProtocol {
case `executorRemoved` => executorRemovedFromJson(json)
case `logStart` => logStartFromJson(json)
case `metricsUpdate` => executorMetricsUpdateFromJson(json)
case other => mapper.readValue(compact(render(json)), Utils.classForName(other))
.asInstanceOf[SparkListenerEvent]
}
}

View file

@@ -0,0 +1 @@
org.apache.spark.sql.execution.ui.SQLHistoryListenerFactory

View file

@@ -1245,6 +1245,7 @@ class SQLContext private[sql](
sparkContext.addSparkListener(new SparkListener {
override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
SQLContext.clearInstantiatedContext()
SQLContext.clearSqlListener()
}
})
@@ -1272,6 +1273,8 @@ object SQLContext {
*/
@transient private val instantiatedContext = new AtomicReference[SQLContext]()
@transient private val sqlListener = new AtomicReference[SQLListener]()
/**
* Get the singleton SQLContext if it exists or create a new one using the given SparkContext.
*
@@ -1316,6 +1319,10 @@ object SQLContext {
Option(instantiatedContext.get())
}
private[sql] def clearSqlListener(): Unit = {
sqlListener.set(null)
}
/**
* Changes the SQLContext that will be returned in this thread and its children when
* SQLContext.getOrCreate() is called. This can be used to ensure that a given thread receives
@@ -1364,9 +1371,13 @@ object SQLContext {
* Create a SQLListener then add it into SparkContext, and create an SQLTab if there is SparkUI.
*/
private[sql] def createListenerAndUI(sc: SparkContext): SQLListener = {
val listener = new SQLListener(sc.conf)
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
listener
if (sqlListener.get() == null) {
val listener = new SQLListener(sc.conf)
if (sqlListener.compareAndSet(null, listener)) {
sc.addSparkListener(listener)
sc.ui.foreach(new SQLTab(listener, _))
}
}
sqlListener.get()
}
}

View file

@@ -21,7 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.execution.ui.{SparkListenerSQLExecutionStart,
SparkListenerSQLExecutionEnd}
import org.apache.spark.util.Utils
private[sql] object SQLExecution {
@@ -45,25 +46,14 @@ private[sql] object SQLExecution {
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
val r = try {
val callSite = Utils.getCallSite()
sqlContext.listener.onExecutionStart(
executionId,
callSite.shortForm,
callSite.longForm,
queryExecution.toString,
SparkPlanGraph(queryExecution.executedPlan),
System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionStart(
executionId, callSite.shortForm, callSite.longForm, queryExecution.toString,
SparkPlanInfo.fromSparkPlan(queryExecution.executedPlan), System.currentTimeMillis()))
try {
body
} finally {
// Ideally, we need to make sure onExecutionEnd happens after onJobStart and onJobEnd.
// However, onJobStart and onJobEnd run in the listener thread. Because we cannot add new
// SQL event types to SparkListener since it's a public API, we cannot guarantee that.
//
// SQLListener should handle the case that onExecutionEnd happens before onJobEnd.
//
// The worst case is onExecutionEnd may happen before onJobStart when the listener thread
// is very busy. If so, we cannot track the jobs for the execution. It seems acceptable.
sqlContext.listener.onExecutionEnd(executionId, System.currentTimeMillis())
sqlContext.sparkContext.listenerBus.post(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
}
} finally {
sc.setLocalProperty(EXECUTION_ID_KEY, null)

View file

@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.sql.execution.metric.SQLMetricInfo
import org.apache.spark.util.Utils
/**
* :: DeveloperApi ::
* Stores information about a SQL SparkPlan.
*/
@DeveloperApi
class SparkPlanInfo(
val nodeName: String,
val simpleString: String,
val children: Seq[SparkPlanInfo],
val metrics: Seq[SQLMetricInfo])
private[sql] object SparkPlanInfo {
def fromSparkPlan(plan: SparkPlan): SparkPlanInfo = {
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
new SQLMetricInfo(metric.name.getOrElse(key), metric.id,
Utils.getFormattedClassName(metric.param))
}
val children = plan.children.map(fromSparkPlan)
new SparkPlanInfo(plan.nodeName, plan.simpleString, children, metrics)
}
}

View file

@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.metric
import org.apache.spark.annotation.DeveloperApi
/**
* :: DeveloperApi ::
* Stores information about a SQL Metric.
*/
@DeveloperApi
class SQLMetricInfo(
val name: String,
val accumulatorId: Long,
val metricParam: String)

View file

@@ -104,21 +104,39 @@ private class LongSQLMetricParam(val stringValue: Seq[Long] => String, initialVa
override def zero: LongSQLMetricValue = new LongSQLMetricValue(initialValue)
}
private object LongSQLMetricParam extends LongSQLMetricParam(_.sum.toString, 0L)
private object StaticsLongSQLMetricParam extends LongSQLMetricParam(
(values: Seq[Long]) => {
// This is a workaround for SPARK-11013.
// We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
// it at the end of task and the value will be at least 0.
val validValues = values.filter(_ >= 0)
val Seq(sum, min, med, max) = {
val metric = if (validValues.length == 0) {
Seq.fill(4)(0L)
} else {
val sorted = validValues.sorted
Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
}
metric.map(Utils.bytesToString)
}
s"\n$sum ($min, $med, $max)"
}, -1L)
private[sql] object SQLMetrics {
private def createLongMetric(
sc: SparkContext,
name: String,
stringValue: Seq[Long] => String,
initialValue: Long): LongSQLMetric = {
val param = new LongSQLMetricParam(stringValue, initialValue)
param: LongSQLMetricParam): LongSQLMetric = {
val acc = new LongSQLMetric(name, param)
sc.cleaner.foreach(_.registerAccumulatorForCleanup(acc))
acc
}
def createLongMetric(sc: SparkContext, name: String): LongSQLMetric = {
createLongMetric(sc, name, _.sum.toString, 0L)
createLongMetric(sc, name, LongSQLMetricParam)
}
/**
@@ -126,31 +144,25 @@ private[sql] object SQLMetrics {
* spill size, etc.
*/
def createSizeMetric(sc: SparkContext, name: String): LongSQLMetric = {
val stringValue = (values: Seq[Long]) => {
// This is a workaround for SPARK-11013.
// We use -1 as initial value of the accumulator, if the accumulator is valid, we will update
// it at the end of task and the value will be at least 0.
val validValues = values.filter(_ >= 0)
val Seq(sum, min, med, max) = {
val metric = if (validValues.length == 0) {
Seq.fill(4)(0L)
} else {
val sorted = validValues.sorted
Seq(sorted.sum, sorted(0), sorted(validValues.length / 2), sorted(validValues.length - 1))
}
metric.map(Utils.bytesToString)
}
s"\n$sum ($min, $med, $max)"
}
// The final result of this metric in physical operator UI may look like:
// data size total (min, med, max):
// 100GB (100MB, 1GB, 10GB)
createLongMetric(sc, s"$name total (min, med, max)", stringValue, -1L)
createLongMetric(sc, s"$name total (min, med, max)", StaticsLongSQLMetricParam)
}
def getMetricParam(metricParamName: String): SQLMetricParam[SQLMetricValue[Any], Any] = {
val longSQLMetricParam = Utils.getFormattedClassName(LongSQLMetricParam)
val staticsSQLMetricParam = Utils.getFormattedClassName(StaticsLongSQLMetricParam)
val metricParam = metricParamName match {
case `longSQLMetricParam` => LongSQLMetricParam
case `staticsSQLMetricParam` => StaticsLongSQLMetricParam
}
metricParam.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]]
}
/**
* A metric whose value will be ignored. Use this one when we need a metric parameter but don't
* care about the value.
*/
val nullLongMetric = new LongSQLMetric("null", new LongSQLMetricParam(_.sum.toString, 0L))
val nullLongMetric = new LongSQLMetric("null", LongSQLMetricParam)
}

View file

@@ -19,9 +19,7 @@ package org.apache.spark.sql.execution.ui
import javax.servlet.http.HttpServletRequest
import scala.xml.{Node, Unparsed}
import org.apache.commons.lang3.StringEscapeUtils
import scala.xml.Node
import org.apache.spark.Logging
import org.apache.spark.ui.{UIUtils, WebUIPage}

View file

@@ -19,11 +19,34 @@ package org.apache.spark.sql.execution.ui
import scala.collection.mutable
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.metric.{LongSQLMetricValue, SQLMetricValue, SQLMetricParam}
import org.apache.spark.{JobExecutionStatus, Logging, SparkConf}
import org.apache.spark.ui.SparkUI
@DeveloperApi
case class SparkListenerSQLExecutionStart(
executionId: Long,
description: String,
details: String,
physicalPlanDescription: String,
sparkPlanInfo: SparkPlanInfo,
time: Long)
extends SparkListenerEvent
@DeveloperApi
case class SparkListenerSQLExecutionEnd(executionId: Long, time: Long)
extends SparkListenerEvent
private[sql] class SQLHistoryListenerFactory extends SparkHistoryListenerFactory {
override def createListeners(conf: SparkConf, sparkUI: SparkUI): Seq[SparkListener] = {
List(new SQLHistoryListener(conf, sparkUI))
}
}
private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Logging {
@@ -118,7 +141,8 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
for ((taskId, stageId, stageAttemptID, metrics) <- executorMetricsUpdate.taskMetrics) {
updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics, finishTask = false)
updateTaskAccumulatorValues(taskId, stageId, stageAttemptID, metrics.accumulatorUpdates(),
finishTask = false)
}
}
@@ -140,7 +164,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
taskEnd.taskMetrics,
taskEnd.taskMetrics.accumulatorUpdates(),
finishTask = true)
}
@@ -148,15 +172,12 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
* Update the accumulator values of a task with the latest metrics for this task. This is called
* every time we receive an executor heartbeat or when a task finishes.
*/
private def updateTaskAccumulatorValues(
protected def updateTaskAccumulatorValues(
taskId: Long,
stageId: Int,
stageAttemptID: Int,
metrics: TaskMetrics,
accumulatorUpdates: Map[Long, Any],
finishTask: Boolean): Unit = {
if (metrics == null) {
return
}
_stageIdToStageMetrics.get(stageId) match {
case Some(stageMetrics) =>
@@ -174,9 +195,9 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
case Some(taskMetrics) =>
if (finishTask) {
taskMetrics.finished = true
taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
taskMetrics.accumulatorUpdates = accumulatorUpdates
} else if (!taskMetrics.finished) {
taskMetrics.accumulatorUpdates = metrics.accumulatorUpdates()
taskMetrics.accumulatorUpdates = accumulatorUpdates
} else {
// If a task is finished, we should not override with accumulator updates from
// heartbeat reports
@@ -185,7 +206,7 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
// TODO Now just set attemptId to 0. Should fix here when we can get the attempt
// id from SparkListenerExecutorMetricsUpdate
stageMetrics.taskIdToMetricUpdates(taskId) = new SQLTaskMetrics(
attemptId = 0, finished = finishTask, metrics.accumulatorUpdates())
attemptId = 0, finished = finishTask, accumulatorUpdates)
}
}
case None =>
@@ -193,38 +214,40 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
}
def onExecutionStart(
executionId: Long,
description: String,
details: String,
physicalPlanDescription: String,
physicalPlanGraph: SparkPlanGraph,
time: Long): Unit = {
val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
node.metrics.map(metric => metric.accumulatorId -> metric)
}
val executionUIData = new SQLExecutionUIData(executionId, description, details,
physicalPlanDescription, physicalPlanGraph, sqlPlanMetrics.toMap, time)
synchronized {
activeExecutions(executionId) = executionUIData
_executionIdToData(executionId) = executionUIData
}
}
def onExecutionEnd(executionId: Long, time: Long): Unit = synchronized {
_executionIdToData.get(executionId).foreach { executionUIData =>
executionUIData.completionTime = Some(time)
if (!executionUIData.hasRunningJobs) {
// onExecutionEnd happens after all "onJobEnd"s
// So we should update the execution lists.
markExecutionFinished(executionId)
} else {
// There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
// Then we don't know if the execution is successful, so let the last onJobEnd update the
// execution lists.
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerSQLExecutionStart(executionId, description, details,
physicalPlanDescription, sparkPlanInfo, time) =>
val physicalPlanGraph = SparkPlanGraph(sparkPlanInfo)
val sqlPlanMetrics = physicalPlanGraph.nodes.flatMap { node =>
node.metrics.map(metric => metric.accumulatorId -> metric)
}
val executionUIData = new SQLExecutionUIData(
executionId,
description,
details,
physicalPlanDescription,
physicalPlanGraph,
sqlPlanMetrics.toMap,
time)
synchronized {
activeExecutions(executionId) = executionUIData
_executionIdToData(executionId) = executionUIData
}
case SparkListenerSQLExecutionEnd(executionId, time) => synchronized {
_executionIdToData.get(executionId).foreach { executionUIData =>
executionUIData.completionTime = Some(time)
if (!executionUIData.hasRunningJobs) {
// onExecutionEnd happens after all "onJobEnd"s
// So we should update the execution lists.
markExecutionFinished(executionId)
} else {
// There are some running jobs, onExecutionEnd happens before some "onJobEnd"s.
// Then we don't know if the execution is successful, so let the last onJobEnd update the
// execution lists.
}
}
}
case _ => // Ignore
}
private def markExecutionFinished(executionId: Long): Unit = {
@@ -289,6 +312,38 @@ private[sql] class SQLListener(conf: SparkConf) extends SparkListener with Loggi
}
private[spark] class SQLHistoryListener(conf: SparkConf, sparkUI: SparkUI)
extends SQLListener(conf) {
private var sqlTabAttached = false
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = synchronized {
// Do nothing
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
updateTaskAccumulatorValues(
taskEnd.taskInfo.taskId,
taskEnd.stageId,
taskEnd.stageAttemptId,
taskEnd.taskInfo.accumulables.map { acc =>
(acc.id, new LongSQLMetricValue(acc.update.getOrElse("0").toLong))
}.toMap,
finishTask = true)
}
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case _: SparkListenerSQLExecutionStart =>
if (!sqlTabAttached) {
new SQLTab(this, sparkUI)
sqlTabAttached = true
}
super.onOtherEvent(event)
case _ => super.onOtherEvent(event)
}
}
/**
* Represent all necessary data for an execution that will be used in Web UI.
*/

View file

@@ -17,13 +17,11 @@
package org.apache.spark.sql.execution.ui
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.Logging
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
extends SparkUITab(sparkUI, SQLTab.nextTabName) with Logging {
extends SparkUITab(sparkUI, "SQL") with Logging {
val parent = sparkUI
@@ -35,13 +33,5 @@ private[sql] class SQLTab(val listener: SQLListener, sparkUI: SparkUI)
}
private[sql] object SQLTab {
private val STATIC_RESOURCE_DIR = "org/apache/spark/sql/execution/ui/static"
private val nextTabId = new AtomicInteger(0)
private def nextTabName: String = {
val nextId = nextTabId.getAndIncrement()
if (nextId == 0) "SQL" else s"SQL$nextId"
}
}

View file

@@ -21,8 +21,8 @@ import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.metric.{SQLMetricParam, SQLMetricValue}
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.metric.SQLMetrics
/**
* A graph used for storing information of an executionPlan of DataFrame.
@@ -48,27 +48,27 @@ private[sql] object SparkPlanGraph {
/**
* Build a SparkPlanGraph from the root of a SparkPlan tree.
*/
def apply(plan: SparkPlan): SparkPlanGraph = {
def apply(planInfo: SparkPlanInfo): SparkPlanGraph = {
val nodeIdGenerator = new AtomicLong(0)
val nodes = mutable.ArrayBuffer[SparkPlanGraphNode]()
val edges = mutable.ArrayBuffer[SparkPlanGraphEdge]()
buildSparkPlanGraphNode(plan, nodeIdGenerator, nodes, edges)
buildSparkPlanGraphNode(planInfo, nodeIdGenerator, nodes, edges)
new SparkPlanGraph(nodes, edges)
}
private def buildSparkPlanGraphNode(
plan: SparkPlan,
planInfo: SparkPlanInfo,
nodeIdGenerator: AtomicLong,
nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
edges: mutable.ArrayBuffer[SparkPlanGraphEdge]): SparkPlanGraphNode = {
val metrics = plan.metrics.toSeq.map { case (key, metric) =>
SQLPlanMetric(metric.name.getOrElse(key), metric.id,
metric.param.asInstanceOf[SQLMetricParam[SQLMetricValue[Any], Any]])
val metrics = planInfo.metrics.map { metric =>
SQLPlanMetric(metric.name, metric.accumulatorId,
SQLMetrics.getMetricParam(metric.metricParam))
}
val node = SparkPlanGraphNode(
nodeIdGenerator.getAndIncrement(), plan.nodeName, plan.simpleString, metrics)
nodeIdGenerator.getAndIncrement(), planInfo.nodeName, planInfo.simpleString, metrics)
nodes += node
val childrenNodes = plan.children.map(
val childrenNodes = planInfo.children.map(
child => buildSparkPlanGraphNode(child, nodeIdGenerator, nodes, edges))
for (child <- childrenNodes) {
edges += SparkPlanGraphEdge(child.id, node.id)

View file

@@ -26,6 +26,7 @@ import org.apache.xbean.asm5.Opcodes._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql._
import org.apache.spark.sql.execution.SparkPlanInfo
import org.apache.spark.sql.execution.ui.SparkPlanGraph
import org.apache.spark.sql.functions._
import org.apache.spark.sql.test.SharedSQLContext
@@ -82,7 +83,8 @@ class SQLMetricsSuite extends SparkFunSuite with SharedSQLContext {
if (jobs.size == expectedNumOfJobs) {
// If we can track all jobs, check the metric values
val metricValues = sqlContext.listener.getExecutionMetrics(executionId)
val actualMetrics = SparkPlanGraph(df.queryExecution.executedPlan).nodes.filter { node =>
val actualMetrics = SparkPlanGraph(SparkPlanInfo.fromSparkPlan(
df.queryExecution.executedPlan)).nodes.filter { node =>
expectedMetrics.contains(node.id)
}.map { node =>
val nodeMetrics = node.metrics.map { metric =>

View file

@@ -21,10 +21,10 @@ import java.util.Properties
import org.apache.spark.{SparkException, SparkContext, SparkConf, SparkFunSuite}
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.scheduler._
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.{SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.LongSQLMetricValue
import org.apache.spark.sql.test.SharedSQLContext
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
@@ -82,7 +82,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val executionId = 0
val df = createTestDataFrame
val accumulatorIds =
SparkPlanGraph(df.queryExecution.executedPlan).nodes.flatMap(_.metrics.map(_.accumulatorId))
SparkPlanGraph(SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan))
.nodes.flatMap(_.metrics.map(_.accumulatorId))
// Assume all accumulators are long
var accumulatorValue = 0L
val accumulatorUpdates = accumulatorIds.map { id =>
@@ -90,13 +91,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
(id, accumulatorValue)
}.toMap
listener.onExecutionStart(
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanGraph(df.queryExecution.executedPlan),
System.currentTimeMillis())
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
val executionUIData = listener.executionIdToData(0)
@@ -206,7 +207,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
time = System.currentTimeMillis(),
JobSucceeded
))
listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
assert(executionUIData.runningJobs.isEmpty)
assert(executionUIData.succeededJobs === Seq(0))
@@ -219,19 +221,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
listener.onExecutionStart(
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanGraph(df.queryExecution.executedPlan),
System.currentTimeMillis())
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
@@ -248,13 +251,13 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
listener.onExecutionStart(
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanGraph(df.queryExecution.executedPlan),
System.currentTimeMillis())
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
@@ -271,7 +274,8 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
time = System.currentTimeMillis(),
stageInfos = Nil,
createProperties(executionId)))
listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
listener.onJobEnd(SparkListenerJobEnd(
jobId = 1,
time = System.currentTimeMillis(),
@@ -288,19 +292,20 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext {
val listener = new SQLListener(sqlContext.sparkContext.conf)
val executionId = 0
val df = createTestDataFrame
listener.onExecutionStart(
listener.onOtherEvent(SparkListenerSQLExecutionStart(
executionId,
"test",
"test",
df.queryExecution.toString,
SparkPlanGraph(df.queryExecution.executedPlan),
System.currentTimeMillis())
SparkPlanInfo.fromSparkPlan(df.queryExecution.executedPlan),
System.currentTimeMillis()))
listener.onJobStart(SparkListenerJobStart(
jobId = 0,
time = System.currentTimeMillis(),
stageInfos = Seq.empty,
createProperties(executionId)))
listener.onExecutionEnd(executionId, System.currentTimeMillis())
listener.onOtherEvent(SparkListenerSQLExecutionEnd(
executionId, System.currentTimeMillis()))
listener.onJobEnd(SparkListenerJobEnd(
jobId = 0,
time = System.currentTimeMillis(),
@@ -338,6 +343,7 @@ class SQLListenerMemoryLeakSuite extends SparkFunSuite {
.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
val sc = new SparkContext(conf)
try {
SQLContext.clearSqlListener()
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._
// Run 100 successful executions and 100 failed executions.

View file

@@ -42,6 +42,7 @@ trait SharedSQLContext extends SQLTestUtils {
* Initialize the [[TestSQLContext]].
*/
protected override def beforeAll(): Unit = {
SQLContext.clearSqlListener()
if (_ctx == null) {
_ctx = new TestSQLContext
}