diff --git a/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java new file mode 100644 index 0000000000..86fc4f1567 --- /dev/null +++ b/common/network-common/src/main/java/org/apache/spark/network/util/TimerWithCustomTimeUnit.java @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.io.OutputStream; +import java.io.PrintWriter; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.ExponentiallyDecayingReservoir; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; + +/** + * A custom version of a {@link Timer} which allows for specifying a specific {@link TimeUnit} to + * be used when accessing timing values via {@link #getSnapshot()}. Normally, though the + * {@link #update(long, TimeUnit)} method requires a unit, the extraction methods on the snapshot + * do not specify a unit, and always return nanoseconds. It can be useful to specify that a timer + * should use a different unit for its snapshot. Note that internally, all values are still stored + * with nanosecond-precision; it is only before being returned to the caller that the nanosecond + * value is converted to the custom time unit. + */ +public class TimerWithCustomTimeUnit extends Timer { + + private final TimeUnit timeUnit; + private final double nanosPerUnit; + + public TimerWithCustomTimeUnit(TimeUnit timeUnit) { + this(timeUnit, Clock.defaultClock()); + } + + TimerWithCustomTimeUnit(TimeUnit timeUnit, Clock clock) { + super(new ExponentiallyDecayingReservoir(), clock); + this.timeUnit = timeUnit; + this.nanosPerUnit = timeUnit.toNanos(1); + } + + @Override + public Snapshot getSnapshot() { + return new SnapshotWithCustomTimeUnit(super.getSnapshot()); + } + + private double toUnit(double nanos) { + // TimeUnit.convert() truncates (loses precision), so floating-point division is used instead + return nanos / nanosPerUnit; + } + + private long toUnit(long nanos) { + return timeUnit.convert(nanos, TimeUnit.NANOSECONDS); + } + + private class SnapshotWithCustomTimeUnit extends Snapshot { + + private final Snapshot wrappedSnapshot; + + SnapshotWithCustomTimeUnit(Snapshot wrappedSnapshot) { + this.wrappedSnapshot = wrappedSnapshot; + } + + @Override + public double getValue(double v) { + return toUnit(wrappedSnapshot.getValue(v)); + } + + @Override + public long[] getValues() { + long[] nanoValues = wrappedSnapshot.getValues(); + long[] customUnitValues = new long[nanoValues.length]; + for (int i = 0; i < nanoValues.length; i++) { + customUnitValues[i] = toUnit(nanoValues[i]); + } + return customUnitValues; + } + + @Override + public int size() { + return wrappedSnapshot.size(); + } + + @Override + public long getMax() { + return toUnit(wrappedSnapshot.getMax()); + } + + @Override + public double getMean() { + return toUnit(wrappedSnapshot.getMean()); + } + + @Override + public long getMin() { + return toUnit(wrappedSnapshot.getMin()); + } + + @Override + public double getStdDev() { + return toUnit(wrappedSnapshot.getStdDev()); + } + + @Override + public void dump(OutputStream outputStream) { + try (PrintWriter writer = new PrintWriter(outputStream)) { + for (long value : getValues()) { + writer.println(value); + } + } + } + } +} diff --git a/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java new file mode 100644 index 0000000000..1da0912a7c --- /dev/null +++ b/common/network-common/src/test/java/org/apache/spark/network/util/TimerWithCustomUnitSuite.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.network.util; + +import java.time.Duration; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import com.codahale.metrics.Clock; +import com.codahale.metrics.Snapshot; +import com.codahale.metrics.Timer; +import org.junit.Test; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** Tests for {@link TimerWithCustomTimeUnit} */ +public class TimerWithCustomUnitSuite { + + private static final double EPSILON = 1.0 / 1_000_000_000; + + @Test + public void testTimerWithMillisecondTimeUnit() { + testTimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); + } + + @Test + public void testTimerWithNanosecondTimeUnit() { + testTimerWithCustomTimeUnit(TimeUnit.NANOSECONDS); + } + + private void testTimerWithCustomTimeUnit(TimeUnit timeUnit) { + Timer timer = new TimerWithCustomTimeUnit(timeUnit); + Duration[] durations = { + Duration.ofNanos(1), + Duration.ofMillis(1), + Duration.ofMillis(5), + Duration.ofMillis(100), + Duration.ofSeconds(10) + }; + Arrays.stream(durations).forEach(timer::update); + + Snapshot snapshot = timer.getSnapshot(); + assertEquals(toTimeUnit(durations[0], timeUnit), snapshot.getMin()); + assertEquals(toTimeUnitFloating(durations[0], timeUnit), snapshot.getValue(0), EPSILON); + assertEquals(toTimeUnitFloating(durations[2], timeUnit), snapshot.getMedian(), EPSILON); + assertEquals(toTimeUnitFloating(durations[3], timeUnit), snapshot.get75thPercentile(), EPSILON); + assertEquals(toTimeUnit(durations[4], timeUnit), snapshot.getMax()); + + assertArrayEquals(Arrays.stream(durations).mapToLong(d -> toTimeUnit(d, timeUnit)).toArray(), + snapshot.getValues()); + double total = Arrays.stream(durations).mapToDouble(d -> toTimeUnitFloating(d, timeUnit)).sum(); + assertEquals(total / durations.length, snapshot.getMean(), EPSILON); + } + + @Test + public void testTimingViaContext() { + ManualClock clock = new ManualClock(); + Timer timer = new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS, clock); + Duration[] durations = { Duration.ofNanos(1), Duration.ofMillis(100), Duration.ofMillis(1000) }; + for (Duration d : durations) { + Timer.Context context = timer.time(); + clock.advance(toTimeUnit(d, TimeUnit.NANOSECONDS)); + context.stop(); + } + + Snapshot snapshot = timer.getSnapshot(); + assertEquals(0, snapshot.getMin()); + assertEquals(100, snapshot.getMedian(), EPSILON); + assertEquals(1000, snapshot.getMax(), EPSILON); + } + + private static long toTimeUnit(Duration duration, TimeUnit timeUnit) { + return timeUnit.convert(duration.toNanos(), TimeUnit.NANOSECONDS); + } + + private static double toTimeUnitFloating(Duration duration, TimeUnit timeUnit) { + return ((double) duration.toNanos()) / timeUnit.toNanos(1); + } + + private static class ManualClock extends Clock { + + private long currTick = 1; + + void advance(long nanos) { + currTick += nanos; + } + + @Override + public long getTick() { + return currTick; + } + } +} diff --git a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java index 922bb96c59..e0f2e95950 100644 --- a/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java +++ b/common/network-shuffle/src/main/java/org/apache/spark/network/shuffle/ExternalBlockHandler.java @@ -25,6 +25,7 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Function; import com.codahale.metrics.Gauge; @@ -49,6 +50,7 @@ import org.apache.spark.network.server.OneForOneStreamManager; import org.apache.spark.network.server.RpcHandler; import org.apache.spark.network.server.StreamManager; import org.apache.spark.network.shuffle.protocol.*; +import org.apache.spark.network.util.TimerWithCustomTimeUnit; import static org.apache.spark.network.util.NettyUtils.getRemoteAddress; import org.apache.spark.network.util.TransportConf; @@ -299,13 +301,17 @@ public class ExternalBlockHandler extends RpcHandler public class ShuffleMetrics implements MetricSet { private final Map allMetrics; // Time latency for open block request in ms - private final Timer openBlockRequestLatencyMillis = new Timer(); + private final Timer openBlockRequestLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Time latency for executor registration latency in ms - private final Timer registerExecutorRequestLatencyMillis = new Timer(); + private final Timer registerExecutorRequestLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Time latency for processing fetch merged blocks meta request latency in ms - private final Timer fetchMergedBlocksMetaLatencyMillis = new Timer(); + private final Timer fetchMergedBlocksMetaLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Time latency for processing finalize shuffle merge request latency in ms - private final Timer finalizeShuffleMergeLatencyMillis = new Timer(); + private final Timer finalizeShuffleMergeLatencyMillis = + new TimerWithCustomTimeUnit(TimeUnit.MILLISECONDS); // Block transfer rate in blocks per second private final Meter blockTransferRate = new Meter(); // Block fetch message rate per second. When using non-batch fetches diff --git a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java index 964d8f9592..d843a6719e 100644 --- a/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java +++ b/common/network-yarn/src/main/java/org/apache/spark/network/yarn/YarnShuffleServiceMetrics.java @@ -68,7 +68,6 @@ class YarnShuffleServiceMetrics implements MetricsSource { // Snapshot inside the Timer provides the information for the operation delay Timer t = (Timer) metric; Snapshot snapshot = t.getSnapshot(); - String timingName = name + "_nanos"; metricsRecordBuilder .addCounter(new ShuffleServiceMetricsInfo(name + "_count", "Count of timer " + name), t.getCount()) @@ -84,13 +83,13 @@ class YarnShuffleServiceMetrics implements MetricsSource { .addGauge(new ShuffleServiceMetricsInfo(name + "_rateMean", "Mean rate of timer " + name), t.getMeanRate()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "max"), snapshot.getMax()) + getShuffleServiceMetricsInfoForGenericValue(name, "max"), snapshot.getMax()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "min"), snapshot.getMin()) + getShuffleServiceMetricsInfoForGenericValue(name, "min"), snapshot.getMin()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "mean"), snapshot.getMean()) + getShuffleServiceMetricsInfoForGenericValue(name, "mean"), snapshot.getMean()) .addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, "stdDev"), snapshot.getStdDev()); + getShuffleServiceMetricsInfoForGenericValue(name, "stdDev"), snapshot.getStdDev()); for (int percentileThousands : new int[] { 10, 50, 250, 500, 750, 950, 980, 990, 999 }) { String percentileStr; switch (percentileThousands) { @@ -105,7 +104,7 @@ class YarnShuffleServiceMetrics implements MetricsSource { break; } metricsRecordBuilder.addGauge( - getShuffleServiceMetricsInfoForGenericValue(timingName, percentileStr), + getShuffleServiceMetricsInfoForGenericValue(name, percentileStr), snapshot.getValue(percentileThousands / 1000.0)); } } else if (metric instanceof Meter) { diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala index eff2de7143..f040594f39 100644 --- a/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala +++ b/resource-managers/yarn/src/test/scala/org/apache/spark/network/yarn/YarnShuffleServiceMetricsSuite.scala @@ -75,23 +75,19 @@ class YarnShuffleServiceMetricsSuite extends SparkFunSuite with Matchers { metrics.getMetrics.get(testname)) assert(counterNames === Seq(s"${testname}_count")) + val rates = Seq("rate1", "rate5", "rate15", "rateMean") + val percentiles = + "1stPercentile" +: Seq(5, 25, 50, 75, 95, 98, 99, 999).map(_ + "thPercentile") val (expectLong, expectDouble) = if (testname.matches("blockTransfer(Message)?Rate(Bytes)?$")) { // blockTransfer(Message)?Rate(Bytes)? metrics are Meter so just have rate information - (Seq(), Seq("1", "5", "15", "Mean").map(suffix => s"${testname}_rate$suffix")) + (Seq(), rates) } else { // other metrics are Timer so have rate and timing information - ( - Seq(s"${testname}_nanos_max", s"${testname}_nanos_min"), - Seq("rate1", "rate5", "rate15", "rateMean", "nanos_mean", "nanos_stdDev", - "nanos_1stPercentile", "nanos_5thPercentile", "nanos_25thPercentile", - "nanos_50thPercentile", "nanos_75thPercentile", "nanos_95thPercentile", - "nanos_98thPercentile", "nanos_99thPercentile", "nanos_999thPercentile") - .map(suffix => s"${testname}_$suffix") - ) + (Seq("max", "min"), rates ++ Seq("mean", "stdDev") ++ percentiles) } - assert(gaugeLongNames.sorted === expectLong.sorted) - assert(gaugeDoubleNames.sorted === expectDouble.sorted) + assert(gaugeLongNames.sorted === expectLong.map(testname + "_" + _).sorted) + assert(gaugeDoubleNames.sorted === expectDouble.map(testname + "_" + _).sorted) } }