[SPARK-15273] YarnSparkHadoopUtil#getOutOfMemoryErrorArgument should respect OnOutOfMemoryError parameter given by user

## What changes were proposed in this pull request?

As Nirav reported in this thread:
http://search-hadoop.com/m/q3RTtdF3yNLMd7u

YarnSparkHadoopUtil#getOutOfMemoryErrorArgument previously specified 'kill %p' unconditionally.
We should respect the -XX:OnOutOfMemoryError parameter given by the user.
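
For context, a user would typically supply their own handler through `spark.executor.extraJavaOptions`. A minimal sketch of that usage (the app name and the `kill -9` command below are illustrative, not taken from this patch):

```scala
import org.apache.spark.SparkConf

object OomHandlerConfExample {
  def main(args: Array[String]): Unit = {
    // A user-chosen OOM handler for YARN executors. With this change the YARN
    // backend keeps this value instead of also injecting the default 'kill %p'.
    val conf = new SparkConf()
      .setAppName("oom-handler-example") // hypothetical app name
      .set("spark.executor.extraJavaOptions",
        "-XX:OnOutOfMemoryError='kill -9 %p'") // illustrative handler command

    println(conf.get("spark.executor.extraJavaOptions"))
  }
}
```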

## How was this patch tested?

Existing tests
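
For illustration only, a hypothetical suite (not part of this change, which relies on existing tests) could exercise the new helper directly, assuming it lives in the `org.apache.spark.deploy.yarn` test sources so the `private[yarn]` method is visible:

```scala
package org.apache.spark.deploy.yarn

import scala.collection.mutable.ListBuffer

import org.scalatest.FunSuite

// Hypothetical sketch: verifies that a user-supplied handler is respected and
// that the platform default is only added when no handler is present.
class OutOfMemoryErrorArgumentSuite extends FunSuite {

  test("user-provided -XX:OnOutOfMemoryError is respected") {
    val opts = ListBuffer("-XX:OnOutOfMemoryError='kill -9 %p'")
    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(opts)
    assert(opts === ListBuffer("-XX:OnOutOfMemoryError='kill -9 %p'"))
  }

  test("default handler is added when none is given") {
    val opts = ListBuffer[String]("-Xmx2g")
    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(opts)
    assert(opts.exists(_.contains("-XX:OnOutOfMemoryError")))
  }
}
```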

Author: tedyu <yuzhihong@gmail.com>

Closes #13057 from tedyu/master.
Authored by tedyu on 2016-05-20 18:13:18 -05:00, committed by Sean Owen
commit 06c9f52071, parent a78d6ce376
2 changed files with 17 additions and 15 deletions

ExecutorRunnable.scala:

```diff
@@ -211,15 +211,10 @@ private[yarn] class ExecutorRunnable(
       Seq("--user-class-path", "file:" + absPath)
     }.toSeq
 
+    YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts)
     val commands = prefixEnv ++ Seq(
       YarnSparkHadoopUtil.expandEnvironment(Environment.JAVA_HOME) + "/bin/java",
-      "-server",
-      // Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
-      // Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
-      // an inconsistent state.
-      // TODO: If the OOM is not recoverable by rescheduling it on different node, then do
-      // 'something' to fail job ... akin to blacklisting trackers in mapred ?
-      YarnSparkHadoopUtil.getOutOfMemoryErrorArgument) ++
+      "-server") ++
       javaOpts ++
       Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
         "--driver-url", masterAddress.toString,
```

YarnSparkHadoopUtil.scala:

```diff
@@ -24,7 +24,8 @@ import java.security.PrivilegedExceptionAction
 import java.util.regex.Matcher
 import java.util.regex.Pattern
 
-import scala.collection.mutable.HashMap
+import scala.collection.JavaConverters._
+import scala.collection.mutable.{HashMap, ListBuffer}
 import scala.reflect.runtime._
 import scala.util.Try
@@ -405,6 +406,12 @@ object YarnSparkHadoopUtil {
   }
 
   /**
+   * Kill if OOM is raised - leverage yarn's failure handling to cause rescheduling.
+   * Not killing the task leaves various aspects of the executor and (to some extent) the jvm in
+   * an inconsistent state.
+   * TODO: If the OOM is not recoverable by rescheduling it on different node, then do
+   * 'something' to fail job ... akin to blacklisting trackers in mapred ?
+   *
    * The handler if an OOM Exception is thrown by the JVM must be configured on Windows
    * differently: the 'taskkill' command should be used, whereas Unix-based systems use 'kill'.
    *
@@ -415,14 +422,14 @@ object YarnSparkHadoopUtil {
    * the behavior of '%' in a .cmd file: it gets interpreted as an incomplete environment
    * variable. Windows .cmd files escape a '%' by '%%'. Thus, the correct way of writing
    * '%%p' in an escaped way is '%%%%p'.
-   *
-   * @return The correct OOM Error handler JVM option, platform dependent.
    */
-  def getOutOfMemoryErrorArgument: String = {
-    if (Utils.isWindows) {
-      escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
-    } else {
-      "-XX:OnOutOfMemoryError='kill %p'"
+  private[yarn] def addOutOfMemoryErrorArgument(javaOpts: ListBuffer[String]): Unit = {
+    if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) {
+      if (Utils.isWindows) {
+        javaOpts += escapeForShell("-XX:OnOutOfMemoryError=taskkill /F /PID %%%%p")
+      } else {
+        javaOpts += "-XX:OnOutOfMemoryError='kill %p'"
+      }
     }
   }
```
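
Putting the two changes together, a simplified sketch of the effect on the assembled executor command (stand-in values, not the actual ExecutorRunnable code): previously the default handler was a fixed element of the command Seq, so it appeared even when javaOpts already carried one; now javaOpts is amended first and only ever contains a single handler.

```scala
import scala.collection.mutable.ListBuffer

object CommandAssemblySketch {
  def main(args: Array[String]): Unit = {
    // javaOpts as they might arrive from spark.executor.extraJavaOptions.
    val javaOpts = ListBuffer("-Xmx2g", "-XX:OnOutOfMemoryError='kill -9 %p'")

    // Old behavior (simplified): the default handler was always in the command,
    // so both it and the user's handler ended up on the command line.
    val oldCommand = Seq("{{JAVA_HOME}}/bin/java", "-server",
      "-XX:OnOutOfMemoryError='kill %p'") ++ javaOpts

    // New behavior (simplified): append the default only when the user gave none.
    if (!javaOpts.exists(_.contains("-XX:OnOutOfMemoryError"))) {
      javaOpts += "-XX:OnOutOfMemoryError='kill %p'"
    }
    val newCommand = Seq("{{JAVA_HOME}}/bin/java", "-server") ++ javaOpts

    println(oldCommand.mkString(" ")) // contains two OOM handlers
    println(newCommand.mkString(" ")) // contains only the user's handler
  }
}
```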