Merge pull request #743 from pwendell/app-metrics
Add application metrics to standalone master
commit 9d7dfd2d5a
conf/metrics.properties.template:

```diff
@@ -1,48 +1,45 @@
-# syntax: [instance].[sink|source].[name].[options]
+# syntax: [instance].sink|source.[name].[options]=[value]
 
-# "instance" specify "who" (the role) use metrics system. In spark there are
-# several roles like master, worker, executor, driver, these roles will
-# create metrics system for monitoring. So instance represents these roles.
-# Currently in Spark, several instances have already implemented: master,
-# worker, executor, driver.
+# This file configures Spark's internal metrics system. The metrics system is
+# divided into instances which correspond to internal components.
+# Each instance can be configured to report its metrics to one or more sinks.
+# Accepted values for [instance] are "master", "worker", "executor", "driver",
+# and "applications". A wild card "*" can be used as an instance name, in
+# which case all instances will inherit the supplied property.
 #
-# [instance] field can be "master", "worker", "executor", "driver", which means
-# only the specified instance has this property.
-# a wild card "*" can be used to represent instance name, which means all the
-# instances will have this property.
+# Within an instance, a "source" specifies a particular set of grouped metrics.
+# There are two kinds of sources:
+#   1. Spark internal sources, like MasterSource, WorkerSource, etc, which will
+#   collect a Spark component's internal state. Each instance is paired with a
+#   Spark source that is added automatically.
+#   2. Common sources, like JvmSource, which will collect low level state.
+#   These can be added through configuration options and are then loaded
+#   using reflection.
 #
-# "source" specify "where" (source) to collect metrics data. In metrics system,
-# there exists two kinds of source:
-#   1. Spark internal source, like MasterSource, WorkerSource, etc, which will
-#   collect Spark component's internal state, these sources are related to
-#   instance and will be added after specific metrics system is created.
-#   2. Common source, like JvmSource, which will collect low level state, is
-#   configured by configuration and loaded through reflection.
+# A "sink" specifies where metrics are delivered to. Each instance can be
+# assigned one or more sinks.
 #
-# "sink" specify "where" (destination) to output metrics data to. Several sinks
-# can be coexisted and flush metrics to all these sinks.
+# The sink|source field specifies whether the property relates to a sink or
+# a source.
 #
-# [sink|source] field specify this property is source related or sink, this
-# field can only be source or sink.
+# The [name] field specifies the name of the source or sink.
 #
-# [name] field specify the name of source or sink, this is custom defined.
-#
-# [options] field is the specific property of this source or sink, this source
-# or sink is responsible for parsing this property.
+# The [options] field is a property specific to this source or sink. The
+# source or sink is responsible for parsing it.
 #
 # Notes:
-#   1. Sinks should be added through configuration, like console sink, class
-#   full name should be specified by class property.
-#   2. Some sinks can specify polling period, like console sink, which is 10 seconds,
-#   it should be attention minimal polling period is 1 seconds, any period
-#   below than 1s is illegal.
-#   3. Wild card property can be overlapped by specific instance property, for
-#   example, *.sink.console.period can be overlapped by master.sink.console.period.
+#   1. To add a new sink, set the "class" option to a fully qualified class
+#   name (see examples below).
+#   2. Some sinks involve a polling period. The minimum allowed polling period
+#   is 1 second.
+#   3. Wild card properties can be overridden by more specific properties.
+#   For example, master.sink.console.period takes precedence over
+#   *.sink.console.period.
 #   4. A metrics specific configuration
 #   "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
-#   added to Java property using -Dspark.metrics.conf=xxx if you want to
-#   customize metrics system, or you can put it in ${SPARK_HOME}/conf,
-#   metrics system will search and load it automatically.
+#   added to the Java properties using -Dspark.metrics.conf=xxx if you want to
+#   customize the metrics system. You can also put the file in ${SPARK_HOME}/conf
+#   and it will be loaded automatically.
 
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
```
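The new "applications" instance makes it possible to route per-application metrics independently of the master's own metrics. A hypothetical metrics.properties fragment along these lines; the ConsoleSink class name and the period value are illustrative assumptions, not part of this diff:

```properties
# Report every instance's metrics over JMX (uncommented from the template above).
*.sink.jmx.class=spark.metrics.sink.JmxSink

# Additionally, dump per-application metrics to the console every 20 seconds.
# ConsoleSink and its "period" option are assumed here for illustration.
applications.sink.console.class=spark.metrics.sink.ConsoleSink
applications.sink.console.period=20
```

Per note 3 above, a more specific property such as applications.sink.console.period would also take precedence over *.sink.console.period if both were set.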
spark/deploy/master/ApplicationInfo.scala:

```diff
@@ -34,6 +34,7 @@ private[spark] class ApplicationInfo(
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
   var endTime = -1L
+  val appSource = new ApplicationSource(this)
 
   private var nextExecutorId = 0
 
```
spark/deploy/master/ApplicationSource.scala (new file):

```diff
@@ -0,0 +1,24 @@
+package spark.deploy.master
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import spark.metrics.source.Source
+
+class ApplicationSource(val application: ApplicationInfo) extends Source {
+  val metricRegistry = new MetricRegistry()
+  val sourceName = "%s.%s.%s".format("application", application.desc.name,
+    System.currentTimeMillis())
+
+  metricRegistry.register(MetricRegistry.name("status"), new Gauge[String] {
+    override def getValue: String = application.state.toString
+  })
+
+  metricRegistry.register(MetricRegistry.name("runtime_ms"), new Gauge[Long] {
+    override def getValue: Long = application.duration
+  })
+
+  metricRegistry.register(MetricRegistry.name("cores", "number"), new Gauge[Int] {
+    override def getValue: Int = application.coresGranted
+  })
+
+}
```
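Each of these metrics is a Codahale Gauge, so its value is computed on demand whenever a sink polls the registry, rather than being pushed on every state change. A further gauge in the same style might look like the sketch below; the "executors.number" metric is an invented example, not part of this PR, though the application.executors map does appear in ApplicationInfo above:

```scala
// Hypothetical additional gauge following the same pattern as above:
// report how many executors the application currently holds.
metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
  override def getValue: Int = application.executors.size
})
```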
spark/deploy/master/Master.scala:

```diff
@@ -38,6 +38,7 @@ import spark.util.AkkaUtils
 private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
   val DATE_FORMAT = new SimpleDateFormat("yyyyMMddHHmmss")  // For application IDs
   val WORKER_TIMEOUT = System.getProperty("spark.worker.timeout", "60").toLong * 1000
+  val RETAINED_APPLICATIONS = System.getProperty("spark.deploy.retainedApplications", "200").toInt
 
   var nextAppNumber = 0
   val workers = new HashSet[WorkerInfo]
```
```diff
@@ -59,7 +60,8 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
 
   Utils.checkHost(host, "Expected hostname")
 
-  val metricsSystem = MetricsSystem.createMetricsSystem("master")
+  val masterMetricsSystem = MetricsSystem.createMetricsSystem("master")
+  val applicationMetricsSystem = MetricsSystem.createMetricsSystem("applications")
   val masterSource = new MasterSource(this)
 
   val masterPublicAddress = {
```
```diff
@@ -79,13 +81,15 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
     webUi.start()
     context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
 
-    metricsSystem.registerSource(masterSource)
-    metricsSystem.start()
+    masterMetricsSystem.registerSource(masterSource)
+    masterMetricsSystem.start()
+    applicationMetricsSystem.start()
   }
 
   override def postStop() {
     webUi.stop()
-    metricsSystem.stop()
+    masterMetricsSystem.stop()
+    applicationMetricsSystem.stop()
   }
 
   override def receive = {
```
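With the split into masterMetricsSystem and applicationMetricsSystem, startup and shutdown stay symmetric: both systems start in preStart() and stop in postStop(). Only the master system is given a source up front; application sources are registered and removed dynamically, as the next two hunks show.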
```diff
@@ -275,6 +279,7 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
     val now = System.currentTimeMillis()
     val date = new Date(now)
     val app = new ApplicationInfo(now, newApplicationId(date), desc, date, driver, desc.appUiUrl)
+    applicationMetricsSystem.registerSource(app.appSource)
     apps += app
     idToApp(app.id) = app
     actorToApp(driver) = app
```
```diff
@@ -300,7 +305,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
     idToApp -= app.id
     actorToApp -= app.driver
     addressToApp -= app.driver.path.address
-    completedApps += app  // Remember it in our history
+    if (completedApps.size >= RETAINED_APPLICATIONS) {
+      val toRemove = math.max(RETAINED_APPLICATIONS / 10, 1)
+      completedApps.take(toRemove).foreach( a => {
+        applicationMetricsSystem.removeSource(a.appSource)
+      })
+      completedApps.trimStart(toRemove)
+    }
+    completedApps += app // Remember it in our history
     waitingApps -= app
     for (exec <- app.executors.values) {
       exec.worker.removeExecutor(exec)
```
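Taken together, these two hunks tie each metrics source's lifetime to its application: the source is registered when the application registers with the master and removed once the application ages out of the completed-application history. The trim arithmetic is what bounds the master's memory use; a self-contained check of it under the default cap (object and value names invented for illustration):

```scala
// Retention arithmetic from the hunk above, using the default cap of 200
// (the value of spark.deploy.retainedApplications when the property is unset).
object RetentionCheck extends App {
  val retainedApplications = 200
  val toRemove = math.max(retainedApplications / 10, 1)  // drop the oldest 10%, at least 1
  println(toRemove)  // prints 20
}
```

One consequence worth noting: an application's metrics disappear from the "applications" instance only when it is trimmed from history, not immediately when it finishes.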
spark/metrics/MetricsSystem.scala:

```diff
@@ -17,7 +17,7 @@
 
 package spark.metrics
 
-import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
+import com.codahale.metrics.{Metric, MetricFilter, MetricRegistry}
 
 import java.util.Properties
 import java.util.concurrent.TimeUnit
```
```diff
@@ -93,6 +93,13 @@ private[spark] class MetricsSystem private (val instance: String) extends Logging {
     }
   }
 
+  def removeSource(source: Source) {
+    sources -= source
+    registry.removeMatching(new MetricFilter {
+      def matches(name: String, metric: Metric): Boolean = name.startsWith(source.sourceName)
+    })
+  }
+
   def registerSources() {
     val instConfig = metricsConfig.getInstance(instance)
     val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
```
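The prefix match in removeSource works because the metrics system registers each source's registry under its sourceName (in registerSource, which this hunk does not show), so every gauge a source owns shares that prefix. A minimal self-contained sketch of the mechanism, assuming the Codahale Metrics 3.x API that this file imports; the sourceName value is invented:

```scala
import com.codahale.metrics.{Gauge, Metric, MetricFilter, MetricRegistry}

object RemoveByPrefixDemo extends App {
  val registry = new MetricRegistry()
  val sourceName = "application.demoApp.1375315209"  // invented example value

  // Register a gauge under the source's namespace, as the metrics system does.
  registry.register(MetricRegistry.name(sourceName, "runtime_ms"), new Gauge[Long] {
    override def getValue: Long = 42L
  })

  // The same prefix filter used by removeSource above.
  registry.removeMatching(new MetricFilter {
    def matches(name: String, metric: Metric): Boolean = name.startsWith(sourceName)
  })

  println(registry.getNames.size)  // prints 0: the source's gauge is gone
}
```

Because the filter is a plain startsWith, a source whose sourceName is a prefix of another source's name would also have that other source's metrics removed; the timestamp suffix in ApplicationSource's sourceName makes such collisions unlikely.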