From 23e48dbf77ba59a24c7c1eaf5bb2be003c6258d0 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Sat, 24 Jul 2021 21:26:18 +0800 Subject: [PATCH] [SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to expose correct units ### What changes were proposed in this pull request? `ExternalBlockHandler` exposes 4 metrics which are Dropwizard `Timer` metrics, and are named with a `millis` suffix: ``` private final Timer openBlockRequestLatencyMillis = new Timer(); private final Timer registerExecutorRequestLatencyMillis = new Timer(); private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer(); private final Timer finalizeShuffleMergeLatencyMillis = new Timer(); ``` However these Dropwizard Timers by default use nanoseconds ([documentation](https://metrics.dropwizard.io/3.2.3/getting-started.html#timers)). This causes `YarnShuffleServiceMetrics` to expose confusingly-named metrics like `openBlockRequestLatencyMillis_nanos_max` (the actual values are currently in nanos). This PR adds a new `Timer` subclass, `TimerWithCustomTimeUnit`, which accepts a `TimeUnit` at creation time and exposes timing information using this time unit when values are read. Internally, values are still stored with nanosecond-level precision. The `Timer` metrics within `ExternalBlockHandler` are updated to use the new class with milliseconds as the unit. The logic to include the `nanos` suffix in the metric name within `YarnShuffleServiceMetrics` has also been removed, with the assumption that the metric name itself includes the units. ### Does this PR introduce _any_ user-facing change? Yes, there are two changes. First, the names for metrics exposed by `ExternalBlockHandler` via `YarnShuffleServiceMetrics` such as `openBlockRequestLatencyMillis_nanos_max` and `openBlockRequestLatencyMillis_nanos_50thPercentile` have been changed to remove the `_nanos` suffix. 
This would be considered a breaking change, but these names were only exposed as part of #32388, which has not yet been released (slated for 3.2.0). The new names are like `openBlockRequestLatencyMillis_max` and `openBlockRequestLatencyMillis_50thPercentile`. Second, the values of the metrics themselves have changed, to expose milliseconds instead of nanoseconds. Note that this does not affect metrics such as `openBlockRequestLatencyMillis_count` or `openBlockRequestLatencyMillis_rate1`, only the `Snapshot`-related metrics (`max`, `median`, percentiles, etc.). For the YARN case, these metrics were also introduced by #32388, and thus also have not yet been released. It was possible for the nanosecond values to be consumed by some other metrics reporter reading the Dropwizard metrics directly, but I'm not aware of any such usages. ### How was this patch tested? Unit tests have been updated. Closes #33116 from xkrogen/xkrogen-SPARK-35259-ess-fix-metric-unit-prefix. Authored-by: Erik Krogen Signed-off-by: yi.wu (cherry picked from commit 70a15868fc97e2b86c5ecc7bcf812bfdb05d98ea) Signed-off-by: yi.wu --- .../network/util/TimerWithCustomTimeUnit.java | 124 ++++++++++++++++++ .../util/TimerWithCustomUnitSuite.java | 109 +++++++++++++++ .../network/shuffle/ExternalBlockHandler.java | 14 +- .../yarn/YarnShuffleServiceMetrics.java | 11 +- .../yarn/YarnShuffleServiceMetricsSuite.scala | 18 +-- 5 files changed, 255 insertions(+), 21 deletions(-) create mode 100644 common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java create mode 100644 common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java new file mode 100644 index 0000000000..86fc4f1567 --- /dev/null +++ 
b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; + +/** + * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to + * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the + * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot + * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer + * should use a different unit for its snapshot. Note that internally, all values are still stored + * with nanosecond-precision; it is only before being returned to the caller that the nanosecond + * value is converted to the custom time unit. 
+ */ +public class TimerWithCustomTimeUnit extends Timer { + + private final TimeUnit timeUnit; + private final double nanosPerUnit; + + public TimerWithCustomTimeUnit(TimeUnit timeUnit) { + this(timeUnit, Clock.defaultClock()); + } + + TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) { + super(new ExponentiallyDecayingReservoir(), clock); + this.timeUnit = timeUnit; + this.nanosPerUnit = timeUnit.toNanos(1); + } + + @Override + public Snapshot getSnapshot() { + return new SnapshotWithCustomTimeUnit(super.getSnapshot()); + } + + private double toUnit(double nanos) { + // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead + return nanos / nanosPerUnit; + } + + private long toUnit(long nanos) { + return timeUnit.convert(nanos, TimeUnit.NANOSECONDS); + } + + private class SnapshotWithCustomTimeUnit extends Snapshot { + + private final Snapshot wrappedSnapshot; + + SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) { + this.wrappedSnapshot = wrappedSnapshot; + } + + @Override + public double getValue(double v) { + return toUnit(wrappedSnapshot.getValue(v)); + } + + @Override + public long[] getValues() { + long[] nanoValues = wrappedSnapshot.getValues(); + long[] customUnitValues = new long[nanoValues.length]; + for (int i = 0; i < nanoValues.length; i++) { + customUnitValues[i] = toUnit(nanoValues[i]); + } + return customUnitValues; + } + + @Override + public int size() { + return wrappedSnapshot.size(); + } + + @Override + public long getMax() { + return toUnit(wrappedSnapshot.getMax()); + } + + @Override + public double getMean() { + return toUnit(wrappedSnapshot.getMean()); + } + + @Override + public long getMin() { + return toUnit(wrappedSnapshot.getMin()); + } + + @Override + public double getStdDev() { + return toUnit(wrappedSnapshot.getStdDev()); + } + + @Override + public void dump(OutputStream outputStream) { + try (PrintWriter writer = new PrintWriter(outputStream)) { + for (long value : getValues()) { + 
writer.println(value); + } + } + } + } +} diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java new file mode 100644 index 0000000000..1da0912a7c --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.network.util; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** Tests for {@link TimerWithCustomTimeUnit} */ +public class TimerWithCustomUnitSuite { + + private static final double EPSILON = 1.0 / 1_000_000_000; + + @Test + public void testTimerWithMillisecondTimeUnit() { + testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); + } + + @Test + public void testTimerWithNanosecondTimeUnit() { + testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS); + } + + private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) { + Timer timer = new TimerWithCustomTimeUnit(timeUnit); + Duration[] durations = { + Duration.ofNanos(1), + Duration.ofMillis(1), + Duration.ofMillis(5), + Duration.ofMillis(100), + Duration.ofSeconds(10) + }; + Arrays.stream(durations).forEach(timer::update); + + Snapshot snapshot = timer.getSnapshot(); + assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin()); + assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON); + assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON); + assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON); + assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax()); + + assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(), + snapshot.getValues()); + double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum(); + assertEquals(total / durations.length, snapshot.getMean(), EPSILON); + } + + @Test + public void testTimingViaContext() { + ManualClock clock = new ManualClock(); + Timer timer = new 
TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock); + Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) }; + for (Duration d : durations) { + Timer.Context context = timer.time(); + clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS)); + context.stop(); + } + + Snapshot snapshot = timer.getSnapshot(); + assertEquals(0, snapshot.getMin()); + assertEquals(100, snapshot.getMedian(), EPSILON); + assertEquals(1000, snapshot.getMax(), EPSILON); + } + + private static long toTimeUnit(Duration duration, TimeUnit timeUnit) { + return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS); + } + + private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) { + return ((double) duration.toNanos()) / timeUnit.toNanos(1); + } + + private static class ManualClock extends Clock { + + private long currTick = 1; + + void advance(long nanos) { + currTick += nanos; + } + + @Override + public long getTick() { + return currTick; + } + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 922bb96c59..e0f2e95950 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.codahale.metrics.Gauge; @@ -49,6 +50,7 @@ import org.apache.spark.network.server.OneForOneStreamManager; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.shuffle.protocol.*; +import org.apache.spark.network.util.TimerWithCustomTimeUnit; import static 
org.apache.spark.network.util.NettyUtils.getRemoteAddress; import org.apache.spark.network.util.TransportConf; @@ -299,13 +301,17 @@ public class ExternalBlockHandler extends RpcHandler public class ShuffleMetrics implements MetricSet { private final Map allMetrics; // Time latency for open block request in ms - private final Timer openBlockRequestLatencyMillis = new Timer(); + private final Timer openBlockRequestLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Time latency for executor registration latency in ms - private final Timer registerExecutorRequestLatencyMillis = new Timer(); + private final Timer registerExecutorRequestLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Time latency for processing fetch merged blocks meta request latency in ms - private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer(); + private final Timer fetchMergedBlocksMetaLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Time latency for processing finalize shuffle merge request latency in ms - private final Timer finalizeShuffleMergeLatencyMillis = new Timer(); + private final Timer finalizeShuffleMergeLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Block transfer rate in blocks per second private final Meter blockTransferRate = new Meter(); // Block fetch message rate per second. 
When using non-batch fetches diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 964d8f9592..d843a6719e 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -68,7 +68,6 @@ class YarnShuffleServiceMetrics implements MetricsSource { // Snapshot inside the Timer provides the information for the operation delay Timer t = (Timer) metric; Snapshot snapshot = t.getSnapshot(); - String timingName = name + "_nanos"; metricsRecordBuilder .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), t.getCount()) @@ -84,13 +83,13 @@ class YarnShuffleServiceMetrics implements MetricsSource { .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), t.getMeanRate()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax()) + getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin()) + getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean()) + getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev()); + getShuffleServiceMetricsInfoForGenericValue(name, "stdDev"), snapshot.getStdDev()); for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) { String percentileStr; switch (percentileThousands) { @@ -105,7 +104,7 @@ class YarnShuffleServiceMetrics implements MetricsSource { break; } 
metricsRecordBuilder.addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr), + getShuffleServiceMetricsInfoForGenericValue(name, percentileStr), snapshot.getValue(percentileThousands / 1000.0)); } } else if (metric instanceof Meter) { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index eff2de7143..f040594f39 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -75,23 +75,19 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { metrics.getMetrics.get(testname)) assert(counterNames === Seq(s"${testname}_count")) + val rates = Seq("rate1", "rate5", "rate15", "rateMean") + val percentiles = + "1stPercentile" +: Seq(5, 25, 50, 75, 95, 98, 99, 999).map(_ + "thPercentile") val (expectLong, expectDouble) = if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) { // blockTransfer(Message)?Rate(Bytes)? 
metrics are Meter so just have rate information - (Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix")) + (Seq(), rates) } else { // other metrics are Timer so have rate and timing information - ( - Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"), - Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev", - "nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile", - "nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile", - "nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile") - .map(suffix => s"${testname}_$suffix") - ) + (Seq("max", "min"), rates ++ Seq("mean", "stdDev") ++ percentiles) } - assert(gaugeLongNames.sorted === expectLong.sorted) - assert(gaugeDoubleNames.sorted === expectDouble.sorted) + assert(gaugeLongNames.sorted === expectLong.map(testname + "_" + _).sorted) + assert(gaugeDoubleNames.sorted === expectDouble.map(testname + "_" + _).sorted) } }