Fix schedulingAlgorithm bugs for unit test
This commit is contained in:
parent
ecd6d75c6a
commit
606bb1b450
|
@ -40,15 +40,24 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm {
|
|||
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
|
||||
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
|
||||
var res:Boolean = true
|
||||
var compare:Int = 0
|
||||
|
||||
if (s1Needy && !s2Needy) {
|
||||
res = true
|
||||
return true
|
||||
} else if (!s1Needy && s2Needy) {
|
||||
res = false
|
||||
return false
|
||||
} else if (s1Needy && s2Needy) {
|
||||
res = minShareRatio1 <= minShareRatio2
|
||||
compare = minShareRatio1.compareTo(minShareRatio2)
|
||||
} else {
|
||||
res = taskToWeightRatio1 <= taskToWeightRatio2
|
||||
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
|
||||
}
|
||||
|
||||
if (compare < 0) {
|
||||
res = true
|
||||
} else if (compare > 0) {
|
||||
res = false
|
||||
} else {
|
||||
return s1.name < s2.name
|
||||
}
|
||||
return res
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
|
|||
}
|
||||
}
|
||||
|
||||
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
|
||||
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
|
||||
|
||||
def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
|
||||
new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
|
||||
|
@ -96,8 +96,11 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
|
|||
|
||||
def resourceOffer(rootPool: Pool): Int = {
|
||||
val taskSetQueue = rootPool.getSortedTaskSetQueue()
|
||||
for (taskSet <- taskSetQueue)
|
||||
{
|
||||
/* Just for Test*/
|
||||
for (manager <- taskSetQueue) {
|
||||
logInfo("parentName:%s, parent running tasks:%d, name:%s,runningTasks:%d".format(manager.parent.name, manager.parent.runningTasks, manager.name, manager.runningTasks))
|
||||
}
|
||||
for (taskSet <- taskSetQueue) {
|
||||
taskSet.slaveOffer("execId_1", "hostname_1", 1) match {
|
||||
case Some(task) =>
|
||||
return taskSet.stageId
|
||||
|
|
Loading…
Reference in a new issue