[SPARK-20650][CORE] Remove JobProgressListener.

The only remaining use of this class was the SparkStatusTracker, which
was modified to use the new status store. The test code to wait for
executors was moved to TestUtils and now uses the SparkStatusTracker API.
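
For example, a test that needs two executors can now poll the public API; a
minimal sketch (the local-cluster master and 30 second timeout are arbitrary
example values, and waitUntilExecutorsUp is private[spark], so this only
compiles inside the org.apache.spark package):

    import org.apache.spark.{SparkConf, SparkContext, TestUtils}

    val conf = new SparkConf().setMaster("local-cluster[2, 1, 1024]").setAppName("example")
    val sc = new SparkContext(conf)
    try {
      // Polls sc.statusTracker.getExecutorInfos until enough executors have registered.
      TestUtils.waitUntilExecutorsUp(sc, numExecutors = 2, timeout = 30000)
      assert(sc.statusTracker.getExecutorInfos.length >= 2)
    } finally {
      sc.stop()
    }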

Indirectly, ConsoleProgressBar also uses this data. Because it has
lower latency requirements, a shortcut was added to the AppStatusStore
to fetch the active stages efficiently from the active listener.
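
The shortcut itself is small: the live AppStatusStore delegates to the
in-memory listener, which reads its live entities directly instead of going
through the KVStore (the two methods, condensed from the diffs below):

    // In AppStatusStore: only a live application has a listener; the history server gets Nil.
    def activeStages(): Seq[v1.StageData] = {
      listener.map(_.activeStages()).getOrElse(Nil)
    }

    // In AppStatusListener: read the live stages directly, skipping the KVStore round trip.
    def activeStages(): Seq[v1.StageData] = {
      liveStages.values.asScala
        .filter(_.info.submissionTime.isDefined)
        .map(_.toApi())
        .toList
        .sortBy(_.stageId)
    }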

Now that all UI code goes through the status store to get its data,
the FsHistoryProvider can be cleaned up to only replay event logs
when needed - that is, when there is no pre-existing disk store for
the application.
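
A minimal, self-contained illustration of that decision logic (a hypothetical
helper written for this commit message only, not the FsHistoryProvider code;
the path below is made up):

    import java.io.File

    // Replay is only needed when no usable disk store already exists for the attempt.
    def needsReplay(diskStorePath: Option[File]): Boolean = diskStorePath match {
      case Some(dir) if dir.isDirectory => false // reuse the pre-existing store as-is
      case _ => true                             // otherwise rebuild it from the event log
    }

    println(needsReplay(Some(new File("/tmp/spark-history/app-123")))) // hypothetical path
    println(needsReplay(None))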

As part of this change I also modified the streaming UI to read the needed
data from the store, which was missed in the previous patch that made
JobProgressListener redundant.

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19750 from vanzin/SPARK-20650.
Marcelo Vanzin 2017-11-29 14:34:41 -08:00
parent 193555f79c
commit 8ff474f6e5
21 changed files with 210 additions and 1530 deletions


@@ -58,7 +58,6 @@ import org.apache.spark.status.{AppStatusPlugin, AppStatusStore}
 import org.apache.spark.storage._
 import org.apache.spark.storage.BlockManagerMessages.TriggerThreadDump
 import org.apache.spark.ui.{ConsoleProgressBar, SparkUI}
-import org.apache.spark.ui.jobs.JobProgressListener
 import org.apache.spark.util._
 
 /**
@@ -195,7 +194,6 @@ class SparkContext(config: SparkConf) extends Logging {
   private var _eventLogCodec: Option[String] = None
   private var _listenerBus: LiveListenerBus = _
   private var _env: SparkEnv = _
-  private var _jobProgressListener: JobProgressListener = _
   private var _statusTracker: SparkStatusTracker = _
   private var _progressBar: Option[ConsoleProgressBar] = None
   private var _ui: Option[SparkUI] = None
@@ -270,8 +268,6 @@ class SparkContext(config: SparkConf) extends Logging {
     val map: ConcurrentMap[Int, RDD[_]] = new MapMaker().weakValues().makeMap[Int, RDD[_]]()
     map.asScala
   }
 
-  private[spark] def jobProgressListener: JobProgressListener = _jobProgressListener
-
   def statusTracker: SparkStatusTracker = _statusTracker
 
   private[spark] def progressBar: Option[ConsoleProgressBar] = _progressBar
@@ -421,11 +417,6 @@ class SparkContext(config: SparkConf) extends Logging {
     _listenerBus = new LiveListenerBus(_conf)
 
-    // "_jobProgressListener" should be set up before creating SparkEnv because when creating
-    // "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
-    _jobProgressListener = new JobProgressListener(_conf)
-    listenerBus.addToStatusQueue(jobProgressListener)
-
     // Initialize the app status store and listener before SparkEnv is created so that it gets
     // all events.
     _statusStore = AppStatusStore.createLiveStore(conf, l => listenerBus.addToStatusQueue(l))
@@ -440,7 +431,7 @@ class SparkContext(config: SparkConf) extends Logging {
       _conf.set("spark.repl.class.uri", replUri)
     }
 
-    _statusTracker = new SparkStatusTracker(this)
+    _statusTracker = new SparkStatusTracker(this, _statusStore)
 
     _progressBar =
       if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {


@@ -17,7 +17,10 @@
 
 package org.apache.spark
 
-import org.apache.spark.scheduler.TaskSchedulerImpl
+import java.util.Arrays
+
+import org.apache.spark.status.AppStatusStore
+import org.apache.spark.status.api.v1.StageStatus
 
 /**
  * Low-level status reporting APIs for monitoring job and stage progress.
@@ -33,9 +36,7 @@ import org.apache.spark.scheduler.TaskSchedulerImpl
  *
  * NOTE: this class's constructor should be considered private and may be subject to change.
  */
-class SparkStatusTracker private[spark] (sc: SparkContext) {
-
-  private val jobProgressListener = sc.jobProgressListener
+class SparkStatusTracker private[spark] (sc: SparkContext, store: AppStatusStore) {
 
   /**
    * Return a list of all known jobs in a particular job group. If `jobGroup` is `null`, then
@@ -46,9 +47,8 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * its result.
    */
   def getJobIdsForGroup(jobGroup: String): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobGroupToJobIds.getOrElse(jobGroup, Seq.empty).toArray
-    }
+    val expected = Option(jobGroup)
+    store.jobsList(null).filter(_.jobGroup == expected).map(_.jobId).toArray
   }
 
   /**
@@ -57,9 +57,7 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveStageIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeStages.values.map(_.stageId).toArray
-    }
+    store.stageList(Arrays.asList(StageStatus.ACTIVE)).map(_.stageId).toArray
   }
 
   /**
@@ -68,19 +66,15 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * This method does not guarantee the order of the elements in its result.
    */
   def getActiveJobIds(): Array[Int] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.activeJobs.values.map(_.jobId).toArray
-    }
+    store.jobsList(Arrays.asList(JobExecutionStatus.RUNNING)).map(_.jobId).toArray
   }
 
   /**
    * Returns job information, or `None` if the job info could not be found or was garbage collected.
    */
   def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
-    jobProgressListener.synchronized {
-      jobProgressListener.jobIdToData.get(jobId).map { data =>
-        new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
-      }
+    store.asOption(store.job(jobId)).map { job =>
+      new SparkJobInfoImpl(jobId, job.stageIds.toArray, job.status)
     }
   }
 
@@ -89,21 +83,16 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
   * garbage collected.
   */
  def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
-    jobProgressListener.synchronized {
-      for (
-        info <- jobProgressListener.stageIdToInfo.get(stageId);
-        data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
-      ) yield {
-        new SparkStageInfoImpl(
-          stageId,
-          info.attemptId,
-          info.submissionTime.getOrElse(0),
-          info.name,
-          info.numTasks,
-          data.numActiveTasks,
-          data.numCompleteTasks,
-          data.numFailedTasks)
-      }
+    store.asOption(store.lastStageAttempt(stageId)).map { stage =>
+      new SparkStageInfoImpl(
+        stageId,
+        stage.attemptId,
+        stage.submissionTime.map(_.getTime()).getOrElse(0L),
+        stage.name,
+        stage.numTasks,
+        stage.numActiveTasks,
+        stage.numCompleteTasks,
+        stage.numFailedTasks)
     }
   }
 
@@ -111,17 +100,20 @@ class SparkStatusTracker private[spark] (sc: SparkContext) {
    * Returns information of all known executors, including host, port, cacheSize, numRunningTasks.
    */
   def getExecutorInfos: Array[SparkExecutorInfo] = {
-    val executorIdToRunningTasks: Map[String, Int] =
-      sc.taskScheduler.asInstanceOf[TaskSchedulerImpl].runningTasksByExecutors
-
-    sc.getExecutorStorageStatus.map { status =>
-      val bmId = status.blockManagerId
+    store.executorList(true).map { exec =>
+      val (host, port) = exec.hostPort.split(":", 2) match {
+        case Array(h, p) => (h, p.toInt)
+        case Array(h) => (h, -1)
+      }
+      val cachedMem = exec.memoryMetrics.map { mem =>
+        mem.usedOnHeapStorageMemory + mem.usedOffHeapStorageMemory
+      }.getOrElse(0L)
       new SparkExecutorInfoImpl(
-        bmId.host,
-        bmId.port,
-        status.cacheSize,
-        executorIdToRunningTasks.getOrElse(bmId.executorId, 0)
-      )
-    }
+        host,
+        port,
+        cachedMem,
+        exec.activeTasks)
+    }.toArray
   }
 }
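
For context, a hedged usage sketch of the rewritten tracker methods above
(assumes an already-running SparkContext named sc; the job itself is arbitrary):

    // Run something, then inspect it through the store-backed status APIs.
    sc.parallelize(1 to 1000, 4).count()

    val jobIds = sc.statusTracker.getJobIdsForGroup(null) // jobs submitted without a job group
    jobIds.flatMap(id => sc.statusTracker.getJobInfo(id)).foreach { info =>
      println(s"job ${info.jobId()}: ${info.status()}")
    }
    sc.statusTracker.getExecutorInfos.foreach { e =>
      println(s"${e.host()}:${e.port()} cached=${e.cacheSize()} running=${e.numRunningTasks()}")
    }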


@@ -23,7 +23,7 @@ import java.nio.charset.StandardCharsets
 import java.security.SecureRandom
 import java.security.cert.X509Certificate
 import java.util.Arrays
-import java.util.concurrent.{CountDownLatch, TimeUnit}
+import java.util.concurrent.{CountDownLatch, TimeoutException, TimeUnit}
 import java.util.jar.{JarEntry, JarOutputStream}
 import javax.net.ssl._
 import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}
@@ -232,6 +232,30 @@ private[spark] object TestUtils {
     }
   }
 
+  /**
+   * Wait until at least `numExecutors` executors are up, or throw `TimeoutException` if the waiting
+   * time elapsed before `numExecutors` executors up. Exposed for testing.
+   *
+   * @param numExecutors the number of executors to wait at least
+   * @param timeout time to wait in milliseconds
+   */
+  private[spark] def waitUntilExecutorsUp(
+      sc: SparkContext,
+      numExecutors: Int,
+      timeout: Long): Unit = {
+    val finishTime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout)
+    while (System.nanoTime() < finishTime) {
+      if (sc.statusTracker.getExecutorInfos.length > numExecutors) {
+        return
+      }
+      // Sleep rather than using wait/notify, because this is used only for testing and wait/notify
+      // add overhead in the general case.
+      Thread.sleep(10)
+    }
+    throw new TimeoutException(
+      s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
+  }
 }


@@ -299,8 +299,6 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
       attempt.adminAclsGroups.getOrElse(""))
     secManager.setViewAclsGroups(attempt.viewAclsGroups.getOrElse(""))
 
-    val replayBus = new ReplayListenerBus()
-
     val uiStorePath = storePath.map { path => getStorePath(path, appId, attemptId) }
 
     val (kvstore, needReplay) = uiStorePath match {
@@ -320,49 +318,44 @@
         (new InMemoryStore(), true)
     }
 
-    val listener = if (needReplay) {
-      val _listener = new AppStatusListener(kvstore, conf, false,
+    if (needReplay) {
+      val replayBus = new ReplayListenerBus()
+      val listener = new AppStatusListener(kvstore, conf, false,
         lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
-      replayBus.addListener(_listener)
+      replayBus.addListener(listener)
       AppStatusPlugin.loadPlugins().foreach { plugin =>
         plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
       }
-      Some(_listener)
-    } else {
-      None
-    }
-
-    val loadedUI = {
-      val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
-        HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
-        attempt.info.startTime.getTime(),
-        attempt.info.appSparkVersion)
-      LoadedAppUI(ui)
-    }
-
-    try {
-      AppStatusPlugin.loadPlugins().foreach { plugin =>
-        plugin.setupUI(loadedUI.ui)
-      }
-
-      val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
-      replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
-      listener.foreach(_.flush())
-    } catch {
-      case e: Exception =>
-        try {
-          kvstore.close()
-        } catch {
-          case _e: Exception => logInfo("Error closing store.", _e)
-        }
-        uiStorePath.foreach(Utils.deleteRecursively)
-        if (e.isInstanceOf[FileNotFoundException]) {
-          return None
-        } else {
-          throw e
-        }
-    }
+      try {
+        val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
+        replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
+        listener.flush()
+      } catch {
+        case e: Exception =>
+          try {
+            kvstore.close()
+          } catch {
+            case _e: Exception => logInfo("Error closing store.", _e)
+          }
+          uiStorePath.foreach(Utils.deleteRecursively)
+          if (e.isInstanceOf[FileNotFoundException]) {
+            return None
+          } else {
+            throw e
+          }
+      }
+    }
+
+    val ui = SparkUI.create(None, new AppStatusStore(kvstore), conf, secManager, app.info.name,
+      HistoryServer.getAttemptURI(appId, attempt.info.attemptId),
+      attempt.info.startTime.getTime(),
+      attempt.info.appSparkVersion)
+
+    AppStatusPlugin.loadPlugins().foreach { plugin =>
+      plugin.setupUI(ui)
+    }
+
+    val loadedUI = LoadedAppUI(ui)
 
     synchronized {
       activeUIs((appId, attemptId)) = loadedUI
     }


@@ -18,7 +18,10 @@
 package org.apache.spark.status
 
 import java.util.Date
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 
 import org.apache.spark._
@@ -59,7 +62,7 @@ private[spark] class AppStatusListener(
 
   // Keep track of live entities, so that task metrics can be efficiently updated (without
   // causing too many writes to the underlying store, and other expensive operations).
-  private val liveStages = new HashMap[(Int, Int), LiveStage]()
+  private val liveStages = new ConcurrentHashMap[(Int, Int), LiveStage]()
   private val liveJobs = new HashMap[Int, LiveJob]()
   private val liveExecutors = new HashMap[String, LiveExecutor]()
   private val liveTasks = new HashMap[Long, LiveTask]()
@@ -268,13 +271,15 @@
     val now = System.nanoTime()
 
     // Check if there are any pending stages that match this job; mark those as skipped.
-    job.stageIds.foreach { sid =>
-      val pending = liveStages.filter { case ((id, _), _) => id == sid }
-      pending.foreach { case (key, stage) =>
+    val it = liveStages.entrySet.iterator()
+    while (it.hasNext()) {
+      val e = it.next()
+      if (job.stageIds.contains(e.getKey()._1)) {
+        val stage = e.getValue()
         stage.status = v1.StageStatus.SKIPPED
         job.skippedStages += stage.info.stageId
         job.skippedTasks += stage.info.numTasks
-        liveStages.remove(key)
+        it.remove()
         update(stage, now)
       }
     }
@@ -336,7 +341,7 @@
     liveTasks.put(event.taskInfo.taskId, task)
     liveUpdate(task, now)
 
-    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       stage.activeTasks += 1
       stage.firstLaunchTime = math.min(stage.firstLaunchTime, event.taskInfo.launchTime)
       maybeUpdate(stage, now)
@@ -403,7 +408,7 @@
       (0, 1, 0)
     }
 
-    liveStages.get((event.stageId, event.stageAttemptId)).foreach { stage =>
+    Option(liveStages.get((event.stageId, event.stageAttemptId))).foreach { stage =>
       if (metricsDelta != null) {
         stage.metrics.update(metricsDelta)
       }
@@ -466,12 +471,19 @@
         }
       }
 
-      maybeUpdate(exec, now)
+      // Force an update on live applications when the number of active tasks reaches 0. This is
+      // checked in some tests (e.g. SQLTestUtilsBase) so it needs to be reliably up to date.
+      if (exec.activeTasks == 0) {
+        liveUpdate(exec, now)
+      } else {
+        maybeUpdate(exec, now)
+      }
     }
   }
 
   override def onStageCompleted(event: SparkListenerStageCompleted): Unit = {
-    liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)).foreach { stage =>
+    val maybeStage = Option(liveStages.remove((event.stageInfo.stageId, event.stageInfo.attemptId)))
+    maybeStage.foreach { stage =>
       val now = System.nanoTime()
 
       stage.info = event.stageInfo
@@ -540,7 +552,7 @@
       val delta = task.updateMetrics(metrics)
       maybeUpdate(task, now)
 
-      liveStages.get((sid, sAttempt)).foreach { stage =>
+      Option(liveStages.get((sid, sAttempt))).foreach { stage =>
         stage.metrics.update(delta)
         maybeUpdate(stage, now)
@@ -563,7 +575,7 @@
   /** Flush all live entities' data to the underlying store. */
   def flush(): Unit = {
     val now = System.nanoTime()
-    liveStages.values.foreach { stage =>
+    liveStages.values.asScala.foreach { stage =>
       update(stage, now)
       stage.executorSummaries.values.foreach(update(_, now))
     }
@@ -574,6 +586,18 @@
     pools.values.foreach(update(_, now))
   }
 
+  /**
+   * Shortcut to get active stages quickly in a live application, for use by the console
+   * progress bar.
+   */
+  def activeStages(): Seq[v1.StageData] = {
+    liveStages.values.asScala
+      .filter(_.info.submissionTime.isDefined)
+      .map(_.toApi())
+      .toList
+      .sortBy(_.stageId)
+  }
+
   private def updateRDDBlock(event: SparkListenerBlockUpdated, block: RDDBlockId): Unit = {
     val now = System.nanoTime()
     val executorId = event.blockUpdatedInfo.blockManagerId.executorId
@@ -708,7 +732,10 @@
   }
 
   private def getOrCreateStage(info: StageInfo): LiveStage = {
-    val stage = liveStages.getOrElseUpdate((info.stageId, info.attemptId), new LiveStage())
+    val stage = liveStages.computeIfAbsent((info.stageId, info.attemptId),
+      new Function[(Int, Int), LiveStage]() {
+        override def apply(key: (Int, Int)): LiveStage = new LiveStage()
+      })
     stage.info = info
     stage
   }


@@ -32,7 +32,9 @@ import org.apache.spark.util.kvstore.{InMemoryStore, KVStore}
 /**
  * A wrapper around a KVStore that provides methods for accessing the API data stored within.
  */
-private[spark] class AppStatusStore(val store: KVStore) {
+private[spark] class AppStatusStore(
+    val store: KVStore,
+    listener: Option[AppStatusListener] = None) {
 
   def applicationInfo(): v1.ApplicationInfo = {
     store.view(classOf[ApplicationInfoWrapper]).max(1).iterator().next().info
@@ -70,6 +72,14 @@
     store.read(classOf[ExecutorSummaryWrapper], executorId).info
   }
 
+  /**
+   * This is used by ConsoleProgressBar to quickly fetch active stages for drawing the progress
+   * bar. It will only return anything useful when called from a live application.
+   */
+  def activeStages(): Seq[v1.StageData] = {
+    listener.map(_.activeStages()).getOrElse(Nil)
+  }
+
   def stageList(statuses: JList[v1.StageStatus]): Seq[v1.StageData] = {
     val it = store.view(classOf[StageDataWrapper]).reverse().asScala.map(_.info)
     if (statuses != null && !statuses.isEmpty()) {
@@ -338,11 +348,12 @@
   */
  def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
    val store = new InMemoryStore()
-    addListenerFn(new AppStatusListener(store, conf, true))
+    val listener = new AppStatusListener(store, conf, true)
+    addListenerFn(listener)
    AppStatusPlugin.loadPlugins().foreach { p =>
      p.setupListeners(conf, store, addListenerFn, true)
    }
-    new AppStatusStore(store)
+    new AppStatusStore(store, listener = Some(listener))
  }
 }


@@ -408,8 +408,8 @@ private class LiveStage extends LiveEntity {
         new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))
   }
 
-  override protected def doUpdate(): Any = {
-    val update = new v1.StageData(
+  def toApi(): v1.StageData = {
+    new v1.StageData(
       status,
       info.stageId,
       info.attemptId,
@@ -449,8 +449,10 @@
       None,
       None,
       killedSummary)
+  }
 
-    new StageDataWrapper(update, jobIds)
+  override protected def doUpdate(): Any = {
+    new StageDataWrapper(toApi(), jobIds)
   }
 }


@@ -25,7 +25,6 @@ import org.apache.spark.scheduler.StageInfo
 import org.apache.spark.status.api.v1.StageStatus._
 import org.apache.spark.status.api.v1.TaskSorting._
 import org.apache.spark.ui.SparkUI
-import org.apache.spark.ui.jobs.UIData.StageUIData
 
 @Produces(Array(MediaType.APPLICATION_JSON))
 private[v1] class StagesResource extends BaseAppResource {


@@ -21,10 +21,11 @@ import java.util.{Timer, TimerTask}
 
 import org.apache.spark._
 import org.apache.spark.internal.Logging
+import org.apache.spark.status.api.v1.StageData
 
 /**
  * ConsoleProgressBar shows the progress of stages in the next line of the console. It poll the
- * status of active stages from `sc.statusTracker` periodically, the progress bar will be showed
+ * status of active stages from the app state store periodically, the progress bar will be showed
  * up after the stage has ran at least 500ms. If multiple stages run in the same time, the status
  * of them will be combined together, showed in one line.
  */
@@ -64,9 +65,8 @@ private[spark] class ConsoleProgressBar(sc: SparkContext) extends Logging {
     if (now - lastFinishTime < firstDelayMSec) {
       return
     }
-    val stageIds = sc.statusTracker.getActiveStageIds()
-    val stages = stageIds.flatMap(sc.statusTracker.getStageInfo).filter(_.numTasks() > 1)
-      .filter(now - _.submissionTime() > firstDelayMSec).sortBy(_.stageId())
+    val stages = sc.statusStore.activeStages()
+      .filter { s => now - s.submissionTime.get.getTime() > firstDelayMSec }
     if (stages.length > 0) {
       show(now, stages.take(3)) // display at most 3 stages in same time
     }
@@ -77,15 +77,15 @@
    * after your last output, keeps overwriting itself to hold in one line. The logging will follow
    * the progress bar, then progress bar will be showed in next line without overwrite logs.
    */
-  private def show(now: Long, stages: Seq[SparkStageInfo]) {
+  private def show(now: Long, stages: Seq[StageData]) {
     val width = TerminalWidth / stages.size
     val bar = stages.map { s =>
-      val total = s.numTasks()
-      val header = s"[Stage ${s.stageId()}:"
-      val tailer = s"(${s.numCompletedTasks()} + ${s.numActiveTasks()}) / $total]"
+      val total = s.numTasks
+      val header = s"[Stage ${s.stageId}:"
+      val tailer = s"(${s.numCompleteTasks} + ${s.numActiveTasks}) / $total]"
       val w = width - header.length - tailer.length
       val bar = if (w > 0) {
-        val percent = w * s.numCompletedTasks() / total
+        val percent = w * s.numCompleteTasks / total
         (0 until w).map { i =>
           if (i < percent) "=" else if (i == percent) ">" else " "
         }.mkString("")

@@ -1,612 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import java.util.concurrent.TimeoutException
import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap, ListBuffer}
import org.apache.spark._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.scheduler._
import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.UIData._
/**
* :: DeveloperApi ::
* Tracks task-level information to be displayed in the UI.
*
* All access to the data structures in this class must be synchronized on the
* class, since the UI thread and the EventBus loop may otherwise be reading and
* updating the internal data structures concurrently.
*/
@DeveloperApi
@deprecated("This class will be removed in a future release.", "2.2.0")
class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
// Define a handful of type aliases so that data structures' types can serve as documentation.
// These type aliases are public because they're used in the types of public fields:
type JobId = Int
type JobGroupId = String
type StageId = Int
type StageAttemptId = Int
type PoolName = String
type ExecutorId = String
// Application:
@volatile var startTime = -1L
@volatile var endTime = -1L
// Jobs:
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
val jobGroupToJobIds = new HashMap[JobGroupId, HashSet[JobId]]
// Stages:
val pendingStages = new HashMap[StageId, StageInfo]
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val skippedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]
val stageIdToActiveJobIds = new HashMap[StageId, HashSet[JobId]]
val poolToActiveStages = HashMap[PoolName, HashMap[StageId, StageInfo]]()
// Total of completed and failed stages that have ever been run. These may be greater than
// `completedStages.size` and `failedStages.size` if we have run more stages or jobs than
// JobProgressListener's retention limits.
var numCompletedStages = 0
var numFailedStages = 0
var numCompletedJobs = 0
var numFailedJobs = 0
// Misc:
val executorIdToBlockManagerId = HashMap[ExecutorId, BlockManagerId]()
def blockManagerIds: Seq[BlockManagerId] = executorIdToBlockManagerId.values.toSeq
var schedulingMode: Option[SchedulingMode] = None
// To limit the total memory usage of JobProgressListener, we only track information for a fixed
// number of non-active jobs and stages (there is no limit for active jobs and stages):
val retainedStages = conf.getInt("spark.ui.retainedStages", SparkUI.DEFAULT_RETAINED_STAGES)
val retainedJobs = conf.getInt("spark.ui.retainedJobs", SparkUI.DEFAULT_RETAINED_JOBS)
val retainedTasks = conf.get(UI_RETAINED_TASKS)
// We can test for memory leaks by ensuring that collections that track non-active jobs and
// stages do not grow without bound and that collections for active jobs/stages eventually become
// empty once Spark is idle. Let's partition our collections into ones that should be empty
// once Spark is idle and ones that should have a hard- or soft-limited sizes.
// These methods are used by unit tests, but they're defined here so that people don't forget to
// update the tests when adding new collections. Some collections have multiple levels of
// nesting, etc, so this lets us customize our notion of "size" for each structure:
// These collections should all be empty once Spark is idle (no active stages / jobs):
private[spark] def getSizesOfActiveStateTrackingCollections: Map[String, Int] = {
Map(
"activeStages" -> activeStages.size,
"activeJobs" -> activeJobs.size,
"poolToActiveStages" -> poolToActiveStages.values.map(_.size).sum,
"stageIdToActiveJobIds" -> stageIdToActiveJobIds.values.map(_.size).sum
)
}
// These collections should stop growing once we have run at least `spark.ui.retainedStages`
// stages and `spark.ui.retainedJobs` jobs:
private[spark] def getSizesOfHardSizeLimitedCollections: Map[String, Int] = {
Map(
"completedJobs" -> completedJobs.size,
"failedJobs" -> failedJobs.size,
"completedStages" -> completedStages.size,
"skippedStages" -> skippedStages.size,
"failedStages" -> failedStages.size
)
}
// These collections may grow arbitrarily, but once Spark becomes idle they should shrink back to
// some bound based on the `spark.ui.retainedStages` and `spark.ui.retainedJobs` settings:
private[spark] def getSizesOfSoftSizeLimitedCollections: Map[String, Int] = {
Map(
"jobIdToData" -> jobIdToData.size,
"stageIdToData" -> stageIdToData.size,
"stageIdToStageInfo" -> stageIdToInfo.size,
"jobGroupToJobIds" -> jobGroupToJobIds.values.map(_.size).sum,
// Since jobGroupToJobIds is map of sets, check that we don't leak keys with empty values:
"jobGroupToJobIds keySet" -> jobGroupToJobIds.keys.size
)
}
/** If stages is too large, remove and garbage collect old stages */
private def trimStagesIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = calculateNumberToRemove(stages.size, retainedStages)
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
}
stages.trimStart(toRemove)
}
}
/** If jobs is too large, remove and garbage collect old jobs */
private def trimJobsIfNecessary(jobs: ListBuffer[JobUIData]) = synchronized {
if (jobs.size > retainedJobs) {
val toRemove = calculateNumberToRemove(jobs.size, retainedJobs)
jobs.take(toRemove).foreach { job =>
// Remove the job's UI data, if it exists
jobIdToData.remove(job.jobId).foreach { removedJob =>
// A null jobGroupId is used for jobs that are run without a job group
val jobGroupId = removedJob.jobGroup.orNull
// Remove the job group -> job mapping entry, if it exists
jobGroupToJobIds.get(jobGroupId).foreach { jobsInGroup =>
jobsInGroup.remove(job.jobId)
// If this was the last job in this job group, remove the map entry for the job group
if (jobsInGroup.isEmpty) {
jobGroupToJobIds.remove(jobGroupId)
}
}
}
}
jobs.trimStart(toRemove)
}
}
override def onJobStart(jobStart: SparkListenerJobStart): Unit = synchronized {
val jobGroup = for (
props <- Option(jobStart.properties);
group <- Option(props.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
) yield group
val jobData: JobUIData =
new JobUIData(
jobId = jobStart.jobId,
submissionTime = Option(jobStart.time).filter(_ >= 0),
stageIds = jobStart.stageIds,
jobGroup = jobGroup,
status = JobExecutionStatus.RUNNING)
// A null jobGroupId is used for jobs that are run without a job group
jobGroupToJobIds.getOrElseUpdate(jobGroup.orNull, new HashSet[JobId]).add(jobStart.jobId)
jobStart.stageInfos.foreach(x => pendingStages(x.stageId) = x)
// Compute (a potential underestimate of) the number of tasks that will be run by this job.
// This may be an underestimate because the job start event references all of the result
// stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
jobData.numTasks = {
val allStages = jobStart.stageInfos
val missingStages = allStages.filter(_.completionTime.isEmpty)
missingStages.map(_.numTasks).sum
}
jobIdToData(jobStart.jobId) = jobData
activeJobs(jobStart.jobId) = jobData
for (stageId <- jobStart.stageIds) {
stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet[StageId]).add(jobStart.jobId)
}
// If there's no information for a stage, store the StageInfo received from the scheduler
// so that we can display stage descriptions for pending stages:
for (stageInfo <- jobStart.stageInfos) {
stageIdToInfo.getOrElseUpdate(stageInfo.stageId, stageInfo)
stageIdToData.getOrElseUpdate((stageInfo.stageId, stageInfo.attemptId), new StageUIData)
}
}
override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit = synchronized {
val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
}
jobData.completionTime = Option(jobEnd.time).filter(_ >= 0)
jobData.stageIds.foreach(pendingStages.remove)
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
trimJobsIfNecessary(completedJobs)
jobData.status = JobExecutionStatus.SUCCEEDED
numCompletedJobs += 1
case JobFailed(_) =>
failedJobs += jobData
trimJobsIfNecessary(failedJobs)
jobData.status = JobExecutionStatus.FAILED
numFailedJobs += 1
}
for (stageId <- jobData.stageIds) {
stageIdToActiveJobIds.get(stageId).foreach { jobsUsingStage =>
jobsUsingStage.remove(jobEnd.jobId)
if (jobsUsingStage.isEmpty) {
stageIdToActiveJobIds.remove(stageId)
}
stageIdToInfo.get(stageId).foreach { stageInfo =>
if (stageInfo.submissionTime.isEmpty) {
// if this stage is pending, it won't complete, so mark it as "skipped":
skippedStages += stageInfo
trimStagesIfNecessary(skippedStages)
jobData.numSkippedStages += 1
jobData.numSkippedTasks += stageInfo.numTasks
}
}
}
}
}
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted): Unit = synchronized {
val stage = stageCompleted.stageInfo
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
logWarning("Stage completed for unknown stage " + stage.stageId)
new StageUIData
})
for ((id, info) <- stageCompleted.stageInfo.accumulables) {
stageData.accumulables(id) = info
}
poolToActiveStages.get(stageData.schedulingPool).foreach { hashMap =>
hashMap.remove(stage.stageId)
}
activeStages.remove(stage.stageId)
if (stage.failureReason.isEmpty) {
completedStages += stage
numCompletedStages += 1
trimStagesIfNecessary(completedStages)
} else {
failedStages += stage
numFailedStages += 1
trimStagesIfNecessary(failedStages)
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
jobId <- activeJobsDependentOnStage;
jobData <- jobIdToData.get(jobId)
) {
jobData.numActiveStages -= 1
if (stage.failureReason.isEmpty) {
if (stage.submissionTime.isDefined) {
jobData.completedStageIndices.add(stage.stageId)
}
} else {
jobData.numFailedStages += 1
}
}
}
/** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = synchronized {
val stage = stageSubmitted.stageInfo
activeStages(stage.stageId) = stage
pendingStages.remove(stage.stageId)
val poolName = Option(stageSubmitted.properties).map {
p => p.getProperty("spark.scheduler.pool", SparkUI.DEFAULT_POOL_NAME)
}.getOrElse(SparkUI.DEFAULT_POOL_NAME)
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
stageData.schedulingPool = poolName
stageData.description = Option(stageSubmitted.properties).flatMap {
p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
}
val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap[Int, StageInfo])
stages(stage.stageId) = stage
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
jobId <- activeJobsDependentOnStage;
jobData <- jobIdToData.get(jobId)
) {
jobData.numActiveStages += 1
// If a stage retries again, it should be removed from completedStageIndices set
jobData.completedStageIndices.remove(stage.stageId)
}
}
override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = synchronized {
val taskInfo = taskStart.taskInfo
if (taskInfo != null) {
val stageData = stageIdToData.getOrElseUpdate((taskStart.stageId, taskStart.stageAttemptId), {
logWarning("Task start for unknown stage " + taskStart.stageId)
new StageUIData
})
stageData.numActiveTasks += 1
stageData.taskData.put(taskInfo.taskId, TaskUIData(taskInfo))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
jobId <- activeJobsDependentOnStage;
jobData <- jobIdToData.get(jobId)
) {
jobData.numActiveTasks += 1
}
}
override def onTaskGettingResult(taskGettingResult: SparkListenerTaskGettingResult) {
// Do nothing: because we don't do a deep copy of the TaskInfo, the TaskInfo in
// stageToTaskInfos already has the updated status.
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
val info = taskEnd.taskInfo
// If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
// completion event is for. Let's just drop it here. This means we might have some speculation
// tasks on the web ui that's never marked as complete.
if (info != null && taskEnd.stageAttemptId != -1) {
val stageData = stageIdToData.getOrElseUpdate((taskEnd.stageId, taskEnd.stageAttemptId), {
logWarning("Task end for unknown stage " + taskEnd.stageId)
new StageUIData
})
for (accumulableInfo <- info.accumulables) {
stageData.accumulables(accumulableInfo.id) = accumulableInfo
}
val execSummaryMap = stageData.executorSummary
val execSummary = execSummaryMap.getOrElseUpdate(info.executorId, new ExecutorSummary)
taskEnd.reason match {
case Success =>
execSummary.succeededTasks += 1
case kill: TaskKilled =>
execSummary.reasonToNumKilled = execSummary.reasonToNumKilled.updated(
kill.reason, execSummary.reasonToNumKilled.getOrElse(kill.reason, 0) + 1)
case commitDenied: TaskCommitDenied =>
execSummary.reasonToNumKilled = execSummary.reasonToNumKilled.updated(
commitDenied.toErrorString, execSummary.reasonToNumKilled.getOrElse(
commitDenied.toErrorString, 0) + 1)
case _ =>
execSummary.failedTasks += 1
}
execSummary.taskTime += info.duration
stageData.numActiveTasks -= 1
val errorMessage: Option[String] =
taskEnd.reason match {
case org.apache.spark.Success =>
stageData.completedIndices.add(info.index)
stageData.numCompleteTasks += 1
None
case kill: TaskKilled =>
stageData.reasonToNumKilled = stageData.reasonToNumKilled.updated(
kill.reason, stageData.reasonToNumKilled.getOrElse(kill.reason, 0) + 1)
Some(kill.toErrorString)
case commitDenied: TaskCommitDenied =>
stageData.reasonToNumKilled = stageData.reasonToNumKilled.updated(
commitDenied.toErrorString, stageData.reasonToNumKilled.getOrElse(
commitDenied.toErrorString, 0) + 1)
Some(commitDenied.toErrorString)
case e: ExceptionFailure => // Handle ExceptionFailure because we might have accumUpdates
stageData.numFailedTasks += 1
Some(e.toErrorString)
case e: TaskFailedReason => // All other failure cases
stageData.numFailedTasks += 1
Some(e.toErrorString)
}
val taskMetrics = Option(taskEnd.taskMetrics)
taskMetrics.foreach { m =>
val oldMetrics = stageData.taskData.get(info.taskId).flatMap(_.metrics)
updateAggregateMetrics(stageData, info.executorId, m, oldMetrics)
}
val taskData = stageData.taskData.getOrElseUpdate(info.taskId, TaskUIData(info))
taskData.updateTaskInfo(info)
taskData.updateTaskMetrics(taskMetrics)
taskData.errorMessage = errorMessage
// If Tasks is too large, remove and garbage collect old tasks
if (stageData.taskData.size > retainedTasks) {
stageData.taskData = stageData.taskData.drop(
calculateNumberToRemove(stageData.taskData.size, retainedTasks))
}
for (
activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
jobId <- activeJobsDependentOnStage;
jobData <- jobIdToData.get(jobId)
) {
jobData.numActiveTasks -= 1
taskEnd.reason match {
case Success =>
jobData.completedIndices.add((taskEnd.stageId, info.index))
jobData.numCompletedTasks += 1
case kill: TaskKilled =>
jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated(
kill.reason, jobData.reasonToNumKilled.getOrElse(kill.reason, 0) + 1)
case commitDenied: TaskCommitDenied =>
jobData.reasonToNumKilled = jobData.reasonToNumKilled.updated(
commitDenied.toErrorString, jobData.reasonToNumKilled.getOrElse(
commitDenied.toErrorString, 0) + 1)
case _ =>
jobData.numFailedTasks += 1
}
}
}
}
/**
* Remove at least (maxRetained / 10) items to reduce friction.
*/
private def calculateNumberToRemove(dataSize: Int, retainedSize: Int): Int = {
math.max(retainedSize / 10, dataSize - retainedSize)
}
/**
* Upon receiving new metrics for a task, updates the per-stage and per-executor-per-stage
* aggregate metrics by calculating deltas between the currently recorded metrics and the new
* metrics.
*/
def updateAggregateMetrics(
stageData: StageUIData,
execId: String,
taskMetrics: TaskMetrics,
oldMetrics: Option[TaskMetricsUIData]) {
val execSummary = stageData.executorSummary.getOrElseUpdate(execId, new ExecutorSummary)
val shuffleWriteDelta =
taskMetrics.shuffleWriteMetrics.bytesWritten -
oldMetrics.map(_.shuffleWriteMetrics.bytesWritten).getOrElse(0L)
stageData.shuffleWriteBytes += shuffleWriteDelta
execSummary.shuffleWrite += shuffleWriteDelta
val shuffleWriteRecordsDelta =
taskMetrics.shuffleWriteMetrics.recordsWritten -
oldMetrics.map(_.shuffleWriteMetrics.recordsWritten).getOrElse(0L)
stageData.shuffleWriteRecords += shuffleWriteRecordsDelta
execSummary.shuffleWriteRecords += shuffleWriteRecordsDelta
val shuffleReadDelta =
taskMetrics.shuffleReadMetrics.totalBytesRead -
oldMetrics.map(_.shuffleReadMetrics.totalBytesRead).getOrElse(0L)
stageData.shuffleReadTotalBytes += shuffleReadDelta
execSummary.shuffleRead += shuffleReadDelta
val shuffleReadRecordsDelta =
taskMetrics.shuffleReadMetrics.recordsRead -
oldMetrics.map(_.shuffleReadMetrics.recordsRead).getOrElse(0L)
stageData.shuffleReadRecords += shuffleReadRecordsDelta
execSummary.shuffleReadRecords += shuffleReadRecordsDelta
val inputBytesDelta =
taskMetrics.inputMetrics.bytesRead -
oldMetrics.map(_.inputMetrics.bytesRead).getOrElse(0L)
stageData.inputBytes += inputBytesDelta
execSummary.inputBytes += inputBytesDelta
val inputRecordsDelta =
taskMetrics.inputMetrics.recordsRead -
oldMetrics.map(_.inputMetrics.recordsRead).getOrElse(0L)
stageData.inputRecords += inputRecordsDelta
execSummary.inputRecords += inputRecordsDelta
val outputBytesDelta =
taskMetrics.outputMetrics.bytesWritten -
oldMetrics.map(_.outputMetrics.bytesWritten).getOrElse(0L)
stageData.outputBytes += outputBytesDelta
execSummary.outputBytes += outputBytesDelta
val outputRecordsDelta =
taskMetrics.outputMetrics.recordsWritten -
oldMetrics.map(_.outputMetrics.recordsWritten).getOrElse(0L)
stageData.outputRecords += outputRecordsDelta
execSummary.outputRecords += outputRecordsDelta
val diskSpillDelta =
taskMetrics.diskBytesSpilled - oldMetrics.map(_.diskBytesSpilled).getOrElse(0L)
stageData.diskBytesSpilled += diskSpillDelta
execSummary.diskBytesSpilled += diskSpillDelta
val memorySpillDelta =
taskMetrics.memoryBytesSpilled - oldMetrics.map(_.memoryBytesSpilled).getOrElse(0L)
stageData.memoryBytesSpilled += memorySpillDelta
execSummary.memoryBytesSpilled += memorySpillDelta
val timeDelta =
taskMetrics.executorRunTime - oldMetrics.map(_.executorRunTime).getOrElse(0L)
stageData.executorRunTime += timeDelta
val cpuTimeDelta =
taskMetrics.executorCpuTime - oldMetrics.map(_.executorCpuTime).getOrElse(0L)
stageData.executorCpuTime += cpuTimeDelta
}
override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate) {
for ((taskId, sid, sAttempt, accumUpdates) <- executorMetricsUpdate.accumUpdates) {
val stageData = stageIdToData.getOrElseUpdate((sid, sAttempt), {
logWarning("Metrics update for task in unknown stage " + sid)
new StageUIData
})
val taskData = stageData.taskData.get(taskId)
val metrics = TaskMetrics.fromAccumulatorInfos(accumUpdates)
taskData.foreach { t =>
if (!t.taskInfo.finished) {
updateAggregateMetrics(stageData, executorMetricsUpdate.execId, metrics, t.metrics)
// Overwrite task metrics
t.updateTaskMetrics(Some(metrics))
}
}
}
}
override def onEnvironmentUpdate(environmentUpdate: SparkListenerEnvironmentUpdate) {
synchronized {
schedulingMode = environmentUpdate
.environmentDetails("Spark Properties").toMap
.get("spark.scheduler.mode")
.map(SchedulingMode.withName)
}
}
override def onBlockManagerAdded(blockManagerAdded: SparkListenerBlockManagerAdded) {
synchronized {
val blockManagerId = blockManagerAdded.blockManagerId
val executorId = blockManagerId.executorId
executorIdToBlockManagerId(executorId) = blockManagerId
}
}
override def onBlockManagerRemoved(blockManagerRemoved: SparkListenerBlockManagerRemoved) {
synchronized {
val executorId = blockManagerRemoved.blockManagerId.executorId
executorIdToBlockManagerId.remove(executorId)
}
}
override def onApplicationStart(appStarted: SparkListenerApplicationStart) {
startTime = appStarted.time
}
override def onApplicationEnd(appEnded: SparkListenerApplicationEnd) {
endTime = appEnded.time
}
/**
* For testing only. Wait until at least `numExecutors` executors are up, or throw
* `TimeoutException` if the waiting time elapsed before `numExecutors` executors up.
* Exposed for testing.
*
* @param numExecutors the number of executors to wait at least
* @param timeout time to wait in milliseconds
*/
private[spark] def waitUntilExecutorsUp(numExecutors: Int, timeout: Long): Unit = {
val finishTime = System.currentTimeMillis() + timeout
while (System.currentTimeMillis() < finishTime) {
val numBlockManagers = synchronized {
blockManagerIds.size
}
if (numBlockManagers >= numExecutors + 1) {
// Need to count the block manager in driver
return
}
// Sleep rather than using wait/notify, because this is used only for testing and wait/notify
// add overhead in the general case.
Thread.sleep(10)
}
throw new TimeoutException(
s"Can't find $numExecutors executors before $timeout milliseconds elapsed")
}
}


@@ -32,7 +32,6 @@ import org.apache.spark.scheduler.TaskLocality
 import org.apache.spark.status.AppStatusStore
 import org.apache.spark.status.api.v1._
 import org.apache.spark.ui._
-import org.apache.spark.ui.jobs.UIData._
 import org.apache.spark.util.{Distribution, Utils}
 
 /** Page showing statistics and task list for a given stage */


@@ -1,311 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import scala.collection.mutable
import scala.collection.mutable.{HashMap, LinkedHashMap}
import com.google.common.collect.Interners
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor._
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.util.AccumulatorContext
import org.apache.spark.util.collection.OpenHashSet
private[spark] object UIData {
class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
var reasonToNumKilled : Map[String, Int] = Map.empty
var inputBytes : Long = 0
var inputRecords : Long = 0
var outputBytes : Long = 0
var outputRecords : Long = 0
var shuffleRead : Long = 0
var shuffleReadRecords : Long = 0
var shuffleWrite : Long = 0
var shuffleWriteRecords : Long = 0
var memoryBytesSpilled : Long = 0
var diskBytesSpilled : Long = 0
var isBlacklisted : Int = 0
}
class JobUIData(
var jobId: Int = -1,
var submissionTime: Option[Long] = None,
var completionTime: Option[Long] = None,
var stageIds: Seq[Int] = Seq.empty,
var jobGroup: Option[String] = None,
var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN,
/* Tasks */
// `numTasks` is a potential underestimate of the true number of tasks that this job will run.
// This may be an underestimate because the job start event references all of the result
// stages' transitive stage dependencies, but some of these stages might be skipped if their
// output is available from earlier runs.
// See https://github.com/apache/spark/pull/3009 for a more extensive discussion.
var numTasks: Int = 0,
var numActiveTasks: Int = 0,
var numCompletedTasks: Int = 0,
var completedIndices: OpenHashSet[(Int, Int)] = new OpenHashSet[(Int, Int)](),
var numSkippedTasks: Int = 0,
var numFailedTasks: Int = 0,
var reasonToNumKilled: Map[String, Int] = Map.empty,
/* Stages */
var numActiveStages: Int = 0,
// This needs to be a set instead of a simple count to prevent double-counting of rerun stages:
var completedStageIndices: mutable.HashSet[Int] = new mutable.HashSet[Int](),
var numSkippedStages: Int = 0,
var numFailedStages: Int = 0
)
class StageUIData {
var numActiveTasks: Int = _
var numCompleteTasks: Int = _
var completedIndices = new OpenHashSet[Int]()
var numFailedTasks: Int = _
var reasonToNumKilled: Map[String, Int] = Map.empty
var executorRunTime: Long = _
var executorCpuTime: Long = _
var inputBytes: Long = _
var inputRecords: Long = _
var outputBytes: Long = _
var outputRecords: Long = _
var shuffleReadTotalBytes: Long = _
var shuffleReadRecords : Long = _
var shuffleWriteBytes: Long = _
var shuffleWriteRecords: Long = _
var memoryBytesSpilled: Long = _
var diskBytesSpilled: Long = _
var isBlacklisted: Int = _
var lastUpdateTime: Option[Long] = None
var schedulingPool: String = ""
var description: Option[String] = None
var accumulables = new HashMap[Long, AccumulableInfo]
var taskData = new LinkedHashMap[Long, TaskUIData]
var executorSummary = new HashMap[String, ExecutorSummary]
def hasInput: Boolean = inputBytes > 0
def hasOutput: Boolean = outputBytes > 0
def hasShuffleRead: Boolean = shuffleReadTotalBytes > 0
def hasShuffleWrite: Boolean = shuffleWriteBytes > 0
def hasBytesSpilled: Boolean = memoryBytesSpilled > 0 || diskBytesSpilled > 0
}
/**
* These are kept mutable and reused throughout a task's lifetime to avoid excessive reallocation.
*/
class TaskUIData private(private var _taskInfo: TaskInfo) {
private[this] var _metrics: Option[TaskMetricsUIData] = Some(TaskMetricsUIData.EMPTY)
var errorMessage: Option[String] = None
def taskInfo: TaskInfo = _taskInfo
def metrics: Option[TaskMetricsUIData] = _metrics
def updateTaskInfo(taskInfo: TaskInfo): Unit = {
_taskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
}
def updateTaskMetrics(metrics: Option[TaskMetrics]): Unit = {
_metrics = metrics.map(TaskMetricsUIData.fromTaskMetrics)
}
def taskDuration(lastUpdateTime: Option[Long] = None): Option[Long] = {
if (taskInfo.status == "RUNNING") {
Some(_taskInfo.timeRunning(lastUpdateTime.getOrElse(System.currentTimeMillis)))
} else {
_metrics.map(_.executorRunTime)
}
}
}
object TaskUIData {
private val stringInterner = Interners.newWeakInterner[String]()
/** String interning to reduce the memory usage. */
private def weakIntern(s: String): String = {
stringInterner.intern(s)
}
def apply(taskInfo: TaskInfo): TaskUIData = {
new TaskUIData(dropInternalAndSQLAccumulables(taskInfo))
}
/**
* We don't need to store internal or SQL accumulables as their values will be shown in other
* places, so drop them to reduce the memory usage.
*/
private[spark] def dropInternalAndSQLAccumulables(taskInfo: TaskInfo): TaskInfo = {
val newTaskInfo = new TaskInfo(
taskId = taskInfo.taskId,
index = taskInfo.index,
attemptNumber = taskInfo.attemptNumber,
launchTime = taskInfo.launchTime,
executorId = weakIntern(taskInfo.executorId),
host = weakIntern(taskInfo.host),
taskLocality = taskInfo.taskLocality,
speculative = taskInfo.speculative
)
newTaskInfo.gettingResultTime = taskInfo.gettingResultTime
newTaskInfo.setAccumulables(taskInfo.accumulables.filter {
accum => !accum.internal && accum.metadata != Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER)
})
newTaskInfo.finishTime = taskInfo.finishTime
newTaskInfo.failed = taskInfo.failed
newTaskInfo.killed = taskInfo.killed
newTaskInfo
}
}
case class TaskMetricsUIData(
executorDeserializeTime: Long,
executorDeserializeCpuTime: Long,
executorRunTime: Long,
executorCpuTime: Long,
resultSize: Long,
jvmGCTime: Long,
resultSerializationTime: Long,
memoryBytesSpilled: Long,
diskBytesSpilled: Long,
peakExecutionMemory: Long,
inputMetrics: InputMetricsUIData,
outputMetrics: OutputMetricsUIData,
shuffleReadMetrics: ShuffleReadMetricsUIData,
shuffleWriteMetrics: ShuffleWriteMetricsUIData)
object TaskMetricsUIData {
def fromTaskMetrics(m: TaskMetrics): TaskMetricsUIData = {
TaskMetricsUIData(
executorDeserializeTime = m.executorDeserializeTime,
executorDeserializeCpuTime = m.executorDeserializeCpuTime,
executorRunTime = m.executorRunTime,
executorCpuTime = m.executorCpuTime,
resultSize = m.resultSize,
jvmGCTime = m.jvmGCTime,
resultSerializationTime = m.resultSerializationTime,
memoryBytesSpilled = m.memoryBytesSpilled,
diskBytesSpilled = m.diskBytesSpilled,
peakExecutionMemory = m.peakExecutionMemory,
inputMetrics = InputMetricsUIData(m.inputMetrics),
outputMetrics = OutputMetricsUIData(m.outputMetrics),
shuffleReadMetrics = ShuffleReadMetricsUIData(m.shuffleReadMetrics),
shuffleWriteMetrics = ShuffleWriteMetricsUIData(m.shuffleWriteMetrics))
}
val EMPTY: TaskMetricsUIData = fromTaskMetrics(TaskMetrics.empty)
}
case class InputMetricsUIData(bytesRead: Long, recordsRead: Long)
object InputMetricsUIData {
def apply(metrics: InputMetrics): InputMetricsUIData = {
if (metrics.bytesRead == 0 && metrics.recordsRead == 0) {
EMPTY
} else {
new InputMetricsUIData(
bytesRead = metrics.bytesRead,
recordsRead = metrics.recordsRead)
}
}
private val EMPTY = InputMetricsUIData(0, 0)
}
case class OutputMetricsUIData(bytesWritten: Long, recordsWritten: Long)
object OutputMetricsUIData {
def apply(metrics: OutputMetrics): OutputMetricsUIData = {
if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0) {
EMPTY
} else {
new OutputMetricsUIData(
bytesWritten = metrics.bytesWritten,
recordsWritten = metrics.recordsWritten)
}
}
private val EMPTY = OutputMetricsUIData(0, 0)
}
case class ShuffleReadMetricsUIData(
remoteBlocksFetched: Long,
localBlocksFetched: Long,
remoteBytesRead: Long,
remoteBytesReadToDisk: Long,
localBytesRead: Long,
fetchWaitTime: Long,
recordsRead: Long,
totalBytesRead: Long,
totalBlocksFetched: Long)
object ShuffleReadMetricsUIData {
def apply(metrics: ShuffleReadMetrics): ShuffleReadMetricsUIData = {
if (
metrics.remoteBlocksFetched == 0 &&
metrics.localBlocksFetched == 0 &&
metrics.remoteBytesRead == 0 &&
metrics.localBytesRead == 0 &&
metrics.fetchWaitTime == 0 &&
metrics.recordsRead == 0 &&
metrics.totalBytesRead == 0 &&
metrics.totalBlocksFetched == 0) {
EMPTY
} else {
new ShuffleReadMetricsUIData(
remoteBlocksFetched = metrics.remoteBlocksFetched,
localBlocksFetched = metrics.localBlocksFetched,
remoteBytesRead = metrics.remoteBytesRead,
remoteBytesReadToDisk = metrics.remoteBytesReadToDisk,
localBytesRead = metrics.localBytesRead,
fetchWaitTime = metrics.fetchWaitTime,
recordsRead = metrics.recordsRead,
totalBytesRead = metrics.totalBytesRead,
totalBlocksFetched = metrics.totalBlocksFetched
)
}
}
private val EMPTY = ShuffleReadMetricsUIData(0, 0, 0, 0, 0, 0, 0, 0, 0)
}
case class ShuffleWriteMetricsUIData(
bytesWritten: Long,
recordsWritten: Long,
writeTime: Long)
object ShuffleWriteMetricsUIData {
def apply(metrics: ShuffleWriteMetrics): ShuffleWriteMetricsUIData = {
if (metrics.bytesWritten == 0 && metrics.recordsWritten == 0 && metrics.writeTime == 0) {
EMPTY
} else {
new ShuffleWriteMetricsUIData(
bytesWritten = metrics.bytesWritten,
recordsWritten = metrics.recordsWritten,
writeTime = metrics.writeTime
)
}
}
private val EMPTY = ShuffleWriteMetricsUIData(0, 0, 0)
}
}
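The *MetricsUIData companions above share one allocation-saving pattern: when every field of the incoming metrics object is zero, they return a single shared immutable instance instead of building a fresh object per task. A minimal sketch of that pattern, using hypothetical LiveCounts/CountsUIData names rather than anything from this file:

class LiveCounts(var bytes: Long = 0, var records: Long = 0)

case class CountsUIData(bytes: Long, records: Long)

object CountsUIData {
  private val EMPTY = CountsUIData(0, 0)

  // Reuse the shared EMPTY instance for the common all-zero case so that
  // thousands of finished tasks do not each pin an identical object in memory.
  def fromLive(m: LiveCounts): CountsUIData =
    if (m.bytes == 0 && m.records == 0) EMPTY else CountsUIData(m.bytes, m.records)
}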


@@ -156,7 +156,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContext
   private def testCaching(conf: SparkConf, storageLevel: StorageLevel): Unit = {
     sc = new SparkContext(conf.setMaster(clusterUrl).setAppName("test"))
-    sc.jobProgressListener.waitUntilExecutorsUp(2, 30000)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 30000)
     val data = sc.parallelize(1 to 1000, 10)
     val cachedData = data.persist(storageLevel)
     assert(cachedData.count === 1000)
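This and the following suites switch from the removed listener helper to a shared TestUtils helper built on the public SparkStatusTracker API. A rough sketch of how such a helper can be written is below; the exact signature, sleep interval, and exception type in the real TestUtils may differ.

import java.util.concurrent.{TimeUnit, TimeoutException}

import org.apache.spark.SparkContext

object ExecutorWaitSketch {
  // Sketch only: poll the status tracker until the expected number of executors
  // has registered, or fail once the timeout expires.
  def waitUntilExecutorsUp(sc: SparkContext, numExecutors: Int, timeoutMillis: Long): Unit = {
    val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
    while (System.nanoTime() < deadline) {
      // getExecutorInfos also reports the driver, hence the strict '>' comparison.
      if (sc.statusTracker.getExecutorInfos.length > numExecutors) {
        return
      }
      Thread.sleep(10)
    }
    throw new TimeoutException(
      s"Expected $numExecutors executors to be up after $timeoutMillis ms")
  }
}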


@@ -66,7 +66,7 @@ class ExternalShuffleServiceSuite extends ShuffleSuite with BeforeAndAfterAll {
     // local blocks from the local BlockManager and won't send requests to ExternalShuffleService.
     // In this case, we won't receive FetchFailed. And it will make this test fail.
     // Therefore, we should wait until all slaves are up
-    sc.jobProgressListener.waitUntilExecutorsUp(2, 60000)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
     val rdd = sc.parallelize(0 until 1000, 10).map(i => (i, 1)).reduceByKey(_ + _)


@@ -44,13 +44,13 @@ class StatusTrackerSuite extends SparkFunSuite with Matchers with LocalSparkContext
     stageIds.size should be(2)
     val firstStageInfo = eventually(timeout(10 seconds)) {
-      sc.statusTracker.getStageInfo(stageIds(0)).get
+      sc.statusTracker.getStageInfo(stageIds.min).get
     }
-    firstStageInfo.stageId() should be(stageIds(0))
+    firstStageInfo.stageId() should be(stageIds.min)
     firstStageInfo.currentAttemptId() should be(0)
     firstStageInfo.numTasks() should be(2)
     eventually(timeout(10 seconds)) {
-      val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds(0)).get
+      val updatedFirstStageInfo = sc.statusTracker.getStageInfo(stageIds.min).get
       updatedFirstStageInfo.numCompletedTasks() should be(2)
       updatedFirstStageInfo.numActiveTasks() should be(0)
       updatedFirstStageInfo.numFailedTasks() should be(0)
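The switch from stageIds(0) to stageIds.min appears to account for the fact that ids obtained through SparkStatusTracker are not guaranteed to come back in submission order; the smallest id corresponds to the earliest submitted stage. The same order-independent pattern in an illustrative snippet (sc is assumed to be an active SparkContext):

// Illustrative: do not assume the first element is the first submitted stage.
val stageIds: Seq[Int] = sc.statusTracker.getActiveStageIds().toSeq
val firstSubmitted: Option[Int] = if (stageIds.nonEmpty) Some(stageIds.min) else None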


@@ -224,7 +224,7 @@ class BroadcastSuite extends SparkFunSuite with LocalSparkContext with EncryptionFunSuite
       new SparkContext("local-cluster[%d, 1, 1024]".format(numSlaves), "test")
     // Wait until all salves are up
     try {
-      _sc.jobProgressListener.waitUntilExecutorsUp(numSlaves, 60000)
+      TestUtils.waitUntilExecutorsUp(_sc, numSlaves, 60000)
       _sc
     } catch {
       case e: Throwable =>


@@ -2406,13 +2406,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
   // OutputCommitCoordinator requires the task info itself to not be null.
   private def createFakeTaskInfo(): TaskInfo = {
     val info = new TaskInfo(0, 0, 0, 0L, "", "", TaskLocality.ANY, false)
-    info.finishTime = 1 // to prevent spurious errors in JobProgressListener
+    info.finishTime = 1
     info
   }

   private def createFakeTaskInfoWithId(taskId: Long): TaskInfo = {
     val info = new TaskInfo(taskId, 0, 0, 0L, "", "", TaskLocality.ANY, false)
-    info.finishTime = 1 // to prevent spurious errors in JobProgressListener
+    info.finishTime = 1
     info
   }


@@ -21,7 +21,7 @@ import scala.collection.mutable

 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll}

-import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite}
+import org.apache.spark.{LocalSparkContext, SparkContext, SparkFunSuite, TestUtils}
 import org.apache.spark.scheduler.cluster.ExecutorInfo

 /**
@@ -43,7 +43,7 @@ class SparkListenerWithClusterSuite extends SparkFunSuite with LocalSparkContext
     // This test will check if the number of executors received by "SparkListener" is same as the
     // number of all executors, so we need to wait until all executors are up
-    sc.jobProgressListener.waitUntilExecutorsUp(2, 60000)
+    TestUtils.waitUntilExecutorsUp(sc, 2, 60000)
     val rdd1 = sc.parallelize(1 to 100, 4)
     val rdd2 = rdd1.map(_.toString)


@@ -1,442 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.ui.jobs
import java.util.Properties
import org.scalatest.Matchers
import org.apache.spark._
import org.apache.spark.{LocalSparkContext, SparkConf, Success}
import org.apache.spark.executor._
import org.apache.spark.scheduler._
import org.apache.spark.ui.jobs.UIData.TaskUIData
import org.apache.spark.util.{AccumulatorContext, Utils}
class JobProgressListenerSuite extends SparkFunSuite with LocalSparkContext with Matchers {
val jobSubmissionTime = 1421191042750L
val jobCompletionTime = 1421191296660L
private def createStageStartEvent(stageId: Int) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
SparkListenerStageSubmitted(stageInfo)
}
private def createStageEndEvent(stageId: Int, failed: Boolean = false) = {
val stageInfo = new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
if (failed) {
stageInfo.failureReason = Some("Failed!")
}
SparkListenerStageCompleted(stageInfo)
}
private def createJobStartEvent(
jobId: Int,
stageIds: Seq[Int],
jobGroup: Option[String] = None): SparkListenerJobStart = {
val stageInfos = stageIds.map { stageId =>
new StageInfo(stageId, 0, stageId.toString, 0, null, null, "")
}
val properties: Option[Properties] = jobGroup.map { groupId =>
val props = new Properties()
props.setProperty(SparkContext.SPARK_JOB_GROUP_ID, groupId)
props
}
SparkListenerJobStart(jobId, jobSubmissionTime, stageInfos, properties.orNull)
}
private def createJobEndEvent(jobId: Int, failed: Boolean = false) = {
val result = if (failed) JobFailed(new Exception("dummy failure")) else JobSucceeded
SparkListenerJobEnd(jobId, jobCompletionTime, result)
}
private def runJob(listener: SparkListener, jobId: Int, shouldFail: Boolean = false) {
val stagesThatWontBeRun = jobId * 200 to jobId * 200 + 10
val stageIds = jobId * 100 to jobId * 100 + 50
listener.onJobStart(createJobStartEvent(jobId, stageIds ++ stagesThatWontBeRun))
for (stageId <- stageIds) {
listener.onStageSubmitted(createStageStartEvent(stageId))
listener.onStageCompleted(createStageEndEvent(stageId, failed = stageId % 2 == 0))
}
listener.onJobEnd(createJobEndEvent(jobId, shouldFail))
}
private def assertActiveJobsStateIsEmpty(listener: JobProgressListener) {
listener.getSizesOfActiveStateTrackingCollections.foreach { case (fieldName, size) =>
assert(size === 0, s"$fieldName was not empty")
}
}
test("test LRU eviction of stages") {
def runWithListener(listener: JobProgressListener) : Unit = {
for (i <- 1 to 50) {
listener.onStageSubmitted(createStageStartEvent(i))
listener.onStageCompleted(createStageEndEvent(i))
}
assertActiveJobsStateIsEmpty(listener)
}
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
var listener = new JobProgressListener(conf)
// Test with 5 retainedStages
runWithListener(listener)
listener.completedStages.size should be (5)
listener.completedStages.map(_.stageId).toSet should be (Set(50, 49, 48, 47, 46))
// Test with 0 retainedStages
conf.set("spark.ui.retainedStages", 0.toString)
listener = new JobProgressListener(conf)
runWithListener(listener)
listener.completedStages.size should be (0)
}
test("test clearing of stageIdToActiveJobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
val listener = new JobProgressListener(conf)
val jobId = 0
val stageIds = 1 to 50
// Start a job with 50 stages
listener.onJobStart(createJobStartEvent(jobId, stageIds))
for (stageId <- stageIds) {
listener.onStageSubmitted(createStageStartEvent(stageId))
}
listener.stageIdToActiveJobIds.size should be > 0
// Complete the stages and job
for (stageId <- stageIds) {
listener.onStageCompleted(createStageEndEvent(stageId, failed = false))
}
listener.onJobEnd(createJobEndEvent(jobId, false))
assertActiveJobsStateIsEmpty(listener)
listener.stageIdToActiveJobIds.size should be (0)
}
test("test clearing of jobGroupToJobIds") {
def runWithListener(listener: JobProgressListener): Unit = {
// Run 50 jobs, each with one stage
for (jobId <- 0 to 50) {
listener.onJobStart(createJobStartEvent(jobId, Seq(0), jobGroup = Some(jobId.toString)))
listener.onStageSubmitted(createStageStartEvent(0))
listener.onStageCompleted(createStageEndEvent(0, failed = false))
listener.onJobEnd(createJobEndEvent(jobId, false))
}
assertActiveJobsStateIsEmpty(listener)
}
val conf = new SparkConf()
conf.set("spark.ui.retainedJobs", 5.toString)
var listener = new JobProgressListener(conf)
runWithListener(listener)
// This collection won't become empty, but it should be bounded by spark.ui.retainedJobs
listener.jobGroupToJobIds.size should be (5)
// Test with 0 jobs
conf.set("spark.ui.retainedJobs", 0.toString)
listener = new JobProgressListener(conf)
runWithListener(listener)
listener.jobGroupToJobIds.size should be (0)
}
test("test LRU eviction of jobs") {
val conf = new SparkConf()
conf.set("spark.ui.retainedStages", 5.toString)
conf.set("spark.ui.retainedJobs", 5.toString)
val listener = new JobProgressListener(conf)
// Run a bunch of jobs to get the listener into a state where we've exceeded both the
// job and stage retention limits:
for (jobId <- 1 to 10) {
runJob(listener, jobId, shouldFail = false)
}
for (jobId <- 200 to 210) {
runJob(listener, jobId, shouldFail = true)
}
assertActiveJobsStateIsEmpty(listener)
// Snapshot the sizes of various soft- and hard-size-limited collections:
val softLimitSizes = listener.getSizesOfSoftSizeLimitedCollections
val hardLimitSizes = listener.getSizesOfHardSizeLimitedCollections
// Run some more jobs:
for (jobId <- 11 to 50) {
runJob(listener, jobId, shouldFail = false)
// We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
}
listener.completedJobs.size should be (5)
listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
for (jobId <- 51 to 100) {
runJob(listener, jobId, shouldFail = true)
// We shouldn't exceed the hard / soft limit sizes after the jobs have finished:
listener.getSizesOfSoftSizeLimitedCollections should be (softLimitSizes)
listener.getSizesOfHardSizeLimitedCollections should be (hardLimitSizes)
}
assertActiveJobsStateIsEmpty(listener)
// Completed and failed jobs each have their own size limits, so this should still be the same:
listener.completedJobs.size should be (5)
listener.completedJobs.map(_.jobId).toSet should be (Set(50, 49, 48, 47, 46))
listener.failedJobs.size should be (5)
listener.failedJobs.map(_.jobId).toSet should be (Set(100, 99, 98, 97, 96))
}
test("test executor id to summary") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val taskMetrics = TaskMetrics.empty
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
assert(listener.stageIdToData.size === 0)
// finish this task, should get updated shuffleRead
shuffleReadMetrics.incRemoteBytesRead(1000)
taskMetrics.mergeShuffleReadMetrics()
var taskInfo = new TaskInfo(1234L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
var task = new ShuffleMapTask(0)
val taskType = Utils.getFormattedClassName(task)
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToData.getOrElse((0, 0), fail())
.executorSummary.getOrElse("exe-1", fail()).shuffleRead === 1000)
// finish a task with unknown executor-id, nothing should happen
taskInfo =
new TaskInfo(1234L, 0, 1, 1000L, "exe-unknown", "host1", TaskLocality.NODE_LOCAL, true)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0)
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToData.size === 1)
// finish this task, should get updated duration
taskInfo = new TaskInfo(1235L, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0)
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToData.getOrElse((0, 0), fail())
.executorSummary.getOrElse("exe-1", fail()).shuffleRead === 2000)
// finish this task, should get updated duration
taskInfo = new TaskInfo(1236L, 0, 2, 0L, "exe-2", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
task = new ShuffleMapTask(0)
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
assert(listener.stageIdToData.getOrElse((0, 0), fail())
.executorSummary.getOrElse("exe-2", fail()).shuffleRead === 1000)
}
test("test task success vs failure counting for different task end reasons") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val metrics = TaskMetrics.empty
val taskInfo = new TaskInfo(1234L, 0, 3, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
val task = new ShuffleMapTask(0)
val taskType = Utils.getFormattedClassName(task)
// Go through all the failure cases to make sure we are counting them as failures.
val taskFailedReasons = Seq(
Resubmitted,
new FetchFailed(null, 0, 0, 0, "ignored"),
ExceptionFailure("Exception", "description", null, null, None),
TaskResultLost,
ExecutorLostFailure("0", true, Some("Induced failure")),
UnknownReason)
var failCount = 0
for (reason <- taskFailedReasons) {
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, reason, taskInfo, metrics))
failCount += 1
assert(listener.stageIdToData((task.stageId, 0)).numCompleteTasks === 0)
assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
}
// Make sure killed tasks are accounted for correctly.
listener.onTaskEnd(
SparkListenerTaskEnd(
task.stageId, 0, taskType, TaskKilled("test"), taskInfo, metrics))
assert(listener.stageIdToData((task.stageId, 0)).reasonToNumKilled === Map("test" -> 1))
// Make sure we count success as success.
listener.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 1, taskType, Success, taskInfo, metrics))
assert(listener.stageIdToData((task.stageId, 1)).numCompleteTasks === 1)
assert(listener.stageIdToData((task.stageId, 0)).numFailedTasks === failCount)
}
test("test update metrics") {
val conf = new SparkConf()
val listener = new JobProgressListener(conf)
val taskType = Utils.getFormattedClassName(new ShuffleMapTask(0))
val execId = "exe-1"
def makeTaskMetrics(base: Int): TaskMetrics = {
val taskMetrics = TaskMetrics.registered
val shuffleReadMetrics = taskMetrics.createTempShuffleReadMetrics()
val shuffleWriteMetrics = taskMetrics.shuffleWriteMetrics
val inputMetrics = taskMetrics.inputMetrics
val outputMetrics = taskMetrics.outputMetrics
shuffleReadMetrics.incRemoteBytesRead(base + 1)
shuffleReadMetrics.incLocalBytesRead(base + 9)
shuffleReadMetrics.incRemoteBlocksFetched(base + 2)
taskMetrics.mergeShuffleReadMetrics()
shuffleWriteMetrics.incBytesWritten(base + 3)
taskMetrics.setExecutorRunTime(base + 4)
taskMetrics.incDiskBytesSpilled(base + 5)
taskMetrics.incMemoryBytesSpilled(base + 6)
inputMetrics.setBytesRead(base + 7)
outputMetrics.setBytesWritten(base + 8)
taskMetrics
}
def makeTaskInfo(taskId: Long, finishTime: Int = 0): TaskInfo = {
val taskInfo = new TaskInfo(taskId, 0, 1, 0L, execId, "host1", TaskLocality.NODE_LOCAL,
false)
taskInfo.finishTime = finishTime
taskInfo
}
listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1234L)))
listener.onTaskStart(SparkListenerTaskStart(0, 0, makeTaskInfo(1235L)))
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1236L)))
listener.onTaskStart(SparkListenerTaskStart(1, 0, makeTaskInfo(1237L)))
listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate(execId, Array(
(1234L, 0, 0, makeTaskMetrics(0).accumulators().map(AccumulatorSuite.makeInfo)),
(1235L, 0, 0, makeTaskMetrics(100).accumulators().map(AccumulatorSuite.makeInfo)),
(1236L, 1, 0, makeTaskMetrics(200).accumulators().map(AccumulatorSuite.makeInfo)))))
var stage0Data = listener.stageIdToData.get((0, 0)).get
var stage1Data = listener.stageIdToData.get((1, 0)).get
assert(stage0Data.shuffleReadTotalBytes == 220)
assert(stage1Data.shuffleReadTotalBytes == 410)
assert(stage0Data.shuffleWriteBytes == 106)
assert(stage1Data.shuffleWriteBytes == 203)
assert(stage0Data.executorRunTime == 108)
assert(stage1Data.executorRunTime == 204)
assert(stage0Data.diskBytesSpilled == 110)
assert(stage1Data.diskBytesSpilled == 205)
assert(stage0Data.memoryBytesSpilled == 112)
assert(stage1Data.memoryBytesSpilled == 206)
assert(stage0Data.inputBytes == 114)
assert(stage1Data.inputBytes == 207)
assert(stage0Data.outputBytes == 116)
assert(stage1Data.outputBytes == 208)
assert(
stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 2)
assert(
stage0Data.taskData.get(1235L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 102)
assert(
stage1Data.taskData.get(1236L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 202)
// task that was included in a heartbeat
listener.onTaskEnd(SparkListenerTaskEnd(0, 0, taskType, Success, makeTaskInfo(1234L, 1),
makeTaskMetrics(300)))
// task that wasn't included in a heartbeat
listener.onTaskEnd(SparkListenerTaskEnd(1, 0, taskType, Success, makeTaskInfo(1237L, 1),
makeTaskMetrics(400)))
stage0Data = listener.stageIdToData.get((0, 0)).get
stage1Data = listener.stageIdToData.get((1, 0)).get
// Task 1235 contributed (100+1)+(100+9) = 210 shuffle bytes, and task 1234 contributed
// (300+1)+(300+9) = 610 total shuffle bytes, so the total for the stage is 820.
assert(stage0Data.shuffleReadTotalBytes == 820)
// Task 1236 contributed 410 shuffle bytes, and task 1237 contributed 810 shuffle bytes.
assert(stage1Data.shuffleReadTotalBytes == 1220)
assert(stage0Data.shuffleWriteBytes == 406)
assert(stage1Data.shuffleWriteBytes == 606)
assert(stage0Data.executorRunTime == 408)
assert(stage1Data.executorRunTime == 608)
assert(stage0Data.diskBytesSpilled == 410)
assert(stage1Data.diskBytesSpilled == 610)
assert(stage0Data.memoryBytesSpilled == 412)
assert(stage1Data.memoryBytesSpilled == 612)
assert(stage0Data.inputBytes == 414)
assert(stage1Data.inputBytes == 614)
assert(stage0Data.outputBytes == 416)
assert(stage1Data.outputBytes == 616)
assert(
stage0Data.taskData.get(1234L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 302)
assert(
stage1Data.taskData.get(1237L).get.metrics.get.shuffleReadMetrics.totalBlocksFetched == 402)
}
test("drop internal and sql accumulators") {
val taskInfo = new TaskInfo(0, 0, 0, 0, "", "", TaskLocality.ANY, false)
val internalAccum =
AccumulableInfo(id = 1, name = Some("internal"), None, None, true, false, None)
val sqlAccum = AccumulableInfo(
id = 2,
name = Some("sql"),
update = None,
value = None,
internal = false,
countFailedValues = false,
metadata = Some(AccumulatorContext.SQL_ACCUM_IDENTIFIER))
val userAccum = AccumulableInfo(
id = 3,
name = Some("user"),
update = None,
value = None,
internal = false,
countFailedValues = false,
metadata = None)
taskInfo.setAccumulables(List(internalAccum, sqlAccum, userAccum))
val newTaskInfo = TaskUIData.dropInternalAndSQLAccumulables(taskInfo)
assert(newTaskInfo.accumulables === Seq(userAccum))
}
test("SPARK-19146 drop more elements when stageData.taskData.size > retainedTasks") {
val conf = new SparkConf()
conf.set("spark.ui.retainedTasks", "100")
val taskMetrics = TaskMetrics.empty
taskMetrics.mergeShuffleReadMetrics()
val task = new ShuffleMapTask(0)
val taskType = Utils.getFormattedClassName(task)
val listener1 = new JobProgressListener(conf)
for (t <- 1 to 101) {
val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
listener1.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
}
// 101 - math.max(100 / 10, 101 - 100) = 91
assert(listener1.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 91)
val listener2 = new JobProgressListener(conf)
for (t <- 1 to 150) {
val taskInfo = new TaskInfo(t, 0, 1, 0L, "exe-1", "host1", TaskLocality.NODE_LOCAL, false)
taskInfo.finishTime = 1
listener2.onTaskEnd(
SparkListenerTaskEnd(task.stageId, 0, taskType, Success, taskInfo, taskMetrics))
}
// 150 - math.max(100 / 10, 150 - 100) = 100
assert(listener2.stageIdToData((task.stageId, task.stageAttemptId)).taskData.size === 100)
}
}


@@ -45,6 +45,8 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.storage.StorageStatusListener"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.ExecutorStageSummary.this"),
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.JobData.this"),
+    ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.SparkStatusTracker.this"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.ui.jobs.JobProgressListener"),
     // [SPARK-20495][SQL] Add StorageLevel to cacheTable API
     ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.cacheTable"),


@@ -23,16 +23,16 @@ import scala.xml._
 import org.apache.commons.lang3.StringEscapeUtils

+import org.apache.spark.status.api.v1.{JobData, StageData}
 import org.apache.spark.streaming.Time
 import org.apache.spark.streaming.ui.StreamingJobProgressListener.SparkJobId
 import org.apache.spark.ui.{UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.ui.jobs.UIData.JobUIData

-private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobUIData: Option[JobUIData])
+private[ui] case class SparkJobIdWithUIData(sparkJobId: SparkJobId, jobData: Option[JobData])

 private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
   private val streamingListener = parent.listener
-  private val sparkListener = parent.ssc.sc.jobProgressListener
+  private val store = parent.parent.store

   private def columns: Seq[Node] = {
     <th>Output Op Id</th>
@@ -52,13 +52,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
-      sparkJob: SparkJobIdWithUIData): Seq[Node] = {
-    if (sparkJob.jobUIData.isDefined) {
+      jobIdWithData: SparkJobIdWithUIData): Seq[Node] = {
+    if (jobIdWithData.jobData.isDefined) {
       generateNormalJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
+        numSparkJobRowsInOutputOp, isFirstRow, jobIdWithData.jobData.get)
     } else {
       generateDroppedJobRow(outputOpData, outputOpDescription, formattedOutputOpDuration,
-        numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
+        numSparkJobRowsInOutputOp, isFirstRow, jobIdWithData.sparkJobId)
     }
   }
@@ -94,15 +94,15 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       formattedOutputOpDuration: String,
       numSparkJobRowsInOutputOp: Int,
       isFirstRow: Boolean,
-      sparkJob: JobUIData): Seq[Node] = {
+      sparkJob: JobData): Seq[Node] = {
     val duration: Option[Long] = {
       sparkJob.submissionTime.map { start =>
-        val end = sparkJob.completionTime.getOrElse(System.currentTimeMillis())
-        end - start
+        val end = sparkJob.completionTime.map(_.getTime()).getOrElse(System.currentTimeMillis())
+        end - start.getTime()
       }
     }
     val lastFailureReason =
-      sparkJob.stageIds.sorted.reverse.flatMap(sparkListener.stageIdToInfo.get).
+      sparkJob.stageIds.sorted.reverse.flatMap(getStageData).
         dropWhile(_.failureReason == None).take(1). // get the first info that contains failure
         flatMap(info => info.failureReason).headOption.getOrElse("")
     val formattedDuration = duration.map(d => SparkUIUtils.formatDuration(d)).getOrElse("-")
@@ -135,7 +135,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
         {formattedDuration}
       </td>
       <td class="stage-progress-cell">
-        {sparkJob.completedStageIndices.size}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
+        {sparkJob.numCompletedStages}/{sparkJob.stageIds.size - sparkJob.numSkippedStages}
         {if (sparkJob.numFailedStages > 0) s"(${sparkJob.numFailedStages} failed)"}
         {if (sparkJob.numSkippedStages > 0) s"(${sparkJob.numSkippedStages} skipped)"}
       </td>
@@ -146,7 +146,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
           completed = sparkJob.numCompletedTasks,
           failed = sparkJob.numFailedTasks,
           skipped = sparkJob.numSkippedTasks,
-          reasonToNumKilled = sparkJob.reasonToNumKilled,
+          reasonToNumKilled = sparkJob.killedTasksSummary,
           total = sparkJob.numTasks - sparkJob.numSkippedTasks)
       }
     </td>
@@ -246,11 +246,19 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
     </div>
   }

-  private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
-    sparkListener.activeJobs.get(sparkJobId).orElse {
-      sparkListener.completedJobs.find(_.jobId == sparkJobId).orElse {
-        sparkListener.failedJobs.find(_.jobId == sparkJobId)
-      }
+  private def getJobData(sparkJobId: SparkJobId): Option[JobData] = {
+    try {
+      Some(store.job(sparkJobId))
+    } catch {
+      case _: NoSuchElementException => None
+    }
+  }
+
+  private def getStageData(stageId: Int): Option[StageData] = {
+    try {
+      Some(store.lastStageAttempt(stageId))
+    } catch {
+      case _: NoSuchElementException => None
     }
   }
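Both new helpers follow the same convention: the status store throws NoSuchElementException for ids it does not know about, and the page maps that to None. If more lookups were added, the wrapping could be factored out along these lines (illustrative only, not part of this patch):

// Illustrative helper inside BatchPage: turn a store lookup that throws
// NoSuchElementException into an Option.
private def lookup[T](get: => T): Option[T] =
  try {
    Some(get)
  } catch {
    case _: NoSuchElementException => None
  }

// e.g. lookup(store.job(sparkJobId)) or lookup(store.lastStageAttempt(stageId))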
@@ -282,25 +290,22 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
       val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
       (outputOperation, sparkJobIds)
     }.toSeq.sortBy(_._1.id)

-    sparkListener.synchronized {
-      val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) =>
-        (outputOpData,
-          sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
-      }
+    val outputOpWithJobs = outputOps.map { case (outputOpData, sparkJobIds) =>
+      (outputOpData, sparkJobIds.map { jobId => SparkJobIdWithUIData(jobId, getJobData(jobId)) })
+    }

-      <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
-        <thead>
-          {columns}
-        </thead>
-        <tbody>
-          {
-            outputOpWithJobs.map { case (outputOpData, sparkJobIds) =>
-              generateOutputOpIdRow(outputOpData, sparkJobIds)
-            }
-          }
-        </tbody>
-      </table>
-    }
+    <table id="batch-job-table" class="table table-bordered table-striped table-condensed">
+      <thead>
+        {columns}
+      </thead>
+      <tbody>
+        {
+          outputOpWithJobs.map { case (outputOpData, sparkJobs) =>
+            generateOutputOpIdRow(outputOpData, sparkJobs)
+          }
+        }
+      </tbody>
+    </table>
   }

   def render(request: HttpServletRequest): Seq[Node] = streamingListener.synchronized {