[SPARK-27094][YARN] Work around RackResolver swallowing thread interrupt.

To avoid the case where the YARN libraries would swallow the exception and
prevent YarnAllocator from shutting down, call the offending code in a
separate thread, so that the parent thread can respond appropriately to
the shut down.

As a safeguard, also explicitly stop the executor launch thread pool when
shutting down the application, to prevent new executors from coming up
after the application started its shutdown.

Tested with unit tests + some internal tests on real cluster.

Closes #24017 from vanzin/SPARK-27094.

Authored-by: Marcelo Vanzin <vanzin@cloudera.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
This commit is contained in:
Marcelo Vanzin 2019-03-20 11:48:06 -07:00
parent c65f9b2bc3
commit ec5e34205a
2 changed files with 122 additions and 81 deletions

View file

@ -550,88 +550,94 @@ private[spark] class ApplicationMaster(
reporterThread.join()
}
private def launchReporterThread(): Thread = {
// The number of failures in a row until Reporter thread give up
private def allocationThreadImpl(): Unit = {
// The number of failures in a row until the allocation thread gives up.
val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
var failureCount = 0
while (!finished) {
try {
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
s"Max number of executor failures ($maxNumExecutorFailures) reached")
} else if (allocator.isAllNodeBlacklisted) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Due to executor failures all available nodes are blacklisted")
} else {
logDebug("Sending progress")
allocator.allocateResources()
}
failureCount = 0
} catch {
case i: InterruptedException => // do nothing
case e: ApplicationAttemptNotFoundException =>
failureCount += 1
logError("Exception from Reporter thread.", e)
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
e.getMessage)
case e: Throwable =>
failureCount += 1
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
} else {
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
var sleepStartNs = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
sleepInterval =
if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
currentAllocationInterval
} else {
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
sleepStartNs = System.nanoTime()
allocatorLock.wait(sleepInterval)
}
val sleepDuration = System.nanoTime() - sleepStartNs
if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
// log when sleep is interrupted
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Slept for $sleepDuration/$sleepInterval ms.")
// if sleep was less than the minimum interval, sleep for the rest of it
val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
if (toSleep > 0) {
logDebug(s"Going back to sleep for $toSleep ms")
// use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
// by the methods that signal allocatorLock because this is just finishing the min
// sleep interval, which should happen even if this is signalled again.
Thread.sleep(toSleep)
}
} else {
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Slept for $sleepDuration/$sleepInterval.")
}
} catch {
case e: InterruptedException =>
}
}
}
private def launchReporterThread(): Thread = {
val t = new Thread {
override def run() {
var failureCount = 0
while (!finished) {
try {
if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
s"Max number of executor failures ($maxNumExecutorFailures) reached")
} else if (allocator.isAllNodeBlacklisted) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
"Due to executor failures all available nodes are blacklisted")
} else {
logDebug("Sending progress")
allocator.allocateResources()
}
failureCount = 0
} catch {
case i: InterruptedException => // do nothing
case e: ApplicationAttemptNotFoundException =>
failureCount += 1
logError("Exception from Reporter thread.", e)
finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_REPORTER_FAILURE,
e.getMessage)
case e: Throwable =>
failureCount += 1
if (!NonFatal(e) || failureCount >= reporterMaxFailures) {
finish(FinalApplicationStatus.FAILED,
ApplicationMaster.EXIT_REPORTER_FAILURE, "Exception was thrown " +
s"$failureCount time(s) from Reporter thread.")
} else {
logWarning(s"Reporter thread fails $failureCount time(s) in a row.", e)
}
}
try {
val numPendingAllocate = allocator.getPendingAllocate.size
var sleepStartNs = 0L
var sleepInterval = 200L // ms
allocatorLock.synchronized {
sleepInterval =
if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
val currentAllocationInterval =
math.min(heartbeatInterval, nextAllocationInterval)
nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
currentAllocationInterval
} else {
nextAllocationInterval = initialAllocationInterval
heartbeatInterval
}
sleepStartNs = System.nanoTime()
allocatorLock.wait(sleepInterval)
}
val sleepDuration = System.nanoTime() - sleepStartNs
if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
// log when sleep is interrupted
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Slept for $sleepDuration/$sleepInterval ms.")
// if sleep was less than the minimum interval, sleep for the rest of it
val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
if (toSleep > 0) {
logDebug(s"Going back to sleep for $toSleep ms")
// use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
// by the methods that signal allocatorLock because this is just finishing the min
// sleep interval, which should happen even if this is signalled again.
Thread.sleep(toSleep)
}
} else {
logDebug(s"Number of pending allocations is $numPendingAllocate. " +
s"Slept for $sleepDuration/$sleepInterval.")
}
} catch {
case e: InterruptedException =>
}
override def run(): Unit = {
try {
allocationThreadImpl()
} finally {
allocator.stop()
}
}
}
// setting to daemon status, though this is usually not a good idea.
t.setDaemon(true)
t.setName("Reporter")
t.start()

View file

@ -381,6 +381,13 @@ private[yarn] class YarnAllocator(
}
}
def stop(): Unit = {
// Forcefully shut down the launcher pool, in case this is being called in the middle of
// container allocation. This will prevent queued executors from being started - and
// potentially interrupt active ExecutorRunnable instaces too.
launcherPool.shutdownNow()
}
private def hostStr(request: ContainerRequest): String = {
Option(request.getNodes) match {
case Some(nodes) => nodes.asScala.mkString(",")
@ -417,12 +424,40 @@ private[yarn] class YarnAllocator(
containersToUse, remainingAfterHostMatches)
}
// Match remaining by rack
// Match remaining by rack. Because YARN's RackResolver swallows thread interrupts
// (see SPARK-27094), which can cause this code to miss interrupts from the AM, use
// a separate thread to perform the operation.
val remainingAfterRackMatches = new ArrayBuffer[Container]
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
if (remainingAfterHostMatches.nonEmpty) {
var exception: Option[Throwable] = None
val thread = new Thread("spark-rack-resolver") {
override def run(): Unit = {
try {
for (allocatedContainer <- remainingAfterHostMatches) {
val rack = resolver.resolve(conf, allocatedContainer.getNodeId.getHost)
matchContainerToRequest(allocatedContainer, rack, containersToUse,
remainingAfterRackMatches)
}
} catch {
case e: Throwable =>
exception = Some(e)
}
}
}
thread.setDaemon(true)
thread.start()
try {
thread.join()
} catch {
case e: InterruptedException =>
thread.interrupt()
throw e
}
if (exception.isDefined) {
throw exception.get
}
}
// Assign remaining that are neither node-local nor rack-local