[SPARK-10515] When killing executor, the pending replacement executors should not be lost

If the heartbeat receiver kills executors and new ones have not yet registered to replace them, an idle timeout on another executor changes the total number of executors requested by the driver. The pending replacements are then lost: new executors are never requested to replace the killed ones.
For example, executorsPendingToRemove=Set(1), and executor 2 hits its idle timeout before a new executor has been requested to replace executor 1. The driver then kills executor 2 and sends RequestExecutors to the AM. But now executorsPendingToRemove=Set(1,2), so the AM does not allocate an executor to replace executor 1.

see: https://github.com/apache/spark/pull/8668

Author: KaiXinXiaoLei <huleilei1@huawei.com>
Author: huleilei <huleilei1@huawei.com>

Closes #8945 from KaiXinXiaoLei/pendingexecutor.
This commit is contained in:
KaiXinXiaoLei 2015-10-15 14:48:01 -07:00 committed by Andrew Or
parent 723aa75a9d
commit 2d000124b7
2 changed files with 37 additions and 0 deletions

View file

@ -438,6 +438,8 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
if (!replace) {
doRequestTotalExecutors(
numExistingExecutors + numPendingExecutors - executorsPendingToRemove.size)
} else {
numPendingExecutors += knownExecutors.size
}
doKillExecutors(executorsToKill)

View file

@ -369,6 +369,41 @@ class StandaloneDynamicAllocationSuite
assert(apps.head.getExecutorLimit === 1)
}
test("the pending replacement executors should not be lost (SPARK-10515)") {
  sc = new SparkContext(appConf)
  val expectedAppId = sc.applicationId
  // Poll until getApplications() reports exactly this application with two
  // executors and an executor limit of Int.MaxValue.
  eventually(timeout(10.seconds), interval(10.millis)) {
    val registered = getApplications()
    assert(registered.size === 1)
    assert(registered.head.id === expectedAppId)
    assert(registered.head.executors.size === 2)
    assert(registered.head.getExecutorLimit === Int.MaxValue)
  }
  // sync executors between the Master and the driver, needed because
  // the driver refuses to kill executors it does not know about
  syncExecutors(sc)
  val executorIds = getExecutorIds(sc)
  assert(executorIds.size === 2)
  val firstExecutor = executorIds.head
  val secondExecutor = executorIds(1)

  // Kill the first executor and ask for a replacement, then wait until two
  // executors are reported again.
  assert(sc.killAndReplaceExecutor(firstExecutor))
  eventually(timeout(10.seconds), interval(10.millis)) {
    val afterReplace = getApplications()
    assert(afterReplace.head.executors.size === 2)
  }

  // Application state is fetched once before the plain kills; the result of
  // this particular call is not inspected.
  getApplications()

  // Kill the first executor id again, this time via killExecutor: two
  // executors and an executor limit of 2 are expected afterwards.
  assert(sc.killExecutor(firstExecutor))
  val afterFirstKill = getApplications()
  assert(afterFirstKill.head.executors.size === 2)
  assert(afterFirstKill.head.getExecutorLimit === 2)

  // Kill the second executor: one executor and an executor limit of 1 are
  // expected afterwards.
  assert(sc.killExecutor(secondExecutor))
  val afterSecondKill = getApplications()
  assert(afterSecondKill.head.executors.size === 1)
  assert(afterSecondKill.head.getExecutorLimit === 1)
}
// ===============================
// | Utility methods for testing |
// ===============================