From c36537fcfddc1eae1581b1b84d9d4384c5985c26 Mon Sep 17 00:00:00 2001
From: Shixiong Zhu
Date: Tue, 30 Oct 2018 10:48:04 -0700
Subject: [PATCH] [SPARK-25773][CORE] Cancel zombie tasks in a result stage when the job finishes

## What changes were proposed in this pull request?

When a job finishes, there may be some zombie tasks still running due to stage retry. Since a result stage will never be used by other jobs, running these tasks just wastes cluster resources. This PR asks the TaskScheduler to cancel the running tasks of a result stage once the stage has finished. Credits go to srinathshankar, who suggested this idea to me. (A minimal, illustration-only sketch of the zombie-task scenario is appended after the diff.)

This PR also fixes two minor issues while touching DAGScheduler:
- An invalid spark.job.interruptOnCancel value should not crash DAGScheduler.
- Non-fatal errors should not crash DAGScheduler.

## How was this patch tested?

The new unit tests.

Closes #22771 from zsxwing/SPARK-25773.

Lead-authored-by: Shixiong Zhu
Co-authored-by: Shixiong Zhu
Signed-off-by: Shixiong Zhu
---
 .../apache/spark/scheduler/DAGScheduler.scala | 48 ++++++++++++++---
 .../org/apache/spark/SparkContextSuite.scala  | 53 ++++++++++++++++++-
 .../spark/scheduler/DAGSchedulerSuite.scala   | 53 +++++++++++++------
 3 files changed, 130 insertions(+), 24 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 34b1160dbb..06966e77db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -1295,6 +1295,27 @@ private[spark] class DAGScheduler(
       Utils.getFormattedClassName(event.task), event.reason, event.taskInfo, taskMetrics))
   }
 
+  /**
+   * Check [[SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL]] in job properties to see if we should
+   * interrupt running tasks. Returns `false` if the property value is not a boolean value.
+   */
+  private def shouldInterruptTaskThread(job: ActiveJob): Boolean = {
+    if (job.properties == null) {
+      false
+    } else {
+      val shouldInterruptThread =
+        job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
+      try {
+        shouldInterruptThread.toBoolean
+      } catch {
+        case e: IllegalArgumentException =>
+          logWarning(s"${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} in Job ${job.jobId} " +
+            s"is invalid: $shouldInterruptThread. Using 'false' instead", e)
+          false
+      }
+    }
+  }
+
   /**
    * Responds to a task finishing. This is called inside the event loop so it assumes that it can
    * modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
    */
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
                   if (job.numFinished == job.numPartitions) {
                     markStageAsFinished(resultStage)
                     cleanupStateForJobAndIndependentStages(job)
+                    try {
+                      // killAllTaskAttempts will fail if a SchedulerBackend does not implement
+                      // killTask.
+                      logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
+                        "or zombie tasks for this job")
+                      // ResultStage is only used by this job. It's safe to kill speculative or
+                      // zombie tasks in this stage.
+                      taskScheduler.killAllTaskAttempts(
+                        stageId,
+                        shouldInterruptTaskThread(job),
+                        reason = "Stage finished")
+                    } catch {
+                      case e: UnsupportedOperationException =>
+                        logWarning(s"Could not cancel tasks for stage $stageId", e)
+                    }
                     listenerBus.post(
                       SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
                   }
@@ -1373,7 +1409,7 @@ private[spark] class DAGScheduler(
                   try {
                     job.listener.taskSucceeded(rt.outputId, event.result)
                   } catch {
-                    case e: Exception =>
+                    case e: Throwable if !Utils.isFatalError(e) =>
                       // TODO: Perhaps we want to mark the resultStage as failed?
                       job.listener.jobFailed(new SparkDriverExecutionException(e))
                   }
@@ -1890,10 +1926,6 @@ private[spark] class DAGScheduler(
     val error = new SparkException(failureReason, exception.getOrElse(null))
     var ableToCancelStages = true
 
-    val shouldInterruptThread =
-      if (job.properties == null) false
-      else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
-
     // Cancel all independent, running stages.
     val stages = jobIdToStageIds(job.jobId)
     if (stages.isEmpty) {
@@ -1913,12 +1945,12 @@ private[spark] class DAGScheduler(
             val stage = stageIdToStage(stageId)
             if (runningStages.contains(stage)) {
               try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
-                taskScheduler.cancelTasks(stageId, shouldInterruptThread)
+                taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
                 markStageAsFinished(stage, Some(failureReason))
               } catch {
                 case e: UnsupportedOperationException =>
-                  logInfo(s"Could not cancel tasks for stage $stageId", e)
-                ableToCancelStages = false
+                  logWarning(s"Could not cancel tasks for stage $stageId", e)
+                  ableToCancelStages = false
               }
             }
           }
diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index e1666a3527..79192f3f3c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFor
 import org.scalatest.Matchers._
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.util.{ThreadUtils, Utils}
 
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
     }
   }
+
+  test("cancel zombie tasks in a result stage when the job finishes") {
+    val conf = new SparkConf()
+      .setMaster("local-cluster[1,2,1024]")
+      .setAppName("test-cluster")
+      .set("spark.ui.enabled", "false")
+      // Disable this so that if a task is running, we can make sure the executor will always send
+      // task metrics via heartbeat to driver.
+      .set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
+      // Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
+      .set("spark.executor.heartbeatInterval", "1s")
+    sc = new SparkContext(conf)
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
+    @volatile var runningTaskIds: Seq[Long] = null
+    val listener = new SparkListener {
+      override def onExecutorMetricsUpdate(
+          executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+        if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
+          runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
+        }
+      }
+    }
+    sc.addSparkListener(listener)
+    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
+      val context = org.apache.spark.TaskContext.get()
+      if (context.stageAttemptNumber == 0) {
+        if (context.partitionId == 0) {
+          // Make the first task in the first stage attempt fail.
+          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
+            new java.io.IOException("fake"))
+        } else {
+          // Make the second task in the first stage attempt sleep to generate a zombie task
+          Thread.sleep(60000)
+        }
+      } else {
+        // Make the second stage attempt successful.
+      }
+      x
+    }.collect()
+    sc.listenerBus.waitUntilEmpty(10000)
+    // As executors will send the metrics of running tasks via heartbeat, we can use this to check
+    // whether there is any running task.
+    eventually(timeout(10.seconds)) {
+      // Make sure runningTaskIds has been set
+      assert(runningTaskIds != null)
+      // Verify there is no running task.
+      assert(runningTaskIds.isEmpty)
+    }
+  }
 }
 
 object SparkContextSuite {
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index b41d2acab7..5f4ffa151d 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -1901,27 +1901,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   /**
-   * The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
+   * The job will be failed on first task throwing an error.
    * Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
    * If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions
    * and their differing causes as to which will represent result for job...
    */
   test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
-    val e = intercept[SparkDriverExecutionException] {
-      // Number of parallelized partitions implies number of tasks of job
-      val rdd = sc.parallelize(1 to 10, 2)
-      sc.runJob[Int, Int](
-        rdd,
-        (context: TaskContext, iter: Iterator[Int]) => iter.size,
-        // For a robust test assertion, limit number of job tasks to 1; that is,
-        // if multiple RDD partitions, use id of any one partition, say, first partition id=0
-        Seq(0),
-        (part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
-    }
-    assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
+    failAfter(1.minute) { // If DAGScheduler crashes, the following test will hang forever
+      for (error <- Seq(
+        new DAGSchedulerSuiteDummyException,
+        new AssertionError, // E.g., assert(foo == bar) fails
+        new NotImplementedError // E.g., call a method with `???` implementation.
+      )) {
+        val e = intercept[SparkDriverExecutionException] {
+          // Number of parallelized partitions implies number of tasks of job
+          val rdd = sc.parallelize(1 to 10, 2)
+          sc.runJob[Int, Int](
+            rdd,
+            (context: TaskContext, iter: Iterator[Int]) => iter.size,
+            // For a robust test assertion, limit number of job tasks to 1; that is,
+            // if multiple RDD partitions, use id of any one partition, say, first partition id=0
+            Seq(0),
+            (part: Int, result: Int) => throw error)
+        }
+        assert(e.getCause eq error)
 
-    // Make sure we can still run commands on our SparkContext
-    assert(sc.parallelize(1 to 10, 2).count() === 10)
+        // Make sure we can still run commands on our SparkContext
+        assert(sc.parallelize(1 to 10, 2).count() === 10)
+      }
+    }
+  }
+
+  test(s"invalid ${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} should not crash DAGScheduler") {
+    sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "invalid")
+    try {
+      intercept[SparkException] {
+        sc.parallelize(1 to 1, 1).foreach { _ =>
+          throw new DAGSchedulerSuiteDummyException
+        }
+      }
+      // Verify the above job didn't crash DAGScheduler by running a simple job
+      assert(sc.parallelize(1 to 10, 2).count() === 10)
+    } finally {
+      sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
+    }
   }
 
   test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {
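
## Illustrative sketch (not part of the patch)

For illustration only: a minimal driver-side sketch of the zombie-task scenario described above, showing one way the new cancellation could be observed. It is not part of the patch. `ZombieTaskDemo` is a hypothetical name, the object is placed in package `org.apache.spark` only so it can reach the same internal helpers the new test uses (`FetchFailedException`, `SparkEnv.get.blockManager`), and the expectation that the killed task surfaces the reason "Stage finished" is an assumption based on the reason string passed to `killAllTaskAttempts` above.

```scala
// Hypothetical demo, not part of the patch. Lives in org.apache.spark only to access the
// same internal helpers (FetchFailedException, SparkEnv.get.blockManager) as the new test.
package org.apache.spark

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.shuffle.FetchFailedException

object ZombieTaskDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local-cluster[1,2,1024]").setAppName("zombie-task-demo"))
    // Ask the scheduler to interrupt the zombie task's thread instead of only marking it killed.
    sc.setLocalProperty("spark.job.interruptOnCancel", "true")
    // Log tasks that end as killed; with this patch the lingering task from the first stage
    // attempt should be reported as killed (reason "Stage finished" is an assumption taken
    // from the reason string in the diff above).
    sc.addSparkListener(new SparkListener {
      override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = taskEnd.reason match {
        case k: TaskKilled => println(s"Task ${taskEnd.taskInfo.taskId} killed: ${k.reason}")
        case _ =>
      }
    })
    // Same trick as the new test: one task fails with a FetchFailedException on the first stage
    // attempt (forcing a stage retry) while the other task sleeps, so it lingers as a zombie
    // task from the old attempt after the retried stage succeeds.
    sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
      val ctx = TaskContext.get()
      if (ctx.stageAttemptNumber == 0) {
        if (ctx.partitionId == 0) {
          throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
            new java.io.IOException("fake"))
        } else {
          Thread.sleep(60000) // the would-be zombie task
        }
      }
      x
    }.collect()
    // Give the executor a moment to report the killed task before shutting down.
    Thread.sleep(5000)
    sc.stop()
  }
}
```

Before this change, the sleeping task from the first stage attempt would keep running for the full minute even though `collect()` had already returned; with the change, the DAGScheduler asks the TaskScheduler to kill it as soon as the result stage completes.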