[SPARK-26989][CORE][TEST] DAGSchedulerSuite: ensure listeners are fully processed before checking recorded values

### What changes were proposed in this pull request?

This patch ensures that recorded values in the listener are only accessed after the listener has fully processed all events. To enforce this, the patch adds a new class that hides these values and exposes them only through methods which guarantee the condition above. Without this guard, two threads run concurrently - 1) the listener processing thread and 2) the test's main thread - and a race condition can occur.

That's why we also see a very odd thing: an error message saying the condition is met, yet the test failed:
```
- Barrier task failures from the same stage attempt don't trigger multiple stage retries *** FAILED ***
  ArrayBuffer(0) did not equal List(0) (DAGSchedulerSuite.scala:2656)
```
which means the verification ran before the listener thread finished recording; by the time the failure message was constructed, the listener had caught up and mutated the buffer, so the message shows two equal-looking values.

The guard was already properly placed in many spots, but missing in some. This patch enforces that it can't be missed.
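
To make the guard concrete, here is a minimal sketch of the pattern, detached from the actual suite (`RecordingListener` and the `waitUntilEmpty` parameter are simplified stand-ins for the real `EventInfoRecordingListener` and `sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)`):

```scala
import scala.collection.mutable.ArrayBuffer

// Sketch only: the listener thread appends to a private mutable buffer, and the
// test thread can read it only through a getter that first drains the event bus
// and then hands out an immutable snapshot. Neither the raw buffer nor an
// un-drained view ever escapes, so the guard cannot be forgotten at call sites.
class RecordingListener(waitUntilEmpty: () => Unit) {
  private val _failedStages = new ArrayBuffer[Int]

  // Invoked on the listener processing thread for each event.
  def onStageFailed(stageId: Int): Unit = _failedStages += stageId

  // Invoked on the test's main thread.
  def failedStages: List[Int] = {
    waitUntilEmpty()       // 1) ensure no event is still in flight
    _failedStages.toList   // 2) return an immutable copy that later events can't mutate
  }
}
```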

### Why are the changes needed?

The UT fails intermittently; this patch addresses the flakiness.

### Does this PR introduce any user-facing change?

No

### How was this patch tested?

Modified UT.

I also made the flaky tests fail artificially by applying a 50 ms sleep to each onXXX method of the listener.
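
The delay was injected roughly like this (a sketch under the assumption of a plain `SparkListener`, not the exact code used):

```scala
import org.apache.spark.scheduler._

// Sketch only: slow every callback down by 50 ms so the test's main thread
// races ahead of the listener processing thread, which surfaces any assertion
// that reads recorded values without waiting for the bus to drain.
class SlowRecordingListener extends SparkListener {
  override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = {
    Thread.sleep(50)
    // ... record stageSubmitted.stageInfo as before
  }
  // the same 50 ms sleep goes into onStageCompleted, onTaskEnd, etc.
}
```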

![Screen Shot 2019-09-07 at 7 44 15 AM](https://user-images.githubusercontent.com/1317309/64465178-1747ad00-d146-11e9-92f6-f4ed4a1f4b08.png)

I found 3 tests failing; they are marked as X. (Ignore the ones marked with !: they failed while waiting for the listener bus within the given timeout, and they don't deal with the recorded values - they use a 1000 ms timeout rather than the 10000 ms used for this listener, so they were affected only as a side effect.)

After applying the same change as in this patch, all tests marked as X passed.

Closes #25706 from HeartSaVioR/SPARK-26989.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>

core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala:

```diff
@@ -175,31 +175,66 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
-  val submittedStageInfos = new HashSet[StageInfo]
-  val successfulStages = new HashSet[Int]
-  val failedStages = new ArrayBuffer[Int]
-  val stageByOrderOfExecution = new ArrayBuffer[Int]
-  val endedTasks = new HashSet[Long]
-  val sparkListener = new SparkListener() {
+
+  /**
+   * Listener which records information to verify in UTs. The getter-style methods in this
+   * class return a value only after ensuring there is no event left to process, and the
+   * returned value is immutable, preventing odd results caused by the race condition.
+   */
+  class EventInfoRecordingListener extends SparkListener {
+    private val _submittedStageInfos = new HashSet[StageInfo]
+    private val _successfulStages = new HashSet[Int]
+    private val _failedStages = new ArrayBuffer[Int]
+    private val _stageByOrderOfExecution = new ArrayBuffer[Int]
+    private val _endedTasks = new HashSet[Long]
+
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-      submittedStageInfos += stageSubmitted.stageInfo
+      _submittedStageInfos += stageSubmitted.stageInfo
     }
+
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
-      stageByOrderOfExecution += stageInfo.stageId
+      _stageByOrderOfExecution += stageInfo.stageId
       if (stageInfo.failureReason.isEmpty) {
-        successfulStages += stageInfo.stageId
+        _successfulStages += stageInfo.stageId
       } else {
-        failedStages += stageInfo.stageId
+        _failedStages += stageInfo.stageId
       }
     }
+
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      endedTasks += taskEnd.taskInfo.taskId
+      _endedTasks += taskEnd.taskInfo.taskId
     }
+
+    def submittedStageInfos: Set[StageInfo] = {
+      waitForListeners()
+      _submittedStageInfos.toSet
+    }
+
+    def successfulStages: Set[Int] = {
+      waitForListeners()
+      _successfulStages.toSet
+    }
+
+    def failedStages: List[Int] = {
+      waitForListeners()
+      _failedStages.toList
+    }
+
+    def stageByOrderOfExecution: List[Int] = {
+      waitForListeners()
+      _stageByOrderOfExecution.toList
+    }
+
+    def endedTasks: Set[Long] = {
+      waitForListeners()
+      _endedTasks.toSet
+    }
+
+    private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
   }
+
+  var sparkListener: EventInfoRecordingListener = null
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
@@ -248,10 +283,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
   private def init(testConf: SparkConf): Unit = {
     sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
-    submittedStageInfos.clear()
-    successfulStages.clear()
-    failedStages.clear()
-    endedTasks.clear()
+    sparkListener = new EventInfoRecordingListener
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -374,9 +406,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
   }

   test("[SPARK-3353] parent stage should have lower stage id") {
-    stageByOrderOfExecution.clear()
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
+    val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
     assert(stageByOrderOfExecution.length === 2)
     assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1))
   }
@@ -619,9 +650,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     submit(unserializableRdd, Array(0))
     assert(failure.getMessage.startsWith(
       "Job aborted due to stage failure: Task not serializable:"))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(0))
-    assert(failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))

     assertDataStructuresEmpty()
   }
@@ -629,9 +658,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(0))
-    assert(failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))

     assertDataStructuresEmpty()
   }
@@ -640,9 +667,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(0))
-    assert(failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))

     assertDataStructuresEmpty()
   }
@@ -700,9 +725,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.isEmpty)
-    assert(successfulStages.contains(0))
+    assert(sparkListener.failedStages.isEmpty)
+    assert(sparkListener.successfulStages.contains(0))
   }

   test("run trivial shuffle") {
@@ -1115,8 +1139,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(1))
+    assert(sparkListener.failedStages.contains(1))

     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
     runEvent(makeCompletionEvent(
@@ -1124,8 +1147,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null))
     // The SparkListener should not receive redundant failure events.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.size == 1)
+    assert(sparkListener.failedStages.size === 1)
   }

   test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
@@ -1172,7 +1194,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(0).tasks(1),
       TaskKilled("test"),
       null))
-    assert(failedStages === Seq(0))
+    assert(sparkListener.failedStages === Seq(0))
     assert(mapOutputTracker.findMissingPartitions(shuffleId) === Some(Seq(0, 1)))

     scheduler.resubmitFailedStages()
@@ -1226,11 +1248,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == mapStageId)
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     complete(taskSets(0), Seq(
@@ -1247,12 +1268,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.contains(1))
+    assert(sparkListener.failedStages.contains(1))

     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -1269,7 +1288,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // shouldn't effect anything -- our calling it just makes *SURE* it gets called between the
     // desired event and our check.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
@@ -1287,14 +1305,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     submit(reduceRdd, Array(0, 1))

     def countSubmittedReduceStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == 1)
+      sparkListener.submittedStageInfos.count(_.stageId == 1)
     }
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == 0)
+      sparkListener.submittedStageInfos.count(_.stageId == 0)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     // Complete the map stage.
@@ -1303,7 +1320,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       (Success, makeMapStatus("hostB", 2))))

     // The reduce stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)

     // The first result task fails, with a fetch failure for the output from the first mapper.
@@ -1318,7 +1334,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // Because the map stage finished, another attempt for the reduce stage should have been
     // submitted, resulting in 2 total attempts for each the map and the reduce stage.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
     assert(countSubmittedReduceStageAttempts() === 2)
@@ -1348,10 +1363,9 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(1), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(1)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
-    assert(endedTasks.size == 2)
+    assert(sparkListener.endedTasks.size === 2)

     // finish other 2 tasks
     runEvent(makeCompletionEvent(
@@ -1360,8 +1374,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(3)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(endedTasks.size == 4)
+    assert(sparkListener.endedTasks.size === 4)

     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
@@ -1371,15 +1384,13 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(5)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(endedTasks.size == 5)
+    assert(sparkListener.endedTasks.size === 5)

     // make sure non successful tasks also send out event
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), UnknownReason, 42,
       Seq.empty, Array.empty, createFakeTaskInfoWithId(6)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(endedTasks.size == 6)
+    assert(sparkListener.endedTasks.size === 6)
   }

   test("ignore late map task completions") {
@@ -1452,8 +1463,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(failedStages.toSet === Set(0))
+    assert(sparkListener.failedStages.toSet === Set(0))

     assertDataStructuresEmpty()
   }
@@ -1696,9 +1706,8 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     assert(cancelledStages.toSet === Set(0, 2))

     // Make sure the listeners got told about both failed stages.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(successfulStages.isEmpty)
-    assert(failedStages.toSet === Set(0, 2))
+    assert(sparkListener.successfulStages.isEmpty)
+    assert(sparkListener.failedStages.toSet === Set(0, 2))
     assert(listener1.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
     assert(listener2.failureMessage === s"Job aborted due to stage failure: $stageFailureMessage")
@@ -2672,11 +2681,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == mapStageId)
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     // The first map task fails with TaskKilled.
@@ -2684,7 +2692,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
-    assert(failedStages === Seq(0))
+    assert(sparkListener.failedStages === Seq(0))

     // The second map task fails with TaskKilled.
     runEvent(makeCompletionEvent(
@@ -2694,7 +2702,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -2708,11 +2715,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     val mapStageId = 0
     def countSubmittedMapStageAttempts(): Int = {
-      submittedStageInfos.count(_.stageId == mapStageId)
+      sparkListener.submittedStageInfos.count(_.stageId == mapStageId)
     }

     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)

     // The first map task fails with TaskKilled.
@@ -2720,11 +2726,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
       taskSets(0).tasks(0),
       TaskKilled("test"),
       null))
-    assert(failedStages === Seq(0))
+    assert(sparkListener.failedStages === Seq(0))

     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)

     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -2737,7 +2742,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLimits
     // The second map task failure doesn't trigger stage retry.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
```