[SPARK-10614][CORE] Add monotonic time to Clock interface

This change adds a new method to the Clock interface that returns
the time from a monotonic time source, so that code that needs a
monotonic clock can keep using the Clock abstraction and remain
mockable in tests.
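
As a rough illustration (a hypothetical test snippet, not part of this patch;
it assumes the calling code lives under the org.apache.spark package, since
these classes are private[spark]), the new method can be driven from a
ManualClock just like getTimeMillis:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock(0L)           // manual clock starting at 0 ms
    val start = clock.nanoTime()
    clock.advance(500L)                       // advance the clock by 500 ms
    val elapsedNs = clock.nanoTime() - start  // elapsed time seen through nanoTime()
    assert(elapsedNs == TimeUnit.MILLISECONDS.toNanos(500L))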

The original getTimeMillis and waitTillTime methods are unchanged, since
the streaming code that uses the Clock interface appears to rely on wall
clock semantics rather than a monotonic clock. In that sense this change
does not directly address the problem raised in the bug (that waitTillTime
can be affected by clock drift), but the call sites migrated to the new
method do not rely on waitTillTime in the first place.

The dynamic allocation code was modified to use the new time source,
since it should not base its decisions on wall clock time.
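
As a sketch of the resulting pattern (hypothetical helpers, not the actual
Spark code), timer deadlines are now computed from the monotonic source, so
adjustments to the wall clock cannot move them:

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.Clock

    // Compute a deadline `timeoutSec` seconds from now using the monotonic source.
    def deadlineNanos(clock: Clock, timeoutSec: Long): Long =
      clock.nanoTime() + TimeUnit.SECONDS.toNanos(timeoutSec)

    // A deadline has expired once the monotonic clock has passed it.
    def expired(clock: Clock, deadlineNs: Long): Boolean =
      clock.nanoTime() >= deadlineNs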

For a longer discussion of how monotonic clocks work on Linux/x64, the
following blog post (and the links within it) sheds a lot of light on the
safety of `System.nanoTime()`:

http://btorpey.github.io/blog/2014/02/18/clock-sources-in-linux/

Tested with unit tests and by running apps with dynamic allocation enabled.

Closes #26058 from vanzin/SPARK-10614.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Marcelo Vanzin authored on 2019-10-14 19:38:04 -07:00, committed by Dongjoon Hyun
parent 9407fba037
commit 857f109c47
6 changed files with 79 additions and 36 deletions

ExecutorAllocationManager.scala

@@ -288,7 +288,7 @@ private[spark] class ExecutorAllocationManager(
}
// Update executor target number only after initializing flag is unset
updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
updateAndSyncNumExecutorsTarget(clock.nanoTime())
if (executorIdsToBeRemoved.nonEmpty) {
removeExecutors(executorIdsToBeRemoved)
}
@@ -336,7 +336,7 @@ private[spark] class ExecutorAllocationManager(
val delta = addExecutors(maxNeeded)
logDebug(s"Starting timer to add more executors (to " +
s"expire in $sustainedSchedulerBacklogTimeoutS seconds)")
addTime = now + (sustainedSchedulerBacklogTimeoutS * 1000)
addTime = now + TimeUnit.SECONDS.toNanos(sustainedSchedulerBacklogTimeoutS)
delta
} else {
0
@@ -481,7 +481,7 @@ private[spark] class ExecutorAllocationManager(
if (addTime == NOT_SET) {
logDebug(s"Starting timer to add executors because pending tasks " +
s"are building up (to expire in $schedulerBacklogTimeoutS seconds)")
addTime = clock.getTimeMillis + schedulerBacklogTimeoutS * 1000
addTime = clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeoutS)
}
}

ExecutorMonitor.scala

@@ -39,11 +39,12 @@ private[spark] class ExecutorMonitor(
listenerBus: LiveListenerBus,
clock: Clock) extends SparkListener with CleanerListener with Logging {
private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(
private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(
conf.get(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT))
private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(
private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(
conf.get(DYN_ALLOCATION_CACHED_EXECUTOR_IDLE_TIMEOUT))
private val shuffleTimeoutMs = conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT)
private val shuffleTimeoutNs = TimeUnit.MILLISECONDS.toNanos(
conf.get(DYN_ALLOCATION_SHUFFLE_TIMEOUT))
private val fetchFromShuffleSvcEnabled = conf.get(SHUFFLE_SERVICE_ENABLED) &&
conf.get(SHUFFLE_SERVICE_FETCH_RDD_ENABLED)
@@ -100,7 +101,7 @@ private[spark] class ExecutorMonitor(
* Should only be called from the EAM thread.
*/
def timedOutExecutors(): Seq[String] = {
val now = clock.getTimeMillis()
val now = clock.nanoTime()
if (now >= nextTimeout.get()) {
// Temporarily set the next timeout at Long.MaxValue. This ensures that after
// scanning all executors below, we know when the next timeout for non-timed out
@@ -437,7 +438,7 @@ private[spark] class ExecutorMonitor(
def updateRunningTasks(delta: Int): Unit = {
runningTasks = math.max(0, runningTasks + delta)
idleStart = if (runningTasks == 0) clock.getTimeMillis() else -1L
idleStart = if (runningTasks == 0) clock.nanoTime() else -1L
updateTimeout()
}
@@ -445,15 +446,15 @@ private[spark] class ExecutorMonitor(
val oldDeadline = timeoutAt
val newDeadline = if (idleStart >= 0) {
val timeout = if (cachedBlocks.nonEmpty || (shuffleIds != null && shuffleIds.nonEmpty)) {
val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutMs else Long.MaxValue
val _cacheTimeout = if (cachedBlocks.nonEmpty) storageTimeoutNs else Long.MaxValue
val _shuffleTimeout = if (shuffleIds != null && shuffleIds.nonEmpty) {
shuffleTimeoutMs
shuffleTimeoutNs
} else {
Long.MaxValue
}
math.min(_cacheTimeout, _shuffleTimeout)
} else {
idleTimeoutMs
idleTimeoutNs
}
val deadline = idleStart + timeout
if (deadline >= 0) deadline else Long.MaxValue

Clock.scala

@@ -21,7 +21,37 @@ package org.apache.spark.util
* An interface to represent clocks, so that they can be mocked out in unit tests.
*/
private[spark] trait Clock {
/** @return Current system time, in ms. */
def getTimeMillis(): Long
// scalastyle:off line.size.limit
/**
* Current value of high resolution time source, in ns.
*
* This method abstracts the call to the JRE's `System.nanoTime()` call. As with that method, the
* value here is not guaranteed to be monotonically increasing, but rather a higher resolution
* time source for use in the calculation of time intervals. The characteristics of the values
returned may vary from JVM to JVM (or even the same JVM running on different OSes or CPUs), but
* in general it should be preferred over [[getTimeMillis()]] when calculating time differences.
*
* Specifically for Linux on x64 architecture, the following links provide useful information
* about the characteristics of the value returned:
*
* http://btorpey.github.io/blog/2014/02/18/clock-sources-in-linux/
* https://stackoverflow.com/questions/10921210/cpu-tsc-fetch-operation-especially-in-multicore-multi-processor-environment
*
* TL;DR: on modern (2.6.32+) Linux kernels with modern (AMD K8+) CPUs, the values returned by
* `System.nanoTime()` are consistent across CPU cores *and* packages, and provide always
increasing values (although it may not be completely monotonic when the system clock is
* adjusted by NTP daemons using time slew).
*/
// scalastyle:on line.size.limit
def nanoTime(): Long
/**
* Wait until the wall clock reaches at least the given time. Note this may not actually wait for
* the actual difference between the current and target times, since the wall clock may drift.
*/
def waitTillTime(targetTime: Long): Long
}
@@ -36,15 +66,19 @@ private[spark] class SystemClock extends Clock {
* @return the same time (milliseconds since the epoch)
* as is reported by `System.currentTimeMillis()`
*/
def getTimeMillis(): Long = System.currentTimeMillis()
override def getTimeMillis(): Long = System.currentTimeMillis()
/**
* @return value reported by `System.nanoTime()`.
*/
override def nanoTime(): Long = System.nanoTime()
/**
* @param targetTime block until the current time is at least this value
* @return current system time when wait has completed
*/
def waitTillTime(targetTime: Long): Long = {
var currentTime = 0L
currentTime = System.currentTimeMillis()
override def waitTillTime(targetTime: Long): Long = {
var currentTime = System.currentTimeMillis()
var waitTime = targetTime - currentTime
if (waitTime <= 0) {

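For context, a minimal sketch (a hypothetical helper, not part of this diff) of
how callers are expected to use nanoTime(): only the difference between two
readings is meaningful, never the absolute value:

    import org.apache.spark.util.{Clock, SystemClock}

    // Measure how long `body` takes using the monotonic source.
    def timeNanos[T](clock: Clock)(body: => T): (T, Long) = {
      val start = clock.nanoTime()
      val result = body
      (result, clock.nanoTime() - start)
    }

    val (_, elapsedNs) = timeNanos(new SystemClock())(Thread.sleep(10))
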
ManualClock.scala

@@ -17,11 +17,16 @@
package org.apache.spark.util
import java.util.concurrent.TimeUnit
/**
* A `Clock` whose time can be manually set and modified. Its reported time does not change
* as time elapses, but only as its time is modified by callers. This is mainly useful for
* testing.
*
* For this implementation, `getTimeMillis()` and `nanoTime()` always return the same value
* (adjusted for the correct unit).
*
* @param time initial time (in milliseconds since the epoch)
*/
private[spark] class ManualClock(private var time: Long) extends Clock {
@@ -31,11 +36,12 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
*/
def this() = this(0L)
def getTimeMillis(): Long =
synchronized {
override def getTimeMillis(): Long = synchronized {
time
}
override def nanoTime(): Long = TimeUnit.MILLISECONDS.toNanos(getTimeMillis())
/**
* @param timeToSet new time (in milliseconds) that the clock should represent
*/
@@ -56,7 +62,7 @@ private[spark] class ManualClock(private var time: Long) extends Clock {
* @param targetTime block until the clock time is set or advanced to at least this time
* @return current time reported by the clock when waiting finishes
*/
def waitTillTime(targetTime: Long): Long = synchronized {
override def waitTillTime(targetTime: Long): Long = synchronized {
while (time < targetTime) {
wait(10)
}

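Since the manual clock keeps a single time value, both accessors report the
same instant (a hypothetical usage sketch, consistent with the doc comment
above):

    import java.util.concurrent.TimeUnit
    import org.apache.spark.util.ManualClock

    val clock = new ManualClock()   // starts at 0 ms
    clock.setTime(1500L)            // wall-clock view: 1500 ms
    assert(clock.getTimeMillis() == 1500L)
    // nanoTime() reports the same instant, converted to nanoseconds.
    assert(clock.nanoTime() == TimeUnit.MILLISECONDS.toNanos(1500L))
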
ExecutorAllocationManagerSuite.scala

@@ -17,6 +17,8 @@
package org.apache.spark
import java.util.concurrent.TimeUnit
import scala.collection.mutable
import org.mockito.ArgumentMatchers.{any, eq => meq}
@@ -541,7 +543,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(addTime(manager) === NOT_SET)
onSchedulerBacklogged(manager)
val firstAddTime = addTime(manager)
assert(firstAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
assert(firstAddTime === clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeout))
clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === firstAddTime) // timer is already started
@@ -555,7 +557,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(addTime(manager) === NOT_SET)
onSchedulerBacklogged(manager)
val secondAddTime = addTime(manager)
assert(secondAddTime === clock.getTimeMillis + schedulerBacklogTimeout * 1000)
assert(secondAddTime === clock.nanoTime() + TimeUnit.SECONDS.toNanos(schedulerBacklogTimeout))
clock.advance(100L)
onSchedulerBacklogged(manager)
assert(addTime(manager) === secondAddTime) // timer is already started
@@ -936,7 +938,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
clock.getTimeMillis(), "executor-1", new ExecutorInfo("host1", 1, Map.empty, Map.empty)))
post(SparkListenerStageSubmitted(createStageInfo(0, 2)))
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 2)
val taskInfo0 = createTaskInfo(0, 0, "executor-1")
post(SparkListenerTaskStart(0, 0, taskInfo0))
@@ -952,7 +954,7 @@ class ExecutorAllocationManagerSuite extends SparkFunSuite {
assert(maxNumExecutorsNeeded(manager) === 1)
assert(numExecutorsTarget(manager) === 2)
clock.advance(1000)
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.getTimeMillis())
manager invokePrivate _updateAndSyncNumExecutorsTarget(clock.nanoTime())
assert(numExecutorsTarget(manager) === 1)
verify(client, never).killExecutors(any(), any(), any(), any())

ExecutorMonitorSuite.scala

@@ -33,9 +33,9 @@ import org.apache.spark.util.ManualClock
class ExecutorMonitorSuite extends SparkFunSuite {
private val idleTimeoutMs = TimeUnit.SECONDS.toMillis(60L)
private val storageTimeoutMs = TimeUnit.SECONDS.toMillis(120L)
private val shuffleTimeoutMs = TimeUnit.SECONDS.toMillis(240L)
private val idleTimeoutNs = TimeUnit.SECONDS.toNanos(60L)
private val storageTimeoutNs = TimeUnit.SECONDS.toNanos(120L)
private val shuffleTimeoutNs = TimeUnit.SECONDS.toNanos(240L)
private val conf = new SparkConf()
.set(DYN_ALLOCATION_EXECUTOR_IDLE_TIMEOUT.key, "60s")
@@ -111,8 +111,8 @@ class ExecutorMonitorSuite extends SparkFunSuite {
monitor.onTaskEnd(SparkListenerTaskEnd(1, 1, "foo", Success, taskInfo("1", 1),
new ExecutorMetrics, null))
assert(monitor.isExecutorIdle("1"))
assert(monitor.timedOutExecutors(clock.getTimeMillis()).isEmpty)
assert(monitor.timedOutExecutors(clock.getTimeMillis() + idleTimeoutMs + 1) === Seq("1"))
assert(monitor.timedOutExecutors(clock.nanoTime()).isEmpty)
assert(monitor.timedOutExecutors(clock.nanoTime() + idleTimeoutNs + 1) === Seq("1"))
}
test("use appropriate time out depending on whether blocks are stored") {
@@ -166,7 +166,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
// originally went idle.
clock.setTime(idleDeadline)
monitor.onUnpersistRDD(SparkListenerUnpersistRDD(2))
assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))
}
test("handle timeouts correctly with multiple executors") {
@@ -186,7 +186,7 @@ class ExecutorMonitorSuite extends SparkFunSuite {
// start exec 3 at 60s (should idle timeout at 120s, exec 1 should time out)
clock.setTime(TimeUnit.SECONDS.toMillis(60))
monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "3", null))
assert(monitor.timedOutExecutors(clock.getTimeMillis()) === Seq("1"))
assert(monitor.timedOutExecutors(clock.nanoTime()) === Seq("1"))
// store block on exec 3 (should now idle time out at 180s)
monitor.onBlockUpdated(rddUpdate(1, 0, "3"))
@@ -196,11 +196,11 @@ class ExecutorMonitorSuite extends SparkFunSuite {
// advance to 140s, remove block from exec 3 (time out immediately)
clock.setTime(TimeUnit.SECONDS.toMillis(140))
monitor.onBlockUpdated(rddUpdate(1, 0, "3", level = StorageLevel.NONE))
assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "3"))
assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "3"))
// advance to 150s, now exec 2 should time out
clock.setTime(TimeUnit.SECONDS.toMillis(150))
assert(monitor.timedOutExecutors(clock.getTimeMillis()).toSet === Set("1", "2", "3"))
assert(monitor.timedOutExecutors(clock.nanoTime()).toSet === Set("1", "2", "3"))
}
test("SPARK-27677: don't track blocks stored on disk when using shuffle service") {
@@ -410,9 +410,9 @@ class ExecutorMonitorSuite extends SparkFunSuite {
assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
}
private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1
private def idleDeadline: Long = clock.nanoTime() + idleTimeoutNs + 1
private def storageDeadline: Long = clock.nanoTime() + storageTimeoutNs + 1
private def shuffleDeadline: Long = clock.nanoTime() + shuffleTimeoutNs + 1
private def stageInfo(id: Int, shuffleId: Int = -1): StageInfo = {
new StageInfo(id, 0, s"stage$id", 1, Nil, Nil, "",