[SPARK-21923][CORE] Avoid calling reserveUnrollMemoryForThisTask for every record
## What changes were proposed in this pull request? When Spark persists data to Unsafe memory, it calls the method `MemoryStore.putIteratorAsBytes`, which needs to synchronize on the `memoryManager` for every record written. This is unnecessary: we can request more memory at a time to reduce the amount of synchronization. ## How was this patch tested? Test case (with 1 executor, 20 cores): ```scala val start = System.currentTimeMillis() val data = sc.parallelize(0 until Integer.MAX_VALUE, 100) .persist(StorageLevel.OFF_HEAP) .count() println(System.currentTimeMillis() - start) ``` Test result: before | 27647 | 29108 | 28591 | 28264 | 27232 | after | 26868 | 26358 | 27767 | 26653 | 26693 | Author: Xianyang Liu <xianyang.liu@intel.com> Closes #19135 from ConeyLiu/memorystore.
This commit is contained in:
parent
10f45b3c84
commit
a11db942aa
|
@ -385,4 +385,19 @@ package object config {
|
||||||
.checkValue(v => v > 0 && v <= Int.MaxValue,
|
.checkValue(v => v > 0 && v <= Int.MaxValue,
|
||||||
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
|
s"The buffer size must be greater than 0 and less than ${Int.MaxValue}.")
|
||||||
.createWithDefault(1024 * 1024)
|
.createWithDefault(1024 * 1024)
|
||||||
|
|
||||||
|
private[spark] val UNROLL_MEMORY_CHECK_PERIOD =
|
||||||
|
ConfigBuilder("spark.storage.unrollMemoryCheckPeriod")
|
||||||
|
.internal()
|
||||||
|
.doc("The memory check period is used to determine how often we should check whether "
|
||||||
|
+ "there is a need to request more memory when we try to unroll the given block in memory.")
|
||||||
|
.longConf
|
||||||
|
.createWithDefault(16)
|
||||||
|
|
||||||
|
private[spark] val UNROLL_MEMORY_GROWTH_FACTOR =
|
||||||
|
ConfigBuilder("spark.storage.unrollMemoryGrowthFactor")
|
||||||
|
.internal()
|
||||||
|
.doc("Memory to request as a multiple of the size that used to unroll the block.")
|
||||||
|
.doubleConf
|
||||||
|
.createWithDefault(1.5)
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,6 +29,7 @@ import com.google.common.io.ByteStreams
|
||||||
|
|
||||||
import org.apache.spark.{SparkConf, TaskContext}
|
import org.apache.spark.{SparkConf, TaskContext}
|
||||||
import org.apache.spark.internal.Logging
|
import org.apache.spark.internal.Logging
|
||||||
|
import org.apache.spark.internal.config.{UNROLL_MEMORY_CHECK_PERIOD, UNROLL_MEMORY_GROWTH_FACTOR}
|
||||||
import org.apache.spark.memory.{MemoryManager, MemoryMode}
|
import org.apache.spark.memory.{MemoryManager, MemoryMode}
|
||||||
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
|
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
|
||||||
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
|
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel, StreamBlockId}
|
||||||
|
@ -190,11 +191,11 @@ private[spark] class MemoryStore(
|
||||||
// Initial per-task memory to request for unrolling blocks (bytes).
|
// Initial per-task memory to request for unrolling blocks (bytes).
|
||||||
val initialMemoryThreshold = unrollMemoryThreshold
|
val initialMemoryThreshold = unrollMemoryThreshold
|
||||||
// How often to check whether we need to request more memory
|
// How often to check whether we need to request more memory
|
||||||
val memoryCheckPeriod = 16
|
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
|
||||||
// Memory currently reserved by this task for this particular unrolling operation
|
// Memory currently reserved by this task for this particular unrolling operation
|
||||||
var memoryThreshold = initialMemoryThreshold
|
var memoryThreshold = initialMemoryThreshold
|
||||||
// Memory to request as a multiple of current vector size
|
// Memory to request as a multiple of current vector size
|
||||||
val memoryGrowthFactor = 1.5
|
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
|
||||||
// Keep track of unroll memory used by this particular block / putIterator() operation
|
// Keep track of unroll memory used by this particular block / putIterator() operation
|
||||||
var unrollMemoryUsedByThisBlock = 0L
|
var unrollMemoryUsedByThisBlock = 0L
|
||||||
// Underlying vector for unrolling the block
|
// Underlying vector for unrolling the block
|
||||||
|
@ -325,6 +326,12 @@ private[spark] class MemoryStore(
|
||||||
|
|
||||||
// Whether there is still enough memory for us to continue unrolling this block
|
// Whether there is still enough memory for us to continue unrolling this block
|
||||||
var keepUnrolling = true
|
var keepUnrolling = true
|
||||||
|
// Number of elements unrolled so far
|
||||||
|
var elementsUnrolled = 0L
|
||||||
|
// How often to check whether we need to request more memory
|
||||||
|
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
|
||||||
|
// Memory to request as a multiple of current bbos size
|
||||||
|
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
|
||||||
// Initial per-task memory to request for unrolling blocks (bytes).
|
// Initial per-task memory to request for unrolling blocks (bytes).
|
||||||
val initialMemoryThreshold = unrollMemoryThreshold
|
val initialMemoryThreshold = unrollMemoryThreshold
|
||||||
// Keep track of unroll memory used by this particular block / putIterator() operation
|
// Keep track of unroll memory used by this particular block / putIterator() operation
|
||||||
|
@ -359,7 +366,7 @@ private[spark] class MemoryStore(
|
||||||
|
|
||||||
def reserveAdditionalMemoryIfNecessary(): Unit = {
|
def reserveAdditionalMemoryIfNecessary(): Unit = {
|
||||||
if (bbos.size > unrollMemoryUsedByThisBlock) {
|
if (bbos.size > unrollMemoryUsedByThisBlock) {
|
||||||
val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
|
val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
|
||||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
|
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
|
||||||
if (keepUnrolling) {
|
if (keepUnrolling) {
|
||||||
unrollMemoryUsedByThisBlock += amountToRequest
|
unrollMemoryUsedByThisBlock += amountToRequest
|
||||||
|
@ -370,7 +377,10 @@ private[spark] class MemoryStore(
|
||||||
// Unroll this block safely, checking whether we have exceeded our threshold
|
// Unroll this block safely, checking whether we have exceeded our threshold
|
||||||
while (values.hasNext && keepUnrolling) {
|
while (values.hasNext && keepUnrolling) {
|
||||||
serializationStream.writeObject(values.next())(classTag)
|
serializationStream.writeObject(values.next())(classTag)
|
||||||
reserveAdditionalMemoryIfNecessary()
|
elementsUnrolled += 1
|
||||||
|
if (elementsUnrolled % memoryCheckPeriod == 0) {
|
||||||
|
reserveAdditionalMemoryIfNecessary()
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure that we have enough memory to store the block. By this point, it is possible that
|
// Make sure that we have enough memory to store the block. By this point, it is possible that
|
||||||
|
|
Loading…
Reference in a new issue