[SPARK-12081] Make unified memory manager work with small heaps
The existing `spark.memory.fraction` (default 0.75) gives the system 25% of the space to work with. For small heaps, this is not enough: e.g. default 1GB leaves only 250MB system memory. This is especially a problem in local mode, where the driver and executor are crammed in the same JVM. Members of the community have reported driver OOM's in such cases. **New proposal.** We now reserve 300MB before taking the 75%. For 1GB JVMs, this leaves `(1024 - 300) * 0.75 = 543MB` for execution and storage. This is proposal (1) listed in the [JIRA](https://issues.apache.org/jira/browse/SPARK-12081). Author: Andrew Or <andrew@databricks.com> Closes #10081 from andrewor14/unified-memory-small-heaps.
This commit is contained in:
parent
1ce4adf55b
commit
d96f8c997b
|
@ -26,7 +26,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
|
||||||
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
|
* A [[MemoryManager]] that enforces a soft boundary between execution and storage such that
|
||||||
* either side can borrow memory from the other.
|
* either side can borrow memory from the other.
|
||||||
*
|
*
|
||||||
* The region shared between execution and storage is a fraction of the total heap space
|
* The region shared between execution and storage is a fraction of (the total heap space - 300MB)
|
||||||
* configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
|
* configurable through `spark.memory.fraction` (default 0.75). The position of the boundary
|
||||||
* within this space is further determined by `spark.memory.storageFraction` (default 0.5).
|
* within this space is further determined by `spark.memory.storageFraction` (default 0.5).
|
||||||
* This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
|
* This means the size of the storage region is 0.75 * 0.5 = 0.375 of the heap space by default.
|
||||||
|
@ -48,7 +48,7 @@ import org.apache.spark.storage.{BlockStatus, BlockId}
|
||||||
*/
|
*/
|
||||||
private[spark] class UnifiedMemoryManager private[memory] (
|
private[spark] class UnifiedMemoryManager private[memory] (
|
||||||
conf: SparkConf,
|
conf: SparkConf,
|
||||||
maxMemory: Long,
|
val maxMemory: Long,
|
||||||
private val storageRegionSize: Long,
|
private val storageRegionSize: Long,
|
||||||
numCores: Int)
|
numCores: Int)
|
||||||
extends MemoryManager(
|
extends MemoryManager(
|
||||||
|
@ -130,6 +130,12 @@ private[spark] class UnifiedMemoryManager private[memory] (
|
||||||
|
|
||||||
object UnifiedMemoryManager {
|
object UnifiedMemoryManager {
|
||||||
|
|
||||||
|
// Set aside a fixed amount of memory for non-storage, non-execution purposes.
|
||||||
|
// This serves a function similar to `spark.memory.fraction`, but guarantees that we reserve
|
||||||
|
// sufficient memory for the system even for small heaps. E.g. if we have a 1GB JVM, then
|
||||||
|
// the memory used for execution and storage will be (1024 - 300) * 0.75 = 543MB by default.
|
||||||
|
private val RESERVED_SYSTEM_MEMORY_BYTES = 300 * 1024 * 1024
|
||||||
|
|
||||||
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
|
def apply(conf: SparkConf, numCores: Int): UnifiedMemoryManager = {
|
||||||
val maxMemory = getMaxMemory(conf)
|
val maxMemory = getMaxMemory(conf)
|
||||||
new UnifiedMemoryManager(
|
new UnifiedMemoryManager(
|
||||||
|
@ -144,8 +150,16 @@ object UnifiedMemoryManager {
|
||||||
* Return the total amount of memory shared between execution and storage, in bytes.
|
* Return the total amount of memory shared between execution and storage, in bytes.
|
||||||
*/
|
*/
|
||||||
private def getMaxMemory(conf: SparkConf): Long = {
|
private def getMaxMemory(conf: SparkConf): Long = {
|
||||||
val systemMaxMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
|
val systemMemory = conf.getLong("spark.testing.memory", Runtime.getRuntime.maxMemory)
|
||||||
|
val reservedMemory = conf.getLong("spark.testing.reservedMemory",
|
||||||
|
if (conf.contains("spark.testing")) 0 else RESERVED_SYSTEM_MEMORY_BYTES)
|
||||||
|
val minSystemMemory = reservedMemory * 1.5
|
||||||
|
if (systemMemory < minSystemMemory) {
|
||||||
|
throw new IllegalArgumentException(s"System memory $systemMemory must " +
|
||||||
|
s"be at least $minSystemMemory. Please use a larger heap size.")
|
||||||
|
}
|
||||||
|
val usableMemory = systemMemory - reservedMemory
|
||||||
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
|
val memoryFraction = conf.getDouble("spark.memory.fraction", 0.75)
|
||||||
(systemMaxMemory * memoryFraction).toLong
|
(usableMemory * memoryFraction).toLong
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -182,4 +182,24 @@ class UnifiedMemoryManagerSuite extends MemoryManagerSuite with PrivateMethodTes
|
||||||
assertEnsureFreeSpaceCalled(ms, 850L)
|
assertEnsureFreeSpaceCalled(ms, 850L)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("small heap") {
|
||||||
|
val systemMemory = 1024 * 1024
|
||||||
|
val reservedMemory = 300 * 1024
|
||||||
|
val memoryFraction = 0.8
|
||||||
|
val conf = new SparkConf()
|
||||||
|
.set("spark.memory.fraction", memoryFraction.toString)
|
||||||
|
.set("spark.testing.memory", systemMemory.toString)
|
||||||
|
.set("spark.testing.reservedMemory", reservedMemory.toString)
|
||||||
|
val mm = UnifiedMemoryManager(conf, numCores = 1)
|
||||||
|
val expectedMaxMemory = ((systemMemory - reservedMemory) * memoryFraction).toLong
|
||||||
|
assert(mm.maxMemory === expectedMaxMemory)
|
||||||
|
|
||||||
|
// Try using a system memory that's too small
|
||||||
|
val conf2 = conf.clone().set("spark.testing.memory", (reservedMemory / 2).toString)
|
||||||
|
val exception = intercept[IllegalArgumentException] {
|
||||||
|
UnifiedMemoryManager(conf2, numCores = 1)
|
||||||
|
}
|
||||||
|
assert(exception.getMessage.contains("larger heap size"))
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -719,8 +719,8 @@ Apart from these, the following properties are also available, and may be useful
|
||||||
<td><code>spark.memory.fraction</code></td>
|
<td><code>spark.memory.fraction</code></td>
|
||||||
<td>0.75</td>
|
<td>0.75</td>
|
||||||
<td>
|
<td>
|
||||||
Fraction of the heap space used for execution and storage. The lower this is, the more
|
Fraction of (heap space - 300MB) used for execution and storage. The lower this is, the
|
||||||
frequently spills and cached data eviction occur. The purpose of this config is to set
|
more frequently spills and cached data eviction occur. The purpose of this config is to set
|
||||||
aside memory for internal metadata, user data structures, and imprecise size estimation
|
aside memory for internal metadata, user data structures, and imprecise size estimation
|
||||||
in the case of sparse, unusually large records. Leaving this at the default value is
|
in the case of sparse, unusually large records. Leaving this at the default value is
|
||||||
recommended. For more detail, see <a href="tuning.html#memory-management-overview">
|
recommended. For more detail, see <a href="tuning.html#memory-management-overview">
|
||||||
|
|
|
@ -114,7 +114,7 @@ variety of workloads without requiring user expertise of how memory is divided i
|
||||||
Although there are two relevant configurations, the typical user should not need to adjust them
|
Although there are two relevant configurations, the typical user should not need to adjust them
|
||||||
as the default values are applicable to most workloads:
|
as the default values are applicable to most workloads:
|
||||||
|
|
||||||
* `spark.memory.fraction` expresses the size of `M` as a fraction of the total JVM heap space
|
* `spark.memory.fraction` expresses the size of `M` as a fraction of the (JVM heap space - 300MB)
|
||||||
(default 0.75). The rest of the space (25%) is reserved for user data structures, internal
|
(default 0.75). The rest of the space (25%) is reserved for user data structures, internal
|
||||||
metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually
|
metadata in Spark, and safeguarding against OOM errors in the case of sparse and unusually
|
||||||
large records.
|
large records.
|
||||||
|
|
Loading…
Reference in a new issue