Spark WebUI Implementation.

Denny 2012-07-31 08:39:24 -07:00
parent f471c82558
commit 7a295fee96
24 changed files with 399 additions and 36 deletions


@@ -1,6 +0,0 @@
<html>
<head><title>Hello world!</title></head>
<body>
<p>Hello world!</p>
</body>
</html>

[Two binary image files added, 14 KiB each, not shown; presumably the spark_logo.png referenced by the new common layout template, one copy for the master UI and one for the worker UI.]


@@ -1,12 +1,17 @@
 package spark.deploy

+import spark.deploy.ExecutorState.ExecutorState
+import spark.deploy.master.{WorkerInfo, JobInfo}
+import spark.deploy.worker.ExecutorRunner
+import scala.collection.immutable.List
+import scala.collection.mutable.HashMap

 sealed trait DeployMessage extends Serializable

 // Worker to Master
-case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int)
+case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
   extends DeployMessage

 case class ExecutorStateChanged(
@@ -45,3 +50,18 @@ case class JobKilled(message: String)

 // Internal message in Client
 case object StopClient

+// MasterWebUI to Master
+case object RequestMasterState
+
+// Master to MasterWebUI
+case class MasterState(workers: List[WorkerInfo], jobs: HashMap[String, JobInfo])
+
+// WorkerWebUI to Worker
+case object RequestWorkerState
+
+// Worker to WorkerWebUI
+case class WorkerState(workerId: String, executors: List[ExecutorRunner], finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int, coresUsed: Int, memoryUsed: Int)
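Taken together, the new messages define two small request/response protocols: each web UI sends a Request*State message and gets back a snapshot object it can render. A minimal sketch of one round trip, given an ActorRef `master`, using the same Akka 2.0-era ask pattern this commit adds to MasterWebUI (the actor wiring here is illustrative):

// Sketch: one RequestMasterState round trip (API matches the imports
// this commit adds to MasterWebUI; `master: ActorRef` is assumed).
import akka.dispatch.Await
import akka.pattern.ask
import akka.util.Timeout
import akka.util.duration._
import spark.deploy.{RequestMasterState, MasterState}

implicit val timeout = Timeout(1 seconds)
val future = master ? RequestMasterState    // WebUI -> Master
val state = Await.result(future, timeout.duration).asInstanceOf[MasterState]  // Master -> WebUI
println("workers=" + state.workers.size + " jobs=" + state.jobs.size)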


@@ -51,13 +51,13 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
   }

   override def receive = {
-    case RegisterWorker(id, host, workerPort, cores, memory) => {
+    case RegisterWorker(id, host, workerPort, cores, memory, webUiPort) => {
       logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
         host, workerPort, cores, Utils.memoryMegabytesToString(memory)))
       if (idToWorker.contains(id)) {
         sender ! RegisterWorkerFailed("Duplicate worker ID")
       } else {
-        addWorker(id, host, workerPort, cores, memory)
+        addWorker(id, host, workerPort, cores, memory, webUiPort)
         context.watch(sender) // This doesn't work with remote actors but helps for testing
         sender ! RegisteredWorker
         schedule()
@@ -112,6 +112,10 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
       addressToWorker.get(address).foreach(removeWorker)
       addressToJob.get(address).foreach(removeJob)
     }
+
+    case RequestMasterState => {
+      sender ! MasterState(workers.toList, idToJob.clone)
+    }
   }

   /**
@@ -143,8 +147,8 @@ class Master(ip: String, port: Int, webUiPort: Int) extends Actor with Logging {
     exec.job.actor ! ExecutorAdded(exec.id, worker.id, worker.host, exec.cores, exec.memory)
   }

-  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int): WorkerInfo = {
-    val worker = new WorkerInfo(id, host, port, cores, memory, sender)
+  def addWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int): WorkerInfo = {
+    val worker = new WorkerInfo(id, host, port, cores, memory, sender, webUiPort)
     workers += worker
     idToWorker(worker.id) = worker
     actorToWorker(sender) = worker
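One detail worth noting in the RequestMasterState handler above: the reply sends `workers.toList` and `idToJob.clone` rather than the live collections, so the web UI renders a stable snapshot instead of sharing the Master's mutable state across the ask boundary. A toy illustration (not from the commit):

// Why the handler clones before replying: a shared reference sees later
// mutations, a clone does not.
import scala.collection.mutable.HashMap

val live = HashMap("job-1" -> "RUNNING")
val shared = live            // same underlying map
val snapshot = live.clone    // independent copy, like idToJob.clone above

live("job-1") = "FINISHED"
println(shared("job-1"))     // FINISHED: shared reference tracks the mutation
println(snapshot("job-1"))   // RUNNING: the snapshot stays stable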


@@ -1,7 +1,14 @@
 package spark.deploy.master

 import akka.actor.{ActorRef, ActorSystem}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
 import cc.spray.Directives
+import cc.spray.directives._
+import cc.spray.typeconversion.TwirlSupport._
+import spark.deploy._

 class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/master/webui"
@@ -9,9 +16,29 @@ class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {

   val handler = {
     get {
       path("") {
-        getFromResource(RESOURCE_DIR + "/index.html")
+        completeWith {
+          val masterState = getMasterState()
+          // Render the HTML
+          masterui.html.index.render(masterState.jobs.values.toList, masterState.workers)
+        }
       } ~
+      path("job") {
+        parameter("jobId") { jobId =>
+          completeWith {
+            val masterState = getMasterState
+            masterui.html.job_details.render(masterState.jobs(jobId))
+          }
+        }
+      } ~
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
+
+  // Requests the current state from the Master and waits for the response
+  def getMasterState() : MasterState = {
+    implicit val timeout = Timeout(1 seconds)
+    val future = master ? RequestMasterState
+    return Await.result(future, timeout.duration).asInstanceOf[MasterState]
+  }
 }
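Note that `getMasterState` blocks the handling thread with Await.result, so a Master that fails to answer within the one-second timeout surfaces as an exception in the route. A guarded variant, sketched against the same Akka 2.0-era API (`getMasterStateOpt` is a hypothetical helper, not part of this commit):

// Hypothetical variant of getMasterState that degrades gracefully on timeout,
// written as if inside MasterWebUI (so `master` is the class field).
import akka.dispatch.Await
import akka.pattern.{ask, AskTimeoutException}
import akka.util.Timeout
import akka.util.duration._

def getMasterStateOpt(): Option[MasterState] = {
  implicit val timeout = Timeout(1 seconds)
  try {
    Some(Await.result(master ? RequestMasterState, timeout.duration).asInstanceOf[MasterState])
  } catch {
    case e: AskTimeoutException => None  // Master too busy; caller can render an error page
  }
}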


@@ -9,7 +9,8 @@ class WorkerInfo(
   val port: Int,
   val cores: Int,
   val memory: Int,
-  val actor: ActorRef) {
+  val actor: ActorRef,
+  val webUiPort: Int) {

   var executors = new mutable.HashMap[String, ExecutorInfo] // fullId => info
@@ -32,4 +33,8 @@ class WorkerInfo(
       memoryUsed -= exec.memory
     }
   }
+
+  def webUiAddress : String = {
+    "http://" + this.host + ":" + this.webUiPort
+  }
 }


@@ -14,16 +14,16 @@ import spark.deploy.ExecutorStateChanged
  * Manages the execution of one executor process.
  */
 class ExecutorRunner(
-    jobId: String,
-    execId: Int,
-    jobDesc: JobDescription,
-    cores: Int,
-    memory: Int,
-    worker: ActorRef,
-    workerId: String,
-    hostname: String,
-    sparkHome: File,
-    workDir: File)
+    val jobId: String,
+    val execId: Int,
+    val jobDesc: JobDescription,
+    val cores: Int,
+    val memory: Int,
+    val worker: ActorRef,
+    val workerId: String,
+    val hostname: String,
+    val sparkHome: File,
+    val workDir: File)
   extends Logging {

   val fullId = jobId + "/" + execId
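The only change in this file is prefixing the constructor parameters with `val`, which promotes them from constructor-only values to public fields; the new worker templates read `executor.execId`, `executor.cores`, `executor.jobDesc.name`, and so on, which would not compile otherwise. A two-line illustration:

class Plain(x: Int)      // x is only a constructor parameter: (new Plain(1)).x does not compile
class Public(val x: Int) // val makes x a public field:        (new Public(1)).x == 1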


@@ -27,7 +27,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
   var sparkHome: File = null
   var workDir: File = null
   val executors = new HashMap[String, ExecutorRunner]
-  val finishedExecutors = new ArrayBuffer[String]
+  val finishedExecutors = new HashMap[String, ExecutorRunner]

   var coresUsed = 0
   var memoryUsed = 0
@@ -67,7 +67,7 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
     val akkaUrl = "akka://spark@%s:%s/user/Master".format(masterHost, masterPort)
     try {
       master = context.actorFor(akkaUrl)
-      master ! RegisterWorker(workerId, ip, port, cores, memory)
+      master ! RegisterWorker(workerId, ip, port, cores, memory, webUiPort)
       context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
       context.watch(master) // Doesn't work with remote actors, but useful for testing
     } catch {
@@ -108,25 +108,34 @@ class Worker(ip: String, port: Int, webUiPort: Int, cores: Int, memory: Int, mas
         jobId, execId, jobDesc, cores_, memory_, self, workerId, ip, sparkHome, workDir)
       executors(jobId + "/" + execId) = manager
       manager.start()
+      coresUsed += cores_
+      memoryUsed += memory_
       master ! ExecutorStateChanged(jobId, execId, ExecutorState.LOADING, None)

     case ExecutorStateChanged(jobId, execId, state, message) =>
       master ! ExecutorStateChanged(jobId, execId, state, message)
+      val fullId = jobId + "/" + execId
       if (ExecutorState.isFinished(state)) {
-        logInfo("Executor " + jobId + "/" + execId + " finished with state " + state)
-        executors -= jobId + "/" + execId
-        finishedExecutors += jobId + "/" + execId
+        val executor = executors(fullId)
+        logInfo("Executor " + fullId + " finished with state " + state)
+        finishedExecutors(fullId) = executor
+        executors -= fullId
+        coresUsed -= executor.cores
+        memoryUsed -= executor.memory
       }

     case KillExecutor(jobId, execId) =>
       val fullId = jobId + "/" + execId
+      val executor = executors(fullId)
       logInfo("Asked to kill executor " + fullId)
-      executors(jobId + "/" + execId).kill()
-      executors -= fullId
-      finishedExecutors += fullId
+      executor.kill()

     case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
       masterDisconnected()

+    case RequestWorkerState => {
+      sender ! WorkerState(workerId, executors.values.toList, finishedExecutors.values.toList, masterUrl, cores, memory, coresUsed, memoryUsed)
+    }
   }

   def masterDisconnected() {
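The bookkeeping is the substance of this hunk: coresUsed and memoryUsed now move in both directions (incremented on launch, decremented when an executor finishes), and finishedExecutors changes from an ArrayBuffer[String] to a HashMap so WorkerState can hand full ExecutorRunner objects to the UI rather than bare IDs. KillExecutor also stops mutating the maps itself; it only calls kill(), and the map move happens when the resulting ExecutorStateChanged arrives. A toy model of that move (not from the commit):

// Toy model of the executors -> finishedExecutors handoff.
import scala.collection.mutable.HashMap

case class Runner(cores: Int, memory: Int)
val executors = HashMap("job-0/1" -> Runner(2, 1024))
val finishedExecutors = HashMap.empty[String, Runner]
var coresUsed = 2
var memoryUsed = 1024

val fullId = "job-0/1"
val executor = executors(fullId)     // look up before removing
finishedExecutors(fullId) = executor // keep the full object for the UI
executors -= fullId
coresUsed -= executor.cores          // resources are freed exactly once
memoryUsed -= executor.memory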


@@ -1,7 +1,13 @@
 package spark.deploy.worker

 import akka.actor.{ActorRef, ActorSystem}
+import akka.dispatch.Await
+import akka.pattern.ask
+import akka.util.Timeout
+import akka.util.duration._
 import cc.spray.Directives
+import cc.spray.typeconversion.TwirlSupport._
+import spark.deploy.{WorkerState, RequestWorkerState}

 class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/worker/webui"
@@ -9,9 +15,24 @@ class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {

   val handler = {
     get {
       path("") {
-        getFromResource(RESOURCE_DIR + "/index.html")
+        completeWith {
+          workerui.html.index(getWorkerState())
+        }
       } ~
+      path("log") {
+        parameters("jobId", "executorId", "logType") { (jobId, executorId, logType) =>
+          getFromFileName("work/" + jobId + "/" + executorId + "/" + logType)
+        }
+      } ~
       getFromResourceDirectory(RESOURCE_DIR)
     }
   }
+
+  // Requests the current state from the Worker and waits for the response
+  def getWorkerState() : WorkerState = {
+    implicit val timeout = Timeout(1 seconds)
+    val future = worker ? RequestWorkerState
+    return Await.result(future, timeout.duration).asInstanceOf[WorkerState]
+  }
 }
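The new log route closes the loop with the master pages: executor_row on the master side links to `<webUiAddress>/log?jobId=...&executorId=...&logType=...`, and the worker resolves that to a file under its work directory. Sketch of the mapping, with illustrative values:

// How a log link emitted by the master's templates maps to a file on the worker.
val webUiAddress = "http://worker-host:8081"  // from WorkerInfo.webUiAddress
val jobId = "job-0"; val executorId = "7"; val logType = "stderr"

val link = webUiAddress + "/log?jobId=" + jobId +
  "&executorId=" + executorId + "&logType=" + logType
val servedFile = "work/" + jobId + "/" + executorId + "/" + logType
// link       == http://worker-host:8081/log?jobId=job-0&executorId=7&logType=stderr
// servedFile == work/job-0/7/stderr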


@@ -0,0 +1,31 @@
@(title: String)(content: Html)
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-type" content="text/html; charset=utf-8">
<link rel="stylesheet" href="http://netdna.bootstrapcdn.com/twitter-bootstrap/2.0.4/css/bootstrap.min.css" type="text/css">
<link rel="stylesheet" href="http://netdna.bootstrapcdn.com/twitter-bootstrap/2.0.4/css/bootstrap-responsive.min.css" type="text/css">
<title>Spark WebUI</title>
</head>
<body>
<div class="container">
<!-- HEADER -->
<div class="row">
<div class="span12">
<img src="spark_logo.png">
<h1 style="vertical-align: bottom; margin-bottom: 10px; margin-left: 30px; display: inline-block;"> @title </h1>
</div>
</div>
<hr/>
@content
</div>
</body>
</html>
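The two parameter lists matter: `@(title: String)(content: Html)` compiles to a curried function, which is why the index templates below can invoke it as `@common.html.layout(title = "...") { ... }`, with the brace block becoming the second argument. Roughly, as a sketch of the generated shape (not the exact Twirl output; the `Html` package name is assumed):

// Rough shape of what Twirl generates for this template (sketch only).
import twirl.api.Html  // Twirl's HTML output type; package name assumed

object layout {
  def apply(title: String)(content: Html): Html =
    Html("<html><head>...</head><body><h1>" + title + "</h1>" + content + "</body></html>")
}

// Call sites pass the page body as the second (curried) argument:
val page: Html = layout("Master WebUI")(Html("<table>...</table>"))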


@@ -0,0 +1,15 @@
@(executor: spark.deploy.master.ExecutorInfo)
<tr>
<td>@executor.id</td>
<td>
<a href="@executor.worker.webUiAddress">@executor.worker.id</a>
</td>
<td>@executor.cores</td>
<td>@executor.memory</td>
<td>@executor.state</td>
<td>
<a href="@(executor.worker.webUiAddress)/log?jobId=@(executor.job.id)&executorId=@(executor.id)&logType=stdout">stdout</a>
<a href="@(executor.worker.webUiAddress)/log?jobId=@(executor.job.id)&executorId=@(executor.id)&logType=stderr">stderr</a>
</td>
</tr>


@@ -0,0 +1,19 @@
@(executors: List[spark.deploy.master.ExecutorInfo])
<table class="table table-bordered table-striped table-condensed">
<thead>
<tr>
<th>ExecutorID</th>
<th>Worker</th>
<th>Cores</th>
<th>Memory</th>
<th>State</th>
<th>Logs</th>
</tr>
</thead>
<tbody>
@for(e <- executors) {
@executor_row(e)
}
</tbody>
</table>


@@ -0,0 +1,37 @@
@(jobs: List[spark.deploy.master.JobInfo], workers: List[spark.deploy.master.WorkerInfo])
@import spark.deploy.master._
@common.html.layout(title = "Master WebUI") {
<!-- Cluster Summary (Workers) -->
<div class="row">
<div class="span12">
<h3> Cluster Summary </h3>
<br/>
@worker_table(workers)
</div>
</div>
<hr/>
<!-- Job Summary (Running) -->
<div class="row">
<div class="span12">
<h3> Running Jobs </h3>
<br/>
@job_table(jobs.filter(j => j.state == JobState.WAITING || j.state == JobState.RUNNING))
</div>
</div>
<hr/>
<!-- Job Summary (Completed) -->
<div class="row">
<div class="span12">
<h3> Completed Jobs </h3>
<br/>
@job_table(jobs.filter(j => j.state == JobState.FINISHED || j.state == JobState.FAILED))
</div>
</div>
}


@@ -0,0 +1,34 @@
@(job: spark.deploy.master.JobInfo)
@common.html.layout(title = "Job Details") {
<!-- Job Details -->
<div class="row">
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> @job.id</li>
<li><strong>Description:</strong> @job.desc.name</li>
<li><strong>User:</strong> @job.desc.user</li>
<li><strong>Cores:</strong> @job.desc.cores</li>
<li><strong>Memory per Slave:</strong> @job.desc.memoryPerSlave</li>
<li><strong>Submit Date:</strong> @job.submitDate</li>
<li><strong>State:</strong> @job.state</li>
<li><strong>Cores Granted:</strong> @job.coresGranted</li>
<li><strong>Cores Left:</strong> @job.coresLeft</li>
<li><strong>Command:</strong> @job.desc.command</li>
</ul>
</div>
</div>
<hr/>
<!-- Executors -->
<div class="row">
<div class="span12">
<h3> Executor Summary </h3>
<br/>
@executors_table(job.executors.values.toList)
</div>
</div>
}


@@ -0,0 +1,15 @@
@(job: spark.deploy.master.JobInfo)
<tr>
<td>
<a href="job?jobId=@(job.id)">@job.id</a>
</td>
<td>@job.desc</td>
<td>
@job.coresGranted Granted, @job.coresLeft Left
</td>
<td>@job.desc.memoryPerSlave</td>
<td>@job.submitDate</td>
<td>@job.desc.user</td>
<td>@job.state.toString()</td>
</tr>


@@ -0,0 +1,20 @@
@(jobs: List[spark.deploy.master.JobInfo])
<table class="table table-bordered table-striped table-condensed">
<thead>
<tr>
<th>JobID</th>
<th>Description</th>
<th>Cores</th>
<th>Memory per Slave</th>
<th>Submit Date</th>
<th>User</th>
<th>State</th>
</tr>
</thead>
<tbody>
@for(j <- jobs) {
@job_row(j)
}
</tbody>
</table>


@@ -0,0 +1,11 @@
@(worker: spark.deploy.master.WorkerInfo)
<tr>
<td>
<a href="http://@worker.host:@worker.webUiPort">@worker.id</a>
</td>
<td>@worker.host</td>
<td>@worker.port</td>
<td>@worker.cores (@worker.coresUsed Used)</td>
<td>@worker.memory (@worker.memoryUsed Used)</td>
</tr>


@@ -0,0 +1,18 @@
@(workers: List[spark.deploy.master.WorkerInfo])
<table class="table table-bordered table-striped table-condensed">
<thead>
<tr>
<th>ID</th>
<th>Host</th>
<th>Port</th>
<th>Cores</th>
<th>Memory</th>
</tr>
</thead>
<tbody>
@for(w <- workers) {
@worker_row(w)
}
</tbody>
</table>


@@ -0,0 +1,21 @@
@(executor: spark.deploy.worker.ExecutorRunner)
<tr>
<td>@executor.execId</td>
<td>@executor.cores</td>
<td>@executor.memory</td>
<td>
<ul class="unstyled">
<li><strong>ID:</strong> @executor.jobId</li>
<li><strong>Name:</strong> @executor.jobDesc.name</li>
<li><strong>User:</strong> @executor.jobDesc.user</li>
<li><strong>Cores:</strong> @executor.jobDesc.cores </li>
<li><strong>Memory per Slave:</strong> @executor.jobDesc.memoryPerSlave</li>
<li><strong>Command:</strong> @executor.jobDesc.command</li>
</ul>
</td>
<td>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stdout">stdout</a>
<a href="log?jobId=@(executor.jobId)&executorId=@(executor.execId)&logType=stderr">stderr</a>
</td>
</tr>


@@ -0,0 +1,18 @@
@(executors: List[spark.deploy.worker.ExecutorRunner])
<table class="table table-bordered table-striped table-condensed">
<thead>
<tr>
<th>ExecutorID</th>
<th>Cores</th>
<th>Memory</th>
<th>Job Details</th>
<th>Logs</th>
</tr>
</thead>
<tbody>
@for(e <- executors) {
@executor_row(e)
}
</tbody>
</table>


@@ -0,0 +1,39 @@
@(worker: spark.deploy.WorkerState)
@common.html.layout(title = "Worker WebUI") {
<!-- Worker Details -->
<div class="row">
<div class="span12">
<ul class="unstyled">
<li><strong>ID:</strong> @worker.workerId</li>
<li><strong>Master URL:</strong> @worker.masterUrl </li>
<li><strong>Cores:</strong> @worker.cores (@worker.coresUsed Used)</li>
<li><strong>Memory:</strong> @worker.memory (@worker.memoryUsed Used)</li>
</ul>
</div>
</div>
<hr/>
<!-- Running Executors -->
<div class="row">
<div class="span12">
<h3> Running Executors </h3>
<br/>
@executors_table(worker.executors)
</div>
</div>
<hr/>
<!-- Finished Executors -->
<div class="row">
<div class="span12">
<h3> Finished Executors </h3>
<br/>
@executors_table(worker.finishedExecutors)
</div>
</div>
}


@@ -2,6 +2,7 @@ import sbt._
 import Keys._
 import sbtassembly.Plugin._
 import AssemblyKeys._
+import twirl.sbt.TwirlPlugin._

 object SparkBuild extends Build {
   // Hadoop version to build against. For example, "0.20.2", "0.20.205.0", or
@@ -69,7 +70,7 @@ object SparkBuild extends Build {
       "cc.spray" % "spray-can" % "1.0-M2.1",
       "cc.spray" % "spray-server" % "1.0-M2.1"
     )
-  ) ++ assemblySettings ++ extraAssemblySettings
+  ) ++ assemblySettings ++ extraAssemblySettings ++ Seq(Twirl.settings: _*)

   def replSettings = sharedSettings ++ Seq(
     name := "spark-repl",
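`Twirl.settings` hooks template compilation into the build: with the sbt-twirl plugin (added to plugins.sbt below), templates under src/main/twirl are compiled to Scala objects whose package mirrors their directory. The paths below are assumed from the package names used in this commit, since the template file paths themselves do not appear in this diff:

// Assumed template-to-object mapping under sbt-twirl's default source layout:
//   core/src/main/twirl/masterui/index.scala.html  ->  masterui.html.index
//   core/src/main/twirl/workerui/index.scala.html  ->  workerui.html.index
//   core/src/main/twirl/common/layout.scala.html   ->  common.html.layout
// The generated objects are plain Scala, so the route handlers call them directly:
def renderIndex(state: spark.deploy.MasterState) =
  masterui.html.index.render(state.jobs.values.toList, state.workers)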


@@ -1,9 +1,13 @@
 resolvers += Classpaths.typesafeResolver

 resolvers += Resolver.url("artifactory", url("http://scalasbt.artifactoryonline.com/scalasbt/sbt-plugin-releases"))(Resolver.ivyStylePatterns)

+resolvers += "Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/"
+
+resolvers += "Spray Repository" at "http://repo.spray.cc/"

 addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.8.3")

 addSbtPlugin("com.typesafe.sbteclipse" % "sbteclipse-plugin" % "2.1.0-RC1")

 addSbtPlugin("com.github.mpeltonen" % "sbt-idea" % "1.0.0")
+
+addSbtPlugin("cc.spray" %% "sbt-twirl" % "0.5.2")