Merge pull request #92 from tgravescs/sparkYarnFixClasspath

Fix the Worker to use CoarseGrainedExecutorBackend and modify the classpath
to be explicit about including spark.jar and app.jar.

Be explicit so that, if there are any packaging conflicts between spark.jar
and app.jar, we don't get random results from a classpath entry ending in /*,
which can include things in a different order.
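As a rough illustration (not part of the patch), the ordering the new
populateClasspath helper enforces can be modeled as a plain list. The object
and parameter names below are hypothetical, and "$PWD" stands in for
Environment.PWD.$():

// Hypothetical sketch of the explicit classpath ordering. By default the
// user's app.jar comes after spark.jar and the Hadoop classpath; setting
// spark.yarn.user.classpath.first=true moves it ahead of spark.jar.
object ClasspathOrder {
  def entries(userClasspathFirst: Boolean, hadoopEntries: Seq[String]): Seq[String] = {
    val pwd = "$PWD" // stands in for Environment.PWD.$()
    if (userClasspathFirst) {
      Seq(pwd, s"$pwd/app.jar", s"$pwd/spark.jar") ++ hadoopEntries :+ s"$pwd/*"
    } else {
      Seq(pwd, s"$pwd/spark.jar") ++ hadoopEntries ++ Seq(s"$pwd/app.jar", s"$pwd/*")
    }
  }

  def main(args: Array[String]): Unit = {
    // Default ordering, e.g.: $PWD:$PWD/spark.jar:$HADOOP_CONF_DIR:$PWD/app.jar:$PWD/*
    println(entries(userClasspathFirst = false, Seq("$HADOOP_CONF_DIR")).mkString(":"))
  }
}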
Matei Zaharia 2013-10-21 23:35:13 -07:00
commit b84193c5b8
2 changed files with 29 additions and 10 deletions

@@ -265,11 +265,7 @@ class Client(conf: Configuration, args: ClientArguments) extends YarnClientImpl
     val env = new HashMap[String, String]()
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, log4jConfLocalRes != null, env)
     env("SPARK_YARN_MODE") = "true"
     env("SPARK_YARN_JAR_PATH") =
       localResources("spark.jar").getResource().getScheme.toString() + "://" +
@@ -451,4 +447,30 @@ object Client {
       Apps.addToEnvironment(env, Environment.CLASSPATH.name, c.trim)
     }
   }
+
+  def populateClasspath(conf: Configuration, addLog4j: Boolean, env: HashMap[String, String]) {
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
+    // If log4j is present, ensure ours overrides all others
+    if (addLog4j) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+        Path.SEPARATOR + "log4j.properties")
+    }
+    // Normally the user's app.jar is last, in case it conflicts with the spark jars
+    val userClasspathFirst = System.getProperty("spark.yarn.user.classpath.first", "false")
+      .toBoolean
+    if (userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+      Path.SEPARATOR + "spark.jar")
+    Client.populateHadoopClasspath(conf, env)
+    if (!userClasspathFirst) {
+      Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+        Path.SEPARATOR + "app.jar")
+    }
+    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$() +
+      Path.SEPARATOR + "*")
+  }
 }
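Each Apps.addToEnvironment call above appends one entry to the CLASSPATH value
in the env map. The stand-in below (addToEnv is an assumption, not Hadoop's
actual implementation) mimics that behavior to show the resulting string:

import scala.collection.mutable.HashMap

object EnvSketch {
  // Rough stand-in for Hadoop's Apps.addToEnvironment: append one entry to
  // env(key), joined by the platform path separator.
  def addToEnv(env: HashMap[String, String], key: String, value: String): Unit = {
    env(key) = env.get(key) match {
      case Some(existing) => existing + java.io.File.pathSeparator + value
      case None           => value
    }
  }

  def main(args: Array[String]): Unit = {
    val env = new HashMap[String, String]()
    Seq("$PWD", "$PWD/spark.jar", "$PWD/app.jar", "$PWD/*")
      .foreach(addToEnv(env, "CLASSPATH", _))
    println(env("CLASSPATH")) // e.g. $PWD:$PWD/spark.jar:$PWD/app.jar:$PWD/*
  }
}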

@@ -121,7 +121,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
     // TODO: If the OOM is not recoverable by rescheduling it on different node, then do 'something' to fail job ... akin to blacklisting trackers in mapred ?
     " -XX:OnOutOfMemoryError='kill %p' " +
     JAVA_OPTS +
-    " org.apache.spark.executor.StandaloneExecutorBackend " +
+    " org.apache.spark.executor.CoarseGrainedExecutorBackend " +
     masterAddress + " " +
     slaveId + " " +
     hostname + " " +
@@ -216,10 +216,7 @@ class WorkerRunnable(container: Container, conf: Configuration, masterAddress: S
   def prepareEnvironment: HashMap[String, String] = {
     val env = new HashMap[String, String]()
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name, Environment.PWD.$())
-    Apps.addToEnvironment(env, Environment.CLASSPATH.name,
-      Environment.PWD.$() + Path.SEPARATOR + "*")
-    Client.populateHadoopClasspath(yarnConf, env)
+    Client.populateClasspath(yarnConf, System.getenv("SPARK_YARN_LOG4J_PATH") != null, env)
     // allow users to specify some environment variables
     Apps.setEnvFromInputString(env, System.getenv("SPARK_YARN_USER_ENV"))
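Both call sites now share the helper: the Client passes
log4jConfLocalRes != null, while the Worker checks for SPARK_YARN_LOG4J_PATH
in its environment. A runnable sketch of the Worker-side check (the object
name is hypothetical; the variable name is from the patch):

object AddLog4jSketch {
  def main(args: Array[String]): Unit = {
    // Mirrors the Worker-side addLog4j argument to Client.populateClasspath
    val addLog4j = System.getenv("SPARK_YARN_LOG4J_PATH") != null
    println(s"prepend log4j.properties to CLASSPATH: $addLog4j")
  }
}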