[SPARK-10692] [STREAMING] Expose failureReasons in BatchInfo for streaming UI to clear failed batches

A slightly modified version of #8818; all credit goes to zsxwing.

Author: zsxwing <zsxwing@gmail.com>
Author: Tathagata Das <tathagata.das1565@gmail.com>

Closes #8892 from tdas/SPARK-10692.
commit 758c9d25e9 (parent 83f6f54d12), authored by zsxwing on 2015-09-23 19:52:02 -07:00, committed by Reynold Xin. 4 changed files with 115 additions and 16 deletions.
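The change threads per-output-operation failure messages from JobSet into BatchInfo, so the streaming UI can both clear a failed batch and display why it failed. As a sketch of how the new field is consumed, here is an illustrative listener; note that failureReasons is private[streaming] in this commit, so such a listener has to live inside the org.apache.spark.streaming package (the class name is made up):

package org.apache.spark.streaming

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Illustrative listener: logs every failed output operation of a completed
// batch, using the failureReasons field added by this commit.
class FailedOutputOpLogger extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    info.failureReasons.foreach { case (outputOpId, reason) =>
      println(s"Batch ${info.batchTime}: output op $outputOpId failed: $reason")
    }
  }
}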

streaming/src/main/scala/org/apache/spark/streaming/scheduler/BatchInfo.scala

@@ -39,6 +39,8 @@ case class BatchInfo(
     processingEndTime: Option[Long]
   ) {
 
+  private var _failureReasons: Map[Int, String] = Map.empty
+
   @deprecated("Use streamIdToInputInfo instead", "1.5.0")
   def streamIdToNumRecords: Map[Int, Long] = streamIdToInputInfo.mapValues(_.numRecords)
@@ -67,4 +69,12 @@ case class BatchInfo(
    * The number of records received by the receivers in this batch.
    */
   def numRecords: Long = streamIdToInputInfo.values.map(_.numRecords).sum
+
+  /** Set the failure reasons corresponding to every output op in the batch */
+  private[streaming] def setFailureReason(reasons: Map[Int, String]): Unit = {
+    _failureReasons = reasons
+  }
+
+  /** Failure reasons corresponding to every output op in the batch */
+  private[streaming] def failureReasons = _failureReasons
 }
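Carrying the reasons in a private var with a package-private setter, rather than in a new constructor parameter, keeps the public BatchInfo case-class signature unchanged (presumably for API compatibility). A minimal sketch of the setter/getter pair in use, with made-up values; this only compiles inside private[streaming] scope:

import org.apache.spark.streaming.Time
import org.apache.spark.streaming.scheduler.BatchInfo

// Hypothetical values: an empty input map and millisecond timestamps.
val info = BatchInfo(Time(1000), Map.empty, 1000L, Some(1000L), Some(1005L))
info.setFailureReason(Map(0 -> "java.lang.RuntimeException: boom"))
assert(info.failureReasons == Map(0 -> "java.lang.RuntimeException: boom"))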

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

@@ -166,22 +166,22 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   }
 
   private def handleJobCompletion(job: Job) {
+    val jobSet = jobSets.get(job.time)
+    jobSet.handleJobCompletion(job)
+    logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
+    if (jobSet.hasCompleted) {
+      jobSets.remove(jobSet.time)
+      jobGenerator.onBatchCompletion(jobSet.time)
+      logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
+        jobSet.totalDelay / 1000.0, jobSet.time.toString,
+        jobSet.processingDelay / 1000.0
+      ))
+      listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
+    }
     job.result match {
-      case Success(_) =>
-        val jobSet = jobSets.get(job.time)
-        jobSet.handleJobCompletion(job)
-        logInfo("Finished job " + job.id + " from job set of time " + jobSet.time)
-        if (jobSet.hasCompleted) {
-          jobSets.remove(jobSet.time)
-          jobGenerator.onBatchCompletion(jobSet.time)
-          logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format(
-            jobSet.totalDelay / 1000.0, jobSet.time.toString,
-            jobSet.processingDelay / 1000.0
-          ))
-          listenerBus.post(StreamingListenerBatchCompleted(jobSet.toBatchInfo))
-        }
       case Failure(e) =>
         reportError("Error running job " + job, e)
       case _ =>
     }
   }
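The net effect of this change: the job-set bookkeeping, including the StreamingListenerBatchCompleted event, now runs for every finished job, and only error reporting remains conditional on the outcome. A standalone illustration of that dispatch, with a plain scala.util.Try standing in for job.result:

import scala.util.{Failure, Success, Try}

def handleCompletion(result: Try[Unit]): Unit = {
  // Runs unconditionally: this is what lets a failed batch reach the
  // "completed" state in the UI instead of lingering as "running".
  println("job set bookkeeping: mark job done, maybe post BatchCompleted")
  result match {
    case Failure(e) => println(s"report error: ${e.getMessage}")
    case _ => // success: nothing extra to do
  }
}

handleCompletion(Success(()))
handleCompletion(Failure(new RuntimeException("boom")))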

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobSet.scala

@@ -18,8 +18,10 @@
 package org.apache.spark.streaming.scheduler
 
 import scala.collection.mutable.HashSet
+import scala.util.Failure
 
 import org.apache.spark.streaming.Time
+import org.apache.spark.util.Utils
 
 /** Class representing a set of Jobs
   * that belong to the same batch.
@@ -62,12 +64,23 @@ case class JobSet(
   }
 
   def toBatchInfo: BatchInfo = {
-    new BatchInfo(
+    val failureReasons: Map[Int, String] = {
+      if (hasCompleted) {
+        jobs.filter(_.result.isFailure).map { job =>
+          (job.outputOpId, Utils.exceptionString(job.result.asInstanceOf[Failure[_]].exception))
+        }.toMap
+      } else {
+        Map.empty
+      }
+    }
+    val binfo = new BatchInfo(
       time,
       streamIdToInputInfo,
       submissionTime,
-      if (processingStartTime >= 0 ) Some(processingStartTime) else None,
-      if (processingEndTime >= 0 ) Some(processingEndTime) else None
+      if (processingStartTime >= 0) Some(processingStartTime) else None,
+      if (processingEndTime >= 0) Some(processingEndTime) else None
     )
+    binfo.setFailureReason(failureReasons)
+    binfo
   }
 }
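The failureReasons map keys each failure's stack-trace string by the failed job's outputOpId, and is only built once the whole job set has completed. A standalone sketch of the same idiom, with a stub job and a hypothetical stackTraceOf standing in for Spark's Job and Utils.exceptionString:

import scala.util.{Failure, Success, Try}

case class StubJob(outputOpId: Int, result: Try[Unit])

// Stand-in for Utils.exceptionString, which renders the full stack trace.
def stackTraceOf(e: Throwable): String = e.toString

val jobs = Seq(
  StubJob(0, Success(())),
  StubJob(1, Failure(new RuntimeException("This is a failed job")))
)

val failureReasons: Map[Int, String] = jobs
  .filter(_.result.isFailure)
  .map(job => (job.outputOpId, stackTraceOf(job.result.asInstanceOf[Failure[_]].exception)))
  .toMap
// Map(1 -> "java.lang.RuntimeException: This is a failed job")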

streaming/src/test/scala/org/apache/spark/streaming/StreamingListenerSuite.scala

@@ -140,6 +140,69 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
     }
   }
 
+  test("onBatchCompleted with successful batch") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD(_.count)
+
+    val failureReasons = startStreamingContextAndCollectFailureReasons(ssc)
+    assert(failureReasons != null && failureReasons.isEmpty,
+      "A successful batch should not set failureReasons")
+  }
+
+  test("onBatchCompleted with failed batch and one failed job") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is a failed job")
+    }
+
+    // Check if failureReasons contains the correct error message
+    val failureReasons = startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
+    assert(failureReasons != null)
+    assert(failureReasons.size === 1)
+    assert(failureReasons.contains(0))
+    assert(failureReasons(0).contains("This is a failed job"))
+  }
+
+  test("onBatchCompleted with failed batch and multiple failed jobs") {
+    ssc = new StreamingContext("local[2]", "test", Milliseconds(1000))
+    val inputStream = ssc.receiverStream(new StreamingListenerSuiteReceiver)
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is a failed job")
+    }
+    inputStream.foreachRDD { _ =>
+      throw new RuntimeException("This is another failed job")
+    }
+
+    // Check if failureReasons contains the correct error messages
+    val failureReasons =
+      startStreamingContextAndCollectFailureReasons(ssc, isFailed = true)
+    assert(failureReasons != null)
+    assert(failureReasons.size === 2)
+    assert(failureReasons.contains(0))
+    assert(failureReasons.contains(1))
+    assert(failureReasons(0).contains("This is a failed job"))
+    assert(failureReasons(1).contains("This is another failed job"))
+  }
+
+  private def startStreamingContextAndCollectFailureReasons(
+      _ssc: StreamingContext, isFailed: Boolean = false): Map[Int, String] = {
+    val failureReasonsCollector = new FailureReasonsCollector()
+    _ssc.addStreamingListener(failureReasonsCollector)
+    val batchCounter = new BatchCounter(_ssc)
+    _ssc.start()
+    // Make sure at least one batch has completed
+    batchCounter.waitUntilBatchesCompleted(expectedNumCompletedBatches = 1, timeout = 10000)
+    if (isFailed) {
+      intercept[RuntimeException] {
+        _ssc.awaitTerminationOrTimeout(10000)
+      }
+    }
+    _ssc.stop()
+    failureReasonsCollector.failureReasons
+  }
+
   /** Check if a sequence of numbers is in increasing order */
   def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
     for (i <- 1 until seq.size) {
@@ -205,3 +268,16 @@ class StreamingListenerSuiteReceiver extends Receiver[Any](StorageLevel.MEMORY_O
   }
   def onStop() { }
 }
+
+/**
+ * A StreamingListener that saves the latest `failureReasons` in `BatchInfo` to the
+ * `failureReasons` field.
+ */
+class FailureReasonsCollector extends StreamingListener {
+
+  @volatile var failureReasons: Map[Int, String] = null
+
+  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
+    failureReasons = batchCompleted.batchInfo.failureReasons
+  }
+}
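One detail worth noting in FailureReasonsCollector: onBatchCompleted is invoked on the listener-bus thread, while the test reads failureReasons from its own thread, so the field is @volatile to make the write visible across threads. A minimal standalone sketch of that publication pattern (all names here are illustrative):

class Collector {
  @volatile var latest: Map[Int, String] = null
}

val c = new Collector
new Thread(new Runnable {
  def run(): Unit = { c.latest = Map(0 -> "This is a failed job") }
}).start()

// Poll until the writer thread publishes; @volatile guarantees the reader
// eventually observes the write.
while (c.latest == null) Thread.sleep(1)
assert(c.latest(0).contains("failed job"))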