[SPARK-26758][CORE] Idle Executors are not getting killed after spark.dynamiAllocation.executorIdleTimeout value

## What changes were proposed in this pull request?

**updateAndSyncNumExecutorsTarget**  API should be called after **initializing** flag is unset
## How was this patch tested?
Added UT and also manually tested

After Fix
![afterfix](https://user-images.githubusercontent.com/35216143/51983136-ed4a5000-24bd-11e9-90c8-c4a562c17a4b.png)

Closes #23697 from sandeep-katta/executorIssue.

Authored-by: sandeep-katta <sandeep.katta2007@gmail.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
This commit is contained in:
sandeep-katta 2019-02-04 20:13:22 -08:00 committed by Sean Owen
parent 755f9c2076
commit 1dd7419702
2 changed files with 22 additions and 8 deletions

View file

@ -312,8 +312,6 @@ private[spark] class ExecutorAllocationManager(
private def schedule(): Unit = synchronized {
val now = clock.getTimeMillis
updateAndSyncNumExecutorsTarget(now)
val executorIdsToBeRemoved = ArrayBuffer[String]()
removeTimes.retain { case (executorId, expireTime) =>
val expired = now >= expireTime
@ -323,6 +321,8 @@ private[spark] class ExecutorAllocationManager(
}
!expired
}
// Update executor target number only after initializing flag is unset
updateAndSyncNumExecutorsTarget(now)
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}

View file

@ -936,12 +936,7 @@ class ExecutorAllocationManagerSuite
assert(maxNumExecutorsNeeded(manager) === 0)
schedule(manager)
// Verify executor is timeout but numExecutorsTarget is not recalculated
assert(numExecutorsTarget(manager) === 3)
// Schedule again to recalculate the numExecutorsTarget after executor is timeout
schedule(manager)
// Verify that current number of executors should be ramp down when executor is timeout
// Verify executor is timeout,numExecutorsTarget is recalculated
assert(numExecutorsTarget(manager) === 2)
}
@ -1148,6 +1143,25 @@ class ExecutorAllocationManagerSuite
verify(mockAllocationClient).killExecutors(Seq("executor-1"), false, false, false)
}
test("SPARK-26758 check executor target number after idle time out ") {
sc = createSparkContext(1, 5, 3)
val manager = sc.executorAllocationManager.get
val clock = new ManualClock(10000L)
manager.setClock(clock)
assert(numExecutorsTarget(manager) === 3)
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty)))
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
clock.getTimeMillis(), "executor-2", new ExecutorInfo("host1", 2, Map.empty)))
manager.listener.onExecutorAdded(SparkListenerExecutorAdded(
clock.getTimeMillis(), "executor-3", new ExecutorInfo("host1", 3, Map.empty)))
// make all the executors as idle, so that it will be killed
clock.advance(executorIdleTimeout * 1000)
schedule(manager)
// once the schedule is run target executor number should be 1
assert(numExecutorsTarget(manager) === 1)
}
private def createSparkContext(
minExecutors: Int = 1,
maxExecutors: Int = 5,