[SPARK-35258][SHUFFLE][YARN] Add new metrics to ExternalShuffleService for better monitoring
### What changes were proposed in this pull request? This adds two new additional metrics to `ExternalBlockHandler`: - `blockTransferRate` -- for indicating the rate of transferring blocks, vs. the data within them - `blockTransferAvgSize_1min` -- a 1-minute trailing average of block sizes transferred by the ESS Additionally, this enhances `YarnShuffleServiceMetrics` to expose the histogram/`Snapshot` information from `Timer` metrics within `ExternalBlockHandler`. ### Why are the changes needed? Currently `ExternalBlockHandler` exposes some useful metrics, but is lacking around metrics for the rate of block transfers. We have `blockTransferRateBytes` to tell us the rate of _bytes_, but no metric to tell us the rate of _blocks_, which is especially relevant when running the ESS on HDDs that are sensitive to random reads. Many small block transfers can have a negative impact on performance, but won't show up as a spike in `blockTransferRateBytes` since the sizes are small. Thus the new metrics to show information around average block size and block transfer rate are very useful to monitor the health/performance of the ESS, especially when running on HDDs. For the `YarnShuffleServiceMetrics`, currently the three `Timer` metrics exposed by `ExternalBlockHandler` are being underutilized in a YARN-based environment -- they are basically treated as a `Meter`, only exposing rate-based information, when the metrics themselves are collected detailed histograms of timing information. We should expose this information for better observability. ### Does this PR introduce _any_ user-facing change? Yes, there are two entirely new metrics for the ESS, as documented in `monitoring.md`. Additionally in a YARN environment, `Timer` metrics exposed by the ESS will include more rich timing information. ### How was this patch tested? New unit tests are added to verify that new metrics are showing up as expected. We have been running this patch internally for approx. 1 year and have found it to be useful for monitoring the health of ESS and diagnosing performance issues. Closes #32388 from xkrogen/xkrogen-SPARK-35258-ess-new-metrics. Authored-by: Erik Krogen <xkrogen@apache.org> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
This commit is contained in:
parent
1c81ad2029
commit
3255511d52
|
@ -31,6 +31,7 @@ import com.codahale.metrics.Gauge;
|
|||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.Metric;
|
||||
import com.codahale.metrics.MetricSet;
|
||||
import com.codahale.metrics.RatioGauge;
|
||||
import com.codahale.metrics.Timer;
|
||||
import com.codahale.metrics.Counter;
|
||||
import com.google.common.collect.Sets;
|
||||
|
@ -305,6 +306,14 @@ public class ExternalBlockHandler extends RpcHandler
|
|||
private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
|
||||
// Time latency for processing finalize shuffle merge request latency in ms
|
||||
private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
|
||||
// Block transfer rate in blocks per second
|
||||
private final Meter blockTransferRate = new Meter();
|
||||
// Block fetch message rate per second. When using non-batch fetches
|
||||
// (`OpenBlocks` or `FetchShuffleBlocks` with `batchFetchEnabled` as false), this will be the
|
||||
// same as the `blockTransferRate`. When batch fetches are enabled, this will represent the
|
||||
// number of batch fetches, and `blockTransferRate` will represent the number of blocks
|
||||
// returned by the fetches.
|
||||
private final Meter blockTransferMessageRate = new Meter();
|
||||
// Block transfer rate in byte per second
|
||||
private final Meter blockTransferRateBytes = new Meter();
|
||||
// Number of active connections to the shuffle service
|
||||
|
@ -318,7 +327,20 @@ public class ExternalBlockHandler extends RpcHandler
|
|||
allMetrics.put("registerExecutorRequestLatencyMillis", registerExecutorRequestLatencyMillis);
|
||||
allMetrics.put("fetchMergedBlocksMetaLatencyMillis", fetchMergedBlocksMetaLatencyMillis);
|
||||
allMetrics.put("finalizeShuffleMergeLatencyMillis", finalizeShuffleMergeLatencyMillis);
|
||||
allMetrics.put("blockTransferRate", blockTransferRate);
|
||||
allMetrics.put("blockTransferMessageRate", blockTransferMessageRate);
|
||||
allMetrics.put("blockTransferRateBytes", blockTransferRateBytes);
|
||||
allMetrics.put("blockTransferAvgSize_1min", new RatioGauge() {
|
||||
@Override
|
||||
protected Ratio getRatio() {
|
||||
return Ratio.of(
|
||||
blockTransferRateBytes.getOneMinuteRate(),
|
||||
// use blockTransferMessageRate here instead of blockTransferRate to represent the
|
||||
// average size of the disk read / network message which has more operational impact
|
||||
// than the actual size of the block
|
||||
blockTransferMessageRate.getOneMinuteRate());
|
||||
}
|
||||
});
|
||||
allMetrics.put("registeredExecutorsSize",
|
||||
(Gauge<Integer>) () -> blockManager.getRegisteredExecutorsSize());
|
||||
allMetrics.put("numActiveConnections", activeConnections);
|
||||
|
@ -411,6 +433,8 @@ public class ExternalBlockHandler extends RpcHandler
|
|||
public ManagedBuffer next() {
|
||||
final ManagedBuffer block = blockDataForIndexFn.apply(index);
|
||||
index += 2;
|
||||
metrics.blockTransferRate.mark();
|
||||
metrics.blockTransferMessageRate.mark();
|
||||
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
|
||||
return block;
|
||||
}
|
||||
|
@ -458,12 +482,17 @@ public class ExternalBlockHandler extends RpcHandler
|
|||
reduceIdx = 0;
|
||||
mapIdx += 1;
|
||||
}
|
||||
metrics.blockTransferRate.mark();
|
||||
} else {
|
||||
assert(reduceIds[mapIdx].length == 2);
|
||||
int startReduceId = reduceIds[mapIdx][0];
|
||||
int endReduceId = reduceIds[mapIdx][1];
|
||||
block = blockManager.getContinuousBlocksData(appId, execId, shuffleId, mapIds[mapIdx],
|
||||
reduceIds[mapIdx][0], reduceIds[mapIdx][1]);
|
||||
startReduceId, endReduceId);
|
||||
mapIdx += 1;
|
||||
metrics.blockTransferRate.mark(endReduceId - startReduceId);
|
||||
}
|
||||
metrics.blockTransferMessageRate.mark();
|
||||
metrics.blockTransferRateBytes.mark(block != null ? block.size() : 0);
|
||||
return block;
|
||||
}
|
||||
|
|
|
@ -20,8 +20,10 @@ package org.apache.spark.network.shuffle;
|
|||
import java.io.IOException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
|
||||
import com.codahale.metrics.Meter;
|
||||
import com.codahale.metrics.Metric;
|
||||
import com.codahale.metrics.Timer;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
@ -103,7 +105,7 @@ public class ExternalBlockHandlerSuite {
|
|||
|
||||
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
|
||||
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
|
||||
verifyOpenBlockLatencyMetrics();
|
||||
verifyOpenBlockLatencyMetrics(2, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -117,7 +119,7 @@ public class ExternalBlockHandlerSuite {
|
|||
|
||||
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 0);
|
||||
verify(blockResolver, times(1)).getBlockData("app0", "exec1", 0, 0, 1);
|
||||
verifyOpenBlockLatencyMetrics();
|
||||
verifyOpenBlockLatencyMetrics(2, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -126,14 +128,14 @@ public class ExternalBlockHandlerSuite {
|
|||
new NioManagedBuffer(ByteBuffer.wrap(new byte[10]))
|
||||
};
|
||||
when(blockResolver.getContinuousBlocksData(
|
||||
"app0", "exec1", 0, 0, 0, 1)).thenReturn(batchBlockMarkers[0]);
|
||||
"app0", "exec1", 0, 0, 0, 3)).thenReturn(batchBlockMarkers[0]);
|
||||
|
||||
FetchShuffleBlocks fetchShuffleBlocks = new FetchShuffleBlocks(
|
||||
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 1 }}, true);
|
||||
"app0", "exec1", 0, new long[] { 0 }, new int[][] {{ 0, 3 }}, true);
|
||||
checkOpenBlocksReceive(fetchShuffleBlocks, batchBlockMarkers);
|
||||
|
||||
verify(blockResolver, times(1)).getContinuousBlocksData("app0", "exec1", 0, 0, 0, 1);
|
||||
verifyOpenBlockLatencyMetrics();
|
||||
verify(blockResolver, times(1)).getContinuousBlocksData("app0", "exec1", 0, 0, 0, 3);
|
||||
verifyOpenBlockLatencyMetrics(3, 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -147,7 +149,7 @@ public class ExternalBlockHandlerSuite {
|
|||
|
||||
verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 0);
|
||||
verify(blockResolver, times(1)).getRddBlockData("app0", "exec1", 0, 1);
|
||||
verifyOpenBlockLatencyMetrics();
|
||||
verifyOpenBlockLatencyMetrics(2, 2);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -195,17 +197,20 @@ public class ExternalBlockHandlerSuite {
|
|||
assertFalse(buffers.hasNext());
|
||||
}
|
||||
|
||||
private void verifyOpenBlockLatencyMetrics() {
|
||||
Timer openBlockRequestLatencyMillis = (Timer) ((ExternalBlockHandler) handler)
|
||||
private void verifyOpenBlockLatencyMetrics(
|
||||
int blockTransferCount,
|
||||
int blockTransferMessageCount) {
|
||||
Map<String, Metric> metricMap = ((ExternalBlockHandler) handler)
|
||||
.getAllMetrics()
|
||||
.getMetrics()
|
||||
.get("openBlockRequestLatencyMillis");
|
||||
.getMetrics();
|
||||
Timer openBlockRequestLatencyMillis = (Timer) metricMap.get("openBlockRequestLatencyMillis");
|
||||
assertEquals(1, openBlockRequestLatencyMillis.getCount());
|
||||
// Verify block transfer metrics
|
||||
Meter blockTransferRateBytes = (Meter) ((ExternalBlockHandler) handler)
|
||||
.getAllMetrics()
|
||||
.getMetrics()
|
||||
.get("blockTransferRateBytes");
|
||||
Meter blockTransferRate = (Meter) metricMap.get("blockTransferRate");
|
||||
assertEquals(blockTransferCount, blockTransferRate.getCount());
|
||||
Meter blockTransferMessageRate = (Meter) metricMap.get("blockTransferMessageRate");
|
||||
assertEquals(blockTransferMessageCount, blockTransferMessageRate.getCount());
|
||||
Meter blockTransferRateBytes = (Meter) metricMap.get("blockTransferRateBytes");
|
||||
assertEquals(10, blockTransferRateBytes.getCount());
|
||||
}
|
||||
|
||||
|
|
|
@ -64,7 +64,11 @@ class YarnShuffleServiceMetrics implements MetricsSource {
|
|||
MetricsRecordBuilder metricsRecordBuilder, String name, Metric metric) {
|
||||
|
||||
if (metric instanceof Timer) {
|
||||
// Timer records both the operations count and delay
|
||||
// Snapshot inside the Timer provides the information for the operation delay
|
||||
Timer t = (Timer) metric;
|
||||
Snapshot snapshot = t.getSnapshot();
|
||||
String timingName = name + "_nanos";
|
||||
metricsRecordBuilder
|
||||
.addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
|
||||
t.getCount())
|
||||
|
@ -78,7 +82,32 @@ class YarnShuffleServiceMetrics implements MetricsSource {
|
|||
new ShuffleServiceMetricsInfo(name + "_rate1", "1 minute rate of timer " + name),
|
||||
t.getOneMinuteRate())
|
||||
.addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
|
||||
t.getMeanRate());
|
||||
t.getMeanRate())
|
||||
.addGauge(
|
||||
getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax())
|
||||
.addGauge(
|
||||
getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin())
|
||||
.addGauge(
|
||||
getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean())
|
||||
.addGauge(
|
||||
getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev());
|
||||
for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) {
|
||||
String percentileStr;
|
||||
switch (percentileThousands) {
|
||||
case 10:
|
||||
percentileStr = "1stPercentile";
|
||||
break;
|
||||
case 999:
|
||||
percentileStr = "999thPercentile";
|
||||
break;
|
||||
default:
|
||||
percentileStr = String.format("%dthPercentile", percentileThousands / 10);
|
||||
break;
|
||||
}
|
||||
metricsRecordBuilder.addGauge(
|
||||
getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr),
|
||||
snapshot.getValue(percentileThousands / 1000.0));
|
||||
}
|
||||
} else if (metric instanceof Meter) {
|
||||
Meter m = (Meter) metric;
|
||||
metricsRecordBuilder
|
||||
|
@ -128,6 +157,13 @@ class YarnShuffleServiceMetrics implements MetricsSource {
|
|||
return new ShuffleServiceMetricsInfo(name, "Value of counter " + name);
|
||||
}
|
||||
|
||||
private static ShuffleServiceMetricsInfo getShuffleServiceMetricsInfoForGenericValue(
|
||||
String baseName, String valueName) {
|
||||
return new ShuffleServiceMetricsInfo(
|
||||
baseName + "_" + valueName,
|
||||
valueName + " value of " + baseName);
|
||||
}
|
||||
|
||||
private static class ShuffleServiceMetricsInfo implements MetricsInfo {
|
||||
|
||||
private final String name;
|
||||
|
|
|
@ -51,9 +51,13 @@ class ExternalShuffleServiceMetricsSuite extends SparkFunSuite {
|
|||
val sourceRef = classOf[ExternalShuffleService].getDeclaredField("shuffleServiceSource")
|
||||
sourceRef.setAccessible(true)
|
||||
val source = sourceRef.get(externalShuffleService).asInstanceOf[ExternalShuffleServiceSource]
|
||||
assert(source.metricRegistry.getMetrics.keySet().asScala ==
|
||||
Set(
|
||||
// Use sorted Seq instead of Set for easier comparison when there is a mismatch
|
||||
assert(source.metricRegistry.getMetrics.keySet().asScala.toSeq.sorted ==
|
||||
Seq(
|
||||
"blockTransferRate",
|
||||
"blockTransferMessageRate",
|
||||
"blockTransferRateBytes",
|
||||
"blockTransferAvgSize_1min",
|
||||
"numActiveConnections",
|
||||
"numCaughtExceptions",
|
||||
"numRegisteredConnections",
|
||||
|
@ -63,7 +67,7 @@ class ExternalShuffleServiceMetricsSuite extends SparkFunSuite {
|
|||
"shuffle-server.usedDirectMemory",
|
||||
"shuffle-server.usedHeapMemory",
|
||||
"finalizeShuffleMergeLatencyMillis",
|
||||
"fetchMergedBlocksMetaLatencyMillis")
|
||||
"fetchMergedBlocksMetaLatencyMillis").sorted
|
||||
)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1377,7 +1377,11 @@ Note: applies when running in Spark standalone as worker
|
|||
### Component instance = shuffleService
|
||||
Note: applies to the shuffle service
|
||||
|
||||
- blockTransferRate (meter) - rate of blocks being transferred
|
||||
- blockTransferMessageRate (meter) - rate of block transfer messages,
|
||||
i.e. if batch fetches are enabled, this represents number of batches rather than number of blocks
|
||||
- blockTransferRateBytes (meter)
|
||||
- blockTransferAvgTime_1min (gauge - 1-minute moving average)
|
||||
- numActiveConnections.count
|
||||
- numRegisteredConnections.count
|
||||
- numCaughtExceptions.count
|
||||
|
|
|
@ -17,10 +17,11 @@
|
|||
package org.apache.spark.network.yarn
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import scala.collection.mutable
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder
|
||||
import org.apache.hadoop.metrics2.{MetricsInfo, MetricsRecordBuilder}
|
||||
import org.mockito.ArgumentMatchers.{any, anyDouble, anyInt, anyLong}
|
||||
import org.mockito.Mockito.{mock, times, verify, when}
|
||||
import org.mockito.Mockito.{mock, verify, when}
|
||||
import org.scalatest.matchers.must.Matchers
|
||||
import org.scalatest.matchers.should.Matchers._
|
||||
|
||||
|
@ -37,29 +38,60 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
|
|||
val metrics = new ExternalBlockHandler(streamManager, blockResolver).getAllMetrics
|
||||
|
||||
test("metrics named as expected") {
|
||||
val allMetrics = Set(
|
||||
val allMetrics = Seq(
|
||||
"openBlockRequestLatencyMillis", "registerExecutorRequestLatencyMillis",
|
||||
"blockTransferRate", "blockTransferMessageRate", "blockTransferAvgSize_1min",
|
||||
"blockTransferRateBytes", "registeredExecutorsSize", "numActiveConnections",
|
||||
"numCaughtExceptions", "finalizeShuffleMergeLatencyMillis",
|
||||
"fetchMergedBlocksMetaLatencyMillis")
|
||||
|
||||
metrics.getMetrics.keySet().asScala should be (allMetrics)
|
||||
// Use sorted Seq instead of Set for easier comparison when there is a mismatch
|
||||
metrics.getMetrics.keySet().asScala.toSeq.sorted should be (allMetrics.sorted)
|
||||
}
|
||||
|
||||
// these three metrics have the same effect on the collector
|
||||
// these metrics will generate more metrics on the collector
|
||||
for (testname <- Seq("openBlockRequestLatencyMillis",
|
||||
"registerExecutorRequestLatencyMillis",
|
||||
"blockTransferRateBytes")) {
|
||||
"blockTransferRateBytes", "blockTransferRate", "blockTransferMessageRate")) {
|
||||
test(s"$testname - collector receives correct types") {
|
||||
val builder = mock(classOf[MetricsRecordBuilder])
|
||||
when(builder.addCounter(any(), anyLong())).thenReturn(builder)
|
||||
when(builder.addGauge(any(), anyDouble())).thenReturn(builder)
|
||||
val counterNames = mutable.Buffer[String]()
|
||||
when(builder.addCounter(any(), anyLong())).thenAnswer(iom => {
|
||||
counterNames += iom.getArgument[MetricsInfo](0).name()
|
||||
builder
|
||||
})
|
||||
val gaugeLongNames = mutable.Buffer[String]()
|
||||
when(builder.addGauge(any(), anyLong())).thenAnswer(iom => {
|
||||
gaugeLongNames += iom.getArgument[MetricsInfo](0).name()
|
||||
builder
|
||||
})
|
||||
val gaugeDoubleNames = mutable.Buffer[String]()
|
||||
when(builder.addGauge(any(), anyDouble())).thenAnswer(iom => {
|
||||
gaugeDoubleNames += iom.getArgument[MetricsInfo](0).name()
|
||||
builder
|
||||
})
|
||||
|
||||
YarnShuffleServiceMetrics.collectMetric(builder, testname,
|
||||
metrics.getMetrics.get(testname))
|
||||
|
||||
verify(builder).addCounter(any(), anyLong())
|
||||
verify(builder, times(4)).addGauge(any(), anyDouble())
|
||||
assert(counterNames === Seq(s"${testname}_count"))
|
||||
val (expectLong, expectDouble) =
|
||||
if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) {
|
||||
// blockTransfer(Message)?Rate(Bytes)? metrics are Meter so just have rate information
|
||||
(Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix"))
|
||||
} else {
|
||||
// other metrics are Timer so have rate and timing information
|
||||
(
|
||||
Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"),
|
||||
Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev",
|
||||
"nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile",
|
||||
"nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile",
|
||||
"nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile")
|
||||
.map(suffix => s"${testname}_$suffix")
|
||||
)
|
||||
}
|
||||
assert(gaugeLongNames.sorted === expectLong.sorted)
|
||||
assert(gaugeDoubleNames.sorted === expectDouble.sorted)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -402,8 +402,12 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
|||
metricSetRef.setAccessible(true)
|
||||
val metrics = metricSetRef.get(metricsSource).asInstanceOf[MetricSet].getMetrics
|
||||
|
||||
assert(metrics.keySet().asScala == Set(
|
||||
// Use sorted Seq instead of Set for easier comparison when there is a mismatch
|
||||
assert(metrics.keySet().asScala.toSeq.sorted == Seq(
|
||||
"blockTransferRate",
|
||||
"blockTransferMessageRate",
|
||||
"blockTransferRateBytes",
|
||||
"blockTransferAvgSize_1min",
|
||||
"numActiveConnections",
|
||||
"numCaughtExceptions",
|
||||
"numRegisteredConnections",
|
||||
|
@ -414,7 +418,7 @@ class YarnShuffleServiceSuite extends SparkFunSuite with Matchers with BeforeAnd
|
|||
"shuffle-server.usedDirectMemory",
|
||||
"shuffle-server.usedHeapMemory",
|
||||
"fetchMergedBlocksMetaLatencyMillis"
|
||||
))
|
||||
).sorted)
|
||||
}
|
||||
|
||||
test("SPARK-34828: metrics should be registered with configured name") {
|
||||
|
|
Loading…
Reference in a new issue