Merge pull request #719 from karenfeng/ui-808

Creates Executors tab for Jobs UI
Matei Zaharia 2013-07-22 16:57:16 -07:00
commit 401aac8b18
5 changed files with 148 additions and 19 deletions

core/src/main/scala/spark/ui/Page.scala

@@ -17,4 +17,4 @@
 
 package spark.ui
 
-private[spark] object Page extends Enumeration { val Storage, Jobs, Environment = Value }
+private[spark] object Page extends Enumeration { val Storage, Jobs, Environment, Executors = Value }
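Each value in this enumeration backs one tab in the web UI's shared navigation header; a page passes its value to UIUtils.headerSparkPage so the matching tab can be highlighted. A minimal sketch of that pattern, mirroring the match UIUtils performs later in this diff (the helper name executorsTab is illustrative, not part of the commit):

import spark.ui.Page

// Illustrative helper: the page being rendered passes its own Page.Value,
// and the tab whose value matches gets the "active" class.
def executorsTab(page: Page.Value): scala.xml.Elem = page match {
  case Page.Executors => <li class="active"><a href="/executors">Executors</a></li>
  case _ => <li><a href="/executors">Executors</a></li>
}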

core/src/main/scala/spark/ui/SparkUI.scala

@@ -23,6 +23,7 @@ import org.eclipse.jetty.server.{Handler, Server}
 
 import spark.{Logging, SparkContext, Utils}
 import spark.ui.env.EnvironmentUI
+import spark.ui.exec.ExecutorsUI
 import spark.ui.storage.BlockManagerUI
 import spark.ui.jobs.JobProgressUI
 import spark.ui.JettyUtils._
@@ -41,7 +42,9 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
   val storage = new BlockManagerUI(sc)
   val jobs = new JobProgressUI(sc)
   val env = new EnvironmentUI(sc)
-  val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++ handlers
+  val exec = new ExecutorsUI(sc)
+  val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
+    exec.getHandlers ++ handlers
 
   /** Bind the HTTP server which backs this web interface */
   def bind() {
@@ -64,6 +67,7 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
     // This server must register all handlers, including JobProgressUI, before binding
     // JobProgressUI registers a listener with SparkContext, which requires sc to initialize
     jobs.start()
+    exec.start()
   }
 
   def stop() {
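The two comments above record the ordering constraint behind this hunk: every handler, including the new ExecutorsUI, must be in allHandlers before the Jetty server binds, while the SparkListeners can only be attached once the SparkContext has finished initializing. A rough caller-side sketch of the implied lifecycle (the driver code shown here is an inference from those comments, not part of this diff):

// Hypothetical driver-side sequence, inferred from the comments above.
val ui = new SparkUI(sc)
ui.bind()   // Jetty binds; storage/jobs/env/exec handlers are already registered
// ... SparkContext finishes initializing ...
ui.start()  // jobs.start() and exec.start() attach their SparkListeners to sc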

core/src/main/scala/spark/ui/UIUtils.scala

@@ -40,6 +40,10 @@ private[spark] object UIUtils {
       case Environment => <li class="active"><a href="/environment">Environment</a></li>
       case _ => <li><a href="/environment">Environment</a></li>
     }
+    val executors = page match {
+      case Executors => <li class="active"><a href="/executors">Executors</a></li>
+      case _ => <li><a href="/executors">Executors</a></li>
+    }
 
     <html>
       <head>
@@ -66,6 +70,7 @@ private[spark] object UIUtils {
           {storage}
           {jobs}
           {environment}
+          {executors}
         </ul>
         <ul id="infolist">
          <li>Application: <strong>{sc.appName}</strong></li>

core/src/main/scala/spark/ui/exec/ExecutorsUI.scala

@@ -0,0 +1,136 @@
+package spark.ui.exec
+
+import javax.servlet.http.HttpServletRequest
+
+import org.eclipse.jetty.server.Handler
+
+import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.util.Properties
+
+import spark.{ExceptionFailure, Logging, SparkContext, Success, Utils}
+import spark.executor.TaskMetrics
+import spark.scheduler.cluster.TaskInfo
+import spark.scheduler._
+import spark.SparkContext
+import spark.storage.{StorageStatus, StorageUtils}
+import spark.ui.JettyUtils._
+import spark.ui.Page.Executors
+import spark.ui.UIUtils.headerSparkPage
+import spark.ui.UIUtils
+import spark.Utils
+
+import scala.xml.{Node, XML}
+
+private[spark] class ExecutorsUI(val sc: SparkContext) {
+
+  private var _listener: Option[ExecutorsListener] = None
+  def listener = _listener.get
+
+  def start() {
+    _listener = Some(new ExecutorsListener)
+    sc.addSparkListener(listener)
+  }
+
+  def getHandlers = Seq[(String, Handler)](
+    ("/executors", (request: HttpServletRequest) => render(request))
+  )
+
+  def render(request: HttpServletRequest): Seq[Node] = {
+    val storageStatusList = sc.getExecutorStorageStatus
+
+    val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
+    val memUsed = storageStatusList.map(_.memUsed()).reduce(_+_)
+    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
+      .reduceOption(_+_).getOrElse(0L)
+
+    val execHead = Seq("Executor ID", "Address", "RDD blocks", "Memory used", "Disk used",
+      "Failed tasks", "Complete tasks", "Total tasks")
+    def execRow(kv: Seq[String]) =
+      <tr>
+        <td>{kv(0)}</td>
+        <td>{kv(1)}</td>
+        <td>{kv(2)}</td>
+        <td sorttable_customkey={kv(3)}>
+          {Utils.memoryBytesToString(kv(3).toLong)} / {Utils.memoryBytesToString(kv(4).toLong)}
+        </td>
+        <td sorttable_customkey={kv(5)}>
+          {Utils.memoryBytesToString(kv(5).toLong)}
+        </td>
+        <td>{kv(6)}</td>
+        <td>{kv(7)}</td>
+        <td>{kv(8)}</td>
+      </tr>
+    val execInfo =
+      for (b <- 0 until storageStatusList.size)
+        yield getExecInfo(b)
+    val execTable = UIUtils.listingTable(execHead, execRow, execInfo)
+
+    val content =
+      <div class="row">
+        <div class="span12">
+          <ul class="unstyled">
+            <li><strong>Memory:</strong>
+              {Utils.memoryBytesToString(memUsed)} Used
+              ({Utils.memoryBytesToString(maxMem)} Total) </li>
+            <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
+          </ul>
+        </div>
+      </div>
+      <div class = "row">
+        <div class="span12">
+          {execTable}
+        </div>
+      </div>;
+
+    headerSparkPage(content, sc, "Executors", Executors)
+  }
+
+  def getExecInfo(a: Int): Seq[String] = {
+    val execId = sc.getExecutorStorageStatus(a).blockManagerId.executorId
+    val hostPort = sc.getExecutorStorageStatus(a).blockManagerId.hostPort
+    val rddBlocks = sc.getExecutorStorageStatus(a).blocks.size.toString
+    val memUsed = sc.getExecutorStorageStatus(a).memUsed().toString
+    val maxMem = sc.getExecutorStorageStatus(a).maxMem.toString
+    val diskUsed = sc.getExecutorStorageStatus(a).diskUsed().toString
+    val failedTasks = listener.executorToTasksFailed.getOrElse(a.toString, 0).toString
+    val completedTasks = listener.executorToTasksComplete.getOrElse(a.toString, 0).toString
+    val totalTasks = listener.executorToTaskInfos(a.toString).size.toString
+
+    Seq(
+      execId,
+      hostPort,
+      rddBlocks,
+      memUsed,
+      maxMem,
+      diskUsed,
+      failedTasks,
+      completedTasks,
+      totalTasks
+    )
+  }
+
+  private[spark] class ExecutorsListener extends SparkListener with Logging {
+    val executorToTasksComplete = HashMap[String, Int]()
+    val executorToTasksFailed = HashMap[String, Int]()
+    val executorToTaskInfos =
+      HashMap[String, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]]()
+
+    override def onTaskEnd(taskEnd: SparkListenerTaskEnd) {
+      val eid = taskEnd.taskInfo.executorId
+      val (failureInfo, metrics): (Option[ExceptionFailure], Option[TaskMetrics]) =
+        taskEnd.reason match {
+          case e: ExceptionFailure =>
+            executorToTasksFailed(eid) = executorToTasksFailed.getOrElse(eid, 0) + 1
+            (Some(e), e.metrics)
+          case _ =>
+            executorToTasksComplete(eid) = executorToTasksComplete.getOrElse(eid, 0) + 1
+            (None, Some(taskEnd.taskMetrics))
+        }
+      val taskList = executorToTaskInfos.getOrElse(
+        eid, ArrayBuffer[(TaskInfo, Option[TaskMetrics], Option[ExceptionFailure])]())
+      taskList += ((taskEnd.taskInfo, metrics, failureInfo))
+      executorToTaskInfos(eid) = taskList
+    }
+  }
+}
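ExecutorsListener above keeps per-executor maps keyed by executor ID and updates them on every task-end event, detecting failures by matching the reason against ExceptionFailure. A self-contained sketch of the same getOrElse-and-update tallying idiom, with no Spark dependencies (TaskEnd and the executor IDs here are hypothetical stand-ins):

import scala.collection.mutable.HashMap

// Stand-in for SparkListenerTaskEnd: just an executor ID and a failure flag.
case class TaskEnd(eid: String, failed: Boolean)

object TallySketch extends App {
  val tasksComplete = HashMap[String, Int]()
  val tasksFailed = HashMap[String, Int]()

  // Same counting idiom as ExecutorsListener.onTaskEnd above.
  def onTaskEnd(e: TaskEnd) {
    val map = if (e.failed) tasksFailed else tasksComplete
    map(e.eid) = map.getOrElse(e.eid, 0) + 1
  }

  Seq(TaskEnd("0", false), TaskEnd("0", true), TaskEnd("1", false)).foreach(onTaskEnd)
  println(tasksComplete)  // e.g. Map(0 -> 1, 1 -> 1)
  println(tasksFailed)    // e.g. Map(0 -> 1)
}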

core/src/main/scala/spark/ui/storage/IndexPage.scala

@@ -33,10 +33,6 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
   def render(request: HttpServletRequest): Seq[Node] = {
     val storageStatusList = sc.getExecutorStorageStatus
     // Calculate macro-level statistics
-    val maxMem = storageStatusList.map(_.maxMem).reduce(_+_)
-    val remainingMem = storageStatusList.map(_.memRemaining).reduce(_+_)
-    val diskSpaceUsed = storageStatusList.flatMap(_.blocks.values.map(_.diskSize))
-      .reduceOption(_+_).getOrElse(0L)
 
     val rddHeaders = Seq(
       "RDD Name",
@@ -46,19 +42,7 @@ private[spark] class IndexPage(parent: BlockManagerUI) {
       "Size in Memory",
       "Size on Disk")
     val rdds = StorageUtils.rddInfoFromStorageStatus(storageStatusList, sc)
-    val rddTable = listingTable(rddHeaders, rddRow, rdds)
-
-    val content =
-      <div class="row">
-        <div class="span12">
-          <ul class="unstyled">
-            <li><strong>Memory:</strong>
-              {Utils.memoryBytesToString(maxMem - remainingMem)} Used
-              ({Utils.memoryBytesToString(remainingMem)} Available) </li>
-            <li><strong>Disk:</strong> {Utils.memoryBytesToString(diskSpaceUsed)} Used </li>
-          </ul>
-        </div>
-      </div> ++ {rddTable};
+    val content = listingTable(rddHeaders, rddRow, rdds)
 
     headerSparkPage(content, parent.sc, "Spark Storage ", Storage)
   }
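With this change the memory/disk summary lives only on the new Executors page, and the storage index reduces to the RDD table. For reference, a hedged sketch of a listingTable helper consistent with its two call sites in this diff (the body and CSS classes are guesses from usage, not the actual spark.ui.UIUtils implementation):

import scala.xml.Node

// Inferred shape: one header row, then one data row per element of rows.
// The CSS classes are placeholders, not necessarily what Spark uses.
def listingTable[T](headers: Seq[String], makeRow: T => Node, rows: Seq[T]): Seq[Node] = {
  <table class="table table-striped">
    <thead><tr>{headers.map(h => <th>{h}</th>)}</tr></thead>
    <tbody>{rows.map(makeRow)}</tbody>
  </table>
}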