[SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode

The goal of this patch is to fix the swapped arguments in standalone mode, which was caused by  79e45c9323 (diff-79391110e9f26657e415aa169a004998R153).

More details can be found in the JIRA: [SPARK-3921](https://issues.apache.org/jira/browse/SPARK-3921)

Tested in Standalone mode, but not in Mesos.

Author: Aaron Davidson <aaron@databricks.com>

Closes #2779 from aarondav/fix-standalone and squashes the following commits:

725227a [Aaron Davidson] Fix ExecutorRunnerTest
9d703fe [Aaron Davidson] [SPARK-3921] Fix CoarseGrainedExecutorBackend's arguments for Standalone mode
This commit is contained in:
Aaron Davidson 2014-10-13 23:31:37 -07:00 committed by Andrew Or
parent 4d26aca770
commit 186b497c94
5 changed files with 15 additions and 12 deletions

View file

@ -111,13 +111,14 @@ private[spark] class ExecutorRunner(
case "{{EXECUTOR_ID}}" => execId.toString
case "{{HOSTNAME}}" => host
case "{{CORES}}" => cores.toString
case "{{APP_ID}}" => appId
case other => other
}
def getCommandSeq = {
val command = Command(
appDesc.command.mainClass,
appDesc.command.arguments.map(substituteVariables) ++ Seq(appId),
appDesc.command.arguments.map(substituteVariables),
appDesc.command.environment,
appDesc.command.classPathEntries,
appDesc.command.libraryPathEntries,

View file

@ -152,6 +152,9 @@ private[spark] object CoarseGrainedExecutorBackend extends Logging {
"Usage: CoarseGrainedExecutorBackend <driverUrl> <executorId> <hostname> " +
"<cores> <appid> [<workerUrl>] ")
System.exit(1)
// NB: These arguments are provided by SparkDeploySchedulerBackend (for standalone mode)
// and CoarseMesosSchedulerBackend (for mesos mode).
case 5 =>
run(args(0), args(1), args(2), args(3).toInt, args(4), None)
case x if x > 5 =>

View file

@ -51,7 +51,8 @@ private[spark] class SparkDeploySchedulerBackend(
conf.get("spark.driver.host"),
conf.get("spark.driver.port"),
CoarseGrainedSchedulerBackend.ACTOR_NAME)
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{WORKER_URL}}")
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}", "{{APP_ID}}",
"{{WORKER_URL}}")
val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
.map(Utils.splitCommandString).getOrElse(Seq.empty)
val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath").toSeq.flatMap { cp =>

View file

@ -150,17 +150,17 @@ private[spark] class CoarseMesosSchedulerBackend(
if (uri == null) {
val runScript = new File(executorSparkHome, "./bin/spark-class").getCanonicalPath
command.setValue(
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores))
"\"%s\" org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s".format(
runScript, driverUrl, offer.getSlaveId.getValue, offer.getHostname, numCores, appId))
} else {
// Grab everything to the first '.'. We'll use that and '*' to
// glob the directory "correctly".
val basename = uri.split('/').last.split('.').head
command.setValue(
("cd %s*; " +
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d")
"./bin/spark-class org.apache.spark.executor.CoarseGrainedExecutorBackend %s %s %s %d %s")
.format(basename, driverUrl, offer.getSlaveId.getValue,
offer.getHostname, numCores))
offer.getHostname, numCores, appId))
command.addUris(CommandInfo.URI.newBuilder().setValue(uri))
}
command.build()

View file

@ -26,14 +26,12 @@ import org.apache.spark.SparkConf
class ExecutorRunnerTest extends FunSuite {
test("command includes appId") {
def f(s:String) = new File(s)
val appId = "12345-worker321-9876"
val sparkHome = sys.props.getOrElse("spark.test.home", fail("spark.test.home is not set!"))
val appDesc = new ApplicationDescription("app name", Some(8), 500,
Command("foo", Seq(), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val appId = "12345-worker321-9876"
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321", f(sparkHome),
f("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
Command("foo", Seq(appId), Map(), Seq(), Seq(), Seq()), "appUiUrl")
val er = new ExecutorRunner(appId, 1, appDesc, 8, 500, null, "blah", "worker321",
new File(sparkHome), new File("ooga"), "blah", new SparkConf, ExecutorState.RUNNING)
assert(er.getCommandSeq.last === appId)
}
}