Style fixes based on code review

Kay Ousterhout 2013-08-09 16:46:34 -07:00
parent 81e1d4a7d1
commit 29b79714f9
3 changed files with 110 additions and 132 deletions

View file

@@ -23,10 +23,11 @@ import java.io.FileNotFoundException
 import java.text.SimpleDateFormat
 import java.util.{Date, Properties}
 import java.util.concurrent.LinkedBlockingQueue
 import scala.collection.mutable.{Map, HashMap, ListBuffer}
 import scala.io.Source

 import spark._
+import spark.SparkContext
 import spark.executor.TaskMetrics
 import spark.scheduler.cluster.TaskInfo

View file

@@ -17,16 +17,20 @@
 package spark.scheduler

-import scala.collection.mutable.ArrayBuffer
 import java.util.concurrent.LinkedBlockingQueue

+import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
+
+import spark.Logging

 /** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-class SparkListenerEventProcessor() {
-  /* sparkListeners is not thread safe, so this assumes that listeners are all added before any
-   * SparkListenerEvents occur. */
-  private val sparkListeners = ArrayBuffer[SparkListener]()
-  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents]
+class SparkListenerEventProcessor() extends Logging {
+  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+
+  /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
+   * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
+  private val EVENT_QUEUE_CAPACITY = 10000
+  private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)

   new Thread("SparkListenerEventProcessor") {
     setDaemon(true)
@@ -57,6 +61,12 @@ class SparkListenerEventProcessor() {
   }

   def addEvent(event: SparkListenerEvents) {
-    eventQueue.put(event)
+    val eventAdded = eventQueue.offer(event)
+    if (!eventAdded) {
+      logError("Dropping SparkListenerEvent because no remaining room in event queue. " +
+        "This likely means one of the SparkListeners is too slow and cannot keep up with the " +
+        "rate at which tasks are being started by the scheduler.")
+    }
   }
 }
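For context, the change above replaces a blocking put() on an unbounded queue with a non-blocking offer() on a capacity-bounded one, so a slow listener shows up as a logged, dropped event instead of unbounded memory growth. A minimal standalone sketch of that pattern (hypothetical names, not Spark code):

import java.util.concurrent.LinkedBlockingQueue

object BoundedEventBusSketch {
  private val EVENT_QUEUE_CAPACITY = 10000
  // Bounded queue: offer() fails fast when full, whereas put() would block (or the queue would grow without limit if unbounded).
  private val eventQueue = new LinkedBlockingQueue[String](EVENT_QUEUE_CAPACITY)

  /** Returns false (and logs) instead of blocking when the queue is full. */
  def addEvent(event: String): Boolean = {
    val eventAdded = eventQueue.offer(event)   // non-blocking, unlike put()
    if (!eventAdded) {
      System.err.println("Dropping event: queue is full; the consumer cannot keep up.")
    }
    eventAdded
  }

  def main(args: Array[String]): Unit = {
    // The final offer returns false because the queue already holds EVENT_QUEUE_CAPACITY events.
    (1 to (EVENT_QUEUE_CAPACITY + 1)).foreach(i => addEvent(s"event-$i"))
  }
}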

View file

@@ -9,7 +9,8 @@ import spark.scheduler.cluster.TaskInfo
 import spark.executor.TaskMetrics
 import collection.mutable

-/** Tracks task-level information to be displayed in the UI.
+/**
+ * Tracks task-level information to be displayed in the UI.
  *
  * All access to the data structures in this class must be synchronized on the
  * class, since the UI thread and the DAGScheduler event loop may otherwise
@@ -44,146 +45,112 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
   override def onJobStart(jobStart: SparkListenerJobStart) {}

-  override def onStageCompleted(stageCompleted: StageCompleted) = {
-    this.synchronized {
-      val stage = stageCompleted.stageInfo.stage
-      poolToActiveStages(stageToPool(stage)) -= stage
-      activeStages -= stage
-      completedStages += stage
-      trimIfNecessary(completedStages)
-    }
+  override def onStageCompleted(stageCompleted: StageCompleted) : Unit = synchronized {
+    val stage = stageCompleted.stageInfo.stage
+    poolToActiveStages(stageToPool(stage)) -= stage
+    activeStages -= stage
+    completedStages += stage
+    trimIfNecessary(completedStages)
   }

   /** If stages is too large, remove and garbage collect old stages */
-  def trimIfNecessary(stages: ListBuffer[Stage]) {
-    this.synchronized {
-      if (stages.size > RETAINED_STAGES) {
-        val toRemove = RETAINED_STAGES / 10
-        stages.takeRight(toRemove).foreach( s => {
-          stageToTaskInfos.remove(s.id)
-          stageToTime.remove(s.id)
-          stageToShuffleRead.remove(s.id)
-          stageToShuffleWrite.remove(s.id)
-          stageToTasksActive.remove(s.id)
-          stageToTasksComplete.remove(s.id)
-          stageToTasksFailed.remove(s.id)
-          stageToPool.remove(s)
-          if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
-        })
-        stages.trimEnd(toRemove)
-      }
-    }
+  def trimIfNecessary(stages: ListBuffer[Stage]): Unit = synchronized {
+    if (stages.size > RETAINED_STAGES) {
+      val toRemove = RETAINED_STAGES / 10
+      stages.takeRight(toRemove).foreach( s => {
+        stageToTaskInfos.remove(s.id)
+        stageToTime.remove(s.id)
+        stageToShuffleRead.remove(s.id)
+        stageToShuffleWrite.remove(s.id)
+        stageToTasksActive.remove(s.id)
+        stageToTasksComplete.remove(s.id)
+        stageToTasksFailed.remove(s.id)
+        stageToPool.remove(s)
+        if (stageToDescription.contains(s)) {stageToDescription.remove(s)}
+      })
+      stages.trimEnd(toRemove)
+    }
   }

   /** For FIFO, all stages are contained by "default" pool but "default" pool here is meaningless */
-  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) = {
-    this.synchronized {
-      val stage = stageSubmitted.stage
-      activeStages += stage
-      val poolName = Option(stageSubmitted.properties).map {
-        p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
-      }.getOrElse(DEFAULT_POOL_NAME)
-      stageToPool(stage) = poolName
-      val description = Option(stageSubmitted.properties).flatMap {
-        p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
-      }
-      description.map(d => stageToDescription(stage) = d)
-      val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
-      stages += stage
-    }
+  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) : Unit = synchronized {
+    val stage = stageSubmitted.stage
+    activeStages += stage
+
+    val poolName = Option(stageSubmitted.properties).map {
+      p => p.getProperty("spark.scheduler.cluster.fair.pool", DEFAULT_POOL_NAME)
+    }.getOrElse(DEFAULT_POOL_NAME)
+    stageToPool(stage) = poolName
+
+    val description = Option(stageSubmitted.properties).flatMap {
+      p => Option(p.getProperty(SparkContext.SPARK_JOB_DESCRIPTION))
+    }
+    description.map(d => stageToDescription(stage) = d)
+
+    val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashSet[Stage]())
+    stages += stage
   }

-  override def onTaskStart(taskStart: SparkListenerTaskStart) {
-    this.synchronized {
-      val sid = taskStart.task.stageId
-      val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-      tasksActive += taskStart.taskInfo
-      val taskList = stageToTaskInfos.getOrElse(
-        sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList += ((taskStart.taskInfo, None, None))
-      stageToTaskInfos(sid) = taskList
-    }
+  override def onTaskStart(taskStart: SparkListenerTaskStart) : Unit = synchronized {
+    val sid = taskStart.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive += taskStart.taskInfo
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList += ((taskStart.taskInfo, None, None))
+    stageToTaskInfos(sid) = taskList
   }

-  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
-    this.synchronized {
-      val sid = taskEnd.task.stageId
-      val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
-      tasksActive -= taskEnd.taskInfo
-      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
-        taskEnd.reason match {
-          case e: ExceptionFailure =>
-            stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-            (Some(e), e.metrics)
-          case _ =>
-            stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-            (None, Option(taskEnd.taskMetrics))
-        }
-      stageToTime.getOrElseUpdate(sid, 0L)
-      val time = metrics.map(m => m.executorRunTime).getOrElse(0)
-      stageToTime(sid) += time
-      totalTime += time
-      stageToShuffleRead.getOrElseUpdate(sid, 0L)
-      val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
-        s.remoteBytesRead).getOrElse(0L)
-      stageToShuffleRead(sid) += shuffleRead
-      totalShuffleRead += shuffleRead
-      stageToShuffleWrite.getOrElseUpdate(sid, 0L)
-      val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
-        s.shuffleBytesWritten).getOrElse(0L)
-      stageToShuffleWrite(sid) += shuffleWrite
-      totalShuffleWrite += shuffleWrite
-      val taskList = stageToTaskInfos.getOrElse(
-        sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
-      taskList -= ((taskEnd.taskInfo, None, None))
-      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
-      stageToTaskInfos(sid) = taskList
-    }
+  override def onTaskEnd(taskEnd: SparkListenerTaskEnd) : Unit = synchronized {
+    val sid = taskEnd.task.stageId
+    val tasksActive = stageToTasksActive.getOrElseUpdate(sid, new HashSet[TaskInfo]())
+    tasksActive -= taskEnd.taskInfo
+    val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          (Some(e), e.metrics)
+        case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          (None, Option(taskEnd.taskMetrics))
+      }
+
+    stageToTime.getOrElseUpdate(sid, 0L)
+    val time = metrics.map(m => m.executorRunTime).getOrElse(0)
+    stageToTime(sid) += time
+    totalTime += time
+
+    stageToShuffleRead.getOrElseUpdate(sid, 0L)
+    val shuffleRead = metrics.flatMap(m => m.shuffleReadMetrics).map(s =>
+      s.remoteBytesRead).getOrElse(0L)
+    stageToShuffleRead(sid) += shuffleRead
+    totalShuffleRead += shuffleRead
+
+    stageToShuffleWrite.getOrElseUpdate(sid, 0L)
+    val shuffleWrite = metrics.flatMap(m => m.shuffleWriteMetrics).map(s =>
+      s.shuffleBytesWritten).getOrElse(0L)
+    stageToShuffleWrite(sid) += shuffleWrite
+    totalShuffleWrite += shuffleWrite
+
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, HashSet[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+    taskList -= ((taskEnd.taskInfo, None, None))
+    taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+    stageToTaskInfos(sid) = taskList
   }

-  override def onJobEnd(jobEnd: SparkListenerJobEnd) {
-    this.synchronized {
-      jobEnd match {
-        case end: SparkListenerJobEnd =>
-          end.jobResult match {
-            case JobFailed(ex, Some(stage)) =>
-              activeStages -= stage
-              poolToActiveStages(stageToPool(stage)) -= stage
-              failedStages += stage
-              trimIfNecessary(failedStages)
-            case _ =>
-          }
-        case _ =>
-      }
-    }
-  }
-
-  /** Is this stage's input from a shuffle read. */
-  def hasShuffleRead(stageID: Int): Boolean = {
-    this.synchronized {
-      // This is written in a slightly complicated way to avoid having to scan all tasks
-      for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-        if (s._2 != null) return s._2.flatMap(m => m.shuffleReadMetrics).isDefined
-      }
-      return false // No tasks have finished for this stage
-    }
-  }
-
-  /** Is this stage's output to a shuffle write. */
-  def hasShuffleWrite(stageID: Int): Boolean = {
-    this.synchronized {
-      // This is written in a slightly complicated way to avoid having to scan all tasks
-      for (s <- stageToTaskInfos.get(stageID).getOrElse(Seq())) {
-        if (s._2 != null) return s._2.flatMap(m => m.shuffleWriteMetrics).isDefined
-      }
-      return false // No tasks have finished for this stage
-    }
+  override def onJobEnd(jobEnd: SparkListenerJobEnd) : Unit = synchronized {
+    jobEnd match {
+      case end: SparkListenerJobEnd =>
+        end.jobResult match {
+          case JobFailed(ex, Some(stage)) =>
+            activeStages -= stage
+            poolToActiveStages(stageToPool(stage)) -= stage
+            failedStages += stage
+            trimIfNecessary(failedStages)
+          case _ =>
+        }
+      case _ =>
+    }
   }
 }
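For reference, the recurring change in this last file replaces method bodies wrapped in this.synchronized { ... } with methods whose entire body is a synchronized { ... } expression. Both forms acquire the monitor of the listener instance; the second simply drops one level of nesting. A small sketch of the equivalence, using a hypothetical Counter class (not Spark code):

class Counter {
  private var count = 0

  // Before: an explicit this.synchronized block inside the method body.
  def incrementOld(): Unit = {
    this.synchronized {
      count += 1
    }
  }

  // After: the whole method body is the synchronized expression; same lock, flatter code.
  def incrementNew(): Unit = synchronized {
    count += 1
  }

  // Reads take the same lock so they see a consistent value.
  def value: Int = synchronized { count }
}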