[SPARK-33223][SS][UI] Structured Streaming Web UI state information

### What changes were proposed in this pull request?
Structured Streaming UI is not containing state information. In this PR I've added it.

### Why are the changes needed?
Missing state information.

### Does this PR introduce _any_ user-facing change?
Additional UI elements appear.

### How was this patch tested?
Existing unit tests + manual test.
<img width="1044" alt="Screenshot 2020-10-30 at 15 14 21" src="https://user-images.githubusercontent.com/18561820/97715405-a1797000-1ac2-11eb-886a-e3e6efa3af3e.png">

Closes #30151 from gaborgsomogyi/SPARK-33223.

Authored-by: Gabor Somogyi <gabor.g.somogyi@gmail.com>
Signed-off-by: Jungtaek Lim (HeartSaVioR) <kabhwan.opensource@gmail.com>
This commit is contained in:
Gabor Somogyi 2020-11-10 11:22:35 +09:00 committed by Jungtaek Lim (HeartSaVioR)
parent 4360c6f12a
commit 4ac8133866
2 changed files with 131 additions and 3 deletions

View file

@ -22,7 +22,7 @@ import java.lang.{Long => JLong}
import java.util.UUID
import javax.servlet.http.HttpServletRequest
import scala.xml.{Node, Unparsed}
import scala.xml.{Node, NodeBuffer, Unparsed}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.streaming.ui.UIUtils._
@ -126,6 +126,122 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
<br />
}
def generateAggregatedStateOperators(
query: StreamingQueryUIData,
minBatchTime: Long,
maxBatchTime: Long,
jsCollector: JsCollector): NodeBuffer = {
// This is made sure on caller side but put it here to be defensive
require(query.lastProgress != null)
if (query.lastProgress.stateOperators.nonEmpty) {
val numRowsTotalData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.numRowsTotal).sum.toDouble))
val maxNumRowsTotal = numRowsTotalData.maxBy(_._2)._2
val numRowsUpdatedData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.numRowsUpdated).sum.toDouble))
val maxNumRowsUpdated = numRowsUpdatedData.maxBy(_._2)._2
val memoryUsedBytesData = query.recentProgress.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.memoryUsedBytes).sum.toDouble))
val maxMemoryUsedBytes = memoryUsedBytesData.maxBy(_._2)._2
val numRowsDroppedByWatermarkData = query.recentProgress
.map(p => (parseProgressTimestamp(p.timestamp),
p.stateOperators.map(_.numRowsDroppedByWatermark).sum.toDouble))
val maxNumRowsDroppedByWatermark = numRowsDroppedByWatermarkData.maxBy(_._2)._2
val graphUIDataForNumberTotalRows =
new GraphUIData(
"aggregated-num-total-state-rows-timeline",
"aggregated-num-total-state-rows-histogram",
numRowsTotalData,
minBatchTime,
maxBatchTime,
0,
maxNumRowsTotal,
"records")
graphUIDataForNumberTotalRows.generateDataJs(jsCollector)
val graphUIDataForNumberUpdatedRows =
new GraphUIData(
"aggregated-num-updated-state-rows-timeline",
"aggregated-num-updated-state-rows-histogram",
numRowsUpdatedData,
minBatchTime,
maxBatchTime,
0,
maxNumRowsUpdated,
"records")
graphUIDataForNumberUpdatedRows.generateDataJs(jsCollector)
val graphUIDataForMemoryUsedBytes =
new GraphUIData(
"aggregated-state-memory-used-bytes-timeline",
"aggregated-state-memory-used-bytes-histogram",
memoryUsedBytesData,
minBatchTime,
maxBatchTime,
0,
maxMemoryUsedBytes,
"bytes")
graphUIDataForMemoryUsedBytes.generateDataJs(jsCollector)
val graphUIDataForNumRowsDroppedByWatermark =
new GraphUIData(
"aggregated-num-state-rows-dropped-by-watermark-timeline",
"aggregated-num-state-rows-dropped-by-watermark-histogram",
numRowsDroppedByWatermarkData,
minBatchTime,
maxBatchTime,
0,
maxNumRowsDroppedByWatermark,
"records")
graphUIDataForNumRowsDroppedByWatermark.generateDataJs(jsCollector)
// scalastyle:off
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Total State Rows {SparkUIUtils.tooltip("Aggregated number of total state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-total-state-rows-timeline"}>{graphUIDataForNumberTotalRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-total-state-rows-histogram"}>{graphUIDataForNumberTotalRows.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of Updated State Rows {SparkUIUtils.tooltip("Aggregated number of updated state rows.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-updated-state-rows-timeline"}>{graphUIDataForNumberUpdatedRows.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-updated-state-rows-histogram"}>{graphUIDataForNumberUpdatedRows.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated State Memory Used In Bytes {SparkUIUtils.tooltip("Aggregated state memory used in bytes.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-state-memory-used-bytes-timeline"}>{graphUIDataForMemoryUsedBytes.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-state-memory-used-bytes-histogram"}>{graphUIDataForMemoryUsedBytes.generateHistogramHtml(jsCollector)}</td>
</tr>
<tr>
<td style="vertical-align: middle;">
<div style="width: 160px;">
<div><strong>Aggregated Number Of State Rows Dropped By Watermark {SparkUIUtils.tooltip("Aggregated number of state rows dropped by watermark.", "right")}</strong></div>
</div>
</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-timeline"}>{graphUIDataForNumRowsDroppedByWatermark.generateTimelineHtml(jsCollector)}</td>
<td class={"aggregated-num-state-rows-dropped-by-watermark-histogram"}>{graphUIDataForNumRowsDroppedByWatermark.generateHistogramHtml(jsCollector)}</td>
</tr>
// scalastyle:on
} else {
new NodeBuffer()
}
}
def generateStatTable(query: StreamingQueryUIData): Seq[Node] = {
val batchToTimestamps = withNoProgress(query,
query.recentProgress.map(p => (p.batchId, parseProgressTimestamp(p.timestamp))),
@ -284,6 +400,7 @@ private[ui] class StreamingQueryStatisticsPage(parent: StreamingQueryTab)
</td>
<td class="duration-area-stack" colspan="2">{graphUIDataForDuration.generateAreaStackHtmlWithData(jsCollector, operationDurationData)}</td>
</tr>
{generateAggregatedStateOperators(query, minBatchTime, maxBatchTime, jsCollector)}
</tbody>
</table>
} else {

View file

@ -75,10 +75,12 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
h3Text should not contain ("Streaming Query")
val input1 = spark.readStream.format("rate").load()
val input2 = spark.readStream.format("rate").load()
val activeQuery =
spark.readStream.format("rate").load().writeStream.format("noop").start()
input1.join(input2, "value").writeStream.format("noop").start()
val completedQuery =
spark.readStream.format("rate").load().writeStream.format("noop").start()
input1.join(input2, "value").writeStream.format("noop").start()
completedQuery.stop()
val failedQuery = spark.readStream.format("rate").load().select("value").as[Long]
.map(_ / 0).writeStream.format("noop").start()
@ -129,6 +131,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B
findAll(cssSelector("""#stat-table th""")).map(_.text).toSeq should be {
List("", "Timelines", "Histograms")
}
summaryText should contain ("Input Rate (?)")
summaryText should contain ("Process Rate (?)")
summaryText should contain ("Input Rows (?)")
summaryText should contain ("Batch Duration (?)")
summaryText should contain ("Operation Duration (?)")
summaryText should contain ("Aggregated Number Of Total State Rows (?)")
summaryText should contain ("Aggregated Number Of Updated State Rows (?)")
summaryText should contain ("Aggregated State Memory Used In Bytes (?)")
summaryText should contain ("Aggregated Number Of State Rows Dropped By Watermark (?)")
}
} finally {
spark.streams.active.foreach(_.stop())