[SPARK-22068][CORE] Reduce the duplicate code between putIteratorAsValues and putIteratorAsBytes

## What changes were proposed in this pull request?

The code logic of `MemoryStore.putIteratorAsValues` and `MemoryStore.putIteratorAsBytes` is almost the same, so we should reduce the duplication between them.
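
At a high level, both entry points now delegate to one private `putIterator` that talks to a small `ValuesHolder` abstraction. The following is a condensed, illustrative sketch of that shape (names mirror the diff below, but signatures and bodies are simplified and the memory accounting is omitted):

```scala
import scala.reflect.ClassTag

// Condensed sketch only; the real definitions live in MemoryStore.scala.
object PutIteratorSketch {

  trait MemoryEntryBuilder[T] {
    def preciseSize: Long      // exact size, known only after unrolling finishes
    def build(): AnyRef        // stands in for MemoryEntry[T]
  }

  trait ValuesHolder[T] {
    def storeValue(value: T): Unit          // append one record
    def estimatedSize(): Long               // cheap estimate used while unrolling
    def getBuilder(): MemoryEntryBuilder[T] // call once, after the last storeValue
  }

  // The unified put path only touches the holder, so the same loop serves both
  // the deserialized (values) and the serialized (bytes) cases.
  def putIterator[T: ClassTag](
      values: Iterator[T],
      holder: ValuesHolder[T]): Either[Long, Long] = {
    values.foreach(holder.storeValue)       // the real loop checks memory periodically
    Right(holder.getBuilder().preciseSize)  // the real method may return Left(unrollMemoryUsed)
  }
}
```

With this in place, `putIteratorAsValues` simply passes a `DeserializedValuesHolder` and `putIteratorAsBytes` a `SerializedValuesHolder`, as the diff below shows.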

## How was this patch tested?

Existing unit tests.

Author: Xianyang Liu <xianyang.liu@intel.com>

Closes #19285 from ConeyLiu/rmemorystore.
Xianyang Liu 2018-01-26 19:18:18 +08:00 committed by Wenchen Fan
parent c22eaa94e8
commit 3e25251474

@@ -162,7 +162,7 @@ private[spark] class MemoryStore(
}
/**
* Attempt to put the given block in memory store as values.
* Attempt to put the given block in memory store as values or bytes.
*
* It's possible that the iterator is too large to materialize and store in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while periodically checking
@@ -170,18 +170,24 @@
* temporary unroll memory used during the materialization is "transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the block.
*
* @return in case of success, the estimated size of the stored data. In case of failure, return
* an iterator containing the values of the block. The returned iterator will be backed
* by the combination of the partially-unrolled block and the remaining elements of the
* original input iterator. The caller must either fully consume this iterator or call
* `close()` on it in order to free the storage memory consumed by the partially-unrolled
* block.
* @param blockId The block id.
* @param values The values which need to be stored.
* @param classTag the [[ClassTag]] for the block.
* @param memoryMode The memory mode (ON_HEAP or OFF_HEAP) in which the values will be saved.
* @param valuesHolder A holder that supports storing records of values into the memory store as
* values or bytes.
* @return if the block is stored successfully, the stored data size. Otherwise, the amount of
* memory reserved for unrolling the block. (A store can fail for two reasons: first, the
* block was only partially unrolled; second, the block was entirely unrolled but its actual
* size is larger than the reserved unroll memory and we could not request the extra memory.)
*/
private[storage] def putIteratorAsValues[T](
private def putIterator[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
classTag: ClassTag[T],
memoryMode: MemoryMode,
valuesHolder: ValuesHolder[T]): Either[Long, Long] = {
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
// Number of elements unrolled so far
@@ -198,12 +204,10 @@ private[spark] class MemoryStore(
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
// Request enough memory to begin unrolling
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
@@ -214,14 +218,14 @@
// Unroll this block safely, checking whether we have exceeded our threshold periodically
while (values.hasNext && keepUnrolling) {
vector += values.next()
valuesHolder.storeValue(values.next())
if (elementsUnrolled % memoryCheckPeriod == 0) {
val currentSize = valuesHolder.estimatedSize()
// If our vector's size has exceeded the threshold, request more memory
val currentSize = vector.estimateSize()
if (currentSize >= memoryThreshold) {
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
keepUnrolling =
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
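
For intuition, the growth policy in the loop above means a task does not renegotiate memory for every record: once the holder's estimate crosses the current threshold, it asks for enough to reach `currentSize * memoryGrowthFactor`. A rough worked example, assuming default settings (1 MB initial unroll threshold, growth factor 1.5; these defaults are an assumption here, the real values come from `SparkConf`):

```scala
object UnrollGrowthExample extends App {
  // Assumed defaults, for illustration only.
  val initialThreshold = 1L * 1024 * 1024          // initial unroll reservation
  val growthFactor     = 1.5

  var memoryThreshold  = initialThreshold
  var unrollMemoryUsed = initialThreshold

  // Suppose a periodic check sees the holder's estimated size at ~1.4 MB:
  val currentSize = (1.4 * 1024 * 1024).toLong
  if (currentSize >= memoryThreshold) {
    // Ask for enough to reach currentSize * growthFactor rather than just
    // currentSize, so the very next check is unlikely to need more memory.
    val amountToRequest = (currentSize * growthFactor - memoryThreshold).toLong
    unrollMemoryUsed += amountToRequest            // assume the request is granted
    memoryThreshold += amountToRequest
  }
  println(s"unroll memory now ~${unrollMemoryUsed / 1024} KB")  // ~2150 KB
}
```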
@@ -232,78 +236,86 @@ private[spark] class MemoryStore(
elementsUnrolled += 1
}
// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
if (keepUnrolling) {
// We successfully unrolled the entirety of this block
val arrayValues = vector.toArray
vector = null
val entry =
new DeserializedMemoryEntry[T](arrayValues, SizeEstimator.estimate(arrayValues), classTag)
val size = entry.size
def transferUnrollToStorage(amount: Long): Unit = {
val entryBuilder = valuesHolder.getBuilder()
val size = entryBuilder.preciseSize
if (size > unrollMemoryUsedByThisBlock) {
val amountToRequest = size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
if (keepUnrolling) {
val entry = entryBuilder.build()
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
}
// Acquire storage memory if necessary to store this block in memory.
val enoughStorageMemory = {
if (unrollMemoryUsedByThisBlock <= size) {
val acquiredExtra =
memoryManager.acquireStorageMemory(
blockId, size - unrollMemoryUsedByThisBlock, MemoryMode.ON_HEAP)
if (acquiredExtra) {
transferUnrollToStorage(unrollMemoryUsedByThisBlock)
}
acquiredExtra
} else { // unrollMemoryUsedByThisBlock > size
// If this task attempt already owns more unroll memory than is necessary to store the
// block, then release the extra memory that will not be used.
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
transferUnrollToStorage(size)
true
}
}
if (enoughStorageMemory) {
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(size)
logInfo("Block %s stored as values in memory (estimated size %s, free %s)".format(blockId,
Utils.bytesToString(entry.size), Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(entry.size)
} else {
assert(currentUnrollMemoryForThisTask >= unrollMemoryUsedByThisBlock,
"released too much unroll memory")
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, entryBuilder.preciseSize)
Left(unrollMemoryUsedByThisBlock)
}
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, valuesHolder.estimatedSize())
Left(unrollMemoryUsedByThisBlock)
}
}
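
The final accounting step above can be summarized as: (1) if the builder's precise size exceeds what was reserved while unrolling, try to reserve the difference; (2) if that succeeds, atomically swap the whole unroll reservation for an equally sized storage reservation. A schematic sketch, with hypothetical function parameters standing in for the real MemoryManager calls:

```scala
object TransferSketch {
  // All three function parameters are hypothetical stand-ins for
  // reserveUnrollMemoryForThisTask, releaseUnrollMemoryForThisTask and
  // memoryManager.acquireStorageMemory in the real method.
  def finishStore(
      preciseSize: Long,
      unrollMemoryUsed: Long,
      reserveUnroll: Long => Boolean,
      releaseUnroll: Long => Unit,
      acquireStorage: Long => Boolean): Either[Long, Long] = {
    // (1) the precise size may exceed the unroll reservation by a small amount
    val extra = math.max(0L, preciseSize - unrollMemoryUsed)
    if (extra == 0L || reserveUnroll(extra)) {
      // (2) swap unroll memory for storage memory; the real code wraps this
      // pair in memoryManager.synchronized so the transfer is atomic
      releaseUnroll(unrollMemoryUsed + extra)
      assert(acquireStorage(preciseSize), "transferring unroll memory to storage memory failed")
      Right(preciseSize)          // Right(entry.size)
    } else {
      Left(unrollMemoryUsed)      // Left(unrollMemoryUsedByThisBlock)
    }
  }
}
```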
/**
* Attempt to put the given block in memory store as values.
*
* @return in case of success, the estimated size of the stored data. In case of failure, return
* an iterator containing the values of the block. The returned iterator will be backed
* by the combination of the partially-unrolled block and the remaining elements of the
* original input iterator. The caller must either fully consume this iterator or call
* `close()` on it in order to free the storage memory consumed by the partially-unrolled
* block.
*/
private[storage] def putIteratorAsValues[T](
blockId: BlockId,
values: Iterator[T],
classTag: ClassTag[T]): Either[PartiallyUnrolledIterator[T], Long] = {
val valuesHolder = new DeserializedValuesHolder[T](classTag)
putIterator(blockId, values, classTag, MemoryMode.ON_HEAP, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
case Left(unrollMemoryUsedByThisBlock) =>
val unrolledIterator = if (valuesHolder.vector != null) {
valuesHolder.vector.iterator
} else {
valuesHolder.arrayValues.toIterator
}
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = arrayValues.toIterator,
rest = Iterator.empty))
}
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, vector.estimateSize())
Left(new PartiallyUnrolledIterator(
this,
MemoryMode.ON_HEAP,
unrollMemoryUsedByThisBlock,
unrolled = vector.iterator,
rest = values))
unrolled = unrolledIterator,
rest = values))
}
}
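
The wrapper above preserves the original caller contract: on failure the caller receives a `PartiallyUnrolledIterator` and must either drain it or `close()` it. A schematic, hypothetical caller (the real caller is `BlockManager`; the stand-in `PartialBlock` type here just mirrors that contract):

```scala
object CallerSketch {
  // Stand-in for PartiallyUnrolledIterator: either iterate to the end or close().
  trait PartialBlock[T] extends Iterator[T] { def close(): Unit }

  def handle[T](result: Either[PartialBlock[T], Long], spillToDisk: T => Unit): Unit =
    result match {
      case Right(size) =>
        println(s"block fully stored in memory, estimated size $size bytes")
      case Left(partial) =>
        // Fully consume the iterator, e.g. by spilling the remaining records to
        // disk; a caller that does not consume it must call partial.close() so
        // the unroll memory held by the partially-unrolled block is freed.
        partial.foreach(spillToDisk)
    }
}
```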
/**
* Attempt to put the given block in memory store as bytes.
*
* It's possible that the iterator is too large to materialize and store in memory. To avoid
* OOM exceptions, this method will gradually unroll the iterator while periodically checking
* whether there is enough free memory. If the block is successfully materialized, then the
* temporary unroll memory used during the materialization is "transferred" to storage memory,
* so we won't acquire more memory than is actually needed to store the block.
*
* @return in case of success, the estimated size of the stored data. In case of failure,
* return a handle which allows the caller to either finish the serialization by
* spilling to disk or to deserialize the partially-serialized block and reconstruct
@@ -319,25 +331,8 @@ private[spark] class MemoryStore(
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
// Whether there is still enough memory for us to continue unrolling this block
var keepUnrolling = true
// Number of elements unrolled so far
var elementsUnrolled = 0L
// How often to check whether we need to request more memory
val memoryCheckPeriod = conf.get(UNROLL_MEMORY_CHECK_PERIOD)
// Memory to request as a multiple of current bbos size
val memoryGrowthFactor = conf.get(UNROLL_MEMORY_GROWTH_FACTOR)
// Initial per-task memory to request for unrolling blocks (bytes).
val initialMemoryThreshold = unrollMemoryThreshold
// Keep track of unroll memory used by this particular block / putIterator() operation
var unrollMemoryUsedByThisBlock = 0L
// Underlying buffer for unrolling the block
val redirectableStream = new RedirectableOutputStream
val chunkSize = if (initialMemoryThreshold > Int.MaxValue) {
logWarning(s"Initial memory threshold of ${Utils.bytesToString(initialMemoryThreshold)} " +
s"is too large to be set as chunk size. Chunk size has been capped to " +
@@ -346,85 +341,22 @@
} else {
initialMemoryThreshold.toInt
}
val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
// Request enough memory to begin unrolling
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
val valuesHolder = new SerializedValuesHolder[T](blockId, chunkSize, classTag,
memoryMode, serializerManager)
if (!keepUnrolling) {
logWarning(s"Failed to reserve initial memory threshold of " +
s"${Utils.bytesToString(initialMemoryThreshold)} for computing block $blockId in memory.")
} else {
unrollMemoryUsedByThisBlock += initialMemoryThreshold
}
def reserveAdditionalMemoryIfNecessary(): Unit = {
if (bbos.size > unrollMemoryUsedByThisBlock) {
val amountToRequest = (bbos.size * memoryGrowthFactor - unrollMemoryUsedByThisBlock).toLong
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
}
// Unroll this block safely, checking whether we have exceeded our threshold
while (values.hasNext && keepUnrolling) {
serializationStream.writeObject(values.next())(classTag)
elementsUnrolled += 1
if (elementsUnrolled % memoryCheckPeriod == 0) {
reserveAdditionalMemoryIfNecessary()
}
}
// Make sure that we have enough memory to store the block. By this point, it is possible that
// the block's actual memory usage has exceeded the unroll memory by a small amount, so we
// perform one final call to attempt to allocate additional memory if necessary.
if (keepUnrolling) {
serializationStream.close()
if (bbos.size > unrollMemoryUsedByThisBlock) {
val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
if (keepUnrolling) {
unrollMemoryUsedByThisBlock += amountToRequest
}
}
}
if (keepUnrolling) {
val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
// Synchronize so that transfer is atomic
memoryManager.synchronized {
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
assert(success, "transferring unroll memory to storage memory failed")
}
entries.synchronized {
entries.put(blockId, entry)
}
logInfo("Block %s stored as bytes in memory (estimated size %s, free %s)".format(
blockId, Utils.bytesToString(entry.size),
Utils.bytesToString(maxMemory - blocksMemoryUsed)))
Right(entry.size)
} else {
// We ran out of space while unrolling the values for this block
logUnrollFailureMessage(blockId, bbos.size)
Left(
new PartiallySerializedBlock(
putIterator(blockId, values, classTag, memoryMode, valuesHolder) match {
case Right(storedSize) => Right(storedSize)
case Left(unrollMemoryUsedByThisBlock) =>
Left(new PartiallySerializedBlock(
this,
serializerManager,
blockId,
serializationStream,
redirectableStream,
valuesHolder.serializationStream,
valuesHolder.redirectableStream,
unrollMemoryUsedByThisBlock,
memoryMode,
bbos,
valuesHolder.bbos,
values,
classTag))
}
@@ -702,6 +634,94 @@ private[spark] class MemoryStore(
}
}
private trait MemoryEntryBuilder[T] {
def preciseSize: Long
def build(): MemoryEntry[T]
}
private trait ValuesHolder[T] {
def storeValue(value: T): Unit
def estimatedSize(): Long
/**
* Note: after this method is called, the ValuesHolder is invalid; we can no longer store data
* or get an estimated size.
* @return a MemoryEntryBuilder which is used to build a memory entry and get the stored data
* size.
*/
def getBuilder(): MemoryEntryBuilder[T]
}
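
To make the lifecycle concrete, here is a deliberately tiny, hypothetical holder that obeys the same contract (it is not part of the patch; since `ValuesHolder`, `MemoryEntryBuilder` and the entry types are private, such a class would have to live inside MemoryStore.scala next to the real holders):

```scala
private class ArrayBufferValuesHolder[T](implicit classTag: ClassTag[T])
  extends ValuesHolder[T] {

  private val buffer = new scala.collection.mutable.ArrayBuffer[T]

  override def storeValue(value: T): Unit = buffer += value

  // Crude estimate for illustration; the real holders use SizeTrackingVector
  // and ChunkedByteBufferOutputStream respectively.
  override def estimatedSize(): Long = buffer.length * 64L

  override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
    // After getBuilder() the holder must not be reused, matching the note above.
    val values: Array[T] = buffer.toArray
    override val preciseSize: Long = SizeEstimator.estimate(values)
    override def build(): MemoryEntry[T] =
      DeserializedMemoryEntry[T](values, preciseSize, classTag)
  }
}
```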
/**
* A holder for storing the deserialized values.
*/
private class DeserializedValuesHolder[T] (classTag: ClassTag[T]) extends ValuesHolder[T] {
// Underlying vector for unrolling the block
var vector = new SizeTrackingVector[T]()(classTag)
var arrayValues: Array[T] = null
override def storeValue(value: T): Unit = {
vector += value
}
override def estimatedSize(): Long = {
vector.estimateSize()
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
arrayValues = vector.toArray
vector = null
override val preciseSize: Long = SizeEstimator.estimate(arrayValues)
override def build(): MemoryEntry[T] =
DeserializedMemoryEntry[T](arrayValues, preciseSize, classTag)
}
}
/**
* A holder for storing the serialized values.
*/
private class SerializedValuesHolder[T](
blockId: BlockId,
chunkSize: Int,
classTag: ClassTag[T],
memoryMode: MemoryMode,
serializerManager: SerializerManager) extends ValuesHolder[T] {
val allocator = memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
val redirectableStream = new RedirectableOutputStream
val bbos = new ChunkedByteBufferOutputStream(chunkSize, allocator)
redirectableStream.setOutputStream(bbos)
val serializationStream: SerializationStream = {
val autoPick = !blockId.isInstanceOf[StreamBlockId]
val ser = serializerManager.getSerializer(classTag, autoPick).newInstance()
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
}
override def storeValue(value: T): Unit = {
serializationStream.writeObject(value)(classTag)
}
override def estimatedSize(): Long = {
bbos.size
}
override def getBuilder(): MemoryEntryBuilder[T] = new MemoryEntryBuilder[T] {
// We successfully unrolled the entirety of this block
serializationStream.close()
override def preciseSize: Long = bbos.size
override def build(): MemoryEntry[T] =
SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
}
}
/**
* The result of a failed [[MemoryStore.putIteratorAsValues()]] call.
*