Cleaning up some code for Job Progress

This commit is contained in:
Patrick Wendell 2013-06-24 14:13:24 -07:00
parent 93e8ed85aa
commit b5e6e8bcc8
3 changed files with 31 additions and 19 deletions

View file

@ -38,7 +38,7 @@ class IndexPage(parent: JobProgressUI) {
def getElapsedTime(submitted: Option[Long], completed: Long): String = {
submitted match {
case Some(t) => Duration(completed - t, "milliseconds").printHMS
case Some(t) => parent.formatDuration(completed - t)
case _ => "Unknown"
}
}
@ -53,7 +53,7 @@ class IndexPage(parent: JobProgressUI) {
case (true, true) => "Read/Write"
case (true, false) => "Read"
case (false, true) => "Write"
case _ => "None"
case _ => ""
}
<tr>

View file

@ -1,5 +1,7 @@
package spark.ui.jobs
import akka.util.Duration
import java.text.SimpleDateFormat
import javax.servlet.http.HttpServletRequest
@ -20,16 +22,17 @@ import spark.Success
private[spark] class JobProgressUI(val sc: SparkContext) {
private var _listener: Option[JobProgressListener] = None
def listener = _listener.get
val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss")
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
def start() {
_listener = Some(new JobProgressListener)
sc.addSparkListener(listener)
}
private val indexPage = new IndexPage(this)
private val stagePage = new StagePage(this)
def formatDuration(ms: Long) = Duration(ms, "milliseconds").printHMS
def getHandlers = Seq[(String, Handler)](
("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)),

View file

@ -8,6 +8,7 @@ import scala.xml.Node
import spark.ui.UIUtils._
import spark.util.Distribution
import spark.Utils
import spark.scheduler.cluster.TaskInfo
import spark.executor.TaskMetrics
@ -24,24 +25,30 @@ class StagePage(parent: JobProgressUI) {
val shuffleWrite = listener.hasShuffleWrite(stageId)
val taskHeaders: Seq[String] =
Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read (bytes)") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write (bytes)") else Nil}
Seq("Task ID", "Service Time", "Locality Level", "Worker", "Launch Time") ++
{if (shuffleRead) Seq("Shuffle Read") else Nil} ++
{if (shuffleWrite) Seq("Shuffle Write") else Nil}
val taskTable = listingTable(taskHeaders, taskRow, tasks)
// TODO(pwendell): Consider factoring this more nicely with the functions in SparkListener
val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble}
val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map(_.toString)
val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map(
ms => parent.formatDuration(ms.toLong))
val shuffleReadSizes = tasks.map{
case(info, metrics) => metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble}
val shuffleReadQuantiles = "Shuffle Read" +: Distribution(shuffleReadSizes).get.getQuantiles().map(_.toString)
def getQuantileCols(data: Seq[Double]) =
Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong))
val shuffleWriteSizes = tasks.map{
case(info, metrics) => metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble}
val shuffleWriteQuantiles = "Shuffle Write" +: Distribution(shuffleWriteSizes).get.getQuantiles().map(_.toString)
val shuffleReadSizes = tasks.map {
case(info, metrics) =>
metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble
}
val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes)
val shuffleWriteSizes = tasks.map {
case(info, metrics) =>
metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble
}
val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes)
val listings: Seq[Seq[String]] = Seq(serviceQuantiles,
if (shuffleRead) shuffleReadQuantiles else Nil,
@ -62,12 +69,14 @@ class StagePage(parent: JobProgressUI) {
val (info, metrics) = taskData
<tr>
<td>{info.taskId}</td>
<td>{metrics.executorRunTime}</td>
<td>{parent.formatDuration(metrics.executorRunTime)}</td>
<td>{info.taskLocality}</td>
<td>{info.hostPort}</td>
<td>{dateFmt.format(new Date(info.launchTime))}</td>
{metrics.shuffleReadMetrics.map{m => <td>{m.remoteBytesRead}</td>}.getOrElse("") }
{metrics.shuffleWriteMetrics.map{m => <td>{m.shuffleBytesWritten}</td>}.getOrElse("") }
{metrics.shuffleReadMetrics.map{m =>
<td>{Utils.memoryBytesToString(m.remoteBytesRead)}</td>}.getOrElse("") }
{metrics.shuffleWriteMetrics.map{m =>
<td>{Utils.memoryBytesToString(m.shuffleBytesWritten)}</td>}.getOrElse("") }
</tr>
}
}