[SPARK-13220][CORE] deprecate yarn-client and yarn-cluster mode
Author: jerryshao <sshao@hortonworks.com> Closes #11229 from jerryshao/SPARK-13220.
This commit is contained in:
parent
87250580f2
commit
e99d017098
|
@ -503,6 +503,31 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {
|
||||||
set("spark.executor.instances", value)
|
set("spark.executor.instances", value)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (contains("spark.master") && get("spark.master").startsWith("yarn-")) {
|
||||||
|
val warning = s"spark.master ${get("spark.master")} is deprecated in Spark 2.0+, please " +
|
||||||
|
"instead use \"yarn\" with specified deploy mode."
|
||||||
|
|
||||||
|
get("spark.master") match {
|
||||||
|
case "yarn-cluster" =>
|
||||||
|
logWarning(warning)
|
||||||
|
set("spark.master", "yarn")
|
||||||
|
set("spark.submit.deployMode", "cluster")
|
||||||
|
case "yarn-client" =>
|
||||||
|
logWarning(warning)
|
||||||
|
set("spark.master", "yarn")
|
||||||
|
set("spark.submit.deployMode", "client")
|
||||||
|
case _ => // Any other unexpected master will be checked when creating scheduler backend.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (contains("spark.submit.deployMode")) {
|
||||||
|
get("spark.submit.deployMode") match {
|
||||||
|
case "cluster" | "client" =>
|
||||||
|
case e => throw new SparkException("spark.submit.deployMode can only be \"cluster\" or " +
|
||||||
|
"\"client\".")
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -237,6 +237,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
||||||
def jars: Seq[String] = _jars
|
def jars: Seq[String] = _jars
|
||||||
def files: Seq[String] = _files
|
def files: Seq[String] = _files
|
||||||
def master: String = _conf.get("spark.master")
|
def master: String = _conf.get("spark.master")
|
||||||
|
def deployMode: String = _conf.getOption("spark.submit.deployMode").getOrElse("client")
|
||||||
def appName: String = _conf.get("spark.app.name")
|
def appName: String = _conf.get("spark.app.name")
|
||||||
|
|
||||||
private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
|
private[spark] def isEventLogEnabled: Boolean = _conf.getBoolean("spark.eventLog.enabled", false)
|
||||||
|
@ -375,10 +376,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
||||||
}
|
}
|
||||||
|
|
||||||
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
|
// System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
|
||||||
// yarn-standalone is deprecated, but still supported
|
if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
|
||||||
if ((master == "yarn-cluster" || master == "yarn-standalone") &&
|
throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
|
||||||
!_conf.contains("spark.yarn.app.id")) {
|
|
||||||
throw new SparkException("Detected yarn-cluster mode, but isn't running on a cluster. " +
|
|
||||||
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
|
"Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -414,7 +413,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")
|
if (master == "yarn" && deployMode == "client") System.setProperty("SPARK_YARN_MODE", "true")
|
||||||
|
|
||||||
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
|
// "_jobProgressListener" should be set up before creating SparkEnv because when creating
|
||||||
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
|
// "SparkEnv", some messages will be posted to "listenerBus" and we should not miss them.
|
||||||
|
@ -491,7 +490,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
||||||
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
|
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
|
||||||
|
|
||||||
// Create and start the scheduler
|
// Create and start the scheduler
|
||||||
val (sched, ts) = SparkContext.createTaskScheduler(this, master)
|
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
|
||||||
_schedulerBackend = sched
|
_schedulerBackend = sched
|
||||||
_taskScheduler = ts
|
_taskScheduler = ts
|
||||||
_dagScheduler = new DAGScheduler(this)
|
_dagScheduler = new DAGScheduler(this)
|
||||||
|
@ -1590,10 +1589,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
|
||||||
key = uri.getScheme match {
|
key = uri.getScheme match {
|
||||||
// A JAR file which exists only on the driver node
|
// A JAR file which exists only on the driver node
|
||||||
case null | "file" =>
|
case null | "file" =>
|
||||||
// yarn-standalone is deprecated, but still supported
|
if (master == "yarn" && deployMode == "cluster") {
|
||||||
if (SparkHadoopUtil.get.isYarnMode() &&
|
// In order for this to work in yarn cluster mode the user must specify the
|
||||||
(master == "yarn-standalone" || master == "yarn-cluster")) {
|
|
||||||
// In order for this to work in yarn-cluster mode the user must specify the
|
|
||||||
// --addJars option to the client to upload the file into the distributed cache
|
// --addJars option to the client to upload the file into the distributed cache
|
||||||
// of the AM to make it show up in the current working directory.
|
// of the AM to make it show up in the current working directory.
|
||||||
val fileName = new Path(uri.getPath).getName()
|
val fileName = new Path(uri.getPath).getName()
|
||||||
|
@ -2319,7 +2316,8 @@ object SparkContext extends Logging {
|
||||||
*/
|
*/
|
||||||
private def createTaskScheduler(
|
private def createTaskScheduler(
|
||||||
sc: SparkContext,
|
sc: SparkContext,
|
||||||
master: String): (SchedulerBackend, TaskScheduler) = {
|
master: String,
|
||||||
|
deployMode: String): (SchedulerBackend, TaskScheduler) = {
|
||||||
import SparkMasterRegex._
|
import SparkMasterRegex._
|
||||||
|
|
||||||
// When running locally, don't try to re-execute tasks on failure.
|
// When running locally, don't try to re-execute tasks on failure.
|
||||||
|
@ -2381,11 +2379,7 @@ object SparkContext extends Logging {
|
||||||
}
|
}
|
||||||
(backend, scheduler)
|
(backend, scheduler)
|
||||||
|
|
||||||
case "yarn-standalone" | "yarn-cluster" =>
|
case "yarn" if deployMode == "cluster" =>
|
||||||
if (master == "yarn-standalone") {
|
|
||||||
logWarning(
|
|
||||||
"\"yarn-standalone\" is deprecated as of Spark 1.0. Use \"yarn-cluster\" instead.")
|
|
||||||
}
|
|
||||||
val scheduler = try {
|
val scheduler = try {
|
||||||
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
|
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnClusterScheduler")
|
||||||
val cons = clazz.getConstructor(classOf[SparkContext])
|
val cons = clazz.getConstructor(classOf[SparkContext])
|
||||||
|
@ -2410,7 +2404,7 @@ object SparkContext extends Logging {
|
||||||
scheduler.initialize(backend)
|
scheduler.initialize(backend)
|
||||||
(backend, scheduler)
|
(backend, scheduler)
|
||||||
|
|
||||||
case "yarn-client" =>
|
case "yarn" if deployMode == "client" =>
|
||||||
val scheduler = try {
|
val scheduler = try {
|
||||||
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
|
val clazz = Utils.classForName("org.apache.spark.scheduler.cluster.YarnScheduler")
|
||||||
val cons = clazz.getConstructor(classOf[SparkContext])
|
val cons = clazz.getConstructor(classOf[SparkContext])
|
||||||
|
@ -2451,7 +2445,7 @@ object SparkContext extends Logging {
|
||||||
case zkUrl if zkUrl.startsWith("zk://") =>
|
case zkUrl if zkUrl.startsWith("zk://") =>
|
||||||
logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
|
logWarning("Master URL for a multi-master Mesos cluster managed by ZooKeeper should be " +
|
||||||
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
|
"in the form mesos://zk://host:port. Current Master URL will stop working in Spark 2.0.")
|
||||||
createTaskScheduler(sc, "mesos://" + zkUrl)
|
createTaskScheduler(sc, "mesos://" + zkUrl, deployMode)
|
||||||
|
|
||||||
case _ =>
|
case _ =>
|
||||||
throw new SparkException("Could not parse Master URL: '" + master + "'")
|
throw new SparkException("Could not parse Master URL: '" + master + "'")
|
||||||
|
|
|
@ -226,11 +226,17 @@ object SparkSubmit {
|
||||||
|
|
||||||
// Set the cluster manager
|
// Set the cluster manager
|
||||||
val clusterManager: Int = args.master match {
|
val clusterManager: Int = args.master match {
|
||||||
case m if m.startsWith("yarn") => YARN
|
case "yarn" => YARN
|
||||||
|
case "yarn-client" | "yarn-cluster" =>
|
||||||
|
printWarning(s"Master ${args.master} is deprecated since 2.0." +
|
||||||
|
" Please use master \"yarn\" with specified deploy mode instead.")
|
||||||
|
YARN
|
||||||
case m if m.startsWith("spark") => STANDALONE
|
case m if m.startsWith("spark") => STANDALONE
|
||||||
case m if m.startsWith("mesos") => MESOS
|
case m if m.startsWith("mesos") => MESOS
|
||||||
case m if m.startsWith("local") => LOCAL
|
case m if m.startsWith("local") => LOCAL
|
||||||
case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
|
case _ =>
|
||||||
|
printErrorAndExit("Master must either be yarn or start with spark, mesos, local")
|
||||||
|
-1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set the deploy mode; default is client mode
|
// Set the deploy mode; default is client mode
|
||||||
|
@ -240,23 +246,20 @@ object SparkSubmit {
|
||||||
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
|
case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
|
||||||
}
|
}
|
||||||
|
|
||||||
// Because "yarn-cluster" and "yarn-client" encapsulate both the master
|
// Because the deprecated way of specifying "yarn-cluster" and "yarn-client" encapsulate both
|
||||||
// and deploy mode, we have some logic to infer the master and deploy mode
|
// the master and deploy mode, we have some logic to infer the master and deploy mode
|
||||||
// from each other if only one is specified, or exit early if they are at odds.
|
// from each other if only one is specified, or exit early if they are at odds.
|
||||||
if (clusterManager == YARN) {
|
if (clusterManager == YARN) {
|
||||||
if (args.master == "yarn-standalone") {
|
|
||||||
printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
|
|
||||||
args.master = "yarn-cluster"
|
|
||||||
}
|
|
||||||
(args.master, args.deployMode) match {
|
(args.master, args.deployMode) match {
|
||||||
case ("yarn-cluster", null) =>
|
case ("yarn-cluster", null) =>
|
||||||
deployMode = CLUSTER
|
deployMode = CLUSTER
|
||||||
|
args.master = "yarn"
|
||||||
case ("yarn-cluster", "client") =>
|
case ("yarn-cluster", "client") =>
|
||||||
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
|
printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
|
||||||
case ("yarn-client", "cluster") =>
|
case ("yarn-client", "cluster") =>
|
||||||
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
|
printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
|
||||||
case (_, mode) =>
|
case (_, mode) =>
|
||||||
args.master = "yarn-" + Option(mode).getOrElse("client")
|
args.master = "yarn"
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure YARN is included in our build if we're trying to use it
|
// Make sure YARN is included in our build if we're trying to use it
|
||||||
|
|
|
@ -29,15 +29,21 @@ class SparkContextSchedulerCreationSuite
|
||||||
extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging {
|
extends SparkFunSuite with LocalSparkContext with PrivateMethodTester with Logging {
|
||||||
|
|
||||||
def createTaskScheduler(master: String): TaskSchedulerImpl =
|
def createTaskScheduler(master: String): TaskSchedulerImpl =
|
||||||
createTaskScheduler(master, new SparkConf())
|
createTaskScheduler(master, "client")
|
||||||
|
|
||||||
def createTaskScheduler(master: String, conf: SparkConf): TaskSchedulerImpl = {
|
def createTaskScheduler(master: String, deployMode: String): TaskSchedulerImpl =
|
||||||
|
createTaskScheduler(master, deployMode, new SparkConf())
|
||||||
|
|
||||||
|
def createTaskScheduler(
|
||||||
|
master: String,
|
||||||
|
deployMode: String,
|
||||||
|
conf: SparkConf): TaskSchedulerImpl = {
|
||||||
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
|
// Create local SparkContext to setup a SparkEnv. We don't actually want to start() the
|
||||||
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
|
// real schedulers, so we don't want to create a full SparkContext with the desired scheduler.
|
||||||
sc = new SparkContext("local", "test", conf)
|
sc = new SparkContext("local", "test", conf)
|
||||||
val createTaskSchedulerMethod =
|
val createTaskSchedulerMethod =
|
||||||
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
|
PrivateMethod[Tuple2[SchedulerBackend, TaskScheduler]]('createTaskScheduler)
|
||||||
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master)
|
val (_, sched) = SparkContext invokePrivate createTaskSchedulerMethod(sc, master, deployMode)
|
||||||
sched.asInstanceOf[TaskSchedulerImpl]
|
sched.asInstanceOf[TaskSchedulerImpl]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,7 +113,7 @@ class SparkContextSchedulerCreationSuite
|
||||||
|
|
||||||
test("local-default-parallelism") {
|
test("local-default-parallelism") {
|
||||||
val conf = new SparkConf().set("spark.default.parallelism", "16")
|
val conf = new SparkConf().set("spark.default.parallelism", "16")
|
||||||
val sched = createTaskScheduler("local", conf)
|
val sched = createTaskScheduler("local", "client", conf)
|
||||||
|
|
||||||
sched.backend match {
|
sched.backend match {
|
||||||
case s: LocalBackend => assert(s.defaultParallelism() === 16)
|
case s: LocalBackend => assert(s.defaultParallelism() === 16)
|
||||||
|
@ -122,9 +128,9 @@ class SparkContextSchedulerCreationSuite
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
def testYarn(master: String, expectedClassName: String) {
|
def testYarn(master: String, deployMode: String, expectedClassName: String) {
|
||||||
try {
|
try {
|
||||||
val sched = createTaskScheduler(master)
|
val sched = createTaskScheduler(master, deployMode)
|
||||||
assert(sched.getClass === Utils.classForName(expectedClassName))
|
assert(sched.getClass === Utils.classForName(expectedClassName))
|
||||||
} catch {
|
} catch {
|
||||||
case e: SparkException =>
|
case e: SparkException =>
|
||||||
|
@ -135,21 +141,17 @@ class SparkContextSchedulerCreationSuite
|
||||||
}
|
}
|
||||||
|
|
||||||
test("yarn-cluster") {
|
test("yarn-cluster") {
|
||||||
testYarn("yarn-cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
|
testYarn("yarn", "cluster", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
|
||||||
}
|
|
||||||
|
|
||||||
test("yarn-standalone") {
|
|
||||||
testYarn("yarn-standalone", "org.apache.spark.scheduler.cluster.YarnClusterScheduler")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
test("yarn-client") {
|
test("yarn-client") {
|
||||||
testYarn("yarn-client", "org.apache.spark.scheduler.cluster.YarnScheduler")
|
testYarn("yarn", "client", "org.apache.spark.scheduler.cluster.YarnScheduler")
|
||||||
}
|
}
|
||||||
|
|
||||||
def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
|
def testMesos(master: String, expectedClass: Class[_], coarse: Boolean) {
|
||||||
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
|
val conf = new SparkConf().set("spark.mesos.coarse", coarse.toString)
|
||||||
try {
|
try {
|
||||||
val sched = createTaskScheduler(master, conf)
|
val sched = createTaskScheduler(master, "client", conf)
|
||||||
assert(sched.backend.getClass === expectedClass)
|
assert(sched.backend.getClass === expectedClass)
|
||||||
} catch {
|
} catch {
|
||||||
case e: UnsatisfiedLinkError =>
|
case e: UnsatisfiedLinkError =>
|
||||||
|
|
|
@ -358,7 +358,8 @@ class SparkSubmitSuite
|
||||||
val appArgs = new SparkSubmitArguments(clArgs)
|
val appArgs = new SparkSubmitArguments(clArgs)
|
||||||
val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
|
val (_, _, sysProps, mainClass) = prepareSubmitEnvironment(appArgs)
|
||||||
sysProps("spark.executor.memory") should be ("5g")
|
sysProps("spark.executor.memory") should be ("5g")
|
||||||
sysProps("spark.master") should be ("yarn-cluster")
|
sysProps("spark.master") should be ("yarn")
|
||||||
|
sysProps("spark.submit.deployMode") should be ("cluster")
|
||||||
mainClass should be ("org.apache.spark.deploy.yarn.Client")
|
mainClass should be ("org.apache.spark.deploy.yarn.Client")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -454,7 +455,7 @@ class SparkSubmitSuite
|
||||||
|
|
||||||
// Test files and archives (Yarn)
|
// Test files and archives (Yarn)
|
||||||
val clArgs2 = Seq(
|
val clArgs2 = Seq(
|
||||||
"--master", "yarn-client",
|
"--master", "yarn",
|
||||||
"--class", "org.SomeClass",
|
"--class", "org.SomeClass",
|
||||||
"--files", files,
|
"--files", files,
|
||||||
"--archives", archives,
|
"--archives", archives,
|
||||||
|
@ -512,7 +513,7 @@ class SparkSubmitSuite
|
||||||
writer2.println("spark.yarn.dist.archives " + archives)
|
writer2.println("spark.yarn.dist.archives " + archives)
|
||||||
writer2.close()
|
writer2.close()
|
||||||
val clArgs2 = Seq(
|
val clArgs2 = Seq(
|
||||||
"--master", "yarn-client",
|
"--master", "yarn",
|
||||||
"--class", "org.SomeClass",
|
"--class", "org.SomeClass",
|
||||||
"--properties-file", f2.getPath,
|
"--properties-file", f2.getPath,
|
||||||
"thejar.jar"
|
"thejar.jar"
|
||||||
|
|
|
@ -150,12 +150,6 @@ The master URL passed to Spark can be in one of the following formats:
|
||||||
<code>client</code> or <code>cluster</code> mode depending on the value of <code>--deploy-mode</code>.
|
<code>client</code> or <code>cluster</code> mode depending on the value of <code>--deploy-mode</code>.
|
||||||
The cluster location will be found based on the <code>HADOOP_CONF_DIR</code> or <code>YARN_CONF_DIR</code> variable.
|
The cluster location will be found based on the <code>HADOOP_CONF_DIR</code> or <code>YARN_CONF_DIR</code> variable.
|
||||||
</td></tr>
|
</td></tr>
|
||||||
<tr><td> <code>yarn-client</code> </td><td> Equivalent to <code>yarn</code> with <code>--deploy-mode client</code>,
|
|
||||||
which is preferred to `yarn-client`
|
|
||||||
</td></tr>
|
|
||||||
<tr><td> <code>yarn-cluster</code> </td><td> Equivalent to <code>yarn</code> with <code>--deploy-mode cluster</code>,
|
|
||||||
which is preferred to `yarn-cluster`
|
|
||||||
</td></tr>
|
|
||||||
</table>
|
</table>
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -268,6 +268,10 @@ object MimaExcludes {
|
||||||
// SPARK-13413 Remove SparkContext.metricsSystem/schedulerBackend_ setter
|
// SPARK-13413 Remove SparkContext.metricsSystem/schedulerBackend_ setter
|
||||||
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metricsSystem"),
|
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.metricsSystem"),
|
||||||
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.schedulerBackend_=")
|
ProblemFilters.exclude[MissingMethodProblem]("org.apache.spark.SparkContext.schedulerBackend_=")
|
||||||
|
) ++ Seq(
|
||||||
|
// SPARK-13220 Deprecate yarn-client and yarn-cluster mode
|
||||||
|
ProblemFilters.exclude[MissingMethodProblem](
|
||||||
|
"org.apache.spark.SparkContext.org$apache$spark$SparkContext$$createTaskScheduler")
|
||||||
)
|
)
|
||||||
case v if v.startsWith("1.6") =>
|
case v if v.startsWith("1.6") =>
|
||||||
Seq(
|
Seq(
|
||||||
|
|
|
@ -129,8 +129,9 @@ private[spark] class ApplicationMaster(
|
||||||
// other spark processes running on the same box
|
// other spark processes running on the same box
|
||||||
System.setProperty("spark.ui.port", "0")
|
System.setProperty("spark.ui.port", "0")
|
||||||
|
|
||||||
// Set the master property to match the requested mode.
|
// Set the master and deploy mode property to match the requested mode.
|
||||||
System.setProperty("spark.master", "yarn-cluster")
|
System.setProperty("spark.master", "yarn")
|
||||||
|
System.setProperty("spark.submit.deployMode", "cluster")
|
||||||
|
|
||||||
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
|
// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
|
||||||
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
|
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())
|
||||||
|
|
|
@ -136,7 +136,7 @@ abstract class BaseYarnClusterSuite
|
||||||
extraJars: Seq[String] = Nil,
|
extraJars: Seq[String] = Nil,
|
||||||
extraConf: Map[String, String] = Map(),
|
extraConf: Map[String, String] = Map(),
|
||||||
extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
|
extraEnv: Map[String, String] = Map()): SparkAppHandle.State = {
|
||||||
val master = if (clientMode) "yarn-client" else "yarn-cluster"
|
val deployMode = if (clientMode) "client" else "cluster"
|
||||||
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
|
val propsFile = createConfFile(extraClassPath = extraClassPath, extraConf = extraConf)
|
||||||
val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
|
val env = Map("YARN_CONF_DIR" -> hadoopConfDir.getAbsolutePath()) ++ extraEnv
|
||||||
|
|
||||||
|
@ -148,7 +148,8 @@ abstract class BaseYarnClusterSuite
|
||||||
launcher.setAppResource(fakeSparkJar.getAbsolutePath())
|
launcher.setAppResource(fakeSparkJar.getAbsolutePath())
|
||||||
}
|
}
|
||||||
launcher.setSparkHome(sys.props("spark.test.home"))
|
launcher.setSparkHome(sys.props("spark.test.home"))
|
||||||
.setMaster(master)
|
.setMaster("yarn")
|
||||||
|
.setDeployMode(deployMode)
|
||||||
.setConf("spark.executor.instances", "1")
|
.setConf("spark.executor.instances", "1")
|
||||||
.setPropertiesFile(propsFile)
|
.setPropertiesFile(propsFile)
|
||||||
.addAppArgs(appArgs.toArray: _*)
|
.addAppArgs(appArgs.toArray: _*)
|
||||||
|
|
|
@ -115,7 +115,8 @@ class YarnClusterSuite extends BaseYarnClusterSuite {
|
||||||
.setSparkHome(sys.props("spark.test.home"))
|
.setSparkHome(sys.props("spark.test.home"))
|
||||||
.setConf("spark.ui.enabled", "false")
|
.setConf("spark.ui.enabled", "false")
|
||||||
.setPropertiesFile(propsFile)
|
.setPropertiesFile(propsFile)
|
||||||
.setMaster("yarn-client")
|
.setMaster("yarn")
|
||||||
|
.setDeployMode("client")
|
||||||
.setAppResource("spark-internal")
|
.setAppResource("spark-internal")
|
||||||
.setMainClass(mainClassName(YarnLauncherTestApp.getClass))
|
.setMainClass(mainClassName(YarnLauncherTestApp.getClass))
|
||||||
.startApplication()
|
.startApplication()
|
||||||
|
|
Loading…
Reference in a new issue