diff --git a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
index 5114cf70e3..7415302fc5 100644
--- a/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
+++ b/core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala
@@ -288,7 +288,7 @@ private[spark] class ExecutorAllocationManager(
     }

     // Update executor target number only after initializing flag is unset
-    updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    updateAndSyncNumExecutorsTarget(clock.nanoTime())
     if (executorIdsToBeRemoved.nonEmpty) {
       removeExecutors(executorIdsToBeRemoved)
     }
@@ -336,7 +336,7 @@ private[spark] class ExecutorAllocationManager(
       val delta = addExecutors(maxNeeded)
       logDebug(s"Starting timer to add more executors (to " +
         s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
-      addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
+      addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
       delta
     } else {
       0
@@ -481,7 +481,7 @@ private[spark] class ExecutorAllocationManager(
     if (addTime == NOT_SET) {
       logDebug(s"Starting timer to add executors because pending tasks " +
         s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
-      addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
+      addTime = clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeoutS)
     }
   }

diff --git a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
index aa901d6568..3dfd1eac8c 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala
@@ -39,11 +39,12 @@ private[spark] class ExecutorMonitor(
     listenerBus: LiveListenerBus,
     clock: Clock) extends SparkListener with CleanerListener with Logging {

-  private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(
+  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
     conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
-  private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(
+  private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(
     conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
-  private val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT)
+  private val shuffleTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
+    conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT))

   private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
     conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
@@ -100,7 +101,7 @@ private[spark] class ExecutorMonitor(
    * Should only be called from the EAM thread.
    */
   def timedOutExecutors(): Seq[String] = {
-    val now = clock.getTimeMillis()
+    val now = clock.nanoTime()
     if (now >= nextTimeout.get()) {
       // Temporarily set the next timeout at Long.MaxValue. This ensures that after
       // scanning all executors below, we know when the next timeout for non-timed out
@@ -437,7 +438,7 @@ private[spark] class ExecutorMonitor(

     def updateRunningTasks(delta: Int): Unit = {
       runningTasks = math.max(0, runningTasks + delta)
-      idleStart = if (runningTasks == 0) clock.getTimeMillis() else -1L
+      idleStart = if (runningTasks == 0) clock.nanoTime() else -1L
       updateTimeout()
     }

@@ -445,15 +446,15 @@ private[spark] class ExecutorMonitor(
       val oldDeadline = timeoutAt
       val newDeadline = if (idleStart >= 0) {
         val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
-          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue
+          val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue
           val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
-            shuffleTimeoutMs
+            shuffleTimeoutNs
           } else {
             Long.MaxValue
           }
           math.min(_cacheTimeout, _shuffleTimeout)
         } else {
-          idleTimeoutMs
+          idleTimeoutNs
         }
         val deadline = idleStart + timeout
         if (deadline >= 0) deadline else Long.MaxValue
diff --git a/core/src/main/scala/org/apache/spark/util/Clock.scala b/core/src/main/scala/org/apache/spark/util/Clock.scala
index e92ed11bd1..d2674d4f47 100644
--- a/core/src/main/scala/org/apache/spark/util/Clock.scala
+++ b/core/src/main/scala/org/apache/spark/util/Clock.scala
@@ -21,7 +21,37 @@ package org.apache.spark.util
  * An interface to represent clocks, so that they can be mocked out in unit tests.
  */
 private[spark] trait Clock {
+  /** @return Current system time, in ms. */
   def getTimeMillis(): Long
+
+  // scalastyle:off line.size.limit
+  /**
+   * Current value of high resolution time source, in ns.
+   *
+   * This method abstracts the call to the JRE's `System.nanoTime()` call. As with that method, the
+   * value here is not guaranteed to be monotonically increasing, but rather a higher resolution
+   * time source for use in the calculation of time intervals. The characteristics of the values
+   * returned may vary from JVM to JVM (or even the same JVM running on different OSes or CPUs), but
+   * in general it should be preferred over [[getTimeMillis()]] when calculating time differences.
+   *
+   * Specifically for Linux on x64 architecture, the following links provide useful information
+   * about the characteristics of the value returned:
+   *
+   * http://btorpey.github.io/blog/2014/02/18/clock-sources-in-linux/
+   * https://stackoverflow.com/questions/10921210/cpu-tsc-fetch-operation-especially-in-multicore-multi-processor-environment
+   *
+   * TL;DR: on modern (2.6.32+) Linux kernels with modern (AMD K8+) CPUs, the values returned by
+   * `System.nanoTime()` are consistent across CPU cores *and* packages, and provide always
+   * increasing values (although it may not be completely monotonic when the system clock is
+   * adjusted by NTP daemons using time slew).
+   */
+  // scalastyle:on line.size.limit
+  def nanoTime(): Long
+
+  /**
+   * Wait until the wall clock reaches at least the given time. Note this may not actually wait for
+   * the actual difference between the current and target times, since the wall clock may drift.
+   */
   def waitTillTime(targetTime: Long): Long
 }

@@ -36,15 +66,19 @@ private[spark] class SystemClock extends Clock {
    * @return the same time (milliseconds since the epoch)
    * as is reported by `System.currentTimeMillis()`
    */
-  def getTimeMillis(): Long = System.currentTimeMillis()
+  override def getTimeMillis(): Long = System.currentTimeMillis()
+
+  /**
+   * @return value reported by `System.nanoTime()`.
+   */
+  override def nanoTime(): Long = System.nanoTime()

   /**
    * @param targetTime block until the current time is at least this value
    * @return current system time when wait has completed
    */
-  def waitTillTime(targetTime: Long): Long = {
-    var currentTime = 0L
-    currentTime = System.currentTimeMillis()
+  override def waitTillTime(targetTime: Long): Long = {
+    var currentTime = System.currentTimeMillis()

     var waitTime = targetTime - currentTime
     if (waitTime <= 0) {
diff --git a/core/src/main/scala/org/apache/spark/util/ManualClock.scala b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
index e7a65d74a4..36d6820eba 100644
--- a/core/src/main/scala/org/apache/spark/util/ManualClock.scala
+++ b/core/src/main/scala/org/apache/spark/util/ManualClock.scala
@@ -17,11 +17,16 @@

 package org.apache.spark.util

+import java.util.concurrent.TimeUnit
+
 /**
  * A `Clock` whose time can be manually set and modified. Its reported time does not change
  * as time elapses, but only as its time is modified by callers. This is mainly useful for
  * testing.
  *
+ * For this implementation, `getTimeMillis()` and `nanoTime()` always return the same value
+ * (adjusted for the correct unit).
+ *
  * @param time initial time (in milliseconds since the epoch)
  */
 private[spark] class ManualClock(private var time: Long) extends Clock {
@@ -31,10 +36,11 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    */
   def this() = this(0L)

-  def getTimeMillis(): Long =
-    synchronized {
-      time
-    }
+  override def getTimeMillis(): Long = synchronized {
+    time
+  }
+
+  override def nanoTime(): Long = TimeUnit.MILLISECONDS.toNanos(getTimeMillis())

   /**
    * @param timeToSet new time (in milliseconds) that the clock should represent
@@ -56,7 +62,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
    * @param targetTime block until the clock time is set or advanced to at least this time
    * @return current time reported by the clock when waiting finishes
    */
-  def waitTillTime(targetTime: Long): Long = synchronized {
+  override def waitTillTime(targetTime: Long): Long = synchronized {
     while (time < targetTime) {
       wait(10)
     }
diff --git a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
index 04c00f1181..6ae1f197cf 100644
--- a/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ExecutorAllocationManagerSuite.scala
@@ -17,6 +17,8 @@

 package org.apache.spark

+import java.util.concurrent.TimeUnit
+
 import scala.collection.mutable

 import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -541,7 +543,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val firstAddTime = addTime(manager)
-    assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+    assert(firstAddTime === clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeout))
     clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === firstAddTime) // timer is already started
@@ -555,7 +557,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(addTime(manager) === NOT_SET)
     onSchedulerBacklogged(manager)
     val secondAddTime = addTime(manager)
-    assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
+    assert(secondAddTime === clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeout))
     clock.advance(100L)
     onSchedulerBacklogged(manager)
     assert(addTime(manager) === secondAddTime) // timer is already started
@@ -936,7 +938,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
       clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
     post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
     clock.advance(1000)
-    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     assert(numExecutorsTarget(manager) === 2)
     val taskInfo0 = createTaskInfo(0, 0, "executor-1")
     post(SparkListenerTaskStart(0, 0, taskInfo0))
@@ -952,7 +954,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
     assert(maxNumExecutorsNeeded(manager) === 1)
     assert(numExecutorsTarget(manager) === 2)
     clock.advance(1000)
-    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
+    manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
     assert(numExecutorsTarget(manager) === 1)
     verify(client, never).killExecutors(any(), any(), any(), any())

diff --git a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
index d3feb35537..1397cb7b39 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala
@@ -33,9 +33,9 @@ import org.apache.spark.util.ManualClock

 class ExecutorMonitorSuite extends SparkFunSuite {

-  private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L)
-  private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L)
-  private val shuffleTimeoutMs = TimeUnit.SECONDS.toMillis(240L)
+  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(60L)
+  private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(120L)
+  private val shuffleTimeoutNs = TimeUnit.SECONDS.toNanos(240L)

   private val conf = new SparkConf()
     .set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
@@ -111,8 +111,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("1", 1),
       new ExecutorMetrics, null))
     assert(monitor.isExecutorIdle("1"))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()).isEmpty)
-    assert(monitor.timedOutExecutors(clock.getTimeMillis() + idleTimeoutMs + 1) === Seq("1"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()).isEmpty)
+    assert(monitor.timedOutExecutors(clock.nanoTime() + idleTimeoutNs + 1) === Seq("1"))
   }

   test("use appropriate time out depending on whether blocks are stored") {
@@ -166,7 +166,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     // originally went idle.
     clock.setTime(idleDeadline)
     monitor.onUnpersistRDD(SparkListenerUnpersistRDD(2))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))
   }

   test("handle timeouts correctly with multiple executors") {
@@ -186,7 +186,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     // start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
     clock.setTime(TimeUnit.SECONDS.toMillis(60))
     monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))

     // store block on exec 3 (should now idle time out at 180s)
     monitor.onBlockUpdated(rddUpdate(1, 0, "3"))
@@ -196,11 +196,11 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     // advance to 140s, remove block from exec 3 (time out immediately)
     clock.setTime(TimeUnit.SECONDS.toMillis(140))
     monitor.onBlockUpdated(rddUpdate(1, 0, "3", level = StorageLevel.NONE))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "3"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "3"))

     // advance to 150s, now exec 2 should time out
     clock.setTime(TimeUnit.SECONDS.toMillis(150))
-    assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "2", "3"))
+    assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "2", "3"))
   }

   test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
@@ -410,9 +410,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
   }

-  private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
-  private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
-  private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
+  private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
+  private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
+  private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1

   private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = {
     new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "",
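
As a rough illustration of the pattern this patch standardizes on: timeout configs are converted to nanoseconds once, idle start times are taken from Clock.nanoTime(), and deadlines are compared against that same source, so wall-clock adjustments cannot fire or delay timeouts. The sketch below is not code from this change; the Clock trait loosely mirrors org.apache.spark.util.Clock, while IdleTracker, markIdle, markBusy, timedOut, and advance are hypothetical names invented for the example.

import java.util.concurrent.TimeUnit

// Minimal clock abstraction in the spirit of the patched org.apache.spark.util.Clock.
trait Clock {
  def getTimeMillis(): Long  // wall-clock time, still used for event timestamps
  def nanoTime(): Long       // high-resolution source, used for measuring intervals
}

class SystemClock extends Clock {
  override def getTimeMillis(): Long = System.currentTimeMillis()
  override def nanoTime(): Long = System.nanoTime()
}

class ManualClock(private var timeMs: Long) extends Clock {
  override def getTimeMillis(): Long = synchronized { timeMs }
  // As in the patched ManualClock: nanoTime() is just getTimeMillis() in a different unit.
  override def nanoTime(): Long = TimeUnit.MILLISECONDS.toNanos(getTimeMillis())
  def advance(ms: Long): Unit = synchronized { timeMs += ms }
}

// Hypothetical helper: tracks how long something has been idle, entirely in nanoseconds.
class IdleTracker(clock: Clock, idleTimeoutSec: Long) {
  private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(idleTimeoutSec)
  private var idleStartNs = -1L  // -1 means "currently busy"

  def markIdle(): Unit = { idleStartNs = clock.nanoTime() }
  def markBusy(): Unit = { idleStartNs = -1L }
  def timedOut(): Boolean =
    idleStartNs >= 0 && clock.nanoTime() - idleStartNs >= idleTimeoutNs
}

object IdleTrackerExample extends App {
  val clock = new ManualClock(0L)
  val tracker = new IdleTracker(clock, idleTimeoutSec = 60)
  tracker.markIdle()
  clock.advance(59 * 1000)
  assert(!tracker.timedOut())  // 59s idle: not yet past the 60s timeout
  clock.advance(2 * 1000)
  assert(tracker.timedOut())   // 61s idle: timed out
}

Keeping every deadline in nanoseconds from one source avoids the mixed-unit arithmetic of the old getTimeMillis-based addTime, and having ManualClock.nanoTime() delegate to getTimeMillis() lets the existing millisecond-driven tests keep exercising the same code paths.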