Merge remote-tracking branch 'apache/master' into error-handling

Tathagata Das 2014-01-12 17:37:46 -08:00
commit aa2c993858
4 changed files with 18 additions and 6 deletions

core/src/main/scala/org/apache/spark/executor/Executor.scala

@@ -57,7 +57,7 @@ private[spark] class Executor(
   Utils.setCustomHostname(slaveHostname)
 
   // Set spark.* properties from executor arg
-  val conf = new SparkConf(false)
+  val conf = new SparkConf(true)
   conf.setAll(properties)
 
   // If we are in yarn mode, systems can have different disk layouts so we must set it
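Note on the one-line change above: SparkConf's boolean constructor flag controls whether spark.* JVM system properties are loaded into the conf as defaults. A minimal sketch of the semantics this hunk relies on, with illustrative values rather than the committed code:

import org.apache.spark.SparkConf

// loadDefaults = true: any spark.* system properties already set on the
// executor JVM seed the conf first...
val conf = new SparkConf(true)

// ...then the driver-sent properties are layered on top; setAll calls set
// for each pair, so an explicitly sent value overwrites a loaded default.
val properties = Seq("spark.app.name" -> "example")
conf.setAll(properties)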

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -133,7 +133,8 @@ class DAGScheduler(
   private[spark] val stageToInfos = new TimeStampedHashMap[Stage, StageInfo]
 
-  private[spark] val listenerBus = new SparkListenerBus()
+  // An async scheduler event bus. The bus should be stopped when DAGScheduler is stopped.
+  private[spark] val listenerBus = new SparkListenerBus
 
   // Contains the locations that each RDD's partitions are cached on
   private val cacheLocs = new HashMap[Int, Array[Seq[TaskLocation]]]
@@ -1121,5 +1122,6 @@ class DAGScheduler(
     }
     metadataCleaner.cancel()
     taskSched.stop()
+    listenerBus.stop()
   }
 }

core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala

@@ -43,6 +43,9 @@ case class SparkListenerJobStart(job: ActiveJob, stageIds: Array[Int], propertie
 case class SparkListenerJobEnd(job: ActiveJob, jobResult: JobResult)
   extends SparkListenerEvents
 
+/** An event used in the listener to shut down the listener daemon thread. */
+private[scheduler] case object SparkListenerShutdown extends SparkListenerEvents
+
 trait SparkListener {
   /**
    * Called when a stage is completed, with information on the completed stage
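SparkListenerShutdown is a sentinel ("poison pill"): because it extends the same SparkListenerEvents trait as real events, it travels through the same queue and is pattern-matched by the consumer thread like any other event. A tiny, hypothetical illustration of the idiom (Event, Message, and Shutdown are our names, not Spark's):

sealed trait Event
case class Message(body: String) extends Event
case object Shutdown extends Event  // sentinel carrying no payload

// Returns false when the consumer should stop draining events.
def handle(e: Event): Boolean = e match {
  case Message(body) => println(body); true
  case Shutdown      => false
}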

core/src/main/scala/org/apache/spark/scheduler/SparkListenerBus.scala

@@ -24,15 +24,17 @@ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer}
 
 import org.apache.spark.Logging
 
 /** Asynchronously passes SparkListenerEvents to registered SparkListeners. */
-private[spark] class SparkListenerBus() extends Logging {
-  private val sparkListeners = new ArrayBuffer[SparkListener]() with SynchronizedBuffer[SparkListener]
+private[spark] class SparkListenerBus extends Logging {
+  private val sparkListeners = new ArrayBuffer[SparkListener] with SynchronizedBuffer[SparkListener]
 
   /* Cap the capacity of the SparkListenerEvent queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
-  private val EVENT_QUEUE_CAPACITY = 10000
+  private val EVENT_QUEUE_CAPACITY = 10000
   private val eventQueue = new LinkedBlockingQueue[SparkListenerEvents](EVENT_QUEUE_CAPACITY)
   private var queueFullErrorMessageLogged = false
 
+  // Create a new daemon thread to listen for events. This thread is stopped when it receives
+  // a SparkListenerShutdown event, using the stop() method.
   new Thread("SparkListenerBus") {
     setDaemon(true)
     override def run() {
@ -53,6 +55,9 @@ private[spark] class SparkListenerBus() extends Logging {
sparkListeners.foreach(_.onTaskGettingResult(taskGettingResult))
case taskEnd: SparkListenerTaskEnd =>
sparkListeners.foreach(_.onTaskEnd(taskEnd))
case SparkListenerShutdown =>
// Get out of the while loop and shutdown the daemon thread
return
case _ =>
}
}
@@ -80,7 +85,7 @@ private[spark] class SparkListenerBus() extends Logging {
    */
   def waitUntilEmpty(timeoutMillis: Int): Boolean = {
     val finishTime = System.currentTimeMillis + timeoutMillis
-    while (!eventQueue.isEmpty()) {
+    while (!eventQueue.isEmpty) {
       if (System.currentTimeMillis > finishTime) {
         return false
       }
@@ -90,4 +95,6 @@ private[spark] class SparkListenerBus() extends Logging {
     }
     return true
   }
+
+  def stop(): Unit = post(SparkListenerShutdown)
 }
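Taken together, SparkListenerBus is a classic bounded producer-consumer with poison-pill shutdown: a capacity-capped LinkedBlockingQueue, a single daemon consumer thread, and stop() posting the sentinel. A self-contained sketch of the same structure, simplified for illustration rather than a copy of Spark's code:

import java.util.concurrent.LinkedBlockingQueue

sealed trait BusEvent
case class Payload(msg: String) extends BusEvent
case object Stop extends BusEvent  // poison pill

class MiniBus {
  // A bounded queue fails fast (offer returns false) instead of growing
  // until the JVM OOMs when producers outpace the consumer.
  private val queue = new LinkedBlockingQueue[BusEvent](10000)

  new Thread("MiniBus") {
    setDaemon(true)  // a daemon thread never keeps the JVM alive by itself
    override def run() {
      while (true) {
        queue.take() match {
          case Payload(msg) => println(msg)
          case Stop => return  // leave run(); the thread terminates
        }
      }
    }
  }.start()

  def post(event: BusEvent) {
    if (!queue.offer(event)) {
      // Queue full: drop the event and complain, mirroring the cap's intent.
      System.err.println("Dropping event because the queue is full: " + event)
    }
  }

  def stop(): Unit = post(Stop)
}

Because the queue is FIFO, events posted before stop() are still drained before the sentinel is consumed, which is why DAGScheduler can call listenerBus.stop() during its own shutdown without losing already-queued events.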