From 8de8ee5d3cf01fd225336064d9586380a4fb6ad4 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 8 Sep 2013 10:07:29 -0700 Subject: [PATCH 1/4] Ganglia sink --- .../spark/metrics/sink/GangliaSink.scala | 82 +++++++++++++++++++ project/SparkBuild.scala | 1 + 2 files changed, 83 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala new file mode 100644 index 0000000000..b924907070 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GangliaSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import java.util.Properties +import java.util.concurrent.TimeUnit + +import com.codahale.metrics.ganglia.GangliaReporter +import com.codahale.metrics.MetricRegistry +import info.ganglia.gmetric4j.gmetric.GMetric + +import org.apache.spark.metrics.MetricsSystem + +class GangliaSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GANGLIA_KEY_PERIOD = "period" + val GANGLIA_DEFAULT_PERIOD = 10 + + val GANGLIA_KEY_UNIT = "unit" + val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS + + val GANGLIA_KEY_MODE = "mode" + val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST + + // TTL for multicast messages. If listeners are X hops away in network, must be at least X. 
+ val GANGLIA_KEY_TTL = "ttl" + val GANGLIA_DEFAULT_TTL = 1 + + val GANGLIA_KEY_HOST = "host" + val GANGLIA_KEY_PORT = "port" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) { + throw new Exception("Ganglia sink requires 'host' property.") + } + + if (!propertyToOption(GANGLIA_KEY_PORT).isDefined) { + throw new Exception("Ganglia sink requires 'port' property.") + } + + val host = propertyToOption(GANGLIA_KEY_HOST).get + val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt + val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL) + val mode = propertyToOption(GANGLIA_KEY_MODE) + .map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE) + val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt) + .getOrElse(GANGLIA_DEFAULT_PERIOD) + val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase)) + .getOrElse(GANGLIA_DEFAULT_UNIT) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val ganglia = new GMetric(host, port, mode, ttl) + val reporter: GangliaReporter = GangliaReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .build(ganglia) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} + diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index d038a4f479..0603538b6a 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -207,6 +207,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-core" % "3.0.0", "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", + "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" ) From c190b48bf5073b3349b5060c324c890d95bc4260 Mon Sep 17 00:00:00 2001 From: Patrick Wendell Date: Sun, 8 Sep 2013 10:47:45 -0700 Subject: [PATCH 2/4] Adding more docs and some code cleanup --- conf/metrics.properties.template | 36 ++++++++++++++++++- .../apache/spark/metrics/MetricsConfig.scala | 7 ++-- .../spark/metrics/sink/MetricsServlet.scala | 11 +++--- .../spark/metrics/MetricsConfigSuite.scala | 19 +++++----- docs/monitoring.md | 9 +++++ 5 files changed, 62 insertions(+), 20 deletions(-) diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index 6c36f3cca4..f0e033eb98 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -31,7 +31,7 @@ # 1. To add a new sink, set the "class" option to a fully qualified class # name (see examples below). # 2. Some sinks involve a polling period. The minimum allowed polling period -# is 1 second. +# is 1 second. # 3. Wild card properties can be overridden by more specific properties. # For example, master.sink.console.period takes precedence over # *.sink.console.period. @@ -47,6 +47,40 @@ # instance master and applications. MetricsServlet may not be configured by self. # +## List of available sinks and their properties. 
+
+# org.apache.spark.metrics.sink.ConsoleSink
+#   Name:       Default:    Description:
+#   period      10          Poll period
+#   unit        seconds     Units of poll period
+
+# org.apache.spark.metrics.sink.CsvSink
+#   Name:       Default:    Description:
+#   period      10          Poll period
+#   unit        seconds     Units of poll period
+#   directory   /tmp        Where to store CSV files
+
+# org.apache.spark.metrics.sink.GangliaSink
+#   Name:       Default:    Description:
+#   host        NONE        Hostname or multicast group of Ganglia server
+#   port        NONE        Port of Ganglia server(s)
+#   period      10          Poll period
+#   unit        seconds     Units of poll period
+#   ttl         1           TTL of messages sent by Ganglia
+#   mode        multicast   Ganglia network mode ('unicast' or 'multicast')
+
+# org.apache.spark.metrics.sink.JmxSink
+
+# org.apache.spark.metrics.sink.MetricsServlet
+#   Name:       Default:    Description:
+#   path        VARIES*     Path prefix from the web server root
+#   sample      false       Whether to show entire set of samples for histograms ('false' or 'true')
+#
+# * Default path is /metrics/json for all instances except the master. The master has two paths:
+#    /metrics/applications/json # App information
+#    /metrics/master/json       # Master information
+
+## Examples
 
 # Enable JmxSink for all instances by class name
 #*.sink.jmx.class=spark.metrics.sink.JmxSink
 
 # Enable ConsoleSink for all instances by class name
 #*.sink.console.class=spark.metrics.sink.ConsoleSink
diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
index 0f9c4e00b1..caab748d60 100644
--- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala
@@ -37,10 +37,9 @@ private[spark] class MetricsConfig(val configFile: Option[String]) extends Loggi
   private def setDefaultProperties(prop: Properties) {
     prop.setProperty("*.sink.servlet.class", "org.apache.spark.metrics.sink.MetricsServlet")
-    prop.setProperty("*.sink.servlet.uri", "/metrics/json")
-    prop.setProperty("*.sink.servlet.sample", "false")
-    prop.setProperty("master.sink.servlet.uri", "/metrics/master/json")
-    prop.setProperty("applications.sink.servlet.uri", "/metrics/applications/json")
+    prop.setProperty("*.sink.servlet.path", "/metrics/json")
+    prop.setProperty("master.sink.servlet.path", "/metrics/master/json")
+    prop.setProperty("applications.sink.servlet.path", "/metrics/applications/json")
   }
 
   def initialize() {
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
index 4e90dd4323..99357fede6 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/MetricsServlet.scala
@@ -31,18 +31,21 @@ import org.eclipse.jetty.server.Handler
 import org.apache.spark.ui.JettyUtils
 
 class MetricsServlet(val property: Properties, val registry: MetricRegistry) extends Sink {
-  val SERVLET_KEY_URI = "uri"
+  val SERVLET_KEY_PATH = "path"
   val SERVLET_KEY_SAMPLE = "sample"
 
-  val servletURI = property.getProperty(SERVLET_KEY_URI)
+  val SERVLET_DEFAULT_SAMPLE = false
 
-  val servletShowSample = property.getProperty(SERVLET_KEY_SAMPLE).toBoolean
+  val servletPath = property.getProperty(SERVLET_KEY_PATH)
+
+  val servletShowSample = Option(property.getProperty(SERVLET_KEY_SAMPLE)).map(_.toBoolean)
+    .getOrElse(SERVLET_DEFAULT_SAMPLE)
 
   val mapper = new ObjectMapper().registerModule(
     new MetricsModule(TimeUnit.SECONDS, TimeUnit.MILLISECONDS, servletShowSample))
 
   def getHandlers = Array[(String, Handler)](
-    (servletURI, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json"))
+
(servletPath, JettyUtils.createHandler(request => getMetricsSnapshot(request), "text/json")) ) def getMetricsSnapshot(request: HttpServletRequest): String = { diff --git a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala index 58c94a162d..1a9ce8c607 100644 --- a/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/MetricsConfigSuite.scala @@ -30,14 +30,13 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { val conf = new MetricsConfig(Option("dummy-file")) conf.initialize() - assert(conf.properties.size() === 5) + assert(conf.properties.size() === 4) assert(conf.properties.getProperty("test-for-dummy") === null) val property = conf.getInstance("random") - assert(property.size() === 3) + assert(property.size() === 2) assert(property.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(property.getProperty("sink.servlet.uri") === "/metrics/json") - assert(property.getProperty("sink.servlet.sample") === "false") + assert(property.getProperty("sink.servlet.path") === "/metrics/json") } test("MetricsConfig with properties set") { @@ -45,22 +44,20 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { conf.initialize() val masterProp = conf.getInstance("master") - assert(masterProp.size() === 6) + assert(masterProp.size() === 5) assert(masterProp.getProperty("sink.console.period") === "20") assert(masterProp.getProperty("sink.console.unit") === "minutes") assert(masterProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource") assert(masterProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(masterProp.getProperty("sink.servlet.uri") === "/metrics/master/json") - assert(masterProp.getProperty("sink.servlet.sample") === "false") + assert(masterProp.getProperty("sink.servlet.path") === "/metrics/master/json") val workerProp = conf.getInstance("worker") - assert(workerProp.size() === 6) + assert(workerProp.size() === 5) assert(workerProp.getProperty("sink.console.period") === "10") assert(workerProp.getProperty("sink.console.unit") === "seconds") assert(workerProp.getProperty("source.jvm.class") === "org.apache.spark.metrics.source.JvmSource") assert(workerProp.getProperty("sink.servlet.class") === "org.apache.spark.metrics.sink.MetricsServlet") - assert(workerProp.getProperty("sink.servlet.uri") === "/metrics/json") - assert(workerProp.getProperty("sink.servlet.sample") === "false") + assert(workerProp.getProperty("sink.servlet.path") === "/metrics/json") } test("MetricsConfig with subProperties") { @@ -84,6 +81,6 @@ class MetricsConfigSuite extends FunSuite with BeforeAndAfter { assert(consoleProps.size() === 2) val servletProps = sinkProps("servlet") - assert(servletProps.size() === 3) + assert(servletProps.size() === 2) } } diff --git a/docs/monitoring.md b/docs/monitoring.md index 0ec987107c..4c4f174503 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -31,6 +31,15 @@ set of sinks to which metrics are reported. The following instances are currentl * `executor`: A Spark executor. * `driver`: The Spark driver process (the process in which your SparkContext is created). +Each instance can report to zero or more _sinks_. Sinks are contained in the +`org.apache.spark.metrics.sink` package: + +* `ConsoleSink`: Logs metrics information to the console. 
+* `CsvSink`: Exports metrics data to CSV files at regular intervals.
+* `GangliaSink`: Sends metrics to a Ganglia node or multicast group.
+* `JmxSink`: Registers metrics for viewing in a JMX console.
+* `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data.
+
 The syntax of the metrics configuration file is defined in an example
 configuration file, `$SPARK_HOME/conf/metrics.conf.template`.

From 8026537597806d4249c4b1362f8954b6a3e51205 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 8 Sep 2013 16:06:32 -0700
Subject: [PATCH 3/4] Fixing package name in template conf

---
 conf/metrics.properties.template | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template
index f0e033eb98..ae10f615d1 100644
--- a/conf/metrics.properties.template
+++ b/conf/metrics.properties.template
@@ -82,10 +82,10 @@
 ## Examples
 
 # Enable JmxSink for all instances by class name
-#*.sink.jmx.class=spark.metrics.sink.JmxSink
+#*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink
 
 # Enable ConsoleSink for all instances by class name
-#*.sink.console.class=spark.metrics.sink.ConsoleSink
+#*.sink.console.class=org.apache.spark.metrics.sink.ConsoleSink
 
 # Polling period for ConsoleSink
 #*.sink.console.period=10
@@ -98,7 +98,7 @@
 #master.sink.console.unit=seconds
 
 # Enable CsvSink for all instances
-#*.sink.csv.class=spark.metrics.sink.CsvSink
+#*.sink.csv.class=org.apache.spark.metrics.sink.CsvSink
 
 # Polling period for CsvSink
 #*.sink.csv.period=1
@@ -114,11 +114,11 @@
 #worker.sink.csv.unit=minutes
 
 # Enable jvm source for instance master, worker, driver and executor
-#master.source.jvm.class=spark.metrics.source.JvmSource
+#master.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#worker.source.jvm.class=spark.metrics.source.JvmSource
+#worker.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#driver.source.jvm.class=spark.metrics.source.JvmSource
+#driver.source.jvm.class=org.apache.spark.metrics.source.JvmSource
 
-#executor.source.jvm.class=spark.metrics.source.JvmSource
+#executor.source.jvm.class=org.apache.spark.metrics.source.JvmSource

From b4e382c210b4987da78421f5de11199e4d74f0e7 Mon Sep 17 00:00:00 2001
From: Patrick Wendell
Date: Sun, 8 Sep 2013 16:06:49 -0700
Subject: [PATCH 4/4] Adding sc name in metrics source

---
 core/src/main/scala/org/apache/spark/SparkContext.scala      | 4 ++--
 .../src/main/scala/org/apache/spark/executor/Executor.scala  | 2 +-
 .../scala/org/apache/spark/executor/ExecutorSource.scala     | 5 +++--
 .../org/apache/spark/scheduler/DAGSchedulerSource.scala      | 6 ++++--
 .../scala/org/apache/spark/storage/BlockManagerSource.scala  | 6 ++++--
 5 files changed, 14 insertions(+), 9 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 89318712a5..4f711a5ea6 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -282,8 +282,8 @@ class SparkContext(
   // Post init
   taskScheduler.postStartHook()
 
-  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler)
-  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager)
+  val dagSchedulerSource = new DAGSchedulerSource(this.dagScheduler, this)
+  val blockManagerSource = new BlockManagerSource(SparkEnv.get.blockManager, this)
 
   def initDriverMetrics() {
     SparkEnv.get.metricsSystem.registerSource(dagSchedulerSource)
diff --git
a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index d365804994..ceae3b8289 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -98,7 +98,7 @@ private[spark] class Executor( } ) - val executorSource = new ExecutorSource(this) + val executorSource = new ExecutorSource(this, executorId) // Initialize Spark environment (using system properties read above) val env = SparkEnv.createFromSystemProperties(executorId, slaveHostname, 0, false, false) diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index bf8fb4fd21..18c9dc1c0a 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -27,7 +27,7 @@ import scala.collection.JavaConversions._ import org.apache.spark.metrics.source.Source -class ExecutorSource(val executor: Executor) extends Source { +class ExecutorSource(val executor: Executor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = FileSystem.getAllStatistics().filter(s => s.getScheme.equals(scheme)).headOption @@ -39,7 +39,8 @@ class ExecutorSource(val executor: Executor) extends Source { } val metricRegistry = new MetricRegistry() - val sourceName = "executor" + // TODO: It would be nice to pass the application name here + val sourceName = "executor.%s".format(executorId) // Gauge for executor thread pool's actively executing task counts metricRegistry.register(MetricRegistry.name("threadpool", "activeTask", "count"), new Gauge[Int] { diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 22e3723ac8..446d490cc9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -20,10 +20,12 @@ package org.apache.spark.scheduler import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) extends Source { +private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler, sc: SparkContext) + extends Source { val metricRegistry = new MetricRegistry() - val sourceName = "DAGScheduler" + val sourceName = "%s.DAGScheduler".format(sc.appName) metricRegistry.register(MetricRegistry.name("stage", "failedStages", "number"), new Gauge[Int] { override def getValue: Int = dagScheduler.failed.size diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 3d709cfde4..acc3951088 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -20,11 +20,13 @@ package org.apache.spark.storage import com.codahale.metrics.{Gauge,MetricRegistry} import org.apache.spark.metrics.source.Source +import org.apache.spark.SparkContext -private[spark] class BlockManagerSource(val blockManager: BlockManager) extends Source { +private[spark] class BlockManagerSource(val blockManager: BlockManager, sc: SparkContext) + extends Source { val 
metricRegistry = new MetricRegistry() - val sourceName = "BlockManager" + val sourceName = "%s.BlockManager".format(sc.appName) metricRegistry.register(MetricRegistry.name("memory", "maxMem", "MBytes"), new Gauge[Long] { override def getValue: Long = {
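
For reference, a minimal, self-contained sketch of how the new GangliaSink consumes its configuration. The host, port, sink name "ganglia", and other values below are illustrative only; in normal operation MetricsSystem instantiates the sink reflectively from metrics.properties rather than by direct construction:

    import java.util.Properties

    import com.codahale.metrics.MetricRegistry

    import org.apache.spark.metrics.sink.GangliaSink

    object GangliaSinkSketch {
      def main(args: Array[String]) {
        // Equivalent metrics.properties entries (all values illustrative):
        //   *.sink.ganglia.class=org.apache.spark.metrics.sink.GangliaSink
        //   *.sink.ganglia.host=239.2.11.71
        //   *.sink.ganglia.port=8649
        //   *.sink.ganglia.mode=multicast
        //   *.sink.ganglia.ttl=1
        //   *.sink.ganglia.period=10
        //   *.sink.ganglia.unit=seconds
        val props = new Properties()
        props.setProperty("host", "239.2.11.71")  // required: gmond host or multicast group
        props.setProperty("port", "8649")         // required: gmond port
        props.setProperty("mode", "multicast")    // optional: defaults to multicast
        props.setProperty("ttl", "1")             // optional: raise if gmond is several hops away
        props.setProperty("period", "10")         // optional: poll period, minimum 1 second
        props.setProperty("unit", "seconds")      // optional: unit of the poll period

        val registry = new MetricRegistry()
        val sink = new GangliaSink(props, registry)
        sink.start()   // reports everything registered in `registry` every 10 seconds
        // ... register and update metrics against `registry` ...
        sink.stop()
      }
    }

Omitting the host or port property makes the constructor throw, matching the checks in the patch above.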
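The effect of the last commit is easiest to see in the source names that reach a sink. A rough REPL-style sketch, assuming a hypothetical application named "PageRank" and an executor with ID "3" (both names are made up for illustration):

    // Before this change the source names were the bare "DAGScheduler",
    // "BlockManager", and "executor"; prefixing them with the application name
    // (or executor ID) keeps metrics from concurrent applications apart.
    val appName    = "PageRank"   // would be sc.appName on a real driver
    val executorId = "3"          // passed to ExecutorSource by the Executor

    val dagSchedulerSourceName = "%s.DAGScheduler".format(appName)   // "PageRank.DAGScheduler"
    val blockManagerSourceName = "%s.BlockManager".format(appName)   // "PageRank.BlockManager"
    val executorSourceName     = "executor.%s".format(executorId)    // "executor.3"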