[SPARK-28455][CORE] Avoid overflow when calculating executor timeout time

When the configured timeout is very large (e.g. Long.MaxValue), adding it to the
idle-start time overflows, making the computed timeout time negative, so executors
would be timed out immediately (instead of never), as sketched below.
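
As a rough illustration (a standalone sketch with made-up values, not the actual
ExecutorMonitor code), the wrap-around and the guard added in the diff below look
like this:

    // Sketch only: illustrative values, not the real ExecutorMonitor fields.
    object TimeoutOverflowSketch {
      def main(args: Array[String]): Unit = {
        val idleStart = 1000L        // when the executor went idle (ms)
        val timeout = Long.MaxValue  // configured to effectively "never"

        // Before the fix: the sum wraps around to a large negative number,
        // so the executor looks like it expired long ago.
        val naiveDeadline = idleStart + timeout
        println(naiveDeadline)       // -9223372036854774809 (negative)

        // After the fix: a wrapped-around deadline is treated as "no deadline".
        val deadline = idleStart + timeout
        val safeDeadline = if (deadline >= 0) deadline else Long.MaxValue
        println(safeDeadline)        // 9223372036854775807 (Long.MaxValue)
      }
    }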

I also tweaked a couple of log messages that could get pretty long when
lots of executors were active.
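
For illustration, a standalone sketch of the capping idea (CappedIds is a made-up
name here; the real helper is the ExecutorIdCollector added in the diff below).
Only the first ten IDs are kept and the rest are summarised, so debug lines stay
short even with many executors:

    import scala.collection.mutable

    // Sketch of the log-message capping behaviour; not the real class.
    class CappedIds(limit: Int = 10) {
      private val ids = new mutable.ArrayBuffer[String]()
      private var excess = 0

      def add(id: String): Unit = if (ids.size < limit) ids += id else excess += 1

      override def toString: String =
        ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
    }

    object CappedIdsExample {
      def main(args: Array[String]): Unit = {
        val execs = new CappedIds()
        (1 to 15).foreach(i => execs.add(i.toString))
        println(s"Activated executors $execs")
        // Activated executors 1,2,3,4,5,6,7,8,9,10 (and 5 more)
      }
    }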

Added unit test (which failed without the fix).

Closes #25208 from vanzin/SPARK-28455.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Marcelo Vanzin 2019-07-22 14:31:58 -07:00 committed by Dongjoon Hyun
parent 5b378e6efc
commit a3e013391e
2 changed files with 48 additions and 9 deletions

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

@@ -182,19 +182,19 @@ private[spark] class ExecutorMonitor(
     if (updateExecutors) {
       val activeShuffleIds = shuffleStages.map(_._2).toSeq
       var needTimeoutUpdate = false
-      val activatedExecs = new mutable.ArrayBuffer[String]()
+      val activatedExecs = new ExecutorIdCollector()
       executors.asScala.foreach { case (id, exec) =>
         if (!exec.hasActiveShuffle) {
           exec.updateActiveShuffles(activeShuffleIds)
           if (exec.hasActiveShuffle) {
             needTimeoutUpdate = true
-            activatedExecs += id
+            activatedExecs.add(id)
           }
         }
       }
 
-      logDebug(s"Activated executors ${activatedExecs.mkString(",")} due to shuffle data " +
-        s"needed by new job ${event.jobId}.")
+      logDebug(s"Activated executors $activatedExecs due to shuffle data needed by new job" +
+        s"${event.jobId}.")
 
       if (needTimeoutUpdate) {
         nextTimeout.set(Long.MinValue)
@@ -233,18 +233,18 @@ private[spark] class ExecutorMonitor(
         }
       }
 
-      val deactivatedExecs = new mutable.ArrayBuffer[String]()
+      val deactivatedExecs = new ExecutorIdCollector()
       executors.asScala.foreach { case (id, exec) =>
         if (exec.hasActiveShuffle) {
           exec.updateActiveShuffles(activeShuffles)
           if (!exec.hasActiveShuffle) {
-            deactivatedExecs += id
+            deactivatedExecs.add(id)
           }
         }
       }
 
-      logDebug(s"Executors ${deactivatedExecs.mkString(",")} do not have active shuffle data " +
-        s"after job ${event.jobId} finished.")
+      logDebug(s"Executors $deactivatedExecs do not have active shuffle data after job " +
+        s"${event.jobId} finished.")
     }
 
     jobToStageIDs.remove(event.jobId).foreach { stages =>
@@ -448,7 +448,8 @@ private[spark] class ExecutorMonitor(
         } else {
           idleTimeoutMs
         }
-        idleStart + timeout
+        val deadline = idleStart + timeout
+        if (deadline >= 0) deadline else Long.MaxValue
       } else {
         Long.MaxValue
       }
@@ -491,4 +492,22 @@ private[spark] class ExecutorMonitor(
   private case class ShuffleCleanedEvent(id: Int) extends SparkListenerEvent {
     override protected[spark] def logEvent: Boolean = false
   }
+
+  /** Used to collect executor IDs for debug messages (and avoid too long messages). */
+  private class ExecutorIdCollector {
+    private val ids = if (log.isDebugEnabled) new mutable.ArrayBuffer[String]() else null
+    private var excess = 0
+
+    def add(id: String): Unit = if (log.isDebugEnabled) {
+      if (ids.size < 10) {
+        ids += id
+      } else {
+        excess += 1
+      }
+    }
+
+    override def toString(): String = {
+      ids.mkString(",") + (if (excess > 0) s" (and $excess more)" else "")
+    }
+  }
 }

core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala

@@ -367,6 +367,26 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline).toSet === Set("1", "2"))
   }
 
+  test("SPARK-28455: avoid overflow in timeout calculation") {
+    conf
+      .set(DYN_ALLOCATION_SHUFFLE_TIMEOUT, Long.MaxValue)
+      .set(DYN_ALLOCATION_SHUFFLE_TRACKING, true)
+      .set(SHUFFLE_SERVICE_ENABLED, false)
+    monitor = new ExecutorMonitor(conf, client, null, clock)
+
+    // Generate events that will make executor 1 be idle, while still holding shuffle data.
+    // The executor should not be eligible for removal since the timeout is basically "infinite".
+    val stage = stageInfo(1, shuffleId = 0)
+    monitor.onJobStart(SparkListenerJobStart(1, clock.getTimeMillis(), Seq(stage)))
+    clock.advance(1000L)
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.onTaskStart(SparkListenerTaskStart(1, 0, taskInfo("1", 1)))
+    monitor.onTaskEnd(SparkListenerTaskEnd(1, 0, "foo", Success, taskInfo("1", 1), null))
+    monitor.onJobEnd(SparkListenerJobEnd(1, clock.getTimeMillis(), JobSucceeded))
+
+    assert(monitor.timedOutExecutors(idleDeadline).isEmpty)
+  }
+
   private def idleDeadline: Long = clock.getTimeMillis() + idleTimeoutMs + 1
   private def storageDeadline: Long = clock.getTimeMillis() + storageTimeoutMs + 1
   private def shuffleDeadline: Long = clock.getTimeMillis() + shuffleTimeoutMs + 1