[SPARK-10885] [STREAMING] Display the failed output op in Streaming UI

This PR implements the following features for both `master` and `branch-1.5`.
1. Display the failed output op count in the batch list
2. Display the failure reason of output op in the batch detail page

Screenshots:
<img width="1356" alt="1" src="https://cloud.githubusercontent.com/assets/1000778/10198387/5b2b97ec-67ce-11e5-81c2-f818b9d2f3ad.png">
<img width="1356" alt="2" src="https://cloud.githubusercontent.com/assets/1000778/10198388/5b76ac14-67ce-11e5-8c8b-de2683c5b485.png">

There are still two remaining problems in the UI.
1. If an output operation doesn't run any Spark job, we cannot get its duration, since it is currently computed as the sum of all jobs' durations.
2. If an output operation doesn't run any Spark job, we cannot get its description, since it is currently taken from the latest job's call site.

We need to add a new `StreamingListenerEvent` for output operations to fix these issues, so I'd like to fix them only in `master` in another PR.

Author: zsxwing <zsxwing@gmail.com>

Closes #8950 from zsxwing/batch-failure.
This commit is contained in:
zsxwing 2015-10-06 16:51:03 -07:00 committed by Tathagata Das
parent 5e035403d4
commit ffe6831e49
6 changed files with 143 additions and 27 deletions

View file

@ -41,6 +41,8 @@ case class BatchInfo(
private var _failureReasons: Map[Int, String] = Map.empty
private var _numOutputOp: Int = 0
@deprecated("Use streamIdToInputInfo instead", "1.5.0")
def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@ -77,4 +79,12 @@ case class BatchInfo(
/** Failure reasons corresponding to every output ops in the batch */
private[streaming] def failureReasons = _failureReasons
/** Set the number of output operations in this batch */
private[streaming] def setNumOutputOp(numOutputOp: Int): Unit = {
_numOutputOp = numOutputOp
}
/** Return the number of output operations in this batch */
private[streaming] def numOutputOp: Int = _numOutputOp
}

View file

@ -81,6 +81,7 @@ case class JobSet(
if (processingEndTime >= 0) Some(processingEndTime) else None
)
binfo.setFailureReason(failureReasons)
binfo.setNumOutputOp(jobs.size)
binfo
}
}

View file

@ -107,9 +107,10 @@ private[ui] class ActiveBatchTable(
private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval: Long)
extends BatchTableBase("completed-batches-table", batchInterval) {
override protected def columns: Seq[Node] = super.columns ++
<th>Total Delay
{SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
override protected def columns: Seq[Node] = super.columns ++ {
<th>Total Delay {SparkUIUtils.tooltip("Total time taken to handle a batch", "top")}</th>
<th>Output Ops: Succeeded/Total</th>
}
override protected def renderRows: Seq[Node] = {
batches.flatMap(batch => <tr>{completedBatchRow(batch)}</tr>)
@ -118,9 +119,17 @@ private[ui] class CompletedBatchTable(batches: Seq[BatchUIData], batchInterval:
private def completedBatchRow(batch: BatchUIData): Seq[Node] = {
val totalDelay = batch.totalDelay
val formattedTotalDelay = totalDelay.map(SparkUIUtils.formatDuration).getOrElse("-")
val numFailedOutputOp = batch.failureReason.size
val outputOpColumn = if (numFailedOutputOp > 0) {
s"${batch.numOutputOp - numFailedOutputOp}/${batch.numOutputOp}" +
s" (${numFailedOutputOp} failed)"
} else {
s"${batch.numOutputOp}/${batch.numOutputOp}"
}
baseRow(batch) ++
<td sorttable_customkey={totalDelay.getOrElse(Long.MaxValue).toString}>
{formattedTotalDelay}
</td>
<td>{outputOpColumn}</td>
}
}

View file

@ -38,6 +38,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<th>Output Op Id</th>
<th>Description</th>
<th>Duration</th>
<th>Status</th>
<th>Job Id</th>
<th>Duration</th>
<th class="sorttable_nosort">Stages: Succeeded/Total</th>
@ -49,18 +50,42 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: SparkJobIdWithUIData): Seq[Node] = {
if (sparkJob.jobUIData.isDefined) {
generateNormalJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.jobUIData.get)
} else {
generateDroppedJobRow(outputOpId, outputOpDescription, formattedOutputOpDuration,
numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
outputOpStatus, numSparkJobRowsInOutputOp, isFirstRow, sparkJob.sparkJobId)
}
}
/**
 * Generate a single table row for an output operation that did not launch any Spark jobs.
 * The output-op columns (id, description, duration, status) are rendered normally, while all
 * job-related columns (Job Id, Duration, Stages, Tasks, Error) are filled with "-" placeholders
 * so the row stays aligned with rows generated for output ops that do have Spark jobs.
 */
private def generateOutputOpRowWithoutSparkJobs(
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String): Seq[Node] = {
<tr>
<td class="output-op-id-cell" >{outputOpId.toString}</td>
<td>{outputOpDescription}</td>
<td>{formattedOutputOpDuration}</td>
{/* rowspan = 1 because this output op contributes exactly one row */}
{outputOpStatusCell(outputOpStatus, rowspan = 1)}
<!-- Job Id -->
<td>-</td>
<!-- Duration -->
<td>-</td>
<!-- Stages: Succeeded/Total -->
<td>-</td>
<!-- Tasks (for all stages): Succeeded/Total -->
<td>-</td>
<!-- Error -->
<td>-</td>
</tr>
}
/**
* Generate a row for a Spark Job. Because duplicated output op infos needs to be collapsed into
* one cell, we use "rowspan" for the first row of a output op.
@ -69,6 +94,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
sparkJob: JobUIData): Seq[Node] = {
@ -94,7 +120,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<td rowspan={numSparkJobRowsInOutputOp.toString}>
{outputOpDescription}
</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
{outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@ -125,7 +152,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
total = sparkJob.numTasks - sparkJob.numSkippedTasks)
}
</td>
{failureReasonCell(lastFailureReason)}
{failureReasonCell(lastFailureReason, rowspan = 1)}
</tr>
}
@ -137,6 +164,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
outputOpId: OutputOpId,
outputOpDescription: Seq[Node],
formattedOutputOpDuration: String,
outputOpStatus: String,
numSparkJobRowsInOutputOp: Int,
isFirstRow: Boolean,
jobId: Int): Seq[Node] = {
@ -147,7 +175,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
if (isFirstRow) {
<td class="output-op-id-cell" rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpId.toString}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{outputOpDescription}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td>
<td rowspan={numSparkJobRowsInOutputOp.toString}>{formattedOutputOpDuration}</td> ++
{outputOpStatusCell(outputOpStatus, numSparkJobRowsInOutputOp)}
} else {
Nil
}
@ -156,7 +185,7 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tr>
{prefixCells}
<td sorttable_customkey={jobId.toString}>
{jobId.toString}
{if (jobId >= 0) jobId.toString else "-"}
</td>
<!-- Duration -->
<td>-</td>
@ -170,7 +199,9 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
private def generateOutputOpIdRow(
outputOpId: OutputOpId, sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
outputOpId: OutputOpId,
outputOpStatus: String,
sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
// We don't count the durations of dropped jobs
val sparkJobDurations = sparkJobs.filter(_.jobUIData.nonEmpty).map(_.jobUIData.get).
map(sparkJob => {
@ -189,12 +220,32 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
val description = generateOutputOpDescription(sparkJobs)
generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, true, sparkJobs.head) ++
sparkJobs.tail.map { sparkJob =>
if (sparkJobs.isEmpty) {
generateOutputOpRowWithoutSparkJobs(
outputOpId, description, formattedOutputOpDuration, outputOpStatus)
} else {
val firstRow =
generateJobRow(
outputOpId, description, formattedOutputOpDuration, sparkJobs.size, false, sparkJob)
}.flatMap(x => x)
outputOpId,
description,
formattedOutputOpDuration,
outputOpStatus,
sparkJobs.size,
true,
sparkJobs.head)
val tailRows =
sparkJobs.tail.map { sparkJob =>
generateJobRow(
outputOpId,
description,
formattedOutputOpDuration,
outputOpStatus,
sparkJobs.size,
false,
sparkJob)
}
(firstRow ++ tailRows).flatten
}
}
private def generateOutputOpDescription(sparkJobs: Seq[SparkJobIdWithUIData]): Seq[Node] = {
@ -228,7 +279,10 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
}
}
private def failureReasonCell(failureReason: String): Seq[Node] = {
private def failureReasonCell(
failureReason: String,
rowspan: Int,
includeFirstLineInExpandDetails: Boolean = true): Seq[Node] = {
val isMultiline = failureReason.indexOf('\n') >= 0
// Display the first line by default
val failureReasonSummary = StringEscapeUtils.escapeHtml4(
@ -237,6 +291,13 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
} else {
failureReason
})
val failureDetails =
if (isMultiline && !includeFirstLineInExpandDetails) {
// Skip the first line
failureReason.substring(failureReason.indexOf('\n') + 1)
} else {
failureReason
}
val details = if (isMultiline) {
// scalastyle:off
<span onclick="this.parentNode.querySelector('.stacktrace-details').classList.toggle('collapsed')"
@ -244,13 +305,20 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
+details
</span> ++
<div class="stacktrace-details collapsed">
<pre>{failureReason}</pre>
<pre>{failureDetails}</pre>
</div>
// scalastyle:on
} else {
""
}
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
if (rowspan == 1) {
<td valign="middle" style="max-width: 300px">{failureReasonSummary}{details}</td>
} else {
<td valign="middle" style="max-width: 300px" rowspan={rowspan.toString}>
{failureReasonSummary}{details}
</td>
}
}
private def getJobData(sparkJobId: SparkJobId): Option[JobUIData] = {
@ -265,16 +333,31 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
* Generate the job table for the batch.
*/
private def generateJobTable(batchUIData: BatchUIData): Seq[Node] = {
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).toSeq.
sortBy(_._1). // sorted by OutputOpId
val outputOpIdToSparkJobIds = batchUIData.outputOpIdSparkJobIdPairs.groupBy(_.outputOpId).
map { case (outputOpId, outputOpIdAndSparkJobIds) =>
// sort SparkJobIds for each OutputOpId
(outputOpId, outputOpIdAndSparkJobIds.map(_.sparkJobId).sorted)
}
val outputOps = (0 until batchUIData.numOutputOp).map { outputOpId =>
val status = batchUIData.failureReason.get(outputOpId).map { failure =>
if (failure.startsWith("org.apache.spark.SparkException")) {
"Failed due to Spark job error\n" + failure
} else {
var nextLineIndex = failure.indexOf("\n")
if (nextLineIndex < 0) {
nextLineIndex = failure.size
}
val firstLine = failure.substring(0, nextLineIndex)
s"Failed due to error: $firstLine\n$failure"
}
}.getOrElse("Succeeded")
val sparkJobIds = outputOpIdToSparkJobIds.getOrElse(outputOpId, Seq.empty)
(outputOpId, status, sparkJobIds)
}
sparkListener.synchronized {
val outputOpIdWithJobs: Seq[(OutputOpId, Seq[SparkJobIdWithUIData])] =
outputOpIdToSparkJobIds.map { case (outputOpId, sparkJobIds) =>
(outputOpId,
val outputOpIdWithJobs: Seq[(OutputOpId, String, Seq[SparkJobIdWithUIData])] =
outputOps.map { case (outputOpId, status, sparkJobIds) =>
(outputOpId, status,
sparkJobIds.map(sparkJobId => SparkJobIdWithUIData(sparkJobId, getJobData(sparkJobId))))
}
@ -285,7 +368,8 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
<tbody>
{
outputOpIdWithJobs.map {
case (outputOpId, sparkJobIds) => generateOutputOpIdRow(outputOpId, sparkJobIds)
case (outputOpId, status, sparkJobIds) =>
generateOutputOpIdRow(outputOpId, status, sparkJobIds)
}
}
</tbody>
@ -386,4 +470,12 @@ private[ui] class BatchPage(parent: StreamingTab) extends WebUIPage("batch") {
Unparsed(StringEscapeUtils.escapeHtml4(metadataDescription).
replaceAllLiterally("\t", "&nbsp;&nbsp;&nbsp;&nbsp;").replaceAllLiterally("\n", "<br/>"))
}
/**
 * Render the "Status" cell for an output operation. A successful op gets a plain
 * "Succeeded" cell; a failed op gets an expandable failure-reason cell spanning
 * `rowspan` rows (one per Spark job of the output op).
 */
private def outputOpStatusCell(status: String, rowspan: Int): Seq[Node] = {
if (status == "Succeeded") {
<td rowspan={rowspan.toString}>Succeeded</td>
} else {
// The first line of the failure reason is already shown as the cell summary,
// so skip it in the expandable details to avoid displaying it twice.
failureReasonCell(status, rowspan, includeFirstLineInExpandDetails = false)
}
}
}

View file

@ -30,6 +30,8 @@ private[ui] case class BatchUIData(
val submissionTime: Long,
val processingStartTime: Option[Long],
val processingEndTime: Option[Long],
val numOutputOp: Int,
val failureReason: Map[Int, String],
var outputOpIdSparkJobIdPairs: Seq[OutputOpIdAndSparkJobId] = Seq.empty) {
/**
@ -69,7 +71,9 @@ private[ui] object BatchUIData {
batchInfo.streamIdToInputInfo,
batchInfo.submissionTime,
batchInfo.processingStartTime,
batchInfo.processingEndTime
batchInfo.processingEndTime,
batchInfo.numOutputOp,
batchInfo.failureReasons
)
}
}

View file

@ -121,7 +121,7 @@ class UISeleniumSuite
}
findAll(cssSelector("""#completed-batches-table th""")).map(_.text).toSeq should be {
List("Batch Time", "Input Size", "Scheduling Delay (?)", "Processing Time (?)",
"Total Delay (?)")
"Total Delay (?)", "Output Ops: Succeeded/Total")
}
val batchLinks =
@ -138,7 +138,7 @@ class UISeleniumSuite
summaryText should contain ("Total delay:")
findAll(cssSelector("""#batch-job-table th""")).map(_.text).toSeq should be {
List("Output Op Id", "Description", "Duration", "Job Id", "Duration",
List("Output Op Id", "Description", "Duration", "Status", "Job Id", "Duration",
"Stages: Succeeded/Total", "Tasks (for all stages): Succeeded/Total", "Error")
}