[SPARK-18886][CORE] Make Locality wait time measure resource under utilization due to delay scheduling

### What changes were proposed in this pull request?

[Delay scheduling](http://elmeleegy.com/khaled/papers/delay_scheduling.pdf) is an optimization that sacrifices fairness for data locality in order to improve cluster and workload throughput.

One useful definition of "delay" here is how much time has passed since the TaskSet was using its fair share of resources.

However, it is impractical to calculate this delay exactly, as it would require running simulations assuming no delay scheduling; tasks would be run in different orders and with different run times.

Currently the heuristic used to estimate this delay is the time since a task was last launched for a TaskSet. The problem is that it essentially does not account for resource utilization, potentially leaving the cluster heavily underutilized.

This PR modifies the heuristic in an attempt to move closer to the useful definition of delay above.
The newly proposed delay is the time since a TaskSet last launched a task **and** did not reject any resources due to delay scheduling when offered its "fair share".

See the last comments of #26696 for more discussion.
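
To make the proposed rule concrete, here is a minimal, hedged sketch of the bookkeeping. The names (`DelayTimerSketch`, `onOffer`, `rejectedAnyResource`) are illustrative only and are not the actual implementation; the real logic lives in `TaskSchedulerImpl.resourceOffers` and `TaskSetManager.resetDelayScheduleTimer` in the diff below.

```scala
// Illustrative sketch of the new reset rule, assuming one timer per TaskSet.
class DelayTimerSketch(localityWaitMs: Long) {
  private var lastResetTime: Long = System.currentTimeMillis()
  // Was the timer reset on the previous offer? Used as a proxy for "every offer since the
  // last fair-share (all-free-resources) offer was fully accepted".
  private var resetOnPreviousOffer: Boolean = true

  /** Called once per round of resource offers made to this TaskSet. */
  def onOffer(
      isAllFreeResources: Boolean,
      launchedTask: Boolean,
      rejectedAnyResource: Boolean): Unit = {
    if (launchedTask && !rejectedAnyResource) {
      // Reset only on a fair-share offer, or when the previous offer also reset the timer.
      if (isAllFreeResources || resetOnPreviousOffer) {
        lastResetTime = System.currentTimeMillis()
        resetOnPreviousOffer = true
      }
    } else {
      resetOnPreviousOffer = false
    }
  }

  /** The locality level may only be relaxed once the wait has elapsed without a reset. */
  def localityWaitExpired(): Boolean =
    System.currentTimeMillis() - lastResetTime >= localityWaitMs
}
```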

### Why are the changes needed?

The cluster can become heavily underutilized, as described in [SPARK-18886](https://issues.apache.org/jira/browse/SPARK-18886?jql=project%20%3D%20SPARK%20AND%20text%20~%20delay).

### How was this patch tested?

TaskSchedulerImplSuite

cloud-fan
tgravescs
squito

Closes #27207 from bmarcott/nmarcott-fulfill-slots-2.

Authored-by: Nicholas Marcott <481161+bmarcott@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
Commit 8b4862953a (parent c279e6b091)
Authored by Nicholas Marcott on 2020-04-09 11:00:29 +00:00; committed by Wenchen Fan
10 changed files with 506 additions and 154 deletions


@@ -544,6 +544,16 @@ package object config {
     .version("1.2.0")
     .fallbackConf(DYN_ALLOCATION_SCHEDULER_BACKLOG_TIMEOUT)

+  private[spark] val LEGACY_LOCALITY_WAIT_RESET =
+    ConfigBuilder("spark.locality.wait.legacyResetOnTaskLaunch")
+      .doc("Whether to use the legacy behavior of locality wait, which resets the delay timer " +
+        "anytime a task is scheduled. See Delay Scheduling section of TaskSchedulerImpl's class " +
+        "documentation for more details.")
+      .internal()
+      .version("3.0.0")
+      .booleanConf
+      .createWithDefault(false)
+
   private[spark] val LOCALITY_WAIT = ConfigBuilder("spark.locality.wait")
     .version("0.5.0")
     .timeConf(TimeUnit.MILLISECONDS)
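
For reference, a minimal (illustrative) example of opting back into the legacy behavior from application code. Only the config key comes from the diff above; the rest is ordinary `SparkConf` usage and the app name/master are placeholders:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Restore the pre-3.0 behavior of resetting the locality wait timer on every task launch.
// Note: the flag is marked .internal(), so it is an escape hatch rather than a tuning knob.
val conf = new SparkConf()
  .setAppName("legacy-locality-wait-example")
  .setMaster("local[2]")
  .set("spark.locality.wait.legacyResetOnTaskLaunch", "true")
  .set("spark.locality.wait", "3s") // the wait duration itself is unchanged by this PR
val sc = new SparkContext(conf)
```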


@@ -36,7 +36,7 @@ import org.apache.spark.rpc.RpcEndpoint
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.TaskLocality.TaskLocality
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
+import org.apache.spark.util.{AccumulatorV2, Clock, SystemClock, ThreadUtils, Utils}

 /**
  * Schedules tasks for multiple types of clusters by acting through a SchedulerBackend.
@@ -57,11 +57,24 @@ import org.apache.spark.util.{AccumulatorV2, SystemClock, ThreadUtils, Utils}
  *  * Periodic revival of all offers from the CoarseGrainedSchedulerBackend, to accommodate delay
  *    scheduling
  *  * task-result-getter threads
+ *
+ * Delay Scheduling:
+ *  Delay scheduling is an optimization that sacrifices job fairness for data locality in order to
+ *  improve cluster and workload throughput. One useful definition of "delay" is how much time
+ *  has passed since the TaskSet was using its fair share of resources. Since it is impractical to
+ *  calculate this delay without a full simulation, the heuristic used is the time since the
+ *  TaskSetManager last launched a task and has not rejected any resources due to delay scheduling
+ *  since it was last offered its "fair share". A "fair share" offer is when [[resourceOffers]]'s
+ *  parameter "isAllFreeResources" is set to true. A "delay scheduling reject" is when a resource
+ *  is not utilized despite there being pending tasks (implemented inside [[TaskSetManager]]).
+ *  The legacy heuristic only measured the time since the [[TaskSetManager]] last launched a task,
+ *  and can be re-enabled by setting spark.locality.wait.legacyResetOnTaskLaunch to true.
  */
 private[spark] class TaskSchedulerImpl(
     val sc: SparkContext,
     val maxTaskFailures: Int,
-    isLocal: Boolean = false)
+    isLocal: Boolean = false,
+    clock: Clock = new SystemClock)
   extends TaskScheduler with Logging {

   import TaskSchedulerImpl._
@@ -97,6 +110,11 @@ private[spark] class TaskSchedulerImpl(
   // on this class. Protected by `this`
   private val taskSetsByStageIdAndAttempt = new HashMap[Int, HashMap[Int, TaskSetManager]]

+  // keyed by taskset
+  // value is true if the task set's locality wait timer was reset on the last resource offer
+  private val resetOnPreviousOffer = new mutable.HashMap[TaskSet, Boolean]()
+
+  private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
+
   // Protected by `this`
   private[scheduler] val taskIdToTaskSetManager = new ConcurrentHashMap[Long, TaskSetManager]
   // Protected by `this`
@@ -125,7 +143,6 @@ private[spark] class TaskSchedulerImpl(
   protected val executorIdToHost = new HashMap[String, String]

   private val abortTimer = new Timer(true)
-  private val clock = new SystemClock

   // Exposed for testing
   val unschedulableTaskSetToExpiryTime = new HashMap[TaskSetManager, Long]
@@ -319,11 +336,27 @@ private[spark] class TaskSchedulerImpl(
         taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
       }
     }
+    resetOnPreviousOffer -= manager.taskSet
     manager.parent.removeSchedulable(manager)
     logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
       s" ${manager.parent.name}")
   }

+  /**
+   * Offers resources to a single [[TaskSetManager]] at a given max allowed [[TaskLocality]].
+   *
+   * @param taskSet task set manager to offer resources to
+   * @param maxLocality max locality to allow when scheduling
+   * @param shuffledOffers shuffled resource offers to use for scheduling,
+   *                       remaining resources are tracked by below fields as tasks are scheduled
+   * @param availableCpus remaining cpus per offer,
+   *                      value at index 'i' corresponds to shuffledOffers[i]
+   * @param availableResources remaining resources per offer,
+   *                           value at index 'i' corresponds to shuffledOffers[i]
+   * @param tasks tasks scheduled per offer, value at index 'i' corresponds to shuffledOffers[i]
+   * @param addressesWithDescs tasks scheduler per host:port, used for barrier tasks
+   * @return tuple of (had delay schedule rejects?, option of min locality of launched task)
+   */
   private def resourceOfferSingleTaskSet(
       taskSet: TaskSetManager,
       maxLocality: TaskLocality,
@@ -331,8 +364,10 @@ private[spark] class TaskSchedulerImpl(
       availableCpus: Array[Int],
       availableResources: Array[Map[String, Buffer[String]]],
       tasks: IndexedSeq[ArrayBuffer[TaskDescription]],
-      addressesWithDescs: ArrayBuffer[(String, TaskDescription)]) : Boolean = {
-    var launchedTask = false
+      addressesWithDescs: ArrayBuffer[(String, TaskDescription)])
+    : (Boolean, Option[TaskLocality]) = {
+    var noDelayScheduleRejects = true
+    var minLaunchedLocality: Option[TaskLocality] = None
     // nodes and executors that are blacklisted for the entire application have already been
     // filtered out by this point
     for (i <- 0 until shuffledOffers.size) {
@@ -348,11 +383,14 @@ private[spark] class TaskSchedulerImpl(
         try {
           val prof = sc.resourceProfileManager.resourceProfileFromId(taskSetRpID)
           val taskCpus = ResourceProfile.getTaskCpusOrDefaultForProfile(prof, conf)
-          val taskDescOption = taskSet.resourceOffer(execId, host, maxLocality,
-            taskResAssignments)
+          val (taskDescOption, didReject) =
+            taskSet.resourceOffer(execId, host, maxLocality, taskResAssignments)
+          noDelayScheduleRejects &= !didReject
           for (task <- taskDescOption) {
             tasks(i) += task
             val tid = task.taskId
+            val locality = taskSet.taskInfos(task.taskId).taskLocality
+            minLaunchedLocality = minTaskLocality(minLaunchedLocality, Some(locality))
             taskIdToTaskSetManager.put(tid, taskSet)
             taskIdToExecutorId(tid) = execId
             executorIdToRunningTaskIds(execId).add(tid)
@@ -372,19 +410,18 @@ private[spark] class TaskSchedulerImpl(
               // The executor address is expected to be non empty.
               addressesWithDescs += (shuffledOffers(i).address.get -> task)
             }
-            launchedTask = true
           }
         } catch {
           case e: TaskNotSerializableException =>
             logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
             // Do not offer resources for this task, but don't throw an error to allow other
             // task sets to be submitted.
-            return launchedTask
+            return (noDelayScheduleRejects, minLaunchedLocality)
         }
       }
     }
   }
-    launchedTask
+    (noDelayScheduleRejects, minLaunchedLocality)
  }

   /**
@@ -466,12 +503,28 @@ private[spark] class TaskSchedulerImpl(
     }.sum
   }

+  private def minTaskLocality(
+      l1: Option[TaskLocality],
+      l2: Option[TaskLocality]) : Option[TaskLocality] = {
+    if (l1.isEmpty) {
+      l2
+    } else if (l2.isEmpty) {
+      l1
+    } else if (l1.get < l2.get) {
+      l1
+    } else {
+      l2
+    }
+  }
+
   /**
    * Called by cluster manager to offer resources on slaves. We respond by asking our active task
    * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
    * that tasks are balanced across the cluster.
    */
-  def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
+  def resourceOffers(
+      offers: IndexedSeq[WorkerOffer],
+      isAllFreeResources: Boolean = true): Seq[Seq[TaskDescription]] = synchronized {
     // Mark each slave as alive and remember its hostname
     // Also track if new executor is added
     var newExecAvail = false
@@ -544,18 +597,34 @@ private[spark] class TaskSchedulerImpl(
             s"number of available slots is $numBarrierSlotsAvailable.")
         } else {
           var launchedAnyTask = false
+          var noDelaySchedulingRejects = true
+          var globalMinLocality: Option[TaskLocality] = None
           // Record all the executor IDs assigned barrier tasks on.
           val addressesWithDescs = ArrayBuffer[(String, TaskDescription)]()
           for (currentMaxLocality <- taskSet.myLocalityLevels) {
             var launchedTaskAtCurrentMaxLocality = false
             do {
-              launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet,
-                currentMaxLocality, shuffledOffers, availableCpus,
+              val (noDelayScheduleReject, minLocality) = resourceOfferSingleTaskSet(
+                taskSet, currentMaxLocality, shuffledOffers, availableCpus,
                 availableResources, tasks, addressesWithDescs)
+              launchedTaskAtCurrentMaxLocality = minLocality.isDefined
               launchedAnyTask |= launchedTaskAtCurrentMaxLocality
+              noDelaySchedulingRejects &= noDelayScheduleReject
+              globalMinLocality = minTaskLocality(globalMinLocality, minLocality)
             } while (launchedTaskAtCurrentMaxLocality)
           }

+          if (!legacyLocalityWaitReset) {
+            if (noDelaySchedulingRejects && launchedAnyTask) {
+              if (isAllFreeResources || resetOnPreviousOffer.getOrElse(taskSet.taskSet, true)) {
+                taskSet.resetDelayScheduleTimer(globalMinLocality)
+                resetOnPreviousOffer.update(taskSet.taskSet, true)
+              }
+            } else {
+              resetOnPreviousOffer.update(taskSet.taskSet, false)
+            }
+          }
+
           if (!launchedAnyTask) {
             taskSet.getCompletelyBlacklistedTaskIfAny(hostToExecutors).foreach { taskIndex =>
               // If the taskSet is unschedulable we try to find an existing idle blacklisted


@@ -221,10 +221,11 @@ private[spark] class TaskSetManager(
   private[scheduler] var localityWaits = myLocalityLevels.map(getLocalityWait)

   // Delay scheduling variables: we keep track of our current locality level and the time we
-  // last launched a task at that level, and move up a level when localityWaits[curLevel] expires.
-  // We then move down if we manage to launch a "more local" task.
+  // last reset the locality wait timer, and move up a level when localityWaits[curLevel] expires.
+  // We then move down if we manage to launch a "more local" task when resetting the timer
+  private val legacyLocalityWaitReset = conf.get(LEGACY_LOCALITY_WAIT_RESET)
   private var currentLocalityIndex = 0 // Index of our current locality level in validLocalityLevels
-  private var lastLaunchTime = clock.getTimeMillis() // Time we last launched a task at this level
+  private var lastLocalityWaitResetTime = clock.getTimeMillis() // Time we last reset locality wait

   override def schedulableQueue: ConcurrentLinkedQueue[Schedulable] = null
@@ -386,6 +387,14 @@ private[spark] class TaskSetManager(
     None
   }

+  private[scheduler] def resetDelayScheduleTimer(
+      minLocality: Option[TaskLocality.TaskLocality]): Unit = {
+    lastLocalityWaitResetTime = clock.getTimeMillis()
+    for (locality <- minLocality) {
+      currentLocalityIndex = getLocalityIndex(locality)
+    }
+  }
+
   /**
    * Respond to an offer of a single executor from the scheduler by finding a task
    *
@@ -396,6 +405,9 @@ private[spark] class TaskSetManager(
    * @param execId the executor Id of the offered resource
    * @param host the host Id of the offered resource
    * @param maxLocality the maximum locality we want to schedule the tasks at
+   *
+   * @return Tuple containing:
+   *         (TaskDescription of launched task if any, rejected resource due to delay scheduling?)
    */
   @throws[TaskNotSerializableException]
   def resourceOffer(
@@ -403,7 +415,7 @@ private[spark] class TaskSetManager(
       host: String,
       maxLocality: TaskLocality.TaskLocality,
       taskResourceAssignments: Map[String, ResourceInformation] = Map.empty)
-    : Option[TaskDescription] =
+    : (Option[TaskDescription], Boolean) =
   {
     val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
       blacklist.isNodeBlacklistedForTaskSet(host) ||
@@ -422,7 +434,9 @@ private[spark] class TaskSetManager(
         }
       }

-      dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
+      val taskDescription =
+        dequeueTask(execId, host, allowedLocality)
+          .map { case (index, taskLocality, speculative) =>
         // Found a task; do some bookkeeping and return a task description
         val task = tasks(index)
         val taskId = sched.newTaskId()
@@ -433,11 +447,8 @@ private[spark] class TaskSetManager(
           execId, host, taskLocality, speculative)
         taskInfos(taskId) = info
         taskAttempts(index) = info :: taskAttempts(index)
-        // Update our locality level for delay scheduling
-        // NO_PREF will not affect the variables related to delay scheduling
-        if (maxLocality != TaskLocality.NO_PREF) {
-          currentLocalityIndex = getLocalityIndex(taskLocality)
-          lastLaunchTime = curTime
+        if (legacyLocalityWaitReset && maxLocality != TaskLocality.NO_PREF) {
+          resetDelayScheduleTimer(Some(taskLocality))
         }
         // Serialize and return the task
         val serializedTask: ByteBuffer = try {
@@ -482,8 +493,14 @@ private[spark] class TaskSetManager(
           taskResourceAssignments,
           serializedTask)
       }
+      val hasPendingTasks = pendingTasks.all.nonEmpty || pendingSpeculatableTasks.all.nonEmpty
+      val hasScheduleDelayReject =
+        taskDescription.isEmpty &&
+          maxLocality == TaskLocality.ANY &&
+          hasPendingTasks
+      (taskDescription, hasScheduleDelayReject)
     } else {
-      None
+      (None, false)
     }
   }
@@ -547,14 +564,14 @@ private[spark] class TaskSetManager(
       // This is a performance optimization: if there are no more tasks that can
       // be scheduled at a particular locality level, there is no point in waiting
       // for the locality wait timeout (SPARK-4939).
-      lastLaunchTime = curTime
+      lastLocalityWaitResetTime = curTime
       logDebug(s"No tasks for locality level ${myLocalityLevels(currentLocalityIndex)}, " +
         s"so moving to locality level ${myLocalityLevels(currentLocalityIndex + 1)}")
       currentLocalityIndex += 1
-    } else if (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex)) {
-      // Jump to the next locality level, and reset lastLaunchTime so that the next locality
-      // wait timer doesn't immediately expire
-      lastLaunchTime += localityWaits(currentLocalityIndex)
+    } else if (curTime - lastLocalityWaitResetTime >= localityWaits(currentLocalityIndex)) {
+      // Jump to the next locality level, and reset lastLocalityWaitResetTime so that the next
+      // locality wait timer doesn't immediately expire
+      lastLocalityWaitResetTime += localityWaits(currentLocalityIndex)
       logDebug(s"Moving to ${myLocalityLevels(currentLocalityIndex + 1)} after waiting for " +
         s"${localityWaits(currentLocalityIndex)}ms")
       currentLocalityIndex += 1
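
As a self-contained illustration of the reject condition computed above (`hasScheduleDelayReject`), here is a small hedged sketch; the enum and helper names are invented for the example, and only the boolean logic mirrors the diff:

```scala
object DelayRejectSketch {
  object Locality extends Enumeration {
    val PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, NO_PREF, ANY = Value
  }

  // An unused offer only counts as a "delay scheduling reject" when the task set was already
  // allowed to schedule at ANY locality and still has pending (or speculatable) tasks.
  def isDelayScheduleReject(
      launchedTask: Boolean,
      maxLocality: Locality.Value,
      hasPendingTasks: Boolean): Boolean = {
    !launchedTask && maxLocality == Locality.ANY && hasPendingTasks
  }

  def main(args: Array[String]): Unit = {
    assert(isDelayScheduleReject(
      launchedTask = false, maxLocality = Locality.ANY, hasPendingTasks = true))
    // Offers restricted to a lower locality level are not counted as rejects.
    assert(!isDelayScheduleReject(
      launchedTask = false, maxLocality = Locality.NODE_LOCAL, hasPendingTasks = true))
  }
}
```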


@@ -303,7 +303,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             (rName, rInfo.availableAddrs.toBuffer)
           }, executorData.resourceProfileId)
         }.toIndexedSeq
-        scheduler.resourceOffers(workOffers)
+        scheduler.resourceOffers(workOffers, true)
       }
       if (taskDescs.nonEmpty) {
         launchTasks(taskDescs)
@@ -331,7 +331,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
             executorData.resourcesInfo.map { case (rName, rInfo) =>
               (rName, rInfo.availableAddrs.toBuffer)
             }, executorData.resourceProfileId))
-          scheduler.resourceOffers(workOffers)
+          scheduler.resourceOffers(workOffers, false)
         } else {
           Seq.empty
         }


@@ -88,7 +88,7 @@ private[spark] class LocalEndpoint(
     // local mode doesn't support extra resources like GPUs right now
     val offers = IndexedSeq(new WorkerOffer(localExecutorId, localExecutorHostname, freeCores,
       Some(rpcEnv.address.hostPort)))
-    for (task <- scheduler.resourceOffers(offers).flatten) {
+    for (task <- scheduler.resourceOffers(offers, true).flatten) {
       freeCores -= scheduler.CPUS_PER_TASK
       executor.launchTask(executorBackend, task)
     }


@@ -510,7 +510,7 @@ class StandaloneDynamicAllocationSuite
     val taskScheduler = mock(classOf[TaskSchedulerImpl])
     when(taskScheduler.nodeBlacklist()).thenReturn(Set("blacklisted-host"))
-    when(taskScheduler.resourceOffers(any())).thenReturn(Nil)
+    when(taskScheduler.resourceOffers(any(), any[Boolean])).thenReturn(Nil)
     when(taskScheduler.sc).thenReturn(sc)
     val rpcEnv = RpcEnv.create("test-rpcenv", "localhost", 0, conf, securityManager)


@@ -248,7 +248,7 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
         "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
         new Properties(), taskResources, bytebuffer)))
     val ts = backend.getTaskSchedulerImpl()
-    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]])).thenReturn(taskDescs)
+    when(ts.resourceOffers(any[IndexedSeq[WorkerOffer]], any[Boolean])).thenReturn(taskDescs)

     backend.driverEndpoint.send(ReviveOffers)


@@ -196,6 +196,240 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     assert(!failedTaskSet)
   }
private def setupTaskSchedulerForLocalityTests(clock: ManualClock): TaskSchedulerImpl = {
val conf = new SparkConf()
sc = new SparkContext("local", "TaskSchedulerImplSuite", conf)
val taskScheduler = new TaskSchedulerImpl(sc,
sc.conf.get(config.TASK_MAX_FAILURES),
clock = clock) {
override def createTaskSetManager(taskSet: TaskSet, maxTaskFailures: Int): TaskSetManager = {
new TaskSetManager(this, taskSet, maxTaskFailures, blacklistTrackerOpt, clock)
}
override def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
// Don't shuffle the offers around for this test. Instead, we'll just pass in all
// the permutations we care about directly.
offers
}
}
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo): Unit = {}
override def executorAdded(execId: String, host: String): Unit = {}
}
taskScheduler.initialize(new FakeSchedulerBackend)
val taskSet = FakeTask.createTaskSet(8, 1, 1,
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1")),
Seq(TaskLocation("host1", "exec1"))
)
// Offer resources first so that when the taskset is submitted it can initialize
// with proper locality level. Otherwise, ANY would be the only locality level.
// See TaskSetManager.computeValidLocalityLevels()
// This begins the task set as PROCESS_LOCAL locality level
taskScheduler.resourceOffers(IndexedSeq(WorkerOffer("exec1", "host1", 1)))
taskScheduler.submitTasks(taskSet)
taskScheduler
}
test("SPARK-18886 - partial offers (isAllFreeResources = false) reset timer before " +
"any resources have been rejected") {
val clock = new ManualClock()
// All tasks created here are local to exec1, host1.
// Locality level starts at PROCESS_LOCAL.
val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
// Locality levels increase at 3000 ms.
val advanceAmount = 3000
// Advancing clock increases locality level to NODE_LOCAL.
clock.advance(advanceAmount)
// If there hasn't yet been any full resource offers,
// partial resource (isAllFreeResources = false) offers reset delay scheduling
// if this and previous offers were accepted.
// This line resets the timer and locality level is reset to PROCESS_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
// This NODE_LOCAL task should not be accepted.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
isAllFreeResources = false)
.flatten.isEmpty)
}
test("SPARK-18886 - delay scheduling timer is reset when it accepts all resources offered when " +
"isAllFreeResources = true") {
val clock = new ManualClock()
// All tasks created here are local to exec1, host1.
// Locality level starts at PROCESS_LOCAL.
val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
// Locality levels increase at 3000 ms.
val advanceAmount = 3000
// Advancing clock increases locality level to NODE_LOCAL.
clock.advance(advanceAmount)
// If there are no rejects on an all resource offer, delay scheduling is reset.
// This line resets the timer and locality level is reset to PROCESS_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = true)
.flatten.length === 1)
// This NODE_LOCAL task should not be accepted.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
isAllFreeResources = false)
.flatten.isEmpty)
}
test("SPARK-18886 - partial resource offers (isAllFreeResources = false) reset " +
"time if last full resource offer (isAllResources = true) was accepted as well as any " +
"following partial resource offers") {
val clock = new ManualClock()
// All tasks created here are local to exec1, host1.
// Locality level starts at PROCESS_LOCAL.
val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
// Locality levels increase at 3000 ms.
val advanceAmount = 3000
// PROCESS_LOCAL full resource offer is accepted.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = true)
.flatten.length === 1)
// Advancing clock increases locality level to NODE_LOCAL.
clock.advance(advanceAmount)
// PROCESS_LOCAL partial resource is accepted.
// Since all offers have been accepted since the last full resource offer
// (this one and the previous one), delay scheduling is reset.
// This line resets the timer and locality level is reset to PROCESS_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
// Advancing clock increases locality level to NODE_LOCAL
clock.advance(advanceAmount)
// PROCESS_LOCAL partial resource is accepted
// Since all offers have been accepted since the last full resource offer
// (one previous full offer, one previous partial offer, and this partial offer),
// delay scheduling is reset.
// This line resets the timer and locality level is reset to PROCESS_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
// This NODE_LOCAL task should not be accepted.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
isAllFreeResources = false)
.flatten.isEmpty)
}
// This tests two cases
// 1. partial resource offer doesn't reset timer after full resource offer had rejected resources
// 2. partial resource offer doesn't reset timer after partial resource offer
// had rejected resources
test("SPARK-18886 - partial resource offers (isAllFreeResources = false) do not reset " +
"time if any offer was rejected since last full offer was fully accepted") {
val clock = new ManualClock()
// All tasks created here are local to exec1, host1.
// Locality level starts at PROCESS_LOCAL.
val taskScheduler = setupTaskSchedulerForLocalityTests(clock)
// Locality levels increase at 3000 ms.
val advanceAmount = 3000
// case 1 from test description above.
// NODE_LOCAL full resource offer is rejected, so delay scheduling is not reset.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
isAllFreeResources = true)
.flatten.isEmpty)
// Advancing clock increases locality level to NODE_LOCAL
clock.advance(advanceAmount)
// PROCESS_LOCAL partial resource is accepted,
// but because preceding full resource offer was rejected, delay scheduling is not reset.
// Locality level remains at NODE_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
// Even though we launched a local task above, we still utilize non-local exec2.
// This is the behavior change to fix SPARK-18886.
// Locality level remains NODE_LOCAL after this clock advance.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
// case 2 from test description above.
// PROCESS_LOCAL full resource offer is accepted, resetting delay scheduling.
// This line resets the timer and locality level is reset to PROCESS_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = true)
.flatten.length === 1)
// Partial resource offer: NODE_LOCAL exec 2 is rejected, PROCESS_LOCAL exec1 is accepted.
// Since there were rejects, delay scheduling is not reset, and follow up partial offers
// will not reset delay scheduling, even if they are accepted.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1), WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = false)
.flatten.size === 1)
// Advancing clock increases locality level to NODE_LOCAL
clock.advance(advanceAmount)
// PROCESS_LOCAL partial resource is accepted, but does not reset delay scheduling
// as described above.
// Locality level remains at NODE_LOCAL.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec1", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
// NODE_LOCAL partial resource offer is accepted,
// verifying locality level was not reset to PROCESS_LOCAL by above offer.
assert(taskScheduler
.resourceOffers(
IndexedSeq(WorkerOffer("exec2", "host1", 1)),
isAllFreeResources = false)
.flatten.length === 1)
}
   test("Scheduler does not crash when tasks are not serializable") {
     val taskCpus = 2
     val taskScheduler = setupSchedulerWithMaster(
@@ -901,18 +1135,17 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     }

     // Here is the main check of this test -- we have the same offers again, and we schedule it
-    // successfully. Because the scheduler first tries to schedule with locality in mind, at first
-    // it won't schedule anything on executor1. But despite that, we don't abort the job. Then the
-    // scheduler tries for ANY locality, and successfully schedules tasks on executor1.
+    // successfully. Because the scheduler tries to schedule with locality in mind, at first
+    // it won't schedule anything on executor1. But despite that, we don't abort the job.
     val secondTaskAttempts = taskScheduler.resourceOffers(offers).flatten
-    assert(secondTaskAttempts.size == 2)
-    secondTaskAttempts.foreach { taskAttempt => assert("executor1" === taskAttempt.executorId) }
+    assert(secondTaskAttempts.isEmpty)
     assert(!failedTaskSet)
   }

   test("SPARK-16106 locality levels updated if executor added to existing host") {
     val taskScheduler = setupScheduler()
+    taskScheduler.resourceOffers(IndexedSeq(new WorkerOffer("executor0", "host0", 1)))
     taskScheduler.submitTasks(FakeTask.createTaskSet(2, stageId = 0, stageAttemptId = 0,
       (0 until 2).map { _ => Seq(TaskLocation("host0", "executor2")) }: _*
     ))


@@ -225,7 +225,7 @@ class TaskSetManagerSuite
     // Offer a host with NO_PREF as the constraint,
     // we should get a nopref task immediately since that's what we only have
-    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+    val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
     assert(taskOption.isDefined)

     clock.advance(1)
@@ -246,7 +246,7 @@ class TaskSetManagerSuite
     // First three offers should all find tasks
     for (i <- 0 until 3) {
-      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)
+      val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
       assert(taskOption.isDefined)
       val task = taskOption.get
       assert(task.executorId === "exec1")
@@ -254,7 +254,7 @@ class TaskSetManagerSuite
     assert(sched.startedTasks.toSet === Set(0, 1, 2))

     // Re-offer the host -- now we should get no more tasks
-    assert(manager.resourceOffer("exec1", "host1", NO_PREF) === None)
+    assert(manager.resourceOffer("exec1", "host1", NO_PREF)._1 === None)

     // Finish the first two tasks
     manager.handleSuccessfulTask(0, createTaskResult(0, accumUpdatesByTask(0)))
@@ -277,12 +277,12 @@ class TaskSetManagerSuite
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

     // An executor that is not NODE_LOCAL should be rejected.
-    assert(manager.resourceOffer("execC", "host2", ANY) === None)
+    assert(manager.resourceOffer("execC", "host2", ANY)._1 === None)

     // Because there are no alive PROCESS_LOCAL executors, the base locality level should be
     // NODE_LOCAL. So, we should schedule the task on this offered NODE_LOCAL executor before
     // any of the locality wait timers expire.
-    assert(manager.resourceOffer("execA", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("execA", "host1", ANY)._1.get.index === 0)
   }

   test("basic delay scheduling") {
@@ -297,22 +297,22 @@ class TaskSetManagerSuite
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     // First offer host1, exec1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
-    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) == None)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1 === None)

     clock.advance(LOCALITY_WAIT_MS)
     // Offer host1, exec1 again, at NODE_LOCAL level: the node local (task 3) should
     // get chosen before the noPref task
-    assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
+    assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL)._1.get.index == 2)

     // Offer host2, exec2, at NODE_LOCAL level: we should choose task 2
-    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).get.index == 1)
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1.get.index == 1)

     // Offer host2, exec2 again, at NODE_LOCAL level: we should get noPref task
     // after failing to find a node_Local task
-    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL) == None)
+    assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1 === None)

     clock.advance(LOCALITY_WAIT_MS)
-    assert(manager.resourceOffer("exec2", "host2", NO_PREF).get.index == 3)
+    assert(manager.resourceOffer("exec2", "host2", NO_PREF)._1.get.index == 3)
   }

   test("we do not need to delay scheduling when we only have noPref tasks in the queue") {
@@ -326,10 +326,10 @@ class TaskSetManagerSuite
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
     // First offer host1, exec1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).get.index === 0)
-    assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL).get.index === 1)
-    assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL) == None)
-    assert(manager.resourceOffer("exec3", "host2", NO_PREF).get.index === 2)
+    assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.get.index === 0)
+    assert(manager.resourceOffer("exec3", "host2", PROCESS_LOCAL)._1.get.index === 1)
+    assert(manager.resourceOffer("exec3", "host2", NODE_LOCAL)._1 === None)
+    assert(manager.resourceOffer("exec3", "host2", NO_PREF)._1.get.index === 2)
   }

   test("delay scheduling with fallback") {
@@ -343,33 +343,55 @@ class TaskSetManagerSuite
       Seq(TaskLocation("host3")),
       Seq(TaskLocation("host2"))
     )
+    sc.conf.set(config.LEGACY_LOCALITY_WAIT_RESET, true)
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

     // First offer host1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)
     // Offer host1 again: nothing should get chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None)

     clock.advance(LOCALITY_WAIT_MS)
     // Offer host1 again: second task (on host2) should get chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 1)
     // Offer host1 again: third task (on host2) should get chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 2)
     // Offer host2: fifth task (also on host2) should get chosen
-    assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 4)
     // Now that we've launched a local task, we should no longer launch the task for host3
-    assert(manager.resourceOffer("exec2", "host2", ANY) === None)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1 === None)

     clock.advance(LOCALITY_WAIT_MS)

+    // offers not accepted due to task set zombies are not delay schedule rejects
+    manager.isZombie = true
+    val (taskDesciption, delayReject) = manager.resourceOffer("exec2", "host2", ANY)
+    assert(taskDesciption.isEmpty)
+    assert(delayReject === false)
+    manager.isZombie = false
+
+    // offers not accepted due to blacklisting are not delay schedule rejects
+    val tsmSpy = spy(manager)
+    val blacklist = mock(classOf[TaskSetBlacklist])
+    when(tsmSpy.taskSetBlacklistHelperOpt).thenReturn(Some(blacklist))
+    when(blacklist.isNodeBlacklistedForTaskSet(any())).thenReturn(true)
+    val (blacklistTask, blackListReject) = tsmSpy.resourceOffer("exec2", "host2", ANY)
+    assert(blacklistTask.isEmpty)
+    assert(blackListReject === false)
+
     // After another delay, we can go ahead and launch that task non-locally
-    assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1.get.index === 3)
+
+    // offers not accepted due to no pending tasks are not delay schedule rejects
+    val (noPendingTask, noPendingReject) = manager.resourceOffer("exec2", "host2", ANY)
+    assert(noPendingTask.isEmpty)
+    assert(noPendingReject === false)
   }

   test("delay scheduling with failed hosts") {
@@ -385,28 +407,28 @@ class TaskSetManagerSuite
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

     // First offer host1: first task should be chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)

     // After this, nothing should get chosen, because we have separated tasks with unavailable
     // preference from the noPrefPendingTasks
-    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None)

     // Now mark host2 as dead
     sched.removeExecutor("exec2")
     manager.executorLost("exec2", "host2", SlaveLost())

     // nothing should be chosen
-    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None)

     clock.advance(LOCALITY_WAIT_MS * 2)

     // task 1 and 2 would be scheduled as nonLocal task
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 1)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 2)

     // all finished
-    assert(manager.resourceOffer("exec1", "host1", ANY) === None)
-    assert(manager.resourceOffer("exec2", "host2", ANY) === None)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1 === None)
+    assert(manager.resourceOffer("exec2", "host2", ANY)._1 === None)
   }

   test("task result lost") {
@@ -417,14 +439,14 @@ class TaskSetManagerSuite
     clock.advance(1)
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)

     // Tell it the task has finished but the result was lost.
     manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost)
     assert(sched.endedTasks(0) === TaskResultLost)

     // Re-offer the host -- now we should get task 0 again.
-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)
   }

   test("repeated failures lead to task set abortion") {
@@ -438,7 +460,7 @@ class TaskSetManagerSuite
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
     // after the last failure.
     (1 to manager.maxTaskFailures).foreach { index =>
-      val offerResult = manager.resourceOffer("exec1", "host1", ANY)
+      val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1
       assert(offerResult.isDefined,
         "Expect resource offer on iteration %s to return a task".format(index))
       assert(offerResult.get.index === 0)
@@ -474,7 +496,7 @@ class TaskSetManagerSuite
     val manager = new TaskSetManager(sched, taskSet, 4, blacklistTrackerOpt, clock)

     {
-      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
+      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1
       assert(offerResult.isDefined, "Expect resource offer to return a task")

       assert(offerResult.get.index === 0)
@@ -485,15 +507,15 @@ class TaskSetManagerSuite
       assert(!sched.taskSetsFailed.contains(taskSet.id))

       // Ensure scheduling on exec1 fails after failure 1 due to blacklist
-      assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL).isEmpty)
-      assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).isEmpty)
-      assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL).isEmpty)
-      assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1.isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL)._1.isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", RACK_LOCAL)._1.isEmpty)
+      assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
     }

     // Run the task on exec1.1 - should work, and then fail it on exec1.1
     {
-      val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)
+      val offerResult = manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)._1
       assert(offerResult.isDefined,
         "Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
@@ -505,12 +527,12 @@ class TaskSetManagerSuite
       assert(!sched.taskSetsFailed.contains(taskSet.id))

       // Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
-      assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL).isEmpty)
+      assert(manager.resourceOffer("exec1.1", "host1", NODE_LOCAL)._1.isEmpty)
     }

     // Run the task on exec2 - should work, and then fail it on exec2
     {
-      val offerResult = manager.resourceOffer("exec2", "host2", ANY)
+      val offerResult = manager.resourceOffer("exec2", "host2", ANY)._1
       assert(offerResult.isDefined, "Expect resource offer to return a task")

       assert(offerResult.get.index === 0)
@@ -521,7 +543,7 @@ class TaskSetManagerSuite
       assert(!sched.taskSetsFailed.contains(taskSet.id))

       // Ensure scheduling on exec2 fails after failure 3 due to blacklist
-      assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty)
+      assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
     }

     // Despite advancing beyond the time for expiring executors from within the blacklist,
@@ -529,17 +551,17 @@ class TaskSetManagerSuite
     clock.advance(rescheduleDelay)

     {
-      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
+      val offerResult = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)._1
       assert(offerResult.isEmpty)
     }

     {
-      val offerResult = manager.resourceOffer("exec3", "host3", ANY)
+      val offerResult = manager.resourceOffer("exec3", "host3", ANY)._1
       assert(offerResult.isDefined)
       assert(offerResult.get.index === 0)
       assert(offerResult.get.executorId === "exec3")

-      assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty)
+      assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)

       // Cause exec3 to fail : failure 4
       manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)
@@ -598,14 +620,14 @@ class TaskSetManagerSuite
     manager.executorAdded()
     sched.addExecutor("execC", "host2")
     manager.executorAdded()
-    assert(manager.resourceOffer("exec1", "host1", ANY).isDefined)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.isDefined)
     sched.removeExecutor("execA")
     manager.executorLost(
       "execA",
       "host1",
       ExecutorExited(143, false, "Terminated for reason unrelated to running tasks"))
     assert(!sched.taskSetsFailed.contains(taskSet.id))
-    assert(manager.resourceOffer("execC", "host2", ANY).isDefined)
+    assert(manager.resourceOffer("execC", "host2", ANY)._1.isDefined)
     sched.removeExecutor("execC")
     manager.executorLost(
       "execC", "host2", ExecutorExited(1, true, "Terminated due to issue with running tasks"))
@@ -633,12 +655,12 @@ class TaskSetManagerSuite
     clock.advance(LOCALITY_WAIT_MS * 3)
     // Offer host3
     // No task is scheduled if we restrict locality to RACK_LOCAL
-    assert(manager.resourceOffer("execC", "host3", RACK_LOCAL) === None)
+    assert(manager.resourceOffer("execC", "host3", RACK_LOCAL)._1 === None)
     // Task 0 can be scheduled with ANY
-    assert(manager.resourceOffer("execC", "host3", ANY).get.index === 0)
+    assert(manager.resourceOffer("execC", "host3", ANY)._1.get.index === 0)
     // Offer host2
     // Task 1 can be scheduled with RACK_LOCAL
-    assert(manager.resourceOffer("execB", "host2", RACK_LOCAL).get.index === 1)
+    assert(manager.resourceOffer("execB", "host2", RACK_LOCAL)._1.get.index === 1)
   }

   test("do not emit warning when serialized task is small") {
@@ -649,7 +671,7 @@ class TaskSetManagerSuite
     assert(!manager.emittedTaskSizeWarning)

-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)

     assert(!manager.emittedTaskSizeWarning)
   }
@@ -664,7 +686,7 @@ class TaskSetManagerSuite
     assert(!manager.emittedTaskSizeWarning)

-    assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
+    assert(manager.resourceOffer("exec1", "host1", ANY)._1.get.index === 0)

     assert(manager.emittedTaskSizeWarning)
   }
@@ -752,13 +774,13 @@ class TaskSetManagerSuite

     // Offer host1, which should be accepted as a PROCESS_LOCAL location
     // by the one task in the task set
-    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL).get
+    val task1 = manager.resourceOffer("execA", "host1", TaskLocality.PROCESS_LOCAL)._1.get

     // Mark the task as available for speculation, and then offer another resource,
     // which should be used to launch a speculative copy of the task.
     manager.speculatableTasks += singleTask.partitionId
     manager.addPendingTask(singleTask.partitionId, speculatable = true)
-    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY).get
+    val task2 = manager.resourceOffer("execB", "host2", TaskLocality.ANY)._1.get

     assert(manager.runningTasks === 2)
     assert(manager.isZombie === false)
@@ -844,7 +866,7 @@ class TaskSetManagerSuite
         "exec1" -> "host1",
         "exec3" -> "host3",
         "exec2" -> "host2")) {
-      val taskOption = manager.resourceOffer(exec, host, NO_PREF)
+      val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
       assert(taskOption.isDefined)
       val task = taskOption.get
       assert(task.executorId === exec)
@@ -870,7 +892,7 @@ class TaskSetManagerSuite
     assert(sched.speculativeTasks.toSet === Set(2, 3))

     // Offer resource to start the speculative attempt for the running task 2.0
-    val taskOption = manager.resourceOffer("exec2", "host2", ANY)
+    val taskOption = manager.resourceOffer("exec2", "host2", ANY)._1
     assert(taskOption.isDefined)
     val task4 = taskOption.get
     assert(task4.index === 2)
@@ -899,20 +921,20 @@ class TaskSetManagerSuite
     val clock = new ManualClock
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

-    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 0)
-    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None)
-    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index == 1)
+    assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1.get.index === 0)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None)
+    assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index == 1)

     manager.speculatableTasks += 1
     manager.addPendingTask(1, speculatable = true)
     clock.advance(LOCALITY_WAIT_MS)
     // schedule the nonPref task
-    assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 2)
+    assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index === 2)
     // schedule the speculative task
-    assert(manager.resourceOffer("execB", "host2", NO_PREF).get.index === 1)
+    assert(manager.resourceOffer("execB", "host2", NO_PREF)._1.get.index === 1)
     clock.advance(LOCALITY_WAIT_MS * 3)
     // schedule non-local tasks
-    assert(manager.resourceOffer("execB", "host2", ANY).get.index === 3)
+    assert(manager.resourceOffer("execB", "host2", ANY)._1.get.index === 3)
   }

   test("node-local tasks should be scheduled right away " +
@@ -929,13 +951,13 @@ class TaskSetManagerSuite
     val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)

     // node-local tasks are scheduled without delay
-    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0)
-    assert(manager.resourceOffer("execA", "host2", NODE_LOCAL).get.index === 1)
-    assert(manager.resourceOffer("execA", "host3", NODE_LOCAL).get.index === 3)
+    assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 0)
+    assert(manager.resourceOffer("execA", "host2", NODE_LOCAL)._1.get.index === 1)
+    assert(manager.resourceOffer("execA", "host3", NODE_LOCAL)._1.get.index === 3)
assert(manager.resourceOffer("execA", "host3", NODE_LOCAL) === None) assert(manager.resourceOffer("execA", "host3", NODE_LOCAL)._1 === None)
// schedule no-preference after node local ones // schedule no-preference after node local ones
assert(manager.resourceOffer("execA", "host3", NO_PREF).get.index === 2) assert(manager.resourceOffer("execA", "host3", NO_PREF)._1.get.index === 2)
} }
test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished") test("SPARK-4939: node-local tasks should be scheduled right after process-local tasks finished")
@ -951,13 +973,13 @@ class TaskSetManagerSuite
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// process-local tasks are scheduled first // process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 2) assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 2)
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 3) assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1.get.index === 3)
// node-local tasks are scheduled without delay // node-local tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL).get.index === 0) assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1.get.index === 0)
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL).get.index === 1) assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1.get.index === 1)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None)
assert(manager.resourceOffer("execB", "host2", NODE_LOCAL) == None) assert(manager.resourceOffer("execB", "host2", NODE_LOCAL)._1 === None)
} }
test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") { test("SPARK-4939: no-pref tasks should be scheduled after process-local tasks finished") {
@ -971,13 +993,13 @@ class TaskSetManagerSuite
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = clock)
// process-local tasks are scheduled first // process-local tasks are scheduled first
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL).get.index === 1) assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1.get.index === 1)
assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL).get.index === 2) assert(manager.resourceOffer("execB", "host2", PROCESS_LOCAL)._1.get.index === 2)
// no-pref tasks are scheduled without delay // no-pref tasks are scheduled without delay
assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL) == None) assert(manager.resourceOffer("execA", "host1", PROCESS_LOCAL)._1 === None)
assert(manager.resourceOffer("execA", "host1", NODE_LOCAL) == None) assert(manager.resourceOffer("execA", "host1", NODE_LOCAL)._1 === None)
assert(manager.resourceOffer("execA", "host1", NO_PREF).get.index === 0) assert(manager.resourceOffer("execA", "host1", NO_PREF)._1.get.index === 0)
assert(manager.resourceOffer("execA", "host1", ANY) == None) assert(manager.resourceOffer("execA", "host1", ANY)._1 === None)
} }
test("Ensure TaskSetManager is usable after addition of levels") { test("Ensure TaskSetManager is usable after addition of levels") {
@ -1061,7 +1083,7 @@ class TaskSetManagerSuite
"exec1" -> "host1", "exec1" -> "host1",
"exec2" -> "host2", "exec2" -> "host2",
"exec2" -> "host2")) { "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF) val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
val task = taskOption.get val task = taskOption.get
assert(task.executorId === k) assert(task.executorId === k)
@ -1082,7 +1104,7 @@ class TaskSetManagerSuite
assert(sched.speculativeTasks.toSet === Set(3)) assert(sched.speculativeTasks.toSet === Set(3))
// Offer resource to start the speculative attempt for the running task // Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption5.isDefined) assert(taskOption5.isDefined)
val task5 = taskOption5.get val task5 = taskOption5.get
assert(task5.index === 3) assert(task5.index === 3)
@ -1121,7 +1143,7 @@ class TaskSetManagerSuite
"exec1" -> "host1", "exec1" -> "host1",
"exec2" -> "host2", "exec2" -> "host2",
"exec2" -> "host2")) { "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF) val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
val task = taskOption.get val task = taskOption.get
assert(task.executorId === k) assert(task.executorId === k)
@ -1154,7 +1176,7 @@ class TaskSetManagerSuite
manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason) manager.handleFailedTask(task.taskId, TaskState.FAILED, endReason)
sched.endedTasks(task.taskId) = endReason sched.endedTasks(task.taskId) = endReason
assert(!manager.isZombie) assert(!manager.isZombie)
val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF) val nextTask = manager.resourceOffer(s"exec2", s"host2", NO_PREF)._1
assert(nextTask.isDefined, s"no offer for attempt $attempt of $index") assert(nextTask.isDefined, s"no offer for attempt $attempt of $index")
tasks += nextTask.get tasks += nextTask.get
} }
@ -1170,7 +1192,7 @@ class TaskSetManagerSuite
assert(manager.checkSpeculatableTasks(0)) assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(3, 4)) assert(sched.speculativeTasks.toSet === Set(3, 4))
// Offer resource to start the speculative attempt for the running task // Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption5.isDefined) assert(taskOption5.isDefined)
val speculativeTask = taskOption5.get val speculativeTask = taskOption5.get
assert(speculativeTask.index === 3 || speculativeTask.index === 4) assert(speculativeTask.index === 3 || speculativeTask.index === 4)
@ -1195,7 +1217,7 @@ class TaskSetManagerSuite
assert(!manager.isZombie) assert(!manager.isZombie)
// now run another speculative task // now run another speculative task
val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF) val taskOpt6 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOpt6.isDefined) assert(taskOpt6.isDefined)
val speculativeTask2 = taskOpt6.get val speculativeTask2 = taskOpt6.get
assert(speculativeTask2.index === 3 || speculativeTask2.index === 4) assert(speculativeTask2.index === 3 || speculativeTask2.index === 4)
@ -1226,7 +1248,7 @@ class TaskSetManagerSuite
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1)) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock = new ManualClock(1))
when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any(), any())).thenAnswer( when(mockDAGScheduler.taskEnded(any(), any(), any(), any(), any(), any())).thenAnswer(
(invocationOnMock: InvocationOnMock) => assert(manager.isZombie)) (invocationOnMock: InvocationOnMock) => assert(manager.isZombie))
val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF) val taskOption = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
// this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon // this would fail, inside our mock dag scheduler, if it calls dagScheduler.taskEnded() too soon
manager.handleSuccessfulTask(0, createTaskResult(0)) manager.handleSuccessfulTask(0, createTaskResult(0))
@ -1271,7 +1293,7 @@ class TaskSetManagerSuite
"exec2" -> "host1" "exec2" -> "host1"
).flatMap { case (exec, host) => ).flatMap { case (exec, host) =>
// offer each executor twice (simulating 2 cores per executor) // offer each executor twice (simulating 2 cores per executor)
(0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)} (0 until 2).flatMap{ _ => tsmSpy.resourceOffer(exec, host, TaskLocality.ANY)._1}
} }
assert(taskDescs.size === 4) assert(taskDescs.size === 4)
@ -1308,7 +1330,7 @@ class TaskSetManagerSuite
"exec2" -> "host2" "exec2" -> "host2"
).flatMap { case (exec, host) => ).flatMap { case (exec, host) =>
// offer each executor twice (simulating 2 cores per executor) // offer each executor twice (simulating 2 cores per executor)
(0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)} (0 until 2).flatMap{ _ => tsm.resourceOffer(exec, host, TaskLocality.ANY)._1}
} }
assert(taskDescs.size === 4) assert(taskDescs.size === 4)
@ -1344,7 +1366,7 @@ class TaskSetManagerSuite
val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker)) val taskSetManager = new TaskSetManager(sched, taskSet, 1, Some(blacklistTracker))
val taskSetManagerSpy = spy(taskSetManager) val taskSetManagerSpy = spy(taskSetManager)
val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY) val taskDesc = taskSetManagerSpy.resourceOffer(exec, host, TaskLocality.ANY)._1
// Assert the task has been black listed on the executor it was last executed on. // Assert the task has been black listed on the executor it was last executed on.
when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer( when(taskSetManagerSpy.addPendingTask(anyInt(), anyBoolean(), anyBoolean())).thenAnswer(
@ -1372,9 +1394,9 @@ class TaskSetManagerSuite
val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock = new ManualClock) val manager1 = new TaskSetManager(sched, taskSet1, MAX_TASK_FAILURES, clock = new ManualClock)
// all tasks from the first taskset have the same jars // all tasks from the first taskset have the same jars
val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF) val taskOption1 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption1.get.addedJars === addedJarsPreTaskSet) assert(taskOption1.get.addedJars === addedJarsPreTaskSet)
val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF) val taskOption2 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption2.get.addedJars === addedJarsPreTaskSet) assert(taskOption2.get.addedJars === addedJarsPreTaskSet)
// even with a jar added mid-TaskSet // even with a jar added mid-TaskSet
@ -1382,7 +1404,7 @@ class TaskSetManagerSuite
sc.addJar(jarPath.toString) sc.addJar(jarPath.toString)
val addedJarsMidTaskSet = Map[String, Long](sc.addedJars.toSeq: _*) val addedJarsMidTaskSet = Map[String, Long](sc.addedJars.toSeq: _*)
assert(addedJarsPreTaskSet !== addedJarsMidTaskSet) assert(addedJarsPreTaskSet !== addedJarsMidTaskSet)
val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF) val taskOption3 = manager1.resourceOffer("exec1", "host1", NO_PREF)._1
// which should have the old version of the jars list // which should have the old version of the jars list
assert(taskOption3.get.addedJars === addedJarsPreTaskSet) assert(taskOption3.get.addedJars === addedJarsPreTaskSet)
@ -1390,7 +1412,7 @@ class TaskSetManagerSuite
val taskSet2 = FakeTask.createTaskSet(1) val taskSet2 = FakeTask.createTaskSet(1)
val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock) val manager2 = new TaskSetManager(sched, taskSet2, MAX_TASK_FAILURES, clock = new ManualClock)
val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF) val taskOption4 = manager2.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption4.get.addedJars === addedJarsMidTaskSet) assert(taskOption4.get.addedJars === addedJarsMidTaskSet)
} }
@ -1488,7 +1510,7 @@ class TaskSetManagerSuite
"exec1" -> "host1", "exec1" -> "host1",
"exec3" -> "host3", "exec3" -> "host3",
"exec2" -> "host2")) { "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(exec, host, NO_PREF) val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
val task = taskOption.get val task = taskOption.get
assert(task.executorId === exec) assert(task.executorId === exec)
@ -1514,7 +1536,7 @@ class TaskSetManagerSuite
assert(sched.speculativeTasks.toSet === Set(2, 3)) assert(sched.speculativeTasks.toSet === Set(2, 3))
// Offer resource to start the speculative attempt for the running task 2.0 // Offer resource to start the speculative attempt for the running task 2.0
val taskOption = manager.resourceOffer("exec2", "host2", ANY) val taskOption = manager.resourceOffer("exec2", "host2", ANY)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
val task4 = taskOption.get val task4 = taskOption.get
assert(task4.index === 2) assert(task4.index === 2)
@ -1560,7 +1582,7 @@ class TaskSetManagerSuite
"exec1" -> "host1", "exec1" -> "host1",
"exec2" -> "host2", "exec2" -> "host2",
"exec2" -> "host2")) { "exec2" -> "host2")) {
val taskOption = manager.resourceOffer(k, v, NO_PREF) val taskOption = manager.resourceOffer(k, v, NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
val task = taskOption.get val task = taskOption.get
assert(task.executorId === k) assert(task.executorId === k)
@ -1580,7 +1602,7 @@ class TaskSetManagerSuite
assert(sched.speculativeTasks.toSet === Set(3)) assert(sched.speculativeTasks.toSet === Set(3))
// Offer resource to start the speculative attempt for the running task // Offer resource to start the speculative attempt for the running task
val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF) val taskOption5 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
assert(taskOption5.isDefined) assert(taskOption5.isDefined)
val task5 = taskOption5.get val task5 = taskOption5.get
assert(task5.index === 3) assert(task5.index === 3)
@ -1640,7 +1662,7 @@ class TaskSetManagerSuite
assert(FakeRackUtil.numBatchInvocation === 1) assert(FakeRackUtil.numBatchInvocation === 1)
assert(FakeRackUtil.numSingleHostInvocation === 0) assert(FakeRackUtil.numSingleHostInvocation === 0)
// with rack locality, reject an offer on a host with an unknown rack // with rack locality, reject an offer on a host with an unknown rack
assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL).isEmpty) assert(manager.resourceOffer("otherExec", "otherHost", TaskLocality.RACK_LOCAL)._1.isEmpty)
(0 until 20).foreach { rackIdx => (0 until 20).foreach { rackIdx =>
(0 until 5).foreach { offerIdx => (0 until 5).foreach { offerIdx =>
// if we offer hosts which are not in preferred locations, // if we offer hosts which are not in preferred locations,
@ -1648,9 +1670,9 @@ class TaskSetManagerSuite
// but accept them at RACK_LOCAL level if they're on OK racks // but accept them at RACK_LOCAL level if they're on OK racks
val hostIdx = 100 + rackIdx val hostIdx = 100 + rackIdx
assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.NODE_LOCAL) assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.NODE_LOCAL)
.isEmpty) ._1.isEmpty)
assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.RACK_LOCAL) assert(manager.resourceOffer("exec" + hostIdx, "host" + hostIdx, TaskLocality.RACK_LOCAL)
.isDefined) ._1.isDefined)
} }
} }
// check no more expensive calls to the rack resolution. manager.resourceOffer() will call // check no more expensive calls to the rack resolution. manager.resourceOffer() will call
@ -1670,7 +1692,7 @@ class TaskSetManagerSuite
val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1"))) val taskResourceAssignments = Map(GPU -> new ResourceInformation(GPU, Array("0", "1")))
val taskOption = val taskOption =
manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments) manager.resourceOffer("exec1", "host1", NO_PREF, taskResourceAssignments)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
val allocatedResources = taskOption.get.resources val allocatedResources = taskOption.get.resources
assert(allocatedResources.size == 1) assert(allocatedResources.size == 1)
@ -1693,7 +1715,7 @@ class TaskSetManagerSuite
// Offer resources for 4 tasks to start, 2 on each exec // Offer resources for 4 tasks to start, 2 on each exec
Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) => Seq("exec1" -> "host1", "exec2" -> "host2").foreach { case (exec, host) =>
(0 until 2).foreach { _ => (0 until 2).foreach { _ =>
val taskOption = manager.resourceOffer(exec, host, NO_PREF) val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
assert(taskOption.get.executorId === exec) assert(taskOption.get.executorId === exec)
} }
@ -1717,8 +1739,8 @@ class TaskSetManagerSuite
// Offer resource to start the speculative attempt for the running task. We offer more // Offer resource to start the speculative attempt for the running task. We offer more
// resources, and ensure that speculative tasks get scheduled appropriately -- only one extra // resources, and ensure that speculative tasks get scheduled appropriately -- only one extra
// copy per speculatable task // copy per speculatable task
val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF) val taskOption2 = manager.resourceOffer("exec1", "host1", NO_PREF)._1
val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF) val taskOption3 = manager.resourceOffer("exec2", "host2", NO_PREF)._1
assert(taskOption2.isDefined) assert(taskOption2.isDefined)
val task2 = taskOption2.get val task2 = taskOption2.get
// Ensure that task index 3 is launched on host1 and task index 4 on host2 // Ensure that task index 3 is launched on host1 and task index 4 on host2
@ -1738,9 +1760,9 @@ class TaskSetManagerSuite
assert(manager.copiesRunning(1) === 2) assert(manager.copiesRunning(1) === 2)
assert(manager.copiesRunning(3) === 2) assert(manager.copiesRunning(3) === 2)
// Offering additional resources should not lead to any speculative tasks being respawned // Offering additional resources should not lead to any speculative tasks being respawned
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
assert(manager.resourceOffer("exec2", "host2", ANY).isEmpty) assert(manager.resourceOffer("exec2", "host2", ANY)._1.isEmpty)
assert(manager.resourceOffer("exec3", "host3", ANY).isEmpty) assert(manager.resourceOffer("exec3", "host3", ANY)._1.isEmpty)
} }
test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") { test("SPARK-26755 Ensure that a speculative task obeys original locality preferences") {
@ -1763,7 +1785,7 @@ class TaskSetManagerSuite
} }
// Offer resources for 3 tasks to start // Offer resources for 3 tasks to start
Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) => Seq("exec1" -> "host1", "exec2" -> "host2", "exec3" -> "host3").foreach { case (exec, host) =>
val taskOption = manager.resourceOffer(exec, host, NO_PREF) val taskOption = manager.resourceOffer(exec, host, NO_PREF)._1
assert(taskOption.isDefined) assert(taskOption.isDefined)
assert(taskOption.get.executorId === exec) assert(taskOption.get.executorId === exec)
} }
@ -1776,17 +1798,17 @@ class TaskSetManagerSuite
assert(manager.checkSpeculatableTasks(0)) assert(manager.checkSpeculatableTasks(0))
assert(sched.speculativeTasks.toSet === Set(0, 1)) assert(sched.speculativeTasks.toSet === Set(0, 1))
// Ensure that the speculatable tasks obey the original locality preferences // Ensure that the speculatable tasks obey the original locality preferences
assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL).isEmpty) assert(manager.resourceOffer("exec4", "host4", NODE_LOCAL)._1.isEmpty)
// task 1 does have a node-local preference for host2 -- but we've already got a regular // task 1 does have a node-local preference for host2 -- but we've already got a regular
// task running there, so we should not schedule a speculative there as well. // task running there, so we should not schedule a speculative there as well.
assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL).isEmpty) assert(manager.resourceOffer("exec2", "host2", NODE_LOCAL)._1.isEmpty)
assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL).isDefined) assert(manager.resourceOffer("exec3", "host3", NODE_LOCAL)._1.isDefined)
assert(manager.resourceOffer("exec4", "host4", ANY).isDefined) assert(manager.resourceOffer("exec4", "host4", ANY)._1.isDefined)
// Since, all speculatable tasks have been launched, making another offer // Since, all speculatable tasks have been launched, making another offer
// should not schedule any more tasks // should not schedule any more tasks
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
assert(!manager.checkSpeculatableTasks(0)) assert(!manager.checkSpeculatableTasks(0))
assert(manager.resourceOffer("exec1", "host1", ANY).isEmpty) assert(manager.resourceOffer("exec1", "host1", ANY)._1.isEmpty)
} }
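The SPARK-26755 test above asserts that a speculative copy is not placed on a host that already runs a copy of the same task (host2 is rejected, host3 is accepted). Below is a hedged, self-contained model of that placement rule for illustration only; the names and data structures are assumptions and do not reflect Spark's internals.

// Simplified model of the rule the SPARK-26755 test checks: a speculative copy
// may not land on a host that already runs a copy of the same task.
object SpeculationPlacement {
  def canLaunchSpeculative(taskIndex: Int, host: String,
                           runningOn: Map[Int, Set[String]]): Boolean =
    !runningOn.getOrElse(taskIndex, Set.empty).contains(host)

  def main(args: Array[String]): Unit = {
    val runningOn = Map(1 -> Set("host2"))               // original attempt of task 1 runs on host2
    assert(!canLaunchSpeculative(1, "host2", runningOn)) // rejected, as in the test above
    assert(canLaunchSpeculative(1, "host3", runningOn))  // allowed on a different host
    println("speculation placement model ok")
  }
}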
private def testSpeculationDurationSetup( private def testSpeculationDurationSetup(
@ -1931,7 +1953,7 @@ class TaskSetManagerSuite
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES) val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
assert(sched.taskSetsFailed.isEmpty) assert(sched.taskSetsFailed.isEmpty)
val offerResult = manager.resourceOffer("exec1", "host1", ANY) val offerResult = manager.resourceOffer("exec1", "host1", ANY)._1
assert(offerResult.isDefined, assert(offerResult.isDefined,
"Expect resource offer on iteration 0 to return a task") "Expect resource offer on iteration 0 to return a task")
assert(offerResult.get.index === 0) assert(offerResult.get.index === 0)
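Every call site in this suite changes from manager.resourceOffer(...) to manager.resourceOffer(...)._1: the method now returns a tuple whose first element is the Option[TaskDescription] the tests previously received directly. The sketch below shows only that call-site adaptation; the second tuple element is assumed here to be a Boolean related to delay-scheduling rejection (per the PR summary), since its actual type and name are not visible in this hunk.

// Sketch of the call-site change made throughout this suite. TaskDescription
// and resourceOffer here are stand-ins, not Spark's classes or signatures.
object ResourceOfferCallSite {
  final case class TaskDescription(index: Int)

  // Assumed shape of the new return value, for illustration only.
  def resourceOffer(exec: String, host: String): (Option[TaskDescription], Boolean) =
    (Some(TaskDescription(index = 0)), false)

  def main(args: Array[String]): Unit = {
    val taskOption = resourceOffer("exec1", "host1")._1  // new style used in the asserts above
    assert(taskOption.isDefined)
    assert(taskOption.get.index == 0)
  }
}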

View file

@ -303,7 +303,8 @@ class MesosFineGrainedSchedulerBackendSuite
mesosOffers2.add(createOffer(1, minMem, minCpu)) mesosOffers2.add(createOffer(1, minMem, minCpu))
reset(taskScheduler) reset(taskScheduler)
reset(driver) reset(driver)
when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]))).thenReturn(Seq(Seq())) when(taskScheduler.resourceOffers(any(classOf[IndexedSeq[WorkerOffer]]), any[Boolean]))
.thenReturn(Seq(Seq()))
when(taskScheduler.CPUS_PER_TASK).thenReturn(2) when(taskScheduler.CPUS_PER_TASK).thenReturn(2)
when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1)) when(driver.declineOffer(mesosOffers2.get(0).getId)).thenReturn(Status.valueOf(1))
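The Mesos suite now stubs resourceOffers with an extra any[Boolean] matcher, which indicates TaskSchedulerImpl.resourceOffers gained a Boolean parameter; going by the PR summary, it presumably marks whether the offers represent all free resources so the new delay heuristic can recognize a "fair share" round. A hedged Mockito sketch of stubbing such a two-argument method follows; FakeScheduler and the parameter name isAllFreeResources are illustrative assumptions, not Spark's actual signature.

import org.mockito.Mockito.{mock, when}
import org.mockito.ArgumentMatchers.{any, anyBoolean}

// Illustrative stand-in for the scheduler interface; the real
// TaskSchedulerImpl.resourceOffers signature is not shown in this hunk.
trait FakeScheduler {
  def resourceOffers(offers: IndexedSeq[String], isAllFreeResources: Boolean): Seq[Seq[String]]
}

object StubbingSketch {
  def main(args: Array[String]): Unit = {
    val sched = mock(classOf[FakeScheduler])
    // Match both arguments, mirroring the updated stub in the Mesos suite above.
    when(sched.resourceOffers(any(classOf[IndexedSeq[String]]), anyBoolean()))
      .thenReturn(Seq(Seq()))
    assert(sched.resourceOffers(IndexedSeq("offer-1"), true) == Seq(Seq()))
    println("two-argument stub ok")
  }
}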