Refactoring UI packages
This commit is contained in:
parent
8b5c7e71c4
commit
77c53f7868
Before Width: | Height: | Size: 14 KiB After Width: | Height: | Size: 14 KiB |
|
@ -46,8 +46,9 @@ import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener,
|
|||
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
|
||||
import spark.scheduler.local.LocalScheduler
|
||||
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
|
||||
import spark.storage.{BlockManagerUI, StorageStatus, StorageUtils, RDDInfo}
|
||||
import spark.storage.{StorageStatus, StorageUtils, RDDInfo}
|
||||
import spark.util.{MetadataCleaner, TimeStampedHashMap}
|
||||
import ui.{SparkUI, BlockManagerUI}
|
||||
|
||||
/**
|
||||
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
|
||||
|
@ -93,9 +94,8 @@ class SparkContext(
|
|||
isLocal)
|
||||
SparkEnv.set(env)
|
||||
|
||||
// Start the BlockManager UI
|
||||
private[spark] val ui = new BlockManagerUI(
|
||||
env.actorSystem, env.blockManager.master.driverActor, this)
|
||||
// Start the Spark UI
|
||||
private[spark] val ui = new SparkUI(this)
|
||||
ui.start()
|
||||
|
||||
// Used to store a URL for each static file/jar together with the file's local timestamp
|
||||
|
|
|
@ -10,10 +10,11 @@ import net.liftweb.json.JsonAST.JValue
|
|||
import org.eclipse.jetty.server.Handler
|
||||
import scala.xml.Node
|
||||
import spark.{Logging, Utils}
|
||||
import spark.util.WebUI
|
||||
import spark.util.WebUI._
|
||||
import spark.ui.WebUI
|
||||
import WebUI._
|
||||
import spark.deploy._
|
||||
import spark.deploy.MasterState
|
||||
import spark.ui.WebUI
|
||||
|
||||
/**
|
||||
* Web UI server for the standalone master.
|
||||
|
@ -64,6 +65,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
|
|||
state.completedApps.find(_.id == appId).getOrElse(null)
|
||||
})
|
||||
val content =
|
||||
<hr />
|
||||
<div class="row">
|
||||
<div class="span12">
|
||||
<ul class="unstyled">
|
||||
|
@ -97,7 +99,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
|
|||
{executorTable(app.executors.values.toList)}
|
||||
</div>
|
||||
</div>;
|
||||
WebUI.makePage(content, "Application Info: " + app.desc.name)
|
||||
WebUI.sparkPage(content, "Application Info: " + app.desc.name)
|
||||
}
|
||||
|
||||
def executorTable(executors: Seq[ExecutorInfo]): Seq[Node] = {
|
||||
|
@ -142,6 +144,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
|
|||
val state = Await.result(stateFuture, 3 seconds)
|
||||
|
||||
val content =
|
||||
<hr />
|
||||
<div class="row">
|
||||
<div class="span12">
|
||||
<ul class="unstyled">
|
||||
|
@ -186,7 +189,7 @@ class MasterWebUI(master: ActorRef) extends Logging {
|
|||
{appTable(state.completedApps.sortBy(_.endTime).reverse)}
|
||||
</div>
|
||||
</div>;
|
||||
WebUI.makePage(content, "Spark Master: " + state.uri)
|
||||
WebUI.sparkPage(content, "Spark Master: " + state.uri)
|
||||
}
|
||||
|
||||
def workerTable(workers: Seq[spark.deploy.master.WorkerInfo]) = {
|
||||
|
@ -258,6 +261,6 @@ class MasterWebUI(master: ActorRef) extends Logging {
|
|||
}
|
||||
|
||||
object MasterWebUI {
|
||||
val STATIC_RESOURCE_DIR = "spark/deploy/static"
|
||||
val STATIC_RESOURCE_DIR = "spark/webui/static"
|
||||
val DEFAULT_PORT = "8080"
|
||||
}
|
|
@ -12,8 +12,8 @@ import org.eclipse.jetty.server.Handler
|
|||
import scala.io.Source
|
||||
import spark.{Utils, Logging}
|
||||
import spark.deploy.{JsonProtocol, WorkerState, RequestWorkerState}
|
||||
import spark.util.{WebUI => UtilsWebUI}
|
||||
import spark.util.WebUI._
|
||||
import spark.ui.{WebUI => UtilsWebUI}
|
||||
import spark.ui.WebUI._
|
||||
import xml.Node
|
||||
|
||||
/**
|
||||
|
@ -55,6 +55,7 @@ class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
|
|||
val stateFuture = (worker ? RequestWorkerState)(timeout).mapTo[WorkerState]
|
||||
val workerState = Await.result(stateFuture, 3 seconds)
|
||||
val content =
|
||||
<hr />
|
||||
<div class="row"> <!-- Worker Details -->
|
||||
<div class="span12">
|
||||
<ul class="unstyled">
|
||||
|
@ -88,7 +89,7 @@ class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
|
|||
</div>
|
||||
</div>;
|
||||
|
||||
UtilsWebUI.makePage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
|
||||
UtilsWebUI.sparkPage(content, "Spark Worker on %s:%s".format(workerState.host, workerState.port))
|
||||
}
|
||||
|
||||
def executorTable(executors: Seq[ExecutorRunner]): Seq[Node] = {
|
||||
|
@ -139,42 +140,9 @@ class WorkerWebUI(worker: ActorRef, workDir: File) extends Logging {
|
|||
source.close()
|
||||
lines
|
||||
}
|
||||
|
||||
/*
|
||||
val handler = {
|
||||
get {
|
||||
(path("") & parameters('format ?)) {
|
||||
case Some(js) if js.equalsIgnoreCase("json") => {
|
||||
val future = worker ? RequestWorkerState
|
||||
respondWithMediaType(MediaTypes.`application/json`) { ctx =>
|
||||
ctx.complete(future.mapTo[WorkerState])
|
||||
}
|
||||
}
|
||||
case _ =>
|
||||
completeWith{
|
||||
val future = worker ? RequestWorkerState
|
||||
future.map { workerState =>
|
||||
spark.deploy.worker.html.index(workerState.asInstanceOf[WorkerState])
|
||||
}
|
||||
}
|
||||
} ~
|
||||
path("log") {
|
||||
parameters("appId", "executorId", "logType") { (appId, executorId, logType) =>
|
||||
respondWithMediaType(cc.spray.http.MediaTypes.`text/plain`) {
|
||||
getFromFileName(workDir.getPath() + "/" + appId + "/" + executorId + "/" + logType)
|
||||
}
|
||||
}
|
||||
} ~
|
||||
pathPrefix("static") {
|
||||
getFromResourceDirectory(STATIC_RESOURCE_DIR)
|
||||
} ~
|
||||
getFromResourceDirectory(RESOURCE_DIR)
|
||||
}
|
||||
}
|
||||
*/
|
||||
}
|
||||
|
||||
object WorkerWebUI {
|
||||
val STATIC_RESOURCE_DIR = "spark/deploy/static"
|
||||
val STATIC_RESOURCE_DIR = "spark/webui/static"
|
||||
val DEFAULT_PORT="8081"
|
||||
}
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
package spark.storage
|
||||
package spark.ui
|
||||
|
||||
import akka.actor.{ActorRef, ActorSystem}
|
||||
import akka.util.Duration
|
||||
|
@ -6,38 +6,23 @@ import javax.servlet.http.HttpServletRequest
|
|||
import org.eclipse.jetty.server.Handler
|
||||
import spark.{Logging, SparkContext}
|
||||
import spark.Utils
|
||||
import spark.util.WebUI._
|
||||
import spark.util.WebUI
|
||||
import WebUI._
|
||||
import xml.Node
|
||||
import spark.storage.StorageUtils
|
||||
|
||||
/**
|
||||
* Web UI server for the BlockManager inside each SparkContext.
|
||||
*/
|
||||
private[spark]
|
||||
class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef, sc: SparkContext)
|
||||
extends Logging {
|
||||
class BlockManagerUI(sc: SparkContext)
|
||||
extends UIComponent with Logging {
|
||||
implicit val timeout = Duration.create(
|
||||
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
|
||||
|
||||
implicit val timeout = Duration.create(System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
|
||||
val host = Utils.localHostName()
|
||||
val port = Option(System.getProperty("spark.ui.port"))
|
||||
.getOrElse(BlockManagerUI.DEFAULT_PORT).toInt
|
||||
|
||||
/** Start a HTTP server to run the Web interface */
|
||||
def start() {
|
||||
try {
|
||||
val (server, boundPort) = WebUI.startJettyServer("0.0.0.0", port, handlers)
|
||||
logInfo("Started BlockManager web UI at http://%s:%d".format(host, boundPort))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logError("Failed to create BlockManager WebUI", e)
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
val handlers = Array[(String, Handler)](
|
||||
("/static", createStaticHandler(BlockManagerUI.STATIC_RESOURCE_DIR)),
|
||||
("/rdd", (request: HttpServletRequest) => rddPage(request)),
|
||||
("*", (request: HttpServletRequest) => indexPage)
|
||||
def getHandlers = Seq[(String, Handler)](
|
||||
("/storage/rdd", (request: HttpServletRequest) => rddPage(request)),
|
||||
("/storage", (request: HttpServletRequest) => indexPage)
|
||||
)
|
||||
|
||||
def rddPage(request: HttpServletRequest): Seq[Node] = {
|
||||
|
@ -137,7 +122,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
|
|||
</div>
|
||||
</div>;
|
||||
|
||||
WebUI.makePage(content, "RDD Info: " + id)
|
||||
WebUI.headerSparkPage(content, "RDD Info: " + id)
|
||||
}
|
||||
|
||||
def indexPage: Seq[Node] = {
|
||||
|
@ -176,7 +161,7 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
|
|||
{for (rdd <- rdds) yield
|
||||
<tr>
|
||||
<td>
|
||||
<a href={"/rdd?id=%s".format(rdd.id)}>
|
||||
<a href={"/storage/rdd?id=%s".format(rdd.id)}>
|
||||
{rdd.name}
|
||||
</a>
|
||||
</td>
|
||||
|
@ -191,12 +176,6 @@ class BlockManagerUI(val actorSystem: ActorSystem, blockManagerMaster: ActorRef,
|
|||
</tbody>
|
||||
</table>;
|
||||
|
||||
WebUI.makePage(content, "Spark Storage")
|
||||
WebUI.headerSparkPage(content, "Spark Storage ")
|
||||
}
|
||||
private[spark] def appUIAddress = "http://" + host + ":" + port
|
||||
}
|
||||
|
||||
object BlockManagerUI {
|
||||
val STATIC_RESOURCE_DIR = "spark/deploy/static"
|
||||
val DEFAULT_PORT = "33000"
|
||||
}
|
41
core/src/main/scala/spark/ui/SparkUI.scala
Normal file
41
core/src/main/scala/spark/ui/SparkUI.scala
Normal file
|
@ -0,0 +1,41 @@
|
|||
package spark.ui
|
||||
|
||||
import spark.{Logging, SparkContext, Utils}
|
||||
import javax.servlet.http.HttpServletRequest
|
||||
import org.eclipse.jetty.server.Handler
|
||||
import WebUI._
|
||||
|
||||
private[spark] class SparkUI(sc: SparkContext) extends Logging {
|
||||
val host = Utils.localHostName()
|
||||
val port = Option(System.getProperty("spark.ui.port")).getOrElse(SparkUI.DEFAULT_PORT).toInt
|
||||
|
||||
|
||||
val handlers = Seq[(String, Handler)](
|
||||
("/static", createStaticHandler(SparkUI.STATIC_RESOURCE_DIR)),
|
||||
("*", (request: HttpServletRequest) => WebUI.headerSparkPage(<h1>Test</h1>, "Test page"))
|
||||
)
|
||||
val components = Seq(new BlockManagerUI(sc))
|
||||
|
||||
def start() {
|
||||
/** Start an HTTP server to run the Web interface */
|
||||
try {
|
||||
val allHandlers = components.flatMap(_.getHandlers) ++ handlers
|
||||
val (server, boundPort) = WebUI.startJettyServer("0.0.0.0", port, allHandlers)
|
||||
logInfo("Started Spark Web UI at http://%s:%d".format(host, boundPort))
|
||||
} catch {
|
||||
case e: Exception =>
|
||||
logError("Failed to create Spark WebUI", e)
|
||||
System.exit(1)
|
||||
}
|
||||
}
|
||||
|
||||
private[spark] def appUIAddress = "http://" + host + ":" + port
|
||||
}
|
||||
|
||||
object SparkUI {
|
||||
val DEFAULT_PORT = "33000"
|
||||
val STATIC_RESOURCE_DIR = "spark/webui/static"
|
||||
}
|
||||
|
||||
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
package spark.util
|
||||
package spark.ui
|
||||
|
||||
import annotation.tailrec
|
||||
import javax.servlet.http.{HttpServletResponse, HttpServletRequest}
|
||||
|
@ -9,6 +9,10 @@ import spark.Logging
|
|||
import util.{Try, Success, Failure}
|
||||
import xml.Node
|
||||
|
||||
abstract class UIComponent {
|
||||
def getHandlers(): Seq[(String, Handler)]
|
||||
}
|
||||
|
||||
object WebUI extends Logging {
|
||||
type Responder[T] = HttpServletRequest => T
|
||||
|
||||
|
@ -41,24 +45,27 @@ object WebUI extends Logging {
|
|||
val staticHandler = new ResourceHandler
|
||||
Option(getClass.getClassLoader.getResource(resourceBase)) match {
|
||||
case Some(res) =>
|
||||
staticHandler.setResourceBase (res.toString)
|
||||
staticHandler
|
||||
staticHandler.setResourceBase(res.toString)
|
||||
case None =>
|
||||
logError("Could not find resource path for Web UI: " + resourceBase)
|
||||
}
|
||||
staticHandler
|
||||
}
|
||||
|
||||
def startJettyServer(ip: String, port: Int, handlers: Array[(String, Handler)]): (Server, Int) = {
|
||||
def startJettyServer(ip: String, port: Int, handlers: Seq[(String, Handler)]): (Server, Int) = {
|
||||
val handlersToRegister = handlers.map { case(path, handler) =>
|
||||
if (path == "*") {
|
||||
handler
|
||||
} else {
|
||||
val contextHandler = new ContextHandler(path)
|
||||
println("Adding handler for path: " + path)
|
||||
contextHandler.setHandler(handler)
|
||||
contextHandler.asInstanceOf[org.eclipse.jetty.server.Handler]
|
||||
}
|
||||
}
|
||||
|
||||
val handlerList = new HandlerList
|
||||
handlerList.setHandlers(handlersToRegister)
|
||||
handlerList.setHandlers(handlersToRegister.toArray)
|
||||
|
||||
@tailrec
|
||||
def connect(currentPort: Int): (Server, Int) = {
|
||||
|
@ -71,11 +78,19 @@ object WebUI extends Logging {
|
|||
connect((currentPort + 1) % 65536)
|
||||
}
|
||||
}
|
||||
|
||||
connect(port)
|
||||
}
|
||||
|
||||
def makePage(content: => Seq[Node], title: String): Seq[Node] = {
|
||||
/** Page with Spark logo, title, and Spark UI headers */
|
||||
def headerSparkPage(content: => Seq[Node], title: String): Seq[Node] = {
|
||||
val newContent =
|
||||
<h2><a href="/storage">Storage</a> | <a href="/jobs">Jobs</a> </h2><hl/>;
|
||||
|
||||
sparkPage(newContent ++ content, title)
|
||||
}
|
||||
|
||||
/** Page with Spark logo and title */
|
||||
def sparkPage(content: => Seq[Node], title: String): Seq[Node] = {
|
||||
<html>
|
||||
<head>
|
||||
<meta http-equiv="Content-type" content="text/html; charset=utf-8" />
|
||||
|
@ -98,7 +113,6 @@ object WebUI extends Logging {
|
|||
</h1>
|
||||
</div>
|
||||
</div>
|
||||
<hr />
|
||||
{content}
|
||||
</div>
|
||||
</body>
|
Loading…
Reference in a new issue