[SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite

This work has largely been done by lw-lin in his PR #15497. This is a slight refactoring of it.

## What changes were proposed in this pull request?
There were two sources of flakiness in the StreamingQueryListener tests.

- When testing with a manual clock, consecutive attempts to advance the clock can occur without the stream execution thread being unblocked and doing any work between the attempts. Hence the following can happen with the current ManualClock (see the condensed sketch after this list).
```
+-----------------------------------+--------------------------------+
|      StreamExecution thread       |         testing thread         |
+-----------------------------------+--------------------------------+
|  ManualClock.waitTillTime(100) {  |                                |
|        _isWaiting = true          |                                |
|            wait(10)               |                                |
|        still in wait(10)          |  if (_isWaiting) advance(100)  |
|        still in wait(10)          |  if (_isWaiting) advance(200)  | <- this should be disallowed !
|        still in wait(10)          |  if (_isWaiting) advance(300)  | <- this should be disallowed !
|      wake up from wait(10)        |                                |
|       current time is 600         |                                |
|       _isWaiting = false          |                                |
|  }                                |                                |
+-----------------------------------+--------------------------------+
```

- The second source of flakiness is that data added to the memory stream may get processed in any trigger, not just the first one after the data was added, so assertions tied to a specific trigger could fail (see the source-side sketch after this list).
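For context, here is the pre-fix `ManualClock` in condensed form. Its `_isWaiting` machinery is exactly what the diff below removes; the `advance`/`getTimeMillis` bodies are filled in from the standard `ManualClock` behavior, and the `private[spark] ... extends Clock` declaration is elided so the sketch stands alone. The boolean flag only records that *some* wait is in progress, not *which* one, which is why the back-to-back advances in the table are accepted:

```scala
// Condensed pre-fix ManualClock: a single boolean cannot distinguish one
// wait from the next.
class ManualClock(private var time: Long) {

  private var _isWaiting = false

  def getTimeMillis(): Long = synchronized { time }

  def advance(timeToAdd: Long): Unit = synchronized {
    time += timeToAdd
    notifyAll()  // wake up any thread blocked in waitTillTime
  }

  def waitTillTime(targetTime: Long): Long = synchronized {
    _isWaiting = true
    try {
      while (time < targetTime) {
        wait(10)  // the flag stays true for this entire loop...
      }
      getTimeMillis()
    } finally {
      _isWaiting = false
    }
  }

  // ...so every `if (isWaiting) advance(...)` the testing thread issues in
  // the meantime is accepted; this is the race shown in the table above.
  def isWaiting: Boolean = synchronized { _isWaiting }
}
```

For the second source, the suite pins data to a predictable trigger by making the source itself block on the clock. Below is a hedged sketch of the suite's "Custom MemoryStream that waits for manual clock to reach a time" (see the StreamingQueryListenerSuite diff); `clock` is the suite's StreamManualClock field, and the exact override point and the target time are assumptions for illustration:

```scala
// Hedged sketch (assumed details): hold data back until the manual clock
// reaches a known time, so the data is processed by a predictable trigger.
val inputData = new MemoryStream[Int](0, sqlContext) {
  override def getOffset: Option[Offset] = {
    val offset = super.getOffset
    if (offset.nonEmpty) {
      clock.waitTillTime(600)  // assumed target: the trigger at t = 600
    }
    offset
  }
}
```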

My fix is to make the manual clock wait for the stream execution thread to start waiting for the clock at the right wait start time. That is, `advance(200)` (see above) will wait for the stream execution thread to complete the wait that started at time 0, and to start a new wait at time 100 (i.e., the timestamp after the previous `advance(100)`).
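Condensed from the `StreamTest` diff below, the gating works like this: the testing thread tracks the timestamp at which the stream's next wait must start, and `AdvanceManualClock` refuses to advance until the stream thread is blocked at exactly that time (this is an excerpt, not a standalone program):

```scala
// Condensed from the StreamTest changes below: advance the clock only when
// the stream thread is provably waiting at the expected wait-start time,
// then move the expectation forward by the same amount.
case AdvanceManualClock(timeToAdd) =>
  val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
  eventually("StreamManualClock has not yet entered the waiting state") {
    assert(clock.isStreamWaitingAt(manualClockExpectedTime))
  }
  clock.advance(timeToAdd)
  manualClockExpectedTime += timeToAdd  // the next wait must start here
```

With this in place, the second advance in the table above cannot fire until the stream thread has finished the wait begun at time 0 and re-entered `waitTillTime` at time 100.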

In addition, since this is a feature that is used solely by StreamExecution, I removed all the non-generic code from ManualClock and moved it into StreamManualClock inside StreamTest.
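A typical test against the new harness then looks roughly like the sketch below. `StartStream`, `AdvanceManualClock`, and `StopStream` appear in the diffs; `AddData` and `CheckAnswer` are existing `StreamTest` actions, and this particular sequence is an assumed example rather than code from this patch:

```scala
// Assumed usage sketch: drive a streaming query deterministically with the
// new StreamManualClock.
val inputData = MemoryStream[Int]
testStream(inputData.toDS())(
  // StartStream now accepts only SystemClock or StreamManualClock.
  StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
  AddData(inputData, 1, 2, 3),
  // Blocks until the stream thread is waiting at the expected time, then
  // advances the clock so the next trigger can fire.
  AdvanceManualClock(10 * 1000),
  CheckAnswer(1, 2, 3),
  StopStream
)
```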

## How was this patch tested?
Ran the existing unit tests MANY TIMES in Jenkins.

Author: Tathagata Das <tathagata.das1565@gmail.com>
Author: Liwei Lin <lwlin7@gmail.com>

Closes #15519 from tdas/metrics-flaky-test-fix.
Liwei Lin authored on 2016-10-18 00:49:57 -07:00; committed by Tathagata Das
parent 1c5a7d7f64 · commit 7d878cf2da
4 changed files with 41 additions and 27 deletions

#### ManualClock.scala

```diff
@@ -26,8 +26,6 @@ package org.apache.spark.util
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
 
-  private var _isWaiting = false
-
   /**
    * @return `ManualClock` with initial time 0
    */
@@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    * @return current time reported by the clock when waiting finishes
    */
   def waitTillTime(targetTime: Long): Long = synchronized {
-    _isWaiting = true
-    try {
-      while (time < targetTime) {
-        wait(10)
-      }
-      getTimeMillis()
-    } finally {
-      _isWaiting = false
-    }
+    while (time < targetTime) {
+      wait(10)
+    }
+    getTimeMillis()
   }
-
-  /**
-   * Returns whether there is any thread being blocked in `waitTillTime`.
-   */
-  def isWaiting: Boolean = synchronized { _isWaiting }
 }
```

#### StreamSuite.scala

```diff
@@ -161,7 +161,7 @@ class StreamSuite extends StreamTest {
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
 
       /* -- batch 0 ----------------------- */
       // Add some data in batch 0
@@ -199,7 +199,7 @@ class StreamSuite extends StreamTest {
 
       /* Stop then restart the Stream */
       StopStream,
-      StartStream(ProcessingTime("10 seconds"), new ManualClock),
+      StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
 
       /* -- batch 1 rerun ----------------- */
       // this batch 1 would re-run because the latest batch id logged in offset log is 1
```

#### StreamTest.scala

```diff
@@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
   case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
     extends StreamAction
 
+  class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+    private var waitStartTime: Option[Long] = None
+
+    override def waitTillTime(targetTime: Long): Long = synchronized {
+      try {
+        waitStartTime = Some(getTimeMillis())
+        super.waitTillTime(targetTime)
+      } finally {
+        waitStartTime = None
+      }
+    }
+
+    def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time) }
+  }
+
   /**
    * Executes the specified actions on the given streaming DataFrame and provides helpful
@@ -307,7 +322,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     val testThread = Thread.currentThread()
     val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
     val statusCollector = new QueryStatusCollector
-
+    var manualClockExpectedTime = -1L
     try {
       spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
@@ -315,6 +330,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
         action match {
           case StartStream(trigger, triggerClock) =>
             verify(currentStream == null, "stream already running")
+            verify(triggerClock.isInstanceOf[SystemClock]
+              || triggerClock.isInstanceOf[StreamManualClock],
+              "Use either SystemClock or StreamManualClock to start the stream")
+            if (triggerClock.isInstanceOf[StreamManualClock]) {
+              manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis()
+            }
             lastStream = currentStream
             currentStream =
               spark
@@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
           case AdvanceManualClock(timeToAdd) =>
             verify(currentStream != null,
                    "can not advance manual clock when a stream is not running")
-            verify(currentStream.triggerClock.isInstanceOf[ManualClock],
+            verify(currentStream.triggerClock.isInstanceOf[StreamManualClock],
                    s"can not advance clock of type ${currentStream.triggerClock.getClass}")
-            val clock = currentStream.triggerClock.asInstanceOf[ManualClock]
+            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
+            assert(manualClockExpectedTime >= 0)
             // Make sure we don't advance ManualClock too early. See SPARK-16002.
-            eventually("ManualClock has not yet entered the waiting state") {
-              assert(clock.isWaiting)
+            eventually("StreamManualClock has not yet entered the waiting state") {
+              assert(clock.isStreamWaitingAt(manualClockExpectedTime))
             }
-            currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd)
+            clock.advance(timeToAdd)
+            manualClockExpectedTime += timeToAdd
+            verify(clock.getTimeMillis() === manualClockExpectedTime,
+              s"Unexpected clock time after updating: " +
+                s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}")
 
           case StopStream =>
             verify(currentStream != null, "can not stop a stream that is not running")
```

#### StreamingQueryListenerSuite.scala

```diff
@@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Make sure we don't leak any events to the next test
   }
 
-  ignore("single listener, check trigger statuses") {
+  test("single listener, check trigger statuses") {
     import StreamingQueryListenerSuite._
-    clock = new ManualClock()
+    clock = new StreamManualClock
 
     /** Custom MemoryStream that waits for manual clock to reach a time */
     val inputData = new MemoryStream[Int](0, sqlContext) {
@@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       AssertOnLastQueryStatus { status: StreamingQueryStatus =>
         // Check the correctness of the trigger info of the last completed batch reported by
         // onQueryProgress
-        assert(status.triggerDetails.get("triggerId") == "0")
+        assert(status.triggerDetails.containsKey("triggerId"))
         assert(status.triggerDetails.get("isTriggerActive") === "false")
         assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
@@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
 
         assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0")
+        assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
         assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
         assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
```