Added changes according to review comments from rxin

This commit is contained in:
wangda.tan 2013-12-22 21:43:15 +08:00
parent 59e53fa21c
commit c979eecdf6
7 changed files with 23 additions and 43 deletions

View file

@@ -56,7 +56,7 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize)).fold(0L)(_+_)
val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Duration", "Shuffle Read",
"Active tasks", "Failed tasks", "Complete tasks", "Total tasks", "Task Time", "Shuffle Read",
"Shuffle Write")
def execRow(kv: Seq[String]) = {
@@ -169,21 +169,13 @@ private[spark] class ExecutorsUI(val sc: SparkContext) {
// update shuffle read/write
if (null != taskEnd.taskMetrics) {
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
val newShuffleRead = executorToShuffleRead.getOrElse(eid, 0L) + s.remoteBytesRead
executorToShuffleRead.put(eid, newShuffleRead)
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
val newShuffleWrite = executorToShuffleWrite.getOrElse(eid, 0L) + s.shuffleBytesWritten
executorToShuffleWrite.put(eid, newShuffleWrite)
}
case _ => {}
}
taskEnd.taskMetrics.shuffleReadMetrics.foreach(shuffleRead =>
executorToShuffleRead.put(eid, executorToShuffleRead.getOrElse(eid, 0L) +
shuffleRead.remoteBytesRead))
taskEnd.taskMetrics.shuffleWriteMetrics.foreach(shuffleWrite =>
executorToShuffleWrite.put(eid, executorToShuffleWrite.getOrElse(eid, 0L) +
shuffleWrite.shuffleBytesWritten))
}
}
}

View file

@@ -17,8 +17,9 @@
package org.apache.spark.ui.jobs
private[spark] class ExecutorSummary() {
var duration : Long = 0
/** class for reporting aggregated metrics for each executors in stageUI */
private[spark] class ExecutorSummary {
var taskTime : Long = 0
var failedTasks : Int = 0
var succeededTasks : Int = 0
var shuffleRead : Long = 0

View file

@@ -40,7 +40,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
<table class="table table-bordered table-striped table-condensed sortable">
<thead>
<th>Executor ID</th>
<th>Duration</th>
<th>Task Time</th>
<th>Total Tasks</th>
<th>Failed Tasks</th>
<th>Succeeded Tasks</th>
@@ -61,7 +61,7 @@ private[spark] class ExecutorTable(val parent: JobProgressUI, val stageId: Int)
case (k,v) => {
<tr>
<td>{k}</td>
<td>{parent.formatDuration(v.duration)}</td>
<td>{parent.formatDuration(v.taskTime)}</td>
<td>{v.failedTasks + v.succeededTasks}</td>
<td>{v.failedTasks}</td>
<td>{v.succeededTasks}</td>

View file

@@ -56,10 +56,6 @@ private[spark] class IndexPage(parent: JobProgressUI) {
{parent.formatDuration(now - listener.sc.startTime)}
</li>
<li><strong>Scheduling Mode:</strong> {parent.sc.getSchedulingMode}</li>
<li>
<a href="#executors"><strong>Executor Summary:</strong></a>
{listener.stageIdToExecutorSummaries.size}
</li>
<li>
<a href="#active"><strong>Active Stages:</strong></a>
{activeStages.size}

View file

@@ -144,23 +144,14 @@ private[spark] class JobProgressListener(val sc: SparkContext) extends SparkList
}
// update duration
y.duration += taskEnd.taskInfo.duration
y.taskTime += taskEnd.taskInfo.duration
// update shuffle read/write
if (null != taskEnd.taskMetrics) {
val shuffleRead = taskEnd.taskMetrics.shuffleReadMetrics
shuffleRead match {
case Some(s) =>
y.shuffleRead += s.remoteBytesRead
case _ => {}
}
val shuffleWrite = taskEnd.taskMetrics.shuffleWriteMetrics
shuffleWrite match {
case Some(s) => {
y.shuffleWrite += s.shuffleBytesWritten
}
case _ => {}
}
taskEnd.taskMetrics.shuffleReadMetrics.foreach { shuffleRead =>
y.shuffleRead += shuffleRead.remoteBytesRead
}
taskEnd.taskMetrics.shuffleWriteMetrics.foreach { shuffleWrite =>
y.shuffleWrite += shuffleWrite.shuffleBytesWritten
}
}
case _ => {}

View file

@@ -66,7 +66,7 @@ private[spark] class StagePage(parent: JobProgressUI) {
<div>
<ul class="unstyled">
<li>
<strong>Total duration across all tasks: </strong>
<strong>Total task time across all tasks: </strong>
{parent.formatDuration(listener.stageIdToTime.getOrElse(stageId, 0L) + activeTime)}
</li>
{if (hasShuffleRead)
@@ -163,9 +163,9 @@ private[spark] class StagePage(parent: JobProgressUI) {
val executorTable = new ExecutorTable(parent, stageId)
val content =
summary ++
<h4>Summary Metrics for Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Summary Metrics for {numCompleted} Completed Tasks</h4> ++
<div>{summaryTable.getOrElse("No tasks have reported metrics yet.")}</div> ++
<h4>Aggregated Metrics by Executors</h4> ++ executorTable.toNodeSeq() ++
<h4>Tasks</h4> ++ taskTable
headerSparkPage(content, parent.sc, "Details for Stage %d".format(stageId), Stages)

View file

@@ -48,7 +48,7 @@ private[spark] class StageTable(val stages: Seq[StageInfo], val parent: JobProgr
{if (isFairScheduler) {<th>Pool Name</th>} else {}}
<th>Description</th>
<th>Submitted</th>
<th>Duration</th>
<th>Task Time</th>
<th>Tasks: Succeeded/Total</th>
<th>Shuffle Read</th>
<th>Shuffle Write</th>