[SPARK-30355][CORE] Unify isExecutorActive between CoarseGrainedSchedulerBackend and DriverEndpoint

### What changes were proposed in this pull request?

Unify `DriverEndpoint.executorIsAlive()` and `CoarseGrainedSchedulerBackend.isExecutorActive()`.

### Why are the changes needed?

`DriverEndpoint` has the method `executorIsAlive()` to check whether an executor is alive/active, while `CoarseGrainedSchedulerBackend` has the method `isExecutorActive()` to do the same work. But `isExecutorActive()` seems to have forgotten to consider `executorsPendingLossReason`, so the two checks can disagree about an executor that has been lost but whose exit reason is not yet known. Unifying these two methods makes the behavior consistent between `DriverEndpoint` and `CoarseGrainedSchedulerBackend` and makes the code easier to maintain.
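
For context, here is a minimal runnable sketch of the two pre-patch checks. The method bodies are copied from the diff below; the `PrePatchSketch` object, the simplified `String` values in `executorDataMap`, and the `main` demo are hypothetical scaffolding, not the real class:

```scala
import scala.collection.mutable.{HashMap, HashSet}

object PrePatchSketch {
  // Simplified bookkeeping; the real map holds ExecutorData, not host strings.
  private val executorDataMap = new HashMap[String, String]
  private val executorsPendingToRemove = new HashMap[String, Boolean]
  private val executorsPendingLossReason = new HashSet[String]

  // DriverEndpoint's check, used for resource offers and disableExecutor().
  private def executorIsAlive(executorId: String): Boolean = synchronized {
    !executorsPendingToRemove.contains(executorId) &&
      !executorsPendingLossReason.contains(executorId)
  }

  // CoarseGrainedSchedulerBackend's check: note it never consults
  // executorsPendingLossReason.
  def isExecutorActive(id: String): Boolean = synchronized {
    executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
  }

  def main(args: Array[String]): Unit = {
    executorDataMap("exec-1") = "host-1"
    executorsPendingLossReason += "exec-1"  // lost, exit reason still unknown
    println(executorIsAlive("exec-1"))      // false
    println(isExecutorActive("exec-1"))     // true -- the inconsistency
  }
}
```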

### Does this PR introduce any user-facing change?

No.

### How was this patch tested?

Passes Jenkins.

Closes #27012 from Ngone51/unify-is-executor-alive.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

@@ -99,6 +99,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   private val executorsPendingToRemove = new HashMap[String, Boolean]
 
+  // Executors that have been lost, but for which we don't yet know the real exit reason.
+  private val executorsPendingLossReason = new HashSet[String]
+
   // A map to store hostname with its possible task number running on it
   @GuardedBy("CoarseGrainedSchedulerBackend.this")
   protected var hostToLocalTaskCount: Map[String, Int] = Map.empty
@@ -123,9 +126,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
 
     override val rpcEnv: RpcEnv = CoarseGrainedSchedulerBackend.this.rpcEnv
 
-    // Executors that have been lost, but for which we don't yet know the real exit reason.
-    protected val executorsPendingLossReason = new HashSet[String]
-
     protected val addressToExecutorId = new HashMap[RpcAddress, String]
 
     // Spark configuration sent to executors. This is a lazy val so that subclasses of the
@@ -285,7 +285,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Make sure no executor is killed while some task is launching on it
       val taskDescs = withLock {
         // Filter out executors under killing
-        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
+        val activeExecutors = executorDataMap.filterKeys(isExecutorActive)
         val workOffers = activeExecutors.map {
           case (id, executorData) =>
             new WorkerOffer(id, executorData.executorHost, executorData.freeCores,
@@ -314,7 +314,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       // Make sure no executor is killed while some task is launching on it
       val taskDescs = withLock {
         // Filter out executors under killing
-        if (executorIsAlive(executorId)) {
+        if (isExecutorActive(executorId)) {
           val executorData = executorDataMap(executorId)
           val workOffers = IndexedSeq(
             new WorkerOffer(executorId, executorData.executorHost, executorData.freeCores,
@@ -332,11 +332,6 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
       }
     }
 
-    private def executorIsAlive(executorId: String): Boolean = synchronized {
-      !executorsPendingToRemove.contains(executorId) &&
-        !executorsPendingLossReason.contains(executorId)
-    }
-
     // Launch tasks returned by a set of resource offers
     private def launchTasks(tasks: Seq[Seq[TaskDescription]]): Unit = {
       for (task <- tasks.flatten) {
@@ -415,7 +410,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
      */
     protected def disableExecutor(executorId: String): Boolean = {
       val shouldDisable = CoarseGrainedSchedulerBackend.this.synchronized {
-        if (executorIsAlive(executorId)) {
+        if (isExecutorActive(executorId)) {
           executorsPendingLossReason += executorId
           true
         } else {
@@ -560,7 +555,9 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   override def isExecutorActive(id: String): Boolean = synchronized {
-    executorDataMap.contains(id) && !executorsPendingToRemove.contains(id)
+    executorDataMap.contains(id) &&
+      !executorsPendingToRemove.contains(id) &&
+      !executorsPendingLossReason.contains(id)
  }
 
   override def maxNumConcurrentTasks(): Int = synchronized {
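
For completeness, a condensed sketch of the state and the unified check after this patch. The method body matches the last hunk above; the `PostPatchSketch` object, simplified types, and `main` demo are again hypothetical scaffolding:

```scala
import scala.collection.mutable.{HashMap, HashSet}

object PostPatchSketch {
  private val executorDataMap = new HashMap[String, String]  // id -> host (simplified)
  private val executorsPendingToRemove = new HashMap[String, Boolean]
  // Now owned by the backend itself rather than by DriverEndpoint.
  private val executorsPendingLossReason = new HashSet[String]

  // The single check used everywhere after the patch: resource offers,
  // disableExecutor(), and external callers all go through this method.
  def isExecutorActive(id: String): Boolean = synchronized {
    executorDataMap.contains(id) &&
      !executorsPendingToRemove.contains(id) &&
      !executorsPendingLossReason.contains(id)
  }

  def main(args: Array[String]): Unit = {
    executorDataMap("exec-1") = "host-1"
    executorsPendingLossReason += "exec-1"
    println(isExecutorActive("exec-1"))  // false -- consistent with the old executorIsAlive()
  }
}
```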