Fixed most issues with unit tests
parent 5e91495f5c
commit a124658e53

@@ -33,6 +33,24 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
 
+/**
+ * TaskScheduler that records the task sets that the DAGScheduler requested executed.
+ */
+class TaskSetRecordingTaskScheduler(sc: SparkContext) extends TaskScheduler(sc) {
+  /** Set of TaskSets the DAGScheduler has requested executed. */
+  val taskSets = scala.collection.mutable.Buffer[TaskSet]()
+  override def start() = {}
+  override def stop() = {}
+  override def submitTasks(taskSet: TaskSet) = {
+    // normally done by TaskSetManager
+    taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
+    taskSets += taskSet
+  }
+  override def cancelTasks(stageId: Int) {}
+  override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
+  override def defaultParallelism() = 2
+}
+
 /**
  * Tests for DAGScheduler. These tests directly call the event processing functions in DAGScheduler
  * rather than spawning an event loop thread as happens in the real code. They use EasyMock
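
The hunk above introduces a recording stub: the fake scheduler launches nothing and only remembers what the DAGScheduler asked it to run, so each test can complete task sets by hand. A self-contained sketch of that same pattern in plain Scala, with illustrative names (SimpleTaskSet and RecordingScheduler are not Spark's):

import scala.collection.mutable

case class SimpleTaskSet(stageId: Int)

trait SchedulerLike {
  def submitTasks(taskSet: SimpleTaskSet): Unit
}

// Records every submission instead of launching tasks, mirroring the role
// TaskSetRecordingTaskScheduler plays in the suite above.
class RecordingScheduler extends SchedulerLike {
  val taskSets = mutable.Buffer[SimpleTaskSet]()
  override def submitTasks(taskSet: SimpleTaskSet): Unit = taskSets += taskSet
}

object RecordingSchedulerDemo extends App {
  val sched = new RecordingScheduler
  sched.submitTasks(SimpleTaskSet(stageId = 0))
  sched.submitTasks(SimpleTaskSet(stageId = 1))
  // A test can now assert on exactly what was requested, in order.
  assert(sched.taskSets.map(_.stageId) == Seq(0, 1))
  println("recorded stages: " + sched.taskSets.map(_.stageId).mkString(", "))
}
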
@@ -46,24 +64,7 @@ import org.apache.spark.storage.{BlockId, BlockManagerId, BlockManagerMaster}
  * and capturing the resulting TaskSets from the mock TaskScheduler.
  */
 class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
-
-  /** Set of TaskSets the DAGScheduler has requested executed. */
-  val taskSets = scala.collection.mutable.Buffer[TaskSet]()
-  val taskScheduler = new TaskScheduler() {
-    override def rootPool: Pool = null
-    override def schedulingMode: SchedulingMode = SchedulingMode.NONE
-    override def start() = {}
-    override def stop() = {}
-    override def submitTasks(taskSet: TaskSet) = {
-      // normally done by TaskSetManager
-      taskSet.tasks.foreach(_.epoch = mapOutputTracker.getEpoch)
-      taskSets += taskSet
-    }
-    override def cancelTasks(stageId: Int) {}
-    override def setDAGScheduler(dagScheduler: DAGScheduler) = {}
-    override def defaultParallelism() = 2
-  }
-
+  var taskScheduler: TaskSetRecordingTaskScheduler = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var scheduler: DAGScheduler = null
 
@@ -96,7 +97,8 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
   before {
     sc = new SparkContext("local", "DAGSchedulerSuite")
-    taskSets.clear()
+    taskScheduler = new TaskSetRecordingTaskScheduler(sc)
+    taskScheduler.taskSets.clear()
     cacheLocations.clear()
    results.clear()
     mapOutputTracker = new MapOutputTrackerMaster()
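
Rebuilding the scheduler inside `before { }`, rather than clearing a buffer shared across tests, means every test starts from a pristine recorder. A hedged sketch of that ScalaTest fixture-reset pattern, with invented names:

import scala.collection.mutable
import org.scalatest.{BeforeAndAfter, FunSuite}

// Illustrative only: shows why re-creating mutable fixtures in `before`
// keeps tests independent, as the before block above now does.
class RecorderResetSuite extends FunSuite with BeforeAndAfter {
  var recorded: mutable.Buffer[Int] = _

  before {
    recorded = mutable.Buffer[Int]() // fresh state for every test
  }

  test("a test that mutates the fixture") {
    recorded += 1
    assert(recorded === Seq(1))
  }

  test("a later test is unaffected by earlier mutations") {
    assert(recorded.isEmpty)
  }
}
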
@@ -204,7 +206,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
   test("run trivial job") {
     val rdd = makeRdd(1, Nil)
     submit(rdd, Array(0))
-    complete(taskSets(0), List((Success, 42)))
+    complete(taskScheduler.taskSets(0), List((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -225,7 +227,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     val baseRdd = makeRdd(1, Nil)
     val finalRdd = makeRdd(1, List(new OneToOneDependency(baseRdd)))
     submit(finalRdd, Array(0))
-    complete(taskSets(0), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(0), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -235,7 +237,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     cacheLocations(baseRdd.id -> 0) =
       Seq(makeBlockManagerId("hostA"), makeBlockManagerId("hostB"))
     submit(finalRdd, Array(0))
-    val taskSet = taskSets(0)
+    val taskSet = taskScheduler.taskSets(0)
     assertLocations(taskSet, Seq(Seq("hostA", "hostB")))
     complete(taskSet, Seq((Success, 42)))
     assert(results === Map(0 -> 42))
@@ -243,7 +245,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
 
   test("trivial job failure") {
     submit(makeRdd(1, Nil), Array(0))
-    failed(taskSets(0), "some failure")
+    failed(taskScheduler.taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted: some failure")
   }
 
@@ -253,12 +255,12 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = makeRdd(1, List(shuffleDep))
     submit(reduceRdd, Array(0))
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostA"), makeBlockManagerId("hostB")))
-    complete(taskSets(1), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(1), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -268,11 +270,11 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = makeRdd(2, List(shuffleDep))
     submit(reduceRdd, Array(0, 1))
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     // the 2nd ResultTask failed
-    complete(taskSets(1), Seq(
+    complete(taskScheduler.taskSets(1), Seq(
         (Success, 42),
         (FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0), null)))
     // this will get called
@@ -280,10 +282,10 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     // ask the scheduler to try it again
     scheduler.resubmitFailedStages()
     // have the 2nd attempt pass
-    complete(taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskScheduler.taskSets(2), Seq((Success, makeMapStatus("hostA", 1))))
     // we can see both result blocks now
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1.host) === Array("hostA", "hostB"))
-    complete(taskSets(3), Seq((Success, 43)))
+    complete(taskScheduler.taskSets(3), Seq((Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
   }
 
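
The two hunks above script a fetch failure followed by recovery. An illustrative-only model of that flow in plain Scala (none of these names are Spark's): a lost map output is forgotten, regenerated by a resubmitted attempt, and the reduce side can then finish.

import scala.collection.mutable

object FetchFailureReplayDemo extends App {
  // partition -> host that currently holds the map output
  val mapOutputs = mutable.Map(0 -> "hostA", 1 -> "hostB")

  // A reduce task fetching partition 0 from hostA fails, so that output
  // is unregistered, as the FetchFailed completion does above.
  mapOutputs.remove(0)

  // The resubmitted map attempt regenerates partition 0.
  mapOutputs(0) = "hostA"

  // With all outputs visible again, the reduce attempt can succeed.
  assert(mapOutputs.keySet == Set(0, 1))
  println("all map outputs registered; reduce can run")
}
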
@@ -299,7 +301,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     val newEpoch = mapOutputTracker.getEpoch
     assert(newEpoch > oldEpoch)
     val noAccum = Map[Long, Any]()
-    val taskSet = taskSets(0)
+    val taskSet = taskScheduler.taskSets(0)
     // should be ignored for being too old
     runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA", 1), noAccum, null, null))
     // should work because it's a non-failed host
@@ -311,7 +313,7 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA", 1), noAccum, null, null))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
-    complete(taskSets(1), Seq((Success, 42), (Success, 43)))
+    complete(taskScheduler.taskSets(1), Seq((Success, 42), (Success, 43)))
     assert(results === Map(0 -> 42, 1 -> 43))
   }
 
@@ -326,14 +328,14 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     runEvent(ExecutorLost("exec-hostA"))
     // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
     // rather than marking it as failed and waiting.
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     // have hostC complete the resubmitted task
-    complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
+    complete(taskScheduler.taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getServerStatuses(shuffleId, 0).map(_._1) ===
            Array(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
-    complete(taskSets(2), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(2), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -345,23 +347,23 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     val finalRdd = makeRdd(1, List(shuffleDepTwo))
     submit(finalRdd, Array(0))
     // have the first stage complete normally
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 2)),
         (Success, makeMapStatus("hostB", 2))))
     // have the second stage complete normally
-    complete(taskSets(1), Seq(
+    complete(taskScheduler.taskSets(1), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostC", 1))))
     // fail the third stage because hostA went down
-    complete(taskSets(2), Seq(
+    complete(taskScheduler.taskSets(2), Seq(
         (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // have DAGScheduler try again
     scheduler.resubmitFailedStages()
-    complete(taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
-    complete(taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
-    complete(taskSets(5), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostA", 2))))
+    complete(taskScheduler.taskSets(4), Seq((Success, makeMapStatus("hostA", 1))))
+    complete(taskScheduler.taskSets(5), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -375,24 +377,24 @@ class DAGSchedulerSuite extends FunSuite with BeforeAndAfter with LocalSparkContext {
     cacheLocations(shuffleTwoRdd.id -> 0) = Seq(makeBlockManagerId("hostD"))
     cacheLocations(shuffleTwoRdd.id -> 1) = Seq(makeBlockManagerId("hostC"))
     // complete stage 2
-    complete(taskSets(0), Seq(
+    complete(taskScheduler.taskSets(0), Seq(
         (Success, makeMapStatus("hostA", 2)),
         (Success, makeMapStatus("hostB", 2))))
     // complete stage 1
-    complete(taskSets(1), Seq(
+    complete(taskScheduler.taskSets(1), Seq(
         (Success, makeMapStatus("hostA", 1)),
         (Success, makeMapStatus("hostB", 1))))
     // pretend stage 0 failed because hostA went down
-    complete(taskSets(2), Seq(
+    complete(taskScheduler.taskSets(2), Seq(
         (FetchFailed(makeBlockManagerId("hostA"), shuffleDepTwo.shuffleId, 0, 0), null)))
     // TODO assert this:
     // blockManagerMaster.removeExecutor("exec-hostA")
     // DAGScheduler should notice the cached copy of the second shuffle and try to get it rerun.
     scheduler.resubmitFailedStages()
-    assertLocations(taskSets(3), Seq(Seq("hostD")))
+    assertLocations(taskScheduler.taskSets(3), Seq(Seq("hostD")))
     // allow hostD to recover
-    complete(taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
-    complete(taskSets(4), Seq((Success, 42)))
+    complete(taskScheduler.taskSets(3), Seq((Success, makeMapStatus("hostD", 1))))
+    complete(taskScheduler.taskSets(4), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
   }
 
@@ -15,10 +15,9 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import org.apache.spark.TaskContext
-import org.apache.spark.scheduler.{TaskLocation, Task}
 
 class FakeTask(stageId: Int, prefLocs: Seq[TaskLocation] = Nil) extends Task[Int](stageId, 0) {
   override def runTask(context: TaskContext): Int = 0
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import java.nio.ByteBuffer
 
 import org.scalatest.{BeforeAndAfter, BeforeAndAfterAll, FunSuite}
 
 import org.apache.spark.{LocalSparkContext, SparkContext, SparkEnv}
-import org.apache.spark.scheduler.{DirectTaskResult, IndirectTaskResult, TaskResult}
 import org.apache.spark.storage.TaskResultBlockId
 
 /**
@@ -31,12 +30,12 @@ import org.apache.spark.storage.TaskResultBlockId
  * Used to test the case where a BlockManager evicts the task result (or dies) before the
  * TaskResult is retrieved.
  */
-class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: ClusterScheduler)
+class ResultDeletingTaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskScheduler)
   extends TaskResultGetter(sparkEnv, scheduler) {
   var removedResult = false
 
   override def enqueueSuccessfulTask(
-    taskSetManager: ClusterTaskSetManager, tid: Long, serializedData: ByteBuffer) {
+    taskSetManager: TaskSetManager, tid: Long, serializedData: ByteBuffer) {
     if (!removedResult) {
       // Only remove the result once, since we'd like to test the case where the task eventually
       // succeeds.
@@ -92,11 +91,11 @@ class TaskResultGetterSuite extends FunSuite with BeforeAndAfter with BeforeAndAfterAll {
   test("task retried if result missing from block manager") {
     // If this test hangs, it's probably because no resource offers were made after the task
     // failed.
-    val scheduler: ClusterScheduler = sc.taskScheduler match {
-      case clusterScheduler: ClusterScheduler =>
+    val scheduler: TaskScheduler = sc.taskScheduler match {
+      case clusterScheduler: TaskScheduler =>
         clusterScheduler
       case _ =>
-        assert(false, "Expect local cluster to use ClusterScheduler")
+        assert(false, "Expect local cluster to use TaskScheduler")
         throw new ClassCastException
     }
     scheduler.taskResultGetter = new ResultDeletingTaskResultGetter(sc.env, scheduler)
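
ResultDeletingTaskResultGetter's trick is to sabotage exactly one result fetch so the retry path gets exercised. A minimal sketch of that delete-once pattern (FlakyResultStore is an invented name, not a Spark API):

import scala.collection.mutable

class FlakyResultStore {
  private val store = mutable.Map[Long, Int]()
  private var removedOnce = false

  def put(tid: Long, value: Int): Unit = store(tid) = value

  // Evict the result on the first fetch only, like the removedResult flag above.
  def fetch(tid: Long): Option[Int] =
    if (!removedOnce) { removedOnce = true; store.remove(tid); None }
    else store.get(tid)
}

object FlakyResultDemo extends App {
  val results = new FlakyResultStore
  results.put(1L, 42)
  assert(results.fetch(1L).isEmpty)      // first attempt: result gone, task retried
  results.put(1L, 42)                    // retry stores the result again
  assert(results.fetch(1L).contains(42)) // second attempt succeeds
  println("retry path exercised")
}
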
@@ -15,14 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import org.scalatest.FunSuite
 import org.scalatest.BeforeAndAfter
 
 import org.apache.spark._
-import org.apache.spark.scheduler._
-import org.apache.spark.scheduler.cluster._
 import scala.collection.mutable.ArrayBuffer
 
 import java.util.Properties
@@ -31,9 +29,9 @@ class FakeTaskSetManager(
     initPriority: Int,
     initStageId: Int,
     initNumTasks: Int,
-    clusterScheduler: ClusterScheduler,
+    taskScheduler: TaskScheduler,
     taskSet: TaskSet)
-  extends ClusterTaskSetManager(clusterScheduler, taskSet) {
+  extends TaskSetManager(taskScheduler, taskSet) {
 
   parent = null
   weight = 1
@@ -104,9 +102,9 @@ class FakeTaskSetManager(
   }
 }
 
-class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
 
-  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: ClusterScheduler, taskSet: TaskSet): FakeTaskSetManager = {
+  def createDummyTaskSetManager(priority: Int, stage: Int, numTasks: Int, cs: TaskScheduler, taskSet: TaskSet): FakeTaskSetManager = {
     new FakeTaskSetManager(priority, stage, numTasks, cs, taskSet)
   }
 
@@ -132,8 +130,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("FIFO Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    sc = new SparkContext("local", "TaskSchedulerSuite")
+    val taskScheduler = new TaskScheduler(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -143,9 +141,9 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
     val schedulableBuilder = new FIFOSchedulableBuilder(rootPool)
     schedulableBuilder.buildPools()
 
-    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, clusterScheduler, taskSet)
-    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, clusterScheduler, taskSet)
-    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, clusterScheduler, taskSet)
+    val taskSetManager0 = createDummyTaskSetManager(0, 0, 2, taskScheduler, taskSet)
+    val taskSetManager1 = createDummyTaskSetManager(0, 1, 2, taskScheduler, taskSet)
+    val taskSetManager2 = createDummyTaskSetManager(0, 2, 2, taskScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager0, null)
     schedulableBuilder.addTaskSetManager(taskSetManager1, null)
     schedulableBuilder.addTaskSetManager(taskSetManager2, null)
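
The FIFO test above expects task sets to drain in priority-then-stage order. A hedged restatement of that ordering rule in plain Scala, simplified from the spirit of Spark's FIFOSchedulingAlgorithm (Entry and FifoOrderDemo are invented names):

object FifoOrderDemo extends App {
  case class Entry(priority: Int, stageId: Int)

  // FIFO: lower priority value first; ties broken by lower stage id.
  def fifoLess(a: Entry, b: Entry): Boolean =
    if (a.priority != b.priority) a.priority < b.priority
    else a.stageId < b.stageId

  val queued = Seq(Entry(0, 2), Entry(1, 0), Entry(0, 1))
  // Stages 1 and 2 share a priority, so they run in stage order, then stage 0.
  assert(queued.sortWith(fifoLess).map(_.stageId) == Seq(1, 2, 0))
  println("FIFO order: " + queued.sortWith(fifoLess).map(_.stageId).mkString(", "))
}
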
@@ -159,8 +157,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("Fair Scheduler Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    sc = new SparkContext("local", "TaskSchedulerSuite")
+    val taskScheduler = new TaskScheduler(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -188,15 +186,15 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
     val properties2 = new Properties()
     properties2.setProperty("spark.scheduler.pool","2")
 
-    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, clusterScheduler, taskSet)
-    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, clusterScheduler, taskSet)
-    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, clusterScheduler, taskSet)
+    val taskSetManager10 = createDummyTaskSetManager(1, 0, 1, taskScheduler, taskSet)
+    val taskSetManager11 = createDummyTaskSetManager(1, 1, 1, taskScheduler, taskSet)
+    val taskSetManager12 = createDummyTaskSetManager(1, 2, 2, taskScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager10, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager11, properties1)
     schedulableBuilder.addTaskSetManager(taskSetManager12, properties1)
 
-    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, clusterScheduler, taskSet)
-    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, clusterScheduler, taskSet)
+    val taskSetManager23 = createDummyTaskSetManager(2, 3, 2, taskScheduler, taskSet)
+    val taskSetManager24 = createDummyTaskSetManager(2, 4, 2, taskScheduler, taskSet)
     schedulableBuilder.addTaskSetManager(taskSetManager23, properties2)
     schedulableBuilder.addTaskSetManager(taskSetManager24, properties2)
 
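
The fair test gives the two pools different loads and expects the hungrier pool to be served first. A simplified, hedged sketch of that fair comparison (inspired by Spark's FairSchedulingAlgorithm, not a copy of it; PoolState is an invented name):

object FairShareDemo extends App {
  case class PoolState(name: String, runningTasks: Int, minShare: Int, weight: Int)

  // Simplified fair ordering: pools below their minShare come first; among
  // needy pools, lower minShare usage wins; otherwise lower tasks-per-weight.
  def fairLess(a: PoolState, b: PoolState): Boolean = {
    val aNeedy = a.runningTasks < a.minShare
    val bNeedy = b.runningTasks < b.minShare
    if (aNeedy != bNeedy) aNeedy
    else if (aNeedy && bNeedy)
      a.runningTasks.toDouble / math.max(a.minShare, 1) <
        b.runningTasks.toDouble / math.max(b.minShare, 1)
    else a.runningTasks.toDouble / a.weight < b.runningTasks.toDouble / b.weight
  }

  val pools = Seq(PoolState("pool1", 3, 2, 1), PoolState("pool2", 1, 2, 1))
  // pool2 is under its minimum share, so it should be offered resources first.
  assert(pools.sortWith(fairLess).head.name == "pool2")
  println("next served: " + pools.sortWith(fairLess).head.name)
}
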
@@ -216,8 +214,8 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
   }
 
   test("Nested Pool Test") {
-    sc = new SparkContext("local", "ClusterSchedulerSuite")
-    val clusterScheduler = new ClusterScheduler(sc)
+    sc = new SparkContext("local", "TaskSchedulerSuite")
+    val taskScheduler = new TaskScheduler(sc)
     var tasks = ArrayBuffer[Task[_]]()
     val task = new FakeTask(0)
     tasks += task
@@ -239,23 +237,23 @@ class ClusterSchedulerSuite extends FunSuite with LocalSparkContext with Logging {
     pool1.addSchedulable(pool10)
     pool1.addSchedulable(pool11)
 
-    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, clusterScheduler, taskSet)
-    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, clusterScheduler, taskSet)
+    val taskSetManager000 = createDummyTaskSetManager(0, 0, 5, taskScheduler, taskSet)
+    val taskSetManager001 = createDummyTaskSetManager(0, 1, 5, taskScheduler, taskSet)
     pool00.addSchedulable(taskSetManager000)
     pool00.addSchedulable(taskSetManager001)
 
-    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, clusterScheduler, taskSet)
-    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, clusterScheduler, taskSet)
+    val taskSetManager010 = createDummyTaskSetManager(1, 2, 5, taskScheduler, taskSet)
+    val taskSetManager011 = createDummyTaskSetManager(1, 3, 5, taskScheduler, taskSet)
     pool01.addSchedulable(taskSetManager010)
     pool01.addSchedulable(taskSetManager011)
 
-    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, clusterScheduler, taskSet)
-    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, clusterScheduler, taskSet)
+    val taskSetManager100 = createDummyTaskSetManager(2, 4, 5, taskScheduler, taskSet)
+    val taskSetManager101 = createDummyTaskSetManager(2, 5, 5, taskScheduler, taskSet)
     pool10.addSchedulable(taskSetManager100)
     pool10.addSchedulable(taskSetManager101)
 
-    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, clusterScheduler, taskSet)
-    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, clusterScheduler, taskSet)
+    val taskSetManager110 = createDummyTaskSetManager(3, 6, 5, taskScheduler, taskSet)
+    val taskSetManager111 = createDummyTaskSetManager(3, 7, 5, taskScheduler, taskSet)
     pool11.addSchedulable(taskSetManager110)
     pool11.addSchedulable(taskSetManager111)
 
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.spark.scheduler.cluster
+package org.apache.spark.scheduler
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.mutable
@@ -23,12 +23,11 @@ import scala.collection.mutable
 import org.scalatest.FunSuite
 
 import org.apache.spark._
-import org.apache.spark.scheduler._
 import org.apache.spark.executor.TaskMetrics
 import java.nio.ByteBuffer
 import org.apache.spark.util.{Utils, FakeClock}
 
-class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
+class FakeDAGScheduler(taskScheduler: FakeTaskScheduler) extends DAGScheduler(taskScheduler) {
   override def taskStarted(task: Task[_], taskInfo: TaskInfo) {
     taskScheduler.startedTasks += taskInfo.index
   }
@@ -53,13 +52,13 @@ class FakeDAGScheduler(taskScheduler: FakeClusterScheduler) extends DAGScheduler(taskScheduler) {
 }
 
 /**
- * A mock ClusterScheduler implementation that just remembers information about tasks started and
+ * A mock TaskScheduler implementation that just remembers information about tasks started and
  * feedback received from the TaskSetManagers. Note that it's important to initialize this with
 * a list of "live" executors and their hostnames for isExecutorAlive and hasExecutorsAliveOnHost
- * to work, and these are required for locality in ClusterTaskSetManager.
+ * to work, and these are required for locality in TaskSetManager.
 */
-class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
-  extends ClusterScheduler(sc)
+class FakeTaskScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
+  extends TaskScheduler(sc)
 {
   val startedTasks = new ArrayBuffer[Long]
   val endedTasks = new mutable.HashMap[Long, TaskEndReason]
@@ -79,16 +78,16 @@ class FakeClusterScheduler(sc: SparkContext, liveExecutors: (String, String)* /* execId, host */)
   override def hasExecutorsAliveOnHost(host: String): Boolean = executors.values.exists(_ == host)
 }
 
-class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
+class TaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
   import TaskLocality.{ANY, PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL}
 
   val LOCALITY_WAIT = System.getProperty("spark.locality.wait", "3000").toLong
 
   test("TaskSet with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
-    val manager = new ClusterTaskSetManager(sched, taskSet)
+    val manager = new TaskSetManager(sched, taskSet)
 
     // Offer a host with no CPUs
     assert(manager.resourceOffer("exec1", "host1", 0, ANY) === None)
@@ -112,9 +111,9 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("multiple offers with no preferences") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(3)
-    val manager = new ClusterTaskSetManager(sched, taskSet)
+    val manager = new TaskSetManager(sched, taskSet)
 
     // First three offers should all find tasks
     for (i <- 0 until 3) {
@@ -143,7 +142,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("basic delay scheduling") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = createTaskSet(4,
       Seq(TaskLocation("host1", "exec1")),
       Seq(TaskLocation("host2", "exec2")),
@@ -151,7 +150,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq() // Last task has no locality prefs
     )
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, clock)
 
     // First offer host1, exec1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
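
The locality tests above drive a FakeClock forward and expect the manager to accept progressively less-local offers. A toy model of that delay-scheduling rule (illustrative only; Spark's real logic tracks the wait per locality level inside the task set manager):

object DelaySchedulingDemo extends App {
  val levels = Vector("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")
  val localityWaitMs = 3000L // mirrors the spark.locality.wait default used above

  // Each full wait interval without a local launch relaxes locality one level.
  def allowedLevel(msSinceLastLaunch: Long): String = {
    val jumps = (msSinceLastLaunch / localityWaitMs).toInt
    levels(math.min(jumps, levels.size - 1))
  }

  assert(allowedLevel(0) == "PROCESS_LOCAL")
  assert(allowedLevel(3000) == "NODE_LOCAL") // one wait elapsed
  assert(allowedLevel(100000) == "ANY")      // long idle: run anywhere
  println("delay scheduling demo ok")
}
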
@@ -187,7 +186,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with fallback") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc,
+    val sched = new FakeTaskScheduler(sc,
       ("exec1", "host1"), ("exec2", "host2"), ("exec3", "host3"))
     val taskSet = createTaskSet(5,
       Seq(TaskLocation("host1")),
@@ -197,7 +196,7 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
       Seq(TaskLocation("host2"))
     )
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, clock)
 
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -227,14 +226,14 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("delay scheduling with failed hosts") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"), ("exec2", "host2"))
     val taskSet = createTaskSet(3,
       Seq(TaskLocation("host1")),
       Seq(TaskLocation("host2")),
      Seq(TaskLocation("host3"))
     )
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, clock)
 
     // First offer host1: first task should be chosen
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
@@ -259,10 +258,10 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("task result lost") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, clock)
 
     assert(manager.resourceOffer("exec1", "host1", 1, ANY).get.index === 0)
 
@@ -276,10 +275,10 @@ class ClusterTaskSetManagerSuite extends FunSuite with LocalSparkContext with Logging {
 
   test("repeated failures lead to task set abortion") {
     sc = new SparkContext("local", "test")
-    val sched = new FakeClusterScheduler(sc, ("exec1", "host1"))
+    val sched = new FakeTaskScheduler(sc, ("exec1", "host1"))
     val taskSet = createTaskSet(1)
     val clock = new FakeClock
-    val manager = new ClusterTaskSetManager(sched, taskSet, clock)
+    val manager = new TaskSetManager(sched, taskSet, clock)
 
     // Fail the task MAX_TASK_FAILURES times, and check that the task set is aborted
     // after the last failure.