[SPARK-18734][SS] Represent timestamp in StreamingQueryProgress as formatted string instead of millis

## What changes were proposed in this pull request?

Easier to read while debugging as a formatted string (in ISO8601 format) than in millis

## How was this patch tested?
Updated unit tests

Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #16166 from tdas/SPARK-18734.
This commit is contained in:
Tathagata Das 2016-12-06 17:04:26 -08:00 committed by Shixiong Zhu
parent 4cc8d8906d
commit 539bb3cf95
4 changed files with 14 additions and 10 deletions

View file

@ -17,7 +17,8 @@
package org.apache.spark.sql.execution.streaming
import java.util.UUID
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}
import scala.collection.mutable
import scala.collection.JavaConverters._
@ -78,6 +79,9 @@ trait ProgressReporter extends Logging {
// The timestamp we report an event that has no input data
private var lastNoDataProgressEventTime = Long.MinValue
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))
@volatile
protected var currentStatus: StreamingQueryStatus = {
new StreamingQueryStatus(
@ -156,7 +160,7 @@ trait ProgressReporter extends Logging {
id = id,
runId = runId,
name = name,
timestamp = currentTriggerStartTimestamp,
timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
currentWatermark = offsetSeqMetadata.batchWatermarkMs,

View file

@ -29,6 +29,7 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._
import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.util.DateTimeUtils
/**
* :: Experimental ::
@ -76,7 +77,7 @@ class StreamingQueryProgress private[sql](
val id: UUID,
val runId: UUID,
val name: String,
val timestamp: Long,
val timestamp: String,
val batchId: Long,
val durationMs: ju.Map[String, java.lang.Long],
val currentWatermark: Long,
@ -109,7 +110,7 @@ class StreamingQueryProgress private[sql](
("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JInt(timestamp)) ~
("timestamp" -> JString(timestamp)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
@ -121,7 +122,6 @@ class StreamingQueryProgress private[sql](
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
("sink" -> sink.jsonValue)
}
}

View file

@ -38,7 +38,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "id" : "${testProgress1.id.toString}",
| "runId" : "${testProgress1.runId.toString}",
| "name" : "myName",
| "timestamp" : 1,
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
| "durationMs" : {
@ -71,7 +71,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "id" : "${testProgress2.id.toString}",
| "runId" : "${testProgress2.runId.toString}",
| "name" : null,
| "timestamp" : 1,
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "numInputRows" : 678,
| "durationMs" : {
| "total" : 0
@ -131,7 +131,7 @@ object StreamingQueryStatusAndProgressSuite {
id = UUID.randomUUID,
runId = UUID.randomUUID,
name = "myName",
timestamp = 1L,
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
currentWatermark = 3L,
@ -153,7 +153,7 @@ object StreamingQueryStatusAndProgressSuite {
id = UUID.randomUUID,
runId = UUID.randomUUID,
name = null, // should not be present in the json
timestamp = 1L,
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
currentWatermark = 3L,

View file

@ -243,7 +243,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
assert(progress.id === query.id)
assert(progress.name === query.name)
assert(progress.batchId === 0)
assert(progress.timestamp === 100)
assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC
assert(progress.numInputRows === 2)
assert(progress.processedRowsPerSecond === 2.0)