[SPARK-28577][YARN] Resource capability requested for each executor add offHeapMemorySize

## What changes were proposed in this pull request?

If MEMORY_OFFHEAP_ENABLED is true, add MEMORY_OFFHEAP_SIZE to resource requested for executor to ensure instance has enough memory to use.

In this pr add a helper method `executorOffHeapMemorySizeAsMb` in `YarnSparkHadoopUtil`.

## How was this patch tested?
Add 3 new test suite to test `YarnSparkHadoopUtil#executorOffHeapMemorySizeAsMb`

Closes #25309 from LuciferYang/spark-28577.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Thomas Graves <tgraves@apache.org>
This commit is contained in:
yangjie01 2019-09-04 09:00:12 -05:00 committed by Thomas Graves
parent df39855db8
commit a07f795aea
6 changed files with 82 additions and 18 deletions

View file

@ -271,17 +271,17 @@ of the most common options to set are:
<td><code>spark.executor.memoryOverhead</code></td>
<td>executorMemory * 0.10, with minimum of 384 </td>
<td>
Amount of non-heap memory to be allocated per executor process in cluster mode, in MiB unless
Amount of additional memory to be allocated per executor process in cluster mode, in MiB unless
otherwise specified. This is memory that accounts for things like VM overheads, interned strings,
other native overheads, etc. This tends to grow with the executor size (typically 6-10%).
This option is currently supported on YARN and Kubernetes.
<br/>
<em>Note:</em> Non-heap memory includes off-heap memory
(when <code>spark.memory.offHeap.enabled=true</code>) and memory used by other executor processes
(e.g. python process that goes with a PySpark executor) and memory used by other non-executor
processes running in the same container. The maximum memory size of container to running executor
is determined by the sum of <code>spark.executor.memoryOverhead</code> and
<code>spark.executor.memory</code>.
<em>Note:</em> Additional memory includes PySpark executor memory
(when <code>spark.executor.pyspark.memory</code> is not configured) and memory used by other
non-executor processes running in the same container. The maximum memory size of container to
running executor is determined by the sum of <code>spark.executor.memoryOverhead</code>,
<code>spark.executor.memory</code>, <code>spark.memory.offHeap.size</code> and
<code>spark.executor.pyspark.memory</code>.
</td>
</tr>
<tr>
@ -1378,9 +1378,6 @@ Apart from these, the following properties are also available, and may be useful
<td>
If true, Spark will attempt to use off-heap memory for certain operations. If off-heap memory
use is enabled, then <code>spark.memory.offHeap.size</code> must be positive.
<em>Note:</em> If off-heap memory is enabled, may need to raise the non-heap memory size
(e.g. increase <code>spark.driver.memoryOverhead</code> or
<code>spark.executor.memoryOverhead</code>).
</td>
</tr>
<tr>

View file

@ -97,6 +97,8 @@ private[spark] class Client(
// Executor related configurations
private val executorMemory = sparkConf.get(EXECUTOR_MEMORY)
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
private val executorMemoryOverhead = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toLong, MEMORY_OVERHEAD_MIN)).toInt
@ -346,12 +348,14 @@ private[spark] class Client(
val maxMem = newAppResponse.getMaximumResourceCapability().getMemory()
logInfo("Verifying our application has not requested more than the maximum " +
s"memory capability of the cluster ($maxMem MB per container)")
val executorMem = executorMemory + executorMemoryOverhead + pysparkWorkerMemory
val executorMem =
executorMemory + executorOffHeapMemory + executorMemoryOverhead + pysparkWorkerMemory
if (executorMem > maxMem) {
throw new IllegalArgumentException(s"Required executor memory ($executorMemory), overhead " +
s"($executorMemoryOverhead MB), and PySpark memory ($pysparkWorkerMemory MB) is above " +
s"the max threshold ($maxMem MB) of this cluster! Please check the values of " +
s"'yarn.scheduler.maximum-allocation-mb' and/or 'yarn.nodemanager.resource.memory-mb'.")
throw new IllegalArgumentException(s"Required executor memory ($executorMemory MB), " +
s"offHeap memory ($executorOffHeapMemory) MB, overhead ($executorMemoryOverhead MB), " +
s"and PySpark memory ($pysparkWorkerMemory MB) is above the max threshold ($maxMem MB) " +
"of this cluster! Please check the values of 'yarn.scheduler.maximum-allocation-mb' " +
"and/or 'yarn.nodemanager.resource.memory-mb'.")
}
val amMem = amMemory + amMemoryOverhead
if (amMem > maxMem) {

View file

@ -131,6 +131,8 @@ private[yarn] class YarnAllocator(
// Executor memory in MiB.
protected val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
// Executor offHeap memory in MiB.
protected val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
// Additional memory overhead.
protected val memoryOverhead: Int = sparkConf.get(EXECUTOR_MEMORY_OVERHEAD).getOrElse(
math.max((MEMORY_OVERHEAD_FACTOR * executorMemory).toInt, MEMORY_OVERHEAD_MIN)).toInt
@ -149,7 +151,7 @@ private[yarn] class YarnAllocator(
// Resource capability requested for each executor
private[yarn] val resource: Resource = {
val resource = Resource.newInstance(
executorMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
executorMemory + executorOffHeapMemory + memoryOverhead + pysparkWorkerMemory, executorCores)
ResourceRequestHelper.setResourceRequests(executorResourceRequests, resource)
logDebug(s"Created resource capability: $resource")
resource

View file

@ -26,9 +26,8 @@ import org.apache.hadoop.yarn.api.records.{ApplicationAccessType, ContainerId, P
import org.apache.hadoop.yarn.util.ConverterUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.config._
import org.apache.spark.launcher.YarnCommandBuilderUtils
import org.apache.spark.resource.ResourceID
import org.apache.spark.resource.ResourceUtils._
import org.apache.spark.util.Utils
object YarnSparkHadoopUtil {
@ -184,4 +183,17 @@ object YarnSparkHadoopUtil {
ConverterUtils.toContainerId(containerIdString)
}
/**
* Convert MEMORY_OFFHEAP_SIZE to MB Unit, return 0 if MEMORY_OFFHEAP_ENABLED is false.
*/
def executorOffHeapMemorySizeAsMb(sparkConf: SparkConf): Int = {
if (sparkConf.get(MEMORY_OFFHEAP_ENABLED)) {
val sizeInMB = Utils.memoryStringToMb(sparkConf.get(MEMORY_OFFHEAP_SIZE).toString)
require(sizeInMB > 0,
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true")
sizeInMB
} else {
0
}
}
}

View file

@ -514,4 +514,25 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
verify(rmClientSpy)
.updateBlacklist(hosts.slice(10, 11).asJava, Collections.emptyList())
}
test("SPARK-28577#YarnAllocator.resource.memory should include offHeapSize " +
"when offHeapEnabled is true.") {
val originalOffHeapEnabled = sparkConf.get(MEMORY_OFFHEAP_ENABLED)
val originalOffHeapSize = sparkConf.get(MEMORY_OFFHEAP_SIZE)
val executorMemory = sparkConf.get(EXECUTOR_MEMORY).toInt
val offHeapMemoryInMB = 1024L
val offHeapMemoryInByte = offHeapMemoryInMB * 1024 * 1024
try {
sparkConf.set(MEMORY_OFFHEAP_ENABLED, true)
sparkConf.set(MEMORY_OFFHEAP_SIZE, offHeapMemoryInByte)
val allocator = createAllocator(maxExecutors = 1,
additionalConfigs = Map(EXECUTOR_MEMORY.key -> executorMemory.toString))
val memory = allocator.resource.getMemory
assert(memory ==
executorMemory + offHeapMemoryInMB + YarnSparkHadoopUtil.MEMORY_OVERHEAD_MIN)
} finally {
sparkConf.set(MEMORY_OFFHEAP_ENABLED, originalOffHeapEnabled)
sparkConf.set(MEMORY_OFFHEAP_SIZE, originalOffHeapSize)
}
}
}

View file

@ -28,6 +28,7 @@ import org.scalatest.Matchers
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.UI._
import org.apache.spark.util.{ResetSystemProperties, Utils}
@ -140,4 +141,31 @@ class YarnSparkHadoopUtilSuite extends SparkFunSuite with Matchers with Logging
}
}
test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is false") {
val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(new SparkConf())
assert(executorOffHeapMemory == 0)
}
test("executorOffHeapMemorySizeAsMb when MEMORY_OFFHEAP_ENABLED is true") {
val offHeapMemoryInMB = 50
val offHeapMemory: Long = offHeapMemoryInMB * 1024 * 1024
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
.set(MEMORY_OFFHEAP_SIZE, offHeapMemory)
val executorOffHeapMemory = YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
assert(executorOffHeapMemory == offHeapMemoryInMB)
}
test("executorMemoryOverhead when MEMORY_OFFHEAP_ENABLED is true, " +
"but MEMORY_OFFHEAP_SIZE not config scene") {
val sparkConf = new SparkConf()
.set(MEMORY_OFFHEAP_ENABLED, true)
val expected =
s"${MEMORY_OFFHEAP_SIZE.key} must be > 0 when ${MEMORY_OFFHEAP_ENABLED.key} == true"
val message = intercept[IllegalArgumentException] {
YarnSparkHadoopUtil.executorOffHeapMemorySizeAsMb(sparkConf)
}.getMessage
assert(message.contains(expected))
}
}