[SPARK-24699][SS] Make watermarks work with Trigger.Once by saving updated watermark to commit log

## What changes were proposed in this pull request?

Streaming queries with watermarks do not work with Trigger.Once for the following reasons:
- The watermark is updated in driver memory after a batch completes, but it is persisted to the checkpoint (in the offset log) only when the next batch is planned.
- With Trigger.Once, the query terminates as soon as one batch has completed. Hence, the updated watermark is never persisted anywhere.

The simple solution is to persist the updated watermark value in the commit log when a batch is marked as completed. The next batch, in the next Trigger.Once run, can then pick it up from the commit log.
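
In code, the change amounts to roughly the following (condensed from the `MicroBatchExecution` hunks in the diff below; the names are the real ones, but this is a sketch, not a standalone snippet):

```scala
// When a batch finishes: first fold the batch's event-time stats into the
// watermark, then persist that watermark together with the commit marker.
watermarkTracker.updateWatermark(lastExecution.executedPlan)
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))

// When the query restarts (e.g. in the next Trigger.Once run): seed the watermark
// from the latest commit log entry before planning the next batch.
commitLog.getLatest() match {
  case Some((latestCommittedBatchId, commitMetadata)) =>
    watermarkTracker.setWatermark(
      math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
  case None => // no batch has been committed yet; keep the initial watermark
}
```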

## How was this patch tested?
new unit tests

Co-authored-by: Tathagata Das <tathagata.das1565@gmail.com>
Co-authored-by: c-horn <chorn4033@gmail.com>

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #21746 from tdas/SPARK-24699.
Tathagata Das 2018-07-23 13:03:32 -07:00
parent 2edf17effd
commit 61f0ca4f1c
20 changed files with 177 additions and 42 deletions

@@ -22,6 +22,9 @@ import java.nio.charset.StandardCharsets._
import scala.io.{Source => IOSource}
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization
import org.apache.spark.sql.SparkSession
/**
@@ -43,36 +46,28 @@ import org.apache.spark.sql.SparkSession
* line 2: metadata (optional json string)
*/
class CommitLog(sparkSession: SparkSession, path: String)
extends HDFSMetadataLog[String](sparkSession, path) {
extends HDFSMetadataLog[CommitMetadata](sparkSession, path) {
import CommitLog._
def add(batchId: Long): Unit = {
super.add(batchId, EMPTY_JSON)
}
override def add(batchId: Long, metadata: String): Boolean = {
throw new UnsupportedOperationException(
"CommitLog does not take any metadata, use 'add(batchId)' instead")
}
override protected def deserialize(in: InputStream): String = {
override protected def deserialize(in: InputStream): CommitMetadata = {
// called inside a try-finally where the underlying stream is closed in the caller
val lines = IOSource.fromInputStream(in, UTF_8.name()).getLines()
if (!lines.hasNext) {
throw new IllegalStateException("Incomplete log file in the offset commit log")
}
parseVersion(lines.next.trim, VERSION)
EMPTY_JSON
val metadataJson = if (lines.hasNext) lines.next else EMPTY_JSON
CommitMetadata(metadataJson)
}
override protected def serialize(metadata: String, out: OutputStream): Unit = {
override protected def serialize(metadata: CommitMetadata, out: OutputStream): Unit = {
// called inside a try-finally where the underlying stream is closed in the caller
out.write(s"v${VERSION}".getBytes(UTF_8))
out.write('\n')
// write metadata
out.write(EMPTY_JSON.getBytes(UTF_8))
out.write(metadata.json.getBytes(UTF_8))
}
}
@@ -81,3 +76,13 @@ object CommitLog {
private val EMPTY_JSON = "{}"
}
case class CommitMetadata(nextBatchWatermarkMs: Long = 0) {
def json: String = Serialization.write(this)(CommitMetadata.format)
}
object CommitMetadata {
implicit val format = Serialization.formats(NoTypeHints)
def apply(json: String): CommitMetadata = Serialization.read[CommitMetadata](json)
}
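
For reference, a short round trip with the `CommitMetadata` defined above (the two-line file layout follows from `serialize` above; `VERSION = 1` is assumed here, since the full `CommitLog` companion object is not shown in this hunk):

```scala
// Writing: the metadata stored when a batch whose watermark advanced to 5 s commits.
val json = CommitMetadata(nextBatchWatermarkMs = 5000).json
// json == """{"nextBatchWatermarkMs":5000}"""

// Reading: what the next Trigger.Once run recovers from the commit log.
val restored = CommitMetadata(json).nextBatchWatermarkMs // 5000L

// Resulting commit log entry on disk (version line, then metadata JSON):
//   v1
//   {"nextBatchWatermarkMs":5000}
```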

@@ -268,7 +268,7 @@ class MicroBatchExecution(
* latest batch id in the offset log, then we can safely move to the next batch
* i.e., committedBatchId + 1 */
commitLog.getLatest() match {
case Some((latestCommittedBatchId, _)) =>
case Some((latestCommittedBatchId, commitMetadata)) =>
if (latestBatchId == latestCommittedBatchId) {
/* The last batch was successfully committed, so we can safely process a
* new next batch but first:
@@ -286,7 +286,8 @@ class MicroBatchExecution(
currentBatchId = latestCommittedBatchId + 1
isCurrentBatchConstructed = false
committedOffsets ++= availableOffsets
// Construct a new batch by recomputing availableOffsets
watermarkTracker.setWatermark(
math.max(watermarkTracker.currentWatermark, commitMetadata.nextBatchWatermarkMs))
} else if (latestCommittedBatchId < latestBatchId - 1) {
logWarning(s"Batch completion log latest batch id is " +
s"${latestCommittedBatchId}, which is not trailing " +
@@ -536,11 +537,11 @@ class MicroBatchExecution(
}
withProgressLocked {
commitLog.add(currentBatchId)
watermarkTracker.updateWatermark(lastExecution.executedPlan)
commitLog.add(currentBatchId, CommitMetadata(watermarkTracker.currentWatermark))
committedOffsets ++= availableOffsets
awaitProgressLockCondition.signalAll()
}
watermarkTracker.updateWatermark(lastExecution.executedPlan)
logDebug(s"Completed batch ${currentBatchId}")
}

@@ -314,7 +314,7 @@ class ContinuousExecution(
// Record offsets before updating `committedOffsets`
recordTriggerOffsets(from = committedOffsets, to = availableOffsets)
if (queryExecutionThread.isAlive) {
commitLog.add(epoch)
commitLog.add(epoch, CommitMetadata())
val offset =
continuousSources(0).deserializeOffset(offsetLog.get(epoch).get.offsets(0).get.json)
committedOffsets ++= Seq(continuousSources(0) -> offset)

@@ -0,0 +1 @@
{"id":"73f7f943-0a08-4ffb-a504-9fa88ff7612a"}

@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":0,"batchTimestampMs":1531991874513,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
0

@@ -0,0 +1,3 @@
v1
{"batchWatermarkMs":5000,"batchTimestampMs":1531991878604,"conf":{"spark.sql.shuffle.partitions":"5","spark.sql.streaming.stateStore.providerClass":"org.apache.spark.sql.execution.streaming.state.HDFSBackedStateStoreProvider"}}
1

@@ -127,31 +127,133 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
testStream(aggWithWatermark)(
AddData(inputData2, 15),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(15))
assert(e.get("min") === formatTimestamp(15))
assert(e.get("avg") === formatTimestamp(15))
assert(e.get("watermark") === formatTimestamp(0))
},
assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
AddData(inputData2, 10, 12, 14),
CheckAnswer(),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(14))
assert(e.get("min") === formatTimestamp(10))
assert(e.get("avg") === formatTimestamp(12))
assert(e.get("watermark") === formatTimestamp(5))
},
assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
AddData(inputData2, 25),
CheckAnswer((10, 3)),
assertEventStats { e =>
assert(e.get("max") === formatTimestamp(25))
assert(e.get("min") === formatTimestamp(25))
assert(e.get("avg") === formatTimestamp(25))
assert(e.get("watermark") === formatTimestamp(5))
}
assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5)
)
}
test("event time and watermark metrics with Trigger.Once (SPARK-24699)") {
// All event time metrics where watermarking is set
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
// Unlike the ProcessingTime trigger, Trigger.Once only runs one trigger every time
// the query is started and it does not run no-data batches. Hence the answer generated
// by the updated watermark is only generated the next time the query is started.
// Also, the data to process in the next trigger is added *before* starting the stream in
// Trigger.Once to ensure that first and only trigger picks up the new data.
testStream(aggWithWatermark)(
StartStream(Trigger.Once), // to make sure the query is not running when adding data 1st time
awaitTermination(),
AddData(inputData, 15),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 15, max = 15, avg = 15, wtrmark = 0),
// watermark should be updated to 15 - 10 = 5
AddData(inputData, 10, 12, 14),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 10, max = 14, avg = 12, wtrmark = 5),
// watermark should stay at 5
AddData(inputData, 25),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
// watermark should be updated to 25 - 10 = 15
AddData(inputData, 50),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
// watermark should be updated to 50 - 10 = 40
AddData(inputData, 50),
StartStream(Trigger.Once),
awaitTermination(),
CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
}
test("recovery from Spark ver 2.3.1 commit log without commit metadata (SPARK-24699)") {
// All event time metrics where watermarking is set
val inputData = MemoryStream[Int]
val aggWithWatermark = inputData.toDF()
.withColumn("eventTime", $"value".cast("timestamp"))
.withWatermark("eventTime", "10 seconds")
.groupBy(window($"eventTime", "5 seconds") as 'window)
.agg(count("*") as 'count)
.select($"window".getField("start").cast("long").as[Long], $"count".as[Long])
val resourceUri = this.getClass.getResource(
"/structured-streaming/checkpoint-version-2.3.1-without-commit-log-metadata/").toURI
val checkpointDir = Utils.createTempDir().getCanonicalFile
// Copy the checkpoint to a temp dir to prevent changes to the original.
// Not doing this will lead to the test passing on the first run, but fail subsequent runs.
FileUtils.copyDirectory(new File(resourceUri), checkpointDir)
inputData.addData(15)
inputData.addData(10, 12, 14)
testStream(aggWithWatermark)(
/*
Note: The checkpoint was generated using the following input in Spark version 2.3.1
StartStream(checkpointLocation = "./sql/core/src/test/resources/structured-streaming/" +
"checkpoint-version-2.3.1-without-commit-log-metadata/")),
AddData(inputData, 15), // watermark should be updated to 15 - 10 = 5
CheckAnswer(),
AddData(inputData, 10, 12, 14), // watermark should stay at 5
CheckAnswer(),
StopStream,
// Offset log should have watermark recorded as 5.
*/
StartStream(Trigger.Once),
awaitTermination(),
AddData(inputData, 25),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
awaitTermination(),
CheckNewAnswer(),
assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
// watermark should be updated to 25 - 10 = 15
AddData(inputData, 50),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
awaitTermination(),
CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
// watermark should be updated to 50 - 10 = 40
AddData(inputData, 50),
StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath),
awaitTermination(),
CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this
assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
}
test("append mode") {
val inputData = MemoryStream[Int]
@@ -625,10 +727,20 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}
/** Assert event stats generated on that last batch with data in it */
private def assertEventStats(body: ju.Map[String, String] => Unit): AssertOnQuery = {
AssertOnQuery { q =>
Execute("AssertEventStats") { q =>
body(q.recentProgress.filter(_.numInputRows > 0).lastOption.get.eventTime)
true
}
}
/** Assert event stats generated on that last batch with data in it */
private def assertEventStats(min: Long, max: Long, avg: Double, wtrmark: Long): AssertOnQuery = {
assertEventStats { e =>
assert(e.get("min") === formatTimestamp(min), s"min value mismatch")
assert(e.get("max") === formatTimestamp(max), s"max value mismatch")
assert(e.get("avg") === formatTimestamp(avg.toLong), s"avg value mismatch")
assert(e.get("watermark") === formatTimestamp(wtrmark), s"watermark value mismatch")
}
}
@@ -638,4 +750,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
private def formatTimestamp(sec: Long): String = {
timestampFormat.format(new ju.Date(sec * 1000))
}
private def awaitTermination(): AssertOnQuery = Execute("AwaitTermination") { q =>
q.awaitTermination()
}
}

@@ -291,8 +291,10 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
/** Execute arbitrary code */
object Execute {
def apply(func: StreamExecution => Any): AssertOnQuery =
AssertOnQuery(query => { func(query); true }, "Execute")
def apply(name: String)(func: StreamExecution => Any): AssertOnQuery =
AssertOnQuery(query => { func(query); true }, name)
def apply(func: StreamExecution => Any): AssertOnQuery = apply("Execute")(func)
}
object AwaitEpoch {
@@ -512,7 +514,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with TimeLimits with Be
logInfo(s"Processing test stream action: $action")
action match {
case StartStream(trigger, triggerClock, additionalConfs, checkpointLocation) =>
verify(currentStream == null, "stream already running")
verify(currentStream == null || !currentStream.isActive, "stream already running")
verify(triggerClock.isInstanceOf[SystemClock]
|| triggerClock.isInstanceOf[StreamManualClock],
"Use either SystemClock or StreamManualClock to start the stream")