SPARK-1319: Fix scheduler to account for tasks using > 1 CPUs.

Move CPUS_PER_TASK to TaskSchedulerImpl, since the value is a constant, and use it in both the Mesos and CoarseGrained scheduler backends.

Thanks to @kayousterhout for the design discussion.

Author: Shivaram Venkataraman <shivaram@eecs.berkeley.edu>

Closes #219 from shivaram/multi-cpus and squashes the following commits:

5c7d685 [Shivaram Venkataraman] Don't pass availableCpus to TaskSetManager
260e4d5 [Shivaram Venkataraman] Add a check for non-zero CPUs in TaskSetManager
73fcf6f [Shivaram Venkataraman] Add documentation for spark.task.cpus
647bc45 [Shivaram Venkataraman] Fix scheduler to account for tasks using > 1 CPUs. Move CPUS_PER_TASK to TaskSchedulerImpl as the value is a constant and use it in both Mesos and CoarseGrained scheduler backends.
Shivaram Venkataraman 2014-03-25 13:05:30 -07:00 committed by Kay Ousterhout
parent 71d4ed271b
commit f8111eaeb0
7 changed files with 106 additions and 63 deletions
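
Before the per-file diffs, a minimal usage sketch of the behavior this change enables. Only `spark.task.cpus` itself comes from this patch; the application name and master URL below are illustrative, not part of the commit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Request 2 CPUs per task. With this patch, the scheduler launches a task on
// an executor offer only if the offer still has at least 2 free cores, and
// each launched task deducts 2 cores (CPUS_PER_TASK) from that offer.
val conf = new SparkConf()
  .setAppName("multi-cpu-tasks")   // illustrative application name
  .setMaster("local[4]")           // illustrative master URL
  .set("spark.task.cpus", "2")
val sc = new SparkContext(conf)
```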


@@ -62,6 +62,9 @@ private[spark] class TaskSchedulerImpl(
// Threshold above which we warn user initial TaskSet may be starved
val STARVATION_TIMEOUT = conf.getLong("spark.starvation.timeout", 15000)
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
// TaskSetManagers are not thread safe, so any access to one should be synchronized
// on this class.
val activeTaskSets = new HashMap[String, TaskSetManager]
@@ -228,16 +231,18 @@ private[spark] class TaskSchedulerImpl(
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
for (task <- taskSet.resourceOffer(execId, host, availableCpus(i), maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= taskSet.CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
if (availableCpus(i) >= CPUS_PER_TASK) {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
}
} while (launchedTask)
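
The essence of the hunk above: an offer is handed to a TaskSetManager only while it still has at least CPUS_PER_TASK free cores, and each launched task deducts CPUS_PER_TASK instead of a hard-coded 1. A simplified, self-contained sketch of that accounting (the Offer class and the tryLaunchTask callback are illustrative stand-ins, not Spark's actual types):

```scala
// Simplified sketch of the new CPU accounting; not the actual Spark code.
case class Offer(executorId: String, host: String, var freeCores: Int)

def scheduleRound(offers: Seq[Offer], cpusPerTask: Int)
                 (tryLaunchTask: Offer => Boolean): Unit = {
  var launchedTask = true
  // Keep making passes over the offers until no offer can launch another task.
  while (launchedTask) {
    launchedTask = false
    for (offer <- offers) {
      // Only consider offers that can still fit a whole task.
      if (offer.freeCores >= cpusPerTask && tryLaunchTask(offer)) {
        offer.freeCores -= cpusPerTask // previously hard-coded as 1
        assert(offer.freeCores >= 0)
        launchedTask = true
      }
    }
  }
}
```

With cpusPerTask = 2, a one-core offer is skipped outright instead of being asked for a task it cannot fit, which is what the new TaskSchedulerImplSuite test later in this commit verifies.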


@@ -56,9 +56,6 @@ private[spark] class TaskSetManager(
{
val conf = sched.sc.conf
// CPUs to request per task
val CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
/*
* Sometimes if an executor is dead or in an otherwise invalid state, the driver
* does not realize right away leading to repeated task failures. If enabled,
@@ -384,11 +381,10 @@ private[spark] class TaskSetManager(
def resourceOffer(
execId: String,
host: String,
availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie && availableCpus >= CPUS_PER_TASK) {
if (!isZombie) {
val curTime = clock.getTime()
var allowedLocality = getAllowedLocalityLevel(curTime)


@@ -89,7 +89,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
scheduler.statusUpdate(taskId, state, data.value)
if (TaskState.isFinished(state)) {
if (executorActor.contains(executorId)) {
freeCores(executorId) += 1
freeCores(executorId) += scheduler.CPUS_PER_TASK
makeOffers(executorId)
} else {
// Ignoring the update since we don't know about the executor.
@@ -140,7 +140,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
// Launch tasks returned by a set of resource offers
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= 1
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
}
}


@@ -246,7 +246,7 @@ private[spark] class MesosSchedulerBackend(
val cpuResource = Resource.newBuilder()
.setName("cpus")
.setType(Value.Type.SCALAR)
.setScalar(Value.Scalar.newBuilder().setValue(1).build())
.setScalar(Value.Scalar.newBuilder().setValue(scheduler.CPUS_PER_TASK).build())
.build()
MesosTaskInfo.newBuilder()
.setTaskId(taskId)


@@ -80,7 +80,6 @@ class FakeTaskSetManager(
override def resourceOffer(
execId: String,
host: String,
availableCpus: Int,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
@@ -125,7 +124,7 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
}
for (taskSet <- taskSetQueue) {
taskSet.resourceOffer("execId_1", "hostname_1", 1, TaskLocality.ANY) match {
taskSet.resourceOffer("execId_1", "hostname_1", TaskLocality.ANY) match {
case Some(task) =>
return taskSet.stageId
case None => {}
@@ -293,4 +292,43 @@ class TaskSchedulerImplSuite extends FunSuite with LocalSparkContext with Loggin
assert(count > 0)
assert(count < numTrials)
}
test("Scheduler correctly accounts for multiple CPUs per task") {
sc = new SparkContext("local", "TaskSchedulerImplSuite")
val taskCpus = 2
sc.conf.set("spark.task.cpus", taskCpus.toString)
val taskScheduler = new TaskSchedulerImpl(sc)
taskScheduler.initialize(new FakeSchedulerBackend)
// Need to initialize a DAGScheduler for the taskScheduler to use for callbacks.
val dagScheduler = new DAGScheduler(sc, taskScheduler) {
override def taskStarted(task: Task[_], taskInfo: TaskInfo) {}
override def executorAdded(execId: String, host: String) {}
}
// Give zero core offers. Should not generate any tasks
val zeroCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", 0),
new WorkerOffer("executor1", "host1", 0))
val taskSet = FakeTask.createTaskSet(1)
taskScheduler.submitTasks(taskSet)
var taskDescriptions = taskScheduler.resourceOffers(zeroCoreWorkerOffers).flatten
assert(0 === taskDescriptions.length)
// No tasks should run as we only have 1 core free.
val numFreeCores = 1
val singleCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", numFreeCores),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
assert(0 === taskDescriptions.length)
// Now change the offers to have 2 cores in one executor and verify if it
// is chosen.
val multiCoreWorkerOffers = Seq(new WorkerOffer("executor0", "host0", taskCpus),
new WorkerOffer("executor1", "host1", numFreeCores))
taskScheduler.submitTasks(taskSet)
taskDescriptions = taskScheduler.resourceOffers(multiCoreWorkerOffers).flatten
assert(1 === taskDescriptions.length)
assert("executor0" === taskDescriptions(0).executorId)
}
}


@@ -93,19 +93,16 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val taskSet = FakeTask.createTaskSet(1)
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES)
// Offer a host with no CPUs
assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
// Offer a host with process-local as the constraint; this should work because the TaskSet
// above won't have any locality preferences
val taskOption = manager.resourceOffer("exec1", "host1", 2, TaskLocality.PROCESS_LOCAL)
val taskOption = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
assert(sched.startedTasks.contains(0))
// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", 2, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Tell it the task has finished
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -121,7 +118,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// First three offers should all find tasks
for (i <- 0 until 3) {
val taskOption = manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL)
val taskOption = manager.resourceOffer("exec1", "host1", PROCESS_LOCAL)
assert(taskOption.isDefined)
val task = taskOption.get
assert(task.executorId === "exec1")
@@ -129,7 +126,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(sched.startedTasks.toSet === Set(0, 1, 2))
// Re-offer the host -- now we should get no more tasks
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Finish the first two tasks
manager.handleSuccessfulTask(0, createTaskResult(0))
@@ -157,35 +154,35 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1, exec1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1, exec1 again: the last task, which has no prefs, should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 3)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 3)
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
clock.advance(LOCALITY_WAIT)
// Offer host1, exec1 again, at PROCESS_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, PROCESS_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", PROCESS_LOCAL) === None)
// Offer host1, exec1 again, at NODE_LOCAL level: we should choose task 2
assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL).get.index == 2)
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL).get.index == 2)
// Offer host1, exec1 again, at NODE_LOCAL level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, NODE_LOCAL) === None)
assert(manager.resourceOffer("exec1", "host1", NODE_LOCAL) === None)
// Offer host1, exec1 again, at ANY level: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
clock.advance(LOCALITY_WAIT)
// Offer host1, exec1 again, at ANY level: task 1 should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Offer host1, exec1 again, at ANY level: nothing should be chosen as we've launched all tasks
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
}
test("delay scheduling with fallback") {
@@ -203,29 +200,29 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1 again: nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
clock.advance(LOCALITY_WAIT)
// Offer host1 again: second task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Offer host1 again: third task (on host2) should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
// Offer host2: fifth task (also on host2) should get chosen
assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 4)
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 4)
// Now that we've launched a local task, we should no longer launch the task for host3
assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
clock.advance(LOCALITY_WAIT)
// After another delay, we can go ahead and launch that task non-locally
assert(manager.resourceOffer("exec2", "host2", 1, ANY).get.index === 3)
assert(manager.resourceOffer("exec2", "host2", ANY).get.index === 3)
}
test("delay scheduling with failed hosts") {
@@ -240,24 +237,24 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
// First offer host1: first task should be chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Offer host1 again: third task should be chosen immediately because host3 is not up
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 2)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 2)
// After this, nothing should get chosen
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
// Now mark host2 as dead
sched.removeExecutor("exec2")
manager.executorLost("exec2", "host2")
// Task 1 should immediately be launched on host1 because its original host is gone
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 1)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 1)
// Now that all tasks have launched, nothing new should be launched anywhere else
assert(manager.resourceOffer("exec1", "host1", 1, ANY) === None)
assert(manager.resourceOffer("exec2", "host2", 1, ANY) === None)
assert(manager.resourceOffer("exec1", "host1", ANY) === None)
assert(manager.resourceOffer("exec2", "host2", ANY) === None)
}
test("task result lost") {
@@ -267,14 +264,14 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val clock = new FakeClock
val manager = new TaskSetManager(sched, taskSet, MAX_TASK_FAILURES, clock)
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
// Tell it the task has finished but the result was lost.
manager.handleFailedTask(0, TaskState.FINISHED, TaskResultLost)
assert(sched.endedTasks(0) === TaskResultLost)
// Re-offer the host -- now we should get task 0 again.
assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
assert(manager.resourceOffer("exec1", "host1", ANY).get.index === 0)
}
test("repeated failures lead to task set abortion") {
@@ -287,7 +284,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
// Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
// after the last failure.
(1 to manager.maxTaskFailures).foreach { index =>
val offerResult = manager.resourceOffer("exec1", "host1", 1, ANY)
val offerResult = manager.resourceOffer("exec1", "host1", ANY)
assert(offerResult.isDefined,
"Expect resource offer on iteration %s to return a task".format(index))
assert(offerResult.get.index === 0)
@@ -317,7 +314,7 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
val manager = new TaskSetManager(sched, taskSet, 4, clock)
{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
@@ -328,15 +325,15 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1 fails after failure 1 due to blacklist
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.ANY).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.RACK_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.ANY).isEmpty)
}
// Run the task on exec1.1 - should work, and then fail it on exec1.1
{
val offerResult = manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL)
val offerResult = manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL)
assert(offerResult.isDefined,
"Expect resource offer to return a task for exec1.1, offerResult = " + offerResult)
@@ -348,12 +345,12 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec1.1 fails after failure 2 due to blacklist
assert(manager.resourceOffer("exec1.1", "host1", 1, TaskLocality.NODE_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1.1", "host1", TaskLocality.NODE_LOCAL).isEmpty)
}
// Run the task on exec2 - should work, and then fail it on exec2
{
val offerResult = manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY)
val offerResult = manager.resourceOffer("exec2", "host2", TaskLocality.ANY)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
@@ -364,20 +361,20 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
assert(!sched.taskSetsFailed.contains(taskSet.id))
// Ensure scheduling on exec2 fails after failure 3 due to blacklist
assert(manager.resourceOffer("exec2", "host2", 1, TaskLocality.ANY).isEmpty)
assert(manager.resourceOffer("exec2", "host2", TaskLocality.ANY).isEmpty)
}
// After reschedule delay, scheduling on exec1 should be possible.
clock.advance(rescheduleDelay)
{
val offerResult = manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL)
val offerResult = manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL)
assert(offerResult.isDefined, "Expect resource offer to return a task")
assert(offerResult.get.index === 0)
assert(offerResult.get.executorId === "exec1")
assert(manager.resourceOffer("exec1", "host1", 1, TaskLocality.PROCESS_LOCAL).isEmpty)
assert(manager.resourceOffer("exec1", "host1", TaskLocality.PROCESS_LOCAL).isEmpty)
// Cause exec1 to fail : failure 4
manager.handleFailedTask(offerResult.get.taskId, TaskState.FINISHED, TaskResultLost)


@@ -579,6 +579,13 @@ Apart from these, the following properties are also available, and may be useful
out and giving up.
</td>
</tr>
<tr>
<td>spark.task.cpus</td>
<td>1</td>
<td>
Number of cores to allocate for each task.
</td>
</tr>
</table>
## Viewing Spark Properties