[SPARK-18459][SPARK-18460][STRUCTUREDSTREAMING] Rename triggerId to batchId and add triggerDetails to json in StreamingQueryStatus

## What changes were proposed in this pull request?

SPARK-18459: triggerId seems like a number that should be increasing with each trigger, whether or not there is data in it. However, in reality, triggerId increases only when there is a batch of data in a trigger. So it's better to rename it to batchId.

SPARK-18460: triggerDetails was missing from the JSON representation. Fixed it.

## How was this patch tested?
Updated existing unit tests.

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #15895 from tdas/SPARK-18459.
This commit is contained in:
Tathagata Das 2016-11-16 10:00:59 -08:00 committed by Shixiong Zhu
parent 608ecc512b
commit 0048ce7ce6
6 changed files with 35 additions and 17 deletions

View file

@@ -212,12 +212,12 @@ class StreamingQueryStatus(object):
Processing rate 23.5 rows/sec
Latency: 345.0 ms
Trigger details:
batchId: 5
isDataPresentInTrigger: true
isTriggerActive: true
latency.getBatch.total: 20
latency.getOffset.total: 10
numRows.input.total: 100
triggerId: 5
Source statuses [1 source]:
Source 1 - MySource1
Available offset: 0
@@ -341,8 +341,8 @@ class StreamingQueryStatus(object):
If no trigger is currently active, then it will have details of the last completed trigger.
>>> sqs.triggerDetails
{u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
u'isTriggerActive': u'true', u'latency.getOffset.total': u'10',
{u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10',
u'isDataPresentInTrigger': u'true'}
"""
return self._jsqs.triggerDetails()

View file

@@ -78,13 +78,13 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam
// =========== Setter methods ===========
def reportTriggerStarted(triggerId: Long): Unit = synchronized {
def reportTriggerStarted(batchId: Long): Unit = synchronized {
numInputRows.clear()
triggerDetails.clear()
sourceTriggerDetails.values.foreach(_.clear())
reportTriggerDetail(TRIGGER_ID, triggerId)
sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId))
reportTriggerDetail(BATCH_ID, batchId)
sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
currentTriggerStartTimestamp = triggerClock.getTimeMillis()
reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
@@ -217,7 +217,7 @@ object StreamMetrics extends Logging {
}
val TRIGGER_ID = "triggerId"
val BATCH_ID = "batchId"
val IS_TRIGGER_ACTIVE = "isTriggerActive"
val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
val STATUS_MESSAGE = "statusMessage"

View file

@@ -102,7 +102,7 @@ class StreamingQueryStatus private(
("inputRate" -> JDouble(inputRate)) ~
("processingRate" -> JDouble(processingRate)) ~
("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
("sinkStatus" -> sinkStatus.jsonValue)
}
@@ -151,7 +151,7 @@ private[sql] object StreamingQueryStatus {
desc = "MySink",
offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
triggerDetails = Map(
TRIGGER_ID -> "5",
BATCH_ID -> "5",
IS_TRIGGER_ACTIVE -> "true",
IS_DATA_PRESENT_IN_TRIGGER -> "true",
GET_OFFSET_LATENCY -> "10",

View file

@@ -50,10 +50,10 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceProcessingRate(source) === 0.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
START_TIMESTAMP -> "0", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
Map(TRIGGER_ID -> "1", "key2" -> "value2"))
Map(BATCH_ID -> "1", "key2" -> "value2"))
// Finishing the trigger should calculate the rates, except input rate which needs
// to have another trigger interval
@@ -66,11 +66,11 @@ class StreamMetricsSuite extends SparkFunSuite {
assert(sm.currentSourceProcessingRate(source) === 100.0)
assert(sm.currentLatency() === None)
assert(sm.currentTriggerDetails() ===
Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
NUM_INPUT_ROWS -> "100", "key" -> "value"))
assert(sm.currentSourceTriggerDetails(source) ===
Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
// After another trigger starts, the rates and latencies should not change until
// new rows are reported

View file

@@ -84,7 +84,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
AssertOnLastQueryStatus { status: StreamingQueryStatus =>
// Check the correctness of the trigger info of the last completed batch reported by
// onQueryProgress
assert(status.triggerDetails.containsKey("triggerId"))
assert(status.triggerDetails.containsKey("batchId"))
assert(status.triggerDetails.get("isTriggerActive") === "false")
assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
@@ -104,7 +104,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
assert(status.sourceStatuses.length === 1)
assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId"))
assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")

View file

@@ -48,12 +48,12 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
| Processing rate 23.5 rows/sec
| Latency: 345.0 ms
| Trigger details:
| batchId: 5
| isDataPresentInTrigger: true
| isTriggerActive: true
| latency.getBatch.total: 20
| latency.getOffset.total: 10
| numRows.input.total: 100
| triggerId: 5
| Source statuses [1 source]:
| Source 1 - MySource1
| Available offset: 0
@@ -72,7 +72,11 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
test("json") {
assert(StreamingQueryStatus.testStatus.json ===
"""
|{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
|{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
|"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
|"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
|"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
|"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
|"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
|"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
|"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
@@ -84,6 +88,20 @@ class StreamingQueryStatusSuite extends SparkFunSuite {
StreamingQueryStatus.testStatus.prettyJson ===
"""
|{
| "name" : "query",
| "id" : 1,
| "timestamp" : 123,
| "inputRate" : 15.5,
| "processingRate" : 23.5,
| "latency" : 345.0,
| "triggerDetails" : {
| "latency.getBatch.total" : "20",
| "numRows.input.total" : "100",
| "isTriggerActive" : "true",
| "batchId" : "5",
| "latency.getOffset.total" : "10",
| "isDataPresentInTrigger" : "true"
| },
| "sourceStatuses" : [ {
| "description" : "MySource1",
| "offsetDesc" : "0",