From b5e6e8bcc8ca05cd97f35a502a89c686aa4a5a12 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Mon, 24 Jun 2013 14:13:24 -0700 Subject: [PATCH] Cleaning up some code for Job Progress --- .../main/scala/spark/ui/jobs/IndexPage.scala | 4 +- .../scala/spark/ui/jobs/JobProgressUI.scala | 9 +++-- .../main/scala/spark/ui/jobs/StagePage.scala | 37 ++++++++++++------- 3 files changed, 31 insertions(+), 19 deletions(-) diff --git a/core/src/main/scala/spark/ui/jobs/IndexPage.scala b/core/src/main/scala/spark/ui/jobs/IndexPage.scala index 134c93091d..c7ee9dc1c1 100644 --- a/core/src/main/scala/spark/ui/jobs/IndexPage.scala +++ b/core/src/main/scala/spark/ui/jobs/IndexPage.scala @@ -38,7 +38,7 @@ class IndexPage(parent: JobProgressUI) { def getElapsedTime(submitted: Option[Long], completed: Long): String = { submitted match { - case Some(t) => Duration(completed - t, "milliseconds").printHMS + case Some(t) => parent.formatDuration(completed - t) case _ => "Unknown" } } @@ -53,7 +53,7 @@ class IndexPage(parent: JobProgressUI) { case (true, true) => "Read/Write" case (true, false) => "Read" case (false, true) => "Write" - case _ => "None" + case _ => "" } diff --git a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala index 99f9f2d9f6..027eadde3a 100644 --- a/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala +++ b/core/src/main/scala/spark/ui/jobs/JobProgressUI.scala @@ -1,5 +1,7 @@ package spark.ui.jobs +import akka.util.Duration + import java.text.SimpleDateFormat import javax.servlet.http.HttpServletRequest @@ -20,16 +22,17 @@ import spark.Success private[spark] class JobProgressUI(val sc: SparkContext) { private var _listener: Option[JobProgressListener] = None def listener = _listener.get - val dateFmt = new SimpleDateFormat("EEE, MMM d yyyy HH:mm:ss") + private val indexPage = new IndexPage(this) + private val stagePage = new StagePage(this) + def start() { _listener = Some(new JobProgressListener) sc.addSparkListener(listener) } - private val indexPage = new IndexPage(this) - private val stagePage = new StagePage(this) + def formatDuration(ms: Long) = Duration(ms, "milliseconds").printHMS def getHandlers = Seq[(String, Handler)]( ("/stages/stage", (request: HttpServletRequest) => stagePage.render(request)), diff --git a/core/src/main/scala/spark/ui/jobs/StagePage.scala b/core/src/main/scala/spark/ui/jobs/StagePage.scala index 8a488498d9..65dbd389b1 100644 --- a/core/src/main/scala/spark/ui/jobs/StagePage.scala +++ b/core/src/main/scala/spark/ui/jobs/StagePage.scala @@ -8,6 +8,7 @@ import scala.xml.Node import spark.ui.UIUtils._ import spark.util.Distribution +import spark.Utils import spark.scheduler.cluster.TaskInfo import spark.executor.TaskMetrics @@ -24,24 +25,30 @@ class StagePage(parent: JobProgressUI) { val shuffleWrite = listener.hasShuffleWrite(stageId) val taskHeaders: Seq[String] = - Seq("Task ID", "Service Time (ms)", "Locality Level", "Worker", "Launch Time") ++ - {if (shuffleRead) Seq("Shuffle Read (bytes)") else Nil} ++ - {if (shuffleWrite) Seq("Shuffle Write (bytes)") else Nil} + Seq("Task ID", "Service Time", "Locality Level", "Worker", "Launch Time") ++ + {if (shuffleRead) Seq("Shuffle Read") else Nil} ++ + {if (shuffleWrite) Seq("Shuffle Write") else Nil} val taskTable = listingTable(taskHeaders, taskRow, tasks) - // TODO(pwendell): Consider factoring this more nicely with the functions in SparkListener val serviceTimes = tasks.map{case (info, metrics) => metrics.executorRunTime.toDouble} - val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map(_.toString) + val serviceQuantiles = "Service Time" +: Distribution(serviceTimes).get.getQuantiles().map( + ms => parent.formatDuration(ms.toLong)) - val shuffleReadSizes = tasks.map{ - case(info, metrics) => metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble} - val shuffleReadQuantiles = "Shuffle Read" +: Distribution(shuffleReadSizes).get.getQuantiles().map(_.toString) + def getQuantileCols(data: Seq[Double]) = + Distribution(data).get.getQuantiles().map(d => Utils.memoryBytesToString(d.toLong)) - val shuffleWriteSizes = tasks.map{ - case(info, metrics) => metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble} - val shuffleWriteQuantiles = "Shuffle Write" +: Distribution(shuffleWriteSizes).get.getQuantiles().map(_.toString) + val shuffleReadSizes = tasks.map { + case(info, metrics) => + metrics.shuffleReadMetrics.map(_.remoteBytesRead).getOrElse(0L).toDouble + } + val shuffleReadQuantiles = "Shuffle Read (Remote)" +: getQuantileCols(shuffleReadSizes) + val shuffleWriteSizes = tasks.map { + case(info, metrics) => + metrics.shuffleWriteMetrics.map(_.shuffleBytesWritten).getOrElse(0L).toDouble + } + val shuffleWriteQuantiles = "Shuffle Write" +: getQuantileCols(shuffleWriteSizes) val listings: Seq[Seq[String]] = Seq(serviceQuantiles, if (shuffleRead) shuffleReadQuantiles else Nil, @@ -62,12 +69,14 @@ class StagePage(parent: JobProgressUI) { val (info, metrics) = taskData {info.taskId} - {metrics.executorRunTime} + {parent.formatDuration(metrics.executorRunTime)} {info.taskLocality} {info.hostPort} {dateFmt.format(new Date(info.launchTime))} - {metrics.shuffleReadMetrics.map{m => {m.remoteBytesRead}}.getOrElse("") } - {metrics.shuffleWriteMetrics.map{m => {m.shuffleBytesWritten}}.getOrElse("") } + {metrics.shuffleReadMetrics.map{m => + {Utils.memoryBytesToString(m.remoteBytesRead)}}.getOrElse("") } + {metrics.shuffleWriteMetrics.map{m => + {Utils.memoryBytesToString(m.shuffleBytesWritten)}}.getOrElse("") } } }