[SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
Added faster RM-heartbeats on pending container allocations with multiplicative back-off. Also updated related documentations. Author: ehnalis <zoltan.zvara@gmail.com> Closes #6082 from ehnalis/yarn and squashes the following commits: a1d2101 [ehnalis] MIss-spell fixed. 90f8ba4 [ehnalis] Changed default HB values. 6120295 [ehnalis] Removed the bug, when allocation heartbeat would not start from initial value. 08bac63 [ehnalis] Refined style, grammar, removed duplicated code. 073d283 [ehnalis] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats. d4408c9 [ehnalis] [SPARK-7533] [YARN] Decrease spacing between AM-RM heartbeats.
This commit is contained in:
parent
09265ad7c8
commit
3ddf051ee7
|
@ -71,9 +71,22 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
|
|||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.yarn.scheduler.heartbeat.interval-ms</code></td>
|
||||
<td>5000</td>
|
||||
<td>3000</td>
|
||||
<td>
|
||||
The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager.
|
||||
The value is capped at half the value of YARN's configuration for the expiry interval
|
||||
(<code>yarn.am.liveness-monitor.expiry-interval-ms</code>).
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td><code>spark.yarn.scheduler.initial-allocation.interval</code></td>
|
||||
<td>200ms</td>
|
||||
<td>
|
||||
The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager
|
||||
when there are pending container allocation requests. It should be no larger than
|
||||
<code>spark.yarn.scheduler.heartbeat.interval-ms</code>. The allocation interval will doubled on
|
||||
successive eager heartbeats if pending containers still exist, until
|
||||
<code>spark.yarn.scheduler.heartbeat.interval-ms</code> is reached.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
|
|
|
@ -300,11 +300,14 @@ private[spark] class ApplicationMaster(
|
|||
val expiryInterval = yarnConf.getInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 120000)
|
||||
|
||||
// we want to be reasonably responsive without causing too many requests to RM.
|
||||
val schedulerInterval =
|
||||
sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "5s")
|
||||
val heartbeatInterval = math.max(0, math.min(expiryInterval / 2,
|
||||
sparkConf.getTimeAsMs("spark.yarn.scheduler.heartbeat.interval-ms", "3s")))
|
||||
|
||||
// must be <= expiryInterval / 2.
|
||||
val interval = math.max(0, math.min(expiryInterval / 2, schedulerInterval))
|
||||
// we want to check more frequently for pending containers
|
||||
val initialAllocationInterval = math.min(heartbeatInterval,
|
||||
sparkConf.getTimeAsMs("spark.yarn.scheduler.initial-allocation.interval", "200ms"))
|
||||
|
||||
var nextAllocationInterval = initialAllocationInterval
|
||||
|
||||
// The number of failures in a row until Reporter thread give up
|
||||
val reporterMaxFailures = sparkConf.getInt("spark.yarn.scheduler.reporterThread.maxFailures", 5)
|
||||
|
@ -330,15 +333,27 @@ private[spark] class ApplicationMaster(
|
|||
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
|
||||
finish(FinalApplicationStatus.FAILED,
|
||||
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
|
||||
s"${failureCount} time(s) from Reporter thread.")
|
||||
|
||||
s"$failureCount time(s) from Reporter thread.")
|
||||
} else {
|
||||
logWarning(s"Reporter thread fails ${failureCount} time(s) in a row.", e)
|
||||
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
try {
|
||||
Thread.sleep(interval)
|
||||
val numPendingAllocate = allocator.getNumPendingAllocate
|
||||
val sleepInterval =
|
||||
if (numPendingAllocate > 0) {
|
||||
val currentAllocationInterval =
|
||||
math.min(heartbeatInterval, nextAllocationInterval)
|
||||
nextAllocationInterval *= 2
|
||||
currentAllocationInterval
|
||||
} else {
|
||||
nextAllocationInterval = initialAllocationInterval
|
||||
heartbeatInterval
|
||||
}
|
||||
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
|
||||
s"Sleeping for $sleepInterval.")
|
||||
Thread.sleep(sleepInterval)
|
||||
} catch {
|
||||
case e: InterruptedException =>
|
||||
}
|
||||
|
@ -349,7 +364,8 @@ private[spark] class ApplicationMaster(
|
|||
t.setDaemon(true)
|
||||
t.setName("Reporter")
|
||||
t.start()
|
||||
logInfo("Started progress reporter thread - sleep time : " + interval)
|
||||
logInfo(s"Started progress reporter thread with (heartbeat : $heartbeatInterval, " +
|
||||
s"initial allocation : $initialAllocationInterval) intervals")
|
||||
t
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue