[SPARK-25422][CORE] Don't memory map blocks streamed to disk.

After data has been streamed to disk, the resulting buffers are in some cases
inserted into the memory store (e.g., with broadcast blocks). But the broadcast
code also disposes of those buffers once the data has been read, to ensure that
we don't leave memory-mapped buffers using up memory. Since the memory store
still references the now-unmapped buffers, this leads to garbage data in the
memory store.
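
A minimal sketch of that lifecycle conflict, not Spark source: a toy `memoryStore` map stands in for the real memory store, and the unmapping step is only described in comments so the sketch stays safe to run.

```scala
import java.nio.MappedByteBuffer
import java.nio.channels.FileChannel
import java.nio.charset.StandardCharsets
import java.nio.file.{Files, StandardOpenOption}
import scala.collection.mutable

object MappedBufferLifecycleSketch {
  def main(args: Array[String]): Unit = {
    // Stand-in for the memory store: block id -> buffer.
    val memoryStore = mutable.Map.empty[String, MappedByteBuffer]

    // A block that has been streamed to a temp file on disk.
    val tmp = Files.createTempFile("broadcast_0_piece0", ".bin")
    Files.write(tmp, "streamed block data".getBytes(StandardCharsets.UTF_8))

    // Memory map the file and hand the mapped buffer to the "memory store".
    val channel = FileChannel.open(tmp, StandardOpenOption.READ)
    val mapped = channel.map(FileChannel.MapMode.READ_ONLY, 0, channel.size())
    memoryStore.put("broadcast_0_piece0", mapped)

    // The broadcast read path consumes the same buffer...
    while (mapped.hasRemaining) mapped.get()

    // ...and, in Spark, then disposes (unmaps) it to release the mapping. The
    // memory store still holds a reference to the unmapped region, so any later
    // read through memoryStore("broadcast_0_piece0") sees garbage or crashes
    // the JVM. (The actual unmap call is omitted here on purpose.)
    channel.close()
  }
}
```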

## How was this patch tested?

Ran the old failing test in a loop. Full tests on Jenkins.

Closes #22546 from squito/SPARK-25422-master.

Authored-by: Imran Rashid <irashid@cloudera.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

@@ -438,10 +438,8 @@ private[spark] class BlockManager(
       // stream.
       channel.close()
       // TODO SPARK-25035 Even if we're only going to write the data to disk after this, we end up
-      // using a lot of memory here. With encryption, we'll read the whole file into a regular
-      // byte buffer and OOM. Without encryption, we'll memory map the file and won't get a jvm
-      // OOM, but might get killed by the OS / cluster manager. We could at least read the tmp
-      // file as a stream in both cases.
+      // using a lot of memory here. We'll read the whole file into a regular
+      // byte buffer and OOM. We could at least read the tmp file as a stream.
       val buffer = securityManager.getIOEncryptionKey() match {
         case Some(key) =>
           // we need to pass in the size of the unencrypted block
@@ -453,7 +451,7 @@
           new EncryptedBlockData(tmpFile, blockSize, conf, key).toChunkedByteBuffer(allocator)
         case None =>
-          ChunkedByteBuffer.map(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
+          ChunkedByteBuffer.fromFile(tmpFile, conf.get(config.MEMORY_MAP_LIMIT_FOR_TESTS).toInt)
       }
       putBytes(blockId, buffer, level)(classTag)
       tmpFile.delete()
@@ -726,10 +724,9 @@ private[spark] class BlockManager(
   */
  def getRemoteBytes(blockId: BlockId): Option[ChunkedByteBuffer] = {
    // TODO if we change this method to return the ManagedBuffer, then getRemoteValues
-    // could just use the inputStream on the temp file, rather than memory-mapping the file.
+    // could just use the inputStream on the temp file, rather than reading the file into memory.
    // Until then, replication can cause the process to use too much memory and get killed
-    // by the OS / cluster manager (not a java OOM, since it's a memory-mapped file) even though
-    // we've read the data to disk.
+    // even though we've read the data to disk.
    logDebug(s"Getting remote block $blockId")
    require(blockId != null, "BlockId is null")
    var runningFailureCount = 0
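
For contrast, a hedged sketch of what the TODO above points at (not part of this patch): if callers could consume the fetched temp file as a stream, only a small read buffer would be resident at a time, regardless of block size. The 64 KB buffer below is illustrative.

```scala
import java.io.{File, FileInputStream}

// Consume a fetched block by streaming its temp file: only `buf` is resident
// at any time, no matter how large the block is.
def consumeAsStream(tmpFile: File): Long = {
  val in = new FileInputStream(tmpFile)
  try {
    val buf = new Array[Byte](64 * 1024)
    var total = 0L
    var n = in.read(buf)
    while (n != -1) {
      total += n // a real caller would deserialize from buf here
      n = in.read(buf)
    }
    total
  } finally {
    in.close()
  }
}
```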

core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala

@@ -19,17 +19,16 @@ package org.apache.spark.util.io
 import java.io.{File, FileInputStream, InputStream}
 import java.nio.ByteBuffer
-import java.nio.channels.{FileChannel, WritableByteChannel}
-import java.nio.file.StandardOpenOption
-import scala.collection.mutable.ListBuffer
+import java.nio.channels.WritableByteChannel
+import com.google.common.io.ByteStreams
 import com.google.common.primitives.UnsignedBytes
+import org.apache.commons.io.IOUtils
 import org.apache.spark.SparkEnv
 import org.apache.spark.internal.config
 import org.apache.spark.network.buffer.{FileSegmentManagedBuffer, ManagedBuffer}
-import org.apache.spark.network.util.ByteArrayWritableChannel
+import org.apache.spark.network.util.{ByteArrayWritableChannel, LimitedInputStream}
 import org.apache.spark.storage.StorageUtils
 import org.apache.spark.util.Utils
@@ -175,30 +174,36 @@
   def fromManagedBuffer(data: ManagedBuffer, maxChunkSize: Int): ChunkedByteBuffer = {
     data match {
       case f: FileSegmentManagedBuffer =>
-        map(f.getFile, maxChunkSize, f.getOffset, f.getLength)
+        fromFile(f.getFile, maxChunkSize, f.getOffset, f.getLength)
       case other =>
         new ChunkedByteBuffer(other.nioByteBuffer())
     }
   }
-  def map(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
-    map(file, maxChunkSize, 0, file.length())
+  def fromFile(file: File, maxChunkSize: Int): ChunkedByteBuffer = {
+    fromFile(file, maxChunkSize, 0, file.length())
   }
-  def map(file: File, maxChunkSize: Int, offset: Long, length: Long): ChunkedByteBuffer = {
-    Utils.tryWithResource(FileChannel.open(file.toPath, StandardOpenOption.READ)) { channel =>
-      var remaining = length
-      var pos = offset
-      val chunks = new ListBuffer[ByteBuffer]()
-      while (remaining > 0) {
-        val chunkSize = math.min(remaining, maxChunkSize)
-        val chunk = channel.map(FileChannel.MapMode.READ_ONLY, pos, chunkSize)
-        pos += chunkSize
-        remaining -= chunkSize
-        chunks += chunk
-      }
-      new ChunkedByteBuffer(chunks.toArray)
+  private def fromFile(
+      file: File,
+      maxChunkSize: Int,
+      offset: Long,
+      length: Long): ChunkedByteBuffer = {
+    // We do *not* memory map the file, because we may end up putting this into the memory store,
+    // and spark currently is not expecting memory-mapped buffers in the memory store; it conflicts
+    // with other parts that manage the lifecycle of buffers and dispose them. See SPARK-25422.
+    val is = new FileInputStream(file)
+    ByteStreams.skipFully(is, offset)
+    val in = new LimitedInputStream(is, length)
+    val chunkSize = math.min(maxChunkSize, length).toInt
+    val out = new ChunkedByteBufferOutputStream(chunkSize, ByteBuffer.allocate _)
+    Utils.tryWithSafeFinally {
+      IOUtils.copy(in, out)
+    } {
+      in.close()
+      out.close()
+    }
+    out.toChunkedByteBuffer
   }
 }
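
A hedged usage sketch of the new helper, mirroring the BlockManager call site above; the file path and the 4 MB chunk size are illustrative, and since ChunkedByteBuffer is private[spark] this would live inside Spark itself.

```scala
import java.io.File
import org.apache.spark.util.io.ChunkedByteBuffer

// Read a temp file into on-heap chunks of at most 4 MB each instead of
// memory mapping it.
val tmpFile = new File("/tmp/streamed-block.bin")
val buffer: ChunkedByteBuffer = ChunkedByteBuffer.fromFile(tmpFile, 4 * 1024 * 1024)
```

Bounding each chunk at maxChunkSize keeps any single allocation small, while the LimitedInputStream wrapper caps the total bytes read at length; because the chunks come from ByteBuffer.allocate, a later dispose() cannot unmap a file out from under the memory store.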