[SPARK-18364][YARN] Expose metrics for YarnShuffleService
## What changes were proposed in this pull request? This PR is a follow-up of the closed https://github.com/apache/spark/pull/17401, which was only closed due to inactivity, but it is still a nice feature to have. The review feedback from jerryshao was taken into consideration and the following edits were made: - VisibleForTesting deleted because of dependency conflicts - removed unnecessary reflection for `MetricsSystemImpl` - added more available types for gauge ## How was this patch tested? Manually deployed the new yarn-shuffle jar into a Node Manager and verified that the metrics appear in the Node Manager-standard location. This is JMX with a query endpoint running on `hostname:port`. Resulting metrics look like this: ``` curl -sk -XGET hostname:port | grep -v '#' | grep 'shuffleService' hadoop_nodemanager_openblockrequestlatencymillis_rate15{name="shuffleService",} 0.31428910657834713 hadoop_nodemanager_blocktransferratebytes_rate15{name="shuffleService",} 566144.9983653595 hadoop_nodemanager_blocktransferratebytes_ratemean{name="shuffleService",} 2464409.9678099006 hadoop_nodemanager_openblockrequestlatencymillis_rate1{name="shuffleService",} 1.2893844732240272 hadoop_nodemanager_registeredexecutorssize{name="shuffleService",} 2.0 hadoop_nodemanager_openblockrequestlatencymillis_ratemean{name="shuffleService",} 1.255574678369966 hadoop_nodemanager_openblockrequestlatencymillis_count{name="shuffleService",} 315.0 hadoop_nodemanager_openblockrequestlatencymillis_rate5{name="shuffleService",} 0.7661929192569739 hadoop_nodemanager_registerexecutorrequestlatencymillis_ratemean{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_count{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate1{name="shuffleService",} 0.0 hadoop_nodemanager_registerexecutorrequestlatencymillis_rate5{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_count{name="shuffleService",} 6.18271213E8 
hadoop_nodemanager_registerexecutorrequestlatencymillis_rate15{name="shuffleService",} 0.0 hadoop_nodemanager_blocktransferratebytes_rate5{name="shuffleService",} 1154114.4881816586 hadoop_nodemanager_blocktransferratebytes_rate1{name="shuffleService",} 574745.0749848988 ``` Closes #22485 from mareksimunek/SPARK-18364. Lead-authored-by: marek.simunek <marek.simunek@firma.seznam.cz> Co-authored-by: Andrew Ash <andrew@andrewash.com> Signed-off-by: Thomas Graves <tgraves@apache.org>
This commit is contained in:
parent
b96fd44f0e
commit
a802c69b13
|
@ -35,6 +35,8 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.server.api.*;
|
||||
import org.apache.spark.network.util.LevelDBProvider;
|
||||
|
@ -168,6 +170,15 @@ public class YarnShuffleService extends AuxiliaryService {
|
|||
TransportConf transportConf = new TransportConf("shuffle", new HadoopConfigProvider(conf));
|
||||
blockHandler = new ExternalShuffleBlockHandler(transportConf, registeredExecutorFile);
|
||||
|
||||
// register metrics on the block handler into the Node Manager's metrics system.
|
||||
YarnShuffleServiceMetrics serviceMetrics =
|
||||
new YarnShuffleServiceMetrics(blockHandler.getAllMetrics());
|
||||
|
||||
MetricsSystemImpl metricsSystem = (MetricsSystemImpl) DefaultMetricsSystem.instance();
|
||||
metricsSystem.register(
|
||||
"sparkShuffleService", "Metrics on the Spark Shuffle Service", serviceMetrics);
|
||||
logger.info("Registered metrics with Hadoop's DefaultMetricsSystem");
|
||||
|
||||
// If authentication is enabled, set up the shuffle server to use a
|
||||
// special RPC handler that filters out unauthenticated fetch requests
|
||||
List<TransportServerBootstrap> bootstraps = Lists.newArrayList();
|
||||
|
|
|
@ -0,0 +1,137 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.spark.network.yarn;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
import com.codahale.metrics.*;
|
||||
import org.apache.hadoop.metrics2.MetricsCollector;
|
||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
|
||||
/**
|
||||
* Forward {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}
|
||||
* to hadoop metrics system.
|
||||
* NodeManager by default exposes JMX endpoint where can be collected.
|
||||
*/
|
||||
class YarnShuffleServiceMetrics implements MetricsSource {
|
||||
|
||||
private final MetricSet metricSet;
|
||||
|
||||
YarnShuffleServiceMetrics(MetricSet metricSet) {
|
||||
this.metricSet = metricSet;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get metrics from the source
|
||||
*
|
||||
* @param collector to contain the resulting metrics snapshot
|
||||
* @param all if true, return all metrics even if unchanged.
|
||||
*/
|
||||
@Override
|
||||
public void getMetrics(MetricsCollector collector, boolean all) {
|
||||
MetricsRecordBuilder metricsRecordBuilder = collector.addRecord("sparkShuffleService");
|
||||
|
||||
for (Map.Entry<String, Metric> entry : metricSet.getMetrics().entrySet()) {
|
||||
collectMetric(metricsRecordBuilder, entry.getKey(), entry.getValue());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The metric types used in
|
||||
* {@link org.apache.spark.network.shuffle.ExternalShuffleBlockHandler.ShuffleMetrics}.
|
||||
* Visible for testing.
|
||||
*/
|
||||
public static void collectMetric(
|
||||
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
|
||||
|
||||
if (metric instanceof Timer) {
|
||||
Timer t = (Timer) metric;
|
||||
metricsRecordBuilder
|
||||
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
|
||||
t.getCount())
|
||||
.addGauge(
|
||||
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of timer " + name),
|
||||
t.getFifteenMinuteRate())
|
||||
.addGauge(
|
||||
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of timer " + name),
|
||||
t.getFiveMinuteRate())
|
||||
.addGauge(
|
||||
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
|
||||
t.getOneMinuteRate())
|
||||
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
|
||||
t.getMeanRate());
|
||||
} else if (metric instanceof Meter) {
|
||||
Meter m = (Meter) metric;
|
||||
metricsRecordBuilder
|
||||
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of meter " + name),
|
||||
m.getCount())
|
||||
.addGauge(
|
||||
new ShuffleServiceMetricsInfo(name + "_rate15", "15 minute rate of meter " + name),
|
||||
m.getFifteenMinuteRate())
|
||||
.addGauge(
|
||||
new ShuffleServiceMetricsInfo(name + "_rate5", "5 minute rate of meter " + name),
|
||||
m.getFiveMinuteRate())
|
||||
.addGauge(
|
||||
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of meter " + name),
|
||||
m.getOneMinuteRate())
|
||||
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of meter " + name),
|
||||
m.getMeanRate());
|
||||
} else if (metric instanceof Gauge) {
|
||||
final Object gaugeValue = ((Gauge) metric).getValue();
|
||||
if (gaugeValue instanceof Integer) {
|
||||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Integer) gaugeValue);
|
||||
} else if (gaugeValue instanceof Long) {
|
||||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Long) gaugeValue);
|
||||
} else if (gaugeValue instanceof Float) {
|
||||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Float) gaugeValue);
|
||||
} else if (gaugeValue instanceof Double) {
|
||||
metricsRecordBuilder.addGauge(getShuffleServiceMetricsInfo(name), (Double) gaugeValue);
|
||||
} else {
|
||||
throw new IllegalStateException(
|
||||
"Not supported class type of metric[" + name + "] for value " + gaugeValue);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static MetricsInfo getShuffleServiceMetricsInfo(String name) {
|
||||
return new ShuffleServiceMetricsInfo(name, "Value of gauge " + name);
|
||||
}
|
||||
|
||||
private static class ShuffleServiceMetricsInfo implements MetricsInfo {
|
||||
|
||||
private final String name;
|
||||
private final String description;
|
||||
|
||||
ShuffleServiceMetricsInfo(String name, String description) {
|
||||
this.name = name;
|
||||
this.description = description;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String name() {
|
||||
return name;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String description() {
|
||||
return description;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.spark.network.yarn
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder
|
||||
import org.mockito.Matchers._
|
||||
import org.mockito.Mockito.{mock, times, verify, when}
|
||||
import org.scalatest.Matchers
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
import org.apache.spark.network.server.OneForOneStreamManager
|
||||
import org.apache.spark.network.shuffle.{ExternalShuffleBlockHandler, ExternalShuffleBlockResolver}
|
||||
|
||||
class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {

  // Mock out the block handler's collaborators; the resolver is stubbed so the
  // registeredExecutorsSize gauge has a concrete value to report.
  val streamManager = mock(classOf[OneForOneStreamManager])
  val blockResolver = mock(classOf[ExternalShuffleBlockResolver])
  when(blockResolver.getRegisteredExecutorsSize).thenReturn(42)

  // The real metric set exposed by the shuffle service, backed by the mocks above.
  val metrics = new ExternalShuffleBlockHandler(streamManager, blockResolver).getAllMetrics

  test("metrics named as expected") {
    val expectedMetricNames = Set(
      "openBlockRequestLatencyMillis",
      "registerExecutorRequestLatencyMillis",
      "blockTransferRateBytes",
      "registeredExecutorsSize")

    metrics.getMetrics.keySet().asScala should be (expectedMetricNames)
  }

  // Timers and meters are forwarded identically: one counter plus four rate gauges.
  Seq(
    "openBlockRequestLatencyMillis",
    "registerExecutorRequestLatencyMillis",
    "blockTransferRateBytes").foreach { metricName =>
    test(s"$metricName - collector receives correct types") {
      val recordBuilder = mock(classOf[MetricsRecordBuilder])
      when(recordBuilder.addCounter(any(), anyLong())).thenReturn(recordBuilder)
      when(recordBuilder.addGauge(any(), anyDouble())).thenReturn(recordBuilder)

      YarnShuffleServiceMetrics.collectMetric(
        recordBuilder, metricName, metrics.getMetrics.get(metricName))

      verify(recordBuilder).addCounter(anyObject(), anyLong())
      verify(recordBuilder, times(4)).addGauge(anyObject(), anyDouble())
    }
  }

  // An integer-valued gauge results in exactly one addGauge call on the collector.
  test("registeredExecutorsSize - collector receives correct types") {
    val recordBuilder = mock(classOf[MetricsRecordBuilder])

    YarnShuffleServiceMetrics.collectMetric(
      recordBuilder, "registeredExecutorsSize",
      metrics.getMetrics.get("registeredExecutorsSize"))

    verify(recordBuilder).addGauge(anyObject(), anyInt())
  }
}
|
Loading…
Reference in a new issue