[SPARK-1953][YARN]yarn client mode Application Master memory size is same as driver memory...
... size Ways to set Application Master's memory in yarn-client mode: 1. `spark.yarn.am.memory` in SparkConf or System Properties 2. default value 512m Note: this argument is only available in yarn-client mode. Author: WangTaoTheTonic <barneystinson@aliyun.com> Closes #3607 from WangTaoTheTonic/SPARK4181 and squashes the following commits: d5ceb1b [WangTaoTheTonic] spark.driver.memeory is used in both modes 6c1b264 [WangTaoTheTonic] rebase b8410c0 [WangTaoTheTonic] minor optiminzation ddcd592 [WangTaoTheTonic] fix the bug produced in rebase and some improvements 3bf70cc [WangTaoTheTonic] rebase and give proper hint 987b99d [WangTaoTheTonic] disable --driver-memory in client mode 2b27928 [WangTaoTheTonic] inaccurate description b7acbb2 [WangTaoTheTonic] incorrect method invoked 2557c5e [WangTaoTheTonic] missing a single blank 42075b0 [WangTaoTheTonic] arrange the args and warn logging 69c7dba [WangTaoTheTonic] rebase 1960d16 [WangTaoTheTonic] fix wrong comment 7fa9e2e [WangTaoTheTonic] log a warning f6bee0e [WangTaoTheTonic] docs issue d619996 [WangTaoTheTonic] Merge branch 'master' into SPARK4181 b09c309 [WangTaoTheTonic] use code format ab16bb5 [WangTaoTheTonic] fix bug and add comments 44e48c2 [WangTaoTheTonic] minor fix 6fd13e1 [WangTaoTheTonic] add overhead mem and remove some configs 0566bb8 [WangTaoTheTonic] yarn client mode Application Master memory size is same as driver memory size
This commit is contained in:
parent
7e8e62aec1
commit
e966452060
|
@ -405,7 +405,8 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
|
||||||
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
|
| --queue QUEUE_NAME The YARN queue to submit to (Default: "default").
|
||||||
| --num-executors NUM Number of executors to launch (Default: 2).
|
| --num-executors NUM Number of executors to launch (Default: 2).
|
||||||
| --archives ARCHIVES Comma separated list of archives to be extracted into the
|
| --archives ARCHIVES Comma separated list of archives to be extracted into the
|
||||||
| working directory of each executor.""".stripMargin
|
| working directory of each executor.
|
||||||
|
""".stripMargin
|
||||||
)
|
)
|
||||||
SparkSubmit.exitFn()
|
SparkSubmit.exitFn()
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
|
||||||
|
|
||||||
<table class="table">
|
<table class="table">
|
||||||
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
|
<tr><th>Property Name</th><th>Default</th><th>Meaning</th></tr>
|
||||||
|
<tr>
|
||||||
|
<td><code>spark.yarn.am.memory</code></td>
|
||||||
|
<td>512m</td>
|
||||||
|
<td>
|
||||||
|
Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. <code>512m</code>, <code>2g</code>).
|
||||||
|
In cluster mode, use <code>spark.driver.memory</code> instead.
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
<td><code>spark.yarn.am.waitTime</code></td>
|
<td><code>spark.yarn.am.waitTime</code></td>
|
||||||
<td>100000</td>
|
<td>100000</td>
|
||||||
|
@ -90,7 +98,14 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
|
||||||
<td><code>spark.yarn.driver.memoryOverhead</code></td>
|
<td><code>spark.yarn.driver.memoryOverhead</code></td>
|
||||||
<td>driverMemory * 0.07, with minimum of 384 </td>
|
<td>driverMemory * 0.07, with minimum of 384 </td>
|
||||||
<td>
|
<td>
|
||||||
The amount of off heap memory (in megabytes) to be allocated per driver. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
|
The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%).
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td><code>spark.yarn.am.memoryOverhead</code></td>
|
||||||
|
<td>AM memory * 0.07, with minimum of 384 </td>
|
||||||
|
<td>
|
||||||
|
Same as <code>spark.yarn.driver.memoryOverhead</code>, but for the Application Master in client mode.
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
<tr>
|
<tr>
|
||||||
|
@ -145,7 +160,7 @@ Most of the configs are the same for Spark on YARN as for other deployment modes
|
||||||
<td><code>spark.yarn.am.extraJavaOptions</code></td>
|
<td><code>spark.yarn.am.extraJavaOptions</code></td>
|
||||||
<td>(none)</td>
|
<td>(none)</td>
|
||||||
<td>
|
<td>
|
||||||
A string of extra JVM options to pass to the Yarn ApplicationMaster in client mode.
|
A string of extra JVM options to pass to the YARN Application Master in client mode.
|
||||||
In cluster mode, use spark.driver.extraJavaOptions instead.
|
In cluster mode, use <code>spark.driver.extraJavaOptions</code> instead.
|
||||||
</td>
|
</td>
|
||||||
</tr>
|
</tr>
|
||||||
|
|
|
@ -65,7 +65,7 @@ private[spark] class Client(
|
||||||
private val amMemoryOverhead = args.amMemoryOverhead // MB
|
private val amMemoryOverhead = args.amMemoryOverhead // MB
|
||||||
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
|
private val executorMemoryOverhead = args.executorMemoryOverhead // MB
|
||||||
private val distCacheMgr = new ClientDistributedCacheManager()
|
private val distCacheMgr = new ClientDistributedCacheManager()
|
||||||
private val isClusterMode = args.userClass != null
|
private val isClusterMode = args.isClusterMode
|
||||||
|
|
||||||
|
|
||||||
def stop(): Unit = yarnClient.stop()
|
def stop(): Unit = yarnClient.stop()
|
||||||
|
|
|
@ -38,23 +38,27 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
|
||||||
var amMemory: Int = 512 // MB
|
var amMemory: Int = 512 // MB
|
||||||
var appName: String = "Spark"
|
var appName: String = "Spark"
|
||||||
var priority = 0
|
var priority = 0
|
||||||
|
def isClusterMode: Boolean = userClass != null
|
||||||
|
|
||||||
|
private var driverMemory: Int = 512 // MB
|
||||||
|
private val driverMemOverheadKey = "spark.yarn.driver.memoryOverhead"
|
||||||
|
private val amMemKey = "spark.yarn.am.memory"
|
||||||
|
private val amMemOverheadKey = "spark.yarn.am.memoryOverhead"
|
||||||
|
private val isDynamicAllocationEnabled =
|
||||||
|
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
|
||||||
|
|
||||||
parseArgs(args.toList)
|
parseArgs(args.toList)
|
||||||
|
loadEnvironmentArgs()
|
||||||
|
validateArgs()
|
||||||
|
|
||||||
// Additional memory to allocate to containers
|
// Additional memory to allocate to containers
|
||||||
// For now, use driver's memory overhead as our AM container's memory overhead
|
val amMemoryOverheadConf = if (isClusterMode) driverMemOverheadKey else amMemOverheadKey
|
||||||
val amMemoryOverhead = sparkConf.getInt("spark.yarn.driver.memoryOverhead",
|
val amMemoryOverhead = sparkConf.getInt(amMemoryOverheadConf,
|
||||||
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
|
math.max((MEMORY_OVERHEAD_FACTOR * amMemory).toInt, MEMORY_OVERHEAD_MIN))
|
||||||
|
|
||||||
val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
|
val executorMemoryOverhead = sparkConf.getInt("spark.yarn.executor.memoryOverhead",
|
||||||
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
|
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN))
|
||||||
|
|
||||||
private val isDynamicAllocationEnabled =
|
|
||||||
sparkConf.getBoolean("spark.dynamicAllocation.enabled", false)
|
|
||||||
|
|
||||||
loadEnvironmentArgs()
|
|
||||||
validateArgs()
|
|
||||||
|
|
||||||
/** Load any default arguments provided through environment variables and Spark properties. */
|
/** Load any default arguments provided through environment variables and Spark properties. */
|
||||||
private def loadEnvironmentArgs(): Unit = {
|
private def loadEnvironmentArgs(): Unit = {
|
||||||
// For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
|
// For backward compatibility, SPARK_YARN_DIST_{ARCHIVES/FILES} should be resolved to hdfs://,
|
||||||
|
@ -87,6 +91,21 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"You must specify at least 1 executor!\n" + getUsageMessage())
|
"You must specify at least 1 executor!\n" + getUsageMessage())
|
||||||
}
|
}
|
||||||
|
if (isClusterMode) {
|
||||||
|
for (key <- Seq(amMemKey, amMemOverheadKey)) {
|
||||||
|
if (sparkConf.contains(key)) {
|
||||||
|
println(s"$key is set but does not apply in cluster mode.")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
amMemory = driverMemory
|
||||||
|
} else {
|
||||||
|
if (sparkConf.contains(driverMemOverheadKey)) {
|
||||||
|
println(s"$driverMemOverheadKey is set but does not apply in client mode.")
|
||||||
|
}
|
||||||
|
sparkConf.getOption(amMemKey)
|
||||||
|
.map(Utils.memoryStringToMb)
|
||||||
|
.foreach { mem => amMemory = mem }
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def parseArgs(inputArgs: List[String]): Unit = {
|
private def parseArgs(inputArgs: List[String]): Unit = {
|
||||||
|
@ -118,7 +137,7 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
|
||||||
if (args(0) == "--master-memory") {
|
if (args(0) == "--master-memory") {
|
||||||
println("--master-memory is deprecated. Use --driver-memory instead.")
|
println("--master-memory is deprecated. Use --driver-memory instead.")
|
||||||
}
|
}
|
||||||
amMemory = value
|
driverMemory = value
|
||||||
args = tail
|
args = tail
|
||||||
|
|
||||||
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
|
case ("--num-workers" | "--num-executors") :: IntParam(value) :: tail =>
|
||||||
|
|
|
@ -68,8 +68,6 @@ private[spark] class YarnClientSchedulerBackend(
|
||||||
// List of (target Client argument, environment variable, Spark property)
|
// List of (target Client argument, environment variable, Spark property)
|
||||||
val optionTuples =
|
val optionTuples =
|
||||||
List(
|
List(
|
||||||
("--driver-memory", "SPARK_MASTER_MEMORY", "spark.master.memory"),
|
|
||||||
("--driver-memory", "SPARK_DRIVER_MEMORY", "spark.driver.memory"),
|
|
||||||
("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
|
("--num-executors", "SPARK_WORKER_INSTANCES", "spark.executor.instances"),
|
||||||
("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
|
("--num-executors", "SPARK_EXECUTOR_INSTANCES", "spark.executor.instances"),
|
||||||
("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
|
("--executor-memory", "SPARK_WORKER_MEMORY", "spark.executor.memory"),
|
||||||
|
|
Loading…
Reference in a new issue