[SPARK-33287][SS][UI] Expose state custom metrics information on SS UI

### What changes were proposed in this pull request?
The Structured Streaming UI does not contain state custom metrics information. In this PR I've added it.

### Why are the changes needed?
State custom metrics information is missing from the Structured Streaming UI.

### Does this PR introduce _any_ user-facing change?
Yes, additional UI elements appear on the Structured Streaming statistics page.

### How was this patch tested?
Existing unit tests + manual test.
```
# Compile Spark
echo "spark.sql.streaming.ui.enabledCustomMetricList stateOnCurrentVersionSizeBytes" >> conf/spark-defaults.conf
sbin/start-master.sh
sbin/start-worker.sh spark://gsomogyi-MBP16:7077
./bin/spark-submit --master spark://gsomogyi-MBP16:7077 --deploy-mode client --class com.spark.Main ../spark-test/target/spark-test-1.0-SNAPSHOT-jar-with-dependencies.jar
```
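
For reference, `com.spark.Main` above is a local test app and is not part of this PR; a minimal hypothetical stand-in only needs a stateful aggregation so that the state store reports custom metrics:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical stand-in for com.spark.Main: any stateful aggregation exercises
// the state store, so its custom metrics show up in query progress and on the UI.
object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("ss-custom-metrics-test").getOrCreate()
    import spark.implicits._

    val counts = spark.readStream
      .format("rate")                 // built-in test source
      .option("rowsPerSecond", "10")
      .load()
      .groupBy($"value" % 10)         // stateful aggregation => state store usage
      .count()

    counts.writeStream
      .format("console")
      .outputMode("complete")
      .start()
      .awaitTermination()
  }
}
```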
<img width="1119" alt="Screenshot 2020-11-18 at 12 45 36" src="https://user-images.githubusercontent.com/18561820/99527506-2f979680-299d-11eb-9187-4ae7fbd2596a.png">

Closes #30336 from gaborgsomogyi/SPARK-33287.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
commit 95b6dabc33 (parent 048a9821c7)
Gabor Somogyi authored 2020-11-25 07:38:45 +09:00; committed by Jungtaek Lim (HeartSaVioR)
4 changed files with 127 additions and 39 deletions

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/StaticSQLConf.scala

@@ -249,4 +249,16 @@ object StaticSQLConf {
      .version("3.1.0")
      .timeConf(TimeUnit.SECONDS)
      .createWithDefault(-1)

  val ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST =
    buildStaticConf("spark.sql.streaming.ui.enabledCustomMetricList")
      .internal()
      .doc("Configures a list of custom metrics that are enabled on the Structured Streaming " +
        "UI. The list contains the names of the custom metrics, separated by commas. In " +
        "aggregation only sum is used. The list of supported custom metrics is state store " +
        "provider specific and it can be found out for example from a query progress log entry.")
      .version("3.1.0")
      .stringConf
      .toSequence
      .createWithDefault(Nil)
}
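
As the doc string notes, the supported metric names are state store provider specific; a hedged sketch of discovering them from query progress (assuming a started `StreamingQuery` named `query` with at least one stateful operator and the default HDFS-backed provider):

```scala
import scala.collection.JavaConverters._

// With the default HDFSBackedStateStoreProvider this prints names such as
// stateOnCurrentVersionSizeBytes, loadedMapCacheHitCount and loadedMapCacheMissCount,
// which can then be listed in spark.sql.streaming.ui.enabledCustomMetricList.
val metricNames = Option(query.lastProgress).toSeq
  .flatMap(_.stateOperators.headOption)
  .flatMap(_.customMetrics.keySet().asScala)
metricNames.foreach(println)
```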

sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatisticsPage.scala

@@ -19,18 +19,32 @@ package org.apache.spark.sql.streaming.ui

import java.{util => ju}
import java.lang.{Long => JLong}
import java.util.{Locale, UUID}
import javax.servlet.http.HttpServletRequest

import scala.collection.JavaConverters._
import scala.xml.{Node, NodeBuffer, Unparsed}

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.streaming.state.StateStoreProvider
import org.apache.spark.sql.internal.SQLConf.STATE_STORE_PROVIDER_CLASS
import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.ui.UIUtils._
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}

private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
  extends WebUIPage("statistics") with Logging {

  // State store provider implementations mustn't do any heavyweight initialization in their
  // constructor but in their init method.
  private val supportedCustomMetrics = StateStoreProvider.create(
    parent.parent.conf.get(STATE_STORE_PROVIDER_CLASS)).supportedCustomMetrics
  logDebug(s"Supported custom metrics: $supportedCustomMetrics")

  private val enabledCustomMetrics =
    parent.parent.conf.get(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST).map(_.toLowerCase(Locale.ROOT))
  logDebug(s"Enabled custom metrics: $enabledCustomMetrics")

  def generateLoadResources(request: HttpServletRequest): Seq[Node] = {
    // scalastyle:off
    <script src={SparkUIUtils.prependBaseUri(request, "/static/d3.min.js")}></script>
@@ -199,49 +213,100 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
        "records")
      graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
      val result =
        // scalastyle:off
        <tr>
          <td style="vertical-align: middle;">
            <div style="width: 160px;">
              <div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
            </div>
          </td>
          <td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
          <td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
        </tr>
        <tr>
          <td style="vertical-align: middle;">
            <div style="width: 160px;">
              <div><strong>Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}</strong></div>
            </div>
          </td>
          <td class={"aggregated-num-updated-state-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
          <td class={"aggregated-num-updated-state-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
        </tr>
        <tr>
          <td style="vertical-align: middle;">
            <div style="width: 160px;">
              <div><strong>Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}</strong></div>
            </div>
          </td>
          <td class={"aggregated-state-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
          <td class={"aggregated-state-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
        </tr>
        <tr>
          <td style="vertical-align: middle;">
            <div style="width: 160px;">
              <div><strong>Aggregated Number Of Rows Dropped By Watermark {SparkUIUtils.tooltip("Accumulates all input rows being dropped in stateful operators by watermark. 'Inputs' are relative to operators.", "right")}</strong></div>
            </div>
          </td>
          <td class={"aggregated-num-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
          <td class={"aggregated-num-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
        </tr>
        // scalastyle:on

      if (enabledCustomMetrics.nonEmpty) {
        result ++= generateAggregatedCustomMetrics(query, minBatchTime, maxBatchTime, jsCollector)
      }
      result
    } else {
      new NodeBuffer()
    }
  }
  def generateAggregatedCustomMetrics(
      query: StreamingQueryUIData,
      minBatchTime: Long,
      maxBatchTime: Long,
      jsCollector: JsCollector): NodeBuffer = {
    val result: NodeBuffer = new NodeBuffer

    // This is ensured on the caller side, but put here to be defensive.
    require(query.lastProgress.stateOperators.nonEmpty)
    query.lastProgress.stateOperators.head.customMetrics.keySet().asScala
      .filter(m => enabledCustomMetrics.contains(m.toLowerCase(Locale.ROOT))).map { metricName =>
        // Per timestamp, sum the metric over all state operators (only sum is supported).
        val data = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
          p.stateOperators.map(_.customMetrics.get(metricName).toDouble).sum))
        val max = data.maxBy(_._2)._2
        val metric = supportedCustomMetrics.find(_.name.equalsIgnoreCase(metricName)).get

        val graphUIData =
          new GraphUIData(
            s"aggregated-$metricName-timeline",
            s"aggregated-$metricName-histogram",
            data,
            minBatchTime,
            maxBatchTime,
            0,
            max,
            "")
        graphUIData.generateDataJs(jsCollector)

        result ++=
          // scalastyle:off
          <tr>
            <td style="vertical-align: middle;">
              <div style="width: 240px;">
                <div><strong>Aggregated Custom Metric {s"$metricName"} {SparkUIUtils.tooltip(metric.desc, "right")}</strong></div>
              </div>
            </td>
            <td class={s"aggregated-$metricName-timeline"}>{graphUIData.generateTimelineHtml(jsCollector)}</td>
            <td class={s"aggregated-$metricName-histogram"}>{graphUIData.generateHistogramHtml(jsCollector)}</td>
          </tr>
          // scalastyle:on
      }

    result
  }

  def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
    val batchToTimestamps = withNoProgress(query,
      query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),

sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/StreamingQueryPageSuite.scala

@@ -24,8 +24,10 @@ import org.mockito.Mockito.{mock, when, RETURNS_SMART_NULLS}
import org.scalatest.BeforeAndAfter

import scala.xml.Node

import org.apache.spark.SparkConf
import org.apache.spark.sql.streaming.StreamingQueryProgress
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.ui.SparkUI

class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
@@ -65,10 +67,13 @@ class StreamingQueryPageSuite extends SharedSparkSession with BeforeAndAfter {
    val request = mock(classOf[HttpServletRequest])
    val tab = mock(classOf[StreamingQueryTab], RETURNS_SMART_NULLS)
    val statusListener = mock(classOf[StreamingQueryStatusListener], RETURNS_SMART_NULLS)
    val ui = mock(classOf[SparkUI])
    when(request.getParameter("id")).thenReturn(id.toString)
    when(tab.appName).thenReturn("testing")
    when(tab.headerTabs).thenReturn(Seq.empty)
    when(tab.statusListener).thenReturn(statusListener)
    when(ui.conf).thenReturn(new SparkConf())
    when(tab.parent).thenReturn(ui)

    val streamQuery = createStreamQueryUIData(id)
    when(statusListener.allQueryStatus).thenReturn(Seq(streamQuery))

sql/core/src/test/scala/org/apache/spark/sql/streaming/ui/UISeleniumSuite.scala

@@ -31,6 +31,7 @@ import org.apache.spark.internal.config.UI.{UI_ENABLED, UI_PORT}
import org.apache.spark.sql.LocalSparkSession.withSparkSession
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.util.quietly
import org.apache.spark.sql.internal.StaticSQLConf.ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST
import org.apache.spark.sql.streaming.StreamingQueryException
import org.apache.spark.ui.SparkUICssErrorHandler

@@ -53,6 +54,7 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
      .setAppName("ui-test")
      .set(UI_ENABLED, true)
      .set(UI_PORT, 0)
      .set(ENABLED_STREAMING_UI_CUSTOM_METRIC_LIST, Seq("stateOnCurrentVersionSizeBytes"))
    additionalConfs.foreach { case (k, v) => conf.set(k, v) }
    val spark = SparkSession.builder().master(master).config(conf).getOrCreate()
    assert(spark.sparkContext.ui.isDefined)
@@ -140,6 +142,10 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
        summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
        summaryText should contain ("Aggregated State Memory Used In Bytes (?)")
        summaryText should contain ("Aggregated Number Of Rows Dropped By Watermark (?)")
        summaryText should contain ("Aggregated Custom Metric stateOnCurrentVersionSizeBytes" +
          " (?)")
        summaryText should not contain ("Aggregated Custom Metric loadedMapCacheHitCount (?)")
        summaryText should not contain ("Aggregated Custom Metric loadedMapCacheMissCount (?)")
      }
    } finally {
      spark.streams.active.foreach(_.stop())