Revert "[SPARK-30119][WEBUI] Add Pagination Support to Streaming Page"
This PR reverts #28439 due to that PR breaks QA build. Closes #28747 from sarutak/revert-SPARK-30119. Authored-by: Kousuke Saruta <sarutak@oss.nttdata.com> Signed-off-by: Kousuke Saruta <sarutak@oss.nttdata.com>
This commit is contained in:
parent
e9337f505b
commit
f7501ddd70
|
@ -87,8 +87,7 @@ $(function() {
|
|||
collapseTablePageLoad('collapse-aggregated-poolActiveStages','aggregated-poolActiveStages');
|
||||
collapseTablePageLoad('collapse-aggregated-tasks','aggregated-tasks');
|
||||
collapseTablePageLoad('collapse-aggregated-rdds','aggregated-rdds');
|
||||
collapseTablePageLoad('collapse-aggregated-waitingBatches','aggregated-waitingBatches');
|
||||
collapseTablePageLoad('collapse-aggregated-runningBatches','aggregated-runningBatches');
|
||||
collapseTablePageLoad('collapse-aggregated-activeBatches','aggregated-activeBatches');
|
||||
collapseTablePageLoad('collapse-aggregated-completedBatches','aggregated-completedBatches');
|
||||
collapseTablePageLoad('collapse-aggregated-runningExecutions','aggregated-runningExecutions');
|
||||
collapseTablePageLoad('collapse-aggregated-completedExecutions','aggregated-completedExecutions');
|
||||
|
|
|
@ -17,41 +17,30 @@
|
|||
|
||||
package org.apache.spark.streaming.ui
|
||||
|
||||
import java.net.URLEncoder
|
||||
import java.nio.charset.StandardCharsets.UTF_8
|
||||
import javax.servlet.http.HttpServletRequest
|
||||
|
||||
import scala.xml.Node
|
||||
|
||||
import org.apache.spark.ui.{PagedDataSource, PagedTable, UIUtils => SparkUIUtils}
|
||||
import org.apache.spark.ui.{UIUtils => SparkUIUtils}
|
||||
|
||||
private[ui] class StreamingPagedTable(
|
||||
request: HttpServletRequest,
|
||||
tableTag: String,
|
||||
batches: Seq[BatchUIData],
|
||||
basePath: String,
|
||||
subPath: String,
|
||||
batchInterval: Long) extends PagedTable[BatchUIData] {
|
||||
private[ui] abstract class BatchTableBase(tableId: String, batchInterval: Long) {
|
||||
|
||||
private val(sortColumn, desc, pageSize) = getTableParameters(request, tableTag, "Batch Time")
|
||||
private val parameterPath = s"$basePath/$subPath/?${getParameterOtherTable(request, tableTag)}"
|
||||
private val encodedSortColumn = URLEncoder.encode(sortColumn, UTF_8.name())
|
||||
|
||||
private val firstFailureReason: Option[String] =
|
||||
if (!tableTag.equals("waitingBatches")) {
|
||||
getFirstFailureReason(batches)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
protected def columns: Seq[Node] = {
|
||||
<th>Batch Time</th>
|
||||
<th>Records</th>
|
||||
<th>Scheduling Delay
|
||||
{SparkUIUtils.tooltip("Time taken by Streaming scheduler to submit jobs of a batch", "top")}
|
||||
</th>
|
||||
<th>Processing Time
|
||||
{SparkUIUtils.tooltip("Time taken to process all jobs of a batch", "top")}</th>
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the first failure reason if finding in the batches.
|
||||
*/
|
||||
private def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
|
||||
protected def getFirstFailureReason(batches: Seq[BatchUIData]): Option[String] = {
|
||||
batches.flatMap(_.outputOperations.flatMap(_._2.failureReason)).headOption
|
||||
}
|
||||
|
||||
private def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
|
||||
protected def getFirstFailureTableCell(batch: BatchUIData): Seq[Node] = {
|
||||
val firstFailureReason = batch.outputOperations.flatMap(_._2.failureReason).headOption
|
||||
firstFailureReason.map { failureReason =>
|
||||
val failureReasonForUI = UIUtils.createOutputOperationFailureForUI(failureReason)
|
||||
|
@ -60,77 +49,7 @@ private[ui] class StreamingPagedTable(
|
|||
}.getOrElse(<td>-</td>)
|
||||
}
|
||||
|
||||
private def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
|
||||
<td class="progress-cell">
|
||||
{
|
||||
SparkUIUtils.makeProgressBar(
|
||||
started = batch.numActiveOutputOp,
|
||||
completed = batch.numCompletedOutputOp,
|
||||
failed = batch.numFailedOutputOp,
|
||||
skipped = 0,
|
||||
reasonToNumKilled = Map.empty,
|
||||
total = batch.outputOperations.size)
|
||||
}
|
||||
</td>
|
||||
}
|
||||
|
||||
override def tableId: String = s"$tableTag-table"
|
||||
|
||||
override def tableCssClass: String =
|
||||
"table table-bordered table-sm table-striped table-head-clickable table-cell-width-limited"
|
||||
|
||||
override def pageSizeFormField: String = s"$tableTag.pageSize"
|
||||
|
||||
override def pageNumberFormField: String = s"$tableTag.page"
|
||||
|
||||
override def pageLink(page: Int): String = {
|
||||
parameterPath +
|
||||
s"&$tableTag.sort=$encodedSortColumn" +
|
||||
s"&$tableTag.desc=$desc" +
|
||||
s"&$pageNumberFormField=$page" +
|
||||
s"&$pageSizeFormField=$pageSize" +
|
||||
s"#$tableTag"
|
||||
}
|
||||
|
||||
override def goButtonFormPath: String =
|
||||
s"$parameterPath&$tableTag.sort=$encodedSortColumn&$tableTag.desc=$desc#$tableTag"
|
||||
|
||||
override def dataSource: PagedDataSource[BatchUIData] =
|
||||
new StreamingDataSource(batches, pageSize, sortColumn, desc)
|
||||
|
||||
override def headers: Seq[Node] = {
|
||||
// headers, sortable and tooltips
|
||||
val headersAndCssClasses: Seq[(String, Boolean, Option[String])] = {
|
||||
Seq(
|
||||
("Batch Time", true, None),
|
||||
("Records", true, None),
|
||||
("Scheduling Delay", true, Some("Time taken by Streaming scheduler to submit jobs " +
|
||||
"of a batch")),
|
||||
("Processing Time", true, Some("Time taken to process all jobs of a batch"))) ++ {
|
||||
if (tableTag.equals("completedBatches")) {
|
||||
Seq(
|
||||
("Total Delay", true, Some("Total time taken to handle a batch")),
|
||||
("Output Ops: Succeeded/Total", false, None))
|
||||
} else {
|
||||
Seq(
|
||||
("Output Ops: Succeeded/Total", false, None),
|
||||
("Status", false, None))
|
||||
}
|
||||
} ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
Seq(("Error", false, None))
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
// check if sort column is a valid sortable column
|
||||
isSortColumnValid(headersAndCssClasses, sortColumn)
|
||||
|
||||
headerRow(headersAndCssClasses, desc, pageSize, sortColumn, parameterPath, tableTag, tableTag)
|
||||
}
|
||||
|
||||
override def row(batch: BatchUIData): Seq[Node] = {
|
||||
protected def baseRow(batch: BatchUIData): Seq[Node] = {
|
||||
val batchTime = batch.batchTime.milliseconds
|
||||
val formattedBatchTime = SparkUIUtils.formatBatchTime(batchTime, batchInterval)
|
||||
val numRecords = batch.numRecords
|
||||
|
@ -139,75 +58,138 @@ private[ui] class StreamingPagedTable(
|
|||
val processingTime = batch.processingDelay
|
||||
val formattedProcessingTime = processingTime.map(SparkUIUtils.formatDuration).getOrElse("-")
|
||||
val batchTimeId = s"batch-$batchTime"
|
||||
|
||||
<td id={batchTimeId} sorttable_customkey={batchTime.toString}
|
||||
isFailed={batch.isFailed.toString}>
|
||||
<a href={s"batch?id=$batchTime"}>
|
||||
{formattedBatchTime}
|
||||
</a>
|
||||
</td>
|
||||
<td sorttable_customkey={numRecords.toString}>{numRecords.toString} records</td>
|
||||
<td sorttable_customkey={schedulingDelay.getOrElse(Long.MaxValue).toString}>
|
||||
{formattedSchedulingDelay}
|
||||
</td>
|
||||
<td sorttable_customkey={processingTime.getOrElse(Long.MaxValue).toString}>
|
||||
{formattedProcessingTime}
|
||||
</td>
|
||||
}
|
||||
|
||||
private def batchTable: Seq[Node] = {
|
||||
<table id={tableId} class="table table-bordered table-striped table-sm sortable">
|
||||
<thead>
|
||||
{columns}
|
||||
</thead>
|
||||
<tbody>
|
||||
{renderRows}
|
||||
</tbody>
|
||||
</table>
|
||||
}
|
||||
|
||||
def toNodeSeq: Seq[Node] = {
|
||||
batchTable
|
||||
}
|
||||
|
||||
protected def createOutputOperationProgressBar(batch: BatchUIData): Seq[Node] = {
|
||||
<td class="progress-cell">
|
||||
{
|
||||
SparkUIUtils.makeProgressBar(
|
||||
started = batch.numActiveOutputOp,
|
||||
completed = batch.numCompletedOutputOp,
|
||||
failed = batch.numFailedOutputOp,
|
||||
skipped = 0,
|
||||
reasonToNumKilled = Map.empty,
|
||||
total = batch.outputOperations.size)
|
||||
}
|
||||
</td>
|
||||
}
|
||||
|
||||
/**
|
||||
* Return HTML for all rows of this table.
|
||||
*/
|
||||
protected def renderRows: Seq[Node]
|
||||
}
|
||||
|
||||
private[ui] class ActiveBatchTable(
|
||||
runningBatches: Seq[BatchUIData],
|
||||
waitingBatches: Seq[BatchUIData],
|
||||
batchInterval: Long) extends BatchTableBase("active-batches-table", batchInterval) {
|
||||
|
||||
private val firstFailureReason = getFirstFailureReason(runningBatches)
|
||||
|
||||
override protected def columns: Seq[Node] = super.columns ++ {
|
||||
<th>Output Ops: Succeeded/Total</th>
|
||||
<th>Status</th> ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
<th>Error</th>
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override protected def renderRows: Seq[Node] = {
|
||||
// The "batchTime"s of "waitingBatches" must be greater than "runningBatches"'s, so display
|
||||
// waiting batches before running batches
|
||||
waitingBatches.flatMap(batch => <tr>{waitingBatchRow(batch)}</tr>) ++
|
||||
runningBatches.flatMap(batch => <tr>{runningBatchRow(batch)}</tr>)
|
||||
}
|
||||
|
||||
private def runningBatchRow(batch: BatchUIData): Seq[Node] = {
|
||||
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>processing</td> ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
getFirstFailureTableCell(batch)
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def waitingBatchRow(batch: BatchUIData): Seq[Node] = {
|
||||
baseRow(batch) ++ createOutputOperationProgressBar(batch) ++ <td>queued</td>++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
// Waiting batches have not run yet, so must have no failure reasons.
|
||||
<td>-</td>
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
|
||||
extends BatchTableBase("completed-batches-table", batchInterval) {
|
||||
|
||||
private val firstFailureReason = getFirstFailureReason(batches)
|
||||
|
||||
override protected def columns: Seq[Node] = super.columns ++ {
|
||||
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
|
||||
<th>Output Ops: Succeeded/Total</th> ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
<th>Error</th>
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override protected def renderRows: Seq[Node] = {
|
||||
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
|
||||
}
|
||||
|
||||
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
|
||||
val totalDelay = batch.totalDelay
|
||||
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
|
||||
|
||||
<tr>
|
||||
<td id={batchTimeId} isFailed={batch.isFailed.toString}>
|
||||
<a href={s"batch?id=$batchTime"}>
|
||||
{formattedBatchTime}
|
||||
</a>
|
||||
baseRow(batch) ++ {
|
||||
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
|
||||
{formattedTotalDelay}
|
||||
</td>
|
||||
<td> {numRecords.toString} records </td>
|
||||
<td> {formattedSchedulingDelay} </td>
|
||||
<td> {formattedProcessingTime} </td>
|
||||
{
|
||||
if (tableTag.equals("completedBatches")) {
|
||||
<td> {formattedTotalDelay} </td> ++
|
||||
createOutputOperationProgressBar(batch) ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
getFirstFailureTableCell(batch)
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
} else if (tableTag.equals("runningBatches")) {
|
||||
createOutputOperationProgressBar(batch) ++
|
||||
<td> processing </td> ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
getFirstFailureTableCell(batch)
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
} else {
|
||||
createOutputOperationProgressBar(batch) ++
|
||||
<td> queued </td> ++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
// Waiting batches have not run yet, so must have no failure reasons.
|
||||
<td>-</td>
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
}
|
||||
}
|
||||
} ++ createOutputOperationProgressBar(batch)++ {
|
||||
if (firstFailureReason.nonEmpty) {
|
||||
getFirstFailureTableCell(batch)
|
||||
} else {
|
||||
Nil
|
||||
}
|
||||
</tr>
|
||||
}
|
||||
}
|
||||
|
||||
private[ui] class StreamingDataSource(info: Seq[BatchUIData], pageSize: Int, sortColumn: String,
|
||||
desc: Boolean) extends PagedDataSource[BatchUIData](pageSize) {
|
||||
|
||||
private val data = info.sorted(ordering(sortColumn, desc))
|
||||
|
||||
override protected def dataSize: Int = data.size
|
||||
|
||||
override protected def sliceData(from: Int, to: Int): Seq[BatchUIData] = data.slice(from, to)
|
||||
|
||||
private def ordering(column: String, desc: Boolean): Ordering[BatchUIData] = {
|
||||
val ordering: Ordering[BatchUIData] = column match {
|
||||
case "Batch Time" => Ordering.by(_.batchTime.milliseconds)
|
||||
case "Records" => Ordering.by(_.numRecords)
|
||||
case "Scheduling Delay" => Ordering.by(_.schedulingDelay.getOrElse(Long.MaxValue))
|
||||
case "Processing Time" => Ordering.by(_.processingDelay.getOrElse(Long.MaxValue))
|
||||
case "Total Delay" => Ordering.by(_.totalDelay.getOrElse(Long.MaxValue))
|
||||
case unknownColumn => throw new IllegalArgumentException(s"Unknown Column: $unknownColumn")
|
||||
}
|
||||
if (desc) {
|
||||
ordering.reverse
|
||||
} else {
|
||||
ordering
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -20,12 +20,10 @@ package org.apache.spark.streaming.ui
|
|||
import java.util.concurrent.TimeUnit
|
||||
import javax.servlet.http.HttpServletRequest
|
||||
|
||||
import scala.collection.mutable
|
||||
import scala.xml.{Node, Unparsed}
|
||||
|
||||
import org.apache.spark.internal.Logging
|
||||
import org.apache.spark.ui.{GraphUIData, JsCollector, UIUtils => SparkUIUtils, WebUIPage}
|
||||
import org.apache.spark.util.Utils
|
||||
|
||||
/**
|
||||
* A helper class for "scheduling delay", "processing time" and "total delay" to generate data that
|
||||
|
@ -88,7 +86,7 @@ private[ui] class StreamingPage(parent: StreamingTab)
|
|||
onClickTimelineFunc ++ basicInfo ++
|
||||
listener.synchronized {
|
||||
generateStatTable() ++
|
||||
generateBatchListTables(request)
|
||||
generateBatchListTables()
|
||||
}
|
||||
SparkUIUtils.headerSparkPage(request, "Streaming Statistics", content, parent)
|
||||
}
|
||||
|
@ -434,97 +432,50 @@ private[ui] class StreamingPage(parent: StreamingTab)
|
|||
</tr>
|
||||
}
|
||||
|
||||
private def streamingTable(request: HttpServletRequest, batches: Seq[BatchUIData],
|
||||
tableTag: String): Seq[Node] = {
|
||||
val interval: Long = listener.batchDuration
|
||||
val streamingPage = Option(request.getParameter(s"$tableTag.page")).map(_.toInt).getOrElse(1)
|
||||
|
||||
try {
|
||||
new StreamingPagedTable(
|
||||
request,
|
||||
tableTag,
|
||||
batches,
|
||||
SparkUIUtils.prependBaseUri(request, parent.basePath),
|
||||
"streaming",
|
||||
interval
|
||||
).table(streamingPage)
|
||||
} catch {
|
||||
case e @ (_: IllegalArgumentException | _: IndexOutOfBoundsException) =>
|
||||
<div class="alert alert-error">
|
||||
<p>Error while rendering streaming table:</p>
|
||||
<pre>
|
||||
{Utils.exceptionString(e)}
|
||||
</pre>
|
||||
</div>
|
||||
}
|
||||
}
|
||||
|
||||
private def generateBatchListTables(request: HttpServletRequest): Seq[Node] = {
|
||||
private def generateBatchListTables(): Seq[Node] = {
|
||||
val runningBatches = listener.runningBatches.sortBy(_.batchTime.milliseconds).reverse
|
||||
val waitingBatches = listener.waitingBatches.sortBy(_.batchTime.milliseconds).reverse
|
||||
val completedBatches = listener.retainedCompletedBatches.
|
||||
sortBy(_.batchTime.milliseconds).reverse
|
||||
|
||||
val content = mutable.ListBuffer[Node]()
|
||||
|
||||
if (runningBatches.nonEmpty) {
|
||||
content ++=
|
||||
<div class="row">
|
||||
<div class="col-12">
|
||||
<span id="runningBatches" class="collapse-aggregated-runningBatches collapse-table"
|
||||
onClick="collapseTable('collapse-aggregated-runningBatches',
|
||||
'aggregated-runningBatches')">
|
||||
<h4>
|
||||
<span class="collapse-table-arrow arrow-open"></span>
|
||||
<a>Running Batches ({runningBatches.size})</a>
|
||||
</h4>
|
||||
</span>
|
||||
<div class="aggregated-runningBatches collapsible-table">
|
||||
{ streamingTable(request, runningBatches, "runningBatches") }
|
||||
</div>
|
||||
val activeBatchesContent = {
|
||||
<div class="row">
|
||||
<div class="col-12">
|
||||
<span id="activeBatches" class="collapse-aggregated-activeBatches collapse-table"
|
||||
onClick="collapseTable('collapse-aggregated-activeBatches',
|
||||
'aggregated-activeBatches')">
|
||||
<h4>
|
||||
<span class="collapse-table-arrow arrow-open"></span>
|
||||
<a>Active Batches ({runningBatches.size + waitingBatches.size})</a>
|
||||
</h4>
|
||||
</span>
|
||||
<div class="aggregated-activeBatches collapsible-table">
|
||||
{new ActiveBatchTable(runningBatches, waitingBatches, listener.batchDuration).toNodeSeq}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
}
|
||||
|
||||
if (waitingBatches.nonEmpty) {
|
||||
content ++=
|
||||
<div class="row">
|
||||
<div class="col-12">
|
||||
<span id="waitingBatches" class="collapse-aggregated-waitingBatches collapse-table"
|
||||
onClick="collapseTable('collapse-aggregated-waitingBatches',
|
||||
'aggregated-waitingBatches')">
|
||||
<h4>
|
||||
<span class="collapse-table-arrow arrow-open"></span>
|
||||
<a>Waiting Batches ({waitingBatches.size})</a>
|
||||
</h4>
|
||||
</span>
|
||||
<div class="aggregated-waitingBatches collapsible-table">
|
||||
{ streamingTable(request, waitingBatches, "waitingBatches") }
|
||||
</div>
|
||||
val completedBatchesContent = {
|
||||
<div class="row">
|
||||
<div class="col-12">
|
||||
<span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
|
||||
onClick="collapseTable('collapse-aggregated-completedBatches',
|
||||
'aggregated-completedBatches')">
|
||||
<h4>
|
||||
<span class="collapse-table-arrow arrow-open"></span>
|
||||
<a>Completed Batches (last {completedBatches.size}
|
||||
out of {listener.numTotalCompletedBatches})</a>
|
||||
</h4>
|
||||
</span>
|
||||
<div class="aggregated-completedBatches collapsible-table">
|
||||
{new CompletedBatchTable(completedBatches, listener.batchDuration).toNodeSeq}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
}
|
||||
|
||||
if (completedBatches.nonEmpty) {
|
||||
content ++=
|
||||
<div class="row">
|
||||
<div class="col-12">
|
||||
<span id="completedBatches" class="collapse-aggregated-completedBatches collapse-table"
|
||||
onClick="collapseTable('collapse-aggregated-completedBatches',
|
||||
'aggregated-completedBatches')">
|
||||
<h4>
|
||||
<span class="collapse-table-arrow arrow-open"></span>
|
||||
<a>Completed Batches (last {completedBatches.size}
|
||||
out of {listener.numTotalCompletedBatches})</a>
|
||||
</h4>
|
||||
</span>
|
||||
<div class="aggregated-completedBatches collapsible-table">
|
||||
{ streamingTable(request, completedBatches, "completedBatches") }
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
}
|
||||
content
|
||||
activeBatchesContent ++ completedBatchesContent
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ class UISeleniumSuite
|
|||
.setMaster("local")
|
||||
.setAppName("test")
|
||||
.set(UI_ENABLED, true)
|
||||
val ssc = new StreamingContext(conf, Milliseconds(100))
|
||||
val ssc = new StreamingContext(conf, Seconds(1))
|
||||
assert(ssc.sc.ui.isDefined, "Spark UI is not started!")
|
||||
ssc
|
||||
}
|
||||
|
@ -104,7 +104,7 @@ class UISeleniumSuite
|
|||
find(cssSelector( """ul li a[href*="streaming"]""")) should not be (None)
|
||||
}
|
||||
|
||||
eventually(timeout(10.seconds), interval(500.milliseconds)) {
|
||||
eventually(timeout(10.seconds), interval(50.milliseconds)) {
|
||||
// check whether streaming page exists
|
||||
go to (sparkUI.webUrl.stripSuffix("/") + "/streaming")
|
||||
val h3Text = findAll(cssSelector("h3")).map(_.text).toSeq
|
||||
|
@ -125,47 +125,24 @@ class UISeleniumSuite
|
|||
|
||||
// Check batch tables
|
||||
val h4Text = findAll(cssSelector("h4")).map(_.text).toSeq
|
||||
h4Text.exists(_.matches("Running Batches \\(\\d+\\)")) should be (true)
|
||||
h4Text.exists(_.matches("Waiting Batches \\(\\d+\\)")) should be (true)
|
||||
h4Text.exists(_.matches("Active Batches \\(\\d+\\)")) should be (true)
|
||||
h4Text.exists(_.matches("Completed Batches \\(last \\d+ out of \\d+\\)")) should be (true)
|
||||
|
||||
val arrow = 0x25BE.toChar
|
||||
findAll(cssSelector("""#runningBatches-table th""")).map(_.text).toList should be {
|
||||
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
|
||||
findAll(cssSelector("""#active-batches-table th""")).map(_.text).toSeq should be {
|
||||
List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
|
||||
"Output Ops: Succeeded/Total", "Status")
|
||||
}
|
||||
findAll(cssSelector("""#waitingBatches-table th""")).map(_.text).toList should be {
|
||||
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
|
||||
"Output Ops: Succeeded/Total", "Status")
|
||||
}
|
||||
findAll(cssSelector("""#completedBatches-table th""")).map(_.text).toList should be {
|
||||
List(s"Batch Time $arrow", "Records", "Scheduling Delay", "Processing Time",
|
||||
"Total Delay", "Output Ops: Succeeded/Total")
|
||||
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
|
||||
List("Batch Time", "Records", "Scheduling Delay (?)", "Processing Time (?)",
|
||||
"Total Delay (?)", "Output Ops: Succeeded/Total")
|
||||
}
|
||||
|
||||
val pageSize = 3
|
||||
val pagedTablePath = "/streaming/?completedBatches.sort=Batch+Time" +
|
||||
"&completedBatches.desc=true&completedBatches.page=1" +
|
||||
s"&completedBatches.pageSize=$pageSize#completedBatches"
|
||||
|
||||
go to (sparkUI.webUrl.stripSuffix("/") + pagedTablePath)
|
||||
val completedTableRows = findAll(cssSelector("""#completedBatches-table tr"""))
|
||||
.map(_.text).toList
|
||||
// header row + pagesize
|
||||
completedTableRows.length should be (1 + pageSize)
|
||||
|
||||
val sortedBatchTimePath = "/streaming/?&completedBatches.sort=Batch+Time" +
|
||||
"&completedBatches.desc=false&completedBatches.pageSize=3#completedBatches"
|
||||
|
||||
// sort batches in ascending order of batch time
|
||||
go to (sparkUI.webUrl.stripSuffix("/") + sortedBatchTimePath)
|
||||
|
||||
val batchLinks = findAll(cssSelector("""#completedBatches-table td a"""))
|
||||
.flatMap(_.attribute("href")).toSeq
|
||||
val batchLinks =
|
||||
findAll(cssSelector("""#completed-batches-table a""")).flatMap(_.attribute("href")).toSeq
|
||||
batchLinks.size should be >= 1
|
||||
|
||||
// Check a normal batch page
|
||||
go to (batchLinks.head) // Head is the first batch, so it will have some jobs
|
||||
go to (batchLinks.last) // Last should be the first batch, so it will have some jobs
|
||||
val summaryText = findAll(cssSelector("li strong")).map(_.text).toSeq
|
||||
summaryText should contain ("Batch Duration:")
|
||||
summaryText should contain ("Input data size:")
|
||||
|
|
Loading…
Reference in a new issue