Merge pull request #729 from karenfeng/ui-811

Stage Page updates
This commit is contained in:
Matei Zaharia 2013-07-24 14:32:24 -07:00
commit 20338c2983
4 changed files with 45 additions and 6 deletions

View file

@ -51,6 +51,17 @@ class TaskInfo(
def running: Boolean = !finished
def status: String = {
if (running)
"RUNNING"
else if (failed)
"FAILED"
else if (successful)
"SUCCESS"
else
"UNKNOWN"
}
def duration: Long = {
if (!finished) {
throw new UnsupportedOperationException("duration() called on unfinished tasks")

View file

@ -124,6 +124,10 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
if (!executorToTasksActive.contains(eid))
executorToTasksActive(eid) = HashSet[Long]()
executorToTasksActive(eid) += taskStart.taskInfo.taskId
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
executorToTaskInfos(eid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
@ -142,6 +146,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
}
val taskList = executorToTaskInfos.getOrElse(
eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
executorToTaskInfos(eid) = taskList
}

View file

@ -65,6 +65,7 @@ private[spark] class JobProgressListener extends SparkListener {
val completedStages = ListBuffer[Stage]()
val failedStages = ListBuffer[Stage]()
val stageToTasksActive = HashMap[Int, HashSet[Long]]()
val stageToTasksComplete = HashMap[Int, Int]()
val stageToTasksFailed = HashMap[Int, Int]()
val stageToTaskInfos =
@ -93,8 +94,22 @@ private[spark] class JobProgressListener extends SparkListener {
override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) =
activeStages += stageSubmitted.stage
override def onTaskStart(taskStart: SparkListenerTaskStart) {
val sid = taskStart.task.stageId
if (!stageToTasksActive.contains(sid))
stageToTasksActive(sid) = HashSet[Long]()
stageToTasksActive(sid) += taskStart.taskInfo.taskId
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList += ((taskStart.taskInfo, None, None))
stageToTaskInfos(sid) = taskList
}
override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
val sid = taskEnd.task.stageId
if (!stageToTasksActive.contains(sid))
stageToTasksActive(sid) = HashSet[Long]()
stageToTasksActive(sid) -= taskEnd.taskInfo.taskId
val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
taskEnd.reason match {
case e: ExceptionFailure =>
@ -106,6 +121,7 @@ private[spark] class JobProgressListener extends SparkListener {
}
val taskList = stageToTaskInfos.getOrElse(
sid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
taskList -= ((taskEnd.taskInfo, None, None))
taskList += ((taskEnd.taskInfo, metrics, failureInfo))
stageToTaskInfos(sid) = taskList
}

View file

@ -41,8 +41,8 @@ private[spark] class StagePage(parent: JobProgressUI) {
if (!listener.stageToTaskInfos.contains(stageId)) {
val content =
<div>
<h2>Summary Metrics</h2> No tasks have finished yet
<h2>Tasks</h2> No tasks have finished yet
<h2>Summary Metrics</h2> No tasks have started yet
<h2>Tasks</h2> No tasks have started yet
</div>
return headerSparkPage(content, parent.sc, "Stage Details: %s".format(stageId), Jobs)
}
@ -53,7 +53,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders: Seq[String] =
Seq("Task ID", "Duration", "Locality Level", "Worker", "Launch Time") ++
Seq("Task ID", "Status", "Duration", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil} ++
Seq("Details")
@ -61,7 +61,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
val taskTable = listingTable(taskHeaders, taskRow, tasks)
// Excludes tasks which failed and have incomplete metrics
val validTasks = tasks.filter(t => Option(t._2).isDefined)
val validTasks = tasks.filter(t => t._1.status == "SUCCESS" && (Option(t._2).isDefined))
val summaryTable: Option[Seq[Node]] =
if (validTasks.size == 0) {
@ -108,10 +108,17 @@ private[spark] class StagePage(parent: JobProgressUI) {
def fmtStackTrace(trace: Seq[StackTraceElement]): Seq[Node] =
trace.map(e => <span style="display:block;">{e.toString}</span>)
val (info, metrics, exception) = taskData
val duration = if (info.status == "RUNNING") info.timeRunning(System.currentTimeMillis())
else metrics.map(m => m.executorRunTime).getOrElse(1)
val formatDuration = if (info.status == "RUNNING") parent.formatDuration(duration)
else metrics.map(m => parent.formatDuration(m.executorRunTime)).getOrElse("")
<tr>
<td>{info.taskId}</td>
<td sorttable_customkey={metrics.map{m => m.executorRunTime.toString}.getOrElse("1")}>
{metrics.map{m => parent.formatDuration(m.executorRunTime)}.getOrElse("")}
<td>{info.status}</td>
<td sorttable_customkey={duration.toString}>
{formatDuration}
</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>