Merge pull request #143 from rxin/scheduler-hang

Ignore a task update status if the executor doesn't exist anymore.

Otherwise if the scheduler receives a task update message when the executor's been removed, the scheduler would hang.

It is pretty hard to add unit tests for these right now because it is hard to mock the cluster scheduler. We should do that once @kayousterhout finishes merging the local scheduler and the cluster scheduler.
This commit is contained in:
Reynold Xin 2013-11-05 23:14:09 -08:00
commit bf4e6131cc

View file

@ -87,8 +87,14 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
case StatusUpdate(executorId, taskId, state, data) =>
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
freeCores(executorId) += 1
makeOffers(executorId)
if (executorActor.contains(executorId)) {
freeCores(executorId) += 1
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
val msg = "Ignored task status update (%d state %s) from unknown executor %s with ID %s"
logWarning(msg.format(taskId, state, sender, executorId))
}
}
case ReviveOffers =>
@ -175,7 +181,9 @@ class CoarseGrainedSchedulerBackend(scheduler: ClusterScheduler, actorSystem: Ac
Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
}
private val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
private val timeout = {
Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
}
def stopExecutors() {
try {