Merge pull request #450 from kayousterhout/fetch_failures. Closes #450.

Only run ResubmitFailedStages event after a fetch fails

Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.
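
To make the behavioral change concrete, here is a rough sketch of the two scheduling styles using an Akka-style scheduler. The actor, message, and object names below are illustrative stand-ins rather than the actual Spark classes, and exact API details may vary across Akka versions: the old style wakes the event loop every RESUBMIT_TIMEOUT whether or not anything failed, while the new style posts a single one-shot event only after a fetch failure.

    import akka.actor.{Actor, ActorSystem, Cancellable, Props}
    import scala.concurrent.duration._

    case object ResubmitFailedStages

    class SchedulerActor extends Actor {
      def receive = {
        case ResubmitFailedStages =>
          // Old design: this message arrived every RESUBMIT_TIMEOUT regardless of failures.
          // New design: it arrives once, RESUBMIT_TIMEOUT after a fetch failure is recorded.
          println("resubmitting failed stages")
      }
    }

    object ResubmitSketch extends App {
      val system = ActorSystem("sketch")
      import system.dispatcher
      val actor = system.actorOf(Props[SchedulerActor], "dagScheduler")
      val RESUBMIT_TIMEOUT = 200.milliseconds

      // Old approach: a periodic reminder, fired whether or not anything failed.
      val periodic: Cancellable =
        system.scheduler.schedule(RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, actor, ResubmitFailedStages)

      // New approach: a single one-shot event, scheduled only when a fetch failure occurs.
      system.scheduler.scheduleOnce(RESUBMIT_TIMEOUT, actor, ResubmitFailedStages)

      Thread.sleep(1000)
      periodic.cancel()
      system.terminate()
    }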

Author: Kay Ousterhout <kayousterhout@gmail.com>

== Merge branch commits ==

commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date:   Wed Feb 5 11:34:41 2014 -0800

    Re-add check for empty set of failed stages

commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout <kayousterhout@gmail.com>
Date:   Wed Jan 15 23:35:41 2014 -0800

    Only run ResubmitFailedStages event after a fetch fails

    Previously, the ResubmitFailedStages event was called every
    200 milliseconds, leading to a lot of unnecessary event processing
    and clogged DAGScheduler logs.
Commit: 0b448df6ac (parent 18ad59e2c6)
Author: Kay Ousterhout
Date:   2014-02-06 16:15:24 -08:00
Committed by: Patrick Wendell

@@ -155,7 +155,6 @@ class DAGScheduler(
   val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
   // Missing tasks from each stage
   val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
-  var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits
 
   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
@@ -176,22 +175,6 @@ class DAGScheduler(
    */
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
-      /**
-       * A handle to the periodical task, used to cancel the task when the actor is stopped.
-       */
-      var resubmissionTask: Cancellable = _
-
-      override def preStart() {
-        import context.dispatcher
-        /**
-         * A message is sent to the actor itself periodically to remind the actor to resubmit failed
-         * stages. In this way, stage resubmission can be done within the same thread context of
-         * other event processing logic to avoid unnecessary synchronization overhead.
-         */
-        resubmissionTask = context.system.scheduler.schedule(
-          RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
-      }
-
       /**
        * The main event loop of the DAG scheduler.
        */
@@ -207,7 +190,6 @@ class DAGScheduler(
         if (!processEvent(event)) {
           submitWaitingStages()
         } else {
-          resubmissionTask.cancel()
           context.stop(self)
         }
       }
@@ -620,6 +602,8 @@ class DAGScheduler(
 
     case ResubmitFailedStages =>
       if (failed.size > 0) {
+        // Failed stages may be removed by job cancellation, so failed might be empty even if
+        // the ResubmitFailedStages event has been scheduled.
         resubmitFailedStages()
       }
 
@@ -926,7 +910,6 @@ class DAGScheduler(
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
         running -= failedStage
-        failed += failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
           ") for resubmision due to a fetch failure")
@@ -938,10 +921,16 @@ class DAGScheduler(
         }
         logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
           "); marking it for resubmission")
+        if (failed.isEmpty && eventProcessActor != null) {
+          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+          // in that case the event will already have been scheduled. eventProcessActor may be
+          // null during unit tests.
+          import env.actorSystem.dispatcher
+          env.actorSystem.scheduler.scheduleOnce(
+            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+        }
+        failed += failedStage
         failed += mapStage
-        // Remember that a fetch failed now; this is used to resubmit the broken
-        // stages later, after a small wait (to give other tasks the chance to fail)
-        lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
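
The guard added in the last hunk (schedule only when `failed` is empty) keeps at most one ResubmitFailedStages event outstanding at a time. Below is a minimal, self-contained sketch of that pattern, using a counting stub in place of the actor scheduler; `Stage`, `scheduleOnce`, and `handleFetchFailure` here are simplified stand-ins, not Spark's definitions.

    import scala.collection.mutable
    import scala.concurrent.duration._

    object ResubmitGuardSketch {
      case class Stage(id: Int)

      val failed = mutable.HashSet.empty[Stage]
      val RESUBMIT_TIMEOUT: FiniteDuration = 200.milliseconds
      var scheduledResubmits = 0

      // Stand-in for env.actorSystem.scheduler.scheduleOnce(...): it only counts requests;
      // in the real code the body would run later on the event-processing actor.
      def scheduleOnce(delay: FiniteDuration)(body: => Unit): Unit =
        scheduledResubmits += 1

      def handleFetchFailure(failedStage: Stage, mapStage: Stage): Unit = {
        if (failed.isEmpty) {
          // First fetch failure since the last resubmission: schedule exactly one event.
          // Later failures just join the batch that is already pending.
          scheduleOnce(RESUBMIT_TIMEOUT) { resubmitFailedStages() }
        }
        failed += failedStage
        failed += mapStage
      }

      def resubmitFailedStages(): Unit = failed.clear()

      def main(args: Array[String]): Unit = {
        handleFetchFailure(Stage(2), Stage(1))
        handleFetchFailure(Stage(3), Stage(1)) // no second event is scheduled
        println(s"one-shot resubmits requested: $scheduledResubmits") // prints 1
      }
    }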