[SPARK-20653][CORE] Add cleaning of old elements from the status store.

This change restores the functionality that keeps only a limited number of
elements of each type (jobs, stages, etc.) in the status store, depending on
configuration, so that the store does not grow indefinitely over time.

The feature is implemented by a new type (ElementTrackingStore) that wraps
a KVStore and allows triggers to be registered that fire when the number of
elements of a given type reaches a threshold. Triggers are not restricted to
deleting elements, but the current API is geared toward that use case.
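
As a rough illustration of the trigger API (a minimal sketch, not code from this
change), a writer could register a cleanup trigger as shown below. The setup and
the cleanupJobs helper are hypothetical; addTrigger, ElementTrackingStore,
JobDataWrapper and MAX_RETAINED_JOBS come from this patch.

import org.apache.spark.SparkConf
import org.apache.spark.status.{ElementTrackingStore, JobDataWrapper}
import org.apache.spark.status.config._
import org.apache.spark.util.kvstore.InMemoryStore

val conf = new SparkConf()
// Wrap any KVStore implementation; an in-memory store is assumed here.
val store = new ElementTrackingStore(new InMemoryStore(), conf)

// Fire the action once the number of stored jobs crosses the configured limit.
// The action receives the current count of elements of the registered type.
store.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
  cleanupJobs(count)  // hypothetical routine that deletes the oldest finished jobs
}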

The new store also has a trigger for the "close" call, which makes it
easier for listeners to register code for cleaning things up and flushing
partial state to the store.
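
A similarly hedged sketch of the flush hook: a listener can register work to run
when the store is flushed, and the store can be closed without closing the wrapped
KVStore (as FsHistoryProvider does below via close(false)). The flushPartialState
helper is hypothetical, and store is assumed to be the ElementTrackingStore from
the previous sketch.

// Flush triggers run synchronously when the store is flushed or closed,
// e.g. after the SHS finishes replaying an event log.
store.onFlush {
  flushPartialState()  // hypothetical: write partially aggregated state to the store
}

// Close the tracking wrapper, running flush triggers, but leave the
// underlying KVStore open for reading.
store.close(false)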

The old configurations for cleaning up the stored elements from the core
and SQL UIs are now active again, and the old unit tests are re-enabled.
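
For reference, the retention limits re-enabled by this change are controlled by the
settings below; the values in this sketch are illustrative only (the defaults in the
patch are 1000 jobs, 1000 stages, 100000 tasks per stage, 100 dead executors and
1000 SQL executions).

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.ui.retainedJobs", "500")             // MAX_RETAINED_JOBS
  .set("spark.ui.retainedStages", "500")           // MAX_RETAINED_STAGES
  .set("spark.ui.retainedTasks", "50000")          // MAX_RETAINED_TASKS_PER_STAGE (per stage)
  .set("spark.ui.retainedDeadExecutors", "50")     // MAX_RETAINED_DEAD_EXECUTORS
  .set("spark.sql.ui.retainedExecutions", "500")   // StaticSQLConf.UI_RETAINED_EXECUTIONS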

Author: Marcelo Vanzin <vanzin@cloudera.com>

Closes #19751 from vanzin/SPARK-20653.
Authored by Marcelo Vanzin on 2017-12-18 14:08:48 -06:00; committed by Imran Rashid
Parent: fb3636b482
Commit: 772e4648d9
22 changed files with 713 additions and 81 deletions


@ -44,6 +44,7 @@ import org.apache.spark.scheduler.ReplayListenerBus._
import org.apache.spark.status._
import org.apache.spark.status.KVUtils._
import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{Clock, SystemClock, ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
@ -304,6 +305,9 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
val (kvstore, needReplay) = uiStorePath match {
case Some(path) =>
try {
// The store path is not guaranteed to exist - maybe it hasn't been created, or was
// invalidated because changes to the event log were detected. Need to replay in that
// case.
val _replay = !path.isDirectory()
(createDiskStore(path, conf), _replay)
} catch {
@ -318,24 +322,23 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
(new InMemoryStore(), true)
}
val trackingStore = new ElementTrackingStore(kvstore, conf)
if (needReplay) {
val replayBus = new ReplayListenerBus()
val listener = new AppStatusListener(kvstore, conf, false,
val listener = new AppStatusListener(trackingStore, conf, false,
lastUpdateTime = Some(attempt.info.lastUpdated.getTime()))
replayBus.addListener(listener)
AppStatusPlugin.loadPlugins().foreach { plugin =>
plugin.setupListeners(conf, kvstore, l => replayBus.addListener(l), false)
plugin.setupListeners(conf, trackingStore, l => replayBus.addListener(l), false)
}
try {
val fileStatus = fs.getFileStatus(new Path(logDir, attempt.logPath))
replay(fileStatus, isApplicationCompleted(fileStatus), replayBus)
listener.flush()
trackingStore.close(false)
} catch {
case e: Exception =>
try {
kvstore.close()
} catch {
case _e: Exception => logInfo("Error closing store.", _e)
Utils.tryLogNonFatalError {
trackingStore.close()
}
uiStorePath.foreach(Utils.deleteRecursively)
if (e.isInstanceOf[FileNotFoundException]) {


@ -240,11 +240,6 @@ package object config {
.stringConf
.createOptional
// To limit memory usage, we only track information for a fixed number of tasks
private[spark] val UI_RETAINED_TASKS = ConfigBuilder("spark.ui.retainedTasks")
.intConf
.createWithDefault(100000)
// To limit how many applications are shown in the History Server summary ui
private[spark] val HISTORY_UI_MAX_APPS =
ConfigBuilder("spark.history.ui.maxApplications").intConf.createWithDefault(Integer.MAX_VALUE)


@ -32,7 +32,6 @@ import org.apache.spark.status.api.v1
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.scope._
import org.apache.spark.util.kvstore.KVStore
/**
* A Spark listener that writes application information to a data store. The types written to the
@ -42,7 +41,7 @@ import org.apache.spark.util.kvstore.KVStore
* unfinished tasks can be more accurately calculated (see SPARK-21922).
*/
private[spark] class AppStatusListener(
kvstore: KVStore,
kvstore: ElementTrackingStore,
conf: SparkConf,
live: Boolean,
lastUpdateTime: Option[Long] = None) extends SparkListener with Logging {
@ -51,6 +50,7 @@ private[spark] class AppStatusListener(
private var sparkVersion = SPARK_VERSION
private var appInfo: v1.ApplicationInfo = null
private var appSummary = new AppSummary(0, 0)
private var coresPerTask: Int = 1
// How often to update live entities. -1 means "never update" when replaying applications,
@ -58,6 +58,7 @@ private[spark] class AppStatusListener(
// operations that we can live without when rapidly processing incoming task events.
private val liveUpdatePeriodNs = if (live) conf.get(LIVE_ENTITY_UPDATE_PERIOD) else -1L
private val maxTasksPerStage = conf.get(MAX_RETAINED_TASKS_PER_STAGE)
private val maxGraphRootNodes = conf.get(MAX_RETAINED_ROOT_NODES)
// Keep track of live entities, so that task metrics can be efficiently updated (without
@ -68,10 +69,25 @@ private[spark] class AppStatusListener(
private val liveTasks = new HashMap[Long, LiveTask]()
private val liveRDDs = new HashMap[Int, LiveRDD]()
private val pools = new HashMap[String, SchedulerPool]()
// Keep the active executor count as a separate variable to avoid having to do synchronization
// around liveExecutors.
@volatile private var activeExecutorCount = 0
override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
case SparkListenerLogStart(version) => sparkVersion = version
case _ =>
kvstore.addTrigger(classOf[ExecutorSummaryWrapper], conf.get(MAX_RETAINED_DEAD_EXECUTORS))
{ count => cleanupExecutors(count) }
kvstore.addTrigger(classOf[JobDataWrapper], conf.get(MAX_RETAINED_JOBS)) { count =>
cleanupJobs(count)
}
kvstore.addTrigger(classOf[StageDataWrapper], conf.get(MAX_RETAINED_STAGES)) { count =>
cleanupStages(count)
}
kvstore.onFlush {
if (!live) {
flush()
}
}
override def onApplicationStart(event: SparkListenerApplicationStart): Unit = {
@ -97,6 +113,7 @@ private[spark] class AppStatusListener(
Seq(attempt))
kvstore.write(new ApplicationInfoWrapper(appInfo))
kvstore.write(appSummary)
}
override def onEnvironmentUpdate(event: SparkListenerEnvironmentUpdate): Unit = {
@ -158,10 +175,11 @@ private[spark] class AppStatusListener(
override def onExecutorRemoved(event: SparkListenerExecutorRemoved): Unit = {
liveExecutors.remove(event.executorId).foreach { exec =>
val now = System.nanoTime()
activeExecutorCount = math.max(0, activeExecutorCount - 1)
exec.isActive = false
exec.removeTime = new Date(event.time)
exec.removeReason = event.reason
update(exec, now)
update(exec, now, last = true)
// Remove all RDD distributions that reference the removed executor, in case there wasn't
// a corresponding event.
@ -290,8 +308,11 @@ private[spark] class AppStatusListener(
}
job.completionTime = if (event.time > 0) Some(new Date(event.time)) else None
update(job, now)
update(job, now, last = true)
}
appSummary = new AppSummary(appSummary.numCompletedJobs + 1, appSummary.numCompletedStages)
kvstore.write(appSummary)
}
override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit = {
@ -350,6 +371,13 @@ private[spark] class AppStatusListener(
job.activeTasks += 1
maybeUpdate(job, now)
}
if (stage.savedTasks.incrementAndGet() > maxTasksPerStage && !stage.cleaning) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
}
liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@ -449,6 +477,13 @@ private[spark] class AppStatusListener(
esummary.metrics.update(metricsDelta)
}
maybeUpdate(esummary, now)
if (!stage.cleaning && stage.savedTasks.get() > maxTasksPerStage) {
stage.cleaning = true
kvstore.doAsync {
cleanupTasks(stage)
}
}
}
liveExecutors.get(event.taskInfo.executorId).foreach { exec =>
@ -516,8 +551,11 @@ private[spark] class AppStatusListener(
}
stage.executorSummaries.values.foreach(update(_, now))
update(stage, now)
update(stage, now, last = true)
}
appSummary = new AppSummary(appSummary.numCompletedJobs, appSummary.numCompletedStages + 1)
kvstore.write(appSummary)
}
override def onBlockManagerAdded(event: SparkListenerBlockManagerAdded): Unit = {
@ -573,7 +611,7 @@ private[spark] class AppStatusListener(
}
/** Flush all live entities' data to the underlying store. */
def flush(): Unit = {
private def flush(): Unit = {
val now = System.nanoTime()
liveStages.values.asScala.foreach { stage =>
update(stage, now)
@ -708,7 +746,10 @@ private[spark] class AppStatusListener(
}
private def getOrCreateExecutor(executorId: String, addTime: Long): LiveExecutor = {
liveExecutors.getOrElseUpdate(executorId, new LiveExecutor(executorId, addTime))
liveExecutors.getOrElseUpdate(executorId, {
activeExecutorCount += 1
new LiveExecutor(executorId, addTime)
})
}
private def updateStreamBlock(event: SparkListenerBlockUpdated, stream: StreamBlockId): Unit = {
@ -754,8 +795,8 @@ private[spark] class AppStatusListener(
}
}
private def update(entity: LiveEntity, now: Long): Unit = {
entity.write(kvstore, now)
private def update(entity: LiveEntity, now: Long, last: Boolean = false): Unit = {
entity.write(kvstore, now, checkTriggers = last)
}
/** Update a live entity only if it hasn't been updated in the last configured period. */
@ -772,4 +813,127 @@ private[spark] class AppStatusListener(
}
}
private def cleanupExecutors(count: Long): Unit = {
// Because the limit is on the number of *dead* executors, we need to calculate whether
// there are actually enough dead executors to be deleted.
val threshold = conf.get(MAX_RETAINED_DEAD_EXECUTORS)
val dead = count - activeExecutorCount
if (dead > threshold) {
val countToDelete = calculateNumberToRemove(dead, threshold)
val toDelete = kvstore.view(classOf[ExecutorSummaryWrapper]).index("active")
.max(countToDelete).first(false).last(false).asScala.toSeq
toDelete.foreach { e => kvstore.delete(e.getClass(), e.info.id) }
}
}
private def cleanupJobs(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_JOBS))
if (countToDelete <= 0L) {
return
}
val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[JobDataWrapper]),
countToDelete.toInt) { j =>
j.info.status != JobExecutionStatus.RUNNING && j.info.status != JobExecutionStatus.UNKNOWN
}
toDelete.foreach { j => kvstore.delete(j.getClass(), j.info.jobId) }
}
private def cleanupStages(count: Long): Unit = {
val countToDelete = calculateNumberToRemove(count, conf.get(MAX_RETAINED_STAGES))
if (countToDelete <= 0L) {
return
}
val stages = KVUtils.viewToSeq(kvstore.view(classOf[StageDataWrapper]),
countToDelete.toInt) { s =>
s.info.status != v1.StageStatus.ACTIVE && s.info.status != v1.StageStatus.PENDING
}
stages.foreach { s =>
val key = s.id
kvstore.delete(s.getClass(), key)
val execSummaries = kvstore.view(classOf[ExecutorStageSummaryWrapper])
.index("stage")
.first(key)
.last(key)
.asScala
.toSeq
execSummaries.foreach { e =>
kvstore.delete(e.getClass(), e.id)
}
val tasks = kvstore.view(classOf[TaskDataWrapper])
.index("stage")
.first(key)
.last(key)
.asScala
tasks.foreach { t =>
kvstore.delete(t.getClass(), t.info.taskId)
}
// Check whether there are remaining attempts for the same stage. If there aren't, then
// also delete the RDD graph data.
val remainingAttempts = kvstore.view(classOf[StageDataWrapper])
.index("stageId")
.first(s.stageId)
.last(s.stageId)
.closeableIterator()
val hasMoreAttempts = try {
remainingAttempts.asScala.exists { other =>
other.info.attemptId != s.info.attemptId
}
} finally {
remainingAttempts.close()
}
if (!hasMoreAttempts) {
kvstore.delete(classOf[RDDOperationGraphWrapper], s.stageId)
}
}
}
private def cleanupTasks(stage: LiveStage): Unit = {
val countToDelete = calculateNumberToRemove(stage.savedTasks.get(), maxTasksPerStage).toInt
if (countToDelete > 0) {
val stageKey = Array(stage.info.stageId, stage.info.attemptId)
val view = kvstore.view(classOf[TaskDataWrapper]).index("stage").first(stageKey)
.last(stageKey)
// Try to delete finished tasks only.
val toDelete = KVUtils.viewToSeq(view, countToDelete) { t =>
!live || t.info.status != TaskState.RUNNING.toString()
}
toDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
stage.savedTasks.addAndGet(-toDelete.size)
// If there are more running tasks than the configured limit, delete running tasks. This
// should be extremely rare since the limit should generally far exceed the number of tasks
// that can run in parallel.
val remaining = countToDelete - toDelete.size
if (remaining > 0) {
val runningTasksToDelete = view.max(remaining).iterator().asScala.toList
runningTasksToDelete.foreach { t => kvstore.delete(t.getClass(), t.info.taskId) }
stage.savedTasks.addAndGet(-remaining)
}
}
stage.cleaning = false
}
/**
* Remove at least (retainedSize / 10) items to reduce friction. Because tracking may be done
* asynchronously, this method may return 0 in case enough items have been deleted already.
*/
private def calculateNumberToRemove(dataSize: Long, retainedSize: Long): Long = {
if (dataSize > retainedSize) {
math.max(retainedSize / 10L, dataSize - retainedSize)
} else {
0L
}
}
}


@ -48,7 +48,7 @@ private[spark] trait AppStatusPlugin {
*/
def setupListeners(
conf: SparkConf,
store: KVStore,
store: ElementTrackingStore,
addListenerFn: SparkListener => Unit,
live: Boolean): Unit


@ -330,6 +330,10 @@ private[spark] class AppStatusStore(
store.read(classOf[PoolData], name)
}
def appSummary(): AppSummary = {
store.read(classOf[AppSummary], classOf[AppSummary].getName())
}
def close(): Unit = {
store.close()
}
@ -347,7 +351,7 @@ private[spark] object AppStatusStore {
* @param addListenerFn Function to register a listener with a bus.
*/
def createLiveStore(conf: SparkConf, addListenerFn: SparkListener => Unit): AppStatusStore = {
val store = new InMemoryStore()
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val listener = new AppStatusListener(store, conf, true)
addListenerFn(listener)
AppStatusPlugin.loadPlugins().foreach { p =>


@ -0,0 +1,160 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status
import java.util.concurrent.TimeUnit
import scala.collection.mutable.{HashMap, ListBuffer}
import com.google.common.util.concurrent.MoreExecutors
import org.apache.spark.SparkConf
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.kvstore._
/**
* A KVStore wrapper that allows tracking the number of elements of specific types, and triggering
* actions once they reach a threshold. This allows writers, for example, to control how much data
* is stored by potentially deleting old data as new data is added.
*
* This store is used when populating data either from a live UI or an event log. On top of firing
* triggers when elements reach a certain threshold, it provides two extra bits of functionality:
*
* - a generic worker thread that can be used to run expensive tasks asynchronously; the tasks can
* be configured to run on the calling thread when more determinism is desired (e.g. unit tests).
* - a generic flush mechanism so that listeners can be notified about when they should flush
* internal state to the store (e.g. after the SHS finishes parsing an event log).
*
* The configured triggers are run on a separate thread by default; they can be forced to run on
* the calling thread by setting the `ASYNC_TRACKING_ENABLED` configuration to `false`.
*/
private[spark] class ElementTrackingStore(store: KVStore, conf: SparkConf) extends KVStore {
import config._
private val triggers = new HashMap[Class[_], Seq[Trigger[_]]]()
private val flushTriggers = new ListBuffer[() => Unit]()
private val executor = if (conf.get(ASYNC_TRACKING_ENABLED)) {
ThreadUtils.newDaemonSingleThreadExecutor("element-tracking-store-worker")
} else {
MoreExecutors.sameThreadExecutor()
}
@volatile private var stopped = false
/**
* Register a trigger that will be fired once the number of elements of a given type reaches
* the given threshold.
*
* @param klass The type to monitor.
* @param threshold The number of elements that should trigger the action.
* @param action Action to run when the threshold is reached; takes as a parameter the number
* of elements of the registered type currently known to be in the store.
*/
def addTrigger(klass: Class[_], threshold: Long)(action: Long => Unit): Unit = {
val existing = triggers.getOrElse(klass, Seq())
triggers(klass) = existing :+ Trigger(threshold, action)
}
/**
* Adds a trigger to be executed before the store is flushed. This normally happens before
* closing, and is useful for flushing intermediate state to the store, e.g. when replaying
* in-progress applications through the SHS.
*
* Flush triggers are called synchronously in the same thread that is closing the store.
*/
def onFlush(action: => Unit): Unit = {
flushTriggers += { () => action }
}
/**
* Enqueues an action to be executed asynchronously. The task will run on the calling thread if
* `ASYNC_TRACKING_ENABLED` is `false`.
*/
def doAsync(fn: => Unit): Unit = {
executor.submit(new Runnable() {
override def run(): Unit = Utils.tryLog { fn }
})
}
override def read[T](klass: Class[T], naturalKey: Any): T = store.read(klass, naturalKey)
override def write(value: Any): Unit = store.write(value)
/** Write an element to the store, optionally checking for whether to fire triggers. */
def write(value: Any, checkTriggers: Boolean): Unit = {
write(value)
if (checkTriggers && !stopped) {
triggers.get(value.getClass()).foreach { list =>
doAsync {
val count = store.count(value.getClass())
list.foreach { t =>
if (count > t.threshold) {
t.action(count)
}
}
}
}
}
}
override def delete(klass: Class[_], naturalKey: Any): Unit = store.delete(klass, naturalKey)
override def getMetadata[T](klass: Class[T]): T = store.getMetadata(klass)
override def setMetadata(value: Any): Unit = store.setMetadata(value)
override def view[T](klass: Class[T]): KVStoreView[T] = store.view(klass)
override def count(klass: Class[_]): Long = store.count(klass)
override def count(klass: Class[_], index: String, indexedValue: Any): Long = {
store.count(klass, index, indexedValue)
}
override def close(): Unit = {
close(true)
}
/** A close() method that optionally leaves the parent store open. */
def close(closeParent: Boolean): Unit = synchronized {
if (stopped) {
return
}
stopped = true
executor.shutdown()
if (!executor.awaitTermination(5, TimeUnit.SECONDS)) {
executor.shutdownNow()
}
flushTriggers.foreach { trigger =>
Utils.tryLog(trigger())
}
if (closeParent) {
store.close()
}
}
private case class Trigger[T](
threshold: Long,
action: Long => Unit)
}


@ -20,6 +20,7 @@ package org.apache.spark.status
import java.io.File
import scala.annotation.meta.getter
import scala.collection.JavaConverters._
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
@ -68,6 +69,19 @@ private[spark] object KVUtils extends Logging {
db
}
/** Turns a KVStoreView into a Scala sequence, applying a filter. */
def viewToSeq[T](
view: KVStoreView[T],
max: Int)
(filter: T => Boolean): Seq[T] = {
val iter = view.closeableIterator()
try {
iter.asScala.filter(filter).take(max).toList
} finally {
iter.close()
}
}
private[spark] class MetadataMismatchException extends Exception
}


@ -18,6 +18,7 @@
package org.apache.spark.status
import java.util.Date
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.HashMap
@ -38,10 +39,12 @@ import org.apache.spark.util.kvstore.KVStore
*/
private[spark] abstract class LiveEntity {
var lastWriteTime = 0L
var lastWriteTime = -1L
def write(store: KVStore, now: Long): Unit = {
store.write(doUpdate())
def write(store: ElementTrackingStore, now: Long, checkTriggers: Boolean = false): Unit = {
// Always check triggers on the first write, since adding an element to the store may
// cause the maximum count for the element type to be exceeded.
store.write(doUpdate(), checkTriggers || lastWriteTime == -1L)
lastWriteTime = now
}
@ -403,6 +406,10 @@ private class LiveStage extends LiveEntity {
val executorSummaries = new HashMap[String, LiveExecutorStageSummary]()
// Used for cleanup of tasks after they reach the configured limit. Not written to the store.
@volatile var cleaning = false
var savedTasks = new AtomicInteger(0)
def executorSummary(executorId: String): LiveExecutorStageSummary = {
executorSummaries.getOrElseUpdate(executorId,
new LiveExecutorStageSummary(info.stageId, info.attemptId, executorId))


@ -59,7 +59,15 @@ private[v1] class StagesResource extends BaseAppResource {
ui.store.stageAttempt(stageId, stageAttemptId, details = details)
} catch {
case _: NoSuchElementException =>
throw new NotFoundException(s"unknown attempt $stageAttemptId for stage $stageId.")
// Change the message depending on whether there are any attempts for the requested stage.
val all = ui.store.stageData(stageId)
val msg = if (all.nonEmpty) {
val ids = all.map(_.attemptId)
s"unknown attempt for stage $stageId. Found attempts: [${ids.mkString(",")}]"
} else {
s"unknown stage: $stageId"
}
throw new NotFoundException(msg)
}
}


@ -23,10 +23,30 @@ import org.apache.spark.internal.config._
private[spark] object config {
val ASYNC_TRACKING_ENABLED = ConfigBuilder("spark.appStateStore.asyncTracking.enable")
.booleanConf
.createWithDefault(true)
val LIVE_ENTITY_UPDATE_PERIOD = ConfigBuilder("spark.ui.liveUpdate.period")
.timeConf(TimeUnit.NANOSECONDS)
.createWithDefaultString("100ms")
val MAX_RETAINED_JOBS = ConfigBuilder("spark.ui.retainedJobs")
.intConf
.createWithDefault(1000)
val MAX_RETAINED_STAGES = ConfigBuilder("spark.ui.retainedStages")
.intConf
.createWithDefault(1000)
val MAX_RETAINED_TASKS_PER_STAGE = ConfigBuilder("spark.ui.retainedTasks")
.intConf
.createWithDefault(100000)
val MAX_RETAINED_DEAD_EXECUTORS = ConfigBuilder("spark.ui.retainedDeadExecutors")
.intConf
.createWithDefault(100)
val MAX_RETAINED_ROOT_NODES = ConfigBuilder("spark.ui.dagGraph.retainedRootRDDs")
.intConf
.createWithDefault(Int.MaxValue)


@ -112,6 +112,9 @@ private[spark] class TaskDataWrapper(
Array(stageId: JInteger, stageAttemptId: JInteger, info.launchTime.getTime(): JLong)
}
@JsonIgnore @KVIndex("active")
def active: Boolean = info.duration.isEmpty
}
private[spark] class RDDStorageInfoWrapper(val info: RDDStorageInfo) {
@ -187,3 +190,16 @@ private[spark] class RDDOperationGraphWrapper(
private[spark] class PoolData(
@KVIndexParam val name: String,
val stageIds: Set[Int])
/**
* A class with information about an app, to be used by the UI. There's only one instance of
* this summary per application, so its ID in the store is the class name.
*/
private[spark] class AppSummary(
val numCompletedJobs: Int,
val numCompletedStages: Int) {
@KVIndex
def id: String = classOf[AppSummary].getName()
}


@ -154,8 +154,6 @@ private[spark] object SparkUI {
val DEFAULT_PORT = 4040
val STATIC_RESOURCE_DIR = "org/apache/spark/ui/static"
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
val DEFAULT_RETAINED_JOBS = 1000
def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)


@ -300,7 +300,13 @@ private[ui] class AllJobsPage(parent: JobsTab, store: AppStatusStore) extends We
val shouldShowCompletedJobs = completedJobs.nonEmpty
val shouldShowFailedJobs = failedJobs.nonEmpty
val completedJobNumStr = s"${completedJobs.size}"
val appSummary = store.appSummary()
val completedJobNumStr = if (completedJobs.size == appSummary.numCompletedJobs) {
s"${completedJobs.size}"
} else {
s"${appSummary.numCompletedJobs}, only showing ${completedJobs.size}"
}
val schedulingMode = store.environmentInfo().sparkProperties.toMap
.get("spark.scheduler.mode")
.map { mode => SchedulingMode.withName(mode).toString }


@ -39,7 +39,6 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val completedStages = allStages.filter(_.status == StageStatus.COMPLETE)
val failedStages = allStages.filter(_.status == StageStatus.FAILED).reverse
val numCompletedStages = completedStages.size
val numFailedStages = failedStages.size
val subPath = "stages"
@ -69,10 +68,11 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
val shouldShowCompletedStages = completedStages.nonEmpty
val shouldShowFailedStages = failedStages.nonEmpty
val completedStageNumStr = if (numCompletedStages == completedStages.size) {
s"$numCompletedStages"
val appSummary = parent.store.appSummary()
val completedStageNumStr = if (appSummary.numCompletedStages == completedStages.size) {
s"${appSummary.numCompletedStages}"
} else {
s"$numCompletedStages, only showing ${completedStages.size}"
s"${appSummary.numCompletedStages}, only showing ${completedStages.size}"
}
val summary: NodeSeq =


@ -264,7 +264,7 @@ class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with Matchers
val badStageAttemptId = getContentAndCode("applications/local-1422981780767/stages/1/1")
badStageAttemptId._1 should be (HttpServletResponse.SC_NOT_FOUND)
badStageAttemptId._3 should be (Some("unknown attempt 1 for stage 1."))
badStageAttemptId._3 should be (Some("unknown attempt for stage 1. Found attempts: [0]"))
val badStageId2 = getContentAndCode("applications/local-1422981780767/stages/flimflam")
badStageId2._1 should be (HttpServletResponse.SC_NOT_FOUND)


@ -39,16 +39,20 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
import config._
private val conf = new SparkConf().set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
private val conf = new SparkConf()
.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
.set(ASYNC_TRACKING_ENABLED, false)
private var time: Long = _
private var testDir: File = _
private var store: KVStore = _
private var store: ElementTrackingStore = _
private var taskIdTracker = -1L
before {
time = 0L
testDir = Utils.createTempDir()
store = KVUtils.open(testDir, getClass().getName())
store = new ElementTrackingStore(KVUtils.open(testDir, getClass().getName()), conf)
taskIdTracker = -1L
}
after {
@ -185,22 +189,8 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Start tasks from stage 1
time += 1
var _taskIdTracker = -1L
def nextTaskId(): Long = {
_taskIdTracker += 1
_taskIdTracker
}
def createTasks(count: Int, time: Long): Seq[TaskInfo] = {
(1 to count).map { id =>
val exec = execIds(id.toInt % execIds.length)
val taskId = nextTaskId()
new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com",
TaskLocality.PROCESS_LOCAL, id % 2 == 0)
}
}
val s1Tasks = createTasks(4, time)
val s1Tasks = createTasks(4, execIds)
s1Tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stages.head.stageId, stages.head.attemptId, task))
}
@ -419,7 +409,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
// Start and fail all tasks of stage 2.
time += 1
val s2Tasks = createTasks(4, time)
val s2Tasks = createTasks(4, execIds)
s2Tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(stages.last.stageId, stages.last.attemptId, task))
}
@ -470,7 +460,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
listener.onStageSubmitted(SparkListenerStageSubmitted(newS2, jobProps))
assert(store.count(classOf[StageDataWrapper]) === 3)
val newS2Tasks = createTasks(4, time)
val newS2Tasks = createTasks(4, execIds)
newS2Tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(newS2.stageId, newS2.attemptId, task))
@ -526,7 +516,7 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
assert(store.count(classOf[StageDataWrapper]) === 5)
time += 1
val j2s2Tasks = createTasks(4, time)
val j2s2Tasks = createTasks(4, execIds)
j2s2Tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(j2Stages.last.stageId, j2Stages.last.attemptId,
@ -587,8 +577,9 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
// Stop executors.
listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "1", "Test"))
listener.onExecutorRemoved(SparkListenerExecutorRemoved(41L, "2", "Test"))
time += 1
listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "1", "Test"))
listener.onExecutorRemoved(SparkListenerExecutorRemoved(time, "2", "Test"))
Seq("1", "2").foreach { id =>
check[ExecutorSummaryWrapper](id) { exec =>
@ -851,6 +842,103 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
}
}
test("eviction of old data") {
val testConf = conf.clone()
.set(MAX_RETAINED_JOBS, 2)
.set(MAX_RETAINED_STAGES, 2)
.set(MAX_RETAINED_TASKS_PER_STAGE, 2)
.set(MAX_RETAINED_DEAD_EXECUTORS, 1)
val listener = new AppStatusListener(store, testConf, true)
// Start 3 jobs, all should be kept. Stop one, it should be evicted.
time += 1
listener.onJobStart(SparkListenerJobStart(1, time, Nil, null))
listener.onJobStart(SparkListenerJobStart(2, time, Nil, null))
listener.onJobStart(SparkListenerJobStart(3, time, Nil, null))
assert(store.count(classOf[JobDataWrapper]) === 3)
time += 1
listener.onJobEnd(SparkListenerJobEnd(2, time, JobSucceeded))
assert(store.count(classOf[JobDataWrapper]) === 2)
intercept[NoSuchElementException] {
store.read(classOf[JobDataWrapper], 2)
}
// Start 3 stages, all should be kept. Stop 2 of them, the stopped one with the lowest id should
// be deleted. Start a new attempt of the second stopped one, and verify that the stage graph
// data is not deleted.
time += 1
val stages = Seq(
new StageInfo(1, 0, "stage1", 4, Nil, Nil, "details1"),
new StageInfo(2, 0, "stage2", 4, Nil, Nil, "details2"),
new StageInfo(3, 0, "stage3", 4, Nil, Nil, "details3"))
// Graph data is generated by the job start event, so fire it.
listener.onJobStart(SparkListenerJobStart(4, time, stages, null))
stages.foreach { s =>
time += 1
s.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(s, new Properties()))
}
assert(store.count(classOf[StageDataWrapper]) === 3)
assert(store.count(classOf[RDDOperationGraphWrapper]) === 3)
stages.drop(1).foreach { s =>
time += 1
s.completionTime = Some(time)
listener.onStageCompleted(SparkListenerStageCompleted(s))
}
assert(store.count(classOf[StageDataWrapper]) === 2)
assert(store.count(classOf[RDDOperationGraphWrapper]) === 2)
intercept[NoSuchElementException] {
store.read(classOf[StageDataWrapper], Array(2, 0))
}
val attempt2 = new StageInfo(3, 1, "stage3", 4, Nil, Nil, "details3")
time += 1
attempt2.submissionTime = Some(time)
listener.onStageSubmitted(SparkListenerStageSubmitted(attempt2, new Properties()))
assert(store.count(classOf[StageDataWrapper]) === 2)
assert(store.count(classOf[RDDOperationGraphWrapper]) === 2)
intercept[NoSuchElementException] {
store.read(classOf[StageDataWrapper], Array(2, 0))
}
intercept[NoSuchElementException] {
store.read(classOf[StageDataWrapper], Array(3, 0))
}
store.read(classOf[StageDataWrapper], Array(3, 1))
// Start 2 tasks. Finish the second one.
time += 1
val tasks = createTasks(2, Array("1"))
tasks.foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
// Start a 3rd task. The finished tasks should be deleted.
createTasks(1, Array("1")).foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
intercept[NoSuchElementException] {
store.read(classOf[TaskDataWrapper], tasks.last.id)
}
// Start a 4th task. The first task should be deleted, even if it's still running.
createTasks(1, Array("1")).foreach { task =>
listener.onTaskStart(SparkListenerTaskStart(attempt2.stageId, attempt2.attemptId, task))
}
assert(store.count(classOf[TaskDataWrapper]) === 2)
intercept[NoSuchElementException] {
store.read(classOf[TaskDataWrapper], tasks.head.id)
}
}
private def key(stage: StageInfo): Array[Int] = Array(stage.stageId, stage.attemptId)
private def check[T: ClassTag](key: Any)(fn: T => Unit): Unit = {
@ -864,6 +952,20 @@ class AppStatusListenerSuite extends SparkFunSuite with BeforeAndAfter {
s"${orig.executorId}.example.com", TaskLocality.PROCESS_LOCAL, orig.speculative)
}
private def createTasks(count: Int, execs: Array[String]): Seq[TaskInfo] = {
(1 to count).map { id =>
val exec = execs(id.toInt % execs.length)
val taskId = nextTaskId()
new TaskInfo(taskId, taskId.toInt, 1, time, exec, s"$exec.example.com",
TaskLocality.PROCESS_LOCAL, id % 2 == 0)
}
}
private def nextTaskId(): Long = {
taskIdTracker += 1
taskIdTracker
}
private case class RddBlock(
rddId: Int,
partId: Int,


@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.status
import org.mockito.Mockito._
import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.util.kvstore._
class ElementTrackingStoreSuite extends SparkFunSuite {
import config._
test("tracking for multiple types") {
val store = mock(classOf[KVStore])
val tracking = new ElementTrackingStore(store, new SparkConf()
.set(ASYNC_TRACKING_ENABLED, false))
var type1 = 0L
var type2 = 0L
var flushed = false
tracking.addTrigger(classOf[Type1], 100) { count =>
type1 = count
}
tracking.addTrigger(classOf[Type2], 1000) { count =>
type2 = count
}
tracking.onFlush {
flushed = true
}
when(store.count(classOf[Type1])).thenReturn(1L)
tracking.write(new Type1, true)
assert(type1 === 0L)
assert(type2 === 0L)
when(store.count(classOf[Type1])).thenReturn(100L)
tracking.write(new Type1, true)
assert(type1 === 0L)
assert(type2 === 0L)
when(store.count(classOf[Type1])).thenReturn(101L)
tracking.write(new Type1, true)
assert(type1 === 101L)
assert(type2 === 0L)
when(store.count(classOf[Type1])).thenReturn(200L)
tracking.write(new Type1, true)
assert(type1 === 200L)
assert(type2 === 0L)
when(store.count(classOf[Type2])).thenReturn(500L)
tracking.write(new Type2, true)
assert(type1 === 200L)
assert(type2 === 0L)
when(store.count(classOf[Type2])).thenReturn(1000L)
tracking.write(new Type2, true)
assert(type1 === 200L)
assert(type2 === 0L)
when(store.count(classOf[Type2])).thenReturn(2000L)
tracking.write(new Type2, true)
assert(type1 === 200L)
assert(type2 === 2000L)
tracking.close(false)
assert(flushed)
verify(store, never()).close()
}
private class Type1
private class Type2
}


@ -42,6 +42,7 @@ import org.apache.spark.deploy.history.HistoryServerSuite
import org.apache.spark.internal.config.MEMORY_OFFHEAP_SIZE
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.status.api.v1.{JacksonMessageWriter, RDDDataDistribution, StageStatus}
import org.apache.spark.status.config._
private[spark] class SparkUICssErrorHandler extends DefaultCssErrorHandler {
@ -525,14 +526,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
}
}
ignore("stage & job retention") {
test("stage & job retention") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set("spark.ui.enabled", "true")
.set("spark.ui.port", "0")
.set("spark.ui.retainedStages", "3")
.set("spark.ui.retainedJobs", "2")
.set(MAX_RETAINED_STAGES, 3)
.set(MAX_RETAINED_JOBS, 2)
.set(ASYNC_TRACKING_ENABLED, false)
val sc = new SparkContext(conf)
assert(sc.ui.isDefined)


@ -95,4 +95,11 @@ object StaticSQLConf {
.stringConf
.toSequence
.createOptional
val UI_RETAINED_EXECUTIONS =
buildStaticConf("spark.sql.ui.retainedExecutions")
.doc("Number of executions to retain in the Spark UI.")
.intConf
.createWithDefault(1000)
}


@ -27,14 +27,15 @@ import org.apache.spark.internal.Logging
import org.apache.spark.scheduler._
import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.metric._
import org.apache.spark.status.LiveEntity
import org.apache.spark.sql.internal.StaticSQLConf._
import org.apache.spark.status.{ElementTrackingStore, KVUtils, LiveEntity}
import org.apache.spark.status.config._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.kvstore.KVStore
private[sql] class SQLAppStatusListener(
conf: SparkConf,
kvstore: KVStore,
kvstore: ElementTrackingStore,
live: Boolean,
ui: Option[SparkUI] = None)
extends SparkListener with Logging {
@ -51,6 +52,23 @@ private[sql] class SQLAppStatusListener(
private var uiInitialized = false
kvstore.addTrigger(classOf[SQLExecutionUIData], conf.get(UI_RETAINED_EXECUTIONS)) { count =>
cleanupExecutions(count)
}
kvstore.onFlush {
if (!live) {
val now = System.nanoTime()
liveExecutions.values.asScala.foreach { exec =>
// This saves the partial aggregated metrics to the store; this works currently because
// when the SHS sees an updated event log, all old data for the application is thrown
// away.
exec.metricsValues = aggregateMetrics(exec)
exec.write(kvstore, now)
}
}
}
override def onJobStart(event: SparkListenerJobStart): Unit = {
val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)
if (executionIdString == null) {
@ -317,6 +335,17 @@ private[sql] class SQLAppStatusListener(
}
}
private def cleanupExecutions(count: Long): Unit = {
val countToDelete = count - conf.get(UI_RETAINED_EXECUTIONS)
if (countToDelete <= 0) {
return
}
val toDelete = KVUtils.viewToSeq(kvstore.view(classOf[SQLExecutionUIData]),
countToDelete.toInt) { e => e.completionTime.isDefined }
toDelete.foreach { e => kvstore.delete(e.getClass(), e.executionId) }
}
}
private class LiveExecutionData(val executionId: Long) extends LiveEntity {


@ -27,7 +27,7 @@ import com.fasterxml.jackson.databind.annotation.JsonDeserialize
import org.apache.spark.{JobExecutionStatus, SparkConf}
import org.apache.spark.scheduler.SparkListener
import org.apache.spark.status.AppStatusPlugin
import org.apache.spark.status.{AppStatusPlugin, ElementTrackingStore}
import org.apache.spark.status.KVUtils.KVIndexParam
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
@ -84,7 +84,7 @@ private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
override def setupListeners(
conf: SparkConf,
store: KVStore,
store: ElementTrackingStore,
addListenerFn: SparkListener => Unit,
live: Boolean): Unit = {
// For live applications, the listener is installed in [[setupUI]]. This also avoids adding
@ -100,7 +100,8 @@ private[sql] class SQLAppStatusPlugin extends AppStatusPlugin {
case Some(sc) =>
// If this is a live application, then install a listener that will enable the SQL
// tab as soon as there's a SQL event posted to the bus.
val listener = new SQLAppStatusListener(sc.conf, ui.store.store, true, Some(ui))
val listener = new SQLAppStatusListener(sc.conf,
ui.store.store.asInstanceOf[ElementTrackingStore], true, Some(ui))
sc.listenerBus.addToStatusQueue(listener)
case _ =>


@ -36,6 +36,7 @@ import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.execution.{LeafExecNode, QueryExecution, SparkPlanInfo, SQLExecution}
import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.status.ElementTrackingStore
import org.apache.spark.status.config._
import org.apache.spark.util.{AccumulatorMetadata, JsonProtocol, LongAccumulator}
import org.apache.spark.util.kvstore.InMemoryStore
@ -43,7 +44,9 @@ import org.apache.spark.util.kvstore.InMemoryStore
class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTestUtils {
import testImplicits._
override protected def sparkConf = super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L)
override protected def sparkConf = {
super.sparkConf.set(LIVE_ENTITY_UPDATE_PERIOD, 0L).set(ASYNC_TRACKING_ENABLED, false)
}
private def createTestDataFrame: DataFrame = {
Seq(
@ -107,10 +110,12 @@ class SQLListenerSuite extends SparkFunSuite with SharedSQLContext with JsonTest
private def sqlStoreTest(name: String)
(fn: (SQLAppStatusStore, SparkListenerBus) => Unit): Unit = {
test(name) {
val store = new InMemoryStore()
val conf = sparkConf
val store = new ElementTrackingStore(new InMemoryStore(), conf)
val bus = new ReplayListenerBus()
val listener = new SQLAppStatusListener(sparkConf, store, true)
val listener = new SQLAppStatusListener(conf, store, true)
bus.addListener(listener)
store.close(false)
val sqlStore = new SQLAppStatusStore(store, Some(listener))
fn(sqlStore, bus)
}
@ -491,15 +496,15 @@ private case class MyPlan(sc: SparkContext, expectedValue: Long) extends LeafExe
class SQLListenerMemoryLeakSuite extends SparkFunSuite {
// TODO: this feature is not yet available in SQLAppStatusStore.
ignore("no memory leak") {
quietly {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
withSpark(new SparkContext(conf)) { sc =>
test("no memory leak") {
val conf = new SparkConf()
.setMaster("local")
.setAppName("test")
.set(config.MAX_TASK_FAILURES, 1) // Don't retry the tasks to run this test quickly
.set("spark.sql.ui.retainedExecutions", "50") // Set it to 50 to run this test quickly
.set(ASYNC_TRACKING_ENABLED, false)
withSpark(new SparkContext(conf)) { sc =>
quietly {
val spark = new SparkSession(sc)
import spark.implicits._
// Run 100 successful executions and 100 failed executions.