diff --git a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 672552cc65..bdf52f32c6 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -17,9 +17,12 @@
 package org.apache.spark.unsafe;
+import java.lang.reflect.Constructor;
 import java.lang.reflect.Field;
 import java.lang.reflect.Method;
+import java.nio.ByteBuffer;
+import sun.misc.Cleaner;
 import sun.misc.Unsafe;
 public final class Platform {
@@ -144,6 +147,35 @@ public final class Platform {
     return newMemory;
   }
+  /**
+   * Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
+   * MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
+   * to increase it).
+   */
+  @SuppressWarnings("unchecked")
+  public static ByteBuffer allocateDirectBuffer(int size) {
+    try {
+      Class cls = Class.forName("java.nio.DirectByteBuffer");
+      Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
+      constructor.setAccessible(true);
+      Field cleanerField = cls.getDeclaredField("cleaner");
+      cleanerField.setAccessible(true);
+      final long memory = allocateMemory(size);
+      ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
+      Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
+        @Override
+        public void run() {
+          freeMemory(memory);
+        }
+      });
+      cleanerField.set(buffer, cleaner);
+      return buffer;
+    } catch (Exception e) {
+      throwException(e);
+    }
+    throw new IllegalStateException("unreachable");
+  }
+
   public static void setMemory(long address, byte value, long size) {
     _UNSAFE.setMemory(address, size, value);
   }
diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
index e5e6a9e4a8..632b0ae9c2 100644
--- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
+++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala
@@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
 import org.apache.spark.serializer.Serializer
 import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
 import org.apache.spark.util.{ByteBufferInputStream, Utils}
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 /**
  * A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
@@ -228,12 +228,12 @@ private object TorrentBroadcast extends Logging {
       blockSize: Int,
       serializer: Serializer,
       compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
-    val bos = new ByteArrayChunkOutputStream(blockSize)
-    val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
+    val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
+    val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
     val ser = serializer.newInstance()
     val serOut = ser.serializeStream(out)
     serOut.writeObject[T](obj).close()
-    bos.toArrays.map(ByteBuffer.wrap)
+    cbbos.toChunkedByteBuffer.getChunks()
   }
   def unBlockifyObject[T: ClassTag](
diff --git a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
index a67e8da26b..0b552cabfc 100644
--- a/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
+++ b/core/src/main/scala/org/apache/spark/memory/StorageMemoryPool.scala
@@ -35,6 +35,11 @@ private[memory] class StorageMemoryPool(
     memoryMode: MemoryMode
   ) extends MemoryPool(lock) with Logging {
+  private[this] val poolName: String = memoryMode match {
+    case MemoryMode.ON_HEAP => "on-heap storage"
+    case MemoryMode.OFF_HEAP => "off-heap storage"
+  }
+
   @GuardedBy("lock")
   private[this] var _memoryUsed: Long = 0L
@@ -60,7 +65,7 @@ private[memory] class StorageMemoryPool(
   /**
    * Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
-   * 
+   *
    * @return whether all N bytes were successfully granted.
    */
   def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
@@ -83,9 +88,8 @@ private[memory] class StorageMemoryPool(
     assert(numBytesToAcquire >= 0)
     assert(numBytesToFree >= 0)
     assert(memoryUsed <= poolSize)
-    // Once we support off-heap caching, this will need to change:
-    if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
-      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
+    if (numBytesToFree > 0) {
+      memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
     }
     // NOTE: If the memory store evicts blocks, then those evictions will synchronously call
     // back into this StorageMemoryPool in order to free memory. Therefore, these variables
@@ -122,14 +126,8 @@ private[memory] class StorageMemoryPool(
     val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
     if (remainingSpaceToFree > 0) {
       // If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
-      val spaceFreedByEviction = {
-        // Once we support off-heap caching, this will need to change:
-        if (memoryMode == MemoryMode.ON_HEAP) {
-          memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
-        } else {
-          0
-        }
-      }
+      val spaceFreedByEviction =
+        memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
       // When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
       // not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
       decrementPoolSize(spaceFreedByEviction)
diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
index d2b8ca90a9..46c64f61de 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -24,7 +24,7 @@ import scala.collection.mutable.HashMap
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
 import org.apache.spark.executor.TaskMetrics
-import org.apache.spark.memory.TaskMemoryManager
+import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
 import org.apache.spark.metrics.MetricsSystem
 import org.apache.spark.serializer.SerializerInstance
 import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
@@ -90,7 +90,8 @@ private[spark] abstract class Task[T](
     try {
       Utils.tryLogNonFatalError {
         // Release memory used by this thread for unrolling blocks
-        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
+        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
+        SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
         // Notify any tasks waiting for execution memory to be freed to wake up and try to
         // acquire memory again. This makes impossible the scenario where a task sleeps forever
         // because there are no other tasks left to notify it. Since this is safe to do but may
diff --git a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 27e5fa4c2b..745ef12691 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -25,7 +25,7 @@ import scala.reflect.ClassTag
 import org.apache.spark.SparkConf
 import org.apache.spark.io.CompressionCodec
 import org.apache.spark.storage._
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 /**
  * Component which configures serialization and compression for various Spark components, including
@@ -128,17 +128,9 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
   /** Serializes into a chunked byte buffer. */
   def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
-    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
-    dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
-    new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
-  }
-
-  /**
-   * Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
-   * the iterator is reached.
-   */
-  def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
-    dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
+    val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
+    dataSerializeStream(blockId, bbos, values)
+    bbos.toChunkedByteBuffer
   }
   /**
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
index 3014cafc28..9608418b43 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -18,6 +18,7 @@ package org.apache.spark.storage
 import java.io._
+import java.nio.ByteBuffer
 import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.concurrent.{Await, ExecutionContext, Future}
@@ -39,6 +40,7 @@ import org.apache.spark.rpc.RpcEnv
 import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
 import org.apache.spark.shuffle.ShuffleManager
 import org.apache.spark.storage.memory._
+import org.apache.spark.unsafe.Platform
 import org.apache.spark.util._
 import org.apache.spark.util.io.ChunkedByteBuffer
@@ -372,8 +374,12 @@ private[spark] class BlockManager(
       val onDisk = level.useDisk && diskStore.contains(blockId)
       val deserialized = if (inMem) level.deserialized else false
       val replication = if (inMem || onDisk) level.replication else 1
-      val storageLevel =
-        StorageLevel(onDisk, inMem, deserialized, replication)
+      val storageLevel = StorageLevel(
+        useDisk = onDisk,
+        useMemory = inMem,
+        useOffHeap = level.useOffHeap,
+        deserialized = deserialized,
+        replication = replication)
       val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
       val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
       BlockStatus(storageLevel, memSize, diskSize)
@@ -407,8 +413,8 @@ private[spark] class BlockManager(
         val iter: Iterator[Any] = if (level.deserialized) {
           memoryStore.getValues(blockId).get
         } else {
-          serializerManager.dataDeserialize(
-            blockId, memoryStore.getBytes(blockId).get)(info.classTag)
+          serializerManager.dataDeserializeStream(
+            blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
         }
         val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
         Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
@@ -416,11 +422,15 @@ private[spark] class BlockManager(
         val iterToReturn: Iterator[Any] = {
           val diskBytes = diskStore.getBytes(blockId)
           if (level.deserialized) {
-            val diskValues = serializerManager.dataDeserialize(blockId, diskBytes)(info.classTag)
+            val diskValues = serializerManager.dataDeserializeStream(
+              blockId,
+              diskBytes.toInputStream(dispose = true))(info.classTag)
             maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
           } else {
-            val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
-            serializerManager.dataDeserialize(blockId, bytes)(info.classTag)
+            val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
+              .map {_.toInputStream(dispose = false)}
+              .getOrElse { diskBytes.toInputStream(dispose = true) }
+            serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
           }
         }
         val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
@@ -481,7 +491,8 @@ private[spark] class BlockManager(
       if (level.useMemory && memoryStore.contains(blockId)) {
         memoryStore.getBytes(blockId).get
       } else if (level.useDisk && diskStore.contains(blockId)) {
-        maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId))
+        val diskBytes = diskStore.getBytes(blockId)
+        maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
       } else {
         releaseLock(blockId)
         throw new SparkException(s"Block $blockId was not found even though it's read-locked")
@@ -496,8 +507,9 @@ private[spark] class BlockManager(
   */
  private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
    getRemoteBytes(blockId).map { data =>
-      new BlockResult(
-        serializerManager.dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
+      val values =
+        serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
+      new BlockResult(values, DataReadMethod.Network, data.size)
    }
  }
@@ -745,7 +757,8 @@ private[spark] class BlockManager(
         // Put it in memory first, even if it also has useDisk set to true;
         // We will drop it to disk later if the memory store can't hold it.
         val putSucceeded = if (level.deserialized) {
-          val values = serializerManager.dataDeserialize(blockId, bytes)(classTag)
+          val values =
+            serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
           memoryStore.putIteratorAsValues(blockId, values, classTag) match {
             case Right(_) => true
             case Left(iter) =>
@@ -755,7 +768,7 @@ private[spark] class BlockManager(
               false
           }
         } else {
-          memoryStore.putBytes(blockId, size, () => bytes)
+          memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
         }
         if (!putSucceeded && level.useDisk) {
           logWarning(s"Persisting block $blockId to disk instead.")
@@ -893,7 +906,7 @@ private[spark] class BlockManager(
             }
           }
         } else { // !level.deserialized
-          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
+          memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
             case Right(s) =>
               size = s
             case Left(partiallySerializedValues) =>
@@ -951,14 +964,16 @@ private[spark] class BlockManager(
   * Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
   * subsequent reads. This method requires the caller to hold a read lock on the block.
   *
-   * @return a copy of the bytes. The original bytes passed this method should no longer
-   *         be used after this method returns.
+   * @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
+   *         If this returns bytes from the memory store then the original disk store bytes will
+   *         automatically be disposed and the caller should not continue to use them. Otherwise,
+   *         if this returns None then the original disk store bytes will be unaffected.
   */
  private def maybeCacheDiskBytesInMemory(
      blockInfo: BlockInfo,
      blockId: BlockId,
      level: StorageLevel,
-      diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
+      diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
    require(!level.deserialized)
    if (level.useMemory) {
      // Synchronize on blockInfo to guard against a race condition where two readers both try to
@@ -966,25 +981,29 @@ private[spark] class BlockManager(
      blockInfo.synchronized {
        if (memoryStore.contains(blockId)) {
          diskBytes.dispose()
-          memoryStore.getBytes(blockId).get
+          Some(memoryStore.getBytes(blockId).get)
        } else {
-          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
+          val allocator = level.memoryMode match {
+            case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+            case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+          }
+          val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
            // https://issues.apache.org/jira/browse/SPARK-6076
            // If the file size is bigger than the free memory, OOM will happen. So if we
            // cannot put it into MemoryStore, copyForMemory should not be created. That's why
            // this action is put into a `() => ChunkedByteBuffer` and created lazily.
-            diskBytes.copy()
+            diskBytes.copy(allocator)
          })
          if (putSucceeded) {
            diskBytes.dispose()
-            memoryStore.getBytes(blockId).get
+            Some(memoryStore.getBytes(blockId).get)
          } else {
-            diskBytes
+            None
          }
        }
      }
    } else {
-      diskBytes
+      None
    }
  }
@@ -1055,7 +1074,12 @@ private[spark] class BlockManager(
    val peersForReplication = new ArrayBuffer[BlockManagerId]
    val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
    val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
-    val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
+    val tLevel = StorageLevel(
+      useDisk = level.useDisk,
+      useMemory = level.useMemory,
+      useOffHeap = level.useOffHeap,
+      deserialized = level.deserialized,
+      replication = 1)
    val startTime = System.currentTimeMillis
    val random = new Random(blockId.hashCode)
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
index d2a5c69e15..8fa1215011 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala
@@ -453,7 +453,7 @@ private[spark] class BlockManagerInfo(
    }
    if (storageLevel.isValid) {
-      /* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
+      /* isValid means it is either stored in-memory or on-disk.
       * The memSize here indicates the data size in or dropped from memory,
       * externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
       * and the diskSize here indicates the data size in or dropped to disk.
diff --git a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
index 7d23295e25..216ec07934 100644
--- a/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
+++ b/core/src/main/scala/org/apache/spark/storage/StorageLevel.scala
@@ -60,10 +60,7 @@ class StorageLevel private(
   assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
   if (useOffHeap) {
-    require(!useDisk, "Off-heap storage level does not support using disk")
-    require(!useMemory, "Off-heap storage level does not support using heap memory")
     require(!deserialized, "Off-heap storage level does not support deserialized storage")
-    require(replication == 1, "Off-heap storage level does not support multiple replication")
   }
   private[spark] def memoryMode: MemoryMode = {
@@ -86,7 +83,7 @@ class StorageLevel private(
     false
   }
-  def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)
+  def isValid: Boolean = (useMemory || useDisk) && (replication > 0)
   def toInt: Int = {
     var ret = 0
@@ -123,7 +120,8 @@ class StorageLevel private(
   private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
   override def toString: String = {
-    s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
+    s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
+      s"deserialized=$deserialized, replication=$replication)"
   }
   override def hashCode(): Int = toInt * 41 + replication
@@ -131,8 +129,9 @@ class StorageLevel private(
   def description: String = {
     var result = ""
     result += (if (useDisk) "Disk " else "")
-    result += (if (useMemory) "Memory " else "")
-    result += (if (useOffHeap) "ExternalBlockStore " else "")
+    if (useMemory) {
+      result += (if (useOffHeap) "Memory (off heap) " else "Memory ")
+    }
     result += (if (deserialized) "Deserialized " else "Serialized ")
     result += s"${replication}x Replicated"
     result
@@ -156,9 +155,7 @@ object StorageLevel {
   val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
   val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
   val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
-
-  // Redirect to MEMORY_ONLY_SER for now.
-  val OFF_HEAP = MEMORY_ONLY_SER
+  val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
   /**
    * :: DeveloperApi ::
@@ -183,7 +180,7 @@ object StorageLevel {
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object without setting useOffHeap.
+   * Create a new StorageLevel object.
    */
   @DeveloperApi
   def apply(
@@ -198,7 +195,7 @@ object StorageLevel {
   /**
    * :: DeveloperApi ::
-   * Create a new StorageLevel object.
+   * Create a new StorageLevel object without setting useOffHeap.
    */
   @DeveloperApi
   def apply(
diff --git a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
index 3ca41f32c1..df38d11e43 100644
--- a/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
+++ b/core/src/main/scala/org/apache/spark/storage/memory/MemoryStore.scala
@@ -32,20 +32,25 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.memory.{MemoryManager, MemoryMode}
 import org.apache.spark.serializer.{SerializationStream, SerializerManager}
 import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
+import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
 import org.apache.spark.util.collection.SizeTrackingVector
-import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
+import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
 private sealed trait MemoryEntry[T] {
   def size: Long
+  def memoryMode: MemoryMode
   def classTag: ClassTag[T]
 }
 private case class DeserializedMemoryEntry[T](
     value: Array[T],
     size: Long,
-    classTag: ClassTag[T]) extends MemoryEntry[T]
+    classTag: ClassTag[T]) extends MemoryEntry[T] {
+  val memoryMode: MemoryMode = MemoryMode.ON_HEAP
+}
 private case class SerializedMemoryEntry[T](
     buffer: ChunkedByteBuffer,
+    memoryMode: MemoryMode,
     classTag: ClassTag[T]) extends MemoryEntry[T] {
   def size: Long = buffer.size
 }
@@ -86,7 +91,10 @@ private[spark] class MemoryStore(
   // A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
   // All accesses of this map are assumed to have manually synchronized on `memoryManager`
-  private val unrollMemoryMap = mutable.HashMap[Long, Long]()
+  private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
+  // Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching
+  // always stores serialized values.
+  private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
   // Initial memory to request before unrolling any block
   private val unrollMemoryThreshold: Long =
@@ -131,13 +139,14 @@ private[spark] class MemoryStore(
   def putBytes[T: ClassTag](
       blockId: BlockId,
       size: Long,
+      memoryMode: MemoryMode,
       _bytes: () => ChunkedByteBuffer): Boolean = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
-    if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) {
+    if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
       // We acquired enough memory for the block, so go ahead and put it
       val bytes = _bytes()
       assert(bytes.size == size)
-      val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
+      val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
       entries.synchronized {
         entries.put(blockId, entry)
       }
@@ -190,7 +199,8 @@ private[spark] class MemoryStore(
     var vector = new SizeTrackingVector[T]()(classTag)
     // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+    keepUnrolling =
+      reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -207,7 +217,8 @@ private[spark] class MemoryStore(
         val currentSize = vector.estimateSize()
         if (currentSize >= memoryThreshold) {
           val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
-          keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+          keepUnrolling =
+            reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
           if (keepUnrolling) {
             unrollMemoryUsedByThisBlock += amountToRequest
           }
@@ -228,7 +239,7 @@ private[spark] class MemoryStore(
       def transferUnrollToStorage(amount: Long): Unit = {
         // Synchronize so that transfer is atomic
         memoryManager.synchronized {
-          releaseUnrollMemoryForThisTask(amount)
+          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
           val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
           assert(success, "transferring unroll memory to storage memory failed")
         }
@@ -247,7 +258,7 @@ private[spark] class MemoryStore(
           // If this task attempt already owns more unroll memory than is necessary to store the
           // block, then release the extra memory that will not be used.
           val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
-          releaseUnrollMemoryForThisTask(excessUnrollMemory)
+          releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
           transferUnrollToStorage(size)
           true
         }
@@ -295,10 +306,16 @@ private[spark] class MemoryStore(
   private[storage] def putIteratorAsBytes[T](
       blockId: BlockId,
       values: Iterator[T],
-      classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
+      classTag: ClassTag[T],
+      memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
     require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
+    val allocator = memoryMode match {
+      case MemoryMode.ON_HEAP => ByteBuffer.allocate _
+      case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
+    }
+
     // Whether there is still enough memory for us to continue unrolling this block
     var keepUnrolling = true
     // Initial per-task memory to request for unrolling blocks (bytes).
@@ -307,15 +324,15 @@ private[spark] class MemoryStore(
     var unrollMemoryUsedByThisBlock = 0L
     // Underlying buffer for unrolling the block
     val redirectableStream = new RedirectableOutputStream
-    val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
-    redirectableStream.setOutputStream(byteArrayChunkOutputStream)
+    val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
+    redirectableStream.setOutputStream(bbos)
     val serializationStream: SerializationStream = {
       val ser = serializerManager.getSerializer(classTag).newInstance()
       ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
     }
     // Request enough memory to begin unrolling
-    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
+    keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
     if (!keepUnrolling) {
       logWarning(s"Failed to reserve initial memory threshold of " +
@@ -325,9 +342,9 @@ private[spark] class MemoryStore(
     }
     def reserveAdditionalMemoryIfNecessary(): Unit = {
-      if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
-        val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
-        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
+      if (bbos.size > unrollMemoryUsedByThisBlock) {
+        val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
+        keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
         if (keepUnrolling) {
           unrollMemoryUsedByThisBlock += amountToRequest
         }
@@ -349,12 +366,11 @@ private[spark] class MemoryStore(
     }
     if (keepUnrolling) {
-      val entry = SerializedMemoryEntry[T](
-        new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag)
+      val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
       // Synchronize so that transfer is atomic
       memoryManager.synchronized {
-        releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
-        val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP)
+        releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
+        val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
         assert(success, "transferring unroll memory to storage memory failed")
       }
       entries.synchronized {
@@ -365,7 +381,7 @@ private[spark] class MemoryStore(
       Right(entry.size)
     } else {
       // We ran out of space while unrolling the values for this block
-      logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
+      logUnrollFailureMessage(blockId, bbos.size)
       Left(
         new PartiallySerializedBlock(
           this,
@@ -374,7 +390,8 @@ private[spark] class MemoryStore(
           serializationStream,
           redirectableStream,
           unrollMemoryUsedByThisBlock,
-          new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
+          memoryMode,
+          bbos.toChunkedByteBuffer,
           values,
           classTag))
     }
@@ -386,7 +403,7 @@ private[spark] class MemoryStore(
       case null => None
       case e: DeserializedMemoryEntry[_] =>
         throw new IllegalArgumentException("should only call getBytes on serialized blocks")
-      case SerializedMemoryEntry(bytes, _) => Some(bytes)
+      case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
     }
   }
@@ -407,8 +424,12 @@ private[spark] class MemoryStore(
       entries.remove(blockId)
     }
     if (entry != null) {
-      memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
-      logInfo(s"Block $blockId of size ${entry.size} dropped " +
+      entry match {
+        case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
+        case _ =>
+      }
+      memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
+      logDebug(s"Block $blockId of size ${entry.size} dropped " +
         s"from memory (free ${maxMemory - blocksMemoryUsed})")
       true
     } else {
@@ -420,7 +441,8 @@ private[spark] class MemoryStore(
     entries.synchronized {
       entries.clear()
     }
-    unrollMemoryMap.clear()
+    onHeapUnrollMemoryMap.clear()
+    offHeapUnrollMemoryMap.clear()
     memoryManager.releaseAllStorageMemory()
     logInfo("MemoryStore cleared")
   }
@@ -440,16 +462,20 @@ private[spark] class MemoryStore(
   *
   * @param blockId the ID of the block we are freeing space for, if any
   * @param space the size of this block
+   * @param memoryMode the type of memory to free (on- or off-heap)
   * @return the amount of memory (in bytes) freed by eviction
   */
-  private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
+  private[spark] def evictBlocksToFreeSpace(
+      blockId: Option[BlockId],
+      space: Long,
+      memoryMode: MemoryMode): Long = {
    assert(space > 0)
    memoryManager.synchronized {
      var freedMemory = 0L
      val rddToAdd = blockId.flatMap(getRddId)
      val selectedBlocks = new ArrayBuffer[BlockId]
-      def blockIsEvictable(blockId: BlockId): Boolean = {
-        rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
+      def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
+        entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
      }
      // This is synchronized to ensure that the set of entries is not changed
      // (because of getValue or getBytes) while traversing the iterator, as that
@@ -459,7 +485,8 @@ private[spark] class MemoryStore(
        while (freedMemory < space && iterator.hasNext) {
          val pair = iterator.next()
          val blockId = pair.getKey
-          if (blockIsEvictable(blockId)) {
+          val entry = pair.getValue
+          if (blockIsEvictable(blockId, entry)) {
            // We don't want to evict blocks which are currently being read, so we need to obtain
            // an exclusive write lock on blocks which are candidates for eviction. We perform a
            // non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
@@ -474,7 +501,7 @@ private[spark] class MemoryStore(
      def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
        val data = entry match {
          case DeserializedMemoryEntry(values, _, _) => Left(values)
-          case SerializedMemoryEntry(buffer, _) => Right(buffer)
+          case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
        }
        val newEffectiveStorageLevel =
          blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
@@ -530,11 +557,18 @@ private[spark] class MemoryStore(
   *
   * @return whether the request is granted.
   */
-  def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
+  def reserveUnrollMemoryForThisTask(
+      blockId: BlockId,
+      memory: Long,
+      memoryMode: MemoryMode): Boolean = {
    memoryManager.synchronized {
-      val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP)
+      val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
      if (success) {
        val taskAttemptId = currentTaskAttemptId()
+        val unrollMemoryMap = memoryMode match {
+          case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+          case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+        }
        unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
      }
      success
@@ -545,9 +579,13 @@ private[spark] class MemoryStore(
   * Release memory used by this task for unrolling blocks.
   * If the amount is not specified, remove the current task's allocation altogether.
   */
-  def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
+  def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
    val taskAttemptId = currentTaskAttemptId()
    memoryManager.synchronized {
+      val unrollMemoryMap = memoryMode match {
+        case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
+        case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
+      }
      if (unrollMemoryMap.contains(taskAttemptId)) {
        val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
        if (memoryToRelease > 0) {
@@ -555,7 +593,7 @@ private[spark] class MemoryStore(
          if (unrollMemoryMap(taskAttemptId) == 0) {
            unrollMemoryMap.remove(taskAttemptId)
          }
-          memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP)
+          memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
        }
      }
    }
@@ -565,20 +603,23 @@ private[spark] class MemoryStore(
   * Return the amount of memory currently occupied for unrolling blocks across all tasks.
   */
  def currentUnrollMemory: Long = memoryManager.synchronized {
-    unrollMemoryMap.values.sum
+    onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
  }
  /**
   * Return the amount of memory currently occupied for unrolling blocks by this task.
   */
  def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
-    unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
+    onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) +
+      offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
  }
  /**
   * Return the number of tasks currently unrolling blocks.
   */
-  private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
+  private def numTasksUnrolling: Int = memoryManager.synchronized {
+    (onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size
+  }
  /**
   * Log information about current memory usage.
@@ -627,7 +668,7 @@ private[storage] class PartiallyUnrolledIterator[T](
  private[this] var iter: Iterator[T] = {
    val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
      unrolledIteratorIsConsumed = true
-      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
    })
    completionIterator ++ rest
  }
@@ -640,7 +681,7 @@ private[storage] class PartiallyUnrolledIterator[T](
   */
  def close(): Unit = {
    if (!unrolledIteratorIsConsumed) {
-      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
      unrolledIteratorIsConsumed = true
    }
    iter = null
@@ -669,6 +710,7 @@ private class RedirectableOutputStream extends OutputStream {
 * @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
 * @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
 * @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
+ * @param memoryMode whether the unroll memory is on- or off-heap
 * @param unrolled a byte buffer containing the partially-serialized values.
 * @param rest the rest of the original iterator passed to
 *             [[MemoryStore.putIteratorAsValues()]].
@@ -681,18 +723,36 @@ private[storage] class PartiallySerializedBlock[T](
    serializationStream: SerializationStream,
    redirectableOutputStream: RedirectableOutputStream,
    unrollMemory: Long,
+    memoryMode: MemoryMode,
    unrolled: ChunkedByteBuffer,
    rest: Iterator[T],
    classTag: ClassTag[T]) {
+  // If the task does not fully consume `valuesIterator` or otherwise fails to consume or dispose of
+  // this PartiallySerializedBlock then we risk leaking of direct buffers, so we use a task
+  // completion listener here in order to ensure that `unrolled.dispose()` is called at least once.
+  // The dispose() method is idempotent, so it's safe to call it unconditionally.
+  Option(TaskContext.get()).foreach { taskContext =>
+    taskContext.addTaskCompletionListener { _ =>
+      // When a task completes, its unroll memory will automatically be freed. Thus we do not call
+      // releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing.
+      unrolled.dispose()
+    }
+  }
+
  /**
   * Called to dispose of this block and free its memory.
   */
  def discard(): Unit = {
    try {
+      // We want to close the output stream in order to free any resources associated with the
+      // serializer itself (such as Kryo's internal buffers). close() might cause data to be
+      // written, so redirect the output stream to discard that data.
+      redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
      serializationStream.close()
    } finally {
-      memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
+      unrolled.dispose()
+      memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
    }
  }
@@ -701,12 +761,14 @@ private[storage] class PartiallySerializedBlock[T](
   * and then serializing the values from the original input iterator.
   */
  def finishWritingToStream(os: OutputStream): Unit = {
-    ByteStreams.copy(unrolled.toInputStream(), os)
+    // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
+    ByteStreams.copy(unrolled.toInputStream(dispose = true), os)
+    memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
    redirectableOutputStream.setOutputStream(os)
    while (rest.hasNext) {
      serializationStream.writeObject(rest.next())(classTag)
    }
-    discard()
+    serializationStream.close()
  }
  /**
@@ -717,10 +779,13 @@ private[storage] class PartiallySerializedBlock[T](
   * `close()` on it to free its resources.
   */
  def valuesIterator: PartiallyUnrolledIterator[T] = {
+    // `unrolled`'s underlying buffers will be freed once this input stream is fully read:
+    val unrolledIter = serializerManager.dataDeserializeStream(
+      blockId, unrolled.toInputStream(dispose = true))(classTag)
    new PartiallyUnrolledIterator(
      memoryStore,
      unrollMemory,
-      unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag),
+      unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
      rest = rest)
  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
index c643c4b63c..fb4706e78d 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBuffer.scala
@@ -41,6 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
   require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
+  private[this] var disposed: Boolean = false
+
   /**
    * This size of this buffer, in bytes.
    */
@@ -117,11 +119,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
  /**
   * Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers.
   * The new buffer will share no resources with the original buffer.
+   *
+   * @param allocator a method for allocating byte buffers
   */
-  def copy(): ChunkedByteBuffer = {
+  def copy(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    val copiedChunks = getChunks().map { chunk =>
-      // TODO: accept an allocator in this copy method to integrate with mem. accounting systems
-      val newChunk = ByteBuffer.allocate(chunk.limit())
+      val newChunk = allocator(chunk.limit())
      newChunk.put(chunk)
      newChunk.flip()
      newChunk
@@ -136,7 +139,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
   * unfortunately no standard API to do this.
   */
  def dispose(): Unit = {
-    chunks.foreach(StorageUtils.dispose)
+    if (!disposed) {
+      chunks.foreach(StorageUtils.dispose)
+      disposed = true
+    }
  }
 }
diff --git a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
similarity index 69%
rename from core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
rename to core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
index 16fe3be303..67b50d1e70 100644
--- a/core/src/main/scala/org/apache/spark/util/io/ByteArrayChunkOutputStream.scala
+++ b/core/src/main/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStream.scala
@@ -18,19 +18,25 @@ package org.apache.spark.util.io
 import java.io.OutputStream
+import java.nio.ByteBuffer
 import scala.collection.mutable.ArrayBuffer
+import org.apache.spark.storage.StorageUtils
 /**
  * An OutputStream that writes to fixed-size chunks of byte arrays.
  *
  * @param chunkSize size of each chunk, in bytes.
  */
-private[spark]
-class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
+private[spark] class ChunkedByteBufferOutputStream(
+    chunkSize: Int,
+    allocator: Int => ByteBuffer)
+  extends OutputStream {
-  private[this] val chunks = new ArrayBuffer[Array[Byte]]
+  private[this] var toChunkedByteBufferWasCalled = false
+
+  private val chunks = new ArrayBuffer[ByteBuffer]
   /** Index of the last chunk. Starting with -1 when the chunks array is empty. */
   private[this] var lastChunkIndex = -1
@@ -48,7 +54,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
   override def write(b: Int): Unit = {
     allocateNewChunkIfNeeded()
-    chunks(lastChunkIndex)(position) = b.toByte
+    chunks(lastChunkIndex).put(b.toByte)
     position += 1
     _size += 1
   }
@@ -58,7 +64,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
     while (written < len) {
       allocateNewChunkIfNeeded()
       val thisBatch = math.min(chunkSize - position, len - written)
-      System.arraycopy(bytes, written + off, chunks(lastChunkIndex), position, thisBatch)
+      chunks(lastChunkIndex).put(bytes, written + off, thisBatch)
       written += thisBatch
       position += thisBatch
     }
@@ -67,33 +73,41 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
   @inline
   private def allocateNewChunkIfNeeded(): Unit = {
+    require(!toChunkedByteBufferWasCalled, "cannot write after toChunkedByteBuffer() is called")
     if (position == chunkSize) {
-      chunks += new Array[Byte](chunkSize)
+      chunks += allocator(chunkSize)
       lastChunkIndex += 1
       position = 0
     }
   }
-  def toArrays: Array[Array[Byte]] = {
+  def toChunkedByteBuffer: ChunkedByteBuffer = {
+    require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
+    toChunkedByteBufferWasCalled = true
     if (lastChunkIndex == -1) {
-      new Array[Array[Byte]](0)
+      new ChunkedByteBuffer(Array.empty[ByteBuffer])
     } else {
       // Copy the first n-1 chunks to the output, and then create an array that fits the last chunk.
       // An alternative would have been returning an array of ByteBuffers, with the last buffer
       // bounded to only the last chunk's position. However, given our use case in Spark (to put
       // the chunks in block manager), only limiting the view bound of the buffer would still
       // require the block manager to store the whole chunk.
-      val ret = new Array[Array[Byte]](chunks.size)
+      val ret = new Array[ByteBuffer](chunks.size)
       for (i <- 0 until chunks.size - 1) {
         ret(i) = chunks(i)
+        ret(i).flip()
       }
       if (position == chunkSize) {
         ret(lastChunkIndex) = chunks(lastChunkIndex)
+        ret(lastChunkIndex).flip()
       } else {
-        ret(lastChunkIndex) = new Array[Byte](position)
-        System.arraycopy(chunks(lastChunkIndex), 0, ret(lastChunkIndex), 0, position)
+        ret(lastChunkIndex) = allocator(position)
+        chunks(lastChunkIndex).flip()
+        ret(lastChunkIndex).put(chunks(lastChunkIndex))
+        ret(lastChunkIndex).flip()
+        StorageUtils.dispose(chunks(lastChunkIndex))
       }
-      ret
+      new ChunkedByteBuffer(ret)
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index 3dded4d486..67d722c1dc 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -198,8 +198,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
     blockManager.master.getLocations(blockId).foreach { cmId =>
       val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
         blockId.toString)
-      val deserialized = serializerManager.dataDeserialize[Int](blockId,
-        new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
+      val deserialized = serializerManager.dataDeserializeStream[Int](blockId,
+        new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream()).toList
       assert(deserialized === (1 to 100).toList)
     }
   }
diff --git a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
index aab70e7431..f205d4f0d6 100644
--- a/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
+++ b/core/src/test/scala/org/apache/spark/io/ChunkedByteBufferSuite.scala
@@ -52,7 +52,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
   test("copy() does not affect original buffer's position") {
     val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
-    chunkedByteBuffer.copy()
+    chunkedByteBuffer.copy(ByteBuffer.allocate)
     assert(chunkedByteBuffer.getChunks().head.position() === 0)
   }
diff --git a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
index aaca653c58..3d1a0e9795 100644
--- a/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/memory/MemoryManagerSuite.scala
@@ -71,7 +71,8 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
   */
  protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
    val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
-    when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
+    when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
+      .thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
    mm.setMemoryStore(ms)
    ms
  }
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
index 98e8450fa1..2ec5319d55 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala
@@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
 import org.scalatest.concurrent.Eventually._
 import org.apache.spark._
-import org.apache.spark.memory.StaticMemoryManager
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.BlockTransferService
 import org.apache.spark.network.netty.NettyBlockTransferService
 import org.apache.spark.rpc.RpcEnv
@@ -60,8 +60,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
  private def makeBlockManager(
      maxMem: Long,
      name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
+    conf.set("spark.testing.memory", maxMem.toString)
+    conf.set("spark.memory.offHeap.size", maxMem.toString)
    val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
    val serializerManager = new SerializerManager(serializer, conf)
    val store = new BlockManager(name, rpcEnv, master, serializerManager, conf, memManager,
      mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
@@ -76,6 +78,9 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
    conf.set("spark.authenticate", "false")
    conf.set("spark.driver.port", rpcEnv.address.port.toString)
+    conf.set("spark.testing", "true")
+    conf.set("spark.memory.fraction", "1")
+    conf.set("spark.memory.storageFraction", "1")
    conf.set("spark.storage.unrollFraction", "0.4")
    conf.set("spark.storage.unrollMemoryThreshold", "512")
@@ -172,6 +177,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
    testReplication(5, storageLevels)
  }
+  test("block replication - off-heap") {
+    testReplication(2, Seq(OFF_HEAP, StorageLevel(true, true, true, false, 2)))
+  }
+
  test("block replication - 2x replication without peers") {
    intercept[org.scalatest.exceptions.TestFailedException] {
      testReplication(1,
@@ -262,7 +271,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
    val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work
    when(failableTransfer.hostName).thenReturn("some-hostname")
    when(failableTransfer.port).thenReturn(1000)
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1)
+    conf.set("spark.testing.memory", "10000")
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
    val serializerManager = new SerializerManager(serializer, conf)
    val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
      memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
@@ -392,10 +402,14 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
      // If the block is supposed to be in memory, then drop the copy of the block in
      // this store test whether master is updated with zero memory usage this store
      if (storageLevel.useMemory) {
+        val sl = if (storageLevel.useOffHeap) {
+          StorageLevel(false, true, true, false, 1)
+        } else {
+          MEMORY_ONLY_SER
+        }
        // Force the block to be dropped by adding a number of dummy blocks
        (1 to 10).foreach {
-          i =>
-            testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
+          i => testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), sl)
        }
        (1 to 10).foreach {
          i => testStore.removeBlock(s"dummy-block-$i")
diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
index 9f3a775654..32c00ac687 100644
--- a/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala
@@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
 import org.apache.spark._
 import org.apache.spark.executor.DataReadMethod
-import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
+import org.apache.spark.memory.UnifiedMemoryManager
 import org.apache.spark.network.{BlockDataManager, BlockTransferService}
 import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
 import org.apache.spark.network.netty.NettyBlockTransferService
@@ -74,10 +74,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
      name: String = SparkContext.DRIVER_IDENTIFIER,
      master: BlockManagerMaster = this.master,
      transferService: Option[BlockTransferService] = Option.empty): BlockManager = {
+    conf.set("spark.testing.memory", maxMem.toString)
+    conf.set("spark.memory.offHeap.size", maxMem.toString)
    val serializer = new KryoSerializer(conf)
    val transfer = transferService
      .getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
-    val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
+    val memManager = UnifiedMemoryManager(conf, numCores = 1)
    val serializerManager = new SerializerManager(serializer, conf)
    val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
      memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
@@ -92,6 +94,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
    System.setProperty("os.arch", "amd64")
    conf = new SparkConf(false)
      .set("spark.app.id", "test")
+      .set("spark.testing", "true")
+      .set("spark.memory.fraction", "1")
+      .set("spark.memory.storageFraction", "1")
      .set("spark.kryoserializer.buffer", "1m")
      .set("spark.test.useCompressedOops", "true")
      .set("spark.storage.unrollFraction", "0.4")
@@ -518,6 +523,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
    testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
  }
+  test("in-memory LRU storage with off-heap") {
+    testInMemoryLRUStorage(StorageLevel(
+      useDisk = false,
+      useMemory = true,
+      useOffHeap = true,
+      deserialized = false, replication = 1))
+  }
+
  private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
    store = makeBlockManager(12000)
    val a1 = new Array[Byte](4000)
@@ -608,6 +621,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
    testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
  }
+  test("disk and off-heap memory storage") {
+    testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false)
+  }
+
+  test("disk and off-heap memory storage with getLocalBytes") {
+    testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true)
+  }
+
  def testDiskAndMemoryStorage(
      storageLevel: StorageLevel,
      getAsBytes: Boolean): Unit = {
@@ -817,12 +838,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
  test("block store put failure") {
    // Use Java serializer so we can create an unserializable error.
+ conf.set("spark.testing.memory", "1200") val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1) - val memoryManager = new StaticMemoryManager( - conf, - maxOnHeapExecutionMemory = Long.MaxValue, - maxOnHeapStorageMemory = 1200, - numCores = 1) + val memoryManager = UnifiedMemoryManager(conf, numCores = 1) val serializerManager = new SerializerManager(new JavaSerializer(conf), conf) store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master, serializerManager, conf, memoryManager, mapOutputTracker, diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala index 43e832dc02..145d432afe 100644 --- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala @@ -27,7 +27,7 @@ import scala.reflect.ClassTag import org.scalatest._ import org.apache.spark._ -import org.apache.spark.memory.StaticMemoryManager +import org.apache.spark.memory.{MemoryMode, StaticMemoryManager} import org.apache.spark.serializer.{KryoSerializer, SerializerManager} import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator} import org.apache.spark.util._ @@ -86,7 +86,7 @@ class MemoryStoreSuite assert(memoryStore.currentUnrollMemoryForThisTask === 0) def reserveUnrollMemoryForThisTask(memory: Long): Boolean = { - memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory) + memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory, MemoryMode.ON_HEAP) } // Reserve @@ -99,9 +99,9 @@ class MemoryStoreSuite assert(!reserveUnrollMemoryForThisTask(1000000)) assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted // Release - memoryStore.releaseUnrollMemoryForThisTask(100) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100) assert(memoryStore.currentUnrollMemoryForThisTask === 700) - memoryStore.releaseUnrollMemoryForThisTask(100) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100) assert(memoryStore.currentUnrollMemoryForThisTask === 600) // Reserve again assert(reserveUnrollMemoryForThisTask(4400)) @@ -109,9 +109,9 @@ class MemoryStoreSuite assert(!reserveUnrollMemoryForThisTask(20000)) assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted // Release again - memoryStore.releaseUnrollMemoryForThisTask(1000) + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 1000) assert(memoryStore.currentUnrollMemoryForThisTask === 4000) - memoryStore.releaseUnrollMemoryForThisTask() // release all + memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) // release all assert(memoryStore.currentUnrollMemoryForThisTask === 0) } @@ -254,7 +254,7 @@ class MemoryStoreSuite assert(blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false))) - val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag) + val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag, MemoryMode.ON_HEAP) blockInfoManager.unlock(blockId) res } @@ -312,7 +312,7 @@ class MemoryStoreSuite assert(blockInfoManager.lockNewBlockForWriting( "b1", new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) - val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, MemoryMode.ON_HEAP) 
blockInfoManager.unlock("b1") assert(res.isLeft) assert(memoryStore.currentUnrollMemoryForThisTask > 0) @@ -333,7 +333,7 @@ class MemoryStoreSuite assert(blockInfoManager.lockNewBlockForWriting( "b1", new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false))) - val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any) + val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, MemoryMode.ON_HEAP) blockInfoManager.unlock("b1") assert(res.isLeft) assert(memoryStore.currentUnrollMemoryForThisTask > 0) @@ -395,7 +395,7 @@ class MemoryStoreSuite val blockId = BlockId("rdd_3_10") blockInfoManager.lockNewBlockForWriting( blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false)) - memoryStore.putBytes(blockId, 13000, () => { + memoryStore.putBytes(blockId, 13000, MemoryMode.ON_HEAP, () => { fail("A big ByteBuffer that cannot be put into MemoryStore should not be created") }) } @@ -404,7 +404,7 @@ class MemoryStoreSuite val (memoryStore, _) = makeMemoryStore(12000) val blockId = BlockId("rdd_3_10") var bytes: ChunkedByteBuffer = null - memoryStore.putBytes(blockId, 10000, () => { + memoryStore.putBytes(blockId, 10000, MemoryMode.ON_HEAP, () => { bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000)) bytes }) diff --git a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala b/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala similarity index 62% rename from core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala rename to core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala index 361ec95654..226622075a 100644 --- a/core/src/test/scala/org/apache/spark/util/io/ByteArrayChunkOutputStreamSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/io/ChunkedByteBufferOutputStreamSuite.scala @@ -17,48 +17,53 @@ package org.apache.spark.util.io +import java.nio.ByteBuffer + import scala.util.Random import org.apache.spark.SparkFunSuite -class ByteArrayChunkOutputStreamSuite extends SparkFunSuite { +class ChunkedByteBufferOutputStreamSuite extends SparkFunSuite { test("empty output") { - val o = new ByteArrayChunkOutputStream(1024) - assert(o.toArrays.length === 0) + val o = new ChunkedByteBufferOutputStream(1024, ByteBuffer.allocate) + assert(o.toChunkedByteBuffer.size === 0) } test("write a single byte") { - val o = new ByteArrayChunkOutputStream(1024) + val o = new ChunkedByteBufferOutputStream(1024, ByteBuffer.allocate) o.write(10) - assert(o.toArrays.length === 1) - assert(o.toArrays.head.toSeq === Seq(10.toByte)) + val chunkedByteBuffer = o.toChunkedByteBuffer + assert(chunkedByteBuffer.getChunks().length === 1) + assert(chunkedByteBuffer.getChunks().head.array().toSeq === Seq(10.toByte)) } test("write a single near boundary") { - val o = new ByteArrayChunkOutputStream(10) + val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate) o.write(new Array[Byte](9)) o.write(99) - assert(o.toArrays.length === 1) - assert(o.toArrays.head(9) === 99.toByte) + val chunkedByteBuffer = o.toChunkedByteBuffer + assert(chunkedByteBuffer.getChunks().length === 1) + assert(chunkedByteBuffer.getChunks().head.array()(9) === 99.toByte) } test("write a single at boundary") { - val o = new ByteArrayChunkOutputStream(10) + val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate) o.write(new Array[Byte](10)) o.write(99) - assert(o.toArrays.length === 2) - assert(o.toArrays(1).length === 1) - 
assert(o.toArrays(1)(0) === 99.toByte) + val arrays = o.toChunkedByteBuffer.getChunks().map(_.array()) + assert(arrays.length === 2) + assert(arrays(1).length === 1) + assert(arrays(1)(0) === 99.toByte) } test("single chunk output") { val ref = new Array[Byte](8) Random.nextBytes(ref) - val o = new ByteArrayChunkOutputStream(10) + val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate) o.write(ref) - val arrays = o.toArrays + val arrays = o.toChunkedByteBuffer.getChunks().map(_.array()) assert(arrays.length === 1) assert(arrays.head.length === ref.length) assert(arrays.head.toSeq === ref.toSeq) @@ -67,9 +72,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite { test("single chunk output at boundary size") { val ref = new Array[Byte](10) Random.nextBytes(ref) - val o = new ByteArrayChunkOutputStream(10) + val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate) o.write(ref) - val arrays = o.toArrays + val arrays = o.toChunkedByteBuffer.getChunks().map(_.array()) assert(arrays.length === 1) assert(arrays.head.length === ref.length) assert(arrays.head.toSeq === ref.toSeq) @@ -78,9 +83,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite { test("multiple chunk output") { val ref = new Array[Byte](26) Random.nextBytes(ref) - val o = new ByteArrayChunkOutputStream(10) + val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate) o.write(ref) - val arrays = o.toArrays + val arrays = o.toChunkedByteBuffer.getChunks().map(_.array()) assert(arrays.length === 3) assert(arrays(0).length === 10) assert(arrays(1).length === 10) @@ -94,9 +99,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite { test("multiple chunk output at boundary size") { val ref = new Array[Byte](30) Random.nextBytes(ref) - val o = new ByteArrayChunkOutputStream(10) + val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate) o.write(ref) - val arrays = o.toArrays + val arrays = o.toChunkedByteBuffer.getChunks().map(_.array()) assert(arrays.length === 3) assert(arrays(0).length === 10) assert(arrays(1).length === 10) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala index c56520b1e2..53fccd8d5e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/rdd/WriteAheadLogBackedBlockRDD.scala @@ -162,7 +162,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag]( logDebug(s"Stored partition data of $this into block manager with level $storageLevel") dataRead.rewind() } - serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead)) + serializerManager + .dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream()) .asInstanceOf[Iterator[T]] } diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala index 4e77cd6347..5fc53bcb91 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala @@ -157,7 +157,8 @@ class ReceivedBlockHandlerSuite val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf) val bytes = reader.read(fileSegment) reader.close() - serializerManager.dataDeserialize(generateBlockId(), new 
ChunkedByteBuffer(bytes)).toList + serializerManager.dataDeserializeStream( + generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList } loggedData shouldEqual data }
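Some usage notes on the APIs this patch touches. The new BlockManagerSuite cases cache blocks with an off-heap StorageLevel on top of UnifiedMemoryManager, sized through spark.memory.offHeap.size. A minimal user-side sketch of the same feature follows; the app name, master, and sizes are placeholders, and setting spark.memory.offHeap.enabled is an assumption about the usual off-heap configuration rather than something this patch changes.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.storage.StorageLevel

    object OffHeapCacheExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("off-heap-cache-example")            // placeholder app name
          .setMaster("local[2]")                           // placeholder master
          .set("spark.memory.offHeap.enabled", "true")     // assumed prerequisite for off-heap memory
          .set("spark.memory.offHeap.size", "64m")         // sizes the off-heap pools
        val sc = new SparkContext(conf)

        // StorageLevel.OFF_HEAP stores serialized blocks in off-heap memory,
        // the behaviour the new "in-memory LRU storage with off-heap" test exercises.
        val cached = sc.parallelize(1 to 1000).map(_ * 2).persist(StorageLevel.OFF_HEAP)
        println(cached.count())

        sc.stop()
      }
    }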
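In MemoryStoreSuite, every put and unroll-memory call now names a MemoryMode, so the store charges the request against the matching on-heap or off-heap storage pool. A rough sketch against these internal (private[spark]) APIs; the putSerialized helper and its parameters are illustrative, not part of the patch.

    import org.apache.spark.memory.MemoryMode
    import org.apache.spark.storage.BlockId
    import org.apache.spark.storage.memory.MemoryStore
    import org.apache.spark.util.io.ChunkedByteBuffer

    // Hypothetical helper: store an already-serialized block in the requested pool.
    def putSerialized(
        memoryStore: MemoryStore,
        blockId: BlockId,
        bytes: ChunkedByteBuffer,
        useOffHeap: Boolean): Unit = {
      // Pick which storage pool the block is charged to.
      val mode = if (useOffHeap) MemoryMode.OFF_HEAP else MemoryMode.ON_HEAP
      // The () => bytes thunk is only evaluated once enough storage memory has been
      // reserved, which is what the "should not be created" assertion in the suite checks.
      memoryStore.putBytes(blockId, bytes.size, mode, () => bytes)
    }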
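ChunkedByteBufferOutputStream replaces ByteArrayChunkOutputStream and takes an allocator, so the caller decides whether chunks end up in heap or direct buffers. A small sketch of the pattern the renamed suite tests; it only compiles from code inside the org.apache.spark packages because these utilities are private[spark], and the roundTrip name is just for illustration.

    import java.nio.ByteBuffer

    import org.apache.spark.util.io.ChunkedByteBufferOutputStream

    // 10-byte chunks backed by on-heap buffers; passing ByteBuffer.allocateDirect
    // instead would produce direct (off-heap) chunks.
    def roundTrip(): Unit = {
      val out = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
      out.write(new Array[Byte](26))
      val chunked = out.toChunkedByteBuffer
      assert(chunked.size == 26)               // total bytes written
      assert(chunked.getChunks().length == 3)  // chunks of 10, 10 and 6 bytes
    }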
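The streaming changes swap dataDeserialize on a ChunkedByteBuffer for dataDeserializeStream on the buffer's input stream, presumably so the read path no longer assumes the chunks expose a backing array (direct, off-heap buffers do not). A hedged sketch of the call shape; the readBack helper and the TestBlockId choice are illustrative only.

    import java.nio.ByteBuffer

    import org.apache.spark.serializer.SerializerManager
    import org.apache.spark.storage.TestBlockId
    import org.apache.spark.util.io.ChunkedByteBuffer

    // Hypothetical helper: deserialize a payload the same way the streaming code now does.
    def readBack(serializerManager: SerializerManager, payload: ByteBuffer): Seq[Any] = {
      serializerManager
        .dataDeserializeStream(TestBlockId("example"), new ChunkedByteBuffer(payload).toInputStream())
        .toSeq
    }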