diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala index 227e5e5af3..77078046dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala @@ -22,7 +22,7 @@ import java.lang.{Long => JLong} import java.util.UUID import javax.servlet.http.HttpServletRequest -import scala.xml.{Node, Unparsed} +import scala.xml.{Node, NodeBuffer, Unparsed} import org.apache.spark.internal.Logging import org.apache.spark.sql.streaming.ui.UIUtils._ @@ -126,6 +126,122 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
} + /** Builds the aggregated state-operator graphs for a streaming query. Requires query.lastProgress to be non-null. When the last progress reports at least one state operator, this sums numRowsTotal, numRowsUpdated, memoryUsedBytes and numRowsDroppedByWatermark across all state operators of every entry in query.recentProgress, pairs each sum with the parsed progress timestamp, creates one GraphUIData per metric (passing minBatchTime, maxBatchTime, 0 and the observed maximum - presumably the axis bounds, confirm against GraphUIData - plus the unit "records" or "bytes"), registers each graph with jsCollector via generateDataJs, and emits the timeline and histogram HTML of the four metrics. When the last progress has no state operators an empty NodeBuffer is returned. NOTE(review): maxBy throws on an empty collection, so this relies on recentProgress being non-empty whenever lastProgress is set - confirm against StreamingQueryUIData. */ def generateAggregatedStateOperators( + query: StreamingQueryUIData, + minBatchTime: Long, + maxBatchTime: Long, + jsCollector: JsCollector): NodeBuffer = { + // This is made sure on caller side but put it here to be defensive + require(query.lastProgress != null) + if (query.lastProgress.stateOperators.nonEmpty) { + val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), + p.stateOperators.map(_.numRowsTotal).sum.toDouble)) + val maxNumRowsTotal = numRowsTotalData.maxBy(_._2)._2 + + val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), + p.stateOperators.map(_.numRowsUpdated).sum.toDouble)) + val maxNumRowsUpdated = numRowsUpdatedData.maxBy(_._2)._2 + + val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp), + p.stateOperators.map(_.memoryUsedBytes).sum.toDouble)) + val maxMemoryUsedBytes = memoryUsedBytesData.maxBy(_._2)._2 + + val numRowsDroppedByWatermarkData = query.recentProgress + .map(p => (parseProgressTimestamp(p.timestamp), + p.stateOperators.map(_.numRowsDroppedByWatermark).sum.toDouble)) + val maxNumRowsDroppedByWatermark = numRowsDroppedByWatermarkData.maxBy(_._2)._2 + + val graphUIDataForNumberTotalRows = + new GraphUIData( + "aggregated-num-total-state-rows-timeline", + "aggregated-num-total-state-rows-histogram", + numRowsTotalData, + minBatchTime, + maxBatchTime, + 0, + maxNumRowsTotal, + "records") + graphUIDataForNumberTotalRows.generateDataJs(jsCollector) + + val graphUIDataForNumberUpdatedRows = + new GraphUIData( + "aggregated-num-updated-state-rows-timeline", + "aggregated-num-updated-state-rows-histogram", + numRowsUpdatedData, + minBatchTime, + maxBatchTime, + 0, + maxNumRowsUpdated, + "records") + graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector) + + val graphUIDataForMemoryUsedBytes = + new GraphUIData( + "aggregated-state-memory-used-bytes-timeline", + "aggregated-state-memory-used-bytes-histogram", + 
memoryUsedBytesData, + minBatchTime, + maxBatchTime, + 0, + maxMemoryUsedBytes, + "bytes") + graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector) + + val graphUIDataForNumRowsDroppedByWatermark = + new GraphUIData( + "aggregated-num-state-rows-dropped-by-watermark-timeline", + "aggregated-num-state-rows-dropped-by-watermark-histogram", + numRowsDroppedByWatermarkData, + minBatchTime, + maxBatchTime, + 0, + maxNumRowsDroppedByWatermark, + "records") + graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector) + + // scalastyle:off + + +
+
Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}
+
+ + {graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)} + {graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)} + + + +
+
Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}
+
+ + {graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)} + {graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)} + + + +
+
Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}
+
+ + {graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)} + {graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)} + + + +
+
Aggregated Number Of State Rows Dropped By Watermark {SparkUIUtils.tooltip("Aggregated number of state rows dropped by watermark.", "right")}
+
+ + {graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)} + {graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)} + + // scalastyle:on + } else { + new NodeBuffer() + } + } + def generateStatTable(query: StreamingQueryUIData): Seq[Node] = { val batchToTimestamps = withNoProgress(query, query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))), @@ -284,6 +400,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab) {graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)} + {generateAggregatedStateOperators(query, minBatchTime, maxBatchTime, jsCollector)} } else { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala index 82aa1453f9..1a8b28001b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala @@ -75,10 +75,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq h3Text should not contain ("Streaming Query") + val input1 = spark.readStream.format("rate").load() + val input2 = spark.readStream.format("rate").load() val activeQuery = - spark.readStream.format("rate").load().writeStream.format("noop").start() + input1.join(input2, "value").writeStream.format("noop").start() val completedQuery = - spark.readStream.format("rate").load().writeStream.format("noop").start() + input1.join(input2, "value").writeStream.format("noop").start() completedQuery.stop() val failedQuery = spark.readStream.format("rate").load().select("value").as[Long] .map(_ / 0).writeStream.format("noop").start() @@ -129,6 +131,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B findAll(cssSelector("""#stat-table 
th""")).map(_.text).toSeq should be { List("", "Timelines", "Histograms") } + summaryText should contain ("Input Rate (?)") + summaryText should contain ("Process Rate (?)") + summaryText should contain ("Input Rows (?)") + summaryText should contain ("Batch Duration (?)") + summaryText should contain ("Operation Duration (?)") + summaryText should contain ("Aggregated Number Of Total State Rows (?)") + summaryText should contain ("Aggregated Number Of Updated State Rows (?)") + summaryText should contain ("Aggregated State Memory Used In Bytes (?)") + summaryText should contain ("Aggregated Number Of State Rows Dropped By Watermark (?)") } } finally { spark.streams.active.foreach(_.stop())