Properly handle job failure when the job gets killed.

Reynold Xin 2013-09-16 22:10:45 -07:00
parent cbc48be13b
commit 1cb42e6b2d

@@ -335,23 +335,24 @@ class DAGScheduler(
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
-  def killJob(jobId: Int) {
+  def killJob(jobId: Int): Unit = this.synchronized {
     activeJobs.find(job => job.jobId == jobId).foreach(job => killJob(job))
   }
 
-  private def killJob(job: ActiveJob) {
+  private def killJob(job: ActiveJob): Unit = this.synchronized {
     logInfo("Killing Job and cleaning up stages %d".format(job.jobId))
     activeJobs.remove(job)
     idToActiveJob.remove(job.jobId)
     val stage = job.finalStage
     resultStageToJob.remove(stage)
-    killStage(stage)
-    // recursively remove all parent stages
-    stage.parents.foreach(p => killStage(p))
-    job.listener.jobFailed(new SparkException("Job killed"))
+    killStage(job, stage)
+    val e = new SparkException("Job killed")
+    job.listener.jobFailed(e)
+    listenerBus.post(SparkListenerJobEnd(job, JobFailed(e, None)))
   }
 
-  private def killStage(stage: Stage) {
+  private def killStage(job: ActiveJob, stage: Stage): Unit = this.synchronized {
+    // TODO: Can we reuse taskSetFailed?
     logInfo("Killing Stage %s".format(stage.id))
     stageIdToStage.remove(stage.id)
     if (stage.isShuffleMap) {
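
Both kill methods now take the scheduler's own monitor via this.synchronized. Since JVM monitors are reentrant, killJob can call killStage (and killStage can recurse into itself through a stage's parents) without deadlocking while the lock is held. A standalone sketch of that pattern, using a made-up Registry class rather than Spark code:

    import scala.collection.mutable

    // Registry stands in for DAGScheduler here; both methods synchronize on
    // the same object, and the nested call re-enters the monitor safely.
    class Registry {
      private val ids = mutable.Set[Int]()

      def removeAll(): Unit = this.synchronized {
        ids.toList.foreach(remove)  // nested call into another synchronized method
      }

      def remove(id: Int): Unit = this.synchronized {
        ids -= id  // same thread already holds the monitor: no deadlock
      }
    }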
@@ -359,9 +360,16 @@ class DAGScheduler(
     }
     waiting.remove(stage)
     pendingTasks.remove(stage)
-    running.remove(stage)
     taskSched.killTasks(stage.id)
-    stage.parents.foreach(p => killStage(p))
+    if (running.contains(stage)) {
+      running.remove(stage)
+      val e = new SparkException("Job killed")
+      listenerBus.post(SparkListenerJobEnd(job, JobFailed(e, Some(stage))))
+    }
+    stage.parents.foreach(parentStage => killStage(job, parentStage))
+    //stageToInfos -= stage
   }
 
   /**
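
With these changes a kill is reported on the listener bus like any other job end: killJob posts JobFailed(e, None) once for the job, and killStage posts JobFailed(e, Some(stage)) for each stage that was actually running. A sketch of a listener observing those events, assuming the SparkListener trait of this era exposes onJobEnd with these case classes; KillAwareListener itself is hypothetical:

    import org.apache.spark.scheduler._

    class KillAwareListener extends SparkListener {
      override def onJobEnd(jobEnd: SparkListenerJobEnd) {
        jobEnd.jobResult match {
          case JobFailed(e, failedStage) =>
            // failedStage is Some(stage) when posted from killStage for a
            // running stage, None when posted once per job from killJob.
            println("Job failed: " + e.getMessage +
              failedStage.map(s => " at stage " + s.id).getOrElse(""))
          case _ => // JobSucceeded: nothing to report
        }
      }
    }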
@@ -785,7 +793,7 @@ class DAGScheduler(
 
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
-   * being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
+   * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
    */
   private def abortStage(failedStage: Stage, reason: String) {
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
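
Both the kill path and abortStage ultimately fail a job through its JobListener, which is what makes the listener.awaitResult() call in the first hunk throw. A standalone sketch of that wait/notify contract; ToyJobWaiter only mimics the behavior described by the comment in the diff and is not Spark's actual JobWaiter:

    // A blocked client sits in awaitResult() until jobFailed() releases it
    // and rethrows the failure ("Job killed" in the kill path above).
    class ToyJobWaiter {
      private var finished = false
      private var failure: Option[Exception] = None

      def jobFailed(e: Exception): Unit = synchronized {
        failure = Some(e)
        finished = true
        notifyAll()  // wake any thread parked in awaitResult()
      }

      def awaitResult(): Unit = synchronized {
        while (!finished) wait()
        failure.foreach(e => throw e)  // surface the failure to the caller
      }
    }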