diff --git a/core/src/main/resources/org/apache/spark/ui/static/webui.js b/core/src/main/resources/org/apache/spark/ui/static/webui.js
index bb3725650c..4f8409ca2b 100644
--- a/core/src/main/resources/org/apache/spark/ui/static/webui.js
+++ b/core/src/main/resources/org/apache/spark/ui/static/webui.js
@@ -87,8 +87,7 @@ $(function() {
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
- collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
- collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
+ collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
index c0eec0e0b0..1e443f6567 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/AllBatchesTable.scala
@@ -17,41 +17,30 @@
package org.apache.spark.streaming.ui
-import java.net.URLEncoder
-import java.nio.charset.StandardCharsets.UTF_8
-import javax.servlet.http.HttpServletRequest
-
import scala.xml.Node
-import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
+import org.apache.spark.ui.{UIUtils => SparkUIUtils}
-private[ui] class StreamingPagedTable(
- request: HttpServletRequest,
- tableTag: String,
- batches: Seq[BatchUIData],
- basePath: String,
- subPath: String,
- batchInterval: Long) extends PagedTable[BatchUIData] {
+private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
- private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
- private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
- private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
-
- private val firstFailureReason: Option[String] =
- if (!tableTag.equals("waitingBatches")) {
- getFirstFailureReason(batches)
- } else {
- None
- }
+ protected def columns: Seq[Node] = {
+    <th>Batch Time</th>
+      <th>Records</th>
+      <th>Scheduling Delay
+        {SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
+      </th>
+      <th>Processing Time
+        {SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
+  }
/**
* Return the first failure reason if finding in the batches.
*/
- private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
+ protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
}
- private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
+ protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
firstFailureReason.map { failureReason =>
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
@@ -60,77 +49,7 @@ private[ui] class StreamingPagedTable(
    }.getOrElse(<td>-</td>)
}
- private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
-    <td class="progress-cell">
- {
- SparkUIUtils.makeProgressBar(
- started = batch.numActiveOutputOp,
- completed = batch.numCompletedOutputOp,
- failed = batch.numFailedOutputOp,
- skipped = 0,
- reasonToNumKilled = Map.empty,
- total = batch.outputOperations.size)
- }
-    </td>
- }
-
- override def tableId: String = s"$tableTag-table"
-
- override def tableCssClass: String =
- "table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
-
- override def pageSizeFormField: String = s"$tableTag.pageSize"
-
- override def pageNumberFormField: String = s"$tableTag.page"
-
- override def pageLink(page: Int): String = {
- parameterPath +
- s"&$tableTag.sort=$encodedSortColumn" +
- s"&$tableTag.desc=$desc" +
- s"&$pageNumberFormField=$page" +
- s"&$pageSizeFormField=$pageSize" +
- s"#$tableTag"
- }
-
- override def goButtonFormPath: String =
- s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
-
- override def dataSource: PagedDataSource[BatchUIData] =
- new StreamingDataSource(batches, pageSize, sortColumn, desc)
-
- override def headers: Seq[Node] = {
- // headers, sortable and tooltips
- val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
- Seq(
- ("Batch Time", true, None),
- ("Records", true, None),
- ("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
- "of a batch")),
- ("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
- if (tableTag.equals("completedBatches")) {
- Seq(
- ("Total Delay", true, Some("Total time taken to handle a batch")),
- ("Output Ops: Succeeded/Total", false, None))
- } else {
- Seq(
- ("Output Ops: Succeeded/Total", false, None),
- ("Status", false, None))
- }
- } ++ {
- if (firstFailureReason.nonEmpty) {
- Seq(("Error", false, None))
- } else {
- Nil
- }
- }
- }
- // check if sort column is a valid sortable column
- isSortColumnValid(headersAndCssClasses, sortColumn)
-
- headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
- }
-
- override def row(batch: BatchUIData): Seq[Node] = {
+ protected def baseRow(batch: BatchUIData): Seq[Node] = {
val batchTime = batch.batchTime.milliseconds
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
val numRecords = batch.numRecords
@@ -139,75 +58,138 @@ private[ui] class StreamingPagedTable(
val processingTime = batch.processingDelay
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
val batchTimeId = s"batch-$batchTime"
+
+    <td id={batchTimeId} sorttable_customkey={batchTime.toString}
+        isFailed={batch.isFailed.toString}>
+      <a href={s"batch?id=$batchTime"}>
+        {formattedBatchTime}
+      </a>
+    </td>
+      <td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
+      <td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
+        {formattedSchedulingDelay}
+      </td>
+      <td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
+        {formattedProcessingTime}
+      </td>
+  }
+
+ private def batchTable: Seq[Node] = {
+    <table id={tableId} class="table table-bordered table-striped table-sm sortable">
+      <thead>
+        {columns}
+      </thead>
+      <tbody>
+        {renderRows}
+      </tbody>
+    </table>
+  }
+
+ def toNodeSeq: Seq[Node] = {
+ batchTable
+ }
+
+ protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
+    <td class="progress-cell">
+ {
+ SparkUIUtils.makeProgressBar(
+ started = batch.numActiveOutputOp,
+ completed = batch.numCompletedOutputOp,
+ failed = batch.numFailedOutputOp,
+ skipped = 0,
+ reasonToNumKilled = Map.empty,
+ total = batch.outputOperations.size)
+ }
+    </td>
+ }
+
+ /**
+ * Return HTML for all rows of this table.
+ */
+ protected def renderRows: Seq[Node]
+}
+
+private[ui] class ActiveBatchTable(
+ runningBatches: Seq[BatchUIData],
+ waitingBatches: Seq[BatchUIData],
+ batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
+
+ private val firstFailureReason = getFirstFailureReason(runningBatches)
+
+ override protected def columns: Seq[Node] = super.columns ++ {
+    <th>Output Ops: Succeeded/Total</th>
+      <th>Status</th> ++ {
+      if (firstFailureReason.nonEmpty) {
+        <th>Error</th>
+ } else {
+ Nil
+ }
+ }
+ }
+
+ override protected def renderRows: Seq[Node] = {
+ // The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
+ // waiting batches before running batches
+    waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
+      runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
+ }
+
+ private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
+    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
+ if (firstFailureReason.nonEmpty) {
+ getFirstFailureTableCell(batch)
+ } else {
+ Nil
+ }
+ }
+ }
+
+ private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
+    baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td> ++ {
+ if (firstFailureReason.nonEmpty) {
+ // Waiting batches have not run yet, so must have no failure reasons.
+        <td>-</td>
+ } else {
+ Nil
+ }
+ }
+ }
+}
+
+private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
+ extends BatchTableBase("completed-batches-table", batchInterval) {
+
+ private val firstFailureReason = getFirstFailureReason(batches)
+
+ override protected def columns: Seq[Node] = super.columns ++ {
+    <th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
+      <th>Output Ops: Succeeded/Total</th> ++ {
+      if (firstFailureReason.nonEmpty) {
+        <th>Error</th>
+ } else {
+ Nil
+ }
+ }
+ }
+
+ override protected def renderRows: Seq[Node] = {
+    batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
+ }
+
+ private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
-    <tr>
-      <td id={batchTimeId} isFailed={batch.isFailed.toString}>
-        <a href={s"batch?id=$batchTime"}>
-          {formattedBatchTime}
-        </a>
+    baseRow(batch) ++ {
+      <td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
+        {formattedTotalDelay}
      </td>
-      <td>{numRecords.toString} records</td>
-      <td>{formattedSchedulingDelay}</td>
-      <td>{formattedProcessingTime}</td>
- {
- if (tableTag.equals("completedBatches")) {
-          <td>{formattedTotalDelay}</td> ++
- createOutputOperationProgressBar(batch) ++ {
- if (firstFailureReason.nonEmpty) {
- getFirstFailureTableCell(batch)
- } else {
- Nil
- }
- }
- } else if (tableTag.equals("runningBatches")) {
- createOutputOperationProgressBar(batch) ++
-            <td>processing</td> ++ {
- if (firstFailureReason.nonEmpty) {
- getFirstFailureTableCell(batch)
- } else {
- Nil
- }
- }
- } else {
- createOutputOperationProgressBar(batch) ++
-            <td>queued</td> ++ {
- if (firstFailureReason.nonEmpty) {
- // Waiting batches have not run yet, so must have no failure reasons.
-                <td>-</td>
- } else {
- Nil
- }
- }
- }
+ } ++ createOutputOperationProgressBar(batch)++ {
+ if (firstFailureReason.nonEmpty) {
+ getFirstFailureTableCell(batch)
+ } else {
+ Nil
}
-    </tr>
- }
-}
-
-private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
- desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
-
- private val data = info.sorted(ordering(sortColumn, desc))
-
- override protected def dataSize: Int = data.size
-
- override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
-
- private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
- val ordering: Ordering[BatchUIData] = column match {
- case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
- case "Records" => Ordering.by(_.numRecords)
- case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
- case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
- case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
- case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
- }
- if (desc) {
- ordering.reverse
- } else {
- ordering
}
}
}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
index 42d0e50a06..3bdf009dbc 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingPage.scala
@@ -20,12 +20,10 @@ package org.apache.spark.streaming.ui
import java.util.concurrent.TimeUnit
import javax.servlet.http.HttpServletRequest
-import scala.collection.mutable
import scala.xml.{Node, Unparsed}
import org.apache.spark.internal.Logging
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
-import org.apache.spark.util.Utils
/**
* A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
@@ -88,7 +86,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
onClickTimelineFunc ++ basicInfo ++
listener.synchronized {
generateStatTable() ++
- generateBatchListTables(request)
+ generateBatchListTables()
}
SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
}
@@ -434,97 +432,50 @@ private[ui] class StreamingPage(parent: StreamingTab)
}
- private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
- tableTag: String): Seq[Node] = {
- val interval: Long = listener.batchDuration
- val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
-
- try {
- new StreamingPagedTable(
- request,
- tableTag,
- batches,
- SparkUIUtils.prependBaseUri(request, parent.basePath),
- "streaming",
- interval
- ).table(streamingPage)
- } catch {
- case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
-        <div class="alert alert-error">
-          <p>Error while rendering streaming table:</p>
-          <pre>
-            {Utils.exceptionString(e)}
-          </pre>
-        </div>
- }
- }
-
- private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
+ private def generateBatchListTables(): Seq[Node] = {
val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
val completedBatches = listener.retainedCompletedBatches.
sortBy(_.batchTime.milliseconds).reverse
- val content = mutable.ListBuffer[Node]()
-
- if (runningBatches.nonEmpty) {
- content ++=
-        <div class="row">
-          <div class="col-12">
-            <span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
-                  onClick="collapseTable('collapse-aggregated-runningBatches',
-                  'aggregated-runningBatches')">
-              <h4>
-                <span class="collapse-table-arrow arrow-open"></span>
-                <a>Running Batches ({runningBatches.size})</a>
-              </h4>
-            </span>
-            <div class="aggregated-runningBatches collapsible-table">
-              { streamingTable(request, runningBatches, "runningBatches") }
-            </div>
-          </div>
-        </div>
+ val activeBatchesContent = {
+      <div class="row">
+        <div class="col-12">
+          <span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
+              onClick="collapseTable('collapse-aggregated-activeBatches',
+              'aggregated-activeBatches')">
+            <h4>
+              <span class="collapse-table-arrow arrow-open"></span>
+              <a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
+            </h4>
+          </span>
+          <div class="aggregated-activeBatches collapsible-table">
+            {new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
+          </div>
+        </div>
+      </div>
}
- if (waitingBatches.nonEmpty) {
- content ++=
-        <div class="row">
-          <div class="col-12">
-            <span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
-                  onClick="collapseTable('collapse-aggregated-waitingBatches',
-                  'aggregated-waitingBatches')">
-              <h4>
-                <span class="collapse-table-arrow arrow-open"></span>
-                <a>Waiting Batches ({waitingBatches.size})</a>
-              </h4>
-            </span>
-            <div class="aggregated-waitingBatches collapsible-table">
-              { streamingTable(request, waitingBatches, "waitingBatches") }
-            </div>
-          </div>
-        </div>
+ val completedBatchesContent = {
+      <div class="row">
+        <div class="col-12">
+          <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
+              onClick="collapseTable('collapse-aggregated-completedBatches',
+              'aggregated-completedBatches')">
+            <h4>
+              <span class="collapse-table-arrow arrow-open"></span>
+              <a>Completed Batches (last {completedBatches.size}
+                out of {listener.numTotalCompletedBatches})</a>
+            </h4>
+          </span>
+          <div class="aggregated-completedBatches collapsible-table">
+            {new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
+          </div>
+        </div>
+      </div>
}
- if (completedBatches.nonEmpty) {
- content ++=
-        <div class="row">
-          <div class="col-12">
-            <span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
-                  onClick="collapseTable('collapse-aggregated-completedBatches',
-                  'aggregated-completedBatches')">
-              <h4>
-                <span class="collapse-table-arrow arrow-open"></span>
-                <a>Completed Batches (last {completedBatches.size}
-                  out of {listener.numTotalCompletedBatches})</a>
-              </h4>
-            </span>
-            <div class="aggregated-completedBatches collapsible-table">
-              { streamingTable(request, completedBatches, "completedBatches") }
-            </div>
-          </div>
-        </div>
-    }
- content
+ activeBatchesContent ++ completedBatchesContent
}
}
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
index 952ef6c374..bdc9e9ee2a 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/UISeleniumSuite.scala
@@ -63,7 +63,7 @@ class UISeleniumSuite
.setMaster("local")
.setAppName("test")
.set(UI_ENABLED, true)
- val ssc = new StreamingContext(conf, Milliseconds(100))
+ val ssc = new StreamingContext(conf, Seconds(1))
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
ssc
}
@@ -104,7 +104,7 @@ class UISeleniumSuite
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
}
- eventually(timeout(10.seconds), interval(500.milliseconds)) {
+ eventually(timeout(10.seconds), interval(50.milliseconds)) {
// check whether streaming page exists
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
@@ -125,47 +125,24 @@ class UISeleniumSuite
// Check batch tables
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
- h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true)
- h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true)
+ h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
- val arrow = 0x25BE.toChar
- findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
- List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
+ findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
+ List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
"Output Ops: Succeeded/Total", "Status")
}
- findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be {
- List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
- "Output Ops: Succeeded/Total", "Status")
- }
- findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
- List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
- "Total Delay", "Output Ops: Succeeded/Total")
+ findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
+ List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
+ "Total Delay (?)", "Output Ops: Succeeded/Total")
}
- val pageSize = 3
- val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
- "&completedBatches.desc=true&completedBatches.page=1" +
- s"&completedBatches.pageSize=$pageSize#completedBatches"
-
- go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
- val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
- .map(_.text).toList
- // header row + pagesize
- completedTableRows.length should be (1 + pageSize)
-
- val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
- "&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches"
-
- // sort batches in ascending order of batch time
- go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
-
- val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
- .flatMap(_.attribute("href")).toSeq
+ val batchLinks =
+ findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
batchLinks.size should be >= 1
// Check a normal batch page
- go to (batchLinks.head) // Head is the first batch, so it will have some jobs
+ go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
summaryText should contain ("Batch Duration:")
summaryText should contain ("Input data size:")