[SPARK-26927][CORE] Ensure executor is active when processing events in dynamic allocation manager.

## What changes were proposed in this pull request?

There is a race condition in the `ExecutorAllocationManager` that the `SparkListenerExecutorRemoved` event is posted before the `SparkListenerTaskStart` event, which will cause the incorrect result of `executorIds`. Then, when some executor idles, the real executors will be removed even actual executor number is equal to `minNumExecutors` due to the incorrect computation of `newExecutorTotal`(may greater than the `minNumExecutors`), thus may finally causing zero available executors but a wrong positive number of executorIds was kept in memory.

What's more, even the `SparkListenerTaskEnd` event can not make the fake `executorIds` released, because later idle event for the fake executors can not cause the real removal of these executors, as they are already removed and they are not exist in the `executorDataMap`  of `CoaseGrainedSchedulerBackend`, so that the `onExecutorRemoved` method will never be called again.

For details see https://issues.apache.org/jira/browse/SPARK-26927

This PR is to fix this problem.

## How was this patch tested?

existUT and added UT

Closes #23842 from liupc/Fix-race-condition-that-casues-dyanmic-allocation-not-working.

Lead-authored-by: Liupengcheng <liupengcheng@xiaomi.com>
Co-authored-by: liupengcheng <liupengcheng@xiaomi.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Liupengcheng 2019-03-12 13:53:42 -07:00 committed by Marcelo Vanzin
parent 688b0c01fa
commit d5cfe08fdc
2 changed files with 34 additions and 5 deletions

View file

@ -725,10 +725,15 @@ private[spark] class ExecutorAllocationManager(
if (stageIdToNumRunningTask.contains(stageId)) { if (stageIdToNumRunningTask.contains(stageId)) {
stageIdToNumRunningTask(stageId) += 1 stageIdToNumRunningTask(stageId) += 1
} }
// This guards against the race condition in which the `SparkListenerTaskStart` // This guards against the following race condition:
// event is posted before the `SparkListenerBlockManagerAdded` event, which is // 1. The `SparkListenerTaskStart` event is posted before the
// possible because these events are posted in different threads. (see SPARK-4951) // `SparkListenerExecutorAdded` event
if (!allocationManager.executorIds.contains(executorId)) { // 2. The `SparkListenerExecutorRemoved` event is posted before the
// `SparkListenerTaskStart` event
// Above cases are possible because these events are posted in different threads.
// (see SPARK-4951 SPARK-26927)
if (!allocationManager.executorIds.contains(executorId) &&
client.getExecutorIds().contains(executorId)) {
allocationManager.onExecutorAdded(executorId) allocationManager.onExecutorAdded(executorId)
} }

View file

@ -421,6 +421,7 @@ class ExecutorAllocationManagerSuite
// Remove when numExecutorsTarget is the same as the current number of executors // Remove when numExecutorsTarget is the same as the current number of executors
assert(addExecutors(manager) === 1) assert(addExecutors(manager) === 1)
assert(addExecutors(manager) === 2) assert(addExecutors(manager) === 2)
(1 to 8).foreach(execId => onExecutorAdded(manager, execId.toString))
(1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach { (1 to 8).map { i => createTaskInfo(i, i, s"$i") }.foreach {
info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) } info => post(sc.listenerBus, SparkListenerTaskStart(0, 0, info)) }
assert(executorIds(manager).size === 8) assert(executorIds(manager).size === 8)
@ -834,7 +835,7 @@ class ExecutorAllocationManagerSuite
assert(removeTimes(manager).size === 1) assert(removeTimes(manager).size === 1)
} }
test("SPARK-4951: call onTaskStart before onBlockManagerAdded") { test("SPARK-4951: call onTaskStart before onExecutorAdded") {
sc = createSparkContext(2, 10, 2) sc = createSparkContext(2, 10, 2)
val manager = sc.executorAllocationManager.get val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty) assert(executorIds(manager).isEmpty)
@ -1162,6 +1163,29 @@ class ExecutorAllocationManagerSuite
assert(numExecutorsTarget(manager) === 1) assert(numExecutorsTarget(manager) === 1)
} }
test("SPARK-26927 call onExecutorRemoved before onTaskStart") {
sc = createSparkContext(2, 5)
val manager = sc.executorAllocationManager.get
assert(executorIds(manager).isEmpty)
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "2", new ExecutorInfo("host2", 1, Map.empty, Map.empty)))
post(sc.listenerBus, SparkListenerExecutorAdded(
0L, "3", new ExecutorInfo("host3", 1, Map.empty, Map.empty)))
assert(executorIds(manager).size === 3)
post(sc.listenerBus, SparkListenerExecutorRemoved(0L, "3", "disconnected"))
assert(executorIds(manager).size === 2)
assert(executorIds(manager) === Set("1", "2"))
val taskInfo1 = createTaskInfo(0, 0, "3")
post(sc.listenerBus, SparkListenerTaskStart(0, 0, taskInfo1))
// Verify taskStart not adding already removed executors.
assert(executorIds(manager).size === 2)
assert(executorIds(manager) === Set("1", "2"))
}
private def createSparkContext( private def createSparkContext(
minExecutors: Int = 1, minExecutors: Int = 1,
maxExecutors: Int = 5, maxExecutors: Int = 5,