[SPARK-19876][SS][WIP] OneTime Trigger Executor

## What changes were proposed in this pull request?

An additional trigger and trigger executor that will execute a single trigger only. One can use this OneTime trigger to have more control over the scheduling of triggers.

In addition, this patch requires an optimization to StreamExecution that logs a commit record at the end of successfully processing a batch. This new commit log will be used to determine the next batch (offsets) to process after a restart, instead of using the offset log itself to determine what batch to process next after restart; using the offset log to determine this would process the previously logged batch, always, thus not permitting a OneTime trigger feature.

## How was this patch tested?

A number of existing tests have been revised. These tests all assumed that when restarting a stream, the last batch in the offset log is to be re-processed. Given that we now have a commit log that will tell us if that last batch was processed successfully, the results/assumptions of those tests needed to be revised accordingly.

In addition, a OneTime trigger test was added to StreamingQuerySuite, which tests:
- The semantics of OneTime trigger (i.e., on start, execute a single batch, then stop).
- The case when the commit log was not able to successfully log the completion of a batch before restart, which would mean that we should fall back to what's in the offset log.
- A OneTime trigger execution that results in an exception being thrown.

marmbrus tdas zsxwing

Please review http://spark.apache.org/contributing.html before opening a pull request.

Author: Tyson Condie <tcondie@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #17219 from tcondie/stream-commit.
This commit is contained in:
Tyson Condie 2017-03-23 14:32:05 -07:00 committed by Tathagata Das
parent b0ae6a38a3
commit 746a558de2
19 changed files with 439 additions and 94 deletions

View file

@ -301,8 +301,6 @@ class KafkaSourceSuite extends KafkaSourceTest {
StopStream,
StartStream(ProcessingTime(100), clock),
waitUntilBatchProcessed,
AdvanceManualClock(100),
waitUntilBatchProcessed,
// smallest now empty, 1 more from middle, 9 more from biggest
CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
11, 108, 109, 110, 111, 112, 113, 114, 115, 116,

View file

@ -64,7 +64,11 @@ object MimaExcludes {
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskData.<init>$default$11"),
// [SPARK-17161] Removing Python-friendly constructors not needed
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.classification.OneVsRestModel.this"),
// [SPARK-19876] Add one time trigger, and improve Trigger APIs
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.streaming.Trigger"),
ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.sql.streaming.ProcessingTime")
)
// Exclude rules for 2.1.x

View file

@ -277,44 +277,6 @@ class StreamingQueryManager(object):
self._jsqm.resetTerminated()
class Trigger(object):
"""Used to indicate how often results should be produced by a :class:`StreamingQuery`.
.. note:: Experimental
.. versionadded:: 2.0
"""
__metaclass__ = ABCMeta
@abstractmethod
def _to_java_trigger(self, sqlContext):
"""Internal method to construct the trigger on the jvm.
"""
pass
class ProcessingTime(Trigger):
"""A trigger that runs a query periodically based on the processing time. If `interval` is 0,
the query will run as fast as possible.
The interval should be given as a string, e.g. '2 seconds', '5 minutes', ...
.. note:: Experimental
.. versionadded:: 2.0
"""
def __init__(self, interval):
if type(interval) != str or len(interval.strip()) == 0:
raise ValueError("interval should be a non empty interval string, e.g. '2 seconds'.")
self.interval = interval
def _to_java_trigger(self, sqlContext):
return sqlContext._sc._jvm.org.apache.spark.sql.streaming.ProcessingTime.create(
self.interval)
class DataStreamReader(OptionUtils):
"""
Interface used to load a streaming :class:`DataFrame` from external storage systems
@ -790,7 +752,7 @@ class DataStreamWriter(object):
@keyword_only
@since(2.0)
def trigger(self, processingTime=None):
def trigger(self, processingTime=None, once=None):
"""Set the trigger for the stream query. If this is not set it will run the query as fast
as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
@ -800,17 +762,26 @@ class DataStreamWriter(object):
>>> # trigger the query for execution every 5 seconds
>>> writer = sdf.writeStream.trigger(processingTime='5 seconds')
>>> # trigger the query for just once batch of data
>>> writer = sdf.writeStream.trigger(once=True)
"""
from pyspark.sql.streaming import ProcessingTime
trigger = None
jTrigger = None
if processingTime is not None:
if once is not None:
raise ValueError('Multiple triggers not allowed.')
if type(processingTime) != str or len(processingTime.strip()) == 0:
raise ValueError('The processing time must be a non empty string. Got: %s' %
raise ValueError('Value for processingTime must be a non empty string. Got: %s' %
processingTime)
trigger = ProcessingTime(processingTime)
if trigger is None:
raise ValueError('A trigger was not provided. Supported triggers: processingTime.')
self._jwrite = self._jwrite.trigger(trigger._to_java_trigger(self._spark))
interval = processingTime.strip()
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.ProcessingTime(
interval)
elif once is not None:
if once is not True:
raise ValueError('Value for once must be True. Got: %s' % once)
jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()
else:
raise ValueError('No trigger provided')
self._jwrite = self._jwrite.trigger(jTrigger)
return self
@ignore_unicode_prefix

View file

@ -1255,13 +1255,26 @@ class SQLTests(ReusedPySparkTestCase):
shutil.rmtree(tmpPath)
def test_stream_trigger_takes_keyword_args(self):
def test_stream_trigger(self):
df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
# Should take at least one arg
try:
df.writeStream.trigger()
except ValueError:
pass
# Should not take multiple args
try:
df.writeStream.trigger(once=True, processingTime='5 seconds')
except ValueError:
pass
# Should take only keyword args
try:
df.writeStream.trigger('5 seconds')
self.fail("Should have thrown an exception")
except TypeError:
# should throw error
pass
def test_stream_read_options(self):

View file

@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import java.io.{InputStream, OutputStream}
import java.nio.charset.StandardCharsets._
import scala.io.{Source => IOSource}
import org.apache.spark.sql.SparkSession
/**
* Used to write log files that represent batch commit points in structured streaming.
* A commit log file will be written immediately after the successful completion of a
* batch, and before processing the next batch. Here is an execution summary:
* - trigger batch 1
* - obtain batch 1 offsets and write to offset log
* - process batch 1
* - write batch 1 to completion log
* - trigger batch 2
* - obtain bactch 2 offsets and write to offset log
* - process batch 2
* - write batch 2 to completion log
* ....
*
* The current format of the batch completion log is:
* line 1: version
* line 2: metadata (optional json string)
*/
class BatchCommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[String](sparkSession, path) {
override protected def deserialize(in: InputStream): String = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset commit log")
}
parseVersion(lines.next().trim, BatchCommitLog.VERSION)
// read metadata
lines.next().trim match {
case BatchCommitLog.SERIALIZED_VOID => null
case metadata => metadata
}
}
override protected def serialize(metadata: String, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(s"v${BatchCommitLog.VERSION}".getBytes(UTF_8))
out.write('\n')
// write metadata or void
out.write((if (metadata == null) BatchCommitLog.SERIALIZED_VOID else metadata)
.getBytes(UTF_8))
}
}
object BatchCommitLog {
private val VERSION = 1
private val SERIALIZED_VOID = "{}"
}

View file

@ -165,6 +165,8 @@ class StreamExecution(
private val triggerExecutor = trigger match {
case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
case OneTimeTrigger => OneTimeExecutor()
case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
/** Defines the internal state of execution */
@ -209,6 +211,13 @@ class StreamExecution(
*/
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
/**
* A log that records the batch ids that have completed. This is used to check if a batch was
* fully processed, and its output was committed to the sink, hence no need to process it again.
* This is used (for instance) during restart, to help identify which batch to run next.
*/
val batchCommitLog = new BatchCommitLog(sparkSession, checkpointFile("commits"))
/** Whether all fields of the query have been initialized */
private def isInitialized: Boolean = state.get != INITIALIZING
@ -291,10 +300,13 @@ class StreamExecution(
runBatch(sparkSessionToRunBatches)
}
}
// Report trigger as finished and construct progress object.
finishTrigger(dataAvailable)
if (dataAvailable) {
// Update committed offsets.
committedOffsets ++= availableOffsets
batchCommitLog.add(currentBatchId, null)
logDebug(s"batch ${currentBatchId} committed")
// We'll increase currentBatchId after we complete processing current batch's data
currentBatchId += 1
} else {
@ -306,9 +318,6 @@ class StreamExecution(
} else {
false
}
// Update committed offsets.
committedOffsets ++= availableOffsets
updateStatusMessage("Waiting for next trigger")
continueToRun
})
@ -392,13 +401,33 @@ class StreamExecution(
* - currentBatchId
* - committedOffsets
* - availableOffsets
* The basic structure of this method is as follows:
*
* Identify (from the offset log) the offsets used to run the last batch
* IF last batch exists THEN
* Set the next batch to be executed as the last recovered batch
* Check the commit log to see which batch was committed last
* IF the last batch was committed THEN
* Call getBatch using the last batch start and end offsets
* // ^^^^ above line is needed since some sources assume last batch always re-executes
* Setup for a new batch i.e., start = last batch end, and identify new end
* DONE
* ELSE
* Identify a brand new batch
* DONE
*/
private def populateStartOffsets(sparkSessionToRunBatches: SparkSession): Unit = {
offsetLog.getLatest() match {
case Some((batchId, nextOffsets)) =>
logInfo(s"Resuming streaming query, starting with batch $batchId")
currentBatchId = batchId
case Some((latestBatchId, nextOffsets)) =>
/* First assume that we are re-executing the latest known batch
* in the offset log */
currentBatchId = latestBatchId
availableOffsets = nextOffsets.toStreamProgress(sources)
/* Initialize committed offsets to a committed batch, which at this
* is the second latest batch id in the offset log. */
offsetLog.get(latestBatchId - 1).foreach { secondLatestBatchId =>
committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}
// update offset metadata
nextOffsets.metadata.foreach { metadata =>
@ -419,14 +448,37 @@ class StreamExecution(
SQLConf.SHUFFLE_PARTITIONS.key, shufflePartitionsToUse.toString)
}
logDebug(s"Found possibly unprocessed offsets $availableOffsets " +
s"at batch timestamp ${offsetSeqMetadata.batchTimestampMs}")
offsetLog.get(batchId - 1).foreach {
case lastOffsets =>
committedOffsets = lastOffsets.toStreamProgress(sources)
logDebug(s"Resuming with committed offsets: $committedOffsets")
/* identify the current batch id: if commit log indicates we successfully processed the
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1 */
batchCommitLog.getLatest() match {
case Some((latestCommittedBatchId, _)) =>
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process a
* new next batch but first:
* Make a call to getBatch using the offsets from previous batch.
* because certain sources (e.g., KafkaSource) assume on restart the last
* batch will be executed before getOffset is called again. */
availableOffsets.foreach { ao: (Source, Offset) =>
val (source, end) = ao
if (committedOffsets.get(source).map(_ != end).getOrElse(true)) {
val start = committedOffsets.get(source)
source.getBatch(start, end)
}
}
currentBatchId = latestCommittedBatchId + 1
committedOffsets ++= availableOffsets
// Construct a new batch be recomputing availableOffsets
constructNextBatch()
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
s"batchid $latestBatchId by one")
}
case None => logInfo("no commit log present")
}
logDebug(s"Resuming at batch $currentBatchId with committed offsets " +
s"$committedOffsets and available offsets $availableOffsets")
case None => // We are starting this stream for the first time.
logInfo(s"Starting new streaming query.")
currentBatchId = 0
@ -523,6 +575,7 @@ class StreamExecution(
// Note that purge is exclusive, i.e. it purges everything before the target ID.
if (minBatchesToRetain < currentBatchId) {
offsetLog.purge(currentBatchId - minBatchesToRetain)
batchCommitLog.purge(currentBatchId - minBatchesToRetain)
}
}
} else {

View file

@ -29,6 +29,17 @@ trait TriggerExecutor {
def execute(batchRunner: () => Boolean): Unit
}
/**
* A trigger executor that runs a single batch only, then terminates.
*/
case class OneTimeExecutor() extends TriggerExecutor {
/**
* Execute a single batch using `batchRunner`.
*/
override def execute(batchRunner: () => Boolean): Unit = batchRunner()
}
/**
* A trigger executor that runs a batch every `intervalMs` milliseconds.
*/

View file

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.execution.streaming
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.sql.streaming.Trigger
/**
* A [[Trigger]] that process only one batch of data in a streaming query then terminates
* the query.
*/
@Experimental
@InterfaceStability.Evolving
case object OneTimeTrigger extends Trigger

View file

@ -377,7 +377,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
private var outputMode: OutputMode = OutputMode.Append
private var trigger: Trigger = ProcessingTime(0L)
private var trigger: Trigger = Trigger.ProcessingTime(0L)
private var extraOptions = new scala.collection.mutable.HashMap[String, String]

View file

@ -26,16 +26,6 @@ import org.apache.commons.lang3.StringUtils
import org.apache.spark.annotation.{Experimental, InterfaceStability}
import org.apache.spark.unsafe.types.CalendarInterval
/**
* :: Experimental ::
* Used to indicate how often results should be produced by a [[StreamingQuery]].
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
sealed trait Trigger
/**
* :: Experimental ::
* A trigger that runs a query periodically based on the processing time. If `interval` is 0,
@ -43,24 +33,25 @@ sealed trait Trigger
*
* Scala Example:
* {{{
* df.write.trigger(ProcessingTime("10 seconds"))
* df.writeStream.trigger(ProcessingTime("10 seconds"))
*
* import scala.concurrent.duration._
* df.write.trigger(ProcessingTime(10.seconds))
* df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* Java Example:
* {{{
* df.write.trigger(ProcessingTime.create("10 seconds"))
* df.writeStream.trigger(ProcessingTime.create("10 seconds"))
*
* import java.util.concurrent.TimeUnit
* df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
case class ProcessingTime(intervalMs: Long) extends Trigger {
require(intervalMs >= 0, "the interval of trigger should not be negative")
}
@ -73,6 +64,7 @@ case class ProcessingTime(intervalMs: Long) extends Trigger {
*/
@Experimental
@InterfaceStability.Evolving
@deprecated("use Trigger.ProcessingTimeTrigger(intervalMs)", "2.2.0")
object ProcessingTime {
/**
@ -80,11 +72,13 @@ object ProcessingTime {
*
* Example:
* {{{
* df.write.trigger(ProcessingTime("10 seconds"))
* df.writeStream.trigger(ProcessingTime("10 seconds"))
* }}}
*
* @since 2.0.0
* @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
@deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
def apply(interval: String): ProcessingTime = {
if (StringUtils.isBlank(interval)) {
throw new IllegalArgumentException(
@ -110,11 +104,13 @@ object ProcessingTime {
* Example:
* {{{
* import scala.concurrent.duration._
* df.write.trigger(ProcessingTime(10.seconds))
* df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
*
* @since 2.0.0
* @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
@deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
def apply(interval: Duration): ProcessingTime = {
new ProcessingTime(interval.toMillis)
}
@ -124,11 +120,13 @@ object ProcessingTime {
*
* Example:
* {{{
* df.write.trigger(ProcessingTime.create("10 seconds"))
* df.writeStream.trigger(ProcessingTime.create("10 seconds"))
* }}}
*
* @since 2.0.0
* @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
@deprecated("use Trigger.ProcessingTimeTrigger(interval)", "2.2.0")
def create(interval: String): ProcessingTime = {
apply(interval)
}
@ -139,11 +137,13 @@ object ProcessingTime {
* Example:
* {{{
* import java.util.concurrent.TimeUnit
* df.write.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.0.0
* @deprecated use Trigger.ProcessingTimeTrigger(interval)
*/
@deprecated("use Trigger.ProcessingTimeTrigger(interval, unit)", "2.2.0")
def create(interval: Long, unit: TimeUnit): ProcessingTime = {
new ProcessingTime(unit.toMillis(interval))
}

View file

@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.sql.streaming;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import org.apache.spark.annotation.Experimental;
import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
/**
* :: Experimental ::
* Policy used to indicate how often results should be produced by a [[StreamingQuery]].
*
* @since 2.0.0
*/
@Experimental
@InterfaceStability.Evolving
public class Trigger {
/**
* :: Experimental ::
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `interval` is 0, the query will run as fast as possible.
*
* @since 2.2.0
*/
public static Trigger ProcessingTime(long intervalMs) {
return ProcessingTime.apply(intervalMs);
}
/**
* :: Experimental ::
* (Java-friendly)
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `interval` is 0, the query will run as fast as possible.
*
* {{{
* import java.util.concurrent.TimeUnit
* df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
* }}}
*
* @since 2.2.0
*/
public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
return ProcessingTime.create(interval, timeUnit);
}
/**
* :: Experimental ::
* (Scala-friendly)
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `duration` is 0, the query will run as fast as possible.
*
* {{{
* import scala.concurrent.duration._
* df.writeStream.trigger(ProcessingTime(10.seconds))
* }}}
* @since 2.2.0
*/
public static Trigger ProcessingTime(Duration interval) {
return ProcessingTime.apply(interval);
}
/**
* :: Experimental ::
* A trigger policy that runs a query periodically based on an interval in processing time.
* If `interval` is effectively 0, the query will run as fast as possible.
*
* {{{
* df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
* }}}
* @since 2.2.0
*/
public static Trigger ProcessingTime(String interval) {
return ProcessingTime.apply(interval);
}
/**
* A trigger that process only one batch of data in a streaming query then terminates
* the query.
*
* @since 2.2.0
*/
public static Trigger Once() {
return OneTimeTrigger$.MODULE$;
}
}

View file

@ -218,7 +218,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Loggin
AddData(inputData, 25), // Evict items less than previous watermark.
CheckLastBatch((10, 5)),
StopStream,
AssertOnQuery { q => // clear the sink
AssertOnQuery { q => // purge commit and clear the sink
val commit = q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) + 1L
q.batchCommitLog.purge(commit)
q.sink.asInstanceOf[MemorySink].clear()
true
},

View file

@ -575,9 +575,10 @@ class FlatMapGroupsWithStateSuite extends StateStoreMetricsTest with BeforeAndAf
StopStream,
StartStream(ProcessingTime("1 second"), triggerClock = clock),
AdvanceManualClock(10 * 1000),
AddData(inputData, "c"),
AdvanceManualClock(20 * 1000),
AdvanceManualClock(1 * 1000),
CheckLastBatch(("b", "-1"), ("c", "1")),
assertNumStateRows(total = 1, updated = 2),

View file

@ -156,6 +156,15 @@ class StreamSuite extends StreamTest {
AssertOnQuery(_.offsetLog.getLatest().get._1 == expectedId,
s"offsetLog's latest should be $expectedId")
// Check the latest batchid in the commit log
def CheckCommitLogLatestBatchId(expectedId: Int): AssertOnQuery =
AssertOnQuery(_.batchCommitLog.getLatest().get._1 == expectedId,
s"commitLog's latest should be $expectedId")
// Ensure that there has not been an incremental execution after restart
def CheckNoIncrementalExecutionCurrentBatchId(): AssertOnQuery =
AssertOnQuery(_.lastExecution == null, s"lastExecution not expected to run")
// For each batch, we would log the state change during the execution
// This checks whether the key of the state change log is the expected batch id
def CheckIncrementalExecutionCurrentBatchId(expectedId: Int): AssertOnQuery =
@ -181,6 +190,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 0
CheckAnswer(1, 2, 3),
CheckIncrementalExecutionCurrentBatchId(0),
CheckCommitLogLatestBatchId(0),
CheckOffsetLogLatestBatchId(0),
CheckSinkLatestBatchId(0),
// Add some data in batch 1
@ -191,6 +201,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
@ -203,6 +214,7 @@ class StreamSuite extends StreamTest {
// the currentId does not get logged (e.g. as 2) even if the clock has advanced many times
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
@ -210,14 +222,15 @@ class StreamSuite extends StreamTest {
StopStream,
StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
/* -- batch 1 rerun ----------------- */
// this batch 1 would re-run because the latest batch id logged in offset log is 1
/* -- batch 1 no rerun ----------------- */
// batch 1 would not re-run because the latest batch id logged in commit log is 1
AdvanceManualClock(10 * 1000),
CheckNoIncrementalExecutionCurrentBatchId(),
/* -- batch 2 ----------------------- */
// Check the results of batch 1
CheckAnswer(1, 2, 3, 4, 5, 6),
CheckIncrementalExecutionCurrentBatchId(1),
CheckCommitLogLatestBatchId(1),
CheckOffsetLogLatestBatchId(1),
CheckSinkLatestBatchId(1),
// Add some data in batch 2
@ -228,6 +241,7 @@ class StreamSuite extends StreamTest {
// Check the results of batch 2
CheckAnswer(1, 2, 3, 4, 5, 6, 7, 8, 9),
CheckIncrementalExecutionCurrentBatchId(2),
CheckCommitLogLatestBatchId(2),
CheckOffsetLogLatestBatchId(2),
CheckSinkLatestBatchId(2))
}

View file

@ -159,7 +159,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
/** Starts the stream, resuming if data has already been processed. It must not be running. */
case class StartStream(
trigger: Trigger = ProcessingTime(0),
trigger: Trigger = Trigger.ProcessingTime(0),
triggerClock: Clock = new SystemClock,
additionalConfs: Map[String, String] = Map.empty)
extends StreamAction

View file

@ -272,11 +272,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
q.batchCommitLog.purge(3)
// advance by a minute i.e., 90 seconds total
clock.advance(60 * 1000L)
true
},
StartStream(ProcessingTime("10 seconds"), triggerClock = clock),
// The commit log blown, causing the last batch to re-run
CheckLastBatch((20L, 1), (85L, 1)),
AssertOnQuery { q =>
clock.getTimeMillis() == 90000L
@ -322,11 +324,13 @@ class StreamingAggregationSuite extends StateStoreMetricsTest with BeforeAndAfte
StopStream,
AssertOnQuery { q => // clear the sink
q.sink.asInstanceOf[MemorySink].clear()
q.batchCommitLog.purge(3)
// advance by 60 days i.e., 90 days total
clock.advance(DateTimeUtils.MILLIS_PER_DAY * 60)
true
},
StartStream(ProcessingTime("10 day"), triggerClock = clock),
// Commit log blown, causing a re-run of the last batch
CheckLastBatch((20L, 1), (85L, 1)),
// advance clock to 100 days, should retain keys >= 90

View file

@ -57,6 +57,20 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
val inputData = new MemoryStream[Int](0, sqlContext)
val df = inputData.toDS().as[Long].map { 10 / _ }
val listener = new EventCollector
case class AssertStreamExecThreadToWaitForClock()
extends AssertOnQuery(q => {
eventually(Timeout(streamingTimeout)) {
if (q.exception.isEmpty) {
assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
}
}
if (q.exception.isDefined) {
throw q.exception.get
}
true
}, "")
try {
// No events until started
spark.streams.addListener(listener)
@ -81,6 +95,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Progress event generated when data processed
AddData(inputData, 1, 2),
AdvanceManualClock(100),
AssertStreamExecThreadToWaitForClock(),
CheckAnswer(10, 5),
AssertOnQuery { query =>
assert(listener.progressEvents.nonEmpty)
@ -109,8 +124,9 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
// Termination event generated with exception message when stopped with error
StartStream(ProcessingTime(100), triggerClock = clock),
AssertStreamExecThreadToWaitForClock(),
AddData(inputData, 0),
AdvanceManualClock(100),
AdvanceManualClock(100), // process bad data
ExpectFailure[SparkException](),
AssertOnQuery { query =>
eventually(Timeout(streamingTimeout)) {

View file

@ -158,6 +158,49 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
)
}
testQuietly("OneTime trigger, commit log, and exception") {
import Trigger.Once
val inputData = MemoryStream[Int]
val mapped = inputData.toDS().map { 6 / _}
testStream(mapped)(
AssertOnQuery(_.isActive === true),
StopStream,
AddData(inputData, 1, 2),
StartStream(trigger = Once),
CheckAnswer(6, 3),
StopStream, // clears out StreamTest state
AssertOnQuery { q =>
// both commit log and offset log contain the same (latest) batch id
q.batchCommitLog.getLatest().map(_._1).getOrElse(-1L) ==
q.offsetLog.getLatest().map(_._1).getOrElse(-2L)
},
AssertOnQuery { q =>
// blow away commit log and sink result
q.batchCommitLog.purge(1)
q.sink.asInstanceOf[MemorySink].clear()
true
},
StartStream(trigger = Once),
CheckAnswer(6, 3), // ensure we fall back to offset log and reprocess batch
StopStream,
AddData(inputData, 3),
StartStream(trigger = Once),
CheckLastBatch(2), // commit log should be back in place
StopStream,
AddData(inputData, 0),
StartStream(trigger = Once),
ExpectFailure[SparkException](),
AssertOnQuery(_.isActive === false),
AssertOnQuery(q => {
q.exception.get.startOffset ===
q.committedOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString &&
q.exception.get.endOffset ===
q.availableOffsets.toOffsetSeq(Seq(inputData), OffsetSeqMetadata()).toString
}, "incorrect start offset or end offset on exception")
)
}
testQuietly("status, lastProgress, and recentProgress") {
import StreamingQuerySuite._
clock = new StreamManualClock
@ -237,6 +280,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AdvanceManualClock(500), // time = 1100 to unblock job
AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
CheckAnswer(2),
AssertStreamExecThreadToWaitForClock(),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === false),
AssertOnQuery(_.status.message === "Waiting for next trigger"),
@ -275,6 +319,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
AddData(inputData, 1, 2),
AdvanceManualClock(100), // allow another trigger
AssertStreamExecThreadToWaitForClock(),
CheckAnswer(4),
AssertOnQuery(_.status.isDataAvailable === true),
AssertOnQuery(_.status.isTriggerActive === false),
@ -306,8 +351,9 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
// Test status and progress after query terminated with error
StartStream(ProcessingTime(100), triggerClock = clock),
AdvanceManualClock(100), // ensure initial trigger completes before AddData
AddData(inputData, 0),
AdvanceManualClock(100),
AdvanceManualClock(100), // allow another trigger
ExpectFailure[SparkException](),
AssertOnQuery(_.status.isDataAvailable === false),
AssertOnQuery(_.status.isTriggerActive === false),

View file

@ -31,7 +31,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.{StreamSinkProvider, StreamSourceProvider}
import org.apache.spark.sql.streaming._
import org.apache.spark.sql.streaming.{ProcessingTime => DeprecatedProcessingTime, _}
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils
@ -346,7 +347,7 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
q = df.writeStream
.format("org.apache.spark.sql.streaming.test")
.option("checkpointLocation", newMetadataDir)
.trigger(ProcessingTime.create(100, TimeUnit.SECONDS))
.trigger(ProcessingTime(100, TimeUnit.SECONDS))
.start()
q.stop()