[SPARK-36532][CORE] Fix deadlock in CoarseGrainedExecutorBackend.onDisconnected to avoid executor shutdown hang

### What changes were proposed in this pull request?

Instead of exiting the executor on a thread of the RpcEnv's message loop, exit the executor in a separate thread.
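
A minimal, self-contained sketch of that pattern (not the actual Spark code; the pool, hook, and names below only stand in for the RpcEnv message loop and the `executor.stop` shutdown hook):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ExitInSeparateThreadSketch {
  // Stand-in for the RpcEnv's MessageLoop.threadpool.
  private val messageLoopPool = Executors.newFixedThreadPool(1)

  def main(args: Array[String]): Unit = {
    // Stand-in for the `executor.stop` shutdown hook: it drains the message-loop pool.
    Runtime.getRuntime.addShutdownHook(new Thread("stop-hook") {
      override def run(): Unit = {
        messageLoopPool.shutdown()
        // Completes promptly: no pool thread is blocked inside System.exit.
        messageLoopPool.awaitTermination(1, TimeUnit.MINUTES)
      }
    })

    messageLoopPool.submit(new Runnable {
      override def run(): Unit = {
        // Hand the exit off to a dedicated thread and return immediately, so the
        // shutdown hook can drain the pool without waiting on this pool thread.
        new Thread("stop-executor") {
          override def run(): Unit = System.exit(0)
        }.start()
      }
    })
  }
}
```

The hand-off here is simplified to starting a thread directly; in the actual change, the exit request is routed through the existing `Shutdown` message so the work runs on the `CoarseGrainedExecutorBackend-stop-executor` thread (see the diff below).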

### Why are the changes needed?

The current exit path in `onDisconnected` can cause a deadlock, which has the same root cause as https://github.com/apache/spark/pull/12012:

* `onDisconnected` -> `System.exit` are called in sequence on a thread of `MessageLoop.threadpool`.
* `System.exit` triggers the JVM shutdown hooks, and `executor.stop` is one of those hooks.
* `executor.stop` stops the `Dispatcher`, which in turn waits for `MessageLoop.threadpool` to shut down.
* Thus, the thread running `System.exit` waits for the hooks to finish, while the `executor.stop` hook waits for that same `MessageLoop.threadpool` thread to finish. This mutual wait results in a deadlock; see the sketch below.
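
A minimal, self-contained sketch of this cycle (again not Spark code; the pool and hook just stand in for `MessageLoop.threadpool` and the `executor.stop` shutdown hook):

```scala
import java.util.concurrent.{Executors, TimeUnit}

object ShutdownHookDeadlockSketch {
  // Stand-in for the RpcEnv's MessageLoop.threadpool.
  private val messageLoopPool = Executors.newFixedThreadPool(1)

  def main(args: Array[String]): Unit = {
    // Stand-in for the `executor.stop` shutdown hook: it waits for the pool to drain,
    // mirroring the Dispatcher waiting for MessageLoop.threadpool to shut down.
    Runtime.getRuntime.addShutdownHook(new Thread("stop-hook") {
      override def run(): Unit = {
        messageLoopPool.shutdown()
        // Waits forever: the only pool thread is stuck inside System.exit,
        // which in turn waits for this very hook to finish.
        while (!messageLoopPool.awaitTermination(1, TimeUnit.MINUTES)) {}
      }
    })

    // Like the old onDisconnected path: System.exit is called on the pool thread itself.
    messageLoopPool.submit(new Runnable {
      override def run(): Unit = System.exit(1)
    })
  }
}
```

Running this hangs: `System.exit` does not return until all shutdown hooks complete, while the hook cannot complete until the pool thread running `System.exit` finishes.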

### Does this PR introduce _any_ user-facing change?

Yes, the executor shutdown won't hang.

### How was this patch tested?

Pass existing tests.

Closes #33759 from Ngone51/fix-executor-shutdown-hang.

Authored-by: yi.wu <yi.wu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

```diff
@@ -202,11 +202,17 @@ private[spark] class CoarseGrainedExecutorBackend(
       stopping.set(true)
       new Thread("CoarseGrainedExecutorBackend-stop-executor") {
         override def run(): Unit = {
-          // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
-          // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
-          // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
-          // Therefore, we put this line in a new thread.
-          executor.stop()
+          // `executor` can be null if there's any error in `CoarseGrainedExecutorBackend.onStart`
+          // or fail to create `Executor`.
+          if (executor == null) {
+            System.exit(1)
+          } else {
+            // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally.
+            // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to
+            // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180).
+            // Therefore, we put this line in a new thread.
+            executor.stop()
+          }
         }
       }.start()
 
@@ -286,8 +292,7 @@ private[spark] class CoarseGrainedExecutorBackend(
       if (notifyDriver && driver.nonEmpty) {
         driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason)))
       }
-
-      System.exit(code)
+      self.send(Shutdown)
     } else {
       logInfo("Skip exiting executor since it's been already asked to exit before.")
     }
```