Allowing details for failed stages

Author: Patrick Wendell
Date:   2013-06-29 11:26:30 -07:00
Commit: e721ff7e5a
Parent: 473961d82e

5 changed files with 69 additions and 46 deletions

DAGScheduler.scala

@@ -618,8 +618,11 @@ class DAGScheduler(
             handleExecutorLost(bmAddress.executorId, Some(task.generation))
           }
 
+      case ExceptionFailure(className, description, stackTrace) =>
+        // Do nothing here, left up to the TaskScheduler to decide how to handle user failures
+
       case other =>
-        // Non-fetch failure -- probably a bug in user code; abort all jobs depending on this stage
+        // Unrecognized failure - abort all jobs depending on this stage
         abortStage(idToStage(task.stageId), task + " failed: " + other)
     }
   }

@@ -667,6 +670,7 @@ class DAGScheduler(
    */
   private def abortStage(failedStage: Stage, reason: String) {
     val dependentStages = resultStageToJob.keys.filter(x => stageDependsOn(x, failedStage)).toSeq
+    failedStage.completionTime = Some(System.currentTimeMillis())
     for (resultStage <- dependentStages) {
       val job = resultStageToJob(resultStage)
       val error = new SparkException("Job failed: " + reason)
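
The `ExceptionFailure` destructured above behaves as a case class carrying the failing exception's class name, message, and stack trace. As a rough sketch (field names taken from the pattern match above; the real definition lives in Spark's `TaskEndReason` hierarchy):

    // Sketch only, not code from this commit: the shape implied by the
    // pattern match `case ExceptionFailure(className, description, stackTrace)`.
    sealed trait TaskEndReason
    case object Success extends TaskEndReason
    case class ExceptionFailure(
        className: String,                   // e.g. "java.lang.ArithmeticException"
        description: String,                 // the exception's message
        stackTrace: Seq[StackTraceElement])  // rendered by the web UI below
      extends TaskEndReason

The DAGScheduler deliberately does nothing with this case: user-code exceptions are now reported through the TaskScheduler's listener (the next two files) instead of aborting the stage outright.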

ClusterTaskSetManager.scala

@@ -571,6 +571,7 @@ private[spark] class ClusterTaskSetManager(
         return
 
       case ef: ExceptionFailure =>
+        sched.listener.taskEnded(tasks(index), ef, null, null, info, null)
         val key = ef.description
         val now = System.currentTimeMillis
         val (printFull, dupCount) = {

LocalTaskSetManager.scala

@@ -152,6 +152,7 @@ private[spark] class LocalTaskSetManager(sched: LocalScheduler, val taskSet: TaskSet)
       info.markFailed()
       decreaseRunningTasks(1)
       val reason: ExceptionFailure = ser.deserialize[ExceptionFailure](serializedData, getClass.getClassLoader)
+      sched.listener.taskEnded(task, reason, null, null, info, null)
       if (!finished(index)) {
         copiesRunning(index) -= 1
         numFailures(index) += 1
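
Both task-set managers now emit the failure through the same listener hook. From the two call sites, `taskEnded` takes the task, the end reason, and then four slots that a failed task mostly cannot fill, passed as `null`. A hedged sketch of a compatible interface (the trait and parameter names are guesses from the call sites, not taken from this commit):

    // Illustrative only: the arity and the null placeholders are visible in
    // the diffs above; the names here are assumptions.
    trait TaskSchedulerListener {
      def taskEnded(
          task: Task[_],                 // the task that finished (here: failed)
          reason: TaskEndReason,         // an ExceptionFailure at these call sites
          result: Any,                   // null -- a failed task produced no result
          accumUpdates: Map[Long, Any],  // null -- no accumulator updates
          info: TaskInfo,                // host, launch time, task id, ...
          metrics: TaskMetrics): Unit    // null -- metrics may be missing
    }

Passing `null` for the metrics is why the UI code below wraps `metrics` in `Option(...)` before touching it.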

JobProgressUI.scala

@@ -12,12 +12,11 @@ import scala.Seq
 import scala.collection.mutable.{HashSet, ListBuffer, HashMap, ArrayBuffer}
 
 import spark.ui.JettyUtils._
-import spark.SparkContext
+import spark.{ExceptionFailure, SparkContext, Success, Utils}
 import spark.scheduler._
 import spark.scheduler.cluster.TaskInfo
 import spark.executor.TaskMetrics
-import spark.Success
-import spark.Utils
+import collection.mutable
 
 /** Web UI showing progress status of all jobs in the given SparkContext. */
 private[spark] class JobProgressUI(val sc: SparkContext) {

@@ -51,7 +50,8 @@ private[spark] class JobProgressListener extends SparkListener {
   val stageToTasksComplete = HashMap[Int, Int]()
   val stageToTasksFailed = HashMap[Int, Int]()
-  val stageToTaskInfos = HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics)]]()
+  val stageToTaskInfos =
+    HashMap[Int, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]]()
 
   override def onJobStart(jobStart: SparkListenerJobStart) {}

@@ -67,8 +67,6 @@ private[spark] class JobProgressListener extends SparkListener {
     if (stages.size > RETAINED_STAGES) {
       val toRemove = RETAINED_STAGES / 10
       stages.takeRight(toRemove).foreach( s => {
-        stageToTasksComplete.remove(s.id)
-        stageToTasksFailed.remove(s.id)
         stageToTaskInfos.remove(s.id)
       })
       stages.trimEnd(toRemove)

@@ -80,14 +78,18 @@ private[spark] class JobProgressListener extends SparkListener {
   override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
     val sid = taskEnd.task.stageId
-    taskEnd.reason match {
-      case Success =>
-        stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
-      case _ =>
-        stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
-    }
-    val taskList = stageToTaskInfos.getOrElse(sid, ArrayBuffer[(TaskInfo, TaskMetrics)]())
-    taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics))
+    val failureInfo: Option[ExceptionFailure] =
+      taskEnd.reason match {
+        case e: ExceptionFailure =>
+          stageToTasksFailed(sid) = stageToTasksFailed.getOrElse(sid, 0) + 1
+          Some(e)
+        case _ =>
+          stageToTasksComplete(sid) = stageToTasksComplete.getOrElse(sid, 0) + 1
+          None
+      }
+    val taskList = stageToTaskInfos.getOrElse(
+      sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
+    taskList += ((taskEnd.taskInfo, taskEnd.taskMetrics, failureInfo))
     stageToTaskInfos(sid) = taskList
   }
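
With the failure stored alongside each task's info and metrics, consumers of `stageToTaskInfos` can split a stage's tasks without re-deriving the outcome from the raw `TaskEndReason`. For example, a hypothetical helper on this listener (not part of the commit) could summarize a stage like this:

    // Hypothetical helper, shown only to illustrate the new tuple shape of
    // stageToTaskInfos: stage id -> (TaskInfo, TaskMetrics, Option[ExceptionFailure]).
    def describeStage(sid: Int): String = {
      val tasks = stageToTaskInfos.getOrElse(
        sid, ArrayBuffer[(TaskInfo, TaskMetrics, Option[ExceptionFailure])]())
      val (failed, succeeded) =
        tasks.partition { case (_, _, exception) => exception.isDefined }
      val firstError = failed.headOption.flatMap(_._3).map(_.description).getOrElse("none")
      "stage " + sid + ": " + succeeded.size + " succeeded, " +
        failed.size + " failed (first error: " + firstError + ")"
    }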

StagePage.scala

@@ -9,7 +9,7 @@ import scala.xml.Node
 import spark.ui.UIUtils._
 import spark.ui.Page._
 import spark.util.Distribution
-import spark.Utils
+import spark.{ExceptionFailure, Utils}
 import spark.scheduler.cluster.TaskInfo
 import spark.executor.TaskMetrics
@@ -38,56 +38,71 @@ private[spark] class StagePage(parent: JobProgressUI) {
     val taskHeaders: Seq[String] =
       Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
       {if (shuffleRead) Seq("Shuffle Read") else Nil} ++
-      {if (shuffleWrite) Seq("Shuffle Write") else Nil}
+      {if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
+      Seq("Details")
 
     val taskTable = listingTable(taskHeaders, taskRow, tasks)
 
-    val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble}
-    val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
-      ms => parent.formatDuration(ms.toLong))
+    // Excludes tasks which failed and have incomplete metrics
+    val validTasks = tasks.filter(t => Option(t._2).isDefined)
 
-    def getQuantileCols(data: Seq[Double]) =
-      Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
+    val summaryTable: Option[Seq[Node]] =
+      if (validTasks.size == 0) {
+        None
+      }
+      else {
+        val serviceTimes = validTasks.map{case (info, metrics, exception) =>
+          metrics.executorRunTime.toDouble}
+        val serviceQuantiles = "Duration" +: Distribution(serviceTimes).get.getQuantiles().map(
+          ms => parent.formatDuration(ms.toLong))
 
-    val shuffleReadSizes = tasks.map {
-      case(info, metrics) =>
-        metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
-    }
-    val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
+        def getQuantileCols(data: Seq[Double]) =
+          Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
 
-    val shuffleWriteSizes = tasks.map {
-      case(info, metrics) =>
-        metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
-    }
-    val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
+        val shuffleReadSizes = validTasks.map {
+          case(info, metrics, exception) =>
+            metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
+        }
+        val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
 
-    val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
-      if (shuffleRead) shuffleReadQuantiles else Nil,
-      if (shuffleWrite) shuffleWriteQuantiles else Nil)
+        val shuffleWriteSizes = validTasks.map {
+          case(info, metrics, exception) =>
+            metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
+        }
+        val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
 
-    val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
-    val quantileTable = listingTable(quantileHeaders, quantileRow, listings)
+        val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
+          if (shuffleRead) shuffleReadQuantiles else Nil,
+          if (shuffleWrite) shuffleWriteQuantiles else Nil)
+
+        val quantileHeaders = Seq("Metric", "Min", "25%", "50%", "75%", "Max")
+        def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
+        Some(listingTable(quantileHeaders, quantileRow, listings))
+      }
 
     val content =
-      <h2>Summary Metrics</h2> ++ quantileTable ++ <h2>Tasks</h2> ++ taskTable;
+      <h2>Summary Metrics</h2> ++ summaryTable.getOrElse(Nil) ++ <h2>Tasks</h2> ++ taskTable;
 
     headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
   }
 
-  def quantileRow(data: Seq[String]): Seq[Node] = <tr> {data.map(d => <td>{d}</td>)} </tr>
-
-  def taskRow(taskData: (TaskInfo, TaskMetrics)): Seq[Node] = {
-    val (info, metrics) = taskData
+  def taskRow(taskData: (TaskInfo, TaskMetrics, Option[ExceptionFailure])): Seq[Node] = {
+    def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
+      trace.map(e => <span style="display:block;">{e.toString}</span>)
+    val (info, metrics, exception) = taskData
     <tr>
       <td>{info.taskId}</td>
-      <td>{parent.formatDuration(metrics.executorRunTime)}</td>
+      <td>{Option(metrics).map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}</td>
       <td>{info.taskLocality}</td>
       <td>{info.hostPort}</td>
       <td>{dateFmt.format(new Date(info.launchTime))}</td>
-      {metrics.shuffleReadMetrics.map{m =>
-        <td>{Utils.memoryBytesToString(m.remoteBytesRead)}</td>}.getOrElse("")}
-      {metrics.shuffleWriteMetrics.map{m =>
-        <td>{Utils.memoryBytesToString(m.shuffleBytesWritten)}</td>}.getOrElse("")}
+      {Option(metrics).flatMap{m => m.shuffleReadMetrics}.map{s =>
+        <td>{Utils.memoryBytesToString(s.remoteBytesRead)}</td>}.getOrElse("")}
+      {Option(metrics).flatMap{m => m.shuffleWriteMetrics}.map{s =>
+        <td>{Utils.memoryBytesToString(s.shuffleBytesWritten)}</td>}.getOrElse("")}
+      <td>{exception.map(e =>
+        <span>{e.className}<br/>{fmtStackTrace(e.stackTrace)}</span>).getOrElse("")}</td>
     </tr>
   }
 }
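
The repeated `Option(metrics)` is the guard that makes the `null` TaskMetrics passed by the task-set managers safe to render: `Option(x)` yields `None` for a `null` reference, so a failed task's row degrades to empty cells instead of throwing a NullPointerException. A minimal standalone sketch of the same pattern (the `FakeMetrics` type is a stand-in, not from this commit):

    // Self-contained sketch of the null guard used in taskRow above.
    case class FakeMetrics(executorRunTime: Long)  // stand-in for TaskMetrics

    def durationCell(metrics: FakeMetrics): String =
      Option(metrics)                       // None when metrics == null
        .map(m => m.executorRunTime + " ms")
        .getOrElse("")                      // blank cell for failed tasks

    // durationCell(FakeMetrics(1234)) == "1234 ms"
    // durationCell(null)              == ""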