Added a TaskSetManager unit test.
This test ensures that when there are no alive executors that satisfy a particular locality level, the TaskSetManager doesn't ever use that as the maximum allowed locality level (this optimization ensures that a job doesn't wait extra time in an attempt to satisfy a scheduling locality level that is impossible). @mateiz and @lirui-intel this unit test illustrates an issue with #892 (it fails with that patch). Author: Kay Ousterhout <kayousterhout@gmail.com> Closes #1024 from kayousterhout/scheduler_unit_test and squashes the following commits: de6a08f [Kay Ousterhout] Added a TaskSetManager unit test.
This commit is contained in:
parent
0cf6002801
commit
6cf335d79a
|
@ -141,6 +141,22 @@ class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
|
|||
assert(sched.finishedManagers.contains(manager))
|
||||
}
|
||||
|
||||
test("skip unsatisfiable locality levels") {
  sc = new SparkContext("local", "test")

  // The only task in this set prefers executor "execB" on host1, but the
  // scheduler's alive executors are "execA" (host1) and "execC" (host2) —
  // so a PROCESS_LOCAL placement can never be satisfied.
  val scheduler = new FakeTaskScheduler(sc, ("execA", "host1"), ("execC", "host2"))
  val singleTaskSet = FakeTask.createTaskSet(1, Seq(TaskLocation("host1", "execB")))
  val fakeClock = new FakeClock
  val taskSetManager = new TaskSetManager(scheduler, singleTaskSet, MAX_TASK_FAILURES, fakeClock)

  // An offer from a different host is not NODE_LOCAL for this task, so it
  // must be rejected even at the ANY allowed-locality level.
  assert(taskSetManager.resourceOffer("execC", "host2", ANY) === None)

  // Since no alive executor can satisfy PROCESS_LOCAL, the manager's base
  // locality level should be NODE_LOCAL; the task should therefore be
  // scheduled on this NODE_LOCAL offer immediately, without waiting for any
  // locality-wait timer to expire.
  assert(taskSetManager.resourceOffer("execA", "host1", ANY).get.index === 0)
}
|
||||
|
||||
test("basic delay scheduling") {
|
||||
sc = new SparkContext("local", "test")
|
||||
val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
|
||||
|
|
Loading…
Reference in a new issue