[SPARK-4595][Core] Fix MetricsServlet not work issue
`MetricsServlet` handler should be added to the web UI after initialized by `MetricsSystem`, otherwise servlet handler cannot be attached. Author: Saisai Shao <saisai.shao@intel.com> Author: Josh Rosen <joshrosen@databricks.com> Author: jerryshao <saisai.shao@intel.com> Closes #3444 from jerryshao/SPARK-4595 and squashes the following commits: 434d17e [Saisai Shao] Merge pull request #10 from JoshRosen/metrics-system-cleanup 87a2292 [Josh Rosen] Guard against misuse of MetricsSystem methods. f779fe0 [jerryshao] Fix MetricsServlet not work issue
This commit is contained in:
parent
3d0c37b811
commit
cf50631a66
|
@ -344,6 +344,8 @@ class SparkContext(config: SparkConf) extends Logging {
|
|||
// The metrics system for Driver need to be set spark.app.id to app ID.
|
||||
// So it should start after we get app ID from the task scheduler and set spark.app.id.
|
||||
metricsSystem.start()
|
||||
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
|
||||
metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
|
||||
|
||||
// Optionally log Spark events
|
||||
private[spark] val eventLogger: Option[EventLoggingListener] = {
|
||||
|
|
|
@ -129,6 +129,10 @@ private[spark] class Master(
|
|||
masterMetricsSystem.registerSource(masterSource)
|
||||
masterMetricsSystem.start()
|
||||
applicationMetricsSystem.start()
|
||||
// Attach the master and app metrics servlet handler to the web ui after the metrics systems are
|
||||
// started.
|
||||
masterMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
|
||||
applicationMetricsSystem.getServletHandlers.foreach(webUi.attachHandler)
|
||||
|
||||
val (persistenceEngine_, leaderElectionAgent_) = RECOVERY_MODE match {
|
||||
case "ZOOKEEPER" =>
|
||||
|
|
|
@ -41,8 +41,6 @@ class MasterWebUI(val master: Master, requestedPort: Int)
|
|||
attachPage(new HistoryNotFoundPage(this))
|
||||
attachPage(new MasterPage(this))
|
||||
attachHandler(createStaticHandler(MasterWebUI.STATIC_RESOURCE_DIR, "/static"))
|
||||
master.masterMetricsSystem.getServletHandlers.foreach(attachHandler)
|
||||
master.applicationMetricsSystem.getServletHandlers.foreach(attachHandler)
|
||||
}
|
||||
|
||||
/** Attach a reconstructed UI to this Master UI. Only valid after bind(). */
|
||||
|
|
|
@ -163,6 +163,8 @@ private[spark] class Worker(
|
|||
|
||||
metricsSystem.registerSource(workerSource)
|
||||
metricsSystem.start()
|
||||
// Attach the worker metrics servlet handler to the web ui after the metrics system is started.
|
||||
metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
|
||||
}
|
||||
|
||||
def changeMaster(url: String, uiUrl: String) {
|
||||
|
|
|
@ -50,7 +50,6 @@ class WorkerWebUI(
|
|||
attachHandler(createStaticHandler(WorkerWebUI.STATIC_RESOURCE_BASE, "/static"))
|
||||
attachHandler(createServletHandler("/log",
|
||||
(request: HttpServletRequest) => logPage.renderLog(request), worker.securityMgr))
|
||||
worker.metricsSystem.getServletHandlers.foreach(attachHandler)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -76,22 +76,36 @@ private[spark] class MetricsSystem private (
|
|||
private val sources = new mutable.ArrayBuffer[Source]
|
||||
private val registry = new MetricRegistry()
|
||||
|
||||
private var running: Boolean = false
|
||||
|
||||
// Treat MetricsServlet as a special sink as it should be exposed to add handlers to web ui
|
||||
private var metricsServlet: Option[MetricsServlet] = None
|
||||
|
||||
/** Get any UI handlers used by this metrics system. */
|
||||
def getServletHandlers = metricsServlet.map(_.getHandlers).getOrElse(Array())
|
||||
/**
|
||||
* Get any UI handlers used by this metrics system; can only be called after start().
|
||||
*/
|
||||
def getServletHandlers = {
|
||||
require(running, "Can only call getServletHandlers on a running MetricsSystem")
|
||||
metricsServlet.map(_.getHandlers).getOrElse(Array())
|
||||
}
|
||||
|
||||
metricsConfig.initialize()
|
||||
|
||||
def start() {
|
||||
require(!running, "Attempting to start a MetricsSystem that is already running")
|
||||
running = true
|
||||
registerSources()
|
||||
registerSinks()
|
||||
sinks.foreach(_.start)
|
||||
}
|
||||
|
||||
def stop() {
|
||||
sinks.foreach(_.stop)
|
||||
if (running) {
|
||||
sinks.foreach(_.stop)
|
||||
} else {
|
||||
logWarning("Stopping a MetricsSystem that is not running")
|
||||
}
|
||||
running = false
|
||||
}
|
||||
|
||||
def report() {
|
||||
|
@ -107,7 +121,7 @@ private[spark] class MetricsSystem private (
|
|||
* @return An unique metric name for each combination of
|
||||
* application, executor/driver and metric source.
|
||||
*/
|
||||
def buildRegistryName(source: Source): String = {
|
||||
private[spark] def buildRegistryName(source: Source): String = {
|
||||
val appId = conf.getOption("spark.app.id")
|
||||
val executorId = conf.getOption("spark.executor.id")
|
||||
val defaultName = MetricRegistry.name(source.sourceName)
|
||||
|
@ -144,7 +158,7 @@ private[spark] class MetricsSystem private (
|
|||
})
|
||||
}
|
||||
|
||||
def registerSources() {
|
||||
private def registerSources() {
|
||||
val instConfig = metricsConfig.getInstance(instance)
|
||||
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
|
||||
|
||||
|
@ -160,7 +174,7 @@ private[spark] class MetricsSystem private (
|
|||
}
|
||||
}
|
||||
|
||||
def registerSinks() {
|
||||
private def registerSinks() {
|
||||
val instConfig = metricsConfig.getInstance(instance)
|
||||
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
|
||||
|
||||
|
|
|
@ -57,8 +57,6 @@ private[spark] class SparkUI private (
|
|||
attachHandler(createRedirectHandler("/", "/jobs", basePath = basePath))
|
||||
attachHandler(
|
||||
createRedirectHandler("/stages/stage/kill", "/stages", stagesTab.handleKillRequest))
|
||||
// If the UI is live, then serve
|
||||
sc.foreach { _.env.metricsSystem.getServletHandlers.foreach(attachHandler) }
|
||||
}
|
||||
initialize()
|
||||
|
||||
|
|
Loading…
Reference in a new issue