From 7d878cf2da04800bc4147b05610170865b148c64 Mon Sep 17 00:00:00 2001 From: Liwei Lin Date: Tue, 18 Oct 2016 00:49:57 -0700 Subject: [PATCH] [SQL][STREAMING][TEST] Fix flaky tests in StreamingQueryListenerSuite This work has largely been done by lw-lin in his PR #15497. This is a slight refactoring of it. ## What changes were proposed in this pull request? There were two sources of flakiness in StreamingQueryListener test. - When testing with manual clock, consecutive attempts to advance the clock can occur without the stream execution thread being unblocked and doing some work between the two attempts. Hence the following can happen with the current ManualClock. ``` +-----------------------------------+--------------------------------+ | StreamExecution thread | testing thread | +-----------------------------------+--------------------------------+ | ManualClock.waitTillTime(100) { | | | _isWaiting = true | | | wait(10) | | | still in wait(10) | if (_isWaiting) advance(100) | | still in wait(10) | if (_isWaiting) advance(200) | <- this should be disallowed ! | still in wait(10) | if (_isWaiting) advance(300) | <- this should be disallowed ! | wake up from wait(10) | | | current time is 600 | | | _isWaiting = false | | | } | | +-----------------------------------+--------------------------------+ ``` - Second source of flakiness is that the adding data to memory stream may get processing in any trigger, not just the first trigger. My fix is to make the manual clock wait for the other stream execution thread to start waiting for the clock at the right wait start time. That is, `advance(200)` (see above) will wait for stream execution thread to complete the wait that started at time 0, and start a new wait at time 200 (i.e. time stamp after the previous `advance(100)`). In addition, since this is a feature that is solely used by StreamExecution, I removed all the non-generic code from ManualClock and put them in StreamManualClock inside StreamTest. ## How was this patch tested? Ran existing unit test MANY TIME in Jenkins Author: Tathagata Das Author: Liwei Lin Closes #15519 from tdas/metrics-flaky-test-fix. --- .../org/apache/spark/util/ManualClock.scala | 18 ++------- .../spark/sql/streaming/StreamSuite.scala | 4 +- .../spark/sql/streaming/StreamTest.scala | 38 ++++++++++++++++--- .../StreamingQueryListenerSuite.scala | 8 ++-- 4 files changed, 41 insertions(+), 27 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala index 91a9587101..e7a65d74a4 100644 --- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala +++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala @@ -26,8 +26,6 @@ package org.apache.spark.util */ private[spark] class ManualClock(private var time: Long) extends Clock { - private var _isWaiting = false - /** * @return `ManualClock` with initial time 0 */ @@ -59,19 +57,9 @@ private[spark] class ManualClock(private var time: Long) extends Clock { * @return current time reported by the clock when waiting finishes */ def waitTillTime(targetTime: Long): Long = synchronized { - _isWaiting = true - try { - while (time < targetTime) { - wait(10) - } - getTimeMillis() - } finally { - _isWaiting = false + while (time < targetTime) { + wait(10) } + getTimeMillis() } - - /** - * Returns whether there is any thread being blocked in `waitTillTime`. - */ - def isWaiting: Boolean = synchronized { _isWaiting } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index cdbad901db..6bdf47901a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -161,7 +161,7 @@ class StreamSuite extends StreamTest { val inputData = MemoryStream[Int] testStream(inputData.toDS())( - StartStream(ProcessingTime("10 seconds"), new ManualClock), + StartStream(ProcessingTime("10 seconds"), new StreamManualClock), /* -- batch 0 ----------------------- */ // Add some data in batch 0 @@ -199,7 +199,7 @@ class StreamSuite extends StreamTest { /* Stop then restart the Stream */ StopStream, - StartStream(ProcessingTime("10 seconds"), new ManualClock), + StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), /* -- batch 1 rerun ----------------- */ // this batch 1 would re-run because the latest batch id logged in offset log is 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala index 3b9d378634..254f823bf5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala @@ -204,6 +204,21 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit) extends StreamAction + class StreamManualClock(time: Long = 0L) extends ManualClock(time) { + private var waitStartTime: Option[Long] = None + + override def waitTillTime(targetTime: Long): Long = synchronized { + try { + waitStartTime = Some(getTimeMillis()) + super.waitTillTime(targetTime) + } finally { + waitStartTime = None + } + } + + def isStreamWaitingAt(time: Long): Boolean = synchronized { waitStartTime.contains(time) } + } + /** * Executes the specified actions on the given streaming DataFrame and provides helpful @@ -307,7 +322,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { val testThread = Thread.currentThread() val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath val statusCollector = new QueryStatusCollector - + var manualClockExpectedTime = -1L try { spark.streams.addListener(statusCollector) startedTest.foreach { action => @@ -315,6 +330,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { action match { case StartStream(trigger, triggerClock) => verify(currentStream == null, "stream already running") + verify(triggerClock.isInstanceOf[SystemClock] + || triggerClock.isInstanceOf[StreamManualClock], + "Use either SystemClock or StreamManualClock to start the stream") + if (triggerClock.isInstanceOf[StreamManualClock]) { + manualClockExpectedTime = triggerClock.asInstanceOf[StreamManualClock].getTimeMillis() + } lastStream = currentStream currentStream = spark @@ -338,14 +359,19 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts { case AdvanceManualClock(timeToAdd) => verify(currentStream != null, "can not advance manual clock when a stream is not running") - verify(currentStream.triggerClock.isInstanceOf[ManualClock], + verify(currentStream.triggerClock.isInstanceOf[StreamManualClock], s"can not advance clock of type ${currentStream.triggerClock.getClass}") - val clock = currentStream.triggerClock.asInstanceOf[ManualClock] + val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock] + assert(manualClockExpectedTime >= 0) // Make sure we don't advance ManualClock too early. See SPARK-16002. - eventually("ManualClock has not yet entered the waiting state") { - assert(clock.isWaiting) + eventually("StreamManualClock has not yet entered the waiting state") { + assert(clock.isStreamWaitingAt(manualClockExpectedTime)) } - currentStream.triggerClock.asInstanceOf[ManualClock].advance(timeToAdd) + clock.advance(timeToAdd) + manualClockExpectedTime += timeToAdd + verify(clock.getTimeMillis() === manualClockExpectedTime, + s"Unexpected clock time after updating: " + + s"expecting $manualClockExpectedTime, current ${clock.getTimeMillis()}") case StopStream => verify(currentStream != null, "can not stop a stream that is not running") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index 9e0eefbc58..623f66a778 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -43,9 +43,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { // Make sure we don't leak any events to the next test } - ignore("single listener, check trigger statuses") { + test("single listener, check trigger statuses") { import StreamingQueryListenerSuite._ - clock = new ManualClock() + clock = new StreamManualClock /** Custom MemoryStream that waits for manual clock to reach a time */ val inputData = new MemoryStream[Int](0, sqlContext) { @@ -81,7 +81,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnLastQueryStatus { status: StreamingQueryStatus => // Check the correctness of the trigger info of the last completed batch reported by // onQueryProgress - assert(status.triggerDetails.get("triggerId") == "0") + assert(status.triggerDetails.containsKey("triggerId")) assert(status.triggerDetails.get("isTriggerActive") === "false") assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") @@ -101,7 +101,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") assert(status.sourceStatuses.length === 1) - assert(status.sourceStatuses(0).triggerDetails.get("triggerId") === "0") + assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId")) assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")