[SPARK-25773][CORE] Cancel zombie tasks in a result stage when the job finishes

## What changes were proposed in this pull request?

When a job finishes, there may still be zombie tasks running due to stage retries. Since a result stage is never reused by other jobs, keeping these tasks running just wastes cluster resources. This PR simply asks the TaskScheduler to cancel the running tasks of a result stage once it has finished. Credits go to srinathshankar, who suggested this idea to me.
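
To make the mechanism concrete, here is a minimal, self-contained sketch of the idea. The `TaskScheduler` trait and `ActiveJob` class below are simplified stand-ins rather than Spark's actual classes; only the `killAllTaskAttempts(stageId, interruptThread, reason)` shape matches the real call shown in the diff:

```scala
// Simplified stand-ins, not Spark's real classes: once every partition of a
// result stage has produced a result, the job is done, and any remaining task
// attempts (left behind by a stage retry or by speculation) are only wasting
// cluster resources, so they can be killed.
object ZombieTaskCancellationSketch {

  trait TaskScheduler {
    def killAllTaskAttempts(stageId: Int, interruptThread: Boolean, reason: String): Unit
  }

  final class ActiveJob(val jobId: Int, val numPartitions: Int) {
    var numFinished: Int = 0
  }

  def onResultTaskFinished(
      job: ActiveJob,
      stageId: Int,
      interruptThread: Boolean,
      taskScheduler: TaskScheduler): Unit = {
    job.numFinished += 1
    if (job.numFinished == job.numPartitions) {
      // A result stage belongs only to this job, so killing its remaining
      // attempts cannot affect any other job.
      try {
        taskScheduler.killAllTaskAttempts(stageId, interruptThread, reason = "Stage finished")
      } catch {
        // Some SchedulerBackends do not implement killTask; treat cancellation
        // as best-effort in that case.
        case e: UnsupportedOperationException =>
          println(s"Could not cancel tasks for stage $stageId: $e")
      }
    }
  }
}
```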

This PR also fixes two minor issues noticed while touching DAGScheduler:
- An invalid `spark.job.interruptOnCancel` value should not crash the DAGScheduler (see the usage sketch after this list).
- Non-fatal errors should not crash the DAGScheduler.
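
For the first fix, here is a minimal usage sketch, assuming a local-mode `SparkContext`; the property key `"spark.job.interruptOnCancel"` is the value of the internal constant `SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL` read in the diff below. With this change, a non-boolean value is logged as a warning and treated as `false` instead of bringing down the DAGScheduler:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object InterruptOnCancelDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("interrupt-on-cancel-demo"))
    try {
      // Valid value: remaining task attempts of a finished result stage are
      // killed with Thread.interrupt().
      sc.setLocalProperty("spark.job.interruptOnCancel", "true")
      sc.parallelize(1 to 10, 2).count()

      // Invalid value: with this PR the DAGScheduler logs a warning and falls
      // back to "false" instead of crashing its event loop.
      sc.setLocalProperty("spark.job.interruptOnCancel", "maybe")
      sc.parallelize(1 to 10, 2).count()
    } finally {
      sc.stop()
    }
  }
}
```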

## How was this patch tested?

New unit tests, added in `SparkContextSuite` and `DAGSchedulerSuite`.

Closes #22771 from zsxwing/SPARK-25773.

Lead-authored-by: Shixiong Zhu <zsxwing@gmail.com>
Co-authored-by: Shixiong Zhu <shixiong@databricks.com>
Signed-off-by: Shixiong Zhu <zsxwing@gmail.com>
Shixiong Zhu 2018-10-30 10:48:04 -07:00
parent 94de5609be
commit c36537fcfd
3 changed files with 130 additions and 24 deletions

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

@@ -1295,6 +1295,27 @@ private[spark] class DAGScheduler(
Utils.getFormattedClassName(event.task), event.reason, event.taskInfo, taskMetrics))
}
/**
* Check [[SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL]] in job properties to see if we should
* interrupt running tasks. Returns `false` if the property value is not a boolean value
*/
private def shouldInterruptTaskThread(job: ActiveJob): Boolean = {
if (job.properties == null) {
false
} else {
val shouldInterruptThread =
job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
try {
shouldInterruptThread.toBoolean
} catch {
case e: IllegalArgumentException =>
logWarning(s"${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} in Job ${job.jobId} " +
s"is invalid: $shouldInterruptThread. Using 'false' instead", e)
false
}
}
}
/**
* Responds to a task finishing. This is called inside the event loop so it assumes that it can
* modify the scheduler's internal state. Use taskEnded() to post a task end event from outside.
@@ -1364,6 +1385,21 @@ private[spark] class DAGScheduler(
if (job.numFinished == job.numPartitions) {
markStageAsFinished(resultStage)
cleanupStateForJobAndIndependentStages(job)
try {
// killAllTaskAttempts will fail if a SchedulerBackend does not implement
// killTask.
logInfo(s"Job ${job.jobId} is finished. Cancelling potential speculative " +
"or zombie tasks for this job")
// ResultStage is only used by this job. It's safe to kill speculative or
// zombie tasks in this stage.
taskScheduler.killAllTaskAttempts(
stageId,
shouldInterruptTaskThread(job),
reason = "Stage finished")
} catch {
case e: UnsupportedOperationException =>
logWarning(s"Could not cancel tasks for stage $stageId", e)
}
listenerBus.post(
SparkListenerJobEnd(job.jobId, clock.getTimeMillis(), JobSucceeded))
}
@@ -1373,7 +1409,7 @@ private[spark] class DAGScheduler(
try {
job.listener.taskSucceeded(rt.outputId, event.result)
} catch {
case e: Exception =>
case e: Throwable if !Utils.isFatalError(e) =>
// TODO: Perhaps we want to mark the resultStage as failed?
job.listener.jobFailed(new SparkDriverExecutionException(e))
}
@@ -1890,10 +1926,6 @@ private[spark] class DAGScheduler(
val error = new SparkException(failureReason, exception.getOrElse(null))
var ableToCancelStages = true
val shouldInterruptThread =
if (job.properties == null) false
else job.properties.getProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false").toBoolean
// Cancel all independent, running stages.
val stages = jobIdToStageIds(job.jobId)
if (stages.isEmpty) {
@@ -1913,12 +1945,12 @@ private[spark] class DAGScheduler(
val stage = stageIdToStage(stageId)
if (runningStages.contains(stage)) {
try { // cancelTasks will fail if a SchedulerBackend does not implement killTask
taskScheduler.cancelTasks(stageId, shouldInterruptThread)
taskScheduler.cancelTasks(stageId, shouldInterruptTaskThread(job))
markStageAsFinished(stage, Some(failureReason))
} catch {
case e: UnsupportedOperationException =>
logInfo(s"Could not cancel tasks for stage $stageId", e)
ableToCancelStages = false
logWarning(s"Could not cancel tasks for stage $stageId", e)
ableToCancelStages = false
}
}
}

core/src/test/scala/org/apache/spark/SparkContextSuite.scala

@@ -33,7 +33,9 @@ import org.apache.hadoop.mapreduce.lib.input.{TextInputFormat => NewTextInputFor
import org.scalatest.Matchers._
import org.scalatest.concurrent.Eventually
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.internal.config.EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES
import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerJobStart, SparkListenerTaskEnd, SparkListenerTaskStart}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.util.{ThreadUtils, Utils}
@@ -672,6 +674,55 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
assert(sc.statusTracker.getExecutorInfos.map(_.numRunningTasks()).sum == 0)
}
}
test("cancel zombie tasks in a result stage when the job finishes") {
val conf = new SparkConf()
.setMaster("local-cluster[1,2,1024]")
.setAppName("test-cluster")
.set("spark.ui.enabled", "false")
// Disable this so that if a task is running, we can make sure the executor will always send
// task metrics via heartbeat to driver.
.set(EXECUTOR_HEARTBEAT_DROP_ZERO_ACCUMULATOR_UPDATES.key, "false")
// Set a short heartbeat interval to send SparkListenerExecutorMetricsUpdate fast
.set("spark.executor.heartbeatInterval", "1s")
sc = new SparkContext(conf)
sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "true")
@volatile var runningTaskIds: Seq[Long] = null
val listener = new SparkListener {
override def onExecutorMetricsUpdate(
executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
if (executorMetricsUpdate.execId != SparkContext.DRIVER_IDENTIFIER) {
runningTaskIds = executorMetricsUpdate.accumUpdates.map(_._1)
}
}
}
sc.addSparkListener(listener)
sc.range(0, 2).groupBy((x: Long) => x % 2, 2).map { case (x, _) =>
val context = org.apache.spark.TaskContext.get()
if (context.stageAttemptNumber == 0) {
if (context.partitionId == 0) {
// Make the first task in the first stage attempt fail.
throw new FetchFailedException(SparkEnv.get.blockManager.blockManagerId, 0, 0, 0,
new java.io.IOException("fake"))
} else {
// Make the second task in the first stage attempt sleep to generate a zombie task
Thread.sleep(60000)
}
} else {
// Make the second stage attempt successful.
}
x
}.collect()
sc.listenerBus.waitUntilEmpty(10000)
// As executors will send the metrics of running tasks via heartbeat, we can use this to check
// whether there is any running task.
eventually(timeout(10.seconds)) {
// Make sure runningTaskIds has been set
assert(runningTaskIds != null)
// Verify there is no running task.
assert(runningTaskIds.isEmpty)
}
}
}
object SparkContextSuite {

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala

@@ -1901,27 +1901,50 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
}
/**
* The job will be failed on first task throwing a DAGSchedulerSuiteDummyException.
* The job will be failed on first task throwing an error.
* Any subsequent task WILL throw a legitimate java.lang.UnsupportedOperationException.
* If multiple tasks, there exists a race condition between the SparkDriverExecutionExceptions
* and their differing causes as to which will represent result for job...
*/
test("misbehaved resultHandler should not crash DAGScheduler and SparkContext") {
val e = intercept[SparkDriverExecutionException] {
// Number of parallelized partitions implies number of tasks of job
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
// For a robust test assertion, limit number of job tasks to 1; that is,
// if multiple RDD partitions, use id of any one partition, say, first partition id=0
Seq(0),
(part: Int, result: Int) => throw new DAGSchedulerSuiteDummyException)
}
assert(e.getCause.isInstanceOf[DAGSchedulerSuiteDummyException])
failAfter(1.minute) { // If DAGScheduler crashes, the following test will hang forever
for (error <- Seq(
new DAGSchedulerSuiteDummyException,
new AssertionError, // E.g., assert(foo == bar) fails
new NotImplementedError // E.g., call a method with `???` implementation.
)) {
val e = intercept[SparkDriverExecutionException] {
// Number of parallelized partitions implies number of tasks of job
val rdd = sc.parallelize(1 to 10, 2)
sc.runJob[Int, Int](
rdd,
(context: TaskContext, iter: Iterator[Int]) => iter.size,
// For a robust test assertion, limit number of job tasks to 1; that is,
// if multiple RDD partitions, use id of any one partition, say, first partition id=0
Seq(0),
(part: Int, result: Int) => throw error)
}
assert(e.getCause eq error)
// Make sure we can still run commands on our SparkContext
assert(sc.parallelize(1 to 10, 2).count() === 10)
// Make sure we can still run commands on our SparkContext
assert(sc.parallelize(1 to 10, 2).count() === 10)
}
}
}
test(s"invalid ${SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL} should not crash DAGScheduler") {
sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "invalid")
try {
intercept[SparkException] {
sc.parallelize(1 to 1, 1).foreach { _ =>
throw new DAGSchedulerSuiteDummyException
}
}
// Verify the above job didn't crash DAGScheduler by running a simple job
assert(sc.parallelize(1 to 10, 2).count() === 10)
} finally {
sc.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, null)
}
}
test("getPartitions exceptions should not crash DAGScheduler and SparkContext (SPARK-8606)") {