MetricsServlet code refactor according to comments
This commit is contained in:
parent
320e87e7ab
commit
09c7179e81
|
@ -41,17 +41,12 @@
|
|||
# customize metrics system. You can also put the file in ${SPARK_HOME}/conf
|
||||
# and it will be loaded automatically.
|
||||
# 5. MetricsServlet is added by default as a sink in master, worker and client
|
||||
# driver, you can send http request "/metrics" to get a snapshot of all the
|
||||
# registered metrics in json format. For master, requests "/metrics/master" and
|
||||
# "/metrics/applications" can be sent seperately to get metrics snapshot of
|
||||
# instance master and applications.
|
||||
# driver, you can send http request "/metrics/json" to get a snapshot of all the
|
||||
# registered metrics in json format. For master, requests "/metrics/master/json" and
|
||||
# "/metrics/applications/json" can be sent seperately to get metrics snapshot of
|
||||
# instance master and applications. MetricsServlet may not be configured by self.
|
||||
#
|
||||
|
||||
# Change MetricsServlet's property
|
||||
#*.sink.servlet.uri=/metrics
|
||||
#
|
||||
#*.sink.servlet.sample=false
|
||||
|
||||
# Enable JmxSink for all instances by class name
|
||||
#*.sink.jmx.class=spark.metrics.sink.JmxSink
|
||||
|
||||
|
|
|
@ -57,22 +57,13 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
|
|||
|
||||
var firstApp: Option[ApplicationInfo] = None
|
||||
|
||||
val webUi = new MasterWebUI(self, webUiPort)
|
||||
|
||||
Utils.checkHost(host, "Expected hostname")
|
||||
|
||||
val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
|
||||
val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
|
||||
val masterSource = new MasterSource(this)
|
||||
|
||||
// Add default MetricsServlet handler to web ui
|
||||
masterMetricsSystem.metricsServlet foreach { m =>
|
||||
webUi.handlers = m.getHandlers ++ webUi.handlers
|
||||
}
|
||||
|
||||
applicationMetricsSystem.metricsServlet foreach { m =>
|
||||
webUi.handlers = m.getHandlers ++ webUi.handlers
|
||||
}
|
||||
val webUi = new MasterWebUI(this, webUiPort)
|
||||
|
||||
val masterPublicAddress = {
|
||||
val envVar = System.getenv("SPARK_PUBLIC_DNS")
|
||||
|
|
|
@ -33,7 +33,7 @@ import spark.deploy.master.ExecutorInfo
|
|||
import spark.ui.UIUtils
|
||||
|
||||
private[spark] class ApplicationPage(parent: MasterWebUI) {
|
||||
val master = parent.master
|
||||
val master = parent.masterActorRef
|
||||
implicit val timeout = parent.timeout
|
||||
|
||||
/** Executor details for a particular application */
|
||||
|
|
|
@ -35,7 +35,7 @@ import spark.deploy.master.{ApplicationInfo, WorkerInfo}
|
|||
import spark.ui.UIUtils
|
||||
|
||||
private[spark] class IndexPage(parent: MasterWebUI) {
|
||||
val master = parent.master
|
||||
val master = parent.masterActorRef
|
||||
implicit val timeout = parent.timeout
|
||||
|
||||
def renderJson(request: HttpServletRequest): JValue = {
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package spark.deploy.master.ui
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.util.Duration
|
||||
|
||||
import javax.servlet.http.HttpServletRequest
|
||||
|
@ -25,6 +24,7 @@ import javax.servlet.http.HttpServletRequest
|
|||
import org.eclipse.jetty.server.{Handler, Server}
|
||||
|
||||
import spark.{Logging, Utils}
|
||||
import spark.deploy.master.Master
|
||||
import spark.ui.JettyUtils
|
||||
import spark.ui.JettyUtils._
|
||||
|
||||
|
@ -32,12 +32,14 @@ import spark.ui.JettyUtils._
|
|||
* Web UI server for the standalone master.
|
||||
*/
|
||||
private[spark]
|
||||
class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
|
||||
class MasterWebUI(val master: Master, requestedPort: Int) extends Logging {
|
||||
implicit val timeout = Duration.create(
|
||||
System.getProperty("spark.akka.askTimeout", "10").toLong, "seconds")
|
||||
val host = Utils.localHostName()
|
||||
val port = requestedPort
|
||||
|
||||
val masterActorRef = master.self
|
||||
|
||||
var server: Option[Server] = None
|
||||
var boundPort: Option[Int] = None
|
||||
|
||||
|
@ -57,7 +59,11 @@ class MasterWebUI(val master: ActorRef, requestedPort: Int) extends Logging {
|
|||
}
|
||||
}
|
||||
|
||||
var handlers = Array[(String, Handler)](
|
||||
val metricsHandlers = master.masterMetricsSystem.metricsServlet.map(_.getHandlers)
|
||||
.getOrElse(Array()) ++ master.applicationMetricsSystem.metricsServlet.map(_.getHandlers)
|
||||
.getOrElse(Array())
|
||||
|
||||
val handlers = metricsHandlers ++ Array[(String, Handler)](
|
||||
("/static", createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR)),
|
||||
("/app/json", (request: HttpServletRequest) => applicationPage.renderJson(request)),
|
||||
("/app", (request: HttpServletRequest) => applicationPage.render(request)),
|
||||
|
|
|
@ -102,11 +102,6 @@ private[spark] class Worker(
|
|||
createWorkDir()
|
||||
webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
|
||||
|
||||
// Add default MetricsServlet handlers to webUi
|
||||
metricsSystem.metricsServlet foreach { m =>
|
||||
webUi.handlers = m.getHandlers ++ webUi.handlers
|
||||
}
|
||||
|
||||
webUi.start()
|
||||
connectToMaster()
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
|
||||
package spark.deploy.worker.ui
|
||||
|
||||
import akka.actor.ActorRef
|
||||
import akka.util.{Duration, Timeout}
|
||||
|
||||
import java.io.{FileInputStream, File}
|
||||
|
@ -49,7 +48,9 @@ class WorkerWebUI(val worker: Worker, val workDir: File, requestedPort: Option[I
|
|||
|
||||
val indexPage = new IndexPage(this)
|
||||
|
||||
var handlers = Array[(String, Handler)](
|
||||
val metricsHandlers = worker.metricsSystem.metricsServlet.map(_.getHandlers).getOrElse(Array())
|
||||
|
||||
val handlers = metricsHandlers ++ Array[(String, Handler)](
|
||||
("/static", createStaticHandler(WorkerWebUI.STATIC_RESOURCE_DIR)),
|
||||
("/log", (request: HttpServletRequest) => log(request)),
|
||||
("/logPage", (request: HttpServletRequest) => logPage(request)),
|
||||
|
|
|
@ -37,8 +37,10 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
|
|||
|
||||
private def setDefaultProperties(prop: Properties) {
|
||||
prop.setProperty("*.sink.servlet.class", "spark.metrics.sink.MetricsServlet")
|
||||
prop.setProperty("master.sink.servlet.uri", "/metrics/master")
|
||||
prop.setProperty("applications.sink.servlet.uri", "/metrics/applications")
|
||||
prop.setProperty("*.sink.servlet.uri", "/metrics/json")
|
||||
prop.setProperty("*.sink.servlet.sample", "false")
|
||||
prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
|
||||
prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
|
||||
}
|
||||
|
||||
def initialize() {
|
||||
|
|
|
@ -129,7 +129,7 @@ private[spark] class MetricsSystem private (val instance: String) extends Loggin
|
|||
val sink = Class.forName(classPath)
|
||||
.getConstructor(classOf[Properties], classOf[MetricRegistry])
|
||||
.newInstance(kv._2, registry)
|
||||
if (kv._1 =="servlet") {
|
||||
if (kv._1 == "servlet") {
|
||||
metricsServlet = Some(sink.asInstanceOf[MetricsServlet])
|
||||
} else {
|
||||
sinks += sink.asInstanceOf[Sink]
|
||||
|
|
|
@ -34,15 +34,9 @@ class MetricsServlet(val property: Properties, val registry: MetricRegistry) ext
|
|||
val SERVLET_KEY_URI = "uri"
|
||||
val SERVLET_KEY_SAMPLE = "sample"
|
||||
|
||||
val SERVLET_DEFAULT_URI = "/metrics"
|
||||
val SERVLET_DEFAULT_SAMPLE = false
|
||||
val servletURI = property.getProperty(SERVLET_KEY_URI)
|
||||
|
||||
val servletURI = property.getProperty(SERVLET_KEY_URI, SERVLET_DEFAULT_URI)
|
||||
|
||||
val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)) match {
|
||||
case Some(s) => s.toBoolean
|
||||
case None => SERVLET_DEFAULT_SAMPLE
|
||||
}
|
||||
val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
|
||||
|
||||
val mapper = new ObjectMapper().registerModule(
|
||||
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
|
||||
|
|
|
@ -45,10 +45,8 @@ private[spark] class SparkUI(sc: SparkContext) extends Logging {
|
|||
val exec = new ExecutorsUI(sc)
|
||||
|
||||
// Add MetricsServlet handlers by default
|
||||
val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet match {
|
||||
case Some(s) => s.getHandlers
|
||||
case None => Array[(String, Handler)]()
|
||||
}
|
||||
val metricsServletHandlers = SparkEnv.get.metricsSystem.metricsServlet.map(_.getHandlers)
|
||||
.getOrElse(Array())
|
||||
|
||||
val allHandlers = storage.getHandlers ++ jobs.getHandlers ++ env.getHandlers ++
|
||||
exec.getHandlers ++ metricsServletHandlers ++ handlers
|
||||
|
|
|
@ -30,12 +30,14 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
|
|||
val conf = new MetricsConfig(Option("dummy-file"))
|
||||
conf.initialize()
|
||||
|
||||
assert(conf.properties.size() === 3)
|
||||
assert(conf.properties.size() === 5)
|
||||
assert(conf.properties.getProperty("test-for-dummy") === null)
|
||||
|
||||
val property = conf.getInstance("random")
|
||||
assert(property.size() === 1)
|
||||
assert(property.size() === 3)
|
||||
assert(property.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
|
||||
assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
|
||||
assert(property.getProperty("sink.servlet.sample") === "false")
|
||||
}
|
||||
|
||||
test("MetricsConfig with properties set") {
|
||||
|
@ -43,19 +45,22 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
|
|||
conf.initialize()
|
||||
|
||||
val masterProp = conf.getInstance("master")
|
||||
assert(masterProp.size() === 5)
|
||||
assert(masterProp.size() === 6)
|
||||
assert(masterProp.getProperty("sink.console.period") === "20")
|
||||
assert(masterProp.getProperty("sink.console.unit") === "minutes")
|
||||
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
|
||||
assert(masterProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
|
||||
assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master")
|
||||
assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
|
||||
assert(masterProp.getProperty("sink.servlet.sample") === "false")
|
||||
|
||||
val workerProp = conf.getInstance("worker")
|
||||
assert(workerProp.size() === 4)
|
||||
assert(workerProp.size() === 6)
|
||||
assert(workerProp.getProperty("sink.console.period") === "10")
|
||||
assert(workerProp.getProperty("sink.console.unit") === "seconds")
|
||||
assert(workerProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
|
||||
assert(workerProp.getProperty("sink.servlet.class") === "spark.metrics.sink.MetricsServlet")
|
||||
assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
|
||||
assert(workerProp.getProperty("sink.servlet.sample") === "false")
|
||||
}
|
||||
|
||||
test("MetricsConfig with subProperties") {
|
||||
|
@ -79,6 +84,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
|
|||
assert(consoleProps.size() === 2)
|
||||
|
||||
val servletProps = sinkProps("servlet")
|
||||
assert(servletProps.size() === 2)
|
||||
assert(servletProps.size() === 3)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue