From 0b448df6ac520a7977b1eb51e8c55e33f3fd2da8 Mon Sep 17 00:00:00 2001
From: Kay Ousterhout
Date: Thu, 6 Feb 2014 16:15:24 -0800
Subject: [PATCH] Merge pull request #450 from kayousterhout/fetch_failures.

Closes #450.

Only run ResubmitFailedStages event after a fetch fails

Previously, the ResubmitFailedStages event was called every
200 milliseconds, leading to a lot of unnecessary event processing
and clogged DAGScheduler logs.

Author: Kay Ousterhout

== Merge branch commits ==

commit e603784b3a562980e6f1863845097effe2129d3b
Author: Kay Ousterhout
Date:   Wed Feb 5 11:34:41 2014 -0800

    Re-add check for empty set of failed stages

commit d258f0ef50caff4bbb19fb95a6b82186db1935bf
Author: Kay Ousterhout
Date:   Wed Jan 15 23:35:41 2014 -0800

    Only run ResubmitFailedStages event after a fetch fails

    Previously, the ResubmitFailedStages event was called every
    200 milliseconds, leading to a lot of unnecessary event processing
    and clogged DAGScheduler logs.
---
 .../apache/spark/scheduler/DAGScheduler.scala | 33 +++++++------------
 1 file changed, 11 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 821241508e..21d16fabef 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -155,7 +155,6 @@ class DAGScheduler(
   val failed = new HashSet[Stage] // Stages that must be resubmitted due to fetch failures
   // Missing tasks from each stage
   val pendingTasks = new TimeStampedHashMap[Stage, HashSet[Task[_]]]
-  var lastFetchFailureTime: Long = 0 // Used to wait a bit to avoid repeated resubmits

   val activeJobs = new HashSet[ActiveJob]
   val resultStageToJob = new HashMap[Stage, ActiveJob]
@@ -176,22 +175,6 @@ class DAGScheduler(
    */
   def start() {
     eventProcessActor = env.actorSystem.actorOf(Props(new Actor {
-      /**
-       * A handle to the periodical task, used to cancel the task when the actor is stopped.
-       */
-      var resubmissionTask: Cancellable = _
-
-      override def preStart() {
-        import context.dispatcher
-        /**
-         * A message is sent to the actor itself periodically to remind the actor to resubmit failed
-         * stages. In this way, stage resubmission can be done within the same thread context of
-         * other event processing logic to avoid unnecessary synchronization overhead.
-         */
-        resubmissionTask = context.system.scheduler.schedule(
-          RESUBMIT_TIMEOUT, RESUBMIT_TIMEOUT, self, ResubmitFailedStages)
-      }
-
       /**
        * The main event loop of the DAG scheduler.
        */
@@ -207,7 +190,6 @@ class DAGScheduler(
         if (!processEvent(event)) {
           submitWaitingStages()
         } else {
-          resubmissionTask.cancel()
           context.stop(self)
         }
       }
@@ -620,6 +602,8 @@ class DAGScheduler(

     case ResubmitFailedStages =>
       if (failed.size > 0) {
+        // Failed stages may be removed by job cancellation, so failed might be empty even if
+        // the ResubmitFailedStages event has been scheduled.
         resubmitFailedStages()
       }

@@ -926,7 +910,6 @@ class DAGScheduler(
         // Mark the stage that the reducer was in as unrunnable
         val failedStage = stageIdToStage(task.stageId)
         running -= failedStage
-        failed += failedStage
         // TODO: Cancel running tasks in the stage
         logInfo("Marking " + failedStage + " (" + failedStage.name +
           ") for resubmision due to a fetch failure")
@@ -938,10 +921,16 @@ class DAGScheduler(
         }
         logInfo("The failed fetch was from " + mapStage + " (" + mapStage.name +
           "); marking it for resubmission")
+        if (failed.isEmpty && eventProcessActor != null) {
+          // Don't schedule an event to resubmit failed stages if failed isn't empty, because
+          // in that case the event will already have been scheduled. eventProcessActor may be
+          // null during unit tests.
+          import env.actorSystem.dispatcher
+          env.actorSystem.scheduler.scheduleOnce(
+            RESUBMIT_TIMEOUT, eventProcessActor, ResubmitFailedStages)
+        }
+        failed += failedStage
         failed += mapStage
-        // Remember that a fetch failed now; this is used to resubmit the broken
-        // stages later, after a small wait (to give other tasks the chance to fail)
-        lastFetchFailureTime = System.currentTimeMillis() // TODO: Use pluggable clock
         // TODO: mark the executor as failed only if there were lots of fetch failures on it
         if (bmAddress != null) {
           handleExecutorLost(bmAddress.executorId, Some(task.epoch))
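
For context, the following is a minimal, self-contained sketch of the scheduling
pattern this patch moves to: instead of a periodic ResubmitFailedStages tick, a
one-shot message is scheduled only when a fetch failure is recorded while no
resubmission is already pending. This is an illustration using plain Akka
classic actors, not Spark code; the names SchedulerSketch, FetchFailed, and the
stage-id bookkeeping below are hypothetical stand-ins for the DAGScheduler's
internals.

    import scala.collection.mutable.HashSet
    import scala.concurrent.duration._
    import akka.actor.{Actor, ActorSystem, Props}

    case object ResubmitFailedStages
    case class FetchFailed(stageId: Int)

    class SchedulerSketch extends Actor {
      import context.dispatcher // ExecutionContext for the scheduler
      private val ResubmitTimeout = 200.milliseconds
      private val failed = new HashSet[Int] // stage ids awaiting resubmission

      def receive = {
        case FetchFailed(stageId) =>
          // Arm the one-shot timer only if nothing is pending yet; otherwise
          // a ResubmitFailedStages event is already in flight and another
          // would be redundant (mirrors the failed.isEmpty guard in the patch).
          if (failed.isEmpty) {
            context.system.scheduler.scheduleOnce(
              ResubmitTimeout, self, ResubmitFailedStages)
          }
          failed += stageId

        case ResubmitFailedStages =>
          // failed can be empty here if the stages were removed (e.g. by job
          // cancellation) between scheduling and delivery, hence the guard.
          if (failed.nonEmpty) {
            println(s"Resubmitting stages: ${failed.mkString(", ")}")
            failed.clear()
          }
      }
    }

    object SchedulerSketchApp extends App {
      val system = ActorSystem("sketch")
      val scheduler = system.actorOf(Props(new SchedulerSketch), "dagSchedulerSketch")
      scheduler ! FetchFailed(3)
      scheduler ! FetchFailed(7) // no second timer: one is already pending
      Thread.sleep(500)
      system.terminate()
    }

Note the two halves of the design: arming the timer only on the first failure
batches any further failures that arrive within the timeout window into one
resubmission pass (preserving the old behavior of waiting briefly so related
tasks get a chance to fail too), while the emptiness check on delivery handles
the race where cancellation drains the failed set before the event fires.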