Merge pull request #89 from rxin/executor
Don't setup the uncaught exception handler in local mode. This avoids unit test failures for Spark streaming:

    java.util.concurrent.RejectedExecutionException: Task org.apache.spark.streaming.JobManager$JobHandler@38cf728d rejected from java.util.concurrent.ThreadPoolExecutor@3b69a41e[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 14]
        at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2048)
        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:821)
        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1372)
        at org.apache.spark.streaming.JobManager.runJob(JobManager.scala:54)
        at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
        at org.apache.spark.streaming.Scheduler$$anonfun$generateJobs$2.apply(Scheduler.scala:108)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:60)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at org.apache.spark.streaming.Scheduler.generateJobs(Scheduler.scala:108)
        at org.apache.spark.streaming.Scheduler$$anonfun$1.apply$mcVJ$sp(Scheduler.scala:41)
        at org.apache.spark.streaming.util.RecurringTimer.org$apache$spark$streaming$util$RecurringTimer$$loop(RecurringTimer.scala:66)
        at org.apache.spark.streaming.util.RecurringTimer$$anon$1.run(RecurringTimer.scala:34)
This commit is contained in:
commit
5b9380e017
|
@ -74,30 +74,33 @@ private[spark] class Executor(
|
|||
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)
Thread.currentThread.setContextClassLoader(replClassLoader)

if (!isLocal) {
  // Setup an uncaught exception handler for non-local mode only: in local mode the
  // executor shares a JVM with the driver and unit-test harness, so killing the
  // process on an uncaught exception would take down the tests (e.g. streaming
  // tests that race a terminated ThreadPoolExecutor — see commit message).
  // Make any thread terminations due to uncaught exceptions kill the entire
  // executor process to avoid surprising stalls.
  Thread.setDefaultUncaughtExceptionHandler(
    new Thread.UncaughtExceptionHandler {
      override def uncaughtException(thread: Thread, exception: Throwable) {
        try {
          logError("Uncaught exception in thread " + thread, exception)

          // We may have been called from a shutdown hook. If so, we must not call System.exit().
          // (If we do, we will deadlock.)
          if (!Utils.inShutdown()) {
            if (exception.isInstanceOf[OutOfMemoryError]) {
              System.exit(ExecutorExitCode.OOM)
            } else {
              System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
            }
          }
        } catch {
          // Last-ditch handling: if logging or the exit path itself fails, bypass
          // shutdown hooks entirely with Runtime.halt so the process still dies.
          // Catching Throwable is deliberate here — this is the JVM's handler of
          // last resort and must never itself throw.
          case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
          case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
        }
      }
    }
  )
}

// Metrics source reporting per-executor stats, registered under this executor's id.
val executorSource = new ExecutorSource(this, executorId)
|
Loading…
Reference in a new issue