[SPARK-17204][CORE] Fix replicated off heap storage

(Jira: https://issues.apache.org/jira/browse/SPARK-17204)

## What changes were proposed in this pull request?

There are two bugs in the `BlockManager`'s support for replicated off-heap storage. First, the locally-stored off-heap byte buffer is disposed of when it is replicated. It should not be. Second, the replica byte buffers are stored as heap byte buffers instead of direct byte buffers even when the storage level memory mode is off-heap. This PR addresses both of these problems.
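To make the second bug concrete: replica bytes arrive as heap `ByteBuffer` chunks, and under an off-heap storage level they should be copied into direct buffers before being handed to the memory store. The snippet below is a minimal, self-contained illustration of that copy using plain `java.nio`; the object and method names are made up for this example, and the actual fix (shown in the `BlockManager` diff below) goes through `ChunkedByteBuffer.copy(Platform.allocateDirectBuffer)` instead.

```scala
import java.nio.ByteBuffer

// Stand-alone sketch, not BlockManager code: copy any heap-backed chunks into
// direct (off-heap) buffers when the requested memory mode is off-heap.
object OffHeapCopySketch {
  def ensureDirect(chunks: Array[ByteBuffer], offHeap: Boolean): Array[ByteBuffer] = {
    if (offHeap && chunks.exists(chunk => !chunk.isDirect)) {
      chunks.map { chunk =>
        val direct = ByteBuffer.allocateDirect(chunk.remaining())
        direct.put(chunk.duplicate()) // duplicate() leaves the source chunk's position untouched
        direct.flip()
        direct
      }
    } else {
      chunks
    }
  }

  def main(args: Array[String]): Unit = {
    val heapChunks = Array(ByteBuffer.wrap(Array.fill[Byte](16)(1)))
    assert(ensureDirect(heapChunks, offHeap = true).forall(_.isDirect))
  }
}
```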

## How was this patch tested?

`BlockManagerReplicationSuite` was enhanced to fill in the coverage gaps. It now fails if either of the bugs fixed in this PR is present.

Author: Michael Allman <michael@videoamp.com>

Closes #16499 from mallman/spark-17204-replicated_off_heap_storage.
Commit 7fa116f8fc (parent 0ec1db5475)
Authored by Michael Allman on 2017-03-21 11:51:22 +08:00; committed by Wenchen Fan
5 changed files with 105 additions and 25 deletions

@@ -317,6 +317,9 @@ private[spark] class BlockManager(
/**
* Put the block locally, using the given storage level.
*
* '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
* so may corrupt or change the data stored by the `BlockManager`.
*/
override def putBlockData(
blockId: BlockId,
@@ -755,6 +758,9 @@ private[spark] class BlockManager(
/**
* Put a new block of serialized bytes to the block manager.
*
* '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
* so may corrupt or change the data stored by the `BlockManager`.
*
* @param encrypt If true, asks the block manager to encrypt the data block before storing,
* when I/O encryption is enabled. This is required for blocks that have been
* read from unencrypted sources, since all the BlockManager read APIs
@@ -773,7 +779,7 @@ private[spark] class BlockManager(
if (encrypt && securityManager.ioEncryptionKey.isDefined) {
try {
val data = bytes.toByteBuffer
val in = new ByteBufferInputStream(data, true)
val in = new ByteBufferInputStream(data)
val byteBufOut = new ByteBufferOutputStream(data.remaining())
val out = CryptoStreamUtils.createCryptoOutputStream(byteBufOut, conf,
securityManager.ioEncryptionKey.get)
@@ -800,6 +806,9 @@ private[spark] class BlockManager(
*
* If the block already exists, this method will not overwrite it.
*
* '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
* so may corrupt or change the data stored by the `BlockManager`.
*
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
@@ -843,7 +852,15 @@ private[spark] class BlockManager(
false
}
} else {
memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
val memoryMode = level.memoryMode
memoryStore.putBytes(blockId, size, memoryMode, () => {
if (memoryMode == MemoryMode.OFF_HEAP &&
bytes.chunks.exists(buffer => !buffer.isDirect)) {
bytes.copy(Platform.allocateDirectBuffer)
} else {
bytes
}
})
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
@@ -1048,7 +1065,7 @@ private[spark] class BlockManager(
try {
replicate(blockId, bytesToReplicate, level, remoteClassTag)
} finally {
bytesToReplicate.dispose()
bytesToReplicate.unmap()
}
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))

@@ -236,22 +236,60 @@ class StorageStatus(val blockManagerId: BlockManagerId, val maxMem: Long) {
/** Helper methods for storage-related objects. */
private[spark] object StorageUtils extends Logging {
// Ewwww... Reflection!!! See the unmap method for justification
private val memoryMappedBufferFileDescriptorField = {
val mappedBufferClass = classOf[java.nio.MappedByteBuffer]
val fdField = mappedBufferClass.getDeclaredField("fd")
fdField.setAccessible(true)
fdField
}
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
* waiting for the GC to find it because that could lead to huge numbers of open files. There's
* unfortunately no standard API to do this.
* Attempt to clean up a ByteBuffer if it is direct or memory-mapped. This uses an *unsafe* Sun
* API that will cause errors if one attempts to read from the disposed buffer. However, neither
* the bytes allocated to direct buffers nor file descriptors opened for memory-mapped buffers put
* pressure on the garbage collector. Waiting for garbage collection may lead to the depletion of
* off-heap memory or huge numbers of open files. There's unfortunately no standard API to
* manually dispose of these kinds of buffers.
*
* See also [[unmap]]
*/
def dispose(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
logTrace(s"Unmapping $buffer")
if (buffer.asInstanceOf[DirectBuffer].cleaner() != null) {
buffer.asInstanceOf[DirectBuffer].cleaner().clean()
logTrace(s"Disposing of $buffer")
cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
}
}
/**
* Attempt to unmap a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that will
* cause errors if one attempts to read from the unmapped buffer. However, the file descriptors of
* memory-mapped buffers do not put pressure on the garbage collector. Waiting for garbage
* collection may lead to huge numbers of open files. There's unfortunately no standard API to
* manually unmap memory-mapped buffers.
*
* See also [[dispose]]
*/
def unmap(buffer: ByteBuffer): Unit = {
if (buffer != null && buffer.isInstanceOf[MappedByteBuffer]) {
// Note that direct buffers are instances of MappedByteBuffer. As things stand in Java 8, the
// JDK does not provide a public API to distinguish between direct buffers and memory-mapped
// buffers. As an alternative, we peek beneath the curtains and look for a non-null file
// descriptor in mappedByteBuffer
if (memoryMappedBufferFileDescriptorField.get(buffer) != null) {
logTrace(s"Unmapping $buffer")
cleanDirectBuffer(buffer.asInstanceOf[DirectBuffer])
}
}
}
private def cleanDirectBuffer(buffer: DirectBuffer) = {
val cleaner = buffer.cleaner()
if (cleaner != null) {
cleaner.clean()
}
}
/**
* Update the given list of RDDInfo with the given list of storage statuses.
* This method overwrites the old values stored in the RDDInfo's.

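To make the split between the two cleanup paths concrete, here is an illustrative usage sketch. It is not code from this PR; it assumes it compiles inside the `org.apache.spark.storage` package (since `StorageUtils` is `private[spark]`) and uses a placeholder file path.

```scala
package org.apache.spark.storage

import java.io.RandomAccessFile
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode

object DisposeVsUnmapSketch {
  def main(args: Array[String]): Unit = {
    val direct = ByteBuffer.allocateDirect(4096)
    // unmap() is a no-op for a plain direct buffer (its "fd" field is null), so an
    // off-heap buffer that may still back a locally cached block is left untouched.
    StorageUtils.unmap(direct)
    // dispose() frees direct *and* memory-mapped buffers; only safe once nothing
    // else can still reference the allocation.
    StorageUtils.dispose(direct)

    // /tmp/example.bin is a placeholder; any readable file works.
    val channel = new RandomAccessFile("/tmp/example.bin", "r").getChannel
    val mapped = channel.map(MapMode.READ_ONLY, 0, channel.size())
    // A memory-mapped buffer is released by either path; unmap() is the narrower one.
    StorageUtils.unmap(mapped)
    channel.close()
  }
}
```

This is why `BlockManager` above switches from `bytesToReplicate.dispose()` to `bytesToReplicate.unmap()`: after replication, memory-mapped file buffers can be released, but off-heap buffers may still be owned by the local memory store.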
@@ -23,11 +23,10 @@ import java.nio.ByteBuffer
import org.apache.spark.storage.StorageUtils
/**
* Reads data from a ByteBuffer, and optionally cleans it up using StorageUtils.dispose()
* at the end of the stream (e.g. to close a memory-mapped file).
* Reads data from a ByteBuffer.
*/
private[spark]
class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
class ByteBufferInputStream(private var buffer: ByteBuffer)
extends InputStream {
override def read(): Int = {
@@ -72,9 +71,6 @@ class ByteBufferInputStream(private var buffer: ByteBuffer, dispose: Boolean = false)
*/
private def cleanUp() {
if (buffer != null) {
if (dispose) {
StorageUtils.dispose(buffer)
}
buffer = null
}
}

@@ -86,7 +86,11 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
}
/**
* Copy this buffer into a new ByteBuffer.
* Convert this buffer to a ByteBuffer. If this buffer is backed by a single chunk, its underlying
* data will not be copied. Instead, it will be duplicated. If this buffer is backed by multiple
* chunks, the data underlying this buffer will be copied into a new byte buffer. As a result, it
* is suggested to use this method only if the caller does not need to manage the memory
* underlying this buffer.
*
* @throws UnsupportedOperationException if this buffer's size exceeds the max ByteBuffer size.
*/
@@ -132,10 +136,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
}
/**
* Attempt to clean up a ByteBuffer if it is memory-mapped. This uses an *unsafe* Sun API that
* might cause errors if one attempts to read from the unmapped buffer, but it's better than
* waiting for the GC to find it because that could lead to huge numbers of open files. There's
* unfortunately no standard API to do this.
* Attempt to clean up any ByteBuffer in this ChunkedByteBuffer which is direct or memory-mapped.
* See [[StorageUtils.dispose]] for more information.
*
* See also [[unmap]]
*/
def dispose(): Unit = {
if (!disposed) {
@@ -143,6 +147,19 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
disposed = true
}
}
/**
* Attempt to unmap any ByteBuffer in this ChunkedByteBuffer if it is memory-mapped. See
* [[StorageUtils.unmap]] for more information.
*
* See also [[dispose]]
*/
def unmap(): Unit = {
if (!disposed) {
chunks.foreach(StorageUtils.unmap)
disposed = true
}
}
}
/**

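For reference, here is a small sketch tying the `ChunkedByteBuffer` pieces together: `copy` with a direct-buffer allocator moves heap chunks off-heap (the same pattern `BlockManager` uses above), and the new `unmap()` releases only memory-mapped chunks, whereas `dispose()` would free the direct ones too. This is illustrative only; it assumes the `org.apache.spark.util.io` package (the class is `private[spark]`) and spark-core on the classpath.

```scala
package org.apache.spark.util.io

import java.nio.ByteBuffer

import org.apache.spark.unsafe.Platform

object ChunkedByteBufferSketch {
  def main(args: Array[String]): Unit = {
    // A single heap-backed chunk of 32 one-bytes.
    val heapBacked = new ChunkedByteBuffer(Array(ByteBuffer.wrap(Array.fill[Byte](32)(1))))

    // Same pattern as the BlockManager change above: copy to direct buffers
    // only when some chunk is still on the heap.
    val offHeap =
      if (heapBacked.chunks.exists(chunk => !chunk.isDirect)) {
        heapBacked.copy(Platform.allocateDirectBuffer)
      } else {
        heapBacked
      }
    assert(offHeap.chunks.forall(_.isDirect))

    // unmap() leaves direct chunks (possibly still backing a cached block) alone;
    // dispose() would free them as well.
    offHeap.unmap()
  }
}
```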
@@ -374,7 +374,8 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
// Put the block into one of the stores
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
val testValue = Array.fill[Byte](blockSize)(1)
stores(0).putSingle(blockId, testValue, storageLevel)
// Assert that master know two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@@ -386,12 +387,23 @@ trait BlockManagerReplicationBehavior extends SparkFunSuite
testStore => blockLocations.contains(testStore.blockManagerId.executorId)
}.foreach { testStore =>
val testStoreName = testStore.blockManagerId.executorId
assert(
testStore.getLocalValues(blockId).isDefined, s"$blockId was not found in $testStoreName")
testStore.releaseLock(blockId)
val blockResultOpt = testStore.getLocalValues(blockId)
assert(blockResultOpt.isDefined, s"$blockId was not found in $testStoreName")
val localValues = blockResultOpt.get.data.toSeq
assert(localValues.size == 1)
assert(localValues.head === testValue)
assert(master.getLocations(blockId).map(_.executorId).toSet.contains(testStoreName),
s"master does not have status for ${blockId.name} in $testStoreName")
val memoryStore = testStore.memoryStore
if (memoryStore.contains(blockId) && !storageLevel.deserialized) {
memoryStore.getBytes(blockId).get.chunks.foreach { byteBuffer =>
assert(storageLevel.useOffHeap == byteBuffer.isDirect,
s"memory mode ${storageLevel.memoryMode} is not compatible with " +
byteBuffer.getClass.getSimpleName)
}
}
val blockStatus = master.getBlockStatus(blockId)(testStore.blockManagerId)
// Assert that block status in the master for this store has expected storage level