[SPARK-35259][SHUFFLE] Update ExternalBlockHandler Timer variables to expose correct units

### What changes were proposed in this pull request?
`ExternalBlockHandler` exposes four metrics which are Dropwizard `Timer` metrics, each named with a `Millis` suffix:
```
    private final Timer openBlockRequestLatencyMillis = new Timer();
    private final Timer registerExecutorRequestLatencyMillis = new Timer();
    private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
    private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
```
However, these Dropwizard `Timer`s report their snapshot values in nanoseconds by default ([documentation](https://metrics.dropwizard.io/3.2.3/getting-started.html#timers)).

This causes `YarnShuffleServiceMetrics` to expose confusingly named metrics like `openBlockRequestLatencyMillis_nanos_max`, where the actual values are in nanoseconds despite the `Millis` in the name.
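To illustrate the underlying Dropwizard behavior, a minimal sketch (the class name is hypothetical, for the example only):
```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;

public class DefaultTimerUnits {
  public static void main(String[] args) {
    Timer timer = new Timer();
    timer.update(5, TimeUnit.MILLISECONDS);
    // Snapshot values come back in nanoseconds regardless of the unit
    // passed to update(), so this prints 5000000, not 5.
    System.out.println(timer.getSnapshot().getMax());
  }
}
```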

This PR adds a new `Timer` subclass, `TimerWithCustomTimeUnit`, which accepts a `TimeUnit` at creation time and exposes timing information using this time unit when values are read. Internally, values are still stored with nanosecond-level precision. The `Timer` metrics within `ExternalBlockHandler` are updated to use the new class with milliseconds as the unit. The logic to include the `nanos` suffix in the metric name within `YarnShuffleServiceMetrics` has also been removed, with the assumption that the metric name itself includes the units.
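With the new class, usage looks like the following sketch (mirroring the code added in this PR; the example class name is illustrative):
```java
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Timer;
import org.apache.spark.network.util.TimerWithCustomTimeUnit;

public class MillisTimerExample {
  public static void main(String[] args) {
    Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
    timer.update(5, TimeUnit.MILLISECONDS);
    // Values are still stored internally at nanosecond precision, but the
    // snapshot converts them to the configured unit, so this prints 5.
    System.out.println(timer.getSnapshot().getMax());
  }
}
```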

### Does this PR introduce _any_ user-facing change?
Yes, there are two changes.
First, the names for metrics exposed by `ExternalBlockHandler` via `YarnShuffleServiceMetrics`, such as `openBlockRequestLatencyMillis_nanos_max` and `openBlockRequestLatencyMillis_nanos_50thPercentile`, have been changed to remove the `_nanos` portion. This would be considered a breaking change, but these names were only exposed as part of #32388, which has not yet been released (slated for 3.2.0). The new names are, for example, `openBlockRequestLatencyMillis_max` and `openBlockRequestLatencyMillis_50thPercentile`.
Second, the values of the metrics themselves have changed to expose milliseconds instead of nanoseconds. Note that this does not affect metrics such as `openBlockRequestLatencyMillis_count` or `openBlockRequestLatencyMillis_rate1`, only the `Snapshot`-related metrics (`max`, `median`, percentiles, etc.). For the YARN case, these metrics were also introduced by #32388 and thus have not yet been released. It was possible for the nanosecond values to be consumed by some other metrics reporter reading the Dropwizard metrics directly, but I'm not aware of any such usages.

### How was this patch tested?
Unit tests have been updated.

Closes #33116 from xkrogen/xkrogen-SPARK-35259-ess-fix-metric-unit-prefix.

Authored-by: Erik Krogen <xkrogen@apache.org>
Signed-off-by: yi.wu <yi.wu@databricks.com>
(cherry picked from commit 70a15868fc)
Signed-off-by: yi.wu <yi.wu@databricks.com>

#### TimerWithCustomTimeUnit.java (new file)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.io.OutputStream;
import java.io.PrintWriter;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Clock;
import com.codahale.metrics.ExponentiallyDecayingReservoir;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;

/**
 * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to
 * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the
 * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot
 * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer
 * should use a different unit for its snapshot. Note that internally, all values are still stored
 * with nanosecond-precision; it is only before being returned to the caller that the nanosecond
 * value is converted to the custom time unit.
 */
public class TimerWithCustomTimeUnit extends Timer {

  private final TimeUnit timeUnit;
  private final double nanosPerUnit;

  public TimerWithCustomTimeUnit(TimeUnit timeUnit) {
    this(timeUnit, Clock.defaultClock());
  }

  TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) {
    super(new ExponentiallyDecayingReservoir(), clock);
    this.timeUnit = timeUnit;
    this.nanosPerUnit = timeUnit.toNanos(1);
  }

  @Override
  public Snapshot getSnapshot() {
    return new SnapshotWithCustomTimeUnit(super.getSnapshot());
  }

  private double toUnit(double nanos) {
    // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead
    return nanos / nanosPerUnit;
  }

  private long toUnit(long nanos) {
    return timeUnit.convert(nanos, TimeUnit.NANOSECONDS);
  }

  private class SnapshotWithCustomTimeUnit extends Snapshot {

    private final Snapshot wrappedSnapshot;

    SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) {
      this.wrappedSnapshot = wrappedSnapshot;
    }

    @Override
    public double getValue(double v) {
      return toUnit(wrappedSnapshot.getValue(v));
    }

    @Override
    public long[] getValues() {
      long[] nanoValues = wrappedSnapshot.getValues();
      long[] customUnitValues = new long[nanoValues.length];
      for (int i = 0; i < nanoValues.length; i++) {
        customUnitValues[i] = toUnit(nanoValues[i]);
      }
      return customUnitValues;
    }

    @Override
    public int size() {
      return wrappedSnapshot.size();
    }

    @Override
    public long getMax() {
      return toUnit(wrappedSnapshot.getMax());
    }

    @Override
    public double getMean() {
      return toUnit(wrappedSnapshot.getMean());
    }

    @Override
    public long getMin() {
      return toUnit(wrappedSnapshot.getMin());
    }

    @Override
    public double getStdDev() {
      return toUnit(wrappedSnapshot.getStdDev());
    }

    @Override
    public void dump(OutputStream outputStream) {
      try (PrintWriter writer = new PrintWriter(outputStream)) {
        for (long value : getValues()) {
          writer.println(value);
        }
      }
    }
  }
}
```
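As the comment in the double-valued `toUnit` above notes, `TimeUnit.convert()` truncates. A small standalone illustration (the class name is hypothetical, for the example only):
```java
import java.util.concurrent.TimeUnit;

public class TruncationDemo {
  public static void main(String[] args) {
    long nanos = 1_500_000; // 1.5 ms
    // Integer conversion truncates toward zero: prints 1
    System.out.println(TimeUnit.MILLISECONDS.convert(nanos, TimeUnit.NANOSECONDS));
    // Floating-point division keeps the fraction: prints 1.5
    System.out.println(nanos / (double) TimeUnit.MILLISECONDS.toNanos(1));
  }
}
```
This is why the double-valued snapshot statistics (mean, stdDev, percentiles) use division while the long-valued ones (min, max, the raw values) use `TimeUnit.convert()`.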

#### TimerWithCustomUnitSuite.java (new file)

```java
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.util;

import java.time.Duration;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;

import com.codahale.metrics.Clock;
import com.codahale.metrics.Snapshot;
import com.codahale.metrics.Timer;
import org.junit.Test;

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;

/** Tests for {@link TimerWithCustomTimeUnit} */
public class TimerWithCustomUnitSuite {

  private static final double EPSILON = 1.0 / 1_000_000_000;

  @Test
  public void testTimerWithMillisecondTimeUnit() {
    testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
  }

  @Test
  public void testTimerWithNanosecondTimeUnit() {
    testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS);
  }

  private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) {
    Timer timer = new TimerWithCustomTimeUnit(timeUnit);
    Duration[] durations = {
      Duration.ofNanos(1),
      Duration.ofMillis(1),
      Duration.ofMillis(5),
      Duration.ofMillis(100),
      Duration.ofSeconds(10)
    };
    Arrays.stream(durations).forEach(timer::update);

    Snapshot snapshot = timer.getSnapshot();
    assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin());
    assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON);
    assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON);
    assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON);
    assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax());
    assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(),
        snapshot.getValues());
    double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum();
    assertEquals(total / durations.length, snapshot.getMean(), EPSILON);
  }

  @Test
  public void testTimingViaContext() {
    ManualClock clock = new ManualClock();
    Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock);
    Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) };
    for (Duration d : durations) {
      Timer.Context context = timer.time();
      clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS));
      context.stop();
    }

    Snapshot snapshot = timer.getSnapshot();
    // The 1-nanosecond duration is shorter than the millisecond unit, so it truncates to 0
    assertEquals(0, snapshot.getMin());
    assertEquals(100, snapshot.getMedian(), EPSILON);
    assertEquals(1000, snapshot.getMax(), EPSILON);
  }

  private static long toTimeUnit(Duration duration, TimeUnit timeUnit) {
    return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS);
  }

  private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) {
    return ((double) duration.toNanos()) / timeUnit.toNanos(1);
  }

  /** A {@link Clock} whose tick is advanced manually, for deterministic timing tests. */
  private static class ManualClock extends Clock {
    private long currTick = 1;

    void advance(long nanos) {
      currTick += nanos;
    }

    @Override
    public long getTick() {
      return currTick;
    }
  }
}
```

#### ExternalBlockHandler.java

```diff
@@ -25,6 +25,7 @@ import java.util.HashMap;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.function.Function;
 
 import com.codahale.metrics.Gauge;
@@ -49,6 +50,7 @@ import org.apache.spark.network.server.OneForOneStreamManager;
 import org.apache.spark.network.server.RpcHandler;
 import org.apache.spark.network.server.StreamManager;
 import org.apache.spark.network.shuffle.protocol.*;
+import org.apache.spark.network.util.TimerWithCustomTimeUnit;
 
 import static org.apache.spark.network.util.NettyUtils.getRemoteAddress;
 import org.apache.spark.network.util.TransportConf;
@@ -299,13 +301,17 @@ public class ExternalBlockHandler extends RpcHandler
   public class ShuffleMetrics implements MetricSet {
     private final Map<String, Metric> allMetrics;
     // Time latency for open block request in ms
-    private final Timer openBlockRequestLatencyMillis = new Timer();
+    private final Timer openBlockRequestLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for executor registration latency in ms
-    private final Timer registerExecutorRequestLatencyMillis = new Timer();
+    private final Timer registerExecutorRequestLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for processing fetch merged blocks meta request latency in ms
-    private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer();
+    private final Timer fetchMergedBlocksMetaLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Time latency for processing finalize shuffle merge request latency in ms
-    private final Timer finalizeShuffleMergeLatencyMillis = new Timer();
+    private final Timer finalizeShuffleMergeLatencyMillis =
+        new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS);
     // Block transfer rate in blocks per second
     private final Meter blockTransferRate = new Meter();
     // Block fetch message rate per second. When using non-batch fetches
```

#### YarnShuffleServiceMetrics.java

```diff
@@ -68,7 +68,6 @@ class YarnShuffleServiceMetrics implements MetricsSource {
       // Snapshot inside the Timer provides the information for the operation delay
       Timer t = (Timer) metric;
       Snapshot snapshot = t.getSnapshot();
-      String timingName = name + "_nanos";
       metricsRecordBuilder
         .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name),
           t.getCount())
@@ -84,13 +83,13 @@ class YarnShuffleServiceMetrics implements MetricsSource {
         .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name),
           t.getMeanRate())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax())
+          getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin())
+          getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean())
+          getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean())
         .addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev());
+          getShuffleServiceMetricsInfoForGenericValue(name, "stdDev"), snapshot.getStdDev());
       for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) {
         String percentileStr;
         switch (percentileThousands) {
@@ -105,7 +104,7 @@ class YarnShuffleServiceMetrics implements MetricsSource {
             break;
         }
         metricsRecordBuilder.addGauge(
-          getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr),
+          getShuffleServiceMetricsInfoForGenericValue(name, percentileStr),
           snapshot.getValue(percentileThousands / 1000.0));
       }
     } else if (metric instanceof Meter) {
```

#### YarnShuffleServiceMetricsSuite.scala

```diff
@@ -75,23 +75,19 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers {
       metrics.getMetrics.get(testname))
     assert(counterNames === Seq(s"${testname}_count"))
 
+    val rates = Seq("rate1", "rate5", "rate15", "rateMean")
+    val percentiles =
+      "1stPercentile" +: Seq(5, 25, 50, 75, 95, 98, 99, 999).map(_ + "thPercentile")
     val (expectLong, expectDouble) =
       if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) {
         // blockTransfer(Message)?Rate(Bytes)? metrics are Meter so just have rate information
-        (Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix"))
+        (Seq(), rates)
       } else {
         // other metrics are Timer so have rate and timing information
-        (
-          Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"),
-          Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev",
-            "nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile",
-            "nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile",
-            "nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile")
-            .map(suffix => s"${testname}_$suffix")
-        )
+        (Seq("max", "min"), rates ++ Seq("mean", "stdDev") ++ percentiles)
       }
 
-    assert(gaugeLongNames.sorted === expectLong.sorted)
-    assert(gaugeDoubleNames.sorted === expectDouble.sorted)
+    assert(gaugeLongNames.sorted === expectLong.map(testname + "_" + _).sorted)
+    assert(gaugeDoubleNames.sorted === expectDouble.map(testname + "_" + _).sorted)
   }
 }
```