From ecfdffcb3560e21ccd318de6a0c614fa0c3aabf5 Mon Sep 17 00:00:00 2001 From: uncleGen Date: Tue, 23 Apr 2019 07:11:58 -0700 Subject: [PATCH] [SPARK-27503][DSTREAM] JobGenerator thread exit for some fatal errors but application keeps running ## What changes were proposed in this pull request? In some corner cases, `JobGenerator` thread (including some other EventLoop threads) may exit for some fatal error, like OOM, but Spark Streaming job keep running with no batch job generating. Currently, we only report any non-fatal error. ``` override def run(): Unit = { try { while (!stopped.get) { val event = eventQueue.take() try { onReceive(event) } catch { case NonFatal(e) => try { onError(e) } catch { case NonFatal(e) => logError("Unexpected error in " + name, e) } } } } catch { case ie: InterruptedException => // exit even if eventQueue is not empty case NonFatal(e) => logError("Unexpected error in " + name, e) } } ``` In this PR, we double check if event thread alive when post Event ## How was this patch tested? existing unit tests Closes #24400 from uncleGen/SPARK-27503. Authored-by: uncleGen Signed-off-by: Sean Owen --- core/src/main/scala/org/apache/spark/util/EventLoop.scala | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/util/EventLoop.scala b/core/src/main/scala/org/apache/spark/util/EventLoop.scala index 651ea4996f..5125adc9f7 100644 --- a/core/src/main/scala/org/apache/spark/util/EventLoop.scala +++ b/core/src/main/scala/org/apache/spark/util/EventLoop.scala @@ -100,7 +100,13 @@ private[spark] abstract class EventLoop[E](name: String) extends Logging { * Put the event into the event queue. The event thread will process it later. */ def post(event: E): Unit = { - eventQueue.put(event) + if (!stopped.get) { + if (eventThread.isAlive) { + eventQueue.put(event) + } else { + onError(new IllegalStateException(s"$name has already been stopped accidentally.")) + } + } } /**