[SPARK-32878][CORE] Avoid scheduling TaskSetManager which has no pending tasks

### What changes were proposed in this pull request?

This PR proposes to avoid scheduling the (non-zombie) TaskSetManager which has no pending tasks.

### Why are the changes needed?

Currently, Spark always tries to schedule a (non-zombie) TaskSetManager even if it has no pending tasks. This causes notable problems for the barrier TaskSetManager: 1. `calculateAvailableSlots` can be called for multiple times for a launched barrier TaskSetManager; 2. user would see "Skip current round of resource offers for barrier stage" log message for
a launched barrier TaskSetManager all the time until the barrier TaskSetManager finishes, which is quite confused.

Besides, scheduling a TaskSetManager always involves many function invocations even if there're no pending tasks.

Therefore, I think we can skip those un-schedulable TasksetManagers to avoid the potential overhead.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass existing tests.

Closes #29750 from Ngone51/filter-out-unschedulable-stage.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
This commit is contained in:
yi.wu 2020-09-14 21:15:06 -07:00 committed by Dongjoon Hyun
parent 7a9b066c66
commit 0811666ab1
4 changed files with 8 additions and 2 deletions

View file

@ -59,6 +59,8 @@ private[spark] class Pool(
}
}
override def isSchedulable: Boolean = true
override def addSchedulable(schedulable: Schedulable): Unit = {
require(schedulable != null)
schedulableQueue.add(schedulable)
@ -105,7 +107,7 @@ private[spark] class Pool(
val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue.filter(_.isSchedulable)
}
sortedTaskSetQueue
}

View file

@ -39,6 +39,7 @@ private[spark] trait Schedulable {
def stageId: Int
def name: String
def isSchedulable: Boolean
def addSchedulable(schedulable: Schedulable): Unit
def removeSchedulable(schedulable: Schedulable): Unit
def getSchedulableByName(name: String): Schedulable

View file

@ -535,7 +535,7 @@ private[spark] class TaskSchedulerImpl(
val availableResources = shuffledOffers.map(_.resources).toArray
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val resourceProfileIds = shuffledOffers.map(o => o.resourceProfileId).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue.filterNot(_.isZombie)
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))

View file

@ -951,6 +951,9 @@ private[spark] class TaskSetManager(
null
}
override def isSchedulable: Boolean = !isZombie &&
(pendingTasks.all.nonEmpty || pendingSpeculatableTasks.all.nonEmpty)
override def addSchedulable(schedulable: Schedulable): Unit = {}
override def removeSchedulable(schedulable: Schedulable): Unit = {}