From ef85a51f85c9720bc091367a0d4f80e7ed6b9778 Mon Sep 17 00:00:00 2001 From: Russell Cardullo Date: Fri, 8 Nov 2013 16:36:03 -0800 Subject: [PATCH] Add graphite sink for metrics This adds a metrics sink for graphite. The sink must be configured with the host and port of a graphite node and optionally may be configured with a prefix that will be prepended to all metrics that are sent to graphite. --- conf/metrics.properties.template | 8 ++ core/pom.xml | 4 + .../spark/metrics/sink/GraphiteSink.scala | 82 +++++++++++++++++++ docs/monitoring.md | 1 + project/SparkBuild.scala | 1 + 5 files changed, 96 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala diff --git a/conf/metrics.properties.template b/conf/metrics.properties.template index ae10f615d1..1c3d94e1b0 100644 --- a/conf/metrics.properties.template +++ b/conf/metrics.properties.template @@ -80,6 +80,14 @@ # /metrics/aplications/json # App information # /metrics/master/json # Master information +# org.apache.spark.metrics.sink.GraphiteSink +# Name: Default: Description: +# host NONE Hostname of Graphite server +# port NONE Port of Graphite server +# period 10 Poll period +# unit seconds Units of poll period +# prefix EMPTY STRING Prefix to prepend to metric name + ## Examples # Enable JmxSink for all instances by class name #*.sink.jmx.class=org.apache.spark.metrics.sink.JmxSink diff --git a/core/pom.xml b/core/pom.xml index 8621d257e5..6af229c71d 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -158,6 +158,10 @@ com.codahale.metrics metrics-ganglia + + com.codahale.metrics + metrics-graphite + org.apache.derby derby diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala new file mode 100644 index 0000000000..eb1315e6de --- /dev/null +++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.metrics.sink + +import com.codahale.metrics.MetricRegistry +import com.codahale.metrics.graphite.{GraphiteReporter, Graphite} + +import java.util.Properties +import java.util.concurrent.TimeUnit +import java.net.InetSocketAddress + +import org.apache.spark.metrics.MetricsSystem + +class GraphiteSink(val property: Properties, val registry: MetricRegistry) extends Sink { + val GRAPHITE_DEFAULT_PERIOD = 10 + val GRAPHITE_DEFAULT_UNIT = "SECONDS" + val GRAPHITE_DEFAULT_PREFIX = "" + + val GRAPHITE_KEY_HOST = "host" + val GRAPHITE_KEY_PORT = "port" + val GRAPHITE_KEY_PERIOD = "period" + val GRAPHITE_KEY_UNIT = "unit" + val GRAPHITE_KEY_PREFIX = "prefix" + + def propertyToOption(prop: String) = Option(property.getProperty(prop)) + + if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) { + throw new Exception("Graphite sink requires 'host' property.") + } + + if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) { + throw new Exception("Graphite sink requires 'port' property.") + } + + val host = propertyToOption(GRAPHITE_KEY_HOST).get + val port = propertyToOption(GRAPHITE_KEY_PORT).get.toInt + + val pollPeriod = Option(property.getProperty(GRAPHITE_KEY_PERIOD)) match { + case Some(s) => s.toInt + case None => GRAPHITE_DEFAULT_PERIOD + } + + val pollUnit = Option(property.getProperty(GRAPHITE_KEY_UNIT)) match { + case Some(s) => TimeUnit.valueOf(s.toUpperCase()) + case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT) + } + + val prefix = propertyToOption(GRAPHITE_KEY_PREFIX).getOrElse(GRAPHITE_DEFAULT_PREFIX) + + MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod) + + val graphite: Graphite = new Graphite(new InetSocketAddress(host, port)) + + val reporter: GraphiteReporter = GraphiteReporter.forRegistry(registry) + .convertDurationsTo(TimeUnit.MILLISECONDS) + .convertRatesTo(TimeUnit.SECONDS) + .prefixedWith(prefix) + .build(graphite) + + override def start() { + reporter.start(pollPeriod, pollUnit) + } + + override def stop() { + reporter.stop() + } +} diff --git a/docs/monitoring.md b/docs/monitoring.md index 5f456b999b..5ed0474477 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -50,6 +50,7 @@ Each instance can report to zero or more _sinks_. Sinks are contained in the * `GangliaSink`: Sends metrics to a Ganglia node or multicast group. * `JmxSink`: Registers metrics for viewing in a JXM console. * `MetricsServlet`: Adds a servlet within the existing Spark UI to serve metrics data as JSON data. +* `GraphiteSink`: Sends metrics to a Graphite node. The syntax of the metrics configuration file is defined in an example configuration file, `$SPARK_HOME/conf/metrics.conf.template`. diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 45fd30a7c8..0bc2ca8d08 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -229,6 +229,7 @@ object SparkBuild extends Build { "com.codahale.metrics" % "metrics-jvm" % "3.0.0", "com.codahale.metrics" % "metrics-json" % "3.0.0", "com.codahale.metrics" % "metrics-ganglia" % "3.0.0", + "com.codahale.metrics" % "metrics-graphite" % "3.0.0", "com.twitter" % "chill_2.9.3" % "0.3.1", "com.twitter" % "chill-java" % "0.3.1" )