[SPARK-25035][CORE] Avoiding memory mapping at disk-stored blocks replication

Before this PR, the method `BlockManager#putBlockDataAsStream()` (used during block replication, where the block data is received as a stream) read the whole block content into memory, even at the DISK_ONLY storage level.

With this change, the received block data (which is temporarily stored in a file) is simply moved to the location backing the target block. This avoids a possible OOM error.
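
For illustration, here is a minimal, self-contained analog of the disk-only path using plain JDK file APIs; the object name and file paths are made up for the example, and the real code uses the new `DiskStore.moveFileToBlock` helper shown in the diff below.

```scala
import java.nio.file.{Files, Paths, StandardCopyOption}

object MoveInsteadOfRead {
  def main(args: Array[String]): Unit = {
    // Simulate a block that was streamed into a temp file by the replication receiver.
    val tmpFile = Files.createTempFile("block-stream-", ".tmp")
    Files.write(tmpFile, Array.fill[Byte](1024)(42.toByte))

    // Instead of reading the whole file into memory and calling putBytes (the old path),
    // move the temp file to the location that backs the block. Only file metadata changes.
    val blockFile = Paths.get(System.getProperty("java.io.tmpdir"), "rdd_0_0")
    Files.move(tmpFile, blockFile, StandardCopyOption.REPLACE_EXISTING)

    println(s"Block stored at $blockFile (${Files.size(blockFile)} bytes) without buffering it in memory")
  }
}
```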

To avoid code duplication, the method `doPutBytes` is refactored into a template method in the new `BlockStoreUpdater` abstraction, which has separate implementations for byte-buffer-based and temporary-file-based block store updates.
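
A simplified, self-contained sketch of the template-method shape (the class names here are illustrative stand-ins; the real `BlockStoreUpdater`, `ByteBufferBlockStoreUpdater`, and `TempFileBasedBlockStoreUpdater` are nested in `BlockManager` and appear in full in the diff below):

```scala
import java.io.File
import java.nio.ByteBuffer

abstract class BlockStoreUpdaterSketch {
  // Hooks supplied by the concrete updaters.
  protected def readToByteBuffer(): ByteBuffer // may pull the whole block into memory
  protected def saveToDiskStore(): Unit        // writes, or just moves, the data on disk

  // Shared "template" logic: only memory storage levels call readToByteBuffer();
  // a disk-only level goes straight to saveToDiskStore() and never loads the block.
  def save(useMemory: Boolean, useDisk: Boolean): Unit = {
    if (useMemory) {
      val bytes = readToByteBuffer()
      // ... try the memory store with `bytes`; fall back to saveToDiskStore() when useDisk is set ...
    } else if (useDisk) {
      saveToDiskStore()
    }
  }
}

// Data already in memory: readToByteBuffer() is free, and the disk path writes the bytes out.
class ByteBufferUpdaterSketch(bytes: ByteBuffer) extends BlockStoreUpdaterSketch {
  protected def readToByteBuffer(): ByteBuffer = bytes
  protected def saveToDiskStore(): Unit = { /* write bytes to the block file */ }
}

// Data in a local temp file: the disk path moves the file; memory levels load it on demand.
class TempFileUpdaterSketch(tmpFile: File) extends BlockStoreUpdaterSketch {
  protected def readToByteBuffer(): ByteBuffer = ByteBuffer.allocate(0) // real code reads the file lazily
  protected def saveToDiskStore(): Unit = { /* move tmpFile to the block's final location */ }
}
```

Keeping the shared save logic in one place is what lets the streaming path reuse the memory-store fallback and replication handling that `doPutBytes` used to own.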

Tested with the existing unit tests of `DistributedSuite` (the ones dealing with replication); a user-level scenario that exercises the same path is sketched after this list:
- caching on disk, replicated (encryption = off) (with replication as stream)
- caching on disk, replicated (encryption = on) (with replication as stream)
- caching in memory, serialized, replicated (encryption = on) (with replication as stream)
- caching in memory, serialized, replicated (encryption = off) (with replication as stream)
- etc.
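
For context, a rough user-level job of the kind those scenarios correspond to; the object name is made up, a cluster with at least two executors is assumed, and `spark.maxRemoteBlockSizeFetchToMem` is assumed to be the setting that forces replication to go through the streaming path.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object DiskOnlyReplicationExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("disk-only-replication")
      // Blocks larger than this are replicated as a stream rather than as an in-memory buffer.
      .set("spark.maxRemoteBlockSizeFetchToMem", "1m")
    val sc = new SparkContext(conf)

    // DISK_ONLY_2: each partition is written to disk and replicated to a second executor,
    // whose BlockManager receives the data via putBlockDataAsStream().
    val rdd = sc.parallelize(1 to 10000000, 4).persist(StorageLevel.DISK_ONLY_2)
    println(rdd.count())

    sc.stop()
  }
}
```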

And with new unit tests that exercise the `putBlockDataAsStream` method directly:
- test putBlockDataAsStream with caching (encryption = off)
- test putBlockDataAsStream with caching (encryption = on)
- test putBlockDataAsStream with caching on disk (encryption = off)
- test putBlockDataAsStream with caching on disk (encryption = on)

Closes #23688 from attilapiros/SPARK-25035.

Authored-by: “attilapiros” <piros.attila.zsolt@gmail.com>
Signed-off-by: Marcelo Vanzin <vanzin@cloudera.com>
Commit 0ac516bebd (parent c5de804093), 2019-02-25 11:42:55 -08:00
3 changed files with 263 additions and 142 deletions

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -33,6 +33,7 @@ import scala.util.Random
import scala.util.control.NonFatal
import com.codahale.metrics.{MetricRegistry, MetricSet}
import org.apache.commons.io.IOUtils
import org.apache.spark._
import org.apache.spark.executor.DataReadMethod
@@ -221,6 +222,187 @@ private[spark] class BlockManager(
new BlockManager.RemoteBlockDownloadFileManager(this)
private val maxRemoteBlockToMem = conf.get(config.MAX_REMOTE_BLOCK_SIZE_FETCH_TO_MEM)
/**
* Abstraction for storing blocks from bytes, whether they start in memory or on disk.
*
* @param blockSize the decrypted size of the block
*/
private abstract class BlockStoreUpdater[T](
blockSize: Long,
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean,
keepReadLock: Boolean) {
/**
* Reads the block content into memory. If the block store update is based on a temporary
* file, this could lead to loading the whole file into a ChunkedByteBuffer.
*/
protected def readToByteBuffer(): ChunkedByteBuffer
protected def blockData(): BlockData
protected def saveToDiskStore(): Unit
private def saveDeserializedValuesToMemoryStore(inputStream: InputStream): Boolean = {
try {
val values = serializerManager.dataDeserializeStream(blockId, inputStream)(classTag)
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly
// to disk, so we don't need this iterator and can close it to free resources
// earlier.
iter.close()
false
}
} finally {
IOUtils.closeQuietly(inputStream)
}
}
private def saveSerializedValuesToMemoryStore(bytes: ChunkedByteBuffer): Boolean = {
val memoryMode = level.memoryMode
memoryStore.putBytes(blockId, blockSize, memoryMode, () => {
if (memoryMode == MemoryMode.OFF_HEAP && bytes.chunks.exists(!_.isDirect)) {
bytes.copy(Platform.allocateDirectBuffer)
} else {
bytes
}
})
}
/**
* Put the given data according to the given level in one of the block stores, replicating
* the values if necessary.
*
* If the block already exists, this method will not overwrite it.
*
* If keepReadLock is true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it returns.
*
* @return true if the block was already present or if the put succeeded, false otherwise.
*/
def save(): Boolean = {
doPut(blockId, level, classTag, tellMaster, keepReadLock) { info =>
val startTimeNs = System.nanoTime()
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (level.replication > 1) {
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool.
replicate(blockId, blockData(), level, classTag)
}(futureExecutionContext)
} else {
null
}
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
saveDeserializedValuesToMemoryStore(blockData().toInputStream())
} else {
saveSerializedValuesToMemoryStore(readToByteBuffer())
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
saveToDiskStore()
}
} else if (level.useDisk) {
saveToDiskStore()
}
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = blockSize
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug(s"Put block ${blockId} locally took ${Utils.getUsedTimeNs(startTimeNs)}")
if (level.replication > 1) {
// Wait for asynchronous replication to finish
try {
ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
}
}
if (blockWasSuccessfullyStored) {
None
} else {
Some(blockSize)
}
}.isEmpty
}
}
/**
* Helper for storing a block from bytes already in memory.
* '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
* so may corrupt or change the data stored by the `BlockManager`.
*/
private case class ByteBufferBlockStoreUpdater[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
bytes: ChunkedByteBuffer,
tellMaster: Boolean = true,
keepReadLock: Boolean = false)
extends BlockStoreUpdater[T](bytes.size, blockId, level, classTag, tellMaster, keepReadLock) {
override def readToByteBuffer(): ChunkedByteBuffer = bytes
/**
* The ByteBufferBlockData wrapper is not disposed of to avoid releasing buffers that are
* owned by the caller.
*/
override def blockData(): BlockData = new ByteBufferBlockData(bytes, false)
override def saveToDiskStore(): Unit = diskStore.putBytes(blockId, bytes)
}
/**
* Helper for storing a block from bytes already in a local temp file.
*/
private case class TempFileBasedBlockStoreUpdater[T](
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[T],
tmpFile: File,
blockSize: Long,
tellMaster: Boolean = true,
keepReadLock: Boolean = false)
extends BlockStoreUpdater[T](blockSize, blockId, level, classTag, tellMaster, keepReadLock) {
override def readToByteBuffer(): ChunkedByteBuffer = {
val allocator = level.memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
blockData().toChunkedByteBuffer(allocator)
}
override def blockData(): BlockData = diskStore.getBytes(tmpFile, blockSize)
override def saveToDiskStore(): Unit = diskStore.moveFileToBlock(tmpFile, blockSize, blockId)
override def save(): Boolean = {
val res = super.save()
tmpFile.delete()
res
}
}
/**
* Initializes the BlockManager with the given appId. This is not performed in the constructor as
* the appId may not be known at BlockManager instantiation time (in particular for the driver,
@@ -412,10 +594,7 @@ private[spark] class BlockManager(
blockId: BlockId,
level: StorageLevel,
classTag: ClassTag[_]): StreamCallbackWithID = {
// TODO if we're going to only put the data in the disk store, we should just write it directly
// to the final location, but that would require a deeper refactor of this code. So instead
// we just write to a temp file, and call putBytes on the data in that file.
val tmpFile = diskBlockManager.createTempLocalBlock()._2
val (_, tmpFile) = diskBlockManager.createTempLocalBlock()
val channel = new CountingWritableChannel(
Channels.newChannel(serializerManager.wrapForEncryption(new FileOutputStream(tmpFile))))
logTrace(s"Streaming block $blockId to tmp file $tmpFile")
@@ -431,28 +610,11 @@ private[spark] class BlockManager(
override def onComplete(streamId: String): Unit = {
logTrace(s"Done receiving block $blockId, now putting into local blockManager")
// Read the contents of the downloaded file as a buffer to put into the blockManager.
// Note this is all happening inside the netty thread as soon as it reads the end of the
// stream.
channel.close()
// TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
// using a lot of memory here. We'll read the whole file into a regular
// byte buffer and OOM. We could at least read the tmp file as a stream.
val buffer = securityManager.getIOEncryptionKey() match {
case Some(key) =>
// we need to pass in the size of the unencrypted block
val blockSize = channel.getCount
val allocator = level.memoryMode match {
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
}
new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
case None =>
ChunkedByteBuffer.fromFile(tmpFile)
}
putBytes(blockId, buffer, level)(classTag)
tmpFile.delete()
val blockSize = channel.getCount
TempFileBasedBlockStoreUpdater(blockId, level, classTag, tmpFile, blockSize).save()
}
override def onFailure(streamId: String, cause: Throwable): Unit = {
@@ -953,111 +1115,14 @@ private[spark] class BlockManager(
level: StorageLevel,
tellMaster: Boolean = true): Boolean = {
require(bytes != null, "Bytes is null")
doPutBytes(blockId, bytes, level, implicitly[ClassTag[T]], tellMaster)
val blockStoreUpdater =
ByteBufferBlockStoreUpdater(blockId, level, implicitly[ClassTag[T]], bytes, tellMaster)
blockStoreUpdater.save()
}
/**
* Put the given bytes according to the given level in one of the block stores, replicating
* the values if necessary.
*
* If the block already exists, this method will not overwrite it.
*
* '''Important!''' Callers must not mutate or release the data buffer underlying `bytes`. Doing
* so may corrupt or change the data stored by the `BlockManager`.
*
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
* @return true if the block was already present or if the put succeeded, false otherwise.
*/
private def doPutBytes[T](
blockId: BlockId,
bytes: ChunkedByteBuffer,
level: StorageLevel,
classTag: ClassTag[T],
tellMaster: Boolean = true,
keepReadLock: Boolean = false): Boolean = {
doPut(blockId, level, classTag, tellMaster = tellMaster, keepReadLock = keepReadLock) { info =>
val startTimeNs = System.nanoTime()
// Since we're storing bytes, initiate the replication before storing them locally.
// This is faster as data is already serialized and ready to send.
val replicationFuture = if (level.replication > 1) {
Future {
// This is a blocking action and should run in futureExecutionContext which is a cached
// thread pool. The ByteBufferBlockData wrapper is not disposed of to avoid releasing
// buffers that are owned by the caller.
replicate(blockId, new ByteBufferBlockData(bytes, false), level, classTag)
}(futureExecutionContext)
} else {
null
}
val size = bytes.size
if (level.useMemory) {
// Put it in memory first, even if it also has useDisk set to true;
// We will drop it to disk later if the memory store can't hold it.
val putSucceeded = if (level.deserialized) {
val values =
serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
case Right(_) => true
case Left(iter) =>
// If putting deserialized values in memory failed, we will put the bytes directly to
// disk, so we don't need this iterator and can close it to free resources earlier.
iter.close()
false
}
} else {
val memoryMode = level.memoryMode
memoryStore.putBytes(blockId, size, memoryMode, () => {
if (memoryMode == MemoryMode.OFF_HEAP &&
bytes.chunks.exists(buffer => !buffer.isDirect)) {
bytes.copy(Platform.allocateDirectBuffer)
} else {
bytes
}
})
}
if (!putSucceeded && level.useDisk) {
logWarning(s"Persisting block $blockId to disk instead.")
diskStore.putBytes(blockId, bytes)
}
} else if (level.useDisk) {
diskStore.putBytes(blockId, bytes)
}
val putBlockStatus = getCurrentBlockStatus(blockId, info)
val blockWasSuccessfullyStored = putBlockStatus.storageLevel.isValid
if (blockWasSuccessfullyStored) {
// Now that the block is in either the memory or disk store,
// tell the master about it.
info.size = size
if (tellMaster && info.tellMaster) {
reportBlockStatus(blockId, putBlockStatus)
}
addUpdatedBlockStatusToTaskMetrics(blockId, putBlockStatus)
}
logDebug(s"Put block ${blockId} locally took ${Utils.getUsedTimeNs(startTimeNs)}")
if (level.replication > 1) {
// Wait for asynchronous replication to finish
try {
ThreadUtils.awaitReady(replicationFuture, Duration.Inf)
} catch {
case NonFatal(t) =>
throw new Exception("Error occurred while waiting for replication to finish", t)
}
}
if (blockWasSuccessfullyStored) {
None
} else {
Some(bytes)
}
}.isEmpty
}
/**
* Helper method used to abstract common code from [[doPutBytes()]] and [[doPutIterator()]].
* Helper method used to abstract common code from [[BlockStoreUpdater.save()]]
* and [[doPutIterator()]].
*
* @param putBody a function which attempts the actual put() and returns None on success
* or Some on failure.

core/src/main/scala/org/apache/spark/storage/DiskStore.scala

@@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer
import com.google.common.io.Closeables
import io.netty.channel.DefaultFileRegion
import org.apache.commons.io.FileUtils
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.internal.{config, Logging}
@@ -95,18 +96,17 @@ private[spark] class DiskStore(
}
def getBytes(blockId: BlockId): BlockData = {
val file = diskManager.getFile(blockId.name)
val blockSize = getSize(blockId)
getBytes(diskManager.getFile(blockId.name), getSize(blockId))
}
securityManager.getIOEncryptionKey() match {
case Some(key) =>
// Encrypted blocks cannot be memory mapped; return a special object that does decryption
// and provides InputStream / FileRegion implementations for reading the data.
new EncryptedBlockData(file, blockSize, conf, key)
def getBytes(f: File, blockSize: Long): BlockData = securityManager.getIOEncryptionKey() match {
case Some(key) =>
// Encrypted blocks cannot be memory mapped; return a special object that does decryption
// and provides InputStream / FileRegion implementations for reading the data.
new EncryptedBlockData(f, blockSize, conf, key)
case _ =>
new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, file, blockSize)
}
case _ =>
new DiskBlockData(minMemoryMapBytes, maxMemoryMapBytes, f, blockSize)
}
def remove(blockId: BlockId): Boolean = {
@@ -123,6 +123,16 @@ private[spark] class DiskStore(
}
}
/**
* @param blockSize if encryption is configured, the file is assumed to already be encrypted and
* blockSize should be the decrypted size
*/
def moveFileToBlock(sourceFile: File, blockSize: Long, targetBlockId: BlockId): Unit = {
blockSizes.put(targetBlockId, blockSize)
val targetFile = diskManager.getFile(targetBlockId.name)
FileUtils.moveFile(sourceFile, targetFile)
}
def contains(blockId: BlockId): Boolean = {
val file = diskManager.getFile(blockId.name)
file.exists()

core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala

@@ -80,6 +80,16 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
implicit def StringToBlockId(value: String): BlockId = new TestBlockId(value)
def rdd(rddId: Int, splitId: Int): RDDBlockId = RDDBlockId(rddId, splitId)
private def init(sparkConf: SparkConf): Unit = {
sparkConf
.set("spark.app.id", "test")
.set(IS_TESTING, true)
.set(MEMORY_FRACTION, 1.0)
.set(MEMORY_STORAGE_FRACTION, 0.999)
.set("spark.kryoserializer.buffer", "1m")
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
}
private def makeBlockManager(
maxMem: Long,
name: String = SparkContext.DRIVER_IDENTIFIER,
@@ -113,12 +123,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Set the arch to 64-bit and compressedOops to true to get a deterministic test-case
System.setProperty("os.arch", "amd64")
conf = new SparkConf(false)
.set("spark.app.id", "test")
.set(IS_TESTING, true)
.set(MEMORY_FRACTION, 1.0)
.set(MEMORY_STORAGE_FRACTION, 0.999)
.set("spark.kryoserializer.buffer", "1m")
.set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
init(conf)
rpcEnv = RpcEnv.create("test", "localhost", 0, conf, securityMgr)
conf.set(DRIVER_PORT, rpcEnv.address.port)
@@ -890,7 +895,6 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
memoryManager.setMemoryStore(store.memoryStore)
store.initialize("app-id")
// The put should fail since a1 is not serializable.
@@ -906,6 +910,48 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
}
}
def testPutBlockDataAsStream(blockManager: BlockManager, storageLevel: StorageLevel): Unit = {
val message = "message"
val ser = serializer.newInstance().serialize(message).array()
val blockId = new RDDBlockId(0, 0)
val streamCallbackWithId =
blockManager.putBlockDataAsStream(blockId, storageLevel, ClassTag(message.getClass))
streamCallbackWithId.onData("0", ByteBuffer.wrap(ser))
streamCallbackWithId.onComplete("0")
val blockStatusOption = blockManager.getStatus(blockId)
assert(!blockStatusOption.isEmpty)
val blockStatus = blockStatusOption.get
assert((blockStatus.diskSize > 0) === !storageLevel.useMemory)
assert((blockStatus.memSize > 0) === storageLevel.useMemory)
assert(blockManager.getBlockData(blockId).nioByteBuffer().array() === ser)
}
Seq(
"caching" -> StorageLevel.MEMORY_ONLY,
"caching, serialized" -> StorageLevel.MEMORY_ONLY_SER,
"caching on disk" -> StorageLevel.DISK_ONLY
).foreach { case (name, storageLevel) =>
encryptionTest(s"test putBlockDataAsStream with $name") { conf =>
init(conf)
val ioEncryptionKey =
if (conf.get(IO_ENCRYPTION_ENABLED)) Some(CryptoStreamUtils.createKey(conf)) else None
val securityMgr = new SecurityManager(conf, ioEncryptionKey)
val serializerManager = new SerializerManager(serializer, conf, ioEncryptionKey)
val transfer =
new NettyBlockTransferService(conf, securityMgr, "localhost", "localhost", 0, 1)
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
val blockManager = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
serializerManager, conf, memoryManager, mapOutputTracker,
shuffleManager, transfer, securityMgr, 0)
try {
blockManager.initialize("app-id")
testPutBlockDataAsStream(blockManager, storageLevel)
} finally {
blockManager.stop()
}
}
}
test("turn off updated block statuses") {
val conf = new SparkConf()
conf.set(TASK_METRICS_TRACK_UPDATED_BLOCK_STATUSES, false)