[SPARK-20209][SS] Execute next trigger immediately if previous batch took longer than trigger interval

## What changes were proposed in this pull request?

For large trigger intervals (e.g. 10 minutes), if a batch takes 11 minutes, the executor will then wait another 9 minutes before starting the next batch. This does not make sense. The processing-time trigger policy should be to process batches as fast as possible, but no faster than one per trigger interval. If batches are already taking longer than the trigger interval, there is no point in waiting out an extra interval.
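
For example (a worked sketch of the scheduling arithmetic, illustrative code rather than code from this patch), with a 10-minute interval:

```scala
// Worked example of the scheduling arithmetic (illustrative, not PR code).
val intervalMs = 10 * 60 * 1000L                // 10-minute trigger interval
def nextBatchTime(now: Long): Long =
  now / intervalMs * intervalMs + intervalMs

val batchStartMs = 0L
val batchEndMs = 11 * 60 * 1000L                // the batch ran for 11 minutes

// Old policy: schedule from the batch *end* -> next trigger at t = 20 min,
// i.e. an extra 9-minute wait after an already-late batch.
val oldNext = nextBatchTime(batchEndMs)         // 1200000 (t = 20 min)

// New policy: schedule from the batch *start* -> next trigger at t = 10 min,
// which has already passed, so the next batch fires immediately at t = 11 min.
val newNext = nextBatchTime(batchStartMs)       // 600000 (t = 10 min)
```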

In this PR, I modified the ProcessingTimeExecutor to do exactly that. Another minor change was to extract our StreamManualClock into a separate class so that it can be used outside subclasses of StreamTest. For example, ProcessingTimeExecutorSuite does not need to create any context for testing; it just needs the StreamManualClock.
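
The core of the change, condensed from the diff below into a self-contained sketch (the `Clock` trait and `println` here stand in for Spark's internal `Clock` and `logWarning`): the next trigger time is computed from the batch's *start* time before the batch runs, so if the batch overruns the interval, the wait returns immediately.

```scala
// Sketch of the new scheduling loop (condensed from the diff below).
trait Clock { def getTimeMillis: Long; def waitTillTime(t: Long): Long }

class ProcessingTimeExecutorSketch(intervalMs: Long, clock: Clock) {
  def nextBatchTime(now: Long): Long =
    if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs

  def execute(triggerHandler: () => Boolean): Unit = {
    while (true) {
      val triggerTimeMs = clock.getTimeMillis               // batch start
      val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)  // fixed before the batch runs
      val terminated = !triggerHandler()
      if (intervalMs > 0) {
        val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
        if (batchElapsedTimeMs > intervalMs) {
          println(s"Current batch is falling behind: $batchElapsedTimeMs ms > $intervalMs ms")
        }
        if (terminated) return
        // If the batch overran the interval, nextTriggerTimeMs is already in
        // the past and this wait returns at once -- no extra interval is spent.
        clock.waitTillTime(nextTriggerTimeMs)
      } else if (terminated) return
    }
  }
}
```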

## How was this patch tested?
Added new unit tests to comprehensively test this behavior.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17525 from tdas/SPARK-20209.
Tathagata Das 2017-04-04 23:20:17 -07:00
parent b6e71032d9
commit dad499f324
11 changed files with 194 additions and 72 deletions


@@ -39,6 +39,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions.{count, window}
import org.apache.spark.sql.kafka010.KafkaSourceProvider._
import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
import org.apache.spark.util.Utils


@@ -47,21 +47,22 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
extends TriggerExecutor with Logging {
private val intervalMs = processingTime.intervalMs
require(intervalMs >= 0)
override def execute(batchRunner: () => Boolean): Unit = {
override def execute(triggerHandler: () => Boolean): Unit = {
while (true) {
val batchStartTimeMs = clock.getTimeMillis()
val terminated = !batchRunner()
val triggerTimeMs = clock.getTimeMillis
val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
val terminated = !triggerHandler()
if (intervalMs > 0) {
val batchEndTimeMs = clock.getTimeMillis()
val batchElapsedTimeMs = batchEndTimeMs - batchStartTimeMs
val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
if (batchElapsedTimeMs > intervalMs) {
notifyBatchFallingBehind(batchElapsedTimeMs)
}
if (terminated) {
return
}
clock.waitTillTime(nextBatchTime(batchEndTimeMs))
clock.waitTillTime(nextTriggerTimeMs)
} else {
if (terminated) {
return
@@ -70,7 +71,7 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
}
}
/** Called when a batch falls behind. Expose for test only */
/** Called when a batch falls behind */
def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = {
logWarning("Current batch is falling behind. The trigger interval is " +
s"${intervalMs} milliseconds, but spent ${realElapsedTimeMs} milliseconds")
@@ -83,6 +84,6 @@ case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock =
* an interval of `100 ms`, `nextBatchTime(nextBatchTime(0)) = 200` rather than `0`).
*/
def nextBatchTime(now: Long): Long = {
now / intervalMs * intervalMs + intervalMs
if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs
}
}
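
The boundary behavior of `nextBatchTime` is worth spelling out: an input already on a boundary maps to the *following* boundary, never back to the input itself, so chained calls always advance. A few REPL-style spot checks (the suite below asserts the same values):

```scala
// Boundary behavior of nextBatchTime with interval = 100 ms (REPL-style checks).
def nextBatchTime(now: Long, intervalMs: Long = 100L): Long =
  if (intervalMs == 0) now else now / intervalMs * intervalMs + intervalMs

assert(nextBatchTime(0) == 100)
assert(nextBatchTime(99) == 100)
assert(nextBatchTime(100) == 200)               // not 100: the input itself is excluded
assert(nextBatchTime(nextBatchTime(0)) == 200)  // chaining advances exactly one interval
```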


@@ -17,14 +17,24 @@
package org.apache.spark.sql.execution.streaming
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.ConcurrentHashMap
import scala.collection.mutable
import org.eclipse.jetty.util.ConcurrentHashSet
import org.scalatest.concurrent.Eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.SpanSugar._
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{Clock, ManualClock, SystemClock}
import org.apache.spark.sql.streaming.util.StreamManualClock
class ProcessingTimeExecutorSuite extends SparkFunSuite {
val timeout = 10.seconds
test("nextBatchTime") {
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
assert(processingTimeExecutor.nextBatchTime(0) === 100)
@@ -35,6 +45,57 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
assert(processingTimeExecutor.nextBatchTime(150) === 200)
}
test("trigger timing") {
val triggerTimes = new ConcurrentHashSet[Int]
val clock = new StreamManualClock()
@volatile var continueExecuting = true
@volatile var clockIncrementInTrigger = 0L
val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), clock)
val executorThread = new Thread() {
override def run(): Unit = {
executor.execute(() => {
// Record the trigger time, increment the clock if needed, and report whether to continue
triggerTimes.add(clock.getTimeMillis.toInt)
clock.advance(clockIncrementInTrigger)
clockIncrementInTrigger = 0 // reset this so that there are no runaway triggers
continueExecuting
})
}
}
executorThread.start()
// First batch should execute immediately, then executor should wait for next one
eventually {
assert(triggerTimes.contains(0))
assert(clock.isStreamWaitingAt(0))
assert(clock.isStreamWaitingFor(1000))
}
// Second batch should execute when clock reaches the next trigger time.
// If next trigger takes less than the trigger interval, executor should wait for next one
clockIncrementInTrigger = 500
clock.setTime(1000)
eventually {
assert(triggerTimes.contains(1000))
assert(clock.isStreamWaitingAt(1500))
assert(clock.isStreamWaitingFor(2000))
}
// If the next trigger takes more than the trigger interval, executor should immediately
// execute another one
clockIncrementInTrigger = 1500
clock.setTime(2000) // allow another trigger by setting clock to 2000
eventually {
// Since the next trigger will take 1500 (which is more than trigger interval of 1000)
// executor will immediately execute another trigger
assert(triggerTimes.contains(2000) && triggerTimes.contains(3500))
assert(clock.isStreamWaitingAt(3500))
assert(clock.isStreamWaitingFor(4000))
}
continueExecuting = false
clock.advance(1000)
waitForThreadJoin(executorThread)
}
test("calling nextBatchTime with the result of a previous call should return the next interval") {
val intervalMS = 100
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))
@@ -54,7 +115,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
processingTimeExecutor.execute(() => {
batchCounts += 1
// If the batch termination works well, batchCounts should be 3 after `execute`
// If the batch termination works correctly, batchCounts should be 3 after `execute`
batchCounts < 3
})
assert(batchCounts === 3)
@@ -66,9 +127,8 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
}
test("notifyBatchFallingBehind") {
val clock = new ManualClock()
val clock = new StreamManualClock()
@volatile var batchFallingBehindCalled = false
val latch = new CountDownLatch(1)
val t = new Thread() {
override def run(): Unit = {
val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) {
@@ -77,7 +137,6 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
}
}
processingTimeExecutor.execute(() => {
latch.countDown()
clock.waitTillTime(200)
false
})
@@ -85,9 +144,17 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite {
}
t.start()
// Wait until the batch is running so that we don't call `advance` too early
assert(latch.await(10, TimeUnit.SECONDS), "the batch has not yet started in 10 seconds")
eventually { assert(clock.isStreamWaitingFor(200)) }
clock.advance(200)
t.join()
waitForThreadJoin(t)
assert(batchFallingBehindCalled === true)
}
private def eventually(body: => Unit): Unit = {
Eventually.eventually(Timeout(timeout)) { body }
}
private def waitForThreadJoin(thread: Thread): Unit = {
failAfter(timeout) { thread.join() }
}
}


@@ -33,6 +33,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.FileStreamSource.{FileEntry, SeenFilesMap}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.ExistsThrowsExceptionFileSystem._
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils


@@ -21,8 +21,6 @@ import java.sql.Date
import java.util.concurrent.ConcurrentHashMap
import org.scalatest.BeforeAndAfterAll
import org.scalatest.concurrent.Eventually.eventually
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.apache.spark.SparkException
import org.apache.spark.api.java.function.FlatMapGroupsWithStateFunction
@@ -35,6 +33,7 @@ import org.apache.spark.sql.execution.RDDScanExec
import org.apache.spark.sql.execution.streaming.{FlatMapGroupsWithStateExec, GroupStateImpl, MemoryStream}
import org.apache.spark.sql.execution.streaming.state.{StateStore, StateStoreId, StoreUpdate}
import org.apache.spark.sql.streaming.FlatMapGroupsWithStateSuite.MemoryStateStore
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{DataType, IntegerType}
/** Class to check custom state types */


@@ -32,6 +32,7 @@ import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.StreamSourceProvider
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.sql.types.{IntegerType, StructField, StructType}
import org.apache.spark.util.Utils


@@ -214,24 +214,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
AssertOnQuery(query => { func(query); true })
}
class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None
override def waitTillTime(targetTime: Long): Long = synchronized {
try {
waitStartTime = Some(getTimeMillis())
super.waitTillTime(targetTime)
} finally {
waitStartTime = None
}
}
def isStreamWaitingAt(time: Long): Boolean = synchronized {
waitStartTime == Some(time)
}
}
/**
* Executes the specified actions on the given streaming DataFrame and provides helpful
* error messages in the case of failures or incorrect answers.
@@ -242,6 +224,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
def testStream(
_stream: Dataset[_],
outputMode: OutputMode = OutputMode.Append)(actions: StreamAction*): Unit = synchronized {
import org.apache.spark.sql.streaming.util.StreamManualClock
// `synchronized` is added to prevent the user from calling multiple `testStream`s concurrently
// because this method assumes there is only one active query in its `StreamingQueryListener`
// and it may not work correctly when multiple `testStream`s run concurrently.


@@ -30,6 +30,7 @@ import org.apache.spark.sql.execution.streaming.state.StateStore
import org.apache.spark.sql.expressions.scalalang.typed
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.OutputMode._
import org.apache.spark.sql.streaming.util.StreamManualClock
object FailureSinglton {
var firstTime = true


@@ -35,6 +35,7 @@ import org.apache.spark.sql.{Encoder, SparkSession}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.StreamingQueryListener._
import org.apache.spark.sql.streaming.util.StreamManualClock
import org.apache.spark.util.JsonProtocol
class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {


@@ -34,7 +34,7 @@ import org.apache.spark.SparkException
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider}
import org.apache.spark.sql.streaming.util.{BlockingSource, MockSourceProvider, StreamManualClock}
import org.apache.spark.util.ManualClock
@@ -207,46 +207,53 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
/** Custom MemoryStream that waits for manual clock to reach a time */
val inputData = new MemoryStream[Int](0, sqlContext) {
// Wait for manual clock to be 100 first time there is data
// getOffset should take 50 ms the first time it is called
override def getOffset: Option[Offset] = {
val offset = super.getOffset
if (offset.nonEmpty) {
clock.waitTillTime(300)
clock.waitTillTime(1050)
}
offset
}
// Wait for manual clock to be 300 first time there is data
// getBatch should take 100 ms the first time it is called
override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
clock.waitTillTime(600)
if (start.isEmpty) clock.waitTillTime(1150)
super.getBatch(start, end)
}
}
// This is to make sure that the query waits for manual clock to be 600 first time there is data
val mapped = inputData.toDS().as[Long].map { x =>
clock.waitTillTime(1100)
// query execution should take 350 ms the first time it is called
val mapped = inputData.toDS.coalesce(1).as[Long].map { x =>
clock.waitTillTime(1500) // this will only wait the first time when clock < 1500
10 / x
}.agg(count("*")).as[Long]
case class AssertStreamExecThreadToWaitForClock()
case class AssertStreamExecThreadIsWaitingForTime(targetTime: Long)
extends AssertOnQuery(q => {
eventually(Timeout(streamingTimeout)) {
if (q.exception.isEmpty) {
assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
assert(clock.isStreamWaitingFor(targetTime))
}
}
if (q.exception.isDefined) {
throw q.exception.get
}
true
}, "")
}, "") {
override def toString: String = s"AssertStreamExecThreadIsWaitingForTime($targetTime)"
}
case class AssertClockTime(time: Long)
extends AssertOnQuery(q => clock.getTimeMillis() === time, "") {
override def toString: String = s"AssertClockTime($time)"
}
var lastProgressBeforeStop: StreamingQueryProgress = null
testStream(mapped, OutputMode.Complete)(
StartStream(ProcessingTime(100), triggerClock = clock),
AssertStreamExecThreadToWaitForClock(),
StartStream(ProcessingTime(1000), triggerClock = clock),
AssertStreamExecThreadIsWaitingForTime(1000),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -254,33 +261,37 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
// Test status and progress while offset is being fetched
AddData(inputData, 1, 2),
AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
AssertStreamExecThreadToWaitForClock(),
AdvanceManualClock(1000), // time = 1000 to start new trigger, will block on getOffset
AssertStreamExecThreadIsWaitingForTime(1050),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message.startsWith("Getting offsets from")),
AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while batch is being fetched
AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
AssertStreamExecThreadToWaitForClock(),
AdvanceManualClock(50), // time = 1050 to unblock getOffset
AssertClockTime(1050),
AssertStreamExecThreadIsWaitingForTime(1150), // will block on getBatch that needs 1150
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while batch is being processed
AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
AdvanceManualClock(100), // time = 1150 to unblock getBatch
AssertClockTime(1150),
AssertStreamExecThreadIsWaitingForTime(1500), // will block in Spark job that needs 1500
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === true),
AssertOnQuery(_.status.message === "Processing new data"),
AssertOnQuery(_.recentProgress.count(_.numInputRows > 0) === 0),
// Test status and progress while batch processing has completed
AdvanceManualClock(500), // time = 1100 to unblock job
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
AssertOnQuery { _ => clock.getTimeMillis() === 1150 },
AdvanceManualClock(350), // time = 1500 to unblock job
AssertClockTime(1500),
CheckAnswer(2),
AssertStreamExecThreadToWaitForClock(),
AssertStreamExecThreadIsWaitingForTime(2000),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -293,21 +304,21 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
assert(progress.id === query.id)
assert(progress.name === query.name)
assert(progress.batchId === 0)
assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC
assert(progress.timestamp === "1970-01-01T00:00:01.000Z") // 100 ms in UTC
assert(progress.numInputRows === 2)
assert(progress.processedRowsPerSecond === 2.0)
assert(progress.processedRowsPerSecond === 4.0)
assert(progress.durationMs.get("getOffset") === 200)
assert(progress.durationMs.get("getBatch") === 300)
assert(progress.durationMs.get("getOffset") === 50)
assert(progress.durationMs.get("getBatch") === 100)
assert(progress.durationMs.get("queryPlanning") === 0)
assert(progress.durationMs.get("walCommit") === 0)
assert(progress.durationMs.get("triggerExecution") === 1000)
assert(progress.durationMs.get("triggerExecution") === 500)
assert(progress.sources.length === 1)
assert(progress.sources(0).description contains "MemoryStream")
assert(progress.sources(0).startOffset === null)
assert(progress.sources(0).endOffset !== null)
assert(progress.sources(0).processedRowsPerSecond === 2.0)
assert(progress.sources(0).processedRowsPerSecond === 4.0) // 2 rows processed in 500 ms
assert(progress.stateOperators.length === 1)
assert(progress.stateOperators(0).numRowsUpdated === 1)
@@ -317,9 +328,12 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
true
},
// Test whether input rate is updated after two batches
AssertStreamExecThreadIsWaitingForTime(2000), // blocked waiting for next trigger time
AddData(inputData, 1, 2),
AdvanceManualClock(100), // allow another trigger
AssertStreamExecThreadToWaitForClock(),
AdvanceManualClock(500), // allow another trigger
AssertClockTime(2000),
AssertStreamExecThreadIsWaitingForTime(3000), // will block waiting for next trigger time
CheckAnswer(4),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === false),
@@ -327,13 +341,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AssertOnQuery { query =>
assert(query.recentProgress.last.eq(query.lastProgress))
assert(query.lastProgress.batchId === 1)
assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
assert(query.lastProgress.inputRowsPerSecond === 2.0)
assert(query.lastProgress.sources(0).inputRowsPerSecond === 2.0)
true
},
// Test status and progress after data is not available for a trigger
AdvanceManualClock(100), // allow another trigger
AssertStreamExecThreadToWaitForClock(),
AdvanceManualClock(1000), // allow another trigger
AssertStreamExecThreadIsWaitingForTime(4000),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
@@ -350,10 +365,10 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AssertOnQuery(_.status.message === "Stopped"),
// Test status and progress after query terminated with error
StartStream(ProcessingTime(100), triggerClock = clock),
AdvanceManualClock(100), // ensure initial trigger completes before AddData
StartStream(ProcessingTime(1000), triggerClock = clock),
AdvanceManualClock(1000), // ensure initial trigger completes before AddData
AddData(inputData, 0),
AdvanceManualClock(100), // allow another trigger
AdvanceManualClock(1000), // allow another trigger
ExpectFailure[SparkException](),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),
@@ -678,5 +693,5 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
object StreamingQuerySuite {
// Singleton reference to clock that does not get serialized in task closures
var clock: ManualClock = null
var clock: StreamManualClock = null
}


@@ -0,0 +1,51 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.streaming.util
import org.apache.spark.util.ManualClock
/**
* ManualClock used for streaming tests that allows checking whether the stream is waiting
* on the clock at expected times.
*/
class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
private var waitStartTime: Option[Long] = None
private var waitTargetTime: Option[Long] = None
override def waitTillTime(targetTime: Long): Long = synchronized {
try {
waitStartTime = Some(getTimeMillis())
waitTargetTime = Some(targetTime)
super.waitTillTime(targetTime)
} finally {
waitStartTime = None
waitTargetTime = None
}
}
/** Is the streaming thread waiting for the clock to advance when it is at the given time */
def isStreamWaitingAt(time: Long): Boolean = synchronized {
waitStartTime == Some(time)
}
/** Is the streaming thread waiting for clock to advance to the given time */
def isStreamWaitingFor(target: Long): Boolean = synchronized {
waitTargetTime == Some(target)
}
}
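
A usage sketch of the new class (hypothetical test code, not part of this commit): a test can tell that a stream thread is parked on the clock, and at which target time, before advancing it.

```scala
// Hypothetical test usage: observe that a worker thread is blocked on the
// clock, then release it by advancing time.
import org.apache.spark.sql.streaming.util.StreamManualClock

val clock = new StreamManualClock()            // starts at t = 0
val worker = new Thread() {
  override def run(): Unit = { clock.waitTillTime(1000); () }  // parks until t >= 1000
}
worker.start()

// Busy-wait until the worker is parked at t = 0 waiting for t = 1000;
// real tests would wrap this in Eventually.eventually with a timeout.
while (!(clock.isStreamWaitingAt(0) && clock.isStreamWaitingFor(1000))) {
  Thread.sleep(10)
}
clock.advance(1000)                            // t = 1000: waitTillTime returns
worker.join()
```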