[SPARK-30511][SPARK-28403][CORE] Don't treat failed/killed speculative tasks as pending in ExecutorAllocationManager

### What changes were proposed in this pull request?

Currently, when speculative tasks fail/get killed, they are still considered as pending and count towards the calculation of number of needed executors. To be more accurate: `stageAttemptToNumSpeculativeTasks(stageAttempt)` is incremented on onSpeculativeTaskSubmitted, but never decremented.  `stageAttemptToNumSpeculativeTasks -= stageAttempt` is performed on stage completion. **This means Spark is marking ended speculative tasks as pending, which leads to Spark to hold more executors that it actually needs!**

This PR fixes this issue by updating `stageAttemptToSpeculativeTaskIndices` and  `stageAttemptToNumSpeculativeTasks` on speculative tasks completion.  This PR also addresses some other minor issues: scheduler behavior after receiving an intentionally killed task event; try to address [SPARK-28403](https://issues.apache.org/jira/browse/SPARK-28403).

### Why are the changes needed?

This has caused resource wastage in our production with speculation enabled. With aggressive speculation, we found data skewed jobs can hold hundreds of idle executors with less than 10 tasks running.

An easy repro of the issue (`--conf spark.speculation=true --conf spark.executor.cores=4 --conf spark.dynamicAllocation.maxExecutors=1000` in cluster mode):
```
val n = 4000
val someRDD = sc.parallelize(1 to n, n)
someRDD.mapPartitionsWithIndex( (index: Int, it: Iterator[Int]) => {
if (index < 300 && index >= 150) {
    Thread.sleep(index * 1000) // Fake running tasks
} else if (index == 300) {
    Thread.sleep(1000 * 1000) // Fake long running tasks
}
it.toList.map(x => index + ", " + x).iterator
}).collect
```
You will see when running the last task, we would be hold 38 executors (see below), which is exactly (152 + 3) / 4 = 38.
![image](https://user-images.githubusercontent.com/9404831/72469112-9a7fac00-3793-11ea-8f50-74d0ab7325a4.png)

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Added a comprehensive unit test.

Test with the above repro shows that we are holding 2 executors at the end
![image](https://user-images.githubusercontent.com/9404831/72469177-bbe09800-3793-11ea-850f-4a2c67142899.png)

Closes #27223 from linzebing/speculation_fix.

Authored-by: zebingl@fb.com <zebingl@fb.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
This commit is contained in:
zebingl@fb.com 2020-01-31 08:49:34 -06:00 committed by Thomas Graves
parent 3d2b8d8b13
commit 21bc0474bb
2 changed files with 172 additions and 24 deletions

View file

@ -263,9 +263,16 @@ private[spark] class ExecutorAllocationManager(
*/
private def maxNumExecutorsNeeded(): Int = {
val numRunningOrPendingTasks = listener.totalPendingTasks + listener.totalRunningTasks
math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutorForFullParallelism)
.toInt
val maxNeeded = math.ceil(numRunningOrPendingTasks * executorAllocationRatio /
tasksPerExecutorForFullParallelism).toInt
if (tasksPerExecutorForFullParallelism > 1 && maxNeeded == 1 &&
listener.pendingSpeculativeTasks > 0) {
// If we have pending speculative tasks and only need a single executor, allocate one more
// to satisfy the locality requirements of speculation
maxNeeded + 1
} else {
maxNeeded
}
}
private def totalRunningTasks(): Int = synchronized {
@ -377,14 +384,8 @@ private[spark] class ExecutorAllocationManager(
// If our target has not changed, do not send a message
// to the cluster manager and reset our exponential growth
if (delta == 0) {
// Check if there is any speculative jobs pending
if (listener.pendingTasks == 0 && listener.pendingSpeculativeTasks > 0) {
numExecutorsTarget =
math.max(math.min(maxNumExecutorsNeeded + 1, maxNumExecutors), minNumExecutors)
} else {
numExecutorsToAdd = 1
return 0
}
numExecutorsToAdd = 1
return 0
}
val addRequestAcknowledged = try {
@ -512,7 +513,7 @@ private[spark] class ExecutorAllocationManager(
// Should be 0 when no stages are active.
private val stageAttemptToNumRunningTask = new mutable.HashMap[StageAttempt, Int]
private val stageAttemptToTaskIndices = new mutable.HashMap[StageAttempt, mutable.HashSet[Int]]
// Number of speculative tasks to be scheduled in each stageAttempt
// Number of speculative tasks pending/running in each stageAttempt
private val stageAttemptToNumSpeculativeTasks = new mutable.HashMap[StageAttempt, Int]
// The speculative tasks started in each stageAttempt
private val stageAttemptToSpeculativeTaskIndices =
@ -614,18 +615,30 @@ private[spark] class ExecutorAllocationManager(
stageAttemptToNumRunningTask -= stageAttempt
}
}
// If the task failed, we expect it to be resubmitted later. To ensure we have
// enough resources to run the resubmitted task, we need to mark the scheduler
// as backlogged again if it's not already marked as such (SPARK-8366)
if (taskEnd.reason != Success) {
if (totalPendingTasks() == 0) {
allocationManager.onSchedulerBacklogged()
}
if (taskEnd.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
} else {
stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
}
if (taskEnd.taskInfo.speculative) {
stageAttemptToSpeculativeTaskIndices.get(stageAttempt).foreach {_.remove{taskIndex}}
stageAttemptToNumSpeculativeTasks(stageAttempt) -= 1
}
taskEnd.reason match {
case Success | _: TaskKilled =>
case _ =>
if (totalPendingTasks() == 0) {
// If the task failed (not intentionally killed), we expect it to be resubmitted
// later. To ensure we have enough resources to run the resubmitted task, we need to
// mark the scheduler as backlogged again if it's not already marked as such
// (SPARK-8366)
allocationManager.onSchedulerBacklogged()
}
if (!taskEnd.taskInfo.speculative) {
// If a non-speculative task is intentionally killed, it means the speculative task
// has succeeded, and no further task of this task index will be resubmitted. In this
// case, the task index is completed and we shouldn't remove it from
// stageAttemptToTaskIndices. Otherwise, we will have a pending non-speculative task
// for the task index (SPARK-30511)
stageAttemptToTaskIndices.get(stageAttempt).foreach {_.remove(taskIndex)}
}
}
}
}

View file

@ -264,6 +264,141 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(numExecutorsToAdd(manager) === 1)
}
test("SPARK-30511 remove executors when speculative tasks end") {
val clock = new ManualClock()
val stage = createStageInfo(0, 40)
val manager = createManager(createConf(0, 10, 0).set(config.EXECUTOR_CORES, 4), clock = clock)
post(SparkListenerStageSubmitted(stage))
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2)
assert(addExecutors(manager) === 4)
assert(addExecutors(manager) === 3)
(0 to 9).foreach(execId => onExecutorAdded(manager, execId.toString))
(0 to 39).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
info => post(SparkListenerTaskStart(0, 0, info))
}
assert(numExecutorsTarget(manager) === 10)
assert(maxNumExecutorsNeeded(manager) == 10)
// 30 tasks (0 - 29) finished
(0 to 29).map { i => createTaskInfo(i, i, executorId = s"${i / 4}")}.foreach {
info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null)) }
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 3)
assert(maxNumExecutorsNeeded(manager) == 3)
(0 to 6).foreach { i => assert(removeExecutor(manager, i.toString))}
(0 to 6).foreach { i => onExecutorRemoved(manager, i.toString)}
// 10 speculative tasks (30 - 39) launch for the remaining tasks
(30 to 39).foreach { _ => post(SparkListenerSpeculativeTaskSubmitted(0))}
assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 1)
assert(numExecutorsTarget(manager) == 5)
assert(maxNumExecutorsNeeded(manager) == 5)
(10 to 12).foreach(execId => onExecutorAdded(manager, execId.toString))
(40 to 49).map { i =>
createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
.foreach { info => post(SparkListenerTaskStart(0, 0, info))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) == 5) // At this point, we still have 6 executors running
assert(maxNumExecutorsNeeded(manager) == 5)
// 6 speculative tasks (40 - 45) finish before the original tasks, with 4 speculative remaining
(40 to 45).map { i =>
createTaskInfo(taskId = i, taskIndex = i - 10, executorId = s"${i / 4}", speculative = true)}
.foreach {
info => post(SparkListenerTaskEnd(0, 0, null, Success, info, new ExecutorMetrics, null))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 4)
assert(maxNumExecutorsNeeded(manager) == 4)
assert(removeExecutor(manager, "10"))
onExecutorRemoved(manager, "10")
// At this point, we still have 5 executors running: ["7", "8", "9", "11", "12"]
// 6 original tasks (30 - 35) are intentionally killed
(30 to 35).map { i =>
createTaskInfo(i, i, executorId = s"${i / 4}")}
.foreach { info => post(
SparkListenerTaskEnd(0, 0, null, TaskKilled("test"), info, new ExecutorMetrics, null))}
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 2)
assert(maxNumExecutorsNeeded(manager) == 2)
(7 to 8).foreach { i => assert(removeExecutor(manager, i.toString))}
(7 to 8).foreach { i => onExecutorRemoved(manager, i.toString)}
// At this point, we still have 3 executors running: ["9", "11", "12"]
// Task 36 finishes before the speculative task 46, task 46 killed
post(SparkListenerTaskEnd(0, 0, null, Success,
createTaskInfo(36, 36, executorId = "9"), new ExecutorMetrics, null))
post(SparkListenerTaskEnd(0, 0, null, TaskKilled("test"),
createTaskInfo(46, 36, executorId = "11", speculative = true), new ExecutorMetrics, null))
// We should have 3 original tasks (index 37, 38, 39) running, with corresponding 3 speculative
// tasks running. Target lowers to 2, but still hold 3 executors ["9", "11", "12"]
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 2)
assert(maxNumExecutorsNeeded(manager) == 2)
// At this point, we still have 3 executors running: ["9", "11", "12"]
// Task 37 and 47 succeed at the same time
post(SparkListenerTaskEnd(0, 0, null, Success,
createTaskInfo(37, 37, executorId = "9"), new ExecutorMetrics, null))
post(SparkListenerTaskEnd(0, 0, null, Success,
createTaskInfo(47, 37, executorId = "11", speculative = true), new ExecutorMetrics, null))
// We should have 2 original tasks (index 38, 39) running, with corresponding 2 speculative
// tasks running
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 1)
assert(maxNumExecutorsNeeded(manager) == 1)
assert(removeExecutor(manager, "11"))
onExecutorRemoved(manager, "11")
// At this point, we still have 2 executors running: ["9", "12"]
// Task 38 fails and task 49 fails, new speculative task 50 is submitted to speculate on task 39
post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
createTaskInfo(38, 38, executorId = "9"), new ExecutorMetrics, null))
post(SparkListenerTaskEnd(0, 0, null, UnknownReason,
createTaskInfo(49, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
post(SparkListenerSpeculativeTaskSubmitted(0))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
// maxNeeded = 1, allocate one more to satisfy speculation locality requirement
assert(numExecutorsTarget(manager) === 2)
assert(maxNumExecutorsNeeded(manager) == 2)
post(SparkListenerTaskStart(0, 0,
createTaskInfo(50, 39, executorId = "12", speculative = true)))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 1)
assert(maxNumExecutorsNeeded(manager) == 1)
// Task 39 and 48 succeed, task 50 killed
post(SparkListenerTaskEnd(0, 0, null, Success,
createTaskInfo(39, 39, executorId = "9"), new ExecutorMetrics, null))
post(SparkListenerTaskEnd(0, 0, null, Success,
createTaskInfo(48, 38, executorId = "12", speculative = true), new ExecutorMetrics, null))
post(SparkListenerTaskEnd(0, 0, null, TaskKilled("test"),
createTaskInfo(50, 39, executorId = "12", speculative = true), new ExecutorMetrics, null))
post(SparkListenerStageCompleted(stage))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 0)
assert(maxNumExecutorsNeeded(manager) == 0)
assert(removeExecutor(manager, "9"))
onExecutorRemoved(manager, "9")
assert(removeExecutor(manager, "12"))
onExecutorRemoved(manager, "12")
}
test("properly handle task end events from completed stages") {
val manager = createManager(createConf(0, 10, 0))