Replaced the daemon thread started by DAGScheduler with an actor

This commit is contained in:
Lian, Cheng 2013-11-09 19:05:18 +08:00
parent bf4e6131cc
commit 2539c06745
4 changed files with 45 additions and 65 deletions

View file

@ -238,7 +238,6 @@ class SparkContext(
taskScheduler.start()
@volatile private[spark] var dagScheduler = new DAGScheduler(taskScheduler)
dagScheduler.start()
ui.start()

View file

@ -19,9 +19,9 @@ package org.apache.spark.scheduler
import java.io.NotSerializableException
import java.util.Properties
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Props, Actor, ActorRef}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import org.apache.spark._
@ -65,12 +65,12 @@ class DAGScheduler(
// Called by TaskScheduler to report task's starting.
def taskStarted(task: Task[_], taskInfo: TaskInfo) {
eventQueue.put(BeginEvent(task, taskInfo))
eventProcessActor ! BeginEvent(task, taskInfo)
}
// Called to report that a task has completed and results are being fetched remotely.
def taskGettingResult(task: Task[_], taskInfo: TaskInfo) {
eventQueue.put(GettingResultEvent(task, taskInfo))
eventProcessActor ! GettingResultEvent(task, taskInfo)
}
// Called by TaskScheduler to report task completions or failures.
@ -81,23 +81,23 @@ class DAGScheduler(
accumUpdates: Map[Long, Any],
taskInfo: TaskInfo,
taskMetrics: TaskMetrics) {
eventQueue.put(CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics))
eventProcessActor ! CompletionEvent(task, reason, result, accumUpdates, taskInfo, taskMetrics)
}
// Called by TaskScheduler when an executor fails.
def executorLost(execId: String) {
eventQueue.put(ExecutorLost(execId))
eventProcessActor ! ExecutorLost(execId)
}
// Called by TaskScheduler when a host is added
def executorGained(execId: String, host: String) {
eventQueue.put(ExecutorGained(execId, host))
eventProcessActor ! ExecutorGained(execId, host)
}
// Called by TaskScheduler to cancel an entire TaskSet due to either repeated failures or
// cancellation of the job itself.
def taskSetFailed(taskSet: TaskSet, reason: String) {
eventQueue.put(TaskSetFailed(taskSet, reason))
eventProcessActor ! TaskSetFailed(taskSet, reason)
}
// The time, in millis, to wait for fetch failure events to stop coming in after one is detected;
@ -109,7 +109,36 @@ class DAGScheduler(
// resubmit failed stages
val POLL_TIMEOUT = 10L
private val eventQueue = new LinkedBlockingQueue[DAGSchedulerEvent]
private val eventProcessActor: ActorRef = env.actorSystem.actorOf(Props(new Actor {
/**
* The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
* events and responds by launching tasks. This runs in a dedicated thread and receives events
* via the eventQueue.
*/
def receive = {
case event: DAGSchedulerEvent =>
if (event != null) {
logDebug("Got event of type " + event.getClass.getName)
}
if (!processEvent(event)) {
val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
// Periodically resubmit failed stages if some map output fetches have failed and we have
// waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
// tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
// the same time, so we want to make sure we've identified all the reduce tasks that depend
// on the failed node.
if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
resubmitFailedStages()
} else {
submitWaitingStages()
}
}
else {
context.stop(self)
}
}
}))
private[scheduler] val nextJobId = new AtomicInteger(0)
@ -150,16 +179,6 @@ class DAGScheduler(
val metadataCleaner = new MetadataCleaner(MetadataCleanerType.DAG_SCHEDULER, this.cleanup)
// Start a thread to run the DAGScheduler event loop
def start() {
new Thread("DAGScheduler") {
setDaemon(true)
override def run() {
DAGScheduler.this.run()
}
}.start()
}
def addSparkListener(listener: SparkListener) {
listenerBus.addListener(listener)
}
@ -301,8 +320,7 @@ class DAGScheduler(
assert(partitions.size > 0)
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite,
waiter, properties))
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
waiter
}
@ -337,8 +355,7 @@ class DAGScheduler(
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
val partitions = (0 until rdd.partitions.size).toArray
val jobId = nextJobId.getAndIncrement()
eventQueue.put(JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite,
listener, properties))
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions, allowLocal = false, callSite, listener, properties)
listener.awaitResult() // Will throw an exception if the job fails
}
@ -347,19 +364,19 @@ class DAGScheduler(
*/
def cancelJob(jobId: Int) {
logInfo("Asked to cancel job " + jobId)
eventQueue.put(JobCancelled(jobId))
eventProcessActor ! JobCancelled(jobId)
}
def cancelJobGroup(groupId: String) {
logInfo("Asked to cancel job group " + groupId)
eventQueue.put(JobGroupCancelled(groupId))
eventProcessActor ! JobGroupCancelled(groupId)
}
/**
* Cancel all jobs that are running or waiting in the queue.
*/
def cancelAllJobs() {
eventQueue.put(AllJobsCancelled)
eventProcessActor ! AllJobsCancelled
}
/**
@ -474,42 +491,6 @@ class DAGScheduler(
}
}
/**
* The main event loop of the DAG scheduler, which waits for new-job / task-finished / failure
* events and responds by launching tasks. This runs in a dedicated thread and receives events
* via the eventQueue.
*/
private def run() {
SparkEnv.set(env)
while (true) {
val event = eventQueue.poll(POLL_TIMEOUT, TimeUnit.MILLISECONDS)
if (event != null) {
logDebug("Got event of type " + event.getClass.getName)
}
this.synchronized { // needed in case other threads makes calls into methods of this class
if (event != null) {
if (processEvent(event)) {
return
}
}
val time = System.currentTimeMillis() // TODO: use a pluggable clock for testability
// Periodically resubmit failed stages if some map output fetches have failed and we have
// waited at least RESUBMIT_TIMEOUT. We wait for this short time because when a node fails,
// tasks on many other nodes are bound to get a fetch failure, and they won't all get it at
// the same time, so we want to make sure we've identified all the reduce tasks that depend
// on the failed node.
if (failed.size > 0 && time > lastFetchFailureTime + RESUBMIT_TIMEOUT) {
resubmitFailedStages()
} else {
submitWaitingStages()
}
}
}
}
/**
* Run a job on an RDD locally, assuming it has only a single partition and no dependencies.
* We run the operation in a separate thread just in case it takes a bunch of time, so that we
@ -909,7 +890,7 @@ class DAGScheduler(
}
def stop() {
eventQueue.put(StopDAGScheduler)
eventProcessActor ! StopDAGScheduler
metadataCleaner.cancel()
taskSched.stop()
}

View file

@ -893,7 +893,7 @@ private[spark] object BlockManager extends Logging {
{
// env == null and blockManagerMaster != null is used in tests
assert (env != null || blockManagerMaster != null)
val blockLocations: Seq[Seq[BlockManagerId]] = if (env != null) {
val blockLocations: Seq[Seq[BlockManagerId]] = if (blockManagerMaster == null) {
env.blockManager.getLocationBlockIds(blockIds)
} else {
blockManagerMaster.getLocations(blockIds)

View file

@ -100,7 +100,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont
cacheLocations.clear()
results.clear()
mapOutputTracker = new MapOutputTrackerMaster()
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, null) {
scheduler = new DAGScheduler(taskScheduler, mapOutputTracker, blockManagerMaster, sc.env) {
override def runLocally(job: ActiveJob) {
// don't bother with the thread while unit testing
runLocallyWithinThread(job)