fix bug of unit tests

This commit is contained in:
Andrew xia 2013-05-21 06:49:23 +08:00
parent 3d4672eaa9
commit ecd6d75c6a
2 changed files with 43 additions and 35 deletions

View file

@ -352,7 +352,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
executorsByHostPort(hostPort) += execId
availableCpus(i) -= 1
launchedTask = true
case None => {}
}
}
@ -373,7 +373,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
}
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
@ -522,7 +522,7 @@ private[spark] class ClusterScheduler(val sc: SparkContext)
hostPortsAlive -= hostPort
hostToAliveHostPorts.getOrElseUpdate(Utils.parseHostPort(hostPort)._1, new HashSet[String]).remove(hostPort)
}
val execs = executorsByHostPort.getOrElse(hostPort, new HashSet)
execs -= executorId
if (execs.isEmpty) {

View file

@ -67,12 +67,6 @@ class DummyTaskSetManager(
return true
}
// override def getSortedLeafSchedulable(): ArrayBuffer[Schedulable] = {
// var leafSchedulableQueue = new ArrayBuffer[Schedulable]
// leafSchedulableQueue += this
// return leafSchedulableQueue
// }
def taskFinished() {
decreaseRunningTasks(1)
tasksFinished +=1
@ -94,17 +88,10 @@ class DummyTask(stageId: Int) extends Task[Int](stageId)
}
}
class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
class ClusterSchedulerSuite extends FunSuite with LocalSparkContext {
val sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new DummyTask(0)
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
tasks += task
def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int): DummyTaskSetManager = {
new DummyTaskSetManager(priority, stage, numTasks, clusterScheduler, taskSet)
def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): DummyTaskSetManager = {
new DummyTaskSetManager(priority, stage, numTasks, cs , taskSet)
}
def resourceOffer(rootPool: Pool): Int = {
@ -125,13 +112,20 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
}
test("FIFO Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new DummyTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
val rootPool = new Pool("", SchedulingMode.FIFO, 0, 0)
val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
schedulableBuilder.buildPools()
val taskSetManager0 = createDummyTaskSetManager(0, 0, 2)
val taskSetManager1 = createDummyTaskSetManager(0, 1, 2)
val taskSetManager2 = createDummyTaskSetManager(0, 2, 2)
val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager0, null)
schedulableBuilder.addTaskSetManager(taskSetManager1, null)
schedulableBuilder.addTaskSetManager(taskSetManager2, null)
@ -145,6 +139,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
}
test("Fair Scheduler Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new DummyTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
val xmlPath = getClass.getClassLoader.getResource("fairscheduler.xml").getFile()
System.setProperty("spark.fairscheduler.allocation.file", xmlPath)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
@ -167,15 +168,15 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
val properties2 = new Properties()
properties2.setProperty("spark.scheduler.cluster.fair.pool","2")
val taskSetManager10 = createDummyTaskSetManager(1, 0, 1)
val taskSetManager11 = createDummyTaskSetManager(1, 1, 1)
val taskSetManager12 = createDummyTaskSetManager(1, 2, 2)
val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
val taskSetManager23 = createDummyTaskSetManager(2, 3, 2)
val taskSetManager24 = createDummyTaskSetManager(2, 4, 2)
val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
@ -195,6 +196,13 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
}
test("Nested Pool Test") {
sc = new SparkContext("local", "ClusterSchedulerSuite")
val clusterScheduler = new ClusterScheduler(sc)
var tasks = ArrayBuffer[Task[_]]()
val task = new DummyTask(0)
tasks += task
val taskSet = new TaskSet(tasks.toArray,0,0,0,null)
val rootPool = new Pool("", SchedulingMode.FAIR, 0, 0)
val pool0 = new Pool("0", SchedulingMode.FAIR, 3, 1)
val pool1 = new Pool("1", SchedulingMode.FAIR, 4, 1)
@ -211,23 +219,23 @@ class ClusterSchedulerSuite extends FunSuite with BeforeAndAfter {
pool1.addSchedulable(pool10)
pool1.addSchedulable(pool11)
val taskSetManager000 = createDummyTaskSetManager(0, 0, 5)
val taskSetManager001 = createDummyTaskSetManager(0, 1, 5)
val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
pool00.addSchedulable(taskSetManager000)
pool00.addSchedulable(taskSetManager001)
val taskSetManager010 = createDummyTaskSetManager(1, 2, 5)
val taskSetManager011 = createDummyTaskSetManager(1, 3, 5)
val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
pool01.addSchedulable(taskSetManager010)
pool01.addSchedulable(taskSetManager011)
val taskSetManager100 = createDummyTaskSetManager(2, 4, 5)
val taskSetManager101 = createDummyTaskSetManager(2, 5, 5)
val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
pool10.addSchedulable(taskSetManager100)
pool10.addSchedulable(taskSetManager101)
val taskSetManager110 = createDummyTaskSetManager(3, 6, 5)
val taskSetManager111 = createDummyTaskSetManager(3, 7, 5)
val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
pool11.addSchedulable(taskSetManager110)
pool11.addSchedulable(taskSetManager111)