[SPARK-4067] refactor ExecutorUncaughtExceptionHandler

https://issues.apache.org/jira/browse/SPARK-4067

Currently, we call Utils.tryOrExit in several places:

AppClient
Executor
TaskSchedulerImpl

This makes the name ExecutorUncaughtExceptionHandler a poor fit for the actual usage, since the handler is now reached from well beyond executor code.
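A minimal sketch of that call pattern, assuming visibility of the private[spark] Utils object; the scheduler and the work inside the block are placeholders, not code from this patch:

    import java.util.concurrent.{Executors, TimeUnit}
    import org.apache.spark.util.Utils

    // Periodic work on a daemon thread is wrapped in Utils.tryOrExit, so any
    // Throwable escaping the block is routed to the uncaught exception
    // handler, which kills the whole process rather than silently losing the
    // thread.
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = Utils.tryOrExit {
        // placeholder for the real periodic work (speculation checks, etc.)
      }
    }, 100, 100, TimeUnit.MILLISECONDS)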

Author: Nan Zhu <nanzhu@Nans-MacBook-Pro.local>
Author: Nan Zhu <nanzhu@nans-mbp.home>

Closes #2913 from CodingCat/SPARK-4067 and squashes the following commits:

035ee3d [Nan Zhu] make RAT happy
e62e416 [Nan Zhu] add some general Exit code
a10b63f [Nan Zhu] refactor
Nan Zhu authored on 2014-10-24 13:46:45 -07:00; committed by Andrew Or
parent b563987e8d
commit f80dcf2aee
5 changed files with 45 additions and 22 deletions

@@ -33,7 +33,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.scheduler._
 import org.apache.spark.shuffle.FetchFailedException
 import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
-import org.apache.spark.util.{AkkaUtils, Utils}
+import org.apache.spark.util.{SparkUncaughtExceptionHandler, AkkaUtils, Utils}

 /**
  * Spark executor used with Mesos, YARN, and the standalone scheduler.

@@ -72,7 +72,7 @@ private[spark] class Executor(
     // Setup an uncaught exception handler for non-local mode.
     // Make any thread terminations due to uncaught exceptions kill the entire
     // executor process to avoid surprising stalls.
-    Thread.setDefaultUncaughtExceptionHandler(ExecutorUncaughtExceptionHandler)
+    Thread.setDefaultUncaughtExceptionHandler(SparkUncaughtExceptionHandler)
   }

   val executorSource = new ExecutorSource(this, executorId)

@@ -258,7 +258,7 @@ private[spark] class Executor(
         // Don't forcibly exit unless the exception was inherently fatal, to avoid
         // stopping other tasks unnecessarily.
         if (Utils.isFatalError(t)) {
-          ExecutorUncaughtExceptionHandler.uncaughtException(t)
+          SparkUncaughtExceptionHandler.uncaughtException(t)
         }
       }
     } finally {
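The Utils.isFatalError guard above is what decides whether a task failure escalates to the process-killing handler. A rough sketch of such a predicate, built on scala.util.control.NonFatal; the body here is illustrative, not taken from this diff:

    import scala.util.control.{ControlThrowable, NonFatal}

    // Anything NonFatal, plus interrupts and Scala's control-flow throwables,
    // is safe to swallow; everything else (e.g. OutOfMemoryError) should
    // reach SparkUncaughtExceptionHandler and terminate the executor.
    def isFatalError(e: Throwable): Boolean = e match {
      case NonFatal(_) | _: InterruptedException | _: ControlThrowable => false
      case _ => true
    }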

@@ -17,6 +17,8 @@

 package org.apache.spark.executor

+import org.apache.spark.util.SparkExitCode._
+
 /**
  * These are exit codes that executors should use to provide the master with information about
  * executor failures assuming that cluster management framework can capture the exit codes (but

@@ -27,16 +29,6 @@ package org.apache.spark.executor
  */
 private[spark]
 object ExecutorExitCode {
-  /** The default uncaught exception handler was reached. */
-  val UNCAUGHT_EXCEPTION = 50
-
-  /** The default uncaught exception handler was called and an exception was encountered while
-      logging the exception. */
-  val UNCAUGHT_EXCEPTION_TWICE = 51
-
-  /** The default uncaught exception handler was reached, and the uncaught exception was an
-      OutOfMemoryError. */
-  val OOM = 52

   /** DiskStore failed to create a local temporary directory after many attempts. */
   val DISK_STORE_FAILED_TO_CREATE_DIR = 53

@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.util
+
+private[spark] object SparkExitCode {
+  /** The default uncaught exception handler was reached. */
+  val UNCAUGHT_EXCEPTION = 50
+
+  /** The default uncaught exception handler was called and an exception was encountered while
+      logging the exception. */
+  val UNCAUGHT_EXCEPTION_TWICE = 51
+
+  /** The default uncaught exception handler was reached, and the uncaught exception was an
+      OutOfMemoryError. */
+  val OOM = 52
+
+}
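Since these codes exist so that a cluster manager can report why an executor died, a consumer might translate an exit status along these lines. This is a hypothetical helper, not part of this patch; it assumes it is compiled somewhere under org.apache.spark so the private[spark] object is visible:

    import org.apache.spark.util.SparkExitCode._

    // Map an executor's observed exit status to a human-readable reason.
    def explainExitStatus(status: Int): String = status match {
      case UNCAUGHT_EXCEPTION => "uncaught exception"
      case UNCAUGHT_EXCEPTION_TWICE =>
        "uncaught exception, and a second failure while logging it"
      case OOM => "uncaught OutOfMemoryError"
      case other => "unknown exit status: " + other
    }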

@@ -15,17 +15,16 @@
  * limitations under the License.
  */

-package org.apache.spark.executor
+package org.apache.spark.util

 import org.apache.spark.Logging
-import org.apache.spark.util.Utils

 /**
  * The default uncaught exception handler for Executors terminates the whole process, to avoid
  * getting into a bad state indefinitely. Since Executors are relatively lightweight, it's better
  * to fail fast when things go wrong.
  */
-private[spark] object ExecutorUncaughtExceptionHandler
+private[spark] object SparkUncaughtExceptionHandler
   extends Thread.UncaughtExceptionHandler with Logging {

   override def uncaughtException(thread: Thread, exception: Throwable) {

@@ -36,14 +35,14 @@ private[spark] object SparkUncaughtExceptionHandler
       // (If we do, we will deadlock.)
       if (!Utils.inShutdown()) {
         if (exception.isInstanceOf[OutOfMemoryError]) {
-          System.exit(ExecutorExitCode.OOM)
+          System.exit(SparkExitCode.OOM)
         } else {
-          System.exit(ExecutorExitCode.UNCAUGHT_EXCEPTION)
+          System.exit(SparkExitCode.UNCAUGHT_EXCEPTION)
         }
       }
     } catch {
-      case oom: OutOfMemoryError => Runtime.getRuntime.halt(ExecutorExitCode.OOM)
-      case t: Throwable => Runtime.getRuntime.halt(ExecutorExitCode.UNCAUGHT_EXCEPTION_TWICE)
+      case oom: OutOfMemoryError => Runtime.getRuntime.halt(SparkExitCode.OOM)
+      case t: Throwable => Runtime.getRuntime.halt(SparkExitCode.UNCAUGHT_EXCEPTION_TWICE)
     }
   }
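The try/catch split above is a two-level fail-fast strategy: System.exit runs JVM shutdown hooks and can itself fail (for instance under memory pressure), so the fallback calls Runtime.halt, which terminates the JVM immediately without running hooks. A self-contained sketch of the same pattern, illustrative only and not Spark code:

    // Install a fail-fast default handler, then trigger it. exit() is tried
    // first so shutdown hooks get a chance to run; halt() is the last resort
    // if even the exit path throws.
    object FailFastDemo {
      def main(args: Array[String]): Unit = {
        Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
          override def uncaughtException(t: Thread, e: Throwable): Unit = {
            try {
              System.err.println("Uncaught exception in " + t.getName + ": " + e)
              System.exit(50)   // SparkExitCode.UNCAUGHT_EXCEPTION
            } catch {
              case _: Throwable => Runtime.getRuntime.halt(51)   // UNCAUGHT_EXCEPTION_TWICE
            }
          }
        })
        throw new RuntimeException("boom")   // process exits with status 50
      }
    }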

@@ -43,7 +43,7 @@ import org.json4s._
 import tachyon.client.{TachyonFile,TachyonFS}

 import org.apache.spark._
-import org.apache.spark.executor.ExecutorUncaughtExceptionHandler
+import org.apache.spark.util.SparkUncaughtExceptionHandler
 import org.apache.spark.serializer.{DeserializationStream, SerializationStream, SerializerInstance}

 /** CallSite represents a place in user code. It can have a short and a long form. */

@@ -965,7 +965,7 @@ private[spark] object Utils extends Logging {
       block
     } catch {
       case e: ControlThrowable => throw e
-      case t: Throwable => ExecutorUncaughtExceptionHandler.uncaughtException(t)
+      case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
     }
   }
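For context, the hunk above shows only the catch clause; the enclosing helper in Utils looks roughly like this (a sketch reconstructed from the visible lines, with the signature assumed):

    /** Execute a block of code, forwarding any uncaught exception to the
     *  process-killing handler. ControlThrowable is rethrown untouched because
     *  Scala uses it internally for non-local control flow (e.g. breakable). */
    def tryOrExit(block: => Unit) {
      try {
        block
      } catch {
        case e: ControlThrowable => throw e
        case t: Throwable => SparkUncaughtExceptionHandler.uncaughtException(t)
      }
    }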