From 3255511d52f0c9652b34de4f499ee5081f59e0a5 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Mon, 28 Jun 2021 02:36:17 -0500 Subject: [PATCH] [SPARK-35258][SHUFFLE][YARN] Add new metrics to ExternalShuffleService for better monitoring ### What changes were proposed in this pull request? This adds three new metrics to `ExternalBlockHandler`: - `blockTransferRate` -- for indicating the rate of transferring blocks, vs. the data within them - `blockTransferMessageRate` -- the rate of block fetch messages; this is the same as `blockTransferRate` for non-batch fetches, and counts batches rather than blocks when batch fetches are enabled - `blockTransferAvgSize_1min` -- a 1-minute trailing average of block sizes transferred by the ESS Additionally, this enhances `YarnShuffleServiceMetrics` to expose the histogram/`Snapshot` information from `Timer` metrics within `ExternalBlockHandler`. ### Why are the changes needed? Currently `ExternalBlockHandler` exposes some useful metrics, but lacks metrics for the rate of block transfers. We have `blockTransferRateBytes` to tell us the rate of _bytes_, but no metric to tell us the rate of _blocks_, which is especially relevant when running the ESS on HDDs that are sensitive to random reads. Many small block transfers can have a negative impact on performance, but won't show up as a spike in `blockTransferRateBytes` since the sizes are small. Thus the new metrics to show information around average block size and block transfer rate are very useful to monitor the health/performance of the ESS, especially when running on HDDs. For the `YarnShuffleServiceMetrics`, currently the three `Timer` metrics exposed by `ExternalBlockHandler` are being underutilized in a YARN-based environment -- they are basically treated as a `Meter`, only exposing rate-based information, when the metrics themselves collect detailed histograms of timing information. We should expose this information for better observability. ### Does this PR introduce _any_ user-facing change? Yes, there are three entirely new metrics for the ESS, as documented in `monitoring.md`. 
Additionally in a YARN environment, `Timer` metrics exposed by the ESS will include more rich timing information. ### How was this patch tested? New unit tests are added to verify that new metrics are showing up as expected. We have been running this patch internally for approx. 1 year and have found it to be useful for monitoring the health of ESS and diagnosing performance issues. Closes #32388 from xkrogen/xkrogen-SPARK-35258-ess-new-metrics. Authored-by: Erik Krogen Signed-off-by: Mridul Muralidharan gmail.com> --- .../network/shuffle/ExternalBlockHandler.java | 31 ++++++++++- .../shuffle/ExternalBlockHandlerSuite.java | 35 +++++++------ .../yarn/YarnShuffleServiceMetrics.java | 38 +++++++++++++- .../ExternalShuffleServiceMetricsSuite.scala | 10 ++-- docs/monitoring.md | 4 ++ .../yarn/YarnShuffleServiceMetricsSuite.scala | 52 +++++++++++++++---- .../yarn/YarnShuffleServiceSuite.scala | 8 ++- 7 files changed, 146 insertions(+), 32 deletions(-) diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index c5f5834c0b..922bb96c59 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -31,6 +31,7 @@ import com.codahale.metrics.Gauge; import com.codahale.metrics.Meter; import com.codahale.metrics.Metric; import com.codahale.metrics.MetricSet; +import com.codahale.metrics.RatioGauge; import com.codahale.metrics.Timer; import com.codahale.metrics.Counter; import com.google.common.collect.Sets; @@ -305,6 +306,14 @@ public class ExternalBlockHandler extends RpcHandler private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer(); // Time latency for processing finalize shuffle merge request latency in ms private final Timer finalizeShuffleMergeLatencyMillis = 
new Timer(); + // Block transfer rate in blocks per second + private final Meter blockTransferRate = new Meter(); + // Block fetch message rate per second. When using non-batch fetches + // (`OpenBlocks` or `FetchShuffleBlocks` with `batchFetchEnabled` as false), this will be the + // same as the `blockTransferRate`. When batch fetches are enabled, this will represent the + // number of batch fetches, and `blockTransferRate` will represent the number of blocks + // returned by the fetches. + private final Meter blockTransferMessageRate = new Meter(); // Block transfer rate in byte per second private final Meter blockTransferRateBytes = new Meter(); // Number of active connections to the shuffle service @@ -318,7 +327,20 @@ public class ExternalBlockHandler extends RpcHandler allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis); allMetrics.put("fetchMergedBlocksMetaLatencyMillis", fetchMergedBlocksMetaLatencyMillis); allMetrics.put("finalizeShuffleMergeLatencyMillis", finalizeShuffleMergeLatencyMillis); + allMetrics.put("blockTransferRate", blockTransferRate); + allMetrics.put("blockTransferMessageRate", blockTransferMessageRate); allMetrics.put("blockTransferRateBytes", blockTransferRateBytes); + allMetrics.put("blockTransferAvgSize_1min", new RatioGauge() { + @Override + protected Ratio getRatio() { + return Ratio.of( + blockTransferRateBytes.getOneMinuteRate(), + // use blockTransferMessageRate here instead of blockTransferRate to represent the + // average size of the disk read / network message which has more operational impact + // than the actual size of the block + blockTransferMessageRate.getOneMinuteRate()); + } + }); allMetrics.put("registeredExecutorsSize", (Gauge) () -> blockManager.getRegisteredExecutorsSize()); allMetrics.put("numActiveConnections", activeConnections); @@ -411,6 +433,8 @@ public class ExternalBlockHandler extends RpcHandler public ManagedBuffer next() { final ManagedBuffer block = 
blockDataForIndexFn.apply(index); index += 2; + metrics.blockTransferRate.mark(); + metrics.blockTransferMessageRate.mark(); metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); return block; } @@ -458,12 +482,17 @@ public class ExternalBlockHandler extends RpcHandler reduceIdx = 0; mapIdx += 1; } + metrics.blockTransferRate.mark(); } else { assert(reduceIds[mapIdx].length == 2); + int startReduceId = reduceIds[mapIdx][0]; + int endReduceId = reduceIds[mapIdx][1]; block = blockManager.getContinuousBlocksData(appId, execId, shuffleId, mapIds[mapIdx], - reduceIds[mapIdx][0], reduceIds[mapIdx][1]); + startReduceId, endReduceId); mapIdx += 1; + metrics.blockTransferRate.mark(endReduceId - startReduceId); } + metrics.blockTransferMessageRate.mark(); metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0); return block; } diff --git a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java index bad61d30d7..dc41e957f0 100644 --- a/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java +++ b/common/network-shuffle/src/test/java/org/apache/spark/network/shuffle/ExternalBlockHandlerSuite.java @@ -20,8 +20,10 @@ package org.apache.spark.network.shuffle; import java.io.IOException; import java.nio.ByteBuffer; import java.util.Iterator; +import java.util.Map; import com.codahale.metrics.Meter; +import com.codahale.metrics.Metric; import com.codahale.metrics.Timer; import org.junit.Before; import org.junit.Test; @@ -103,7 +105,7 @@ public class ExternalBlockHandlerSuite { verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0); verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1); - verifyOpenBlockLatencyMetrics(); + verifyOpenBlockLatencyMetrics(2, 2); } @Test @@ -117,7 +119,7 @@ public class ExternalBlockHandlerSuite { 
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0); verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1); - verifyOpenBlockLatencyMetrics(); + verifyOpenBlockLatencyMetrics(2, 2); } @Test @@ -126,14 +128,14 @@ public class ExternalBlockHandlerSuite { new NioManagedBuffer(ByteBuffer.wrap(new byte[10])) }; when(blockResolver.getContinuousBlocksData( - "app0", "exec1", 0, 0, 0, 1)).thenReturn(batchBlockMarkers[0]); + "app0", "exec1", 0, 0, 0, 3)).thenReturn(batchBlockMarkers[0]); FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks( - "app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }}, true); + "app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 3 }}, true); checkOpenBlocksReceive(fetchShuffleBlocks, batchBlockMarkers); - verify(blockResolver, times(1)).getContinuousBlocksData("app0", "exec1", 0, 0, 0, 1); - verifyOpenBlockLatencyMetrics(); + verify(blockResolver, times(1)).getContinuousBlocksData("app0", "exec1", 0, 0, 0, 3); + verifyOpenBlockLatencyMetrics(3, 1); } @Test @@ -147,7 +149,7 @@ public class ExternalBlockHandlerSuite { verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 0); verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 1); - verifyOpenBlockLatencyMetrics(); + verifyOpenBlockLatencyMetrics(2, 2); } @Test @@ -195,17 +197,20 @@ public class ExternalBlockHandlerSuite { assertFalse(buffers.hasNext()); } - private void verifyOpenBlockLatencyMetrics() { - Timer openBlockRequestLatencyMillis = (Timer) ((ExternalBlockHandler) handler) + private void verifyOpenBlockLatencyMetrics( + int blockTransferCount, + int blockTransferMessageCount) { + Map metricMap = ((ExternalBlockHandler) handler) .getAllMetrics() - .getMetrics() - .get("openBlockRequestLatencyMillis"); + .getMetrics(); + Timer openBlockRequestLatencyMillis = (Timer) metricMap.get("openBlockRequestLatencyMillis"); assertEquals(1, openBlockRequestLatencyMillis.getCount()); // Verify block transfer metrics - 
Meter blockTransferRateBytes = (Meter) ((ExternalBlockHandler) handler) - .getAllMetrics() - .getMetrics() - .get("blockTransferRateBytes"); + Meter blockTransferRate = (Meter) metricMap.get("blockTransferRate"); + assertEquals(blockTransferCount, blockTransferRate.getCount()); + Meter blockTransferMessageRate = (Meter) metricMap.get("blockTransferMessageRate"); + assertEquals(blockTransferMessageCount, blockTransferMessageRate.getCount()); + Meter blockTransferRateBytes = (Meter) metricMap.get("blockTransferRateBytes"); assertEquals(10, blockTransferRateBytes.getCount()); } diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index f30abbd0f7..964d8f9592 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -64,7 +64,11 @@ class YarnShuffleServiceMetrics implements MetricsSource { MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) { if (metric instanceof Timer) { + // Timer records both the operations count and delay + // Snapshot inside the Timer provides the information for the operation delay Timer t = (Timer) metric; + Snapshot snapshot = t.getSnapshot(); + String timingName = name + "_nanos"; metricsRecordBuilder .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), t.getCount()) @@ -78,7 +82,32 @@ class YarnShuffleServiceMetrics implements MetricsSource { new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name), t.getOneMinuteRate()) .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), - t.getMeanRate()); + t.getMeanRate()) + .addGauge( + getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax()) + .addGauge( + 
getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin()) + .addGauge( + getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean()) + .addGauge( + getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev()); + for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) { + String percentileStr; + switch (percentileThousands) { + case 10: + percentileStr = "1stPercentile"; + break; + case 999: + percentileStr = "999thPercentile"; + break; + default: + percentileStr = String.format("%dthPercentile", percentileThousands / 10); + break; + } + metricsRecordBuilder.addGauge( + getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr), + snapshot.getValue(percentileThousands / 1000.0)); + } } else if (metric instanceof Meter) { Meter m = (Meter) metric; metricsRecordBuilder @@ -128,6 +157,13 @@ class YarnShuffleServiceMetrics implements MetricsSource { return new ShuffleServiceMetricsInfo(name, "Value of counter " + name); } + private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForGenericValue( + String baseName, String valueName) { + return new ShuffleServiceMetricsInfo( + baseName + "_" + valueName, + valueName + " value of " + baseName); + } + private static class ShuffleServiceMetricsInfo implements MetricsInfo { private final String name; diff --git a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala index 4ce46156c0..d0e16cb093 100644 --- a/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/ExternalShuffleServiceMetricsSuite.scala @@ -51,9 +51,13 @@ class ExternalShuffleServiceMetricsSuite extends SparkFunSuite { val sourceRef = classOf[ExternalShuffleService].getDeclaredField("shuffleServiceSource") 
sourceRef.setAccessible(true) val source = sourceRef.get(externalShuffleService).asInstanceOf[ExternalShuffleServiceSource] - assert(source.metricRegistry.getMetrics.keySet().asScala == - Set( + // Use sorted Seq instead of Set for easier comparison when there is a mismatch + assert(source.metricRegistry.getMetrics.keySet().asScala.toSeq.sorted == + Seq( + "blockTransferRate", + "blockTransferMessageRate", "blockTransferRateBytes", + "blockTransferAvgSize_1min", "numActiveConnections", "numCaughtExceptions", "numRegisteredConnections", @@ -63,7 +67,7 @@ class ExternalShuffleServiceMetricsSuite extends SparkFunSuite { "shuffle-server.usedDirectMemory", "shuffle-server.usedHeapMemory", "finalizeShuffleMergeLatencyMillis", - "fetchMergedBlocksMetaLatencyMillis") + "fetchMergedBlocksMetaLatencyMillis").sorted ) } } diff --git a/docs/monitoring.md b/docs/monitoring.md index ee5aca83af..b30c8e2110 100644 --- a/docs/monitoring.md +++ b/docs/monitoring.md @@ -1377,7 +1377,11 @@ Note: applies when running in Spark standalone as worker ### Component instance = shuffleService Note: applies to the shuffle service +- blockTransferRate (meter) - rate of blocks being transferred +- blockTransferMessageRate (meter) - rate of block transfer messages, + i.e. 
if batch fetches are enabled, this represents number of batches rather than number of blocks - blockTransferRateBytes (meter) +- blockTransferAvgSize_1min (gauge - 1-minute moving average) - numActiveConnections.count - numRegisteredConnections.count - numCaughtExceptions.count diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index d866fac726..eff2de7143 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -17,10 +17,11 @@ package org.apache.spark.network.yarn import scala.collection.JavaConverters._ +import scala.collection.mutable -import org.apache.hadoop.metrics2.MetricsRecordBuilder +import org.apache.hadoop.metrics2.{MetricsInfo, MetricsRecordBuilder} import org.mockito.ArgumentMatchers.{any, anyDouble, anyInt, anyLong} -import org.mockito.Mockito.{mock, times, verify, when} +import org.mockito.Mockito.{mock, verify, when} import org.scalatest.matchers.must.Matchers import org.scalatest.matchers.should.Matchers._ @@ -37,29 +38,60 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { val metrics = new ExternalBlockHandler(streamManager, blockResolver).getAllMetrics test("metrics named as expected") { - val allMetrics = Set( + val allMetrics = Seq( "openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", + "blockTransferRate", "blockTransferMessageRate", "blockTransferAvgSize_1min", "blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections", "numCaughtExceptions", "finalizeShuffleMergeLatencyMillis", "fetchMergedBlocksMetaLatencyMillis") - metrics.getMetrics.keySet().asScala should be (allMetrics) + // Use sorted Seq instead of Set for easier comparison 
when there is a mismatch + metrics.getMetrics.keySet().asScala.toSeq.sorted should be (allMetrics.sorted) } - // these three metrics have the same effect on the collector + // these metrics will generate more metrics on the collector for (testname <- Seq("openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis", - "blockTransferRateBytes")) { + "blockTransferRateBytes", "blockTransferRate", "blockTransferMessageRate")) { test(s"$testname - collector receives correct types") { val builder = mock(classOf[MetricsRecordBuilder]) - when(builder.addCounter(any(), anyLong())).thenReturn(builder) - when(builder.addGauge(any(), anyDouble())).thenReturn(builder) + val counterNames = mutable.Buffer[String]() + when(builder.addCounter(any(), anyLong())).thenAnswer(iom => { + counterNames += iom.getArgument[MetricsInfo](0).name() + builder + }) + val gaugeLongNames = mutable.Buffer[String]() + when(builder.addGauge(any(), anyLong())).thenAnswer(iom => { + gaugeLongNames += iom.getArgument[MetricsInfo](0).name() + builder + }) + val gaugeDoubleNames = mutable.Buffer[String]() + when(builder.addGauge(any(), anyDouble())).thenAnswer(iom => { + gaugeDoubleNames += iom.getArgument[MetricsInfo](0).name() + builder + }) YarnShuffleServiceMetrics.collectMetric(builder, testname, metrics.getMetrics.get(testname)) - verify(builder).addCounter(any(), anyLong()) - verify(builder, times(4)).addGauge(any(), anyDouble()) + assert(counterNames === Seq(s"${testname}_count")) + val (expectLong, expectDouble) = + if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) { + // blockTransfer(Message)?Rate(Bytes)? 
metrics are Meter so just have rate information + (Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix")) + } else { + // other metrics are Timer so have rate and timing information + ( + Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"), + Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev", + "nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile", + "nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile", + "nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile") + .map(suffix => s"${testname}_$suffix") + ) + } + assert(gaugeLongNames.sorted === expectLong.sorted) + assert(gaugeDoubleNames.sorted === expectDouble.sorted) } } diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala index afe85b3d40..fb4097304d 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceSuite.scala @@ -402,8 +402,12 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd metricSetRef.setAccessible(true) val metrics = metricSetRef.get(metricsSource).asInstanceOf[MetricSet].getMetrics - assert(metrics.keySet().asScala == Set( + // Use sorted Seq instead of Set for easier comparison when there is a mismatch + assert(metrics.keySet().asScala.toSeq.sorted == Seq( + "blockTransferRate", + "blockTransferMessageRate", "blockTransferRateBytes", + "blockTransferAvgSize_1min", "numActiveConnections", "numCaughtExceptions", "numRegisteredConnections", @@ -414,7 +418,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd "shuffle-server.usedDirectMemory", "shuffle-server.usedHeapMemory", "fetchMergedBlocksMetaLatencyMillis" - )) + ).sorted) } 
test("SPARK-34828: metrics should be registered with configured name") {