Merge pull request #671 from jerryshao/master

Add metrics system for Spark
This commit is contained in:
Matei Zaharia 2013-07-24 08:59:14 -07:00
commit a73f3ee536
26 changed files with 784 additions and 6 deletions

View file

@ -0,0 +1,87 @@
# syntax: [instance].[sink|source].[name].[options]
# "instance" specify "who" (the role) use metrics system. In spark there are
# several roles like master, worker, executor, driver, these roles will
# create metrics system for monitoring. So instance represents these roles.
# Currently in Spark, several instances have already implemented: master,
# worker, executor, driver.
#
# [instance] field can be "master", "worker", "executor", "driver", which means
# only the specified instance has this property.
# a wild card "*" can be used to represent instance name, which means all the
# instances will have this property.
#
# "source" specify "where" (source) to collect metrics data. In metrics system,
# there exists two kinds of source:
# 1. Spark internal source, like MasterSource, WorkerSource, etc, which will
# collect Spark component's internal state, these sources are related to
# instance and will be added after specific metrics system is created.
# 2. Common source, like JvmSource, which will collect low level state, is
# configured by configuration and loaded through reflection.
#
# "sink" specify "where" (destination) to output metrics data to. Several sinks
# can be coexisted and flush metrics to all these sinks.
#
# [sink|source] field specify this property is source related or sink, this
# field can only be source or sink.
#
# [name] field specify the name of source or sink, this is custom defined.
#
# [options] field is the specific property of this source or sink, this source
# or sink is responsible for parsing this property.
#
# Notes:
# 1. Sinks should be added through configuration, like console sink, class
# full name should be specified by class property.
# 2. Some sinks can specify polling period, like console sink, which is 10 seconds,
# it should be attention minimal polling period is 1 seconds, any period
# below than 1s is illegal.
# 3. Wild card property can be overlapped by specific instance property, for
# example, *.sink.console.period can be overlapped by master.sink.console.period.
# 4. A metrics specific configuration
# "spark.metrics.conf=${SPARK_HOME}/conf/metrics.properties" should be
# added to Java property using -Dspark.metrics.conf=xxx if you want to
# customize metrics system, or you can put it in ${SPARK_HOME}/conf,
# metrics system will search and load it automatically.
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
#*.sink.console.unit=seconds
# Master instance overlap polling period
#master.sink.console.period=15
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
#*.sink.csv.class=spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
#*.sink.csv.unit=minutes
# Polling directory for CsvSink
#*.sink.csv.directory=/tmp/
# Worker instance overlap polling period
#worker.sink.csv.period=10
#worker.sink.csv.unit=minutes
# Enable jvm source for instance master, worker, driver and executor
#master.source.jvm.class=spark.metrics.source.JvmSource
#worker.source.jvm.class=spark.metrics.source.JvmSource
#driver.source.jvm.class=spark.metrics.source.JvmSource
#executor.source.jvm.class=spark.metrics.source.JvmSource

View file

@ -108,6 +108,14 @@
<groupId>log4j</groupId> <groupId>log4j</groupId>
<artifactId>log4j</artifactId> <artifactId>log4j</artifactId>
</dependency> </dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
</dependency>
<dependency> <dependency>
<groupId>org.apache.derby</groupId> <groupId>org.apache.derby</groupId>

View file

@ -60,13 +60,14 @@ import org.apache.mesos.MesosNativeLibrary
import spark.deploy.{LocalSparkCluster, SparkHadoopUtil} import spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import spark.partial.{ApproximateEvaluator, PartialResult} import spark.partial.{ApproximateEvaluator, PartialResult}
import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD} import spark.rdd.{CheckpointRDD, HadoopRDD, NewHadoopRDD, UnionRDD, ParallelCollectionRDD}
import spark.scheduler.{DAGScheduler, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler} import spark.scheduler.{DAGScheduler, DAGSchedulerSource, ResultTask, ShuffleMapTask, SparkListener, SplitInfo, Stage, StageInfo, TaskScheduler}
import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler} import spark.scheduler.cluster.{StandaloneSchedulerBackend, SparkDeploySchedulerBackend, ClusterScheduler}
import spark.scheduler.local.LocalScheduler import spark.scheduler.local.LocalScheduler
import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend} import spark.scheduler.mesos.{CoarseMesosSchedulerBackend, MesosSchedulerBackend}
import spark.storage.{StorageStatus, StorageUtils, RDDInfo} import spark.storage.{StorageStatus, StorageUtils, RDDInfo, BlockManagerSource}
import spark.util.{MetadataCleaner, TimeStampedHashMap} import spark.util.{MetadataCleaner, TimeStampedHashMap}
import ui.{SparkUI} import ui.{SparkUI}
import spark.metrics._
/** /**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@ -270,6 +271,16 @@ class SparkContext(
// Post init // Post init
taskScheduler.postStartHook() taskScheduler.postStartHook()
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
SparkEnv.get.metricsSystem.registerSource(blockManagerSource)
}
initDriverMetrics()
// Methods for creating RDDs // Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */ /** Distribute a local Scala collection to form an RDD. */

View file

@ -24,6 +24,7 @@ import akka.actor.{Actor, ActorRef, Props, ActorSystemImpl, ActorSystem}
import akka.remote.RemoteActorRefProvider import akka.remote.RemoteActorRefProvider
import spark.broadcast.BroadcastManager import spark.broadcast.BroadcastManager
import spark.metrics.MetricsSystem
import spark.storage.BlockManager import spark.storage.BlockManager
import spark.storage.BlockManagerMaster import spark.storage.BlockManagerMaster
import spark.network.ConnectionManager import spark.network.ConnectionManager
@ -53,6 +54,7 @@ class SparkEnv (
val connectionManager: ConnectionManager, val connectionManager: ConnectionManager,
val httpFileServer: HttpFileServer, val httpFileServer: HttpFileServer,
val sparkFilesDir: String, val sparkFilesDir: String,
val metricsSystem: MetricsSystem,
// To be set only as part of initialization of SparkContext. // To be set only as part of initialization of SparkContext.
// (executorId, defaultHostPort) => executorHostPort // (executorId, defaultHostPort) => executorHostPort
// If executorId is NOT found, return defaultHostPort // If executorId is NOT found, return defaultHostPort
@ -68,6 +70,7 @@ class SparkEnv (
broadcastManager.stop() broadcastManager.stop()
blockManager.stop() blockManager.stop()
blockManager.master.stop() blockManager.master.stop()
metricsSystem.stop()
actorSystem.shutdown() actorSystem.shutdown()
// Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut // Unfortunately Akka's awaitTermination doesn't actually wait for the Netty server to shut
// down, but let's call it anyway in case it gets fixed in a later release // down, but let's call it anyway in case it gets fixed in a later release
@ -184,6 +187,13 @@ object SparkEnv extends Logging {
httpFileServer.initialize() httpFileServer.initialize()
System.setProperty("spark.fileserver.uri", httpFileServer.serverUri) System.setProperty("spark.fileserver.uri", httpFileServer.serverUri)
val metricsSystem = if (isDriver) {
MetricsSystem.createMetricsSystem("driver")
} else {
MetricsSystem.createMetricsSystem("executor")
}
metricsSystem.start()
// Set the sparkFiles directory, used when downloading dependencies. In local mode, // Set the sparkFiles directory, used when downloading dependencies. In local mode,
// this is a temporary directory; in distributed mode, this is the executor's current working // this is a temporary directory; in distributed mode, this is the executor's current working
// directory. // directory.
@ -213,6 +223,7 @@ object SparkEnv extends Logging {
connectionManager, connectionManager,
httpFileServer, httpFileServer,
sparkFilesDir, sparkFilesDir,
metricsSystem,
None) None)
} }
} }

View file

@ -29,6 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import spark.deploy._ import spark.deploy._
import spark.{Logging, SparkException, Utils} import spark.{Logging, SparkException, Utils}
import spark.metrics.MetricsSystem
import spark.util.AkkaUtils import spark.util.AkkaUtils
import ui.MasterWebUI import ui.MasterWebUI
@ -57,6 +58,9 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
Utils.checkHost(host, "Expected hostname") Utils.checkHost(host, "Expected hostname")
val metricsSystem = MetricsSystem.createMetricsSystem("master")
val masterSource = new MasterSource(this)
val masterPublicAddress = { val masterPublicAddress = {
val envVar = System.getenv("SPARK_PUBLIC_DNS") val envVar = System.getenv("SPARK_PUBLIC_DNS")
if (envVar != null) envVar else host if (envVar != null) envVar else host
@ -73,10 +77,14 @@ private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Act
context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent]) context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
webUi.start() webUi.start()
context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers()) context.system.scheduler.schedule(0 millis, WORKER_TIMEOUT millis)(timeOutDeadWorkers())
metricsSystem.registerSource(masterSource)
metricsSystem.start()
} }
override def postStop() { override def postStop() {
webUi.stop() webUi.stop()
metricsSystem.stop()
} }
override def receive = { override def receive = {

View file

@ -0,0 +1,25 @@
package spark.deploy.master
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
private[spark] class MasterSource(val master: Master) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "master"
// Gauge for worker numbers in cluster
metricRegistry.register(MetricRegistry.name("workers","number"), new Gauge[Int] {
override def getValue: Int = master.workers.size
})
// Gauge for application numbers in cluster
metricRegistry.register(MetricRegistry.name("apps", "number"), new Gauge[Int] {
override def getValue: Int = master.apps.size
})
// Gauge for waiting application numbers in cluster
metricRegistry.register(MetricRegistry.name("waitingApps", "number"), new Gauge[Int] {
override def getValue: Int = master.waitingApps.size
})
}

View file

@ -23,6 +23,7 @@ import akka.util.duration._
import spark.{Logging, Utils} import spark.{Logging, Utils}
import spark.util.AkkaUtils import spark.util.AkkaUtils
import spark.deploy._ import spark.deploy._
import spark.metrics.MetricsSystem
import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected} import akka.remote.{RemoteClientLifeCycleEvent, RemoteClientShutdown, RemoteClientDisconnected}
import java.text.SimpleDateFormat import java.text.SimpleDateFormat
import java.util.Date import java.util.Date
@ -67,6 +68,9 @@ private[spark] class Worker(
var coresUsed = 0 var coresUsed = 0
var memoryUsed = 0 var memoryUsed = 0
val metricsSystem = MetricsSystem.createMetricsSystem("worker")
val workerSource = new WorkerSource(this)
def coresFree: Int = cores - coresUsed def coresFree: Int = cores - coresUsed
def memoryFree: Int = memory - memoryUsed def memoryFree: Int = memory - memoryUsed
@ -97,6 +101,9 @@ private[spark] class Worker(
webUi = new WorkerWebUI(this, workDir, Some(webUiPort)) webUi = new WorkerWebUI(this, workDir, Some(webUiPort))
webUi.start() webUi.start()
connectToMaster() connectToMaster()
metricsSystem.registerSource(workerSource)
metricsSystem.start()
} }
def connectToMaster() { def connectToMaster() {
@ -178,6 +185,7 @@ private[spark] class Worker(
override def postStop() { override def postStop() {
executors.values.foreach(_.kill()) executors.values.foreach(_.kill())
webUi.stop() webUi.stop()
metricsSystem.stop()
} }
} }

View file

@ -0,0 +1,34 @@
package spark.deploy.worker
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
private[spark] class WorkerSource(val worker: Worker) extends Source {
val sourceName = "worker"
val metricRegistry = new MetricRegistry()
metricRegistry.register(MetricRegistry.name("executors", "number"), new Gauge[Int] {
override def getValue: Int = worker.executors.size
})
// Gauge for cores used of this worker
metricRegistry.register(MetricRegistry.name("coresUsed", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresUsed
})
// Gauge for memory used of this worker
metricRegistry.register(MetricRegistry.name("memUsed", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryUsed
})
// Gauge for cores free of this worker
metricRegistry.register(MetricRegistry.name("coresFree", "number"), new Gauge[Int] {
override def getValue: Int = worker.coresFree
})
// Gauge for memory free of this worker
metricRegistry.register(MetricRegistry.name("memFree", "MBytes"), new Gauge[Int] {
override def getValue: Int = worker.memoryFree
})
}

View file

@ -87,9 +87,13 @@ private[spark] class Executor(executorId: String, slaveHostname: String, propert
} }
) )
val executorSource = new ExecutorSource(this)
// Initialize Spark environment (using system properties read above) // Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)
SparkEnv.set(env) SparkEnv.set(env)
env.metricsSystem.registerSource(executorSource)
private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size") private val akkaFrameSize = env.actorSystem.settings.config.getBytes("akka.remote.netty.message-frame-size")
// Start worker thread pool // Start worker thread pool

View file

@ -0,0 +1,30 @@
package spark.executor
import com.codahale.metrics.{Gauge, MetricRegistry}
import spark.metrics.source.Source
class ExecutorSource(val executor: Executor) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getActiveCount()
})
// Gauge for executor thread pool's approximate total number of tasks that have been completed
metricRegistry.register(MetricRegistry.name("threadpool", "completeTask", "count"), new Gauge[Long] {
override def getValue: Long = executor.threadPool.getCompletedTaskCount()
})
// Gauge for executor thread pool's current number of threads
metricRegistry.register(MetricRegistry.name("threadpool", "currentPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getPoolSize()
})
// Gauge got executor thread pool's largest number of threads that have ever simultaneously been in th pool
metricRegistry.register(MetricRegistry.name("threadpool", "maxPool", "size"), new Gauge[Int] {
override def getValue: Int = executor.threadPool.getMaximumPoolSize()
})
}

View file

@ -0,0 +1,79 @@
package spark.metrics
import java.util.Properties
import java.io.{File, FileInputStream, InputStream, IOException}
import scala.collection.mutable
import scala.util.matching.Regex
import spark.Logging
private[spark] class MetricsConfig(val configFile: Option[String]) extends Logging {
initLogging()
val DEFAULT_PREFIX = "*"
val INSTANCE_REGEX = "^(\\*|[a-zA-Z]+)\\.(.+)".r
val METRICS_CONF = "metrics.properties"
val properties = new Properties()
var propertyCategories: mutable.HashMap[String, Properties] = null
private def setDefaultProperties(prop: Properties) {
// empty function, any default property can be set here
}
def initialize() {
//Add default properties in case there's no properties file
setDefaultProperties(properties)
// If spark.metrics.conf is not set, try to get file in class path
var is: InputStream = null
try {
is = configFile match {
case Some(f) => new FileInputStream(f)
case None => getClass.getClassLoader.getResourceAsStream(METRICS_CONF)
}
if (is != null) {
properties.load(is)
}
} catch {
case e: Exception => logError("Error loading configure file", e)
} finally {
if (is != null) is.close()
}
propertyCategories = subProperties(properties, INSTANCE_REGEX)
if (propertyCategories.contains(DEFAULT_PREFIX)) {
import scala.collection.JavaConversions._
val defaultProperty = propertyCategories(DEFAULT_PREFIX)
for { (inst, prop) <- propertyCategories
if (inst != DEFAULT_PREFIX)
(k, v) <- defaultProperty
if (prop.getProperty(k) == null) } {
prop.setProperty(k, v)
}
}
}
def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = {
val subProperties = new mutable.HashMap[String, Properties]
import scala.collection.JavaConversions._
prop.foreach { kv =>
if (regex.findPrefixOf(kv._1) != None) {
val regex(prefix, suffix) = kv._1
subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2)
}
}
subProperties
}
def getInstance(inst: String): Properties = {
propertyCategories.get(inst) match {
case Some(s) => s
case None => propertyCategories.getOrElse(DEFAULT_PREFIX, new Properties)
}
}
}

View file

@ -0,0 +1,129 @@
package spark.metrics
import com.codahale.metrics.{JmxReporter, MetricSet, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import spark.Logging
import spark.metrics.sink.Sink
import spark.metrics.source.Source
/**
* Spark Metrics System, created by specific "instance", combined by source,
* sink, periodically poll source metrics data to sink destinations.
*
* "instance" specify "who" (the role) use metrics system. In spark there are several roles
* like master, worker, executor, client driver, these roles will create metrics system
* for monitoring. So instance represents these roles. Currently in Spark, several instances
* have already implemented: master, worker, executor, driver.
*
* "source" specify "where" (source) to collect metrics data. In metrics system, there exists
* two kinds of source:
* 1. Spark internal source, like MasterSource, WorkerSource, etc, which will collect
* Spark component's internal state, these sources are related to instance and will be
* added after specific metrics system is created.
* 2. Common source, like JvmSource, which will collect low level state, is configured by
* configuration and loaded through reflection.
*
* "sink" specify "where" (destination) to output metrics data to. Several sinks can be
* coexisted and flush metrics to all these sinks.
*
* Metrics configuration format is like below:
* [instance].[sink|source].[name].[options] = xxxx
*
* [instance] can be "master", "worker", "executor", "driver", which means only the specified
* instance has this property.
* wild card "*" can be used to replace instance name, which means all the instances will have
* this property.
*
* [sink|source] means this property belongs to source or sink. This field can only be source or sink.
*
* [name] specify the name of sink or source, it is custom defined.
*
* [options] is the specific property of this source or sink.
*/
private[spark] class MetricsSystem private (val instance: String) extends Logging {
initLogging()
val confFile = System.getProperty("spark.metrics.conf")
val metricsConfig = new MetricsConfig(Option(confFile))
val sinks = new mutable.ArrayBuffer[Sink]
val sources = new mutable.ArrayBuffer[Source]
val registry = new MetricRegistry()
metricsConfig.initialize()
registerSources()
registerSinks()
def start() {
sinks.foreach(_.start)
}
def stop() {
sinks.foreach(_.stop)
}
def registerSource(source: Source) {
sources += source
try {
registry.register(source.sourceName, source.metricRegistry)
} catch {
case e: IllegalArgumentException => logInfo("Metrics already registered", e)
}
}
def registerSources() {
val instConfig = metricsConfig.getInstance(instance)
val sourceConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SOURCE_REGEX)
// Register all the sources related to instance
sourceConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val source = Class.forName(classPath).newInstance()
registerSource(source.asInstanceOf[Source])
} catch {
case e: Exception => logError("Source class " + classPath + " cannot be instantialized", e)
}
}
}
def registerSinks() {
val instConfig = metricsConfig.getInstance(instance)
val sinkConfigs = metricsConfig.subProperties(instConfig, MetricsSystem.SINK_REGEX)
sinkConfigs.foreach { kv =>
val classPath = kv._2.getProperty("class")
try {
val sink = Class.forName(classPath)
.getConstructor(classOf[Properties], classOf[MetricRegistry])
.newInstance(kv._2, registry)
sinks += sink.asInstanceOf[Sink]
} catch {
case e: Exception => logError("Sink class " + classPath + " cannot be instantialized", e)
}
}
}
}
private[spark] object MetricsSystem {
val SINK_REGEX = "^sink\\.(.+)\\.(.+)".r
val SOURCE_REGEX = "^source\\.(.+)\\.(.+)".r
val MINIMAL_POLL_UNIT = TimeUnit.SECONDS
val MINIMAL_POLL_PERIOD = 1
def checkMinimalPollingPeriod(pollUnit: TimeUnit, pollPeriod: Int) {
val period = MINIMAL_POLL_UNIT.convert(pollPeriod, pollUnit)
if (period < MINIMAL_POLL_PERIOD) {
throw new IllegalArgumentException("Polling period " + pollPeriod + " " + pollUnit +
" below than minimal polling period ")
}
}
def createMetricsSystem(instance: String): MetricsSystem = new MetricsSystem(instance)
}

View file

@ -0,0 +1,42 @@
package spark.metrics.sink
import com.codahale.metrics.{ConsoleReporter, MetricRegistry}
import java.util.Properties
import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class ConsoleSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val CONSOLE_DEFAULT_PERIOD = 10
val CONSOLE_DEFAULT_UNIT = "SECONDS"
val CONSOLE_KEY_PERIOD = "period"
val CONSOLE_KEY_UNIT = "unit"
val pollPeriod = Option(property.getProperty(CONSOLE_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val reporter: ConsoleReporter = ConsoleReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build()
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
}

View file

@ -0,0 +1,51 @@
package spark.metrics.sink
import com.codahale.metrics.{CsvReporter, MetricRegistry}
import java.io.File
import java.util.{Locale, Properties}
import java.util.concurrent.TimeUnit
import spark.metrics.MetricsSystem
class CsvSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val CSV_KEY_PERIOD = "period"
val CSV_KEY_UNIT = "unit"
val CSV_KEY_DIR = "directory"
val CSV_DEFAULT_PERIOD = 10
val CSV_DEFAULT_UNIT = "SECONDS"
val CSV_DEFAULT_DIR = "/tmp/"
val pollPeriod = Option(property.getProperty(CSV_KEY_PERIOD)) match {
case Some(s) => s.toInt
case None => CSV_DEFAULT_PERIOD
}
val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val pollDir = Option(property.getProperty(CSV_KEY_DIR)) match {
case Some(s) => s
case None => CSV_DEFAULT_DIR
}
val reporter: CsvReporter = CsvReporter.forRegistry(registry)
.formatFor(Locale.US)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(new File(pollDir))
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
}

View file

@ -0,0 +1,18 @@
package spark.metrics.sink
import com.codahale.metrics.{JmxReporter, MetricRegistry}
import java.util.Properties
class JmxSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val reporter: JmxReporter = JmxReporter.forRegistry(registry).build()
override def start() {
reporter.start()
}
override def stop() {
reporter.stop()
}
}

View file

@ -0,0 +1,6 @@
package spark.metrics.sink
trait Sink {
def start: Unit
def stop: Unit
}

View file

@ -0,0 +1,15 @@
package spark.metrics.source
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.jvm.{GarbageCollectorMetricSet, MemoryUsageGaugeSet}
class JvmSource extends Source {
val sourceName = "jvm"
val metricRegistry = new MetricRegistry()
val gcMetricSet = new GarbageCollectorMetricSet
val memGaugeSet = new MemoryUsageGaugeSet
metricRegistry.registerAll(gcMetricSet)
metricRegistry.registerAll(memGaugeSet)
}

View file

@ -0,0 +1,8 @@
package spark.metrics.source
import com.codahale.metrics.MetricRegistry
trait Source {
def sourceName: String
def metricRegistry: MetricRegistry
}

View file

@ -0,0 +1,30 @@
package spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size
})
metricRegistry.register(MetricRegistry.name("stage", "runningStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.running.size
})
metricRegistry.register(MetricRegistry.name("stage", "waitingStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.waiting.size
})
metricRegistry.register(MetricRegistry.name("job", "allJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.nextRunId.get()
})
metricRegistry.register(MetricRegistry.name("job", "activeJobs", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.activeJobs.size
})
}

View file

@ -0,0 +1,48 @@
package spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import spark.metrics.source.Source
import spark.storage._
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "BlockManager"
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
maxMem / 1024 / 1024
}
})
metricRegistry.register(MetricRegistry.name("memory", "remainingMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
remainingMem / 1024 / 1024
}
})
metricRegistry.register(MetricRegistry.name("memory", "memUsed", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val maxMem = storageStatusList.map(_.maxMem).reduce(_ + _)
val remainingMem = storageStatusList.map(_.memRemaining).reduce(_ + _)
(maxMem - remainingMem) / 1024 / 1024
}
})
metricRegistry.register(MetricRegistry.name("disk", "diskSpaceUsed", "MBytes"), new Gauge[Long] {
override def getValue: Long = {
val storageStatusList = blockManager.master.getStorageStatus
val diskSpaceUsed = storageStatusList
.flatMap(_.blocks.values.map(_.diskSize))
.reduceOption(_ + _)
.getOrElse(0L)
diskSpaceUsed / 1024 / 1024
}
})
}

View file

@ -0,0 +1,6 @@
*.sink.console.period = 10
*.sink.console.unit = seconds
*.source.jvm.class = spark.metrics.source.JvmSource
master.sink.console.period = 20
master.sink.console.unit = minutes

View file

@ -0,0 +1,7 @@
*.sink.console.period = 10
*.sink.console.unit = seconds
test.sink.console.class = spark.metrics.sink.ConsoleSink
test.sink.dummy.class = spark.metrics.sink.DummySink
test.source.dummy.class = spark.metrics.source.DummySource
test.sink.console.period = 20
test.sink.console.unit = minutes

View file

@ -0,0 +1,64 @@
package spark.metrics
import java.util.Properties
import java.io.{File, FileOutputStream}
import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.metrics._
class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
before {
filePath = getClass.getClassLoader.getResource("test_metrics_config.properties").getFile()
}
test("MetricsConfig with default properties") {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
assert(conf.properties.size() === 0)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
assert(property.size() === 0)
}
test("MetricsConfig with properties set") {
val conf = new MetricsConfig(Option(filePath))
conf.initialize()
val masterProp = conf.getInstance("master")
assert(masterProp.size() === 3)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 3)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(masterProp.getProperty("source.jvm.class") === "spark.metrics.source.JvmSource")
}
test("MetricsConfig with subProperties") {
val conf = new MetricsConfig(Option(filePath))
conf.initialize()
val propCategories = conf.propertyCategories
assert(propCategories.size === 2)
val masterProp = conf.getInstance("master")
val sourceProps = conf.subProperties(masterProp, MetricsSystem.SOURCE_REGEX)
assert(sourceProps.size === 1)
assert(sourceProps("jvm").getProperty("class") === "spark.metrics.source.JvmSource")
val sinkProps = conf.subProperties(masterProp, MetricsSystem.SINK_REGEX)
assert(sinkProps.size === 1)
assert(sinkProps.contains("console"))
val consoleProps = sinkProps("console")
assert(consoleProps.size() === 2)
}
}

View file

@ -0,0 +1,39 @@
package spark.metrics
import java.util.Properties
import java.io.{File, FileOutputStream}
import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.metrics._
class MetricsSystemSuite extends FunSuite with BeforeAndAfter {
var filePath: String = _
before {
filePath = getClass.getClassLoader.getResource("test_metrics_system.properties").getFile()
System.setProperty("spark.metrics.conf", filePath)
}
test("MetricsSystem with default config") {
val metricsSystem = MetricsSystem.createMetricsSystem("default")
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
assert(sources.length === 0)
assert(sinks.length === 0)
}
test("MetricsSystem with sources add") {
val metricsSystem = MetricsSystem.createMetricsSystem("test")
val sources = metricsSystem.sources
val sinks = metricsSystem.sinks
assert(sources.length === 0)
assert(sinks.length === 1)
val source = new spark.deploy.master.MasterSource(null)
metricsSystem.registerSource(source)
assert(sources.length === 1)
}
}

View file

@ -268,6 +268,14 @@
<groupId>org.scala-lang</groupId> <groupId>org.scala-lang</groupId>
<artifactId>scalap</artifactId> <artifactId>scalap</artifactId>
<version>${scala.version}</version> <version>${scala.version}</version>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-core</artifactId>
<version>3.0.0</version>
</dependency>
<dependency>
<groupId>com.codahale.metrics</groupId>
<artifactId>metrics-jvm</artifactId>
<version>3.0.0</version>
</dependency> </dependency>
<dependency> <dependency>

View file

@ -179,7 +179,9 @@ object SparkBuild extends Build {
"net.liftweb" % "lift-json_2.9.2" % "2.5", "net.liftweb" % "lift-json_2.9.2" % "2.5",
"org.apache.mesos" % "mesos" % "0.9.0-incubating", "org.apache.mesos" % "mesos" % "0.9.0-incubating",
"io.netty" % "netty-all" % "4.0.0.Beta2", "io.netty" % "netty-all" % "4.0.0.Beta2",
"org.apache.derby" % "derby" % "10.4.2.0" % "test" "org.apache.derby" % "derby" % "10.4.2.0" % "test",
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0"
) ++ ( ) ++ (
if (HADOOP_MAJOR_VERSION == "2") { if (HADOOP_MAJOR_VERSION == "2") {
if (HADOOP_YARN) { if (HADOOP_YARN) {