From a124658e53a5abeda00a2582385a294c8e452d21 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 30 Oct 2013 19:29:38 -0700 Subject: [PATCH] Fixed most issues with unit tests --- .../spark/scheduler/DAGSchedulerSuite.scala | 94 ++++++++++--------- .../scheduler/{cluster => }/FakeTask.scala | 3 +- .../{cluster => }/TaskResultGetterSuite.scala | 13 ++- ...erSuite.scala => TaskSchedulerSuite.scala} | 56 ++++++----- ...rSuite.scala => TaskSetManagerSuite.scala} | 43 +++++---- 5 files changed, 103 insertions(+), 106 deletions(-) rename core/src/test/scala/org/apache/spark/scheduler/{cluster => }/FakeTask.scala (91%) rename core/src/test/scala/org/apache/spark/scheduler/{cluster => }/TaskResultGetterSuite.scala (90%) rename core/src/test/scala/org/apache/spark/scheduler/{cluster/ClusterSchedulerSuite.scala => TaskSchedulerSuite.scala} (77%) rename core/src/test/scala/org/apache/spark/scheduler/{cluster/ClusterTaskSetManagerSuite.scala => TaskSetManagerSuite.scala} (88%) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 00f2fdd657..394a1bb06f 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -33,6 +33,24 @@ import org.apache.spark.rdd.RDD import org.apache.spark.scheduler.SchedulingMode.SchedulingMode import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} +/** + * TaskScheduler that records the task sets that the DAGScheduler requested executed. + */ +class TaskSetRecordingTaskScheduler(sc: SparkContext) extends TaskScheduler(sc) { + /** Set of TaskSets the DAGScheduler has requested executed. */ + val taskSets = scala.collection.mutable.Buffer[TaskSet]() + override def start() = {} + override def stop() = {} + override def submitTasks(taskSet: TaskSet) = { + // normally done by TaskSetManager + taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) + taskSets += taskSet + } + override def cancelTasks(stageId: Int) {} + override def setDAGScheduler(dagScheduler: DAGScheduler) = {} + override def defaultParallelism() = 2 +} + /** * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler * rather than spawning an event loop thread as happens in the real code. They use EasyMock @@ -46,24 +64,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster} * and capturing the resulting TaskSets from the mock TaskScheduler. */ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext { - - /** Set of TaskSets the DAGScheduler has requested executed. */ - val taskSets = scala.collection.mutable.Buffer[TaskSet]() - val taskScheduler = new TaskScheduler() { - override def rootPool: Pool = null - override def schedulingMode: SchedulingMode = SchedulingMode.NONE - override def start() = {} - override def stop() = {} - override def submitTasks(taskSet: TaskSet) = { - // normally done by TaskSetManager - taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch) - taskSets += taskSet - } - override def cancelTasks(stageId: Int) {} - override def setDAGScheduler(dagScheduler: DAGScheduler) = {} - override def defaultParallelism() = 2 - } - + var taskScheduler: TaskSetRecordingTaskScheduler = null var mapOutputTracker: MapOutputTrackerMaster = null var scheduler: DAGScheduler = null @@ -96,7 +97,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont before { sc = new SparkContext("local", "DAGSchedulerSuite") - taskSets.clear() + taskScheduler = new TaskSetRecordingTaskScheduler(sc) + taskScheduler.taskSets.clear() cacheLocations.clear() results.clear() mapOutputTracker = new MapOutputTrackerMaster() @@ -204,7 +206,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont test("run trivial job") { val rdd = makeRdd(1, Nil) submit(rdd, Array(0)) - complete(taskSets(0), List((Success, 42))) + complete(taskScheduler.taskSets(0), List((Success, 42))) assert(results === Map(0 -> 42)) } @@ -225,7 +227,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val baseRdd = makeRdd(1, Nil) val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd))) submit(finalRdd, Array(0)) - complete(taskSets(0), Seq((Success, 42))) + complete(taskScheduler.taskSets(0), Seq((Success, 42))) assert(results === Map(0 -> 42)) } @@ -235,7 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont cacheLocations(baseRdd.id -> 0) = Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")) submit(finalRdd, Array(0)) - val taskSet = taskSets(0) + val taskSet = taskScheduler.taskSets(0) assertLocations(taskSet, Seq(Seq("hostA", "hostB"))) complete(taskSet, Seq((Success, 42))) assert(results === Map(0 -> 42)) @@ -243,7 +245,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont test("trivial job failure") { submit(makeRdd(1, Nil), Array(0)) - failed(taskSets(0), "some failure") + failed(taskScheduler.taskSets(0), "some failure") assert(failure.getMessage === "Job aborted: some failure") } @@ -253,12 +255,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(1, List(shuffleDep)) submit(reduceRdd, Array(0)) - complete(taskSets(0), Seq( + complete(taskScheduler.taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))) - complete(taskSets(1), Seq((Success, 42))) + complete(taskScheduler.taskSets(1), Seq((Success, 42))) assert(results === Map(0 -> 42)) } @@ -268,11 +270,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val shuffleId = shuffleDep.shuffleId val reduceRdd = makeRdd(2, List(shuffleDep)) submit(reduceRdd, Array(0, 1)) - complete(taskSets(0), Seq( + complete(taskScheduler.taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) // the 2nd ResultTask failed - complete(taskSets(1), Seq( + complete(taskScheduler.taskSets(1), Seq( (Success, 42), (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null))) // this will get called @@ -280,10 +282,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont // ask the scheduler to try it again scheduler.resubmitFailedStages() // have the 2nd attempt pass - complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) + complete(taskScheduler.taskSets(2), Seq((Success, makeMapStatus("hostA", 1)))) // we can see both result blocks now assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB")) - complete(taskSets(3), Seq((Success, 43))) + complete(taskScheduler.taskSets(3), Seq((Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) } @@ -299,7 +301,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val newEpoch = mapOutputTracker.getEpoch assert(newEpoch > oldEpoch) val noAccum = Map[Long, Any]() - val taskSet = taskSets(0) + val taskSet = taskScheduler.taskSets(0) // should be ignored for being too old runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null)) // should work because it's a non-failed host @@ -311,7 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null)) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA"))) - complete(taskSets(1), Seq((Success, 42), (Success, 43))) + complete(taskScheduler.taskSets(1), Seq((Success, 42), (Success, 43))) assert(results === Map(0 -> 42, 1 -> 43)) } @@ -326,14 +328,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont runEvent(ExecutorLost("exec-hostA")) // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks // rather than marking it is as failed and waiting. - complete(taskSets(0), Seq( + complete(taskScheduler.taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) // have hostC complete the resubmitted task - complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) + complete(taskScheduler.taskSets(1), Seq((Success, makeMapStatus("hostC", 1)))) assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) === Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB"))) - complete(taskSets(2), Seq((Success, 42))) + complete(taskScheduler.taskSets(2), Seq((Success, 42))) assert(results === Map(0 -> 42)) } @@ -345,23 +347,23 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont val finalRdd = makeRdd(1, List(shuffleDepTwo)) submit(finalRdd, Array(0)) // have the first stage complete normally - complete(taskSets(0), Seq( + complete(taskScheduler.taskSets(0), Seq( (Success, makeMapStatus("hostA", 2)), (Success, makeMapStatus("hostB", 2)))) // have the second stage complete normally - complete(taskSets(1), Seq( + complete(taskScheduler.taskSets(1), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostC", 1)))) // fail the third stage because hostA went down - complete(taskSets(2), Seq( + complete(taskScheduler.taskSets(2), Seq( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null))) // TODO assert this: // blockManagerMaster.removeExecutor("exec-hostA") // have DAGScheduler try again scheduler.resubmitFailedStages() - complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2)))) - complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) - complete(taskSets(5), Seq((Success, 42))) + complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostA", 2)))) + complete(taskScheduler.taskSets(4), Seq((Success, makeMapStatus("hostA", 1)))) + complete(taskScheduler.taskSets(5), Seq((Success, 42))) assert(results === Map(0 -> 42)) } @@ -375,24 +377,24 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkCont cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD")) cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC")) // complete stage 2 - complete(taskSets(0), Seq( + complete(taskScheduler.taskSets(0), Seq( (Success, makeMapStatus("hostA", 2)), (Success, makeMapStatus("hostB", 2)))) // complete stage 1 - complete(taskSets(1), Seq( + complete(taskScheduler.taskSets(1), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) // pretend stage 0 failed because hostA went down - complete(taskSets(2), Seq( + complete(taskScheduler.taskSets(2), Seq( (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null))) // TODO assert this: // blockManagerMaster.removeExecutor("exec-hostA") // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun. scheduler.resubmitFailedStages() - assertLocations(taskSets(3), Seq(Seq("hostD"))) + assertLocations(taskScheduler.taskSets(3), Seq(Seq("hostD"))) // allow hostD to recover - complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1)))) - complete(taskSets(4), Seq((Success, 42))) + complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostD", 1)))) + complete(taskScheduler.taskSets(4), Seq((Success, 42))) assert(results === Map(0 -> 42)) } diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala similarity index 91% rename from core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala rename to core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala index 0f01515179..0b90c4e74c 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/FakeTask.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/FakeTask.scala @@ -15,10 +15,9 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.apache.spark.TaskContext -import org.apache.spark.scheduler.{TaskLocation, Task} class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) { override def runTask(context: TaskContext): Int = 0 diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala similarity index 90% rename from core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala rename to core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala index 77d3038614..30e6bc5721 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/TaskResultGetterSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskResultGetterSuite.scala @@ -15,14 +15,13 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import java.nio.ByteBuffer import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite} import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv} -import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult} import org.apache.spark.storage.TaskResultBlockId /** @@ -31,12 +30,12 @@ import org.apache.spark.storage.TaskResultBlockId * Used to test the case where a BlockManager evicts the task result (or dies) before the * TaskResult is retrieved. */ -class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler) +class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskScheduler) extends TaskResultGetter(sparkEnv, scheduler) { var removedResult = false override def enqueueSuccessfulTask( - taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) { + taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) { if (!removedResult) { // Only remove the result once, since we'd like to test the case where the task eventually // succeeds. @@ -92,11 +91,11 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndA test("task retried if result missing from block manager") { // If this test hangs, it's probably because no resource offers were made after the task // failed. - val scheduler: ClusterScheduler = sc.taskScheduler match { - case clusterScheduler: ClusterScheduler => + val scheduler: TaskScheduler = sc.taskScheduler match { + case clusterScheduler: TaskScheduler => clusterScheduler case _ => - assert(false, "Expect local cluster to use ClusterScheduler") + assert(false, "Expect local cluster to use TaskScheduler") throw new ClassCastException } scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala similarity index 77% rename from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala rename to core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala index 95d3553d91..bfbffdf261 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerSuite.scala @@ -15,14 +15,12 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import org.scalatest.FunSuite import org.scalatest.BeforeAndAfter import org.apache.spark._ -import org.apache.spark.scheduler._ -import org.apache.spark.scheduler.cluster._ import scala.collection.mutable.ArrayBuffer import java.util.Properties @@ -31,9 +29,9 @@ class FakeTaskSetManager( initPriority: Int, initStageId: Int, initNumTasks: Int, - clusterScheduler: ClusterScheduler, + taskScheduler: TaskScheduler, taskSet: TaskSet) - extends ClusterTaskSetManager(clusterScheduler, taskSet) { + extends TaskSetManager(taskScheduler, taskSet) { parent = null weight = 1 @@ -104,9 +102,9 @@ class FakeTaskSetManager( } } -class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging { +class TaskSchedulerSuite extends FunSuite with LocalSparkContext with Logging { - def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = { + def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskScheduler, taskSet: TaskSet): FakeTaskSetManager = { new FakeTaskSetManager(priority, stage, numTasks, cs , taskSet) } @@ -132,8 +130,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("FIFO Scheduler Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) + sc = new SparkContext("local", "TaskSchedulerSuite") + val taskScheduler = new TaskScheduler(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -143,9 +141,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val schedulableBuilder = new FIFOSchedulableBuilder(rootPool) schedulableBuilder.buildPools() - val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet) - val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet) - val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet) + val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet) + val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet) + val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager0, null) schedulableBuilder.addTaskSetManager(taskSetManager1, null) schedulableBuilder.addTaskSetManager(taskSetManager2, null) @@ -159,8 +157,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("Fair Scheduler Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) + sc = new SparkContext("local", "TaskSchedulerSuite") + val taskScheduler = new TaskScheduler(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -188,15 +186,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging val properties2 = new Properties() properties2.setProperty("spark.scheduler.pool","2") - val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet) - val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet) - val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet) + val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet) + val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet) + val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager10, properties1) schedulableBuilder.addTaskSetManager(taskSetManager11, properties1) schedulableBuilder.addTaskSetManager(taskSetManager12, properties1) - val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet) - val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet) + val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet) + val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet) schedulableBuilder.addTaskSetManager(taskSetManager23, properties2) schedulableBuilder.addTaskSetManager(taskSetManager24, properties2) @@ -216,8 +214,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging } test("Nested Pool Test") { - sc = new SparkContext("local", "ClusterSchedulerSuite") - val clusterScheduler = new ClusterScheduler(sc) + sc = new SparkContext("local", "TaskSchedulerSuite") + val taskScheduler = new TaskScheduler(sc) var tasks = ArrayBuffer[Task[_]]() val task = new FakeTask(0) tasks += task @@ -239,23 +237,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging pool1.addSchedulable(pool10) pool1.addSchedulable(pool11) - val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet) - val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet) + val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet) + val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet) pool00.addSchedulable(taskSetManager000) pool00.addSchedulable(taskSetManager001) - val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet) - val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet) + val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet) + val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet) pool01.addSchedulable(taskSetManager010) pool01.addSchedulable(taskSetManager011) - val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet) - val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet) + val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet) + val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet) pool10.addSchedulable(taskSetManager100) pool10.addSchedulable(taskSetManager101) - val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet) - val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet) + val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet) + val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet) pool11.addSchedulable(taskSetManager110) pool11.addSchedulable(taskSetManager111) diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala similarity index 88% rename from core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala rename to core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala index b97f2b19b5..fe3ea7b594 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/ClusterTaskSetManagerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSetManagerSuite.scala @@ -15,7 +15,7 @@ * limitations under the License. */ -package org.apache.spark.scheduler.cluster +package org.apache.spark.scheduler import scala.collection.mutable.ArrayBuffer import scala.collection.mutable @@ -23,12 +23,11 @@ import scala.collection.mutable import org.scalatest.FunSuite import org.apache.spark._ -import org.apache.spark.scheduler._ import org.apache.spark.executor.TaskMetrics import java.nio.ByteBuffer import org.apache.spark.util.{Utils, FakeClock} -class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) { +class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) { override def taskStarted(task: Task[_], taskInfo: TaskInfo) { taskScheduler.startedTasks += taskInfo.index } @@ -53,13 +52,13 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler } /** - * A mock ClusterScheduler implementation that just remembers information about tasks started and + * A mock TaskScheduler implementation that just remembers information about tasks started and * feedback received from the TaskSetManagers. Note that it's important to initialize this with * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost - * to work, and these are required for locality in ClusterTaskSetManager. + * to work, and these are required for locality in TaskSetManager. */ -class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) - extends ClusterScheduler(sc) +class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */) + extends TaskScheduler(sc) { val startedTasks = new ArrayBuffer[Long] val endedTasks = new mutable.HashMap[Long, TaskEndReason] @@ -79,16 +78,16 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host) } -class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { +class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging { import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL} val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong test("TaskSet with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) - val manager = new ClusterTaskSetManager(sched, taskSet) + val manager = new TaskSetManager(sched, taskSet) // Offer a host with no CPUs assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None) @@ -112,9 +111,9 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo test("multiple offers with no preferences") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(3) - val manager = new ClusterTaskSetManager(sched, taskSet) + val manager = new TaskSetManager(sched, taskSet) // First three offers should all find tasks for (i <- 0 until 3) { @@ -143,7 +142,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo test("basic delay scheduling") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = createTaskSet(4, Seq(TaskLocation("host1", "exec1")), Seq(TaskLocation("host2", "exec2")), @@ -151,7 +150,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo Seq() // Last task has no locality prefs ) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, clock) // First offer host1, exec1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -187,7 +186,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo test("delay scheduling with fallback") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3")) val taskSet = createTaskSet(5, Seq(TaskLocation("host1")), @@ -197,7 +196,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo Seq(TaskLocation("host2")) ) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, clock) // First offer host1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -227,14 +226,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo test("delay scheduling with failed hosts") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2")) val taskSet = createTaskSet(3, Seq(TaskLocation("host1")), Seq(TaskLocation("host2")), Seq(TaskLocation("host3")) ) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, clock) // First offer host1: first task should be chosen assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -259,10 +258,10 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo test("task result lost") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, clock) assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0) @@ -276,10 +275,10 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Lo test("repeated failures lead to task set abortion") { sc = new SparkContext("local", "test") - val sched = new FakeClusterScheduler(sc, ("exec1", "host1")) + val sched = new FakeTaskScheduler(sc, ("exec1", "host1")) val taskSet = createTaskSet(1) val clock = new FakeClock - val manager = new ClusterTaskSetManager(sched, taskSet, clock) + val manager = new TaskSetManager(sched, taskSet, clock) // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted // after the last failure.