From 181d33e16edfb6fa5abde29de87634bdf4ce7e61 Mon Sep 17 00:00:00 2001
From: "yi.wu"
Date: Wed, 18 Aug 2021 22:46:48 +0800
Subject: [PATCH] [SPARK-36532][CORE] Fix deadlock in CoarseGrainedExecutorBackend.onDisconnected to avoid executor shutdown hang

### What changes were proposed in this pull request?

Instead of exiting the executor within the RpcEnv's thread, exit the executor in a separate thread.

### Why are the changes needed?

The current way of exiting in `onDisconnected` can cause a deadlock, which has the exact same root cause as https://github.com/apache/spark/pull/12012:

* `onDisconnected` -> `System.exit` are called in sequence in a thread of `MessageLoop.threadpool`.
* `System.exit` triggers the shutdown hooks, and `executor.stop` is one of those hooks.
* `executor.stop` stops the `Dispatcher`, which in turn waits for `MessageLoop.threadpool` to shut down.
* Thus, the thread that runs `System.exit` waits for the hooks to finish, while the hook waits for `MessageLoop.threadpool`, and therefore for that same thread, to finish. This mutual dependence results in the deadlock.

### Does this PR introduce _any_ user-facing change?

Yes, the executor shutdown won't hang.

### How was this patch tested?

Pass existing tests.

Closes #33759 from Ngone51/fix-executor-shutdown-hang.

Authored-by: yi.wu
Signed-off-by: Wenchen Fan

(cherry picked from commit 996551fecee8c3549438c4f536f8ab9607c644c5)
Signed-off-by: Wenchen Fan
---
 .../CoarseGrainedExecutorBackend.scala | 19 ++++++++++++-------
 1 file changed, 12 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index d18ffaaa36..ffcb30d74e 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -202,11 +202,17 @@ private[spark] class CoarseGrainedExecutorBackend(
       stopping.set(true)
       new Thread("CoarseGrainedExecutorBackend-stop-executor") {
         override def run(): Unit = {
-          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
-          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
-          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
-          // Therefore, we put this line in a new thread.
-          executor.stop()
+          // `executor` can be null if there's any error in `CoarseGrainedExecutorBackend.onStart`
+          // or fail to create `Executor`.
+          if (executor == null) {
+            System.exit(1)
+          } else {
+            // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
+            // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
+            // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
+            // Therefore, we put this line in a new thread.
+            executor.stop()
+          }
         }
       }.start()
 
@@ -282,8 +288,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (notifyDriver && driver.nonEmpty) {
         driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
       }
-
-      System.exit(code)
+      self.send(Shutdown)
     } else {
       logInfo("Skip exiting executor since it's been already asked to exit before.")
     }
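
The mutual wait described in the commit message can be reproduced in miniature without Spark. The following is only a sketch under stated assumptions, not Spark's code: a plain single-threaded `ExecutorService` stands in for `MessageLoop.threadpool`, the hypothetical `stop()` helper stands in for the shutdown path that stops the `Dispatcher`, and a bounded `awaitTermination` replaces the unbounded wait so that the sketch reports the hang instead of hanging. The names `ShutdownDeadlockSketch` and `poolTerminates` are made up for illustration.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

object ShutdownDeadlockSketch {

  /**
   * Runs a hypothetical stop routine either on the pool's own thread or on a
   * separate thread, and reports whether the pool terminated within the wait.
   */
  def poolTerminates(stopInSeparateThread: Boolean): Boolean = {
    // Assumption: a single worker thread stands in for MessageLoop.threadpool.
    val pool = Executors.newSingleThreadExecutor()
    val done = new CountDownLatch(1)
    val terminated = new AtomicBoolean(false)

    // Stand-in for the shutdown path: stop the pool, then wait for its threads.
    // The wait is bounded (500 ms) so a would-be deadlock shows up as `false`.
    def stop(): Unit = {
      pool.shutdown()
      terminated.set(pool.awaitTermination(500, TimeUnit.MILLISECONDS))
      done.countDown()
    }

    // Stand-in for a message being handled on the message loop thread.
    pool.execute(new Runnable {
      override def run(): Unit = {
        if (stopInSeparateThread) {
          // Patched behaviour: hand the stop off to a dedicated thread.
          new Thread("stop-executor-sketch") {
            override def run(): Unit = stop()
          }.start()
        } else {
          // Old behaviour: the pool thread waits for its own pool to finish.
          stop()
        }
      }
    })

    done.await()
    terminated.get()
  }

  def main(args: Array[String]): Unit = {
    println(s"stop on pool thread terminates: ${poolTerminates(stopInSeparateThread = false)}")
    println(s"stop on separate thread terminates: ${poolTerminates(stopInSeparateThread = true)}")
  }
}
```

When the stop routine runs on the pool's own thread, the pool cannot terminate while that thread is still blocked in the wait, so the bounded wait is expected to time out. With an unbounded wait this would be the hang the patch avoids. Handing the stop off to a separate thread lets the pool thread return from its task, so the pool should terminate well within the wait.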