[SPARK-2321] Stable pull-based progress / status API

This pull request is a first step towards the implementation of a stable, pull-based progress / status API for Spark (see [SPARK-2321](https://issues.apache.org/jira/browse/SPARK-2321)).  For now, I'd like to discuss the basic implementation, API names, and overall interface design.  Once we arrive at a good design, I'll go back and add additional methods to expose more information via this API.

#### Design goals:

- Pull-based API
- Usable from Java / Scala / Python (eventually, likely with a wrapper)
- Can be extended to expose more information without introducing binary incompatibilities.
- Returns immutable objects.
- Don't leak any implementation details, preserving our freedom to change the implementation.

#### Implementation:

- Add public methods (`getJobInfo`, `getStageInfo`) to SparkContext to allow status / progress information to be retrieved.
- Add public interfaces (`SparkJobInfo`, `SparkStageInfo`) for our API return values.  These interfaces consist entirely of Java-style getter methods and are defined in Java.  I decided to explicitly separate the interfaces from their implementations (`SparkJobInfoImpl`, `SparkStageInfoImpl`) in order to prevent users from constructing these responses themselves.
- Allow an existing JobProgressListener to be used when constructing a live SparkUI.  This allows us to re-use this listener in the implementation of the status API.  There are a few reasons why this listener re-use makes sense:
   - The status API and web UI are guaranteed to show consistent information.
   - These listeners are already well-tested.
   - The same garbage-collection / information retention configurations can apply to both this API and the web UI.
- Extend JobProgressListener to maintain `jobId -> Job` and `stageId -> Stage` mappings.

The progress API methods are implemented in a separate trait that's mixed into SparkContext.  This helps keep SparkContext.scala from becoming larger and more difficult to read.  A short sketch of how the resulting API can be polled is shown below.
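
For illustration only (this snippet is not part of the diff), here is a minimal sketch of polling the new API from Scala; `sc` and `rdd` stand in for an existing `SparkContext` and `RDD`, and the `JavaStatusAPIDemo` example added by this PR shows the equivalent loop from Java:

```scala
import org.apache.spark.SparkContext._  // brings in the implicits that provide countAsync()

// Kick off a job asynchronously, then poll its progress through the status API.
val jobFuture = rdd.map(identity).countAsync()
while (!jobFuture.isCompleted) {
  Thread.sleep(1000)
  for {
    jobId     <- jobFuture.jobIds.lastOption             // empty until the job is submitted
    jobInfo   <- sc.getJobInfo(jobId)                     // None if unknown or garbage-collected
    stageInfo <- sc.getStageInfo(jobInfo.stageIds().head)
  } {
    println(s"Job $jobId [${jobInfo.status()}]: " +
      s"${stageInfo.numCompletedTasks()}/${stageInfo.numTasks()} tasks complete, " +
      s"${stageInfo.numActiveTasks()} active, ${stageInfo.numFailedTasks()} failed")
  }
}
```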

Author: Josh Rosen <joshrosen@databricks.com>
Author: Josh Rosen <joshrosen@apache.org>

Closes #2696 from JoshRosen/progress-reporting-api and squashes the following commits:

e6aa78d [Josh Rosen] Add tests.
b585c16 [Josh Rosen] Accept SparkListenerBus instead of more specific subclasses.
c96402d [Josh Rosen] Address review comments.
2707f98 [Josh Rosen] Expose current stage attempt id
c28ba76 [Josh Rosen] Update demo code:
646ff1d [Josh Rosen] Document spark.ui.retainedJobs.
7f47d6d [Josh Rosen] Clean up SparkUI constructors, per Andrew's feedback.
b77b3d8 [Josh Rosen] Merge remote-tracking branch 'origin/master' into progress-reporting-api
787444c [Josh Rosen] Move status API methods into trait that can be mixed into SparkContext.
f9a9a00 [Josh Rosen] More review comments:
3dc79af [Josh Rosen] Remove creation of unused listeners in SparkContext.
249ca16 [Josh Rosen] Address several review comments:
da5648e [Josh Rosen] Add example of basic progress reporting in Java.
7319ffd [Josh Rosen] Add getJobIdsForGroup() and num*Tasks() methods.
cc568e5 [Josh Rosen] Add note explaining that interfaces should not be implemented outside of Spark.
6e840d4 [Josh Rosen] Remove getter-style names and "consistent snapshot" semantics:
08cbec9 [Josh Rosen] Begin to sketch the interfaces for a stable, public status API.
ac2d13a [Josh Rosen] Add jobId->stage, stageId->stage mappings in JobProgressListener
24de263 [Josh Rosen] Create UI listeners in SparkContext instead of in Tabs:
Josh Rosen 2014-10-25 00:06:57 -07:00
parent 3a845d3c04
commit 9530316887
21 changed files with 588 additions and 134 deletions


@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
public enum JobExecutionStatus {
RUNNING,
SUCCEEDED,
FAILED,
UNKNOWN
}


@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
/**
* Exposes information about Spark Jobs.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkJobInfo {
int jobId();
int[] stageIds();
JobExecutionStatus status();
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
/**
* Exposes information about Spark Stages.
*
* This interface is not designed to be implemented outside of Spark. We may add additional methods
* which may break binary compatibility with outside implementations.
*/
public interface SparkStageInfo {
int stageId();
int currentAttemptId();
String name();
int numTasks();
int numActiveTasks();
int numCompletedTasks();
int numFailedTasks();
}


@ -26,7 +26,6 @@ import java.util.concurrent.atomic.AtomicInteger
import java.util.{Properties, UUID}
import java.util.UUID.randomUUID
import scala.collection.{Map, Set}
import scala.collection.JavaConversions._
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.reflect.{ClassTag, classTag}
@ -51,6 +50,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage._
import org.apache.spark.ui.SparkUI
import org.apache.spark.ui.jobs.JobProgressListener
import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedWeakValueHashMap, Utils}
/**
@ -61,7 +61,7 @@ import org.apache.spark.util.{CallSite, ClosureCleaner, MetadataCleaner, Metadat
* this config overrides the default configs as well as system properties.
*/
class SparkContext(config: SparkConf) extends Logging {
class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging {
// This is used only by YARN for now, but should be relevant to other cluster types (Mesos,
// etc) too. This is typically generated from InputFormatInfo.computePreferredLocations. It
@ -224,10 +224,15 @@ class SparkContext(config: SparkConf) extends Logging {
private[spark] val metadataCleaner =
new MetadataCleaner(MetadataCleanerType.SPARK_CONTEXT, this.cleanup, conf)
// Initialize the Spark UI, registering all associated listeners
private[spark] val jobProgressListener = new JobProgressListener(conf)
listenerBus.addListener(jobProgressListener)
// Initialize the Spark UI
private[spark] val ui: Option[SparkUI] =
if (conf.getBoolean("spark.ui.enabled", true)) {
Some(new SparkUI(this))
Some(SparkUI.createLiveUI(this, conf, listenerBus, jobProgressListener,
env.securityManager, appName))
} else {
// For tests, do not enable the UI
None
@ -854,69 +859,6 @@ class SparkContext(config: SparkConf) extends Logging {
/** The version of Spark on which this application is running. */
def version = SPARK_VERSION
/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.host + ":" + blockManagerId.port, mem)
}
}
/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
rddInfos.filter(_.isCached)
}
/**
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
* Note that this does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}
/**
* :: DeveloperApi ::
* Return pools for fair scheduler
*/
@DeveloperApi
def getAllPools: Seq[Schedulable] = {
// TODO(xiajunluan): We should take nested pools into account
taskScheduler.rootPool.schedulableQueue.toSeq
}
/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}
/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
/**
* Clear the job's list of files added by `addFile` so that they do not get downloaded to
* any new nodes.


@ -0,0 +1,142 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import scala.collection.Map
import scala.collection.JavaConversions._
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.rdd.RDD
import org.apache.spark.scheduler.{SchedulingMode, Schedulable}
import org.apache.spark.storage.{StorageStatus, StorageUtils, RDDInfo}
/**
* Trait that implements Spark's status APIs. This trait is designed to be mixed into
* SparkContext; it allows the status API code to live in its own file.
*/
private[spark] trait SparkStatusAPI { this: SparkContext =>
/**
* Return a map from the slave to the max memory available for caching and the remaining
* memory available for caching.
*/
def getExecutorMemoryStatus: Map[String, (Long, Long)] = {
env.blockManager.master.getMemoryStatus.map { case(blockManagerId, mem) =>
(blockManagerId.host + ":" + blockManagerId.port, mem)
}
}
/**
* :: DeveloperApi ::
* Return information about what RDDs are cached, if they are in mem or on disk, how much space
* they take, etc.
*/
@DeveloperApi
def getRDDStorageInfo: Array[RDDInfo] = {
val rddInfos = persistentRdds.values.map(RDDInfo.fromRdd).toArray
StorageUtils.updateRddInfo(rddInfos, getExecutorStorageStatus)
rddInfos.filter(_.isCached)
}
/**
* Returns an immutable map of RDDs that have marked themselves as persistent via cache() call.
* Note that this does not necessarily mean the caching or computation was successful.
*/
def getPersistentRDDs: Map[Int, RDD[_]] = persistentRdds.toMap
/**
* :: DeveloperApi ::
* Return information about blocks stored in all of the slaves
*/
@DeveloperApi
def getExecutorStorageStatus: Array[StorageStatus] = {
env.blockManager.master.getStorageStatus
}
/**
* :: DeveloperApi ::
* Return pools for fair scheduler
*/
@DeveloperApi
def getAllPools: Seq[Schedulable] = {
// TODO(xiajunluan): We should take nested pools into account
taskScheduler.rootPool.schedulableQueue.toSeq
}
/**
* :: DeveloperApi ::
* Return the pool associated with the given name, if one exists
*/
@DeveloperApi
def getPoolForName(pool: String): Option[Schedulable] = {
Option(taskScheduler.rootPool.schedulableNameToSchedulable.get(pool))
}
/**
* Return current scheduling mode
*/
def getSchedulingMode: SchedulingMode.SchedulingMode = {
taskScheduler.schedulingMode
}
/**
* Return a list of all known jobs in a particular job group. The returned list may contain
* running, failed, and completed jobs, and may vary across invocations of this method. This
* method does not guarantee the order of the elements in its result.
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = {
jobProgressListener.synchronized {
val jobData = jobProgressListener.jobIdToData.valuesIterator
jobData.filter(_.jobGroup.exists(_ == jobGroup)).map(_.jobId).toArray
}
}
/**
* Returns job information, or `None` if the job info could not be found or was garbage collected.
*/
def getJobInfo(jobId: Int): Option[SparkJobInfo] = {
jobProgressListener.synchronized {
jobProgressListener.jobIdToData.get(jobId).map { data =>
new SparkJobInfoImpl(jobId, data.stageIds.toArray, data.status)
}
}
}
/**
* Returns stage information, or `None` if the stage info could not be found or was
* garbage collected.
*/
def getStageInfo(stageId: Int): Option[SparkStageInfo] = {
jobProgressListener.synchronized {
for (
info <- jobProgressListener.stageIdToInfo.get(stageId);
data <- jobProgressListener.stageIdToData.get((stageId, info.attemptId))
) yield {
new SparkStageInfoImpl(
stageId,
info.attemptId,
info.name,
info.numTasks,
data.numActiveTasks,
data.numCompleteTasks,
data.numFailedTasks)
}
}
}
}


@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
private class SparkJobInfoImpl (
val jobId: Int,
val stageIds: Array[Int],
val status: JobExecutionStatus)
extends SparkJobInfo
private class SparkStageInfoImpl(
val stageId: Int,
val currentAttemptId: Int,
val name: String,
val numTasks: Int,
val numActiveTasks: Int,
val numCompletedTasks: Int,
val numFailedTasks: Int)
extends SparkStageInfo


@ -132,6 +132,25 @@ class JavaSparkContext(val sc: SparkContext)
/** Default min number of partitions for Hadoop RDDs when not given by user */
def defaultMinPartitions: java.lang.Integer = sc.defaultMinPartitions
/**
* Return a list of all known jobs in a particular job group. The returned list may contain
* running, failed, and completed jobs, and may vary across invocations of this method. This
* method does not guarantee the order of the elements in its result.
*/
def getJobIdsForGroup(jobGroup: String): Array[Int] = sc.getJobIdsForGroup(jobGroup)
/**
* Returns job information, or `null` if the job info could not be found or was garbage collected.
*/
def getJobInfo(jobId: Int): SparkJobInfo = sc.getJobInfo(jobId).orNull
/**
* Returns stage information, or `null` if the stage info could not be found or was
* garbage collected.
*/
def getStageInfo(stageId: Int): SparkStageInfo = sc.getStageInfo(stageId).orNull
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag


@ -112,7 +112,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
val ui = {
val conf = this.conf.clone()
val appSecManager = new SecurityManager(conf)
new SparkUI(conf, appSecManager, replayBus, appId,
SparkUI.createHistoryUI(conf, replayBus, appSecManager, appId,
s"${HistoryServer.UI_PATH_PREFIX}/$appId")
// Do not call ui.bind() to avoid creating a new server for each application
}


@ -721,8 +721,8 @@ private[spark] class Master(
try {
val replayBus = new ReplayListenerBus(eventLogPaths, fileSystem, compressionCodec)
val ui = new SparkUI(new SparkConf, replayBus, appName + " (completed)",
HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),
appName + " (completed)", HistoryServer.UI_PATH_PREFIX + s"/${app.id}")
replayBus.replay()
appIdToUI(app.id) = ui
webUi.attachSparkUI(ui)


@ -21,47 +21,30 @@ import org.apache.spark.{Logging, SecurityManager, SparkConf, SparkContext}
import org.apache.spark.scheduler._
import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.JettyUtils._
import org.apache.spark.ui.env.EnvironmentTab
import org.apache.spark.ui.exec.ExecutorsTab
import org.apache.spark.ui.jobs.JobProgressTab
import org.apache.spark.ui.storage.StorageTab
import org.apache.spark.ui.env.{EnvironmentListener, EnvironmentTab}
import org.apache.spark.ui.exec.{ExecutorsListener, ExecutorsTab}
import org.apache.spark.ui.jobs.{JobProgressListener, JobProgressTab}
import org.apache.spark.ui.storage.{StorageListener, StorageTab}
/**
* Top level user interface for a Spark application.
*/
private[spark] class SparkUI(
val sc: SparkContext,
private[spark] class SparkUI private (
val sc: Option[SparkContext],
val conf: SparkConf,
val securityManager: SecurityManager,
val listenerBus: SparkListenerBus,
val environmentListener: EnvironmentListener,
val storageStatusListener: StorageStatusListener,
val executorsListener: ExecutorsListener,
val jobProgressListener: JobProgressListener,
val storageListener: StorageListener,
var appName: String,
val basePath: String = "")
val basePath: String)
extends WebUI(securityManager, SparkUI.getUIPort(conf), conf, basePath, "SparkUI")
with Logging {
def this(sc: SparkContext) = this(sc, sc.conf, sc.env.securityManager, sc.listenerBus, sc.appName)
def this(conf: SparkConf, listenerBus: SparkListenerBus, appName: String, basePath: String) =
this(null, conf, new SecurityManager(conf), listenerBus, appName, basePath)
def this(
conf: SparkConf,
securityManager: SecurityManager,
listenerBus: SparkListenerBus,
appName: String,
basePath: String) =
this(null, conf, securityManager, listenerBus, appName, basePath)
// If SparkContext is not provided, assume the associated application is not live
val live = sc != null
// Maintain executor storage status through Spark events
val storageStatusListener = new StorageStatusListener
initialize()
/** Initialize all components of the server. */
def initialize() {
listenerBus.addListener(storageStatusListener)
val jobProgressTab = new JobProgressTab(this)
attachTab(jobProgressTab)
attachTab(new StorageTab(this))
@ -71,10 +54,10 @@ private[spark] class SparkUI(
attachHandler(createRedirectHandler("/", "/stages", basePath = basePath))
attachHandler(
createRedirectHandler("/stages/stage/kill", "/stages", jobProgressTab.handleKillRequest))
if (live) {
sc.env.metricsSystem.getServletHandlers.foreach(attachHandler)
}
// If the UI is live, then serve
sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
}
initialize()
def getAppName = appName
@ -83,11 +66,6 @@ private[spark] class SparkUI(
appName = name
}
/** Register the given listener with the listener bus. */
def registerListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
/** Stop the server behind this web interface. Only valid after bind(). */
override def stop() {
super.stop()
@ -116,4 +94,60 @@ private[spark] object SparkUI {
def getUIPort(conf: SparkConf): Int = {
conf.getInt("spark.ui.port", SparkUI.DEFAULT_PORT)
}
def createLiveUI(
sc: SparkContext,
conf: SparkConf,
listenerBus: SparkListenerBus,
jobProgressListener: JobProgressListener,
securityManager: SecurityManager,
appName: String): SparkUI = {
create(Some(sc), conf, listenerBus, securityManager, appName,
jobProgressListener = Some(jobProgressListener))
}
def createHistoryUI(
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String): SparkUI = {
create(None, conf, listenerBus, securityManager, appName, basePath)
}
/**
* Create a new Spark UI.
*
* @param sc optional SparkContext; this can be None when reconstituting a UI from event logs.
* @param jobProgressListener if supplied, this JobProgressListener will be used; otherwise, the
* web UI will create and register its own JobProgressListener.
*/
private def create(
sc: Option[SparkContext],
conf: SparkConf,
listenerBus: SparkListenerBus,
securityManager: SecurityManager,
appName: String,
basePath: String = "",
jobProgressListener: Option[JobProgressListener] = None): SparkUI = {
val _jobProgressListener: JobProgressListener = jobProgressListener.getOrElse {
val listener = new JobProgressListener(conf)
listenerBus.addListener(listener)
listener
}
val environmentListener = new EnvironmentListener
val storageStatusListener = new StorageStatusListener
val executorsListener = new ExecutorsListener(storageStatusListener)
val storageListener = new StorageListener(storageStatusListener)
listenerBus.addListener(environmentListener)
listenerBus.addListener(storageStatusListener)
listenerBus.addListener(executorsListener)
listenerBus.addListener(storageListener)
new SparkUI(sc, conf, securityManager, environmentListener, storageStatusListener,
executorsListener, _jobProgressListener, storageListener, appName, basePath)
}
}


@ -22,10 +22,8 @@ import org.apache.spark.scheduler._
import org.apache.spark.ui._
private[ui] class EnvironmentTab(parent: SparkUI) extends SparkUITab(parent, "environment") {
val listener = new EnvironmentListener
val listener = parent.environmentListener
attachPage(new EnvironmentPage(this))
parent.registerListener(listener)
}
/**


@ -26,10 +26,9 @@ import org.apache.spark.storage.StorageStatusListener
import org.apache.spark.ui.{SparkUI, SparkUITab}
private[ui] class ExecutorsTab(parent: SparkUI) extends SparkUITab(parent, "executors") {
val listener = new ExecutorsListener(parent.storageStatusListener)
val listener = parent.executorsListener
attachPage(new ExecutorsPage(this))
parent.registerListener(listener)
}
/**


@ -40,17 +40,25 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
import JobProgressListener._
type JobId = Int
type StageId = Int
type StageAttemptId = Int
// How many stages to remember
val retainedStages = conf.getInt("spark.ui.retainedStages", DEFAULT_RETAINED_STAGES)
// How many jobs to remember
val retainedJobs = conf.getInt("spark.ui.retainedJobs", DEFAULT_RETAINED_JOBS)
// Map from stageId to StageInfo
val activeStages = new HashMap[Int, StageInfo]
// Map from (stageId, attemptId) to StageUIData
val stageIdToData = new HashMap[(Int, Int), StageUIData]
val activeJobs = new HashMap[JobId, JobUIData]
val completedJobs = ListBuffer[JobUIData]()
val failedJobs = ListBuffer[JobUIData]()
val jobIdToData = new HashMap[JobId, JobUIData]
val activeStages = new HashMap[StageId, StageInfo]
val completedStages = ListBuffer[StageInfo]()
val failedStages = ListBuffer[StageInfo]()
val stageIdToData = new HashMap[(StageId, StageAttemptId), StageUIData]
val stageIdToInfo = new HashMap[StageId, StageInfo]
// Map from pool name to a hash map (map from stage id to StageInfo).
val poolToActiveStages = HashMap[String, HashMap[Int, StageInfo]]()
@ -61,8 +69,32 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
def blockManagerIds = executorIdToBlockManagerId.values.toSeq
override def onJobStart(jobStart: SparkListenerJobStart) = synchronized {
val jobGroup = Option(jobStart.properties).map(_.getProperty(SparkContext.SPARK_JOB_GROUP_ID))
val jobData: JobUIData =
new JobUIData(jobStart.jobId, jobStart.stageIds, jobGroup, JobExecutionStatus.RUNNING)
jobIdToData(jobStart.jobId) = jobData
activeJobs(jobStart.jobId) = jobData
}
override def onJobEnd(jobEnd: SparkListenerJobEnd) = synchronized {
val jobData = activeJobs.remove(jobEnd.jobId).getOrElse {
logWarning(s"Job completed for unknown job ${jobEnd.jobId}")
new JobUIData(jobId = jobEnd.jobId)
}
jobEnd.jobResult match {
case JobSucceeded =>
completedJobs += jobData
jobData.status = JobExecutionStatus.SUCCEEDED
case JobFailed(exception) =>
failedJobs += jobData
jobData.status = JobExecutionStatus.FAILED
}
}
override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) = synchronized {
val stage = stageCompleted.stageInfo
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), {
logWarning("Stage completed for unknown stage " + stage.stageId)
new StageUIData
@ -89,7 +121,10 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private def trimIfNecessary(stages: ListBuffer[StageInfo]) = synchronized {
if (stages.size > retainedStages) {
val toRemove = math.max(retainedStages / 10, 1)
stages.take(toRemove).foreach { s => stageIdToData.remove((s.stageId, s.attemptId)) }
stages.take(toRemove).foreach { s =>
stageIdToData.remove((s.stageId, s.attemptId))
stageIdToInfo.remove(s.stageId)
}
stages.trimStart(toRemove)
}
}
@ -103,6 +138,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
p => p.getProperty("spark.scheduler.pool", DEFAULT_POOL_NAME)
}.getOrElse(DEFAULT_POOL_NAME)
stageIdToInfo(stage.stageId) = stage
val stageData = stageIdToData.getOrElseUpdate((stage.stageId, stage.attemptId), new StageUIData)
stageData.schedulingPool = poolName
@ -277,4 +313,5 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
private object JobProgressListener {
val DEFAULT_POOL_NAME = "default"
val DEFAULT_RETAINED_STAGES = 1000
val DEFAULT_RETAINED_JOBS = 1000
}


@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing list of all ongoing and recently finished stages and pools */
private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("") {
private val live = parent.live
private val sc = parent.sc
private val listener = parent.listener
private def isFairScheduler = parent.isFairScheduler
@ -47,17 +46,17 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
new FailedStageTable(failedStages.sortBy(_.submissionTime).reverse, parent)
// For now, pool information is only accessible in live UIs
val pools = if (live) sc.getAllPools else Seq[Schedulable]()
val pools = sc.map(_.getAllPools).getOrElse(Seq.empty[Schedulable])
val poolTable = new PoolTable(pools, parent)
val summary: NodeSeq =
<div>
<ul class="unstyled">
{if (live) {
{if (sc.isDefined) {
// Total duration is not meaningful unless the UI is live
<li>
<strong>Total Duration: </strong>
{UIUtils.formatDuration(now - sc.startTime)}
{UIUtils.formatDuration(now - sc.get.startTime)}
</li>
}}
<li>
@ -80,7 +79,7 @@ private[ui] class JobProgressPage(parent: JobProgressTab) extends WebUIPage("")
</div>
val content = summary ++
{if (live && isFairScheduler) {
{if (sc.isDefined && isFairScheduler) {
<h4>{pools.size} Fair Scheduler Pools</h4> ++ poolTable.toNodeSeq
} else {
Seq[Node]()


@ -25,16 +25,14 @@ import org.apache.spark.ui.{SparkUI, SparkUITab}
/** Web UI showing progress status of all jobs in the given SparkContext. */
private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "stages") {
val live = parent.live
val sc = parent.sc
val conf = if (live) sc.conf else new SparkConf
val killEnabled = conf.getBoolean("spark.ui.killEnabled", true)
val listener = new JobProgressListener(conf)
val conf = sc.map(_.conf).getOrElse(new SparkConf)
val killEnabled = sc.map(_.conf.getBoolean("spark.ui.killEnabled", true)).getOrElse(false)
val listener = parent.jobProgressListener
attachPage(new JobProgressPage(this))
attachPage(new StagePage(this))
attachPage(new PoolPage(this))
parent.registerListener(listener)
def isFairScheduler = listener.schedulingMode.exists(_ == SchedulingMode.FAIR)
@ -43,7 +41,7 @@ private[ui] class JobProgressTab(parent: SparkUI) extends SparkUITab(parent, "st
val killFlag = Option(request.getParameter("terminate")).getOrElse("false").toBoolean
val stageId = Option(request.getParameter("id")).getOrElse("-1").toInt
if (stageId >= 0 && killFlag && listener.activeStages.contains(stageId)) {
sc.cancelStage(stageId)
sc.get.cancelStage(stageId)
}
// Do a quick pause here to give Spark time to kill the stage so it shows up as
// killed after the refresh. Note that this will block the serving thread so the


@ -26,7 +26,6 @@ import org.apache.spark.ui.{WebUIPage, UIUtils}
/** Page showing specific pool details */
private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
private val live = parent.live
private val sc = parent.sc
private val listener = parent.listener
@ -42,7 +41,7 @@ private[ui] class PoolPage(parent: JobProgressTab) extends WebUIPage("pool") {
new StageTableBase(activeStages.sortBy(_.submissionTime).reverse, parent)
// For now, pool information is only accessible in live UIs
val pools = if (live) Seq(sc.getPoolForName(poolName).get) else Seq[Schedulable]()
val pools = sc.map(_.getPoolForName(poolName).get).toSeq
val poolTable = new PoolTable(pools, parent)
val content =


@ -17,6 +17,7 @@
package org.apache.spark.ui.jobs
import org.apache.spark.JobExecutionStatus
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.scheduler.{AccumulableInfo, TaskInfo}
import org.apache.spark.util.collection.OpenHashSet
@ -36,6 +37,13 @@ private[jobs] object UIData {
var diskBytesSpilled : Long = 0
}
class JobUIData(
var jobId: Int = -1,
var stageIds: Seq[Int] = Seq.empty,
var jobGroup: Option[String] = None,
var status: JobExecutionStatus = JobExecutionStatus.UNKNOWN
)
class StageUIData {
var numActiveTasks: Int = _
var numCompleteTasks: Int = _


@ -26,11 +26,10 @@ import org.apache.spark.storage._
/** Web UI showing storage status of all RDD's in the given SparkContext. */
private[ui] class StorageTab(parent: SparkUI) extends SparkUITab(parent, "storage") {
val listener = new StorageListener(parent.storageStatusListener)
val listener = parent.storageListener
attachPage(new StoragePage(this))
attachPage(new RDDPage(this))
parent.registerListener(listener)
}
/**


@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import scala.concurrent.duration._
import scala.language.implicitConversions
import scala.language.postfixOps
import org.scalatest.{Matchers, FunSuite}
import org.scalatest.concurrent.Eventually._
import org.apache.spark.JobExecutionStatus._
import org.apache.spark.SparkContext._
class StatusAPISuite extends FunSuite with Matchers with SharedSparkContext {
test("basic status API usage") {
val jobFuture = sc.parallelize(1 to 10000, 2).map(identity).groupBy(identity).collectAsync()
val jobId: Int = eventually(timeout(10 seconds)) {
val jobIds = jobFuture.jobIds
jobIds.size should be(1)
jobIds.head
}
val jobInfo = eventually(timeout(10 seconds)) {
sc.getJobInfo(jobId).get
}
jobInfo.status() should not be FAILED
val stageIds = jobInfo.stageIds()
stageIds.size should be(2)
val firstStageInfo = eventually(timeout(10 seconds)) {
sc.getStageInfo(stageIds(0)).get
}
firstStageInfo.stageId() should be(stageIds(0))
firstStageInfo.currentAttemptId() should be(0)
firstStageInfo.numTasks() should be(2)
eventually(timeout(10 seconds)) {
val updatedFirstStageInfo = sc.getStageInfo(stageIds(0)).get
updatedFirstStageInfo.numCompletedTasks() should be(2)
updatedFirstStageInfo.numActiveTasks() should be(0)
updatedFirstStageInfo.numFailedTasks() should be(0)
}
}
test("getJobIdsForGroup()") {
sc.setJobGroup("my-job-group", "description")
sc.getJobIdsForGroup("my-job-group") should be (Seq.empty)
val firstJobFuture = sc.parallelize(1 to 1000).countAsync()
val firstJobId = eventually(timeout(10 seconds)) {
firstJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
sc.getJobIdsForGroup("my-job-group") should be (Seq(firstJobId))
}
val secondJobFuture = sc.parallelize(1 to 1000).countAsync()
val secondJobId = eventually(timeout(10 seconds)) {
secondJobFuture.jobIds.head
}
eventually(timeout(10 seconds)) {
sc.getJobIdsForGroup("my-job-group").toSet should be (Set(firstJobId, secondJobId))
}
}
}


@ -375,7 +375,16 @@ Apart from these, the following properties are also available, and may be useful
<td><code>spark.ui.retainedStages</code></td>
<td>1000</td>
<td>
How many stages the Spark UI remembers before garbage collecting.
How many stages the Spark UI and status APIs remember before garbage
collecting.
</td>
</tr>
<tr>
<td><code>spark.ui.retainedJobs</code></td>
<td>1000</td>
<td>
How many jobs the Spark UI and status APIs remember before garbage
collecting.
</td>
</tr>
<tr>


@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.examples;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkJobInfo;
import org.apache.spark.SparkStageInfo;
import org.apache.spark.api.java.JavaFutureAction;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import java.util.Arrays;
import java.util.List;
/**
* Example of using Spark's status APIs from Java.
*/
public final class JavaStatusAPIDemo {
public static final String APP_NAME = "JavaStatusAPIDemo";
public static final class IdentityWithDelay<T> implements Function<T, T> {
@Override
public T call(T x) throws Exception {
Thread.sleep(2 * 1000); // 2 seconds
return x;
}
}
public static void main(String[] args) throws Exception {
SparkConf sparkConf = new SparkConf().setAppName(APP_NAME);
final JavaSparkContext sc = new JavaSparkContext(sparkConf);
// Example of implementing a progress reporter for a simple job.
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5), 5).map(
new IdentityWithDelay<Integer>());
JavaFutureAction<List<Integer>> jobFuture = rdd.collectAsync();
while (!jobFuture.isDone()) {
Thread.sleep(1000); // 1 second
List<Integer> jobIds = jobFuture.jobIds();
if (jobIds.isEmpty()) {
continue;
}
int currentJobId = jobIds.get(jobIds.size() - 1);
SparkJobInfo jobInfo = sc.getJobInfo(currentJobId);
SparkStageInfo stageInfo = sc.getStageInfo(jobInfo.stageIds()[0]);
System.out.println(stageInfo.numTasks() + " tasks total: " + stageInfo.numActiveTasks() +
" active, " + stageInfo.numCompletedTasks() + " complete");
}
System.out.println("Job results are: " + jobFuture.get());
sc.stop();
}
}