[SPARK-17022][YARN] Handle potential deadlock in driver handling messages
## What changes were proposed in this pull request? We send RequestExecutors directly to the AM instead of transferring it to YarnSchedulerBackend first, to avoid a potential deadlock. ## How was this patch tested? Manual tests. Author: WangTaoTheTonic <wangtao111@huawei.com> Closes #14605 from WangTaoTheTonic/lock.
This commit is contained in:
parent
4ec5c360ce
commit
ea0bf91b4a
|
@ -125,8 +125,20 @@ private[spark] abstract class YarnSchedulerBackend(
|
|||
* This includes executors already pending or running.
|
||||
*/
|
||||
override def doRequestTotalExecutors(requestedTotal: Int): Boolean = {
|
||||
yarnSchedulerEndpointRef.askWithRetry[Boolean](
|
||||
RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount))
|
||||
val r = RequestExecutors(requestedTotal, localityAwareTasks, hostToLocalTaskCount)
|
||||
yarnSchedulerEndpoint.amEndpoint match {
|
||||
case Some(am) =>
|
||||
try {
|
||||
am.askWithRetry[Boolean](r)
|
||||
} catch {
|
||||
case NonFatal(e) =>
|
||||
logError(s"Sending $r to AM was unsuccessful", e)
|
||||
return false
|
||||
}
|
||||
case None =>
|
||||
logWarning("Attempted to request executors before the AM has registered!")
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -209,7 +221,7 @@ private[spark] abstract class YarnSchedulerBackend(
|
|||
*/
|
||||
private class YarnSchedulerEndpoint(override val rpcEnv: RpcEnv)
|
||||
extends ThreadSafeRpcEndpoint with Logging {
|
||||
private var amEndpoint: Option[RpcEndpointRef] = None
|
||||
var amEndpoint: Option[RpcEndpointRef] = None
|
||||
|
||||
private val askAmThreadPool =
|
||||
ThreadUtils.newDaemonCachedThreadPool("yarn-scheduler-ask-am-thread-pool")
|
||||
|
|
Loading…
Reference in a new issue