[SPARK-18868][FLAKY-TEST] Deflake StreamingQueryListenerSuite: single listener, check trigger...

## What changes were proposed in this pull request?

Use `recentProgress` instead of `lastProgress` and filter out last non-zero value. Also add eventually to the latest assertQuery similar to first `assertQuery`

## How was this patch tested?

Ran test 1000 times

Author: Burak Yavuz <brkyvz@gmail.com>

Closes #16287 from brkyvz/SPARK-18868.
This commit is contained in:
Burak Yavuz 2016-12-15 15:46:03 -08:00 committed by Shixiong Zhu
parent 32ff964526
commit 9c7f83b028

View file

@ -84,7 +84,11 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
CheckAnswer(10, 5),
AssertOnQuery { query =>
assert(listener.progressEvents.nonEmpty)
assert(listener.progressEvents.last.json === query.lastProgress.json)
// SPARK-18868: We can't use query.lastProgress, because in progressEvents, we filter
// out non-zero input rows, but the lastProgress may be a zero input row trigger
val lastNonZeroProgress = query.recentProgress.filter(_.numInputRows > 0).lastOption
.getOrElse(fail("No progress updates received in StreamingQuery!"))
assert(listener.progressEvents.last.json === lastNonZeroProgress.json)
assert(listener.terminationEvent === null)
true
},
@ -109,14 +113,17 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AdvanceManualClock(100),
ExpectFailure[SparkException],
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {
assert(listener.terminationEvent !== null)
assert(listener.terminationEvent.id === query.id)
assert(listener.terminationEvent.exception.nonEmpty)
// Make sure that the exception message reported through listener
// contains the actual exception and relevant stack trace
assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
assert(
listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
}
listener.checkAsyncErrors()
true
}