[SPARK-30119][WEBUI] Add Pagination Support to Streaming Page

### What changes were proposed in this pull request?
* Pagination support is added to all tables of the streaming page in the Spark web UI.
For adding pagination support, existing classes from #7399 were reused.
* Earlier, the streaming page had two tables, `Active Batches` and `Completed Batches`. Now we have three tables: `Running Batches`, `Waiting Batches` and `Completed Batches`. With a large number of waiting and running batches, keeping track of them all in a single table is difficult; other pages also use separate tables for different types of data.
* Earlier, empty tables were shown as well. Now only non-empty tables are shown.
(Previously, the `Active Batches` table showed details of waiting batches followed by running batches.)
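
For illustration, each table now carries its own sort and pagination state in per-table query parameters derived from its table tag (the host below is a placeholder and `pageSize=20` is just an example value; the parameter names match those exercised in the Selenium test further down):

```
http://<driver-host>:4040/streaming/?completedBatches.sort=Batch+Time&completedBatches.desc=true&completedBatches.page=1&completedBatches.pageSize=20#completedBatches
```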

### Why are the changes needed?
Pagination allows users to analyze the tables much more effectively. All Spark web UI pages support pagination apart from the streaming page, so this change also adds consistency. It may also avoid potential OOM errors when a very large number of batches is retained.

### Does this PR introduce _any_ user-facing change?
Yes. The `Active Batches` table is split into two tables, `Running Batches` and `Waiting Batches`. Pagination support is added to all the tables. All other functionality is unchanged.

### How was this patch tested?
Manually.

Before changes:
<img width="1667" alt="Screenshot 2020-05-03 at 7 07 14 PM" src="https://user-images.githubusercontent.com/15366835/80915680-8fb44b80-8d71-11ea-9957-c4a3769b8b67.png">

After changes:
<img width="1669" alt="Screenshot 2020-05-03 at 6 51 22 PM" src="https://user-images.githubusercontent.com/15366835/80915694-a9ee2980-8d71-11ea-8fc5-246413a4951d.png">

Closes #28439 from iRakson/streamingPagination.

Authored-by: iRakson <raksonrakesh@gmail.com>
Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
iRakson 2020-06-07 13:08:50 +09:00 committed by Kousuke Saruta
parent 04f66bfd4e
commit e9337f505b
4 changed files with 276 additions and 185 deletions


@@ -87,7 +87,8 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');


@@ -17,30 +17,41 @@
package org.apache.spark.streaming.ui
import java.net.URLEncoder
import java.nio.charset.StandardCharsets.UTF_8
import javax.servlet.http.HttpServletRequest
import scala.xml.Node
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
private[ui] class StreamingPagedTable(
request: HttpServletRequest,
tableTag: String,
batches: Seq[BatchUIData],
basePath: String,
subPath: String,
batchInterval: Long) extends PagedTable[BatchUIData] {
protected def columns: Seq[Node] = {
<th>Batch Time</th>
<th>Records</th>
<th>Scheduling Delay
{SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
</th>
<th>Processing Time
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
}
private val (sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
private val firstFailureReason: Option[String] =
if (!tableTag.equals("waitingBatches")) {
getFirstFailureReason(batches)
} else {
None
}
/**
* Return the first failure reason if any is found in the batches.
*/
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}
protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -49,7 +60,77 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
}.getOrElse(<td>-</td>)
}
protected def baseRow(batch: BatchUIData): Seq[Node] = {
private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
<td class="progress-cell">
{
SparkUIUtils.makeProgressBar(
started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
reasonToNumKilled = Map.empty,
total = batch.outputOperations.size)
}
</td>
}
override def tableId: String = s"$tableTag-table"
override def tableCssClass: String =
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
override def pageSizeFormField: String = s"$tableTag.pageSize"
override def pageNumberFormField: String = s"$tableTag.page"
override def pageLink(page: Int): String = {
parameterPath +
s"&$tableTag.sort=$encodedSortColumn" +
s"&$tableTag.desc=$desc" +
s"&$pageNumberFormField=$page" +
s"&$pageSizeFormField=$pageSize" +
s"#$tableTag"
}
override def goButtonFormPath: String =
s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
override def dataSource: PagedDataSource[BatchUIData] =
new StreamingDataSource(batches, pageSize, sortColumn, desc)
override def headers: Seq[Node] = {
// header name, whether sortable, and optional tooltip
val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
Seq(
("Batch Time", true, None),
("Records", true, None),
("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
"of a batch")),
("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
if (tableTag.equals("completedBatches")) {
Seq(
("Total Delay", true, Some("Total time taken to handle a batch")),
("Output Ops: Succeeded/Total", false, None))
} else {
Seq(
("Output Ops: Succeeded/Total", false, None),
("Status", false, None))
}
} ++ {
if (firstFailureReason.nonEmpty) {
Seq(("Error", false, None))
} else {
Nil
}
}
}
// check if sort column is a valid sortable column
isSortColumnValid(headersAndCssClasses, sortColumn)
headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
}
override def row(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
@@ -58,138 +139,75 @@ private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long)
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"
<td id={batchTimeId} sorttable_customkey={batchTime.toString}
isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
{formattedSchedulingDelay}
</td>
<td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
{formattedProcessingTime}
</td>
}
private def batchTable: Seq[Node] = {
<table id={tableId} class="table table-bordered table-striped table-sm sortable">
<thead>
{columns}
</thead>
<tbody>
{renderRows}
</tbody>
</table>
}
def toNodeSeq: Seq[Node] = {
batchTable
}
protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
<td class="progress-cell">
{
SparkUIUtils.makeProgressBar(
started = batch.numActiveOutputOp,
completed = batch.numCompletedOutputOp,
failed = batch.numFailedOutputOp,
skipped = 0,
reasonToNumKilled = Map.empty,
total = batch.outputOperations.size)
}
</td>
}
/**
* Return HTML for all rows of this table.
*/
protected def renderRows: Seq[Node]
}
private[ui] class ActiveBatchTable(
runningBatches: Seq[BatchUIData],
waitingBatches: Seq[BatchUIData],
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
private val firstFailureReason = getFirstFailureReason(runningBatches)
override protected def columns: Seq[Node] = super.columns ++ {
<th>Output Ops: Succeeded/Total</th>
<th>Status</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override protected def renderRows: Seq[Node] = {
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
// waiting batches before running batches
waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
}
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
}
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td> ++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
}
}
}
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
private val firstFailureReason = getFirstFailureReason(batches)
override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th> ++ {
if (firstFailureReason.nonEmpty) {
<th>Error</th>
} else {
Nil
}
}
}
override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
}
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
baseRow(batch) ++ {
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
<tr>
<td id={batchTimeId} isFailed={batch.isFailed.toString}>
<a href={s"batch?id=$batchTime"}>
{formattedBatchTime}
</a>
</td>
} ++ createOutputOperationProgressBar(batch) ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
<td> {numRecords.toString} records </td>
<td> {formattedSchedulingDelay} </td>
<td> {formattedProcessingTime} </td>
{
if (tableTag.equals("completedBatches")) {
<td> {formattedTotalDelay} </td> ++
createOutputOperationProgressBar(batch) ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
} else if (tableTag.equals("runningBatches")) {
createOutputOperationProgressBar(batch) ++
<td> processing </td> ++ {
if (firstFailureReason.nonEmpty) {
getFirstFailureTableCell(batch)
} else {
Nil
}
}
} else {
createOutputOperationProgressBar(batch) ++
<td> queued </td> ++ {
if (firstFailureReason.nonEmpty) {
// Waiting batches have not run yet, so must have no failure reasons.
<td>-</td>
} else {
Nil
}
}
}
}
</tr>
}
}
private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
private val data = info.sorted(ordering(sortColumn, desc))
override protected def dataSize: Int = data.size
override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
val ordering: Ordering[BatchUIData] = column match {
case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
case "Records" => Ordering.by(_.numRecords)
case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
}
if (desc) {
ordering.reverse
} else {
ordering
}
}
}
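
As a side note, the `ordering` method above follows the standard Scala pattern of building a per-column `Ordering` with `Ordering.by` and reversing it for descending sorts. A minimal, self-contained sketch (using a hypothetical `Batch` case class in place of `BatchUIData`, so it runs without Spark on the classpath):

```scala
object SortSketch extends App {
  // Sketch only: `Batch` is a stand-in for BatchUIData.
  case class Batch(batchTimeMs: Long, numRecords: Long)

  def ordering(column: String, desc: Boolean): Ordering[Batch] = {
    val base: Ordering[Batch] = column match {
      case "Batch Time" => Ordering.by(_.batchTimeMs)
      case "Records" => Ordering.by(_.numRecords)
      case unknown => throw new IllegalArgumentException(s"Unknown Column: $unknown")
    }
    // A descending sort is just the reversed ascending ordering.
    if (desc) base.reverse else base
  }

  val batches = Seq(Batch(3000L, 10L), Batch(1000L, 30L), Batch(2000L, 20L))
  // The page's default sort is "Batch Time" descending (newest first).
  val sorted = batches.sorted(ordering("Batch Time", desc = true))
  assert(sorted.map(_.batchTimeMs) == Seq(3000L, 2000L, 1000L))
}
```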


@@ -20,10 +20,12 @@ package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
import scala.collection.mutable
import scala.xml.{Node, Unparsed}
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
import org.apache.spark.util.Utils
/**
* A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -86,7 +88,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
onClickTimelineFunc ++ basicInfo ++
listener.synchronized {
generateStatTable() ++
generateBatchListTables()
generateBatchListTables(request)
}
SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
}
@@ -432,50 +434,97 @@ private[ui] class StreamingPage(parent: StreamingTab)
</tr>
}
private def generateBatchListTables(): Seq[Node] = {
private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
tableTag: String): Seq[Node] = {
val interval: Long = listener.batchDuration
val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
try {
new StreamingPagedTable(
request,
tableTag,
batches,
SparkUIUtils.prependBaseUri(request, parent.basePath),
"streaming",
interval
).table(streamingPage)
} catch {
case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
<div class="alert alert-error">
<p>Error while rendering streaming table:</p>
<pre>
{Utils.exceptionString(e)}
</pre>
</div>
}
}
private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
val completedBatches = listener.retainedCompletedBatches.
sortBy(_.batchTime.milliseconds).reverse
val activeBatchesContent = {
<div class="row">
<div class="col-12">
<span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
onClick="collapseTable('collapse-aggregated-activeBatches',
'aggregated-activeBatches')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
</h4>
</span>
<div class="aggregated-activeBatches collapsible-table">
{new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
val content = mutable.ListBuffer[Node]()
if (runningBatches.nonEmpty) {
content ++=
<div class="row">
<div class="col-12">
<span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
onClick="collapseTable('collapse-aggregated-runningBatches',
'aggregated-runningBatches')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>Running Batches ({runningBatches.size})</a>
</h4>
</span>
<div class="aggregated-runningBatches collapsible-table">
{ streamingTable(request, runningBatches, "runningBatches") }
</div>
</div>
</div>
</div>
}
val completedBatchesContent = {
<div class="row">
<div class="col-12">
<span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
onClick="collapseTable('collapse-aggregated-completedBatches',
'aggregated-completedBatches')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>Completed Batches (last {completedBatches.size}
out of {listener.numTotalCompletedBatches})</a>
</h4>
</span>
<div class="aggregated-completedBatches collapsible-table">
{new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
if (waitingBatches.nonEmpty) {
content ++=
<div class="row">
<div class="col-12">
<span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
onClick="collapseTable('collapse-aggregated-waitingBatches',
'aggregated-waitingBatches')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>Waiting Batches ({waitingBatches.size})</a>
</h4>
</span>
<div class="aggregated-waitingBatches collapsible-table">
{ streamingTable(request, waitingBatches, "waitingBatches") }
</div>
</div>
</div>
</div>
}
activeBatchesContent ++ completedBatchesContent
if (completedBatches.nonEmpty) {
content ++=
<div class="row">
<div class="col-12">
<span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
onClick="collapseTable('collapse-aggregated-completedBatches',
'aggregated-completedBatches')">
<h4>
<span class="collapse-table-arrow arrow-open"></span>
<a>Completed Batches (last {completedBatches.size}
out of {listener.numTotalCompletedBatches})</a>
</h4>
</span>
<div class="aggregated-completedBatches collapsible-table">
{ streamingTable(request, completedBatches, "completedBatches") }
</div>
</div>
</div>
}
content
}
}
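
The `generateBatchListTables` rewrite above also implements the "show only non-empty tables" behavior. A hedged sketch of the pattern, with simplified markup (the section titles are the real ones; everything else is illustrative and assumes the scala-xml module is on the classpath):

```scala
import scala.collection.mutable
import scala.xml.Node

object NonEmptyTablesSketch {
  // Simplified stand-in for one collapsible table section.
  private def section(title: String, size: Int): Seq[Node] =
    <div class="row"><h4>{title} ({size})</h4></div>

  def render(running: Int, waiting: Int, completed: Int): Seq[Node] = {
    val content = mutable.ListBuffer[Node]()
    // Append a section only when its backing batch list is non-empty.
    if (running > 0) content ++= section("Running Batches", running)
    if (waiting > 0) content ++= section("Waiting Batches", waiting)
    if (completed > 0) content ++= section("Completed Batches", completed)
    content.toSeq
  }
}
```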


@@ -63,7 +63,7 @@ class UISeleniumSuite
.setMaster("local")
.setAppName("test")
.set(UI_ENABLED, true)
val ssc = new StreamingContext(conf, Seconds(1))
val ssc = new StreamingContext(conf, Milliseconds(100))
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
ssc
}
@@ -104,7 +104,7 @@ class UISeleniumSuite
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
}
eventually(timeout(10.seconds), interval(50.milliseconds)) {
eventually(timeout(10.seconds), interval(500.milliseconds)) {
// check whether streaming page exists
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,24 +125,47 @@ class UISeleniumSuite
// Check batch tables
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
val arrow = 0x25BE.toChar
findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
"Output Ops: Succeeded/Total", "Status")
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
"Total Delay (?)", "Output Ops: Succeeded/Total")
findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be {
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
"Output Ops: Succeeded/Total", "Status")
}
findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
"Total Delay", "Output Ops: Succeeded/Total")
}
val batchLinks =
findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
val pageSize = 3
val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
"&completedBatches.desc=true&completedBatches.page=1" +
s"&completedBatches.pageSize=$pageSize#completedBatches"
go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
.map(_.text).toList
// header row + pageSize data rows
completedTableRows.length should be (1 + pageSize)
val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
"&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches"
// sort batches in ascending order of batch time
go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
.flatMap(_.attribute("href")).toSeq
batchLinks.size should be >= 1
// Check a normal batch page
go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
go to (batchLinks.head) // Head is the first batch, so it will have some jobs
val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
summaryText should contain ("Batch Duration:")
summaryText should contain ("Input data size:")