Merge pull request #906 from pwendell/ganglia-sink

Clean-up of Metrics Code/Docs and Add Ganglia Sink
Patrick Wendell 2013-09-08 18:32:16 -07:00
commit f68848d95d
12 changed files with 166 additions and 36 deletions


@@ -31,7 +31,7 @@
# 1. To add a new sink, set the "class" option to a fully qualified class
# name (see examples below).
# 2. Some sinks involve a polling period. The minimum allowed polling period
# is 1 second.
# 3. Wild card properties can be overridden by more specific properties.
# For example, master.sink.console.period takes precedence over
# *.sink.console.period.
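#    For example (illustrative values), given
#      *.sink.console.period=10
#      master.sink.console.period=20
#    the master instance polls every 20 units while all other instances poll every 10.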
@@ -47,11 +47,45 @@
# instance master and applications. MetricsServlet may not be configured by itself.
#
## List of available sinks and their properties.
# org.apache.spark.metrics.sink.ConsoleSink
#   Name:     Default:    Description:
#   period    10          Poll period
#   unit      seconds     Units of poll period
#
# org.apache.spark.metrics.sink.CsvSink
#   Name:       Default:    Description:
#   period      10          Poll period
#   unit        seconds     Units of poll period
#   directory   /tmp        Where to store CSV files
#
# org.apache.spark.metrics.sink.GangliaSink
#   Name:     Default:    Description:
#   host      NONE        Hostname or multicast group of Ganglia server
#   port      NONE        Port of Ganglia server(s)
#   period    10          Poll period
#   unit      seconds     Units of poll period
#   ttl       1           TTL of messages sent by Ganglia
#   mode      multicast   Ganglia network mode ('unicast' or 'multicast')
#
# org.apache.spark.metrics.sink.JmxSink
#
# org.apache.spark.metrics.sink.MetricsServlet
#   Name:     Default:    Description:
#   path      VARIES*     Path prefix from the web server root
#   sample    false       Whether to show entire set of samples for histograms ('false' or 'true')
#
# * Default path is /metrics/json for all instances except the master. The master has two paths:
#     /metrics/applications/json   # App information
#     /metrics/master/json         # Master information
## Examples
# Enable JmxSink for all instances by class name
#*.sink.jmx.class=spark.metrics.sink.JmxSink
#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
# Enable ConsoleSink for all instances by class name
#*.sink.console.class=spark.metrics.sink.ConsoleSink
#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
# Polling period for ConsoleSink
#*.sink.console.period=10
@@ -64,7 +98,7 @@
#master.sink.console.unit=seconds
# Enable CsvSink for all instances
#*.sink.csv.class=spark.metrics.sink.CsvSink
#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
# Polling period for CsvSink
#*.sink.csv.period=1
@@ -80,11 +114,11 @@
#worker.sink.csv.unit=minutes
# Enable jvm source for instance master, worker, driver and executor
#master.source.jvm.class=spark.metrics.source.JvmSource
#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#worker.source.jvm.class=spark.metrics.source.JvmSource
#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#driver.source.jvm.class=spark.metrics.source.JvmSource
#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
#executor.source.jvm.class=spark.metrics.source.JvmSource
#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource
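A GangliaSink stanza in the same style would look like the following sketch (the host is a placeholder for your gmond host or multicast group; 8649 is the conventional gmond port):
#*.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
#*.sink.ganglia.host=<gmond-host>
#*.sink.ganglia.port=8649
#*.sink.ganglia.mode=unicast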


@@ -282,8 +282,8 @@ class SparkContext(
// Post init
taskScheduler.postStartHook()
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
def initDriverMetrics() {
SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
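Both driver-side sources now carry the application name in their source names. A minimal sketch of a source written in the same style (hypothetical class and metric names, using only the Source pattern visible in this commit):

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.SparkContext
import org.apache.spark.metrics.source.Source

class ExampleSource(sc: SparkContext) extends Source {
  val metricRegistry = new MetricRegistry()
  // Prefix the source name with the app name, as DAGSchedulerSource and BlockManagerSource now do
  val sourceName = "%s.example".format(sc.appName)

  // A constant gauge, registered the same way as the driver metrics above
  metricRegistry.register(MetricRegistry.name("example", "constant"), new Gauge[Int] {
    override def getValue: Int = 1
  })
}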


@@ -98,7 +98,7 @@ private[spark] class Executor(
}
)
val executorSource = new ExecutorSource(this)
val executorSource = new ExecutorSource(this, executorId)
// Initialize Spark environment (using system properties read above)
val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false)


@@ -27,7 +27,7 @@ import scala.collection.JavaConversions._
import org.apache.spark.metrics.source.Source
class ExecutorSource(val executor: Executor) extends Source {
class ExecutorSource(val executor: Executor, executorId: String) extends Source {
private def fileStats(scheme: String) : Option[FileSystem.Statistics] =
FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption
@@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source {
}
val metricRegistry = new MetricRegistry()
val sourceName = "executor"
// TODO: It would be nice to pass the application name here
val sourceName = "executor.%s".format(executorId)
// Gauge for executor thread pool's actively executing task counts
metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] {

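With the executor id folded into the source name, metrics from different executors no longer collide: the thread pool gauge above would be reported for executor 3, say, as executor.3.threadpool.activeTask.count (the executor id here is illustrative).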

@@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
private def setDefaultProperties(prop: Properties) {
prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
prop.setProperty("*.sink.servlet.uri", "/metrics/json")
prop.setProperty("*.sink.servlet.sample", "false")
prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
prop.setProperty("*.sink.servlet.path", "/metrics/json")
prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
}
def initialize() {

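In properties form, the new defaults amount to the following ("path" replaces the old "uri" key, and the "sample" default now lives in MetricsServlet itself):

*.sink.servlet.class=org.apache.spark.metrics.sink.MetricsServlet
*.sink.servlet.path=/metrics/json
master.sink.servlet.path=/metrics/master/json
applications.sink.servlet.path=/metrics/applications/json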

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.metrics.sink
import java.util.Properties
import java.util.concurrent.TimeUnit
import com.codahale.metrics.ganglia.GangliaReporter
import com.codahale.metrics.MetricRegistry
import info.ganglia.gmetric4j.gmetric.GMetric
import org.apache.spark.metrics.MetricsSystem
class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink {
val GANGLIA_KEY_PERIOD = "period"
val GANGLIA_DEFAULT_PERIOD = 10
val GANGLIA_KEY_UNIT = "unit"
val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
val GANGLIA_KEY_MODE = "mode"
val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
// TTL for multicast messages. If listeners are X hops away in network, must be at least X.
val GANGLIA_KEY_TTL = "ttl"
val GANGLIA_DEFAULT_TTL = 1
val GANGLIA_KEY_HOST = "host"
val GANGLIA_KEY_PORT = "port"
def propertyToOption(prop: String) = Option(property.getProperty(prop))
if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
throw new Exception("Ganglia sink requires 'host' property.")
}
if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) {
throw new Exception("Ganglia sink requires 'port' property.")
}
val host = propertyToOption(GANGLIA_KEY_HOST).get
val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
val mode = propertyToOption(GANGLIA_KEY_MODE)
.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
.getOrElse(GANGLIA_DEFAULT_PERIOD)
val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
.getOrElse(GANGLIA_DEFAULT_UNIT)
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)
val ganglia = new GMetric(host, port, mode, ttl)
val reporter: GangliaReporter = GangliaReporter.forRegistry(registry)
.convertDurationsTo(TimeUnit.MILLISECONDS)
.convertRatesTo(TimeUnit.SECONDS)
.build(ganglia)
override def start() {
reporter.start(pollPeriod, pollUnit)
}
override def stop() {
reporter.stop()
}
}
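A sink like this is not constructed by hand; MetricsSystem builds sinks reflectively from the configured class name, which is why the (Properties, MetricRegistry) constructor above is the contract. A sketch of that wiring, with placeholder connection values:

import java.util.Properties
import com.codahale.metrics.MetricRegistry
import org.apache.spark.metrics.sink.Sink

val props = new Properties()
props.setProperty("host", "gmond-host") // placeholder Ganglia host
props.setProperty("port", "8649")       // conventional gmond port

val registry = new MetricRegistry()
val sink = Class.forName("org.apache.spark.metrics.sink.GangliaSink")
  .getConstructor(classOf[Properties], classOf[MetricRegistry])
  .newInstance(props, registry)
  .asInstanceOf[Sink]
sink.start()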


@@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler
import org.apache.spark.ui.JettyUtils
class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
val SERVLET_KEY_URI = "uri"
val SERVLET_KEY_PATH = "path"
val SERVLET_KEY_SAMPLE = "sample"
val servletURI = property.getProperty(SERVLET_KEY_URI)
val SERVLET_DEFAULT_SAMPLE = false
val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
val servletPath = property.getProperty(SERVLET_KEY_PATH)
val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean)
.getOrElse(SERVLET_DEFAULT_SAMPLE)
val mapper = new ObjectMapper().registerModule(
new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
def getHandlers = Array[(String, Handler)](
(servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
(servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
)
def getMetricsSnapshot(request: HttpServletRequest): String = {

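With the default configuration, each instance then serves a JSON snapshot of its registered metrics from its web UI, e.g. (hostname and the default driver UI port are illustrative):

curl http://<driver-host>:4040/metrics/json

The master additionally serves /metrics/master/json and /metrics/applications/json, per the defaults set in MetricsConfig.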

@@ -20,10 +20,12 @@ package org.apache.spark.scheduler
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.SparkContext
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source {
private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "DAGScheduler"
val sourceName = "%s.DAGScheduler".format(sc.appName)
metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] {
override def getValue: Int = dagScheduler.failed.size


@@ -20,11 +20,13 @@ package org.apache.spark.storage
import com.codahale.metrics.{Gauge,MetricRegistry}
import org.apache.spark.metrics.source.Source
import org.apache.spark.SparkContext
private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source {
private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext)
extends Source {
val metricRegistry = new MetricRegistry()
val sourceName = "BlockManager"
val sourceName = "%s.BlockManager".format(sc.appName)
metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] {
override def getValue: Long = {


@@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
val conf = new MetricsConfig(Option("dummy-file"))
conf.initialize()
assert(conf.properties.size() === 5)
assert(conf.properties.size() === 4)
assert(conf.properties.getProperty("test-for-dummy") === null)
val property = conf.getInstance("random")
assert(property.size() === 3)
assert(property.size() === 2)
assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
assert(property.getProperty("sink.servlet.uri") === "/metrics/json")
assert(property.getProperty("sink.servlet.sample") === "false")
assert(property.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with properties set") {
@@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
conf.initialize()
val masterProp = conf.getInstance("master")
assert(masterProp.size() === 6)
assert(masterProp.size() === 5)
assert(masterProp.getProperty("sink.console.period") === "20")
assert(masterProp.getProperty("sink.console.unit") === "minutes")
assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json")
assert(masterProp.getProperty("sink.servlet.sample") === "false")
assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json")
val workerProp = conf.getInstance("worker")
assert(workerProp.size() === 6)
assert(workerProp.size() === 5)
assert(workerProp.getProperty("sink.console.period") === "10")
assert(workerProp.getProperty("sink.console.unit") === "seconds")
assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource")
assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet")
assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json")
assert(workerProp.getProperty("sink.servlet.sample") === "false")
assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json")
}
test("MetricsConfig with subProperties") {
@@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter {
assert(consoleProps.size() === 2)
val servletProps = sinkProps("servlet")
assert(servletProps.size() === 3)
assert(servletProps.size() === 2)
}
}


@@ -31,6 +31,15 @@ set of sinks to which metrics are reported. The following instances are currentl
* `executor`: A Spark executor.
* `driver`: The Spark driver process (the process in which your SparkContext is created).
Each instance can report to zero or more _sinks_. Sinks are contained in the
`org.apache.spark.metrics.sink` package:
* `ConsoleSink`: Logs metrics information to the console.
* `CsvSink`: Exports metrics data to CSV files at regular intervals.
* `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
* `JmxSink`: Registers metrics for viewing in a JMX console.
* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON.
The syntax of the metrics configuration file is defined in an example configuration file,
`$SPARK_HOME/conf/metrics.properties.template`.
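For example, enabling the ConsoleSink for every instance follows directly from that template (the stanza there is commented out; the period and unit shown are the sink's defaults):

*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
*.sink.console.period=10
*.sink.console.unit=seconds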


@@ -213,6 +213,7 @@ object SparkBuild extends Build {
"com.codahale.metrics" % "metrics-core" % "3.0.0",
"com.codahale.metrics" % "metrics-jvm" % "3.0.0",
"com.codahale.metrics" % "metrics-json" % "3.0.0",
"com.codahale.metrics" % "metrics-ganglia" % "3.0.0",
"com.twitter" % "chill_2.9.3" % "0.3.1",
"com.twitter" % "chill-java" % "0.3.1"
)