From a802c69b130b69a35b372ffe1b01289577f6fafb Mon Sep 17 00:00:00 2001 From: "marek.simunek" Date: Mon, 1 Oct 2018 11:04:37 -0500 Subject: [PATCH] [SPARK-18364][YARN] Expose metrics for YarnShuffleService ## What changes were proposed in this pull request? This PR is follow-up of closed https://github.com/apache/spark/pull/17401 which only ended due to of inactivity, but its still nice feature to have. Given review by jerryshao taken in consideration and edited: - VisibleForTesting deleted because of dependency conflicts - removed unnecessary reflection for `MetricsSystemImpl` - added more available types for gauge ## How was this patch tested? Manual deploy of new yarn-shuffle jar into a Node Manager and verifying that the metrics appear in the Node Manager-standard location. This is JMX with an query endpoint running on `hostname:port` Resulting metrics look like this: ``` curl -sk -XGET hostname:port | grep -v '#' | grep 'shuffleService' hadoop_nodemanager_openblockrequestlatencymillis_rate15{name="shuffleService",} 0.31428910657834713 hadoop_nodemanager_blocktransferratebytes_rate15{name="shuffleService",} 566144.9983653595 hadoop_nodemanager_blocktransferratebytes_ratemean{name="shuffleService",} 2464409.9678099006 hadoop_nodemanager_openblockrequestlatencymillis_rate1{name="shuffleService",} 1.2893844732240272 hadoop_nodemanager_registeredexecutorssize{name="shuffleService",} 2.0 hadoop_nodemanager_openblockrequestlatencymillis_ratemean{name="shuffleService",} 1.255574678369966 hadoop_nodemanager_openblockrequestlatencymillis_count{name="shuffleService",} 315.0 hadoop_nodemanager_openblockrequestlatencymillis_rate5{name="shuffleService",} 0.7661929192569739 hadoop_nodemanager_registerexecutorrequestlatencymillis_ratemean{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_count{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate1{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate5{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_count{name="shuffleService",} 6.18271213E8 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate15{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_rate5{name="shuffleService",} 1154114.4881816586 hadoop_nodemanager_blocktransferratebytes_rate1{name="shuffleService",} 574745.0749848988 ``` Closes #22485 from mareksimunek/SPARK-18364. Lead-authored-by: marek.simunek Co-authored-by: Andrew Ash Signed-off-by: Thomas Graves --- .../network/yarn/YarnShuffleService.java | 11 ++ .../yarn/YarnShuffleServiceMetrics.java | 137 ++++++++++++++++++ .../yarn/YarnShuffleServiceMetricsSuite.scala | 73 ++++++++++ 3 files changed, 221 insertions(+) create mode 100644 common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java create mode 100644 resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java index d8b2ed6b5d..72ae1a1295 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleService.java @@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.server.api.*; import org.apache.spark.network.util.LevelDBProvider; @@ -168,6 +170,15 @@ public class YarnShuffleService extends AuxiliaryService { TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf)); blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile); + // register metrics on the block handler into the Node Manager's metrics system. + YarnShuffleServiceMetrics serviceMetrics = + new YarnShuffleServiceMetrics(blockHandler.getAllMetrics()); + + MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance(); + metricsSystem.register( + "sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics); + logger.info("Registered metrics with Hadoop's DefaultMetricsSystem"); + // If authentication is enabled, set up the shuffle server to use a // special RPC handler that filters out unauthenticated fetch requests List bootstraps = Lists.newArrayList(); diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java new file mode 100644 index 0000000000..3e4d479b86 --- /dev/null +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.yarn; + +import java.util.Map; + +import com.codahale.metrics.*; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; + +/** + * Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics} + * to hadoop metrics system. + * NodeManager by default exposes JMX endpoint where can be collected. + */ +class YarnShuffleServiceMetrics implements MetricsSource { + + private final MetricSet metricSet; + + YarnShuffleServiceMetrics(MetricSet metricSet) { + this.metricSet = metricSet; + } + + /** + * Get metrics from the source + * + * @param collector to contain the resulting metrics snapshot + * @param all if true, return all metrics even if unchanged. + */ + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService"); + + for (Map.Entry entry : metricSet.getMetrics().entrySet()) { + collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue()); + } + } + + /** + * The metric types used in + * {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}. + * Visible for testing. + */ + public static void collectMetric( + MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { + + if (metric instanceof Timer) { + Timer t = (Timer) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), + t.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name), + t.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name), + t.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), + t.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), + t.getMeanRate()); + } else if (metric instanceof Meter) { + Meter m = (Meter) metric; + metricsRecordBuilder + .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name), + m.getCount()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name), + m.getFifteenMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name), + m.getFiveMinuteRate()) + .addGauge( + new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name), + m.getOneMinuteRate()) + .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name), + m.getMeanRate()); + } else if (metric instanceof Gauge) { + final Object gaugeValue = ((Gauge) metric).getValue(); + if (gaugeValue instanceof Integer) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue); + } else if (gaugeValue instanceof Long) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue); + } else if (gaugeValue instanceof Float) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue); + } else if (gaugeValue instanceof Double) { + metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue); + } else { + throw new IllegalStateException( + "Not supported class type of metric[" + name + "] for value " + gaugeValue); + } + } + } + + private static MetricsInfo getShuffleServiceMetricsInfo(String name) { + return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name); + } + + private static class ShuffleServiceMetricsInfo implements MetricsInfo { + + private final String name; + private final String description; + + ShuffleServiceMetricsInfo(String name, String description) { + this.name = name; + this.description = description; + } + + @Override + public String name() { + return name; + } + + @Override + public String description() { + return description; + } + } +} diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala new file mode 100644 index 0000000000..40b92282a3 --- /dev/null +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.network.yarn + +import scala.collection.JavaConverters._ + +import org.apache.hadoop.metrics2.MetricsRecordBuilder +import org.mockito.Matchers._ +import org.mockito.Mockito.{mock, times, verify, when} +import org.scalatest.Matchers + +import org.apache.spark.SparkFunSuite +import org.apache.spark.network.server.OneForOneStreamManager +import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver} + +class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { + + val streamManager = mock(classOf[OneForOneStreamManager]) + val blockResolver = mock(classOf[ExternalShuffleBlockResolver]) + when(blockResolver.getRegisteredExecutorsSize).thenReturn(42) + + val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics + + test("metrics named as expected") { + val allMetrics = Set( + "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes", "registeredExecutorsSize") + + metrics.getMetrics.keySet().asScala should be (allMetrics) + } + + // these three metrics have the same effect on the collector + for (testname <- Seq("openBlockRequestLatencyMillis", + "registerExecutorRequestLatencyMillis", + "blockTransferRateBytes")) { + test(s"$testname - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + when(builder.addCounter(any(), anyLong())).thenReturn(builder) + when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + + YarnShuffleServiceMetrics.collectMetric(builder, testname, + metrics.getMetrics.get(testname)) + + verify(builder).addCounter(anyObject(), anyLong()) + verify(builder, times(4)).addGauge(anyObject(), anyDouble()) + } + } + + // this metric writes only one gauge to the collector + test("registeredExecutorsSize - collector receives correct types") { + val builder = mock(classOf[MetricsRecordBuilder]) + + YarnShuffleServiceMetrics.collectMetric(builder, "registeredExecutorsSize", + metrics.getMetrics.get("registeredExecutorsSize")) + + // only one + verify(builder).addGauge(anyObject(), anyInt()) + } +}