[SPARK-13992] Add support for off-heap caching
This patch adds support for caching blocks in the executor processes using direct / off-heap memory. ## User-facing changes **Updated semantics of `OFF_HEAP` storage level**: In Spark 1.x, the `OFF_HEAP` storage level indicated that an RDD should be cached in Tachyon. Spark 2.x removed the external block store API that Tachyon caching was based on (see #10752 / SPARK-12667), so `OFF_HEAP` became an alias for `MEMORY_ONLY_SER`. As of this patch, `OFF_HEAP` means "serialized and cached in off-heap memory or on disk". Via the `StorageLevel` constructor, `useOffHeap` can be set if `serialized == true` and can be used to construct custom storage levels which support replication. **Storage UI reporting**: the storage UI will now report whether in-memory blocks are stored on- or off-heap. **Only supported by UnifiedMemoryManager**: for simplicity, this feature is only supported when the default UnifiedMemoryManager is used; applications which use the legacy memory manager (`spark.memory.useLegacyMode=true`) are not currently able to allocate off-heap storage memory, so using off-heap caching will fail with an error when legacy memory management is enabled. Given that we plan to eventually remove the legacy memory manager, this is not a significant restriction. **Memory management policies:** the policies for dividing available memory between execution and storage are the same for both on- and off-heap memory. For off-heap memory, the total amount of memory available for use by Spark is controlled by `spark.memory.offHeap.size`, which is an absolute size. Off-heap storage memory obeys `spark.memory.storageFraction` in order to control the amount of unevictable storage memory. For example, if `spark.memory.offHeap.size` is 1 gigabyte and Spark uses the default `storageFraction` of 0.5, then up to 500 megabytes of off-heap cached blocks will be protected from eviction due to execution memory pressure. If necessary, we can split `spark.memory.storageFraction` into separate on- and off-heap configurations, but this doesn't seem necessary now and can be done later without any breaking changes. **Use of off-heap memory does not imply use of off-heap execution (or vice-versa)**: for now, the settings controlling the use of off-heap execution memory (`spark.memory.offHeap.enabled`) and off-heap caching are completely independent, so Spark SQL can be configured to use off-heap memory for execution while continuing to cache blocks on-heap. If desired, we can change this in a followup patch so that `spark.memory.offHeap.enabled` affect the default storage level for cached SQL tables. ## Internal changes - Rename `ByteArrayChunkOutputStream` to `ChunkedByteBufferOutputStream` - It now returns a `ChunkedByteBuffer` instead of an array of byte arrays. - Its constructor now accept an `allocator` function which is called to allocate `ByteBuffer`s. This allows us to control whether it allocates regular ByteBuffers or off-heap DirectByteBuffers. - Because block serialization is now performed during the unroll process, a `ChunkedByteBufferOutputStream` which is configured with a `DirectByteBuffer` allocator will use off-heap memory for both unroll and storage memory. - The `MemoryStore`'s MemoryEntries now tracks whether blocks are stored on- or off-heap. - `evictBlocksToFreeSpace()` now accepts a `MemoryMode` parameter so that we don't try to evict off-heap blocks in response to on-heap memory pressure (or vice-versa). - Make sure that off-heap buffers are properly de-allocated during MemoryStore eviction. - The JVM limits the total size of allocated direct byte buffers using the `-XX:MaxDirectMemorySize` flag and the default tends to be fairly low (< 512 megabytes in some JVMs). To work around this limitation, this patch adds a custom DirectByteBuffer allocator which ignores this memory limit. Author: Josh Rosen <joshrosen@databricks.com> Closes #11805 from JoshRosen/off-heap-caching.
This commit is contained in:
parent
bd7b91cefb
commit
e41acb7573
|
@ -17,9 +17,12 @@
|
|||
|
||||
package org.apache.spark.unsafe;
|
||||
|
||||
import java.lang.reflect.Constructor;
|
||||
import java.lang.reflect.Field;
|
||||
import java.lang.reflect.Method;
|
||||
import java.nio.ByteBuffer;
|
||||
|
||||
import sun.misc.Cleaner;
|
||||
import sun.misc.Unsafe;
|
||||
|
||||
public final class Platform {
|
||||
|
@ -144,6 +147,35 @@ public final class Platform {
|
|||
return newMemory;
|
||||
}
|
||||
|
||||
/**
|
||||
* Uses internal JDK APIs to allocate a DirectByteBuffer while ignoring the JVM's
|
||||
* MaxDirectMemorySize limit (the default limit is too low and we do not want to require users
|
||||
* to increase it).
|
||||
*/
|
||||
@SuppressWarnings("unchecked")
|
||||
public static ByteBuffer allocateDirectBuffer(int size) {
|
||||
try {
|
||||
Class cls = Class.forName("java.nio.DirectByteBuffer");
|
||||
Constructor constructor = cls.getDeclaredConstructor(Long.TYPE, Integer.TYPE);
|
||||
constructor.setAccessible(true);
|
||||
Field cleanerField = cls.getDeclaredField("cleaner");
|
||||
cleanerField.setAccessible(true);
|
||||
final long memory = allocateMemory(size);
|
||||
ByteBuffer buffer = (ByteBuffer) constructor.newInstance(memory, size);
|
||||
Cleaner cleaner = Cleaner.create(buffer, new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
freeMemory(memory);
|
||||
}
|
||||
});
|
||||
cleanerField.set(buffer, cleaner);
|
||||
return buffer;
|
||||
} catch (Exception e) {
|
||||
throwException(e);
|
||||
}
|
||||
throw new IllegalStateException("unreachable");
|
||||
}
|
||||
|
||||
public static void setMemory(long address, byte value, long size) {
|
||||
_UNSAFE.setMemory(address, size, value);
|
||||
}
|
||||
|
|
|
@ -30,7 +30,7 @@ import org.apache.spark.io.CompressionCodec
|
|||
import org.apache.spark.serializer.Serializer
|
||||
import org.apache.spark.storage.{BlockId, BroadcastBlockId, StorageLevel}
|
||||
import org.apache.spark.util.{ByteBufferInputStream, Utils}
|
||||
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
|
||||
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
|
||||
|
||||
/**
|
||||
* A BitTorrent-like implementation of [[org.apache.spark.broadcast.Broadcast]].
|
||||
|
@ -228,12 +228,12 @@ private object TorrentBroadcast extends Logging {
|
|||
blockSize: Int,
|
||||
serializer: Serializer,
|
||||
compressionCodec: Option[CompressionCodec]): Array[ByteBuffer] = {
|
||||
val bos = new ByteArrayChunkOutputStream(blockSize)
|
||||
val out: OutputStream = compressionCodec.map(c => c.compressedOutputStream(bos)).getOrElse(bos)
|
||||
val cbbos = new ChunkedByteBufferOutputStream(blockSize, ByteBuffer.allocate)
|
||||
val out = compressionCodec.map(c => c.compressedOutputStream(cbbos)).getOrElse(cbbos)
|
||||
val ser = serializer.newInstance()
|
||||
val serOut = ser.serializeStream(out)
|
||||
serOut.writeObject[T](obj).close()
|
||||
bos.toArrays.map(ByteBuffer.wrap)
|
||||
cbbos.toChunkedByteBuffer.getChunks()
|
||||
}
|
||||
|
||||
def unBlockifyObject[T: ClassTag](
|
||||
|
|
|
@ -35,6 +35,11 @@ private[memory] class StorageMemoryPool(
|
|||
memoryMode: MemoryMode
|
||||
) extends MemoryPool(lock) with Logging {
|
||||
|
||||
private[this] val poolName: String = memoryMode match {
|
||||
case MemoryMode.ON_HEAP => "on-heap storage"
|
||||
case MemoryMode.OFF_HEAP => "off-heap storage"
|
||||
}
|
||||
|
||||
@GuardedBy("lock")
|
||||
private[this] var _memoryUsed: Long = 0L
|
||||
|
||||
|
@ -60,7 +65,7 @@ private[memory] class StorageMemoryPool(
|
|||
|
||||
/**
|
||||
* Acquire N bytes of memory to cache the given block, evicting existing ones if necessary.
|
||||
*
|
||||
*
|
||||
* @return whether all N bytes were successfully granted.
|
||||
*/
|
||||
def acquireMemory(blockId: BlockId, numBytes: Long): Boolean = lock.synchronized {
|
||||
|
@ -83,9 +88,8 @@ private[memory] class StorageMemoryPool(
|
|||
assert(numBytesToAcquire >= 0)
|
||||
assert(numBytesToFree >= 0)
|
||||
assert(memoryUsed <= poolSize)
|
||||
// Once we support off-heap caching, this will need to change:
|
||||
if (numBytesToFree > 0 && memoryMode == MemoryMode.ON_HEAP) {
|
||||
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree)
|
||||
if (numBytesToFree > 0) {
|
||||
memoryStore.evictBlocksToFreeSpace(Some(blockId), numBytesToFree, memoryMode)
|
||||
}
|
||||
// NOTE: If the memory store evicts blocks, then those evictions will synchronously call
|
||||
// back into this StorageMemoryPool in order to free memory. Therefore, these variables
|
||||
|
@ -122,14 +126,8 @@ private[memory] class StorageMemoryPool(
|
|||
val remainingSpaceToFree = spaceToFree - spaceFreedByReleasingUnusedMemory
|
||||
if (remainingSpaceToFree > 0) {
|
||||
// If reclaiming free memory did not adequately shrink the pool, begin evicting blocks:
|
||||
val spaceFreedByEviction = {
|
||||
// Once we support off-heap caching, this will need to change:
|
||||
if (memoryMode == MemoryMode.ON_HEAP) {
|
||||
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree)
|
||||
} else {
|
||||
0
|
||||
}
|
||||
}
|
||||
val spaceFreedByEviction =
|
||||
memoryStore.evictBlocksToFreeSpace(None, remainingSpaceToFree, memoryMode)
|
||||
// When a block is released, BlockManager.dropFromMemory() calls releaseMemory(), so we do
|
||||
// not need to decrement _memoryUsed here. However, we do need to decrement the pool size.
|
||||
decrementPoolSize(spaceFreedByEviction)
|
||||
|
|
|
@ -24,7 +24,7 @@ import scala.collection.mutable.HashMap
|
|||
|
||||
import org.apache.spark.{Accumulator, SparkEnv, TaskContext, TaskContextImpl}
|
||||
import org.apache.spark.executor.TaskMetrics
|
||||
import org.apache.spark.memory.TaskMemoryManager
|
||||
import org.apache.spark.memory.{MemoryMode, TaskMemoryManager}
|
||||
import org.apache.spark.metrics.MetricsSystem
|
||||
import org.apache.spark.serializer.SerializerInstance
|
||||
import org.apache.spark.util.{ByteBufferInputStream, ByteBufferOutputStream, Utils}
|
||||
|
@ -90,7 +90,8 @@ private[spark] abstract class Task[T](
|
|||
try {
|
||||
Utils.tryLogNonFatalError {
|
||||
// Release memory used by this thread for unrolling blocks
|
||||
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask()
|
||||
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP)
|
||||
SparkEnv.get.blockManager.memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.OFF_HEAP)
|
||||
// Notify any tasks waiting for execution memory to be freed to wake up and try to
|
||||
// acquire memory again. This makes impossible the scenario where a task sleeps forever
|
||||
// because there are no other tasks left to notify it. Since this is safe to do but may
|
||||
|
|
|
@ -25,7 +25,7 @@ import scala.reflect.ClassTag
|
|||
import org.apache.spark.SparkConf
|
||||
import org.apache.spark.io.CompressionCodec
|
||||
import org.apache.spark.storage._
|
||||
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
|
||||
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
|
||||
|
||||
/**
|
||||
* Component which configures serialization and compression for various Spark components, including
|
||||
|
@ -128,17 +128,9 @@ private[spark] class SerializerManager(defaultSerializer: Serializer, conf: Spar
|
|||
|
||||
/** Serializes into a chunked byte buffer. */
|
||||
def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): ChunkedByteBuffer = {
|
||||
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(1024 * 1024 * 4)
|
||||
dataSerializeStream(blockId, byteArrayChunkOutputStream, values)
|
||||
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap))
|
||||
}
|
||||
|
||||
/**
|
||||
* Deserializes a ByteBuffer into an iterator of values and disposes of it when the end of
|
||||
* the iterator is reached.
|
||||
*/
|
||||
def dataDeserialize[T: ClassTag](blockId: BlockId, bytes: ChunkedByteBuffer): Iterator[T] = {
|
||||
dataDeserializeStream[T](blockId, bytes.toInputStream(dispose = true))
|
||||
val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, ByteBuffer.allocate)
|
||||
dataSerializeStream(blockId, bbos, values)
|
||||
bbos.toChunkedByteBuffer
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.spark.storage
|
||||
|
||||
import java.io._
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import scala.collection.mutable.{ArrayBuffer, HashMap}
|
||||
import scala.concurrent.{Await, ExecutionContext, Future}
|
||||
|
@ -39,6 +40,7 @@ import org.apache.spark.rpc.RpcEnv
|
|||
import org.apache.spark.serializer.{SerializerInstance, SerializerManager}
|
||||
import org.apache.spark.shuffle.ShuffleManager
|
||||
import org.apache.spark.storage.memory._
|
||||
import org.apache.spark.unsafe.Platform
|
||||
import org.apache.spark.util._
|
||||
import org.apache.spark.util.io.ChunkedByteBuffer
|
||||
|
||||
|
@ -372,8 +374,12 @@ private[spark] class BlockManager(
|
|||
val onDisk = level.useDisk && diskStore.contains(blockId)
|
||||
val deserialized = if (inMem) level.deserialized else false
|
||||
val replication = if (inMem || onDisk) level.replication else 1
|
||||
val storageLevel =
|
||||
StorageLevel(onDisk, inMem, deserialized, replication)
|
||||
val storageLevel = StorageLevel(
|
||||
useDisk = onDisk,
|
||||
useMemory = inMem,
|
||||
useOffHeap = level.useOffHeap,
|
||||
deserialized = deserialized,
|
||||
replication = replication)
|
||||
val memSize = if (inMem) memoryStore.getSize(blockId) else 0L
|
||||
val diskSize = if (onDisk) diskStore.getSize(blockId) else 0L
|
||||
BlockStatus(storageLevel, memSize, diskSize)
|
||||
|
@ -407,8 +413,8 @@ private[spark] class BlockManager(
|
|||
val iter: Iterator[Any] = if (level.deserialized) {
|
||||
memoryStore.getValues(blockId).get
|
||||
} else {
|
||||
serializerManager.dataDeserialize(
|
||||
blockId, memoryStore.getBytes(blockId).get)(info.classTag)
|
||||
serializerManager.dataDeserializeStream(
|
||||
blockId, memoryStore.getBytes(blockId).get.toInputStream())(info.classTag)
|
||||
}
|
||||
val ci = CompletionIterator[Any, Iterator[Any]](iter, releaseLock(blockId))
|
||||
Some(new BlockResult(ci, DataReadMethod.Memory, info.size))
|
||||
|
@ -416,11 +422,15 @@ private[spark] class BlockManager(
|
|||
val iterToReturn: Iterator[Any] = {
|
||||
val diskBytes = diskStore.getBytes(blockId)
|
||||
if (level.deserialized) {
|
||||
val diskValues = serializerManager.dataDeserialize(blockId, diskBytes)(info.classTag)
|
||||
val diskValues = serializerManager.dataDeserializeStream(
|
||||
blockId,
|
||||
diskBytes.toInputStream(dispose = true))(info.classTag)
|
||||
maybeCacheDiskValuesInMemory(info, blockId, level, diskValues)
|
||||
} else {
|
||||
val bytes = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
|
||||
serializerManager.dataDeserialize(blockId, bytes)(info.classTag)
|
||||
val stream = maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes)
|
||||
.map {_.toInputStream(dispose = false)}
|
||||
.getOrElse { diskBytes.toInputStream(dispose = true) }
|
||||
serializerManager.dataDeserializeStream(blockId, stream)(info.classTag)
|
||||
}
|
||||
}
|
||||
val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, releaseLock(blockId))
|
||||
|
@ -481,7 +491,8 @@ private[spark] class BlockManager(
|
|||
if (level.useMemory && memoryStore.contains(blockId)) {
|
||||
memoryStore.getBytes(blockId).get
|
||||
} else if (level.useDisk && diskStore.contains(blockId)) {
|
||||
maybeCacheDiskBytesInMemory(info, blockId, level, diskStore.getBytes(blockId))
|
||||
val diskBytes = diskStore.getBytes(blockId)
|
||||
maybeCacheDiskBytesInMemory(info, blockId, level, diskBytes).getOrElse(diskBytes)
|
||||
} else {
|
||||
releaseLock(blockId)
|
||||
throw new SparkException(s"Block $blockId was not found even though it's read-locked")
|
||||
|
@ -496,8 +507,9 @@ private[spark] class BlockManager(
|
|||
*/
|
||||
private def getRemoteValues(blockId: BlockId): Option[BlockResult] = {
|
||||
getRemoteBytes(blockId).map { data =>
|
||||
new BlockResult(
|
||||
serializerManager.dataDeserialize(blockId, data), DataReadMethod.Network, data.size)
|
||||
val values =
|
||||
serializerManager.dataDeserializeStream(blockId, data.toInputStream(dispose = true))
|
||||
new BlockResult(values, DataReadMethod.Network, data.size)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -745,7 +757,8 @@ private[spark] class BlockManager(
|
|||
// Put it in memory first, even if it also has useDisk set to true;
|
||||
// We will drop it to disk later if the memory store can't hold it.
|
||||
val putSucceeded = if (level.deserialized) {
|
||||
val values = serializerManager.dataDeserialize(blockId, bytes)(classTag)
|
||||
val values =
|
||||
serializerManager.dataDeserializeStream(blockId, bytes.toInputStream())(classTag)
|
||||
memoryStore.putIteratorAsValues(blockId, values, classTag) match {
|
||||
case Right(_) => true
|
||||
case Left(iter) =>
|
||||
|
@ -755,7 +768,7 @@ private[spark] class BlockManager(
|
|||
false
|
||||
}
|
||||
} else {
|
||||
memoryStore.putBytes(blockId, size, () => bytes)
|
||||
memoryStore.putBytes(blockId, size, level.memoryMode, () => bytes)
|
||||
}
|
||||
if (!putSucceeded && level.useDisk) {
|
||||
logWarning(s"Persisting block $blockId to disk instead.")
|
||||
|
@ -893,7 +906,7 @@ private[spark] class BlockManager(
|
|||
}
|
||||
}
|
||||
} else { // !level.deserialized
|
||||
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag) match {
|
||||
memoryStore.putIteratorAsBytes(blockId, iterator(), classTag, level.memoryMode) match {
|
||||
case Right(s) =>
|
||||
size = s
|
||||
case Left(partiallySerializedValues) =>
|
||||
|
@ -951,14 +964,16 @@ private[spark] class BlockManager(
|
|||
* Attempts to cache spilled bytes read from disk into the MemoryStore in order to speed up
|
||||
* subsequent reads. This method requires the caller to hold a read lock on the block.
|
||||
*
|
||||
* @return a copy of the bytes. The original bytes passed this method should no longer
|
||||
* be used after this method returns.
|
||||
* @return a copy of the bytes from the memory store if the put succeeded, otherwise None.
|
||||
* If this returns bytes from the memory store then the original disk store bytes will
|
||||
* automatically be disposed and the caller should not continue to use them. Otherwise,
|
||||
* if this returns None then the original disk store bytes will be unaffected.
|
||||
*/
|
||||
private def maybeCacheDiskBytesInMemory(
|
||||
blockInfo: BlockInfo,
|
||||
blockId: BlockId,
|
||||
level: StorageLevel,
|
||||
diskBytes: ChunkedByteBuffer): ChunkedByteBuffer = {
|
||||
diskBytes: ChunkedByteBuffer): Option[ChunkedByteBuffer] = {
|
||||
require(!level.deserialized)
|
||||
if (level.useMemory) {
|
||||
// Synchronize on blockInfo to guard against a race condition where two readers both try to
|
||||
|
@ -966,25 +981,29 @@ private[spark] class BlockManager(
|
|||
blockInfo.synchronized {
|
||||
if (memoryStore.contains(blockId)) {
|
||||
diskBytes.dispose()
|
||||
memoryStore.getBytes(blockId).get
|
||||
Some(memoryStore.getBytes(blockId).get)
|
||||
} else {
|
||||
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, () => {
|
||||
val allocator = level.memoryMode match {
|
||||
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
|
||||
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
|
||||
}
|
||||
val putSucceeded = memoryStore.putBytes(blockId, diskBytes.size, level.memoryMode, () => {
|
||||
// https://issues.apache.org/jira/browse/SPARK-6076
|
||||
// If the file size is bigger than the free memory, OOM will happen. So if we
|
||||
// cannot put it into MemoryStore, copyForMemory should not be created. That's why
|
||||
// this action is put into a `() => ChunkedByteBuffer` and created lazily.
|
||||
diskBytes.copy()
|
||||
diskBytes.copy(allocator)
|
||||
})
|
||||
if (putSucceeded) {
|
||||
diskBytes.dispose()
|
||||
memoryStore.getBytes(blockId).get
|
||||
Some(memoryStore.getBytes(blockId).get)
|
||||
} else {
|
||||
diskBytes
|
||||
None
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
diskBytes
|
||||
None
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1055,7 +1074,12 @@ private[spark] class BlockManager(
|
|||
val peersForReplication = new ArrayBuffer[BlockManagerId]
|
||||
val peersReplicatedTo = new ArrayBuffer[BlockManagerId]
|
||||
val peersFailedToReplicateTo = new ArrayBuffer[BlockManagerId]
|
||||
val tLevel = StorageLevel(level.useDisk, level.useMemory, level.deserialized, 1)
|
||||
val tLevel = StorageLevel(
|
||||
useDisk = level.useDisk,
|
||||
useMemory = level.useMemory,
|
||||
useOffHeap = level.useOffHeap,
|
||||
deserialized = level.deserialized,
|
||||
replication = 1)
|
||||
val startTime = System.currentTimeMillis
|
||||
val random = new Random(blockId.hashCode)
|
||||
|
||||
|
|
|
@ -453,7 +453,7 @@ private[spark] class BlockManagerInfo(
|
|||
}
|
||||
|
||||
if (storageLevel.isValid) {
|
||||
/* isValid means it is either stored in-memory, on-disk or on-externalBlockStore.
|
||||
/* isValid means it is either stored in-memory or on-disk.
|
||||
* The memSize here indicates the data size in or dropped from memory,
|
||||
* externalBlockStoreSize here indicates the data size in or dropped from externalBlockStore,
|
||||
* and the diskSize here indicates the data size in or dropped to disk.
|
||||
|
|
|
@ -60,10 +60,7 @@ class StorageLevel private(
|
|||
assert(replication < 40, "Replication restricted to be less than 40 for calculating hash codes")
|
||||
|
||||
if (useOffHeap) {
|
||||
require(!useDisk, "Off-heap storage level does not support using disk")
|
||||
require(!useMemory, "Off-heap storage level does not support using heap memory")
|
||||
require(!deserialized, "Off-heap storage level does not support deserialized storage")
|
||||
require(replication == 1, "Off-heap storage level does not support multiple replication")
|
||||
}
|
||||
|
||||
private[spark] def memoryMode: MemoryMode = {
|
||||
|
@ -86,7 +83,7 @@ class StorageLevel private(
|
|||
false
|
||||
}
|
||||
|
||||
def isValid: Boolean = (useMemory || useDisk || useOffHeap) && (replication > 0)
|
||||
def isValid: Boolean = (useMemory || useDisk) && (replication > 0)
|
||||
|
||||
def toInt: Int = {
|
||||
var ret = 0
|
||||
|
@ -123,7 +120,8 @@ class StorageLevel private(
|
|||
private def readResolve(): Object = StorageLevel.getCachedStorageLevel(this)
|
||||
|
||||
override def toString: String = {
|
||||
s"StorageLevel($useDisk, $useMemory, $useOffHeap, $deserialized, $replication)"
|
||||
s"StorageLevel(disk=$useDisk, memory=$useMemory, offheap=$useOffHeap, " +
|
||||
s"deserialized=$deserialized, replication=$replication)"
|
||||
}
|
||||
|
||||
override def hashCode(): Int = toInt * 41 + replication
|
||||
|
@ -131,8 +129,9 @@ class StorageLevel private(
|
|||
def description: String = {
|
||||
var result = ""
|
||||
result += (if (useDisk) "Disk " else "")
|
||||
result += (if (useMemory) "Memory " else "")
|
||||
result += (if (useOffHeap) "ExternalBlockStore " else "")
|
||||
if (useMemory) {
|
||||
result += (if (useOffHeap) "Memory (off heap) " else "Memory ")
|
||||
}
|
||||
result += (if (deserialized) "Deserialized " else "Serialized ")
|
||||
result += s"${replication}x Replicated"
|
||||
result
|
||||
|
@ -156,9 +155,7 @@ object StorageLevel {
|
|||
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
|
||||
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
|
||||
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
|
||||
|
||||
// Redirect to MEMORY_ONLY_SER for now.
|
||||
val OFF_HEAP = MEMORY_ONLY_SER
|
||||
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
|
||||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
|
@ -183,7 +180,7 @@ object StorageLevel {
|
|||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* Create a new StorageLevel object without setting useOffHeap.
|
||||
* Create a new StorageLevel object.
|
||||
*/
|
||||
@DeveloperApi
|
||||
def apply(
|
||||
|
@ -198,7 +195,7 @@ object StorageLevel {
|
|||
|
||||
/**
|
||||
* :: DeveloperApi ::
|
||||
* Create a new StorageLevel object.
|
||||
* Create a new StorageLevel object without setting useOffHeap.
|
||||
*/
|
||||
@DeveloperApi
|
||||
def apply(
|
||||
|
|
|
@ -32,20 +32,25 @@ import org.apache.spark.internal.Logging
|
|||
import org.apache.spark.memory.{MemoryManager, MemoryMode}
|
||||
import org.apache.spark.serializer.{SerializationStream, SerializerManager}
|
||||
import org.apache.spark.storage.{BlockId, BlockInfoManager, StorageLevel}
|
||||
import org.apache.spark.unsafe.Platform
|
||||
import org.apache.spark.util.{CompletionIterator, SizeEstimator, Utils}
|
||||
import org.apache.spark.util.collection.SizeTrackingVector
|
||||
import org.apache.spark.util.io.{ByteArrayChunkOutputStream, ChunkedByteBuffer}
|
||||
import org.apache.spark.util.io.{ChunkedByteBuffer, ChunkedByteBufferOutputStream}
|
||||
|
||||
private sealed trait MemoryEntry[T] {
|
||||
def size: Long
|
||||
def memoryMode: MemoryMode
|
||||
def classTag: ClassTag[T]
|
||||
}
|
||||
private case class DeserializedMemoryEntry[T](
|
||||
value: Array[T],
|
||||
size: Long,
|
||||
classTag: ClassTag[T]) extends MemoryEntry[T]
|
||||
classTag: ClassTag[T]) extends MemoryEntry[T] {
|
||||
val memoryMode: MemoryMode = MemoryMode.ON_HEAP
|
||||
}
|
||||
private case class SerializedMemoryEntry[T](
|
||||
buffer: ChunkedByteBuffer,
|
||||
memoryMode: MemoryMode,
|
||||
classTag: ClassTag[T]) extends MemoryEntry[T] {
|
||||
def size: Long = buffer.size
|
||||
}
|
||||
|
@ -86,7 +91,10 @@ private[spark] class MemoryStore(
|
|||
|
||||
// A mapping from taskAttemptId to amount of memory used for unrolling a block (in bytes)
|
||||
// All accesses of this map are assumed to have manually synchronized on `memoryManager`
|
||||
private val unrollMemoryMap = mutable.HashMap[Long, Long]()
|
||||
private val onHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
|
||||
// Note: off-heap unroll memory is only used in putIteratorAsBytes() because off-heap caching
|
||||
// always stores serialized values.
|
||||
private val offHeapUnrollMemoryMap = mutable.HashMap[Long, Long]()
|
||||
|
||||
// Initial memory to request before unrolling any block
|
||||
private val unrollMemoryThreshold: Long =
|
||||
|
@ -131,13 +139,14 @@ private[spark] class MemoryStore(
|
|||
def putBytes[T: ClassTag](
|
||||
blockId: BlockId,
|
||||
size: Long,
|
||||
memoryMode: MemoryMode,
|
||||
_bytes: () => ChunkedByteBuffer): Boolean = {
|
||||
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
|
||||
if (memoryManager.acquireStorageMemory(blockId, size, MemoryMode.ON_HEAP)) {
|
||||
if (memoryManager.acquireStorageMemory(blockId, size, memoryMode)) {
|
||||
// We acquired enough memory for the block, so go ahead and put it
|
||||
val bytes = _bytes()
|
||||
assert(bytes.size == size)
|
||||
val entry = new SerializedMemoryEntry[T](bytes, implicitly[ClassTag[T]])
|
||||
val entry = new SerializedMemoryEntry[T](bytes, memoryMode, implicitly[ClassTag[T]])
|
||||
entries.synchronized {
|
||||
entries.put(blockId, entry)
|
||||
}
|
||||
|
@ -190,7 +199,8 @@ private[spark] class MemoryStore(
|
|||
var vector = new SizeTrackingVector[T]()(classTag)
|
||||
|
||||
// Request enough memory to begin unrolling
|
||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
|
||||
keepUnrolling =
|
||||
reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, MemoryMode.ON_HEAP)
|
||||
|
||||
if (!keepUnrolling) {
|
||||
logWarning(s"Failed to reserve initial memory threshold of " +
|
||||
|
@ -207,7 +217,8 @@ private[spark] class MemoryStore(
|
|||
val currentSize = vector.estimateSize()
|
||||
if (currentSize >= memoryThreshold) {
|
||||
val amountToRequest = (currentSize * memoryGrowthFactor - memoryThreshold).toLong
|
||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
|
||||
keepUnrolling =
|
||||
reserveUnrollMemoryForThisTask(blockId, amountToRequest, MemoryMode.ON_HEAP)
|
||||
if (keepUnrolling) {
|
||||
unrollMemoryUsedByThisBlock += amountToRequest
|
||||
}
|
||||
|
@ -228,7 +239,7 @@ private[spark] class MemoryStore(
|
|||
def transferUnrollToStorage(amount: Long): Unit = {
|
||||
// Synchronize so that transfer is atomic
|
||||
memoryManager.synchronized {
|
||||
releaseUnrollMemoryForThisTask(amount)
|
||||
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, amount)
|
||||
val success = memoryManager.acquireStorageMemory(blockId, amount, MemoryMode.ON_HEAP)
|
||||
assert(success, "transferring unroll memory to storage memory failed")
|
||||
}
|
||||
|
@ -247,7 +258,7 @@ private[spark] class MemoryStore(
|
|||
// If this task attempt already owns more unroll memory than is necessary to store the
|
||||
// block, then release the extra memory that will not be used.
|
||||
val excessUnrollMemory = unrollMemoryUsedByThisBlock - size
|
||||
releaseUnrollMemoryForThisTask(excessUnrollMemory)
|
||||
releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, excessUnrollMemory)
|
||||
transferUnrollToStorage(size)
|
||||
true
|
||||
}
|
||||
|
@ -295,10 +306,16 @@ private[spark] class MemoryStore(
|
|||
private[storage] def putIteratorAsBytes[T](
|
||||
blockId: BlockId,
|
||||
values: Iterator[T],
|
||||
classTag: ClassTag[T]): Either[PartiallySerializedBlock[T], Long] = {
|
||||
classTag: ClassTag[T],
|
||||
memoryMode: MemoryMode): Either[PartiallySerializedBlock[T], Long] = {
|
||||
|
||||
require(!contains(blockId), s"Block $blockId is already present in the MemoryStore")
|
||||
|
||||
val allocator = memoryMode match {
|
||||
case MemoryMode.ON_HEAP => ByteBuffer.allocate _
|
||||
case MemoryMode.OFF_HEAP => Platform.allocateDirectBuffer _
|
||||
}
|
||||
|
||||
// Whether there is still enough memory for us to continue unrolling this block
|
||||
var keepUnrolling = true
|
||||
// Initial per-task memory to request for unrolling blocks (bytes).
|
||||
|
@ -307,15 +324,15 @@ private[spark] class MemoryStore(
|
|||
var unrollMemoryUsedByThisBlock = 0L
|
||||
// Underlying buffer for unrolling the block
|
||||
val redirectableStream = new RedirectableOutputStream
|
||||
val byteArrayChunkOutputStream = new ByteArrayChunkOutputStream(initialMemoryThreshold.toInt)
|
||||
redirectableStream.setOutputStream(byteArrayChunkOutputStream)
|
||||
val bbos = new ChunkedByteBufferOutputStream(initialMemoryThreshold.toInt, allocator)
|
||||
redirectableStream.setOutputStream(bbos)
|
||||
val serializationStream: SerializationStream = {
|
||||
val ser = serializerManager.getSerializer(classTag).newInstance()
|
||||
ser.serializeStream(serializerManager.wrapForCompression(blockId, redirectableStream))
|
||||
}
|
||||
|
||||
// Request enough memory to begin unrolling
|
||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold)
|
||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, initialMemoryThreshold, memoryMode)
|
||||
|
||||
if (!keepUnrolling) {
|
||||
logWarning(s"Failed to reserve initial memory threshold of " +
|
||||
|
@ -325,9 +342,9 @@ private[spark] class MemoryStore(
|
|||
}
|
||||
|
||||
def reserveAdditionalMemoryIfNecessary(): Unit = {
|
||||
if (byteArrayChunkOutputStream.size > unrollMemoryUsedByThisBlock) {
|
||||
val amountToRequest = byteArrayChunkOutputStream.size - unrollMemoryUsedByThisBlock
|
||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest)
|
||||
if (bbos.size > unrollMemoryUsedByThisBlock) {
|
||||
val amountToRequest = bbos.size - unrollMemoryUsedByThisBlock
|
||||
keepUnrolling = reserveUnrollMemoryForThisTask(blockId, amountToRequest, memoryMode)
|
||||
if (keepUnrolling) {
|
||||
unrollMemoryUsedByThisBlock += amountToRequest
|
||||
}
|
||||
|
@ -349,12 +366,11 @@ private[spark] class MemoryStore(
|
|||
}
|
||||
|
||||
if (keepUnrolling) {
|
||||
val entry = SerializedMemoryEntry[T](
|
||||
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)), classTag)
|
||||
val entry = SerializedMemoryEntry[T](bbos.toChunkedByteBuffer, memoryMode, classTag)
|
||||
// Synchronize so that transfer is atomic
|
||||
memoryManager.synchronized {
|
||||
releaseUnrollMemoryForThisTask(unrollMemoryUsedByThisBlock)
|
||||
val success = memoryManager.acquireStorageMemory(blockId, entry.size, MemoryMode.ON_HEAP)
|
||||
releaseUnrollMemoryForThisTask(memoryMode, unrollMemoryUsedByThisBlock)
|
||||
val success = memoryManager.acquireStorageMemory(blockId, entry.size, memoryMode)
|
||||
assert(success, "transferring unroll memory to storage memory failed")
|
||||
}
|
||||
entries.synchronized {
|
||||
|
@ -365,7 +381,7 @@ private[spark] class MemoryStore(
|
|||
Right(entry.size)
|
||||
} else {
|
||||
// We ran out of space while unrolling the values for this block
|
||||
logUnrollFailureMessage(blockId, byteArrayChunkOutputStream.size)
|
||||
logUnrollFailureMessage(blockId, bbos.size)
|
||||
Left(
|
||||
new PartiallySerializedBlock(
|
||||
this,
|
||||
|
@ -374,7 +390,8 @@ private[spark] class MemoryStore(
|
|||
serializationStream,
|
||||
redirectableStream,
|
||||
unrollMemoryUsedByThisBlock,
|
||||
new ChunkedByteBuffer(byteArrayChunkOutputStream.toArrays.map(ByteBuffer.wrap)),
|
||||
memoryMode,
|
||||
bbos.toChunkedByteBuffer,
|
||||
values,
|
||||
classTag))
|
||||
}
|
||||
|
@ -386,7 +403,7 @@ private[spark] class MemoryStore(
|
|||
case null => None
|
||||
case e: DeserializedMemoryEntry[_] =>
|
||||
throw new IllegalArgumentException("should only call getBytes on serialized blocks")
|
||||
case SerializedMemoryEntry(bytes, _) => Some(bytes)
|
||||
case SerializedMemoryEntry(bytes, _, _) => Some(bytes)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -407,8 +424,12 @@ private[spark] class MemoryStore(
|
|||
entries.remove(blockId)
|
||||
}
|
||||
if (entry != null) {
|
||||
memoryManager.releaseStorageMemory(entry.size, MemoryMode.ON_HEAP)
|
||||
logInfo(s"Block $blockId of size ${entry.size} dropped " +
|
||||
entry match {
|
||||
case SerializedMemoryEntry(buffer, _, _) => buffer.dispose()
|
||||
case _ =>
|
||||
}
|
||||
memoryManager.releaseStorageMemory(entry.size, entry.memoryMode)
|
||||
logDebug(s"Block $blockId of size ${entry.size} dropped " +
|
||||
s"from memory (free ${maxMemory - blocksMemoryUsed})")
|
||||
true
|
||||
} else {
|
||||
|
@ -420,7 +441,8 @@ private[spark] class MemoryStore(
|
|||
entries.synchronized {
|
||||
entries.clear()
|
||||
}
|
||||
unrollMemoryMap.clear()
|
||||
onHeapUnrollMemoryMap.clear()
|
||||
offHeapUnrollMemoryMap.clear()
|
||||
memoryManager.releaseAllStorageMemory()
|
||||
logInfo("MemoryStore cleared")
|
||||
}
|
||||
|
@ -440,16 +462,20 @@ private[spark] class MemoryStore(
|
|||
*
|
||||
* @param blockId the ID of the block we are freeing space for, if any
|
||||
* @param space the size of this block
|
||||
* @param memoryMode the type of memory to free (on- or off-heap)
|
||||
* @return the amount of memory (in bytes) freed by eviction
|
||||
*/
|
||||
private[spark] def evictBlocksToFreeSpace(blockId: Option[BlockId], space: Long): Long = {
|
||||
private[spark] def evictBlocksToFreeSpace(
|
||||
blockId: Option[BlockId],
|
||||
space: Long,
|
||||
memoryMode: MemoryMode): Long = {
|
||||
assert(space > 0)
|
||||
memoryManager.synchronized {
|
||||
var freedMemory = 0L
|
||||
val rddToAdd = blockId.flatMap(getRddId)
|
||||
val selectedBlocks = new ArrayBuffer[BlockId]
|
||||
def blockIsEvictable(blockId: BlockId): Boolean = {
|
||||
rddToAdd.isEmpty || rddToAdd != getRddId(blockId)
|
||||
def blockIsEvictable(blockId: BlockId, entry: MemoryEntry[_]): Boolean = {
|
||||
entry.memoryMode == memoryMode && (rddToAdd.isEmpty || rddToAdd != getRddId(blockId))
|
||||
}
|
||||
// This is synchronized to ensure that the set of entries is not changed
|
||||
// (because of getValue or getBytes) while traversing the iterator, as that
|
||||
|
@ -459,7 +485,8 @@ private[spark] class MemoryStore(
|
|||
while (freedMemory < space && iterator.hasNext) {
|
||||
val pair = iterator.next()
|
||||
val blockId = pair.getKey
|
||||
if (blockIsEvictable(blockId)) {
|
||||
val entry = pair.getValue
|
||||
if (blockIsEvictable(blockId, entry)) {
|
||||
// We don't want to evict blocks which are currently being read, so we need to obtain
|
||||
// an exclusive write lock on blocks which are candidates for eviction. We perform a
|
||||
// non-blocking "tryLock" here in order to ignore blocks which are locked for reading:
|
||||
|
@ -474,7 +501,7 @@ private[spark] class MemoryStore(
|
|||
def dropBlock[T](blockId: BlockId, entry: MemoryEntry[T]): Unit = {
|
||||
val data = entry match {
|
||||
case DeserializedMemoryEntry(values, _, _) => Left(values)
|
||||
case SerializedMemoryEntry(buffer, _) => Right(buffer)
|
||||
case SerializedMemoryEntry(buffer, _, _) => Right(buffer)
|
||||
}
|
||||
val newEffectiveStorageLevel =
|
||||
blockEvictionHandler.dropFromMemory(blockId, () => data)(entry.classTag)
|
||||
|
@ -530,11 +557,18 @@ private[spark] class MemoryStore(
|
|||
*
|
||||
* @return whether the request is granted.
|
||||
*/
|
||||
def reserveUnrollMemoryForThisTask(blockId: BlockId, memory: Long): Boolean = {
|
||||
def reserveUnrollMemoryForThisTask(
|
||||
blockId: BlockId,
|
||||
memory: Long,
|
||||
memoryMode: MemoryMode): Boolean = {
|
||||
memoryManager.synchronized {
|
||||
val success = memoryManager.acquireUnrollMemory(blockId, memory, MemoryMode.ON_HEAP)
|
||||
val success = memoryManager.acquireUnrollMemory(blockId, memory, memoryMode)
|
||||
if (success) {
|
||||
val taskAttemptId = currentTaskAttemptId()
|
||||
val unrollMemoryMap = memoryMode match {
|
||||
case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
|
||||
case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
|
||||
}
|
||||
unrollMemoryMap(taskAttemptId) = unrollMemoryMap.getOrElse(taskAttemptId, 0L) + memory
|
||||
}
|
||||
success
|
||||
|
@ -545,9 +579,13 @@ private[spark] class MemoryStore(
|
|||
* Release memory used by this task for unrolling blocks.
|
||||
* If the amount is not specified, remove the current task's allocation altogether.
|
||||
*/
|
||||
def releaseUnrollMemoryForThisTask(memory: Long = Long.MaxValue): Unit = {
|
||||
def releaseUnrollMemoryForThisTask(memoryMode: MemoryMode, memory: Long = Long.MaxValue): Unit = {
|
||||
val taskAttemptId = currentTaskAttemptId()
|
||||
memoryManager.synchronized {
|
||||
val unrollMemoryMap = memoryMode match {
|
||||
case MemoryMode.ON_HEAP => onHeapUnrollMemoryMap
|
||||
case MemoryMode.OFF_HEAP => offHeapUnrollMemoryMap
|
||||
}
|
||||
if (unrollMemoryMap.contains(taskAttemptId)) {
|
||||
val memoryToRelease = math.min(memory, unrollMemoryMap(taskAttemptId))
|
||||
if (memoryToRelease > 0) {
|
||||
|
@ -555,7 +593,7 @@ private[spark] class MemoryStore(
|
|||
if (unrollMemoryMap(taskAttemptId) == 0) {
|
||||
unrollMemoryMap.remove(taskAttemptId)
|
||||
}
|
||||
memoryManager.releaseUnrollMemory(memoryToRelease, MemoryMode.ON_HEAP)
|
||||
memoryManager.releaseUnrollMemory(memoryToRelease, memoryMode)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -565,20 +603,23 @@ private[spark] class MemoryStore(
|
|||
* Return the amount of memory currently occupied for unrolling blocks across all tasks.
|
||||
*/
|
||||
def currentUnrollMemory: Long = memoryManager.synchronized {
|
||||
unrollMemoryMap.values.sum
|
||||
onHeapUnrollMemoryMap.values.sum + offHeapUnrollMemoryMap.values.sum
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the amount of memory currently occupied for unrolling blocks by this task.
|
||||
*/
|
||||
def currentUnrollMemoryForThisTask: Long = memoryManager.synchronized {
|
||||
unrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
|
||||
onHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L) +
|
||||
offHeapUnrollMemoryMap.getOrElse(currentTaskAttemptId(), 0L)
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the number of tasks currently unrolling blocks.
|
||||
*/
|
||||
private def numTasksUnrolling: Int = memoryManager.synchronized { unrollMemoryMap.keys.size }
|
||||
private def numTasksUnrolling: Int = memoryManager.synchronized {
|
||||
(onHeapUnrollMemoryMap.keys ++ offHeapUnrollMemoryMap.keys).toSet.size
|
||||
}
|
||||
|
||||
/**
|
||||
* Log information about current memory usage.
|
||||
|
@ -627,7 +668,7 @@ private[storage] class PartiallyUnrolledIterator[T](
|
|||
private[this] var iter: Iterator[T] = {
|
||||
val completionIterator = CompletionIterator[T, Iterator[T]](unrolled, {
|
||||
unrolledIteratorIsConsumed = true
|
||||
memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
|
||||
})
|
||||
completionIterator ++ rest
|
||||
}
|
||||
|
@ -640,7 +681,7 @@ private[storage] class PartiallyUnrolledIterator[T](
|
|||
*/
|
||||
def close(): Unit = {
|
||||
if (!unrolledIteratorIsConsumed) {
|
||||
memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, unrollMemory)
|
||||
unrolledIteratorIsConsumed = true
|
||||
}
|
||||
iter = null
|
||||
|
@ -669,6 +710,7 @@ private class RedirectableOutputStream extends OutputStream {
|
|||
* @param serializationStream a serialization stream which writes to [[redirectableOutputStream]].
|
||||
* @param redirectableOutputStream an OutputStream which can be redirected to a different sink.
|
||||
* @param unrollMemory the amount of unroll memory used by the values in `unrolled`.
|
||||
* @param memoryMode whether the unroll memory is on- or off-heap
|
||||
* @param unrolled a byte buffer containing the partially-serialized values.
|
||||
* @param rest the rest of the original iterator passed to
|
||||
* [[MemoryStore.putIteratorAsValues()]].
|
||||
|
@ -681,18 +723,36 @@ private[storage] class PartiallySerializedBlock[T](
|
|||
serializationStream: SerializationStream,
|
||||
redirectableOutputStream: RedirectableOutputStream,
|
||||
unrollMemory: Long,
|
||||
memoryMode: MemoryMode,
|
||||
unrolled: ChunkedByteBuffer,
|
||||
rest: Iterator[T],
|
||||
classTag: ClassTag[T]) {
|
||||
|
||||
// If the task does not fully consume `valuesIterator` or otherwise fails to consume or dispose of
|
||||
// this PartiallySerializedBlock then we risk leaking of direct buffers, so we use a task
|
||||
// completion listener here in order to ensure that `unrolled.dispose()` is called at least once.
|
||||
// The dispose() method is idempotent, so it's safe to call it unconditionally.
|
||||
Option(TaskContext.get()).foreach { taskContext =>
|
||||
taskContext.addTaskCompletionListener { _ =>
|
||||
// When a task completes, its unroll memory will automatically be freed. Thus we do not call
|
||||
// releaseUnrollMemoryForThisTask() here because we want to avoid double-freeing.
|
||||
unrolled.dispose()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called to dispose of this block and free its memory.
|
||||
*/
|
||||
def discard(): Unit = {
|
||||
try {
|
||||
// We want to close the output stream in order to free any resources associated with the
|
||||
// serializer itself (such as Kryo's internal buffers). close() might cause data to be
|
||||
// written, so redirect the output stream to discard that data.
|
||||
redirectableOutputStream.setOutputStream(ByteStreams.nullOutputStream())
|
||||
serializationStream.close()
|
||||
} finally {
|
||||
memoryStore.releaseUnrollMemoryForThisTask(unrollMemory)
|
||||
unrolled.dispose()
|
||||
memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -701,12 +761,14 @@ private[storage] class PartiallySerializedBlock[T](
|
|||
* and then serializing the values from the original input iterator.
|
||||
*/
|
||||
def finishWritingToStream(os: OutputStream): Unit = {
|
||||
ByteStreams.copy(unrolled.toInputStream(), os)
|
||||
// `unrolled`'s underlying buffers will be freed once this input stream is fully read:
|
||||
ByteStreams.copy(unrolled.toInputStream(dispose = true), os)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(memoryMode, unrollMemory)
|
||||
redirectableOutputStream.setOutputStream(os)
|
||||
while (rest.hasNext) {
|
||||
serializationStream.writeObject(rest.next())(classTag)
|
||||
}
|
||||
discard()
|
||||
serializationStream.close()
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -717,10 +779,13 @@ private[storage] class PartiallySerializedBlock[T](
|
|||
* `close()` on it to free its resources.
|
||||
*/
|
||||
def valuesIterator: PartiallyUnrolledIterator[T] = {
|
||||
// `unrolled`'s underlying buffers will be freed once this input stream is fully read:
|
||||
val unrolledIter = serializerManager.dataDeserializeStream(
|
||||
blockId, unrolled.toInputStream(dispose = true))(classTag)
|
||||
new PartiallyUnrolledIterator(
|
||||
memoryStore,
|
||||
unrollMemory,
|
||||
unrolled = serializerManager.dataDeserialize(blockId, unrolled)(classTag),
|
||||
unrolled = CompletionIterator[T, Iterator[T]](unrolledIter, discard()),
|
||||
rest = rest)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -41,6 +41,8 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
|
|||
require(chunks.forall(_.limit() > 0), "chunks must be non-empty")
|
||||
require(chunks.forall(_.position() == 0), "chunks' positions must be 0")
|
||||
|
||||
private[this] var disposed: Boolean = false
|
||||
|
||||
/**
|
||||
* This size of this buffer, in bytes.
|
||||
*/
|
||||
|
@ -117,11 +119,12 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
|
|||
/**
|
||||
* Make a copy of this ChunkedByteBuffer, copying all of the backing data into new buffers.
|
||||
* The new buffer will share no resources with the original buffer.
|
||||
*
|
||||
* @param allocator a method for allocating byte buffers
|
||||
*/
|
||||
def copy(): ChunkedByteBuffer = {
|
||||
def copy(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
|
||||
val copiedChunks = getChunks().map { chunk =>
|
||||
// TODO: accept an allocator in this copy method to integrate with mem. accounting systems
|
||||
val newChunk = ByteBuffer.allocate(chunk.limit())
|
||||
val newChunk = allocator(chunk.limit())
|
||||
newChunk.put(chunk)
|
||||
newChunk.flip()
|
||||
newChunk
|
||||
|
@ -136,7 +139,10 @@ private[spark] class ChunkedByteBuffer(var chunks: Array[ByteBuffer]) {
|
|||
* unfortunately no standard API to do this.
|
||||
*/
|
||||
def dispose(): Unit = {
|
||||
chunks.foreach(StorageUtils.dispose)
|
||||
if (!disposed) {
|
||||
chunks.foreach(StorageUtils.dispose)
|
||||
disposed = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -18,19 +18,25 @@
|
|||
package org.apache.spark.util.io
|
||||
|
||||
import java.io.OutputStream
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import scala.collection.mutable.ArrayBuffer
|
||||
|
||||
import org.apache.spark.storage.StorageUtils
|
||||
|
||||
/**
|
||||
* An OutputStream that writes to fixed-size chunks of byte arrays.
|
||||
*
|
||||
* @param chunkSize size of each chunk, in bytes.
|
||||
*/
|
||||
private[spark]
|
||||
class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
|
||||
private[spark] class ChunkedByteBufferOutputStream(
|
||||
chunkSize: Int,
|
||||
allocator: Int => ByteBuffer)
|
||||
extends OutputStream {
|
||||
|
||||
private[this] val chunks = new ArrayBuffer[Array[Byte]]
|
||||
private[this] var toChunkedByteBufferWasCalled = false
|
||||
|
||||
private val chunks = new ArrayBuffer[ByteBuffer]
|
||||
|
||||
/** Index of the last chunk. Starting with -1 when the chunks array is empty. */
|
||||
private[this] var lastChunkIndex = -1
|
||||
|
@ -48,7 +54,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
|
|||
|
||||
override def write(b: Int): Unit = {
|
||||
allocateNewChunkIfNeeded()
|
||||
chunks(lastChunkIndex)(position) = b.toByte
|
||||
chunks(lastChunkIndex).put(b.toByte)
|
||||
position += 1
|
||||
_size += 1
|
||||
}
|
||||
|
@ -58,7 +64,7 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
|
|||
while (written < len) {
|
||||
allocateNewChunkIfNeeded()
|
||||
val thisBatch = math.min(chunkSize - position, len - written)
|
||||
System.arraycopy(bytes, written + off, chunks(lastChunkIndex), position, thisBatch)
|
||||
chunks(lastChunkIndex).put(bytes, written + off, thisBatch)
|
||||
written += thisBatch
|
||||
position += thisBatch
|
||||
}
|
||||
|
@ -67,33 +73,41 @@ class ByteArrayChunkOutputStream(chunkSize: Int) extends OutputStream {
|
|||
|
||||
@inline
|
||||
private def allocateNewChunkIfNeeded(): Unit = {
|
||||
require(!toChunkedByteBufferWasCalled, "cannot write after toChunkedByteBuffer() is called")
|
||||
if (position == chunkSize) {
|
||||
chunks += new Array[Byte](chunkSize)
|
||||
chunks += allocator(chunkSize)
|
||||
lastChunkIndex += 1
|
||||
position = 0
|
||||
}
|
||||
}
|
||||
|
||||
def toArrays: Array[Array[Byte]] = {
|
||||
def toChunkedByteBuffer: ChunkedByteBuffer = {
|
||||
require(!toChunkedByteBufferWasCalled, "toChunkedByteBuffer() can only be called once")
|
||||
toChunkedByteBufferWasCalled = true
|
||||
if (lastChunkIndex == -1) {
|
||||
new Array[Array[Byte]](0)
|
||||
new ChunkedByteBuffer(Array.empty[ByteBuffer])
|
||||
} else {
|
||||
// Copy the first n-1 chunks to the output, and then create an array that fits the last chunk.
|
||||
// An alternative would have been returning an array of ByteBuffers, with the last buffer
|
||||
// bounded to only the last chunk's position. However, given our use case in Spark (to put
|
||||
// the chunks in block manager), only limiting the view bound of the buffer would still
|
||||
// require the block manager to store the whole chunk.
|
||||
val ret = new Array[Array[Byte]](chunks.size)
|
||||
val ret = new Array[ByteBuffer](chunks.size)
|
||||
for (i <- 0 until chunks.size - 1) {
|
||||
ret(i) = chunks(i)
|
||||
ret(i).flip()
|
||||
}
|
||||
if (position == chunkSize) {
|
||||
ret(lastChunkIndex) = chunks(lastChunkIndex)
|
||||
ret(lastChunkIndex).flip()
|
||||
} else {
|
||||
ret(lastChunkIndex) = new Array[Byte](position)
|
||||
System.arraycopy(chunks(lastChunkIndex), 0, ret(lastChunkIndex), 0, position)
|
||||
ret(lastChunkIndex) = allocator(position)
|
||||
chunks(lastChunkIndex).flip()
|
||||
ret(lastChunkIndex).put(chunks(lastChunkIndex))
|
||||
ret(lastChunkIndex).flip()
|
||||
StorageUtils.dispose(chunks(lastChunkIndex))
|
||||
}
|
||||
ret
|
||||
new ChunkedByteBuffer(ret)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -198,8 +198,8 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
|
|||
blockManager.master.getLocations(blockId).foreach { cmId =>
|
||||
val bytes = blockTransfer.fetchBlockSync(cmId.host, cmId.port, cmId.executorId,
|
||||
blockId.toString)
|
||||
val deserialized = serializerManager.dataDeserialize[Int](blockId,
|
||||
new ChunkedByteBuffer(bytes.nioByteBuffer())).toList
|
||||
val deserialized = serializerManager.dataDeserializeStream[Int](blockId,
|
||||
new ChunkedByteBuffer(bytes.nioByteBuffer()).toInputStream()).toList
|
||||
assert(deserialized === (1 to 100).toList)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,7 +52,7 @@ class ChunkedByteBufferSuite extends SparkFunSuite {
|
|||
|
||||
test("copy() does not affect original buffer's position") {
|
||||
val chunkedByteBuffer = new ChunkedByteBuffer(Array(ByteBuffer.allocate(8)))
|
||||
chunkedByteBuffer.copy()
|
||||
chunkedByteBuffer.copy(ByteBuffer.allocate)
|
||||
assert(chunkedByteBuffer.getChunks().head.position() === 0)
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,8 @@ private[memory] trait MemoryManagerSuite extends SparkFunSuite with BeforeAndAft
|
|||
*/
|
||||
protected def makeMemoryStore(mm: MemoryManager): MemoryStore = {
|
||||
val ms = mock(classOf[MemoryStore], RETURNS_SMART_NULLS)
|
||||
when(ms.evictBlocksToFreeSpace(any(), anyLong())).thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
|
||||
when(ms.evictBlocksToFreeSpace(any(), anyLong(), any()))
|
||||
.thenAnswer(evictBlocksToFreeSpaceAnswer(mm))
|
||||
mm.setMemoryStore(ms)
|
||||
ms
|
||||
}
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.scalatest.{BeforeAndAfter, Matchers}
|
|||
import org.scalatest.concurrent.Eventually._
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.memory.StaticMemoryManager
|
||||
import org.apache.spark.memory.UnifiedMemoryManager
|
||||
import org.apache.spark.network.BlockTransferService
|
||||
import org.apache.spark.network.netty.NettyBlockTransferService
|
||||
import org.apache.spark.rpc.RpcEnv
|
||||
|
@ -60,8 +60,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
|
|||
private def makeBlockManager(
|
||||
maxMem: Long,
|
||||
name: String = SparkContext.DRIVER_IDENTIFIER): BlockManager = {
|
||||
conf.set("spark.testing.memory", maxMem.toString)
|
||||
conf.set("spark.memory.offHeap.size", maxMem.toString)
|
||||
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
|
||||
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
|
||||
val memManager = UnifiedMemoryManager(conf, numCores = 1)
|
||||
val serializerManager = new SerializerManager(serializer, conf)
|
||||
val store = new BlockManager(name, rpcEnv, master, serializerManager, conf,
|
||||
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
|
||||
|
@ -76,6 +78,9 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
|
|||
|
||||
conf.set("spark.authenticate", "false")
|
||||
conf.set("spark.driver.port", rpcEnv.address.port.toString)
|
||||
conf.set("spark.testing", "true")
|
||||
conf.set("spark.memory.fraction", "1")
|
||||
conf.set("spark.memory.storageFraction", "1")
|
||||
conf.set("spark.storage.unrollFraction", "0.4")
|
||||
conf.set("spark.storage.unrollMemoryThreshold", "512")
|
||||
|
||||
|
@ -172,6 +177,10 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
|
|||
testReplication(5, storageLevels)
|
||||
}
|
||||
|
||||
test("block replication - off-heap") {
|
||||
testReplication(2, Seq(OFF_HEAP, StorageLevel(true, true, true, false, 2)))
|
||||
}
|
||||
|
||||
test("block replication - 2x replication without peers") {
|
||||
intercept[org.scalatest.exceptions.TestFailedException] {
|
||||
testReplication(1,
|
||||
|
@ -262,7 +271,8 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
|
|||
val failableTransfer = mock(classOf[BlockTransferService]) // this wont actually work
|
||||
when(failableTransfer.hostName).thenReturn("some-hostname")
|
||||
when(failableTransfer.port).thenReturn(1000)
|
||||
val memManager = new StaticMemoryManager(conf, Long.MaxValue, 10000, numCores = 1)
|
||||
conf.set("spark.testing.memory", "10000")
|
||||
val memManager = UnifiedMemoryManager(conf, numCores = 1)
|
||||
val serializerManager = new SerializerManager(serializer, conf)
|
||||
val failableStore = new BlockManager("failable-store", rpcEnv, master, serializerManager, conf,
|
||||
memManager, mapOutputTracker, shuffleManager, failableTransfer, securityMgr, 0)
|
||||
|
@ -392,10 +402,14 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
|
|||
// If the block is supposed to be in memory, then drop the copy of the block in
|
||||
// this store test whether master is updated with zero memory usage this store
|
||||
if (storageLevel.useMemory) {
|
||||
val sl = if (storageLevel.useOffHeap) {
|
||||
StorageLevel(false, true, true, false, 1)
|
||||
} else {
|
||||
MEMORY_ONLY_SER
|
||||
}
|
||||
// Force the block to be dropped by adding a number of dummy blocks
|
||||
(1 to 10).foreach {
|
||||
i =>
|
||||
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
|
||||
i => testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), sl)
|
||||
}
|
||||
(1 to 10).foreach {
|
||||
i => testStore.removeBlock(s"dummy-block-$i")
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.scalatest.concurrent.Timeouts._
|
|||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.executor.DataReadMethod
|
||||
import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
|
||||
import org.apache.spark.memory.UnifiedMemoryManager
|
||||
import org.apache.spark.network.{BlockDataManager, BlockTransferService}
|
||||
import org.apache.spark.network.buffer.{ManagedBuffer, NioManagedBuffer}
|
||||
import org.apache.spark.network.netty.NettyBlockTransferService
|
||||
|
@ -74,10 +74,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
|
|||
name: String = SparkContext.DRIVER_IDENTIFIER,
|
||||
master: BlockManagerMaster = this.master,
|
||||
transferService: Option[BlockTransferService] = Option.empty): BlockManager = {
|
||||
conf.set("spark.testing.memory", maxMem.toString)
|
||||
conf.set("spark.memory.offHeap.size", maxMem.toString)
|
||||
val serializer = new KryoSerializer(conf)
|
||||
val transfer = transferService
|
||||
.getOrElse(new NettyBlockTransferService(conf, securityMgr, numCores = 1))
|
||||
val memManager = new StaticMemoryManager(conf, Long.MaxValue, maxMem, numCores = 1)
|
||||
val memManager = UnifiedMemoryManager(conf, numCores = 1)
|
||||
val serializerManager = new SerializerManager(serializer, conf)
|
||||
val blockManager = new BlockManager(name, rpcEnv, master, serializerManager, conf,
|
||||
memManager, mapOutputTracker, shuffleManager, transfer, securityMgr, 0)
|
||||
|
@ -92,6 +94,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
|
|||
System.setProperty("os.arch", "amd64")
|
||||
conf = new SparkConf(false)
|
||||
.set("spark.app.id", "test")
|
||||
.set("spark.testing", "true")
|
||||
.set("spark.memory.fraction", "1")
|
||||
.set("spark.memory.storageFraction", "1")
|
||||
.set("spark.kryoserializer.buffer", "1m")
|
||||
.set("spark.test.useCompressedOops", "true")
|
||||
.set("spark.storage.unrollFraction", "0.4")
|
||||
|
@ -518,6 +523,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
|
|||
testInMemoryLRUStorage(StorageLevel.MEMORY_ONLY_SER)
|
||||
}
|
||||
|
||||
test("in-memory LRU storage with off-heap") {
|
||||
testInMemoryLRUStorage(StorageLevel(
|
||||
useDisk = false,
|
||||
useMemory = true,
|
||||
useOffHeap = true,
|
||||
deserialized = false, replication = 1))
|
||||
}
|
||||
|
||||
private def testInMemoryLRUStorage(storageLevel: StorageLevel): Unit = {
|
||||
store = makeBlockManager(12000)
|
||||
val a1 = new Array[Byte](4000)
|
||||
|
@ -608,6 +621,14 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
|
|||
testDiskAndMemoryStorage(StorageLevel.MEMORY_AND_DISK_SER, getAsBytes = true)
|
||||
}
|
||||
|
||||
test("disk and off-heap memory storage") {
|
||||
testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = false)
|
||||
}
|
||||
|
||||
test("disk and off-heap memory storage with getLocalBytes") {
|
||||
testDiskAndMemoryStorage(StorageLevel.OFF_HEAP, getAsBytes = true)
|
||||
}
|
||||
|
||||
def testDiskAndMemoryStorage(
|
||||
storageLevel: StorageLevel,
|
||||
getAsBytes: Boolean): Unit = {
|
||||
|
@ -817,12 +838,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
|
|||
|
||||
test("block store put failure") {
|
||||
// Use Java serializer so we can create an unserializable error.
|
||||
conf.set("spark.testing.memory", "1200")
|
||||
val transfer = new NettyBlockTransferService(conf, securityMgr, numCores = 1)
|
||||
val memoryManager = new StaticMemoryManager(
|
||||
conf,
|
||||
maxOnHeapExecutionMemory = Long.MaxValue,
|
||||
maxOnHeapStorageMemory = 1200,
|
||||
numCores = 1)
|
||||
val memoryManager = UnifiedMemoryManager(conf, numCores = 1)
|
||||
val serializerManager = new SerializerManager(new JavaSerializer(conf), conf)
|
||||
store = new BlockManager(SparkContext.DRIVER_IDENTIFIER, rpcEnv, master,
|
||||
serializerManager, conf, memoryManager, mapOutputTracker,
|
||||
|
|
|
@ -27,7 +27,7 @@ import scala.reflect.ClassTag
|
|||
import org.scalatest._
|
||||
|
||||
import org.apache.spark._
|
||||
import org.apache.spark.memory.StaticMemoryManager
|
||||
import org.apache.spark.memory.{MemoryMode, StaticMemoryManager}
|
||||
import org.apache.spark.serializer.{KryoSerializer, SerializerManager}
|
||||
import org.apache.spark.storage.memory.{BlockEvictionHandler, MemoryStore, PartiallySerializedBlock, PartiallyUnrolledIterator}
|
||||
import org.apache.spark.util._
|
||||
|
@ -86,7 +86,7 @@ class MemoryStoreSuite
|
|||
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
|
||||
|
||||
def reserveUnrollMemoryForThisTask(memory: Long): Boolean = {
|
||||
memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory)
|
||||
memoryStore.reserveUnrollMemoryForThisTask(TestBlockId(""), memory, MemoryMode.ON_HEAP)
|
||||
}
|
||||
|
||||
// Reserve
|
||||
|
@ -99,9 +99,9 @@ class MemoryStoreSuite
|
|||
assert(!reserveUnrollMemoryForThisTask(1000000))
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask === 800) // not granted
|
||||
// Release
|
||||
memoryStore.releaseUnrollMemoryForThisTask(100)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100)
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask === 700)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(100)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 100)
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask === 600)
|
||||
// Reserve again
|
||||
assert(reserveUnrollMemoryForThisTask(4400))
|
||||
|
@ -109,9 +109,9 @@ class MemoryStoreSuite
|
|||
assert(!reserveUnrollMemoryForThisTask(20000))
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask === 5000) // not granted
|
||||
// Release again
|
||||
memoryStore.releaseUnrollMemoryForThisTask(1000)
|
||||
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP, 1000)
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask === 4000)
|
||||
memoryStore.releaseUnrollMemoryForThisTask() // release all
|
||||
memoryStore.releaseUnrollMemoryForThisTask(MemoryMode.ON_HEAP) // release all
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
|
||||
}
|
||||
|
||||
|
@ -254,7 +254,7 @@ class MemoryStoreSuite
|
|||
assert(blockInfoManager.lockNewBlockForWriting(
|
||||
blockId,
|
||||
new BlockInfo(StorageLevel.MEMORY_ONLY_SER, classTag, tellMaster = false)))
|
||||
val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag)
|
||||
val res = memoryStore.putIteratorAsBytes(blockId, iter, classTag, MemoryMode.ON_HEAP)
|
||||
blockInfoManager.unlock(blockId)
|
||||
res
|
||||
}
|
||||
|
@ -312,7 +312,7 @@ class MemoryStoreSuite
|
|||
assert(blockInfoManager.lockNewBlockForWriting(
|
||||
"b1",
|
||||
new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
|
||||
val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
|
||||
val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, MemoryMode.ON_HEAP)
|
||||
blockInfoManager.unlock("b1")
|
||||
assert(res.isLeft)
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask > 0)
|
||||
|
@ -333,7 +333,7 @@ class MemoryStoreSuite
|
|||
assert(blockInfoManager.lockNewBlockForWriting(
|
||||
"b1",
|
||||
new BlockInfo(StorageLevel.MEMORY_ONLY_SER, ClassTag.Any, tellMaster = false)))
|
||||
val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any)
|
||||
val res = memoryStore.putIteratorAsBytes("b1", bigIterator, ClassTag.Any, MemoryMode.ON_HEAP)
|
||||
blockInfoManager.unlock("b1")
|
||||
assert(res.isLeft)
|
||||
assert(memoryStore.currentUnrollMemoryForThisTask > 0)
|
||||
|
@ -395,7 +395,7 @@ class MemoryStoreSuite
|
|||
val blockId = BlockId("rdd_3_10")
|
||||
blockInfoManager.lockNewBlockForWriting(
|
||||
blockId, new BlockInfo(StorageLevel.MEMORY_ONLY, ClassTag.Any, tellMaster = false))
|
||||
memoryStore.putBytes(blockId, 13000, () => {
|
||||
memoryStore.putBytes(blockId, 13000, MemoryMode.ON_HEAP, () => {
|
||||
fail("A big ByteBuffer that cannot be put into MemoryStore should not be created")
|
||||
})
|
||||
}
|
||||
|
@ -404,7 +404,7 @@ class MemoryStoreSuite
|
|||
val (memoryStore, _) = makeMemoryStore(12000)
|
||||
val blockId = BlockId("rdd_3_10")
|
||||
var bytes: ChunkedByteBuffer = null
|
||||
memoryStore.putBytes(blockId, 10000, () => {
|
||||
memoryStore.putBytes(blockId, 10000, MemoryMode.ON_HEAP, () => {
|
||||
bytes = new ChunkedByteBuffer(ByteBuffer.allocate(10000))
|
||||
bytes
|
||||
})
|
||||
|
|
|
@ -17,48 +17,53 @@
|
|||
|
||||
package org.apache.spark.util.io
|
||||
|
||||
import java.nio.ByteBuffer
|
||||
|
||||
import scala.util.Random
|
||||
|
||||
import org.apache.spark.SparkFunSuite
|
||||
|
||||
|
||||
class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
|
||||
class ChunkedByteBufferOutputStreamSuite extends SparkFunSuite {
|
||||
|
||||
test("empty output") {
|
||||
val o = new ByteArrayChunkOutputStream(1024)
|
||||
assert(o.toArrays.length === 0)
|
||||
val o = new ChunkedByteBufferOutputStream(1024, ByteBuffer.allocate)
|
||||
assert(o.toChunkedByteBuffer.size === 0)
|
||||
}
|
||||
|
||||
test("write a single byte") {
|
||||
val o = new ByteArrayChunkOutputStream(1024)
|
||||
val o = new ChunkedByteBufferOutputStream(1024, ByteBuffer.allocate)
|
||||
o.write(10)
|
||||
assert(o.toArrays.length === 1)
|
||||
assert(o.toArrays.head.toSeq === Seq(10.toByte))
|
||||
val chunkedByteBuffer = o.toChunkedByteBuffer
|
||||
assert(chunkedByteBuffer.getChunks().length === 1)
|
||||
assert(chunkedByteBuffer.getChunks().head.array().toSeq === Seq(10.toByte))
|
||||
}
|
||||
|
||||
test("write a single near boundary") {
|
||||
val o = new ByteArrayChunkOutputStream(10)
|
||||
val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
|
||||
o.write(new Array[Byte](9))
|
||||
o.write(99)
|
||||
assert(o.toArrays.length === 1)
|
||||
assert(o.toArrays.head(9) === 99.toByte)
|
||||
val chunkedByteBuffer = o.toChunkedByteBuffer
|
||||
assert(chunkedByteBuffer.getChunks().length === 1)
|
||||
assert(chunkedByteBuffer.getChunks().head.array()(9) === 99.toByte)
|
||||
}
|
||||
|
||||
test("write a single at boundary") {
|
||||
val o = new ByteArrayChunkOutputStream(10)
|
||||
val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
|
||||
o.write(new Array[Byte](10))
|
||||
o.write(99)
|
||||
assert(o.toArrays.length === 2)
|
||||
assert(o.toArrays(1).length === 1)
|
||||
assert(o.toArrays(1)(0) === 99.toByte)
|
||||
val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
|
||||
assert(arrays.length === 2)
|
||||
assert(arrays(1).length === 1)
|
||||
assert(arrays(1)(0) === 99.toByte)
|
||||
}
|
||||
|
||||
test("single chunk output") {
|
||||
val ref = new Array[Byte](8)
|
||||
Random.nextBytes(ref)
|
||||
val o = new ByteArrayChunkOutputStream(10)
|
||||
val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
|
||||
o.write(ref)
|
||||
val arrays = o.toArrays
|
||||
val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
|
||||
assert(arrays.length === 1)
|
||||
assert(arrays.head.length === ref.length)
|
||||
assert(arrays.head.toSeq === ref.toSeq)
|
||||
|
@ -67,9 +72,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
|
|||
test("single chunk output at boundary size") {
|
||||
val ref = new Array[Byte](10)
|
||||
Random.nextBytes(ref)
|
||||
val o = new ByteArrayChunkOutputStream(10)
|
||||
val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
|
||||
o.write(ref)
|
||||
val arrays = o.toArrays
|
||||
val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
|
||||
assert(arrays.length === 1)
|
||||
assert(arrays.head.length === ref.length)
|
||||
assert(arrays.head.toSeq === ref.toSeq)
|
||||
|
@ -78,9 +83,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
|
|||
test("multiple chunk output") {
|
||||
val ref = new Array[Byte](26)
|
||||
Random.nextBytes(ref)
|
||||
val o = new ByteArrayChunkOutputStream(10)
|
||||
val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
|
||||
o.write(ref)
|
||||
val arrays = o.toArrays
|
||||
val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
|
||||
assert(arrays.length === 3)
|
||||
assert(arrays(0).length === 10)
|
||||
assert(arrays(1).length === 10)
|
||||
|
@ -94,9 +99,9 @@ class ByteArrayChunkOutputStreamSuite extends SparkFunSuite {
|
|||
test("multiple chunk output at boundary size") {
|
||||
val ref = new Array[Byte](30)
|
||||
Random.nextBytes(ref)
|
||||
val o = new ByteArrayChunkOutputStream(10)
|
||||
val o = new ChunkedByteBufferOutputStream(10, ByteBuffer.allocate)
|
||||
o.write(ref)
|
||||
val arrays = o.toArrays
|
||||
val arrays = o.toChunkedByteBuffer.getChunks().map(_.array())
|
||||
assert(arrays.length === 3)
|
||||
assert(arrays(0).length === 10)
|
||||
assert(arrays(1).length === 10)
|
|
@ -162,7 +162,8 @@ class WriteAheadLogBackedBlockRDD[T: ClassTag](
|
|||
logDebug(s"Stored partition data of $this into block manager with level $storageLevel")
|
||||
dataRead.rewind()
|
||||
}
|
||||
serializerManager.dataDeserialize(blockId, new ChunkedByteBuffer(dataRead))
|
||||
serializerManager
|
||||
.dataDeserializeStream(blockId, new ChunkedByteBuffer(dataRead).toInputStream())
|
||||
.asInstanceOf[Iterator[T]]
|
||||
}
|
||||
|
||||
|
|
|
@ -157,7 +157,8 @@ class ReceivedBlockHandlerSuite
|
|||
val reader = new FileBasedWriteAheadLogRandomReader(fileSegment.path, hadoopConf)
|
||||
val bytes = reader.read(fileSegment)
|
||||
reader.close()
|
||||
serializerManager.dataDeserialize(generateBlockId(), new ChunkedByteBuffer(bytes)).toList
|
||||
serializerManager.dataDeserializeStream(
|
||||
generateBlockId(), new ChunkedByteBuffer(bytes).toInputStream()).toList
|
||||
}
|
||||
loggedData shouldEqual data
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue