[SPARK-10247] [CORE] improve readability of a test case in DAGSchedulerSuite
This is a pretty minor change that just tries to improve the readability of `DAGSchedulerSuite`; I figure every bit helps. Previously, whenever I read this test, I never knew what "should work" and "should be ignored" really meant -- this adds some asserts and updates the comments to make it clearer. It also includes some reformatting per a suggestion from markhamstra (https://github.com/apache/spark/pull/7699). Author: Imran Rashid <irashid@cloudera.com> Closes #8434 from squito/SPARK-10247.
This commit is contained in:
parent
f6c447f875
commit
3ddb9b3233
|
@ -926,27 +926,64 @@ class DAGSchedulerSuite
|
|||
val shuffleId = shuffleDep.shuffleId
|
||||
val reduceRdd = new MyRDD(sc, 2, List(shuffleDep))
|
||||
submit(reduceRdd, Array(0, 1))
|
||||
|
||||
// pretend we were told hostA went away
|
||||
val oldEpoch = mapOutputTracker.getEpoch
|
||||
runEvent(ExecutorLost("exec-hostA"))
|
||||
val newEpoch = mapOutputTracker.getEpoch
|
||||
assert(newEpoch > oldEpoch)
|
||||
|
||||
// now start completing some tasks in the shuffle map stage, under different hosts
|
||||
// and epochs, and make sure scheduler updates its state correctly
|
||||
val taskSet = taskSets(0)
|
||||
val shuffleStage = scheduler.stageIdToStage(taskSet.stageId).asInstanceOf[ShuffleMapStage]
|
||||
assert(shuffleStage.numAvailableOutputs === 0)
|
||||
|
||||
// should be ignored for being too old
|
||||
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
|
||||
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
|
||||
// should work because it's a non-failed host
|
||||
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostB",
|
||||
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
|
||||
runEvent(CompletionEvent(
|
||||
taskSet.tasks(0),
|
||||
Success,
|
||||
makeMapStatus("hostA", reduceRdd.partitions.size),
|
||||
null,
|
||||
createFakeTaskInfo(),
|
||||
null))
|
||||
assert(shuffleStage.numAvailableOutputs === 0)
|
||||
|
||||
// should work because it's a non-failed host (so the available map outputs will increase)
|
||||
runEvent(CompletionEvent(
|
||||
taskSet.tasks(0),
|
||||
Success,
|
||||
makeMapStatus("hostB", reduceRdd.partitions.size),
|
||||
null,
|
||||
createFakeTaskInfo(),
|
||||
null))
|
||||
assert(shuffleStage.numAvailableOutputs === 1)
|
||||
|
||||
// should be ignored for being too old
|
||||
runEvent(CompletionEvent(taskSet.tasks(0), Success, makeMapStatus("hostA",
|
||||
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
|
||||
// should work because it's a new epoch
|
||||
runEvent(CompletionEvent(
|
||||
taskSet.tasks(0),
|
||||
Success,
|
||||
makeMapStatus("hostA", reduceRdd.partitions.size),
|
||||
null,
|
||||
createFakeTaskInfo(),
|
||||
null))
|
||||
assert(shuffleStage.numAvailableOutputs === 1)
|
||||
|
||||
// should work because it's a new epoch, which will increase the number of available map
|
||||
// outputs, and also finish the stage
|
||||
taskSet.tasks(1).epoch = newEpoch
|
||||
runEvent(CompletionEvent(taskSet.tasks(1), Success, makeMapStatus("hostA",
|
||||
reduceRdd.partitions.size), null, createFakeTaskInfo(), null))
|
||||
runEvent(CompletionEvent(
|
||||
taskSet.tasks(1),
|
||||
Success,
|
||||
makeMapStatus("hostA", reduceRdd.partitions.size),
|
||||
null,
|
||||
createFakeTaskInfo(),
|
||||
null))
|
||||
assert(shuffleStage.numAvailableOutputs === 2)
|
||||
assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
|
||||
HashSet(makeBlockManagerId("hostB"), makeBlockManagerId("hostA")))
|
||||
|
||||
// finish the next stage normally, which completes the job
|
||||
complete(taskSets(1), Seq((Success, 42), (Success, 43)))
|
||||
assert(results === Map(0 -> 42, 1 -> 43))
|
||||
assertDataStructuresEmpty()
|
||||
|
|
Loading…
Reference in a new issue