diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala index 82d419453b..d9ddc41aa2 100644 --- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala @@ -618,8 +618,11 @@ class DAGScheduler( handleExecutorLost(bmAddress.executorId, Some(task.generation)) } + case ExceptionFailure(className, description, stackTrace) => + // Do nothing here, left up to the TaskScheduler to decide how to handle user failures + case other => - // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage + // Unrecognized failure - abort all jobs depending on this stage abortStage(idToStage(task.stageId), task + " failed: " + other) } } @@ -667,6 +670,7 @@ class DAGScheduler( */ private def abortStage(failedStage: Stage, reason: String) { val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq + failedStage.completionTime = Some(System.currentTimeMillis()) for (resultStage <- dependentStages) { val job = resultStageToJob(resultStage) val error = new SparkException("Job failed: " + reason) diff --git a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala index d72b0bfc9f..6965cde5da 100644 --- a/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/cluster/ClusterTaskSetManager.scala @@ -571,6 +571,7 @@ private[spark] class ClusterTaskSetManager( return case ef: ExceptionFailure => + sched.listener.taskEnded(tasks(index), ef, null, null, info, null) val key = ef.description val now = System.currentTimeMillis val (printFull, dupCount) = { diff --git a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala index 70b69bb26f..499116f653 100644 --- a/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala +++ b/core/src/main/scala/spark/scheduler/local/LocalTaskSetManager.scala @@ -152,6 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: Tas info.markFailed() decreaseRunningTasks(1) val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader) + sched.listener.taskEnded(task, reason, null, null, info, null) if (!finished(index)) { copiesRunning(index) -= 1 numFailures(index) += 1 diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index f584d1e187..aafa414055 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -12,12 +12,11 @@ import scala.Seq import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer} import spark.ui.JettyUtils._ -import spark.SparkContext +import spark.{ExceptionFailure, SparkContext, Success, Utils} import spark.scheduler._ import spark.scheduler.cluster.TaskInfo import spark.executor.TaskMetrics -import spark.Success -import spark.Utils +import collection.mutable /** Web UI showing progress status of all jobs in the given SparkContext. */ private[spark] class JobProgressUI(val sc: SparkContext) { @@ -51,7 +50,8 @@ private[spark] class JobProgressListener extends SparkListener { val stageToTasksComplete = HashMap[Int, Int]() val stageToTasksFailed = HashMap[Int, Int]() - val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]() + val stageToTaskInfos = + HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]() override def onJobStart(jobStart: SparkListenerJobStart) {} @@ -67,8 +67,6 @@ private[spark] class JobProgressListener extends SparkListener { if (stages.size > RETAINED_STAGES) { val toRemove = RETAINED_STAGES / 10 stages.takeRight(toRemove).foreach( s => { - stageToTasksComplete.remove(s.id) - stageToTasksFailed.remove(s.id) stageToTaskInfos.remove(s.id) }) stages.trimEnd(toRemove) @@ -80,14 +78,18 @@ private[spark] class JobProgressListener extends SparkListener { override def onTaskEnd(taskEnd: SparkListenerTaskEnd) { val sid = taskEnd.task.stageId - taskEnd.reason match { - case Success => - stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 - case _ => - stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 - } - val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]()) - taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics)) + val failureInfo: Option[ExceptionFailure] = + taskEnd.reason match { + case e: ExceptionFailure => + stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1 + Some(e) + case _ => + stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1 + None + } + val taskList = stageToTaskInfos.getOrElse( + sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]()) + taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics, failureInfo)) stageToTaskInfos(sid) = taskList } diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index c9294a7261..ed96fc2994 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -9,7 +9,7 @@ import scala.xml.Node import spark.ui.UIUtils._ import spark.ui.Page._ import spark.util.Distribution -import spark.Utils +import spark.{ExceptionFailure, Utils} import spark.scheduler.cluster.TaskInfo import spark.executor.TaskMetrics @@ -38,56 +38,71 @@ private[spark] class StagePage(parent: JobProgressUI) { val taskHeaders: Seq[String] = Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++ {if (shuffleRead) Seq("Shuffle Read") else Nil} ++ - {if (shuffleWrite) Seq("Shuffle Write") else Nil} + {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++ + Seq("Details") val taskTable = listingTable(taskHeaders, taskRow, tasks) - val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble} - val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( - ms => parent.formatDuration(ms.toLong)) + // Excludes tasks which failed and have incomplete metrics + val validTasks = tasks.filter(t => Option(t._2).isDefined) - def getQuantileCols(data: Seq[Double]) = - Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong)) + val summaryTable: Option[Seq[Node]] = + if (validTasks.size == 0) { + None + } + else { + val serviceTimes = validTasks.map{case (info, metrics, exception) => + metrics.executorRunTime.toDouble} + val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map( + ms => parent.formatDuration(ms.toLong)) - val shuffleReadSizes = tasks.map { - case(info, metrics) => - metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble - } - val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) + def getQuantileCols(data: Seq[Double]) = + Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong)) - val shuffleWriteSizes = tasks.map { - case(info, metrics) => - metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble - } - val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) + val shuffleReadSizes = validTasks.map { + case(info, metrics, exception) => + metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + } + val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) - val listings: Seq[Seq[String]] = Seq(serviceQuantiles, - if (shuffleRead) shuffleReadQuantiles else Nil, - if (shuffleWrite) shuffleWriteQuantiles else Nil) + val shuffleWriteSizes = validTasks.map { + case(info, metrics, exception) => + metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble + } + val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) - val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") - val quantileTable = listingTable(quantileHeaders, quantileRow, listings) + val listings: Seq[Seq[String]] = Seq(serviceQuantiles, + if (shuffleRead) shuffleReadQuantiles else Nil, + if (shuffleWrite) shuffleWriteQuantiles else Nil) + + val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max") + def quantileRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} + Some(listingTable(quantileHeaders, quantileRow, listings)) + } val content = -

Summary Metrics

++ quantileTable ++

Tasks

++ taskTable; +

Summary Metrics

++ summaryTable.getOrElse(Nil) ++

Tasks

++ taskTable; headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs) } - def quantileRow(data: Seq[String]): Seq[Node] = {data.map(d => {d})} - def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = { - val (info, metrics) = taskData + def taskRow(taskData: (TaskInfo, TaskMetrics, Option[ExceptionFailure])): Seq[Node] = { + def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] = + trace.map(e => {e.toString}) + val (info, metrics, exception) = taskData {info.taskId} - {parent.formatDuration(metrics.executorRunTime)} + {Option(metrics).map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")} {info.taskLocality} {info.hostPort} {dateFmt.format(new Date(info.launchTime))} - {metrics.shuffleReadMetrics.map{m => - {Utils.memoryBytesToString(m.remoteBytesRead)}}.getOrElse("") } - {metrics.shuffleWriteMetrics.map{m => - {Utils.memoryBytesToString(m.shuffleBytesWritten)}}.getOrElse("") } + {Option(metrics).flatMap{m => m.shuffleReadMetrics}.map{s => + {Utils.memoryBytesToString(s.remoteBytesRead)}}.getOrElse("")} + {Option(metrics).flatMap{m => m.shuffleWriteMetrics}.map{s => + {Utils.memoryBytesToString(s.shuffleBytesWritten)}}.getOrElse("")} + {exception.map(e => + {e.className}
{fmtStackTrace(e.stackTrace)}
).getOrElse("")} } }