diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index f326f16232..0e4589be97 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -212,12 +212,12 @@ class StreamingQueryStatus(object): Processing rate 23.5 rows/sec Latency: 345.0 ms Trigger details: + batchId: 5 isDataPresentInTrigger: true isTriggerActive: true latency.getBatch.total: 20 latency.getOffset.total: 10 numRows.input.total: 100 - triggerId: 5 Source statuses [1 source]: Source 1 - MySource1 Available offset: 0 @@ -341,8 +341,8 @@ class StreamingQueryStatus(object): If no trigger is currently active, then it will have details of the last completed trigger. >>> sqs.triggerDetails - {u'triggerId': u'5', u'latency.getBatch.total': u'20', u'numRows.input.total': u'100', - u'isTriggerActive': u'true', u'latency.getOffset.total': u'10', + {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100', + u'isTriggerActive': u'true', u'batchId': u'5', u'latency.getOffset.total': u'10', u'isDataPresentInTrigger': u'true'} """ return self._jsqs.triggerDetails() diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala index 5645554a58..942e6ed894 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala @@ -78,13 +78,13 @@ class StreamMetrics(sources: Set[Source], triggerClock: Clock, codahaleSourceNam // =========== Setter methods =========== - def reportTriggerStarted(triggerId: Long): Unit = synchronized { + def reportTriggerStarted(batchId: Long): Unit = synchronized { numInputRows.clear() triggerDetails.clear() sourceTriggerDetails.values.foreach(_.clear()) - reportTriggerDetail(TRIGGER_ID, triggerId) - sources.foreach(s => reportSourceTriggerDetail(s, TRIGGER_ID, triggerId)) + reportTriggerDetail(BATCH_ID, batchId) + sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId)) reportTriggerDetail(IS_TRIGGER_ACTIVE, true) currentTriggerStartTimestamp = triggerClock.getTimeMillis() reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp) @@ -217,7 +217,7 @@ object StreamMetrics extends Logging { } - val TRIGGER_ID = "triggerId" + val BATCH_ID = "batchId" val IS_TRIGGER_ACTIVE = "isTriggerActive" val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger" val STATUS_MESSAGE = "statusMessage" diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index 99c7729d02..ba732ff7fc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -102,7 +102,7 @@ class StreamingQueryStatus private( ("inputRate" -> JDouble(inputRate)) ~ ("processingRate" -> JDouble(processingRate)) ~ ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~ - ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) + ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~ ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~ ("sinkStatus" -> sinkStatus.jsonValue) } @@ -151,7 +151,7 @@ private[sql] object StreamingQueryStatus { desc = "MySink", offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString), triggerDetails = Map( - TRIGGER_ID -> "5", + BATCH_ID -> "5", IS_TRIGGER_ACTIVE -> "true", IS_DATA_PRESENT_IN_TRIGGER -> "true", GET_OFFSET_LATENCY -> "10", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala index 938423db64..38c4ece439 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala @@ -50,10 +50,10 @@ class StreamMetricsSuite extends SparkFunSuite { assert(sm.currentSourceProcessingRate(source) === 0.0) assert(sm.currentLatency() === None) assert(sm.currentTriggerDetails() === - Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "true", + Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true", START_TIMESTAMP -> "0", "key" -> "value")) assert(sm.currentSourceTriggerDetails(source) === - Map(TRIGGER_ID -> "1", "key2" -> "value2")) + Map(BATCH_ID -> "1", "key2" -> "value2")) // Finishing the trigger should calculate the rates, except input rate which needs // to have another trigger interval @@ -66,11 +66,11 @@ class StreamMetricsSuite extends SparkFunSuite { assert(sm.currentSourceProcessingRate(source) === 100.0) assert(sm.currentLatency() === None) assert(sm.currentTriggerDetails() === - Map(TRIGGER_ID -> "1", IS_TRIGGER_ACTIVE -> "false", + Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false", START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000", NUM_INPUT_ROWS -> "100", "key" -> "value")) assert(sm.currentSourceTriggerDetails(source) === - Map(TRIGGER_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2")) + Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2")) // After another trigger starts, the rates and latencies should not change until // new rows are reported diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index cebb32a0a5..98f3bec708 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -84,7 +84,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { AssertOnLastQueryStatus { status: StreamingQueryStatus => // Check the correctness of the trigger info of the last completed batch reported by // onQueryProgress - assert(status.triggerDetails.containsKey("triggerId")) + assert(status.triggerDetails.containsKey("batchId")) assert(status.triggerDetails.get("isTriggerActive") === "false") assert(status.triggerDetails.get("isDataPresentInTrigger") === "true") @@ -104,7 +104,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1") assert(status.sourceStatuses.length === 1) - assert(status.sourceStatuses(0).triggerDetails.containsKey("triggerId")) + assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId")) assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100") assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200") assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala index 6af19fb0c2..50a7d92ede 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala @@ -48,12 +48,12 @@ class StreamingQueryStatusSuite extends SparkFunSuite { | Processing rate 23.5 rows/sec | Latency: 345.0 ms | Trigger details: + | batchId: 5 | isDataPresentInTrigger: true | isTriggerActive: true | latency.getBatch.total: 20 | latency.getOffset.total: 10 | numRows.input.total: 100 - | triggerId: 5 | Source statuses [1 source]: | Source 1 - MySource1 | Available offset: 0 @@ -72,7 +72,11 @@ class StreamingQueryStatusSuite extends SparkFunSuite { test("json") { assert(StreamingQueryStatus.testStatus.json === """ - |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5, + |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5, + |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20", + |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5", + |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"}, + |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5, |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100", |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}], |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}} @@ -84,6 +88,20 @@ class StreamingQueryStatusSuite extends SparkFunSuite { StreamingQueryStatus.testStatus.prettyJson === """ |{ + | "name" : "query", + | "id" : 1, + | "timestamp" : 123, + | "inputRate" : 15.5, + | "processingRate" : 23.5, + | "latency" : 345.0, + | "triggerDetails" : { + | "latency.getBatch.total" : "20", + | "numRows.input.total" : "100", + | "isTriggerActive" : "true", + | "batchId" : "5", + | "latency.getOffset.total" : "10", + | "isDataPresentInTrigger" : "true" + | }, | "sourceStatuses" : [ { | "description" : "MySource1", | "offsetDesc" : "0",