diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1d491d581a..d44a3f22be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -335,23 +335,24 @@ class DAGScheduler(
     listener.awaitResult()    // Will throw an exception if the job fails
   }
 
-  def killJob(jobId: Int) {
+  def killJob(jobId: Int): Unit = this.synchronized {
     activeJobs.find(job => job.jobId == jobId).foreach(job => killJob(job))
   }
 
-  private def killJob(job: ActiveJob) {
+  private def killJob(job: ActiveJob): Unit = this.synchronized {
     logInfo("Killing Job and cleaning up stages %d".format(job.jobId))
     activeJobs.remove(job)
     idToActiveJob.remove(job.jobId)
     val stage = job.finalStage
     resultStageToJob.remove(stage)
-    killStage(stage)
-    // recursively remove all parent stages
-    stage.parents.foreach(p => killStage(p))
-    job.listener.jobFailed(new SparkException("Job killed"))
+    killStage(job, stage)
+    val e = new SparkException("Job killed")
+    job.listener.jobFailed(e)
+    listenerBus.post(SparkListenerJobEnd(job, JobFailed(e, None)))
   }
 
-  private def killStage(stage: Stage) {
+  private def killStage(job: ActiveJob, stage: Stage): Unit = this.synchronized {
+    // TODO: Can we reuse taskSetFailed?
     logInfo("Killing Stage %s".format(stage.id))
     stageIdToStage.remove(stage.id)
     if (stage.isShuffleMap) {
@@ -359,9 +360,16 @@ class DAGScheduler(
     }
     waiting.remove(stage)
     pendingTasks.remove(stage)
-    running.remove(stage)
     taskSched.killTasks(stage.id)
-    stage.parents.foreach(p => killStage(p))
+
+    if (running.contains(stage)) {
+      running.remove(stage)
+      val e = new SparkException("Job killed")
+      listenerBus.post(SparkListenerJobEnd(job, JobFailed(e, Some(stage))))
+    }
+
+    stage.parents.foreach(parentStage => killStage(job, parentStage))
+    //stageToInfos -= stage
   }
 
   /**
@@ -785,7 +793,7 @@ class DAGScheduler(
 
   /**
    * Aborts all jobs depending on a particular Stage. This is called in response to a task set
-   * being cancelled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
+   * being canceled by the TaskScheduler. Use taskSetFailed() to inject this event from outside.
   */
   private def abortStage(failedStage: Stage, reason: String) {
    val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
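
A minimal, self-contained sketch (not part of the patch) of the recursive kill walk that the new killStage(job, stage) performs: starting from a job's final stage, each stage and all of its ancestors are dropped from the scheduler's bookkeeping sets, the task scheduler is asked to kill the stage's tasks, and a job-failure event is reported for stages that were actually running. All names below (SimpleStage, killTasks, the mutable sets) are simplified stand-ins for illustration, not Spark's real types or APIs.

import scala.collection.mutable

// Simplified stand-in for a Spark Stage: an id plus its parent stages.
case class SimpleStage(id: Int, parents: Seq[SimpleStage])

object KillWalkSketch {
  // Stand-ins for the DAGScheduler's waiting / pendingTasks / running sets.
  val waiting = mutable.Set[SimpleStage]()
  val pending = mutable.Set[SimpleStage]()
  val running = mutable.Set[SimpleStage]()

  // Stand-in for taskSched.killTasks(stageId).
  def killTasks(stageId: Int): Unit = println(s"killing tasks for stage $stageId")

  // Mirrors the shape of the patched killStage: clean up this stage, then recurse
  // into every parent stage so the whole ancestry of the job is torn down.
  def killStage(stage: SimpleStage): Unit = {
    waiting -= stage
    pending -= stage
    killTasks(stage.id)
    if (running.remove(stage)) {
      // In the patch, this is where SparkListenerJobEnd(job, JobFailed(e, Some(stage)))
      // is posted for stages that were actively running.
      println(s"stage ${stage.id} was running; reporting job failure for it")
    }
    stage.parents.foreach(killStage)
  }

  def main(args: Array[String]): Unit = {
    val parent = SimpleStage(1, Nil)
    val finalStage = SimpleStage(2, Seq(parent))
    running ++= Seq(parent, finalStage)
    killStage(finalStage)   // kills the final stage, then its parent
  }
}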