[SPARK-4012] stop SparkContext when the exception is thrown from an infinite loop
https://issues.apache.org/jira/browse/SPARK-4012

This patch is a resubmission of https://github.com/apache/spark/pull/2864

What I am proposing in this patch is that ***when an exception is thrown from an infinite loop, we should stop the SparkContext instead of letting the JVM throw exceptions forever.*** So, in the infinite loops that were originally wrapped with `logUncaughtExceptions`, I changed the wrapper to `tryOrStopSparkContext`, so that the Spark component is stopped.

Stopping the JVM process early is helpful for HA scheme design. For example, a user may have a script that checks for the existence of the Spark Streaming driver's pid to monitor availability; with the code before this patch, the JVM process is still alive but no longer functional once the exceptions are thrown.

andrewor14, srowen, mind taking further consideration of the change?

Author: CodingCat <zhunansjtu@gmail.com>

Closes #5004 from CodingCat/SPARK-4012-1 and squashes the following commits:

589276a [CodingCat] throw fatal error again
3c72cd8 [CodingCat] address the comments
6087864 [CodingCat] revise comments
6ad3eb0 [CodingCat] stop SparkContext instead of quit the JVM process
6322959 [CodingCat] exit JVM process when the exception is thrown from an infinite loop
parent 645cf3fcc2
commit 2c3f83c34b
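To make the intent concrete before the diff: below is a minimal sketch (hypothetical `CleanupDaemon` and `doOneRound`, not code from this patch) of the pattern applied throughout, where a driver-side daemon loop is wrapped in `Utils.tryOrStopSparkContext` so that an escaping exception stops the SparkContext rather than leaving a zombie JVM:

```scala
package org.apache.spark

import org.apache.spark.util.Utils

// Hypothetical daemon, illustrating the pattern this patch applies to
// ContextCleaner, TaskSchedulerImpl and AsynchronousListenerBus. Before the
// patch such loops were wrapped in Utils.logUncaughtExceptions, which only
// logs and leaves a dead thread behind; tryOrStopSparkContext additionally
// stops the SparkContext (and rethrows fatal errors).
private[spark] class CleanupDaemon(sc: SparkContext) {
  @volatile private var stopped = false

  private val thread = new Thread("cleanup-daemon") {
    setDaemon(true)
    override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
      while (!stopped) {
        doOneRound() // an exception here now stops sc instead of only killing this thread
      }
    }
  }

  private def doOneRound(): Unit = { /* poll a queue, clean up state, ... */ }

  def start(): Unit = thread.start()
  def stop(): Unit = stopped = true
}
```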
@@ -145,7 +145,7 @@ private[spark] class ContextCleaner(sc: SparkContext) extends Logging {
   }
 
   /** Keep cleaning RDD, shuffle, and broadcast state. */
-  private def keepCleaning(): Unit = Utils.logUncaughtExceptions {
+  private def keepCleaning(): Unit = Utils.tryOrStopSparkContext(sc) {
     while (!stopped) {
       try {
         val reference = Option(referenceQueue.remove(ContextCleaner.REF_QUEUE_POLL_TIMEOUT))
@@ -1736,7 +1736,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
       }
     }
 
-    listenerBus.start()
+    listenerBus.start(this)
   }
 
   /** Post the application start event */
@@ -93,7 +93,7 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHis
    */
  private def getRunner(operateFun: () => Unit): Runnable = {
    new Runnable() {
-      override def run() = Utils.logUncaughtExceptions {
+      override def run() = Utils.tryOrExit {
        operateFun()
      }
    }
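Note the asymmetry in this hunk (my reading of the patch): the history server runs in a JVM that Spark itself started, so exiting the whole process via `tryOrExit` is the right remedy there, whereas the driver-side loops changed elsewhere in this patch run inside the user's JVM and should only stop the SparkContext. Roughly, with a hypothetical `work()`:

```scala
// Illustrative contrast, matching the NOTEs added to Utils further down:
Utils.tryOrExit { work() }                  // spark-started JVM (e.g. history server): exit the process
Utils.tryOrStopSparkContext(sc) { work() }  // user-started driver JVM: stop only the SparkContext
```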
@@ -145,7 +145,7 @@ private[spark] class TaskSchedulerImpl(
       import sc.env.actorSystem.dispatcher
       sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
             SPECULATION_INTERVAL milliseconds) {
-        Utils.tryOrExit { checkSpeculatableTasks() }
+        Utils.tryOrStopSparkContext(sc) { checkSpeculatableTasks() }
       }
     }
   }
@@ -21,6 +21,7 @@ import java.util.concurrent._
 import java.util.concurrent.atomic.AtomicBoolean
 
 import com.google.common.annotations.VisibleForTesting
+import org.apache.spark.SparkContext
 
 /**
  * Asynchronously passes events to registered listeners.
@@ -38,6 +39,8 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   self =>
 
+  private var sparkContext: SparkContext = null
+
   /* Cap the capacity of the event queue so we get an explicit error (rather than
    * an OOM exception) if it's perpetually being added to more quickly than it's being drained. */
   private val EVENT_QUEUE_CAPACITY = 10000
@@ -57,7 +60,7 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
 
   private val listenerThread = new Thread(name) {
     setDaemon(true)
-    override def run(): Unit = Utils.logUncaughtExceptions {
+    override def run(): Unit = Utils.tryOrStopSparkContext(sparkContext) {
       while (true) {
         eventLock.acquire()
         self.synchronized {
@@ -89,9 +92,12 @@ private[spark] abstract class AsynchronousListenerBus[L <: AnyRef, E](name: Stri
    * This first sends out all buffered events posted before this listener bus has started, then
    * listens for any additional events asynchronously while the listener bus is still running.
    * This should only be called once.
+   *
+   * @param sc Used to stop the SparkContext in case the listener thread dies.
    */
-  def start() {
+  def start(sc: SparkContext) {
     if (started.compareAndSet(false, true)) {
+      sparkContext = sc
       listenerThread.start()
     } else {
       throw new IllegalStateException(s"$name already started!")
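The bus is constructed before the SparkContext finishes initializing, so it cannot receive the context in its constructor; instead `start(sc)` stashes it in the new `sparkContext` field, which the listener thread's wrapper reads. A hedged sketch of the resulting call order on the driver:

```scala
// Call order as I understand it (LiveListenerBus extends AsynchronousListenerBus):
val bus = new LiveListenerBus  // sparkContext field is still null here
// ... SparkContext finishes its own initialization ...
bus.start(sc)                  // stashes sc, then starts the daemon listener thread
// from this point, an uncaught error on the listener thread calls sc.stop()
```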
@@ -1146,6 +1146,8 @@ private[spark] object Utils extends Logging {
   /**
    * Execute a block of code that evaluates to Unit, forwarding any uncaught exceptions to the
    * default UncaughtExceptionHandler
+   *
+   * NOTE: This method is to be called by the spark-started JVM process.
    */
   def tryOrExit(block: => Unit) {
     try {
@@ -1156,6 +1158,32 @@ private[spark] object Utils extends Logging {
     }
   }
 
+  /**
+   * Execute a block of code that evaluates to Unit, stopping the SparkContext if there is any
+   * uncaught exception
+   *
+   * NOTE: This method is to be called by the driver-side components to avoid stopping the
+   * user-started JVM process completely; in contrast, tryOrExit is to be called in the
+   * spark-started JVM process.
+   */
+  def tryOrStopSparkContext(sc: SparkContext)(block: => Unit) {
+    try {
+      block
+    } catch {
+      case e: ControlThrowable => throw e
+      case t: Throwable =>
+        val currentThreadName = Thread.currentThread().getName
+        if (sc != null) {
+          logError(s"uncaught error in thread $currentThreadName, stopping SparkContext", t)
+          sc.stop()
+        }
+        if (!NonFatal(t)) {
+          logError(s"throw uncaught fatal error in thread $currentThreadName", t)
+          throw t
+        }
+    }
+  }
+
   /**
    * Execute a block of code that evaluates to Unit, re-throwing any non-fatal uncaught
    * exceptions as IOException. This is used when implementing Externalizable and Serializable's
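Restating the new helper's contract (an illustrative sketch, not a test from this patch): non-fatal exceptions are logged and swallowed after stopping the context; fatal errors also stop the context but are rethrown; Scala's `ControlThrowable` (thrown by `break` and friends) passes through untouched:

```scala
// Sketch of observable behavior, assuming a started SparkContext `sc`:
Utils.tryOrStopSparkContext(sc) {
  throw new RuntimeException("boom")  // non-fatal: logged, sc.stop() called, not rethrown
}

// Utils.tryOrStopSparkContext(sc) { throw new OutOfMemoryError("boom") }
//   fatal: sc.stop() called, then the error is rethrown to the caller

// Utils.tryOrStopSparkContext(sc) { scala.util.control.Breaks.break() }
//   ControlThrowable: rethrown immediately, sc is left running
```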
@@ -25,9 +25,9 @@ import scala.io.Source
 
 import org.apache.hadoop.fs.Path
 import org.json4s.jackson.JsonMethods._
-import org.scalatest.{BeforeAndAfter, FunSuite}
+import org.scalatest.{FunSuiteLike, BeforeAndAfter, FunSuite}
 
-import org.apache.spark.{Logging, SparkConf, SparkContext, SPARK_VERSION}
+import org.apache.spark._
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.io._
 import org.apache.spark.util.{JsonProtocol, Utils}
@@ -39,7 +39,8 @@ import org.apache.spark.util.{JsonProtocol, Utils}
  * logging events, whether the parsing of the file names is correct, and whether the logged events
  * can be read and deserialized into actual SparkListenerEvents.
  */
-class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Logging {
+class EventLoggingListenerSuite extends FunSuite with LocalSparkContext with BeforeAndAfter
+  with Logging {
   import EventLoggingListenerSuite._
 
   private val fileSystem = Utils.getHadoopFileSystem("/",
@@ -144,7 +145,7 @@ class EventLoggingListenerSuite extends FunSuite with BeforeAndAfter with Loggin
 
     // A comprehensive test on JSON de/serialization of all events is in JsonProtocolSuite
     eventLogger.start()
-    listenerBus.start()
+    listenerBus.start(sc)
     listenerBus.addListener(eventLogger)
     listenerBus.postToAll(applicationStart)
     listenerBus.postToAll(applicationEnd)
@@ -46,7 +46,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     assert(counter.count === 0)
 
     // Starting listener bus should flush all buffered events
-    bus.start()
+    bus.start(sc)
     assert(bus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS))
     assert(counter.count === 5)
@@ -58,8 +58,8 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     // Listener bus must not be started twice
     intercept[IllegalStateException] {
       val bus = new LiveListenerBus
-      bus.start()
-      bus.start()
+      bus.start(sc)
+      bus.start(sc)
     }
 
     // ... or stopped before starting
@@ -96,7 +96,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     val blockingListener = new BlockingListener
 
     bus.addListener(blockingListener)
-    bus.start()
+    bus.start(sc)
     bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded))
 
     listenerStarted.acquire()
@@ -347,7 +347,7 @@ class SparkListenerSuite extends FunSuite with LocalSparkContext with Matchers
     bus.addListener(badListener)
     bus.addListener(jobCounter1)
     bus.addListener(jobCounter2)
-    bus.start()
+    bus.start(sc)
 
     // Post events to all listeners, and wait until the queue is drained
     (1 to 5).foreach { _ => bus.post(SparkListenerJobEnd(0, jobCompletionTime, JobSucceeded)) }
@@ -61,7 +61,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
       }
     }), "JobScheduler")
 
-    listenerBus.start()
+    listenerBus.start(ssc.sparkContext)
     receiverTracker = new ReceiverTracker(ssc)
     receiverTracker.start()
     jobGenerator.start()