[SPARK-24634][SS][FOLLOWUP] Rename the variable from "numLateInputs" to "numRowsDroppedByWatermark"

### What changes were proposed in this pull request?

This PR renames the variable from "numLateInputs" to "numRowsDroppedByWatermark" so that it becomes self-explanation.

### Why are the changes needed?

This is originated from post-review, see https://github.com/apache/spark/pull/28607#discussion_r439853232

### Does this PR introduce _any_ user-facing change?

No, as SPARK-24634 is not introduced in any release yet.

### How was this patch tested?

Existing UTs.

Closes #28828 from HeartSaVioR/SPARK-24634-v3-followup.

Authored-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
Signed-off-by: HyukjinKwon <gurwls223@apache.org>
This commit is contained in:
Jungtaek Lim (HeartSaVioR) 2020-06-16 16:41:08 +09:00 committed by HyukjinKwon
parent e9145d41f3
commit fe68e95a5a
9 changed files with 59 additions and 48 deletions

View file

@ -1678,12 +1678,11 @@ emits late rows if the operator uses Append mode.
Spark provides two ways to check the number of late rows on stateful operators which would help you identify the issue:
1. On Spark UI: check the metrics in stateful operator nodes in query execution details page in SQL tab
2. On Streaming Query Listener: check "numLateInputs" in "stateOperators" in QueryProcessEvent.
2. On Streaming Query Listener: check "numRowsDroppedByWatermark" in "stateOperators" in QueryProcessEvent.
Please note that the definition of "input" is relative: it doesn't always mean "input rows" for the operator.
Streaming aggregation does pre-aggregate input rows and checks the late inputs against pre-aggregated inputs,
hence the number is not same as the number of original input rows. You'd like to check the fact whether the value is zero
or non-zero.
Please note that "numRowsDroppedByWatermark" represents the number of "dropped" rows by watermark, which is not always same as the count of "late input rows" for the operator.
It depends on the implementation of the operator - e.g. streaming aggregation does pre-aggregate input rows and checks the late inputs against pre-aggregated inputs,
hence the number is not same as the number of original input rows. You'd like to just check the fact whether the value is zero or non-zero.
There's a known workaround: split your streaming query into multiple queries per stateful operator, and ensure
end-to-end exactly once per query. Ensuring end-to-end exactly once for the last query is optional.

View file

@ -222,7 +222,11 @@ trait ProgressReporter extends Logging {
lastExecution.executedPlan.collect {
case p if p.isInstanceOf[StateStoreWriter] =>
val progress = p.asInstanceOf[StateStoreWriter].getProgress()
if (hasExecuted) progress else progress.copy(newNumRowsUpdated = 0, newNumLateInputs = 0)
if (hasExecuted) {
progress
} else {
progress.copy(newNumRowsUpdated = 0, newNumRowsDroppedByWatermark = 0)
}
}
}

View file

@ -77,8 +77,8 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
override lazy val metrics = Map(
"numOutputRows" -> SQLMetrics.createMetric(sparkContext, "number of output rows"),
"numLateInputs" -> SQLMetrics.createMetric(sparkContext,
"number of inputs which are later than watermark ('inputs' are relative to operators)"),
"numRowsDroppedByWatermark" -> SQLMetrics.createMetric(sparkContext,
"number of rows which are dropped by watermark"),
"numTotalStateRows" -> SQLMetrics.createMetric(sparkContext, "number of total state rows"),
"numUpdatedStateRows" -> SQLMetrics.createMetric(sparkContext, "number of updated state rows"),
"allUpdatesTimeMs" -> SQLMetrics.createTimingMetric(sparkContext, "time to update"),
@ -102,7 +102,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
numRowsTotal = longMetric("numTotalStateRows").value,
numRowsUpdated = longMetric("numUpdatedStateRows").value,
memoryUsedBytes = longMetric("stateMemory").value,
numLateInputs = longMetric("numLateInputs").value,
numRowsDroppedByWatermark = longMetric("numRowsDroppedByWatermark").value,
javaConvertedCustomMetrics
)
}
@ -137,11 +137,11 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan =>
protected def applyRemovingRowsOlderThanWatermark(
iter: Iterator[InternalRow],
predicateFilterOutLateInput: BasePredicate): Iterator[InternalRow] = {
predicateDropRowByWatermark: BasePredicate): Iterator[InternalRow] = {
iter.filterNot { row =>
val lateInput = predicateFilterOutLateInput.eval(row)
if (lateInput) longMetric("numLateInputs") += 1
lateInput
val shouldDrop = predicateDropRowByWatermark.eval(row)
if (shouldDrop) longMetric("numRowsDroppedByWatermark") += 1
shouldDrop
}
}

View file

@ -43,7 +43,7 @@ class StateOperatorProgress private[sql](
val numRowsTotal: Long,
val numRowsUpdated: Long,
val memoryUsedBytes: Long,
val numLateInputs: Long,
val numRowsDroppedByWatermark: Long,
val customMetrics: ju.Map[String, JLong] = new ju.HashMap()
) extends Serializable {
@ -55,15 +55,15 @@ class StateOperatorProgress private[sql](
private[sql] def copy(
newNumRowsUpdated: Long,
newNumLateInputs: Long): StateOperatorProgress =
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, newNumLateInputs,
customMetrics)
newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes,
newNumRowsDroppedByWatermark, customMetrics)
private[sql] def jsonValue: JValue = {
("numRowsTotal" -> JInt(numRowsTotal)) ~
("numRowsUpdated" -> JInt(numRowsUpdated)) ~
("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
("numLateInputs" -> JInt(numLateInputs)) ~
("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
("customMetrics" -> {
if (!customMetrics.isEmpty) {
val keys = customMetrics.keySet.asScala.toSeq.sorted

View file

@ -298,11 +298,11 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((10, 5)),
assertNumStateRows(2),
assertNumLateInputs(0),
assertNumRowsDroppedByWatermark(0),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2),
assertNumLateInputs(1)
assertNumRowsDroppedByWatermark(1)
)
}
@ -323,15 +323,15 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
AddData(inputData, 25), // Advance watermark to 15 seconds
CheckNewAnswer((25, 1)),
assertNumStateRows(2),
assertNumLateInputs(0),
assertNumRowsDroppedByWatermark(0),
AddData(inputData, 10, 25), // Ignore 10 as its less than watermark
CheckNewAnswer((25, 2)),
assertNumStateRows(2),
assertNumLateInputs(1),
assertNumRowsDroppedByWatermark(1),
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(2),
assertNumLateInputs(1)
assertNumRowsDroppedByWatermark(1)
)
}
@ -788,7 +788,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
true
}
private def assertNumLateInputs(numLateInputs: Long): AssertOnQuery = AssertOnQuery { q =>
private def assertNumRowsDroppedByWatermark(
numRowsDroppedByWatermark: Long): AssertOnQuery = AssertOnQuery { q =>
q.processAllAvailable()
val progressWithData = q.recentProgress.filterNot { p =>
// filter out batches which are falling into one of types:
@ -796,7 +797,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
// 2) empty input batch
p.inputRowsPerSecond == 0
}.lastOption.get
assert(progressWithData.stateOperators(0).numLateInputs === numLateInputs)
assert(progressWithData.stateOperators(0).numRowsDroppedByWatermark
=== numRowsDroppedByWatermark)
true
}

View file

@ -32,9 +32,9 @@ trait StateStoreMetricsTest extends StreamTest {
def assertNumStateRows(
total: Seq[Long],
updated: Seq[Long],
lateInputs: Seq[Long]): AssertOnQuery =
droppedByWatermark: Seq[Long]): AssertOnQuery =
AssertOnQuery(s"Check total state rows = $total, updated state rows = $updated" +
s", late inputs = $lateInputs") { q =>
s", rows dropped by watermark = $droppedByWatermark") { q =>
// This assumes that the streaming query will not make any progress while the eventually
// is being executed.
eventually(timeout(streamingTimeout)) {
@ -55,8 +55,8 @@ trait StateStoreMetricsTest extends StreamTest {
val allNumUpdatedRowsSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsUpdated))
val allNumLateInputsSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numLateInputs))
val allNumRowsDroppedByWatermarkSinceLastCheck =
progressesSinceLastCheck.map(_.stateOperators.map(_.numRowsDroppedByWatermark))
lazy val debugString = "recent progresses:\n" +
progressesSinceLastCheck.map(_.prettyJson).mkString("\n\n")
@ -67,8 +67,10 @@ trait StateStoreMetricsTest extends StreamTest {
val numUpdatedRows = arraySum(allNumUpdatedRowsSinceLastCheck, numStateOperators)
assert(numUpdatedRows === updated, s"incorrect updates rows, $debugString")
val numLateInputs = arraySum(allNumLateInputsSinceLastCheck, numStateOperators)
assert(numLateInputs === lateInputs, s"incorrect late inputs, $debugString")
val numRowsDroppedByWatermark = arraySum(allNumRowsDroppedByWatermarkSinceLastCheck,
numStateOperators)
assert(numRowsDroppedByWatermark === droppedByWatermark,
s"incorrect dropped rows by watermark, $debugString")
lastCheckedRecentProgressIndex = recentProgress.length - 1
}
@ -77,11 +79,14 @@ trait StateStoreMetricsTest extends StreamTest {
def assertNumStateRows(total: Seq[Long], updated: Seq[Long]): AssertOnQuery = {
assert(total.length === updated.length)
assertNumStateRows(total, updated, lateInputs = (0 until total.length).map(_ => 0L))
assertNumStateRows(total, updated, droppedByWatermark = (0 until total.length).map(_ => 0L))
}
def assertNumStateRows(total: Long, updated: Long, lateInput: Long = 0): AssertOnQuery = {
assertNumStateRows(Seq(total), Seq(updated), Seq(lateInput))
def assertNumStateRows(
total: Long,
updated: Long,
droppedByWatermark: Long = 0): AssertOnQuery = {
assertNumStateRows(Seq(total), Seq(updated), Seq(droppedByWatermark))
}
def arraySum(arraySeq: Seq[Array[Long]], arrayLength: Int): Seq[Long] = {

View file

@ -54,13 +54,13 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
testStream(result, Append)(
AddData(inputData, "a" -> 1),
CheckLastBatch("a" -> 1),
assertNumStateRows(total = 1, updated = 1, lateInput = 0),
assertNumStateRows(total = 1, updated = 1, droppedByWatermark = 0),
AddData(inputData, "a" -> 2), // Dropped
CheckLastBatch(),
assertNumStateRows(total = 1, updated = 0, lateInput = 0),
assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 0),
AddData(inputData, "b" -> 1),
CheckLastBatch("b" -> 1),
assertNumStateRows(total = 2, updated = 1, lateInput = 0)
assertNumStateRows(total = 2, updated = 1, droppedByWatermark = 0)
)
}
@ -102,7 +102,7 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckNewAnswer(),
assertNumStateRows(total = 1, updated = 0, lateInput = 1),
assertNumStateRows(total = 1, updated = 0, droppedByWatermark = 1),
AddData(inputData, 45), // Advance watermark to 35 seconds, no-data-batch drops row 25
CheckNewAnswer(45),
@ -136,7 +136,8 @@ class StreamingDeduplicationSuite extends StateStoreMetricsTest {
AddData(inputData, 10), // Should not emit anything as data less than watermark
CheckLastBatch(),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L), lateInputs = Seq(0L, 1L)),
assertNumStateRows(total = Seq(2L, 1L), updated = Seq(0L, 0L),
droppedByWatermark = Seq(0L, 1L)),
AddData(inputData, 40), // Advance watermark to 30 seconds
CheckLastBatch((15 -> 1), (25 -> 1)),

View file

@ -166,7 +166,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(input1, 5),
CheckNewAnswer(), // Same reason as above
assertNumStateRows(total = 2, updated = 0, lateInput = 1)
assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1)
)
}
@ -219,12 +219,12 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
// (1, 28) ==> passed filter, matched with left (1, 3) and (1, 5), added to state
AddData(rightInput, (1, 20), (1, 21), (1, 28)),
CheckNewAnswer((1, 3, 21), (1, 5, 21), (1, 3, 28), (1, 5, 28)),
assertNumStateRows(total = 5, updated = 1, lateInput = 1),
assertNumStateRows(total = 5, updated = 1, droppedByWatermark = 1),
// New data to left input with leftTime <= 20 should be filtered due to event time watermark
AddData(leftInput, (1, 20), (1, 21)),
CheckNewAnswer((1, 21, 28)),
assertNumStateRows(total = 6, updated = 1, lateInput = 1)
assertNumStateRows(total = 6, updated = 1, droppedByWatermark = 1)
)
}
@ -293,7 +293,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(leftInput, (1, 30), (1, 31)), // 30 should not be processed or added to state
CheckNewAnswer((1, 31, 26), (1, 31, 30), (1, 31, 31)),
assertNumStateRows(total = 11, updated = 1, lateInput = 1), // only 31 added
assertNumStateRows(total = 11, updated = 1, droppedByWatermark = 1), // only 31 added
// Advance the watermark
AddData(rightInput, (1, 80)),
@ -307,7 +307,7 @@ class StreamingInnerJoinSuite extends StreamTest with StateStoreMetricsTest with
AddData(rightInput, (1, 46), (1, 50)), // 46 should not be processed or added to state
CheckNewAnswer((1, 49, 50), (1, 50, 50)),
assertNumStateRows(total = 7, updated = 1, lateInput = 1) // 50 added
assertNumStateRows(total = 7, updated = 1, droppedByWatermark = 1) // 50 added
)
}

View file

@ -64,7 +64,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 3,
| "numLateInputs" : 0,
| "numRowsDroppedByWatermark" : 0,
| "customMetrics" : {
| "loadedMapCacheHitCount" : 1,
| "loadedMapCacheMissCount" : 0,
@ -115,7 +115,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
| "numRowsTotal" : 0,
| "numRowsUpdated" : 1,
| "memoryUsedBytes" : 2,
| "numLateInputs" : 0
| "numRowsDroppedByWatermark" : 0
| } ],
| "sources" : [ {
| "description" : "source",
@ -323,7 +323,7 @@ object StreamingQueryStatusAndProgressSuite {
"avg" -> "2016-12-05T20:54:20.827Z",
"watermark" -> "2016-12-05T20:54:20.827Z").asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numLateInputs = 0,
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 3, numRowsDroppedByWatermark = 0,
customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
"loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
.mapValues(long2Long).asJava)
@ -355,7 +355,7 @@ object StreamingQueryStatusAndProgressSuite {
// empty maps should be handled correctly
eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
stateOperators = Array(new StateOperatorProgress(
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numLateInputs = 0)),
numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numRowsDroppedByWatermark = 0)),
sources = Array(
new SourceProgress(
description = "source",