[SPARK-12817] Add BlockManager.getOrElseUpdate and remove CacheManager

CacheManager directly calls MemoryStore.unrollSafely() and has its own logic for gracefully falling back to disk when cached data does not fit in memory. However, this logic also exists inside the MemoryStore itself, so it appears to be unnecessary duplication.

Thanks to the addition of block-level read/write locks in #10705, we can refactor the code to remove the CacheManager and replace it with an atomic `BlockManager.getOrElseUpdate()` method.
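
For reference, the new method's caller-facing contract, as introduced in this diff, is roughly: Left(blockResult) means the block was already cached or was just computed and cached, while Right(iterator) means the block could not be cached and the computed values are handed back to the caller. Below is a minimal caller-side sketch; the helper name `readOrCompute` and the `compute` parameter are hypothetical and not part of this change:

import org.apache.spark.{Partition, SparkEnv}
import org.apache.spark.storage.{RDDBlockId, StorageLevel}

// Hypothetical helper illustrating how a caller consumes getOrElseUpdate's result.
def readOrCompute(rddId: Int, partition: Partition, compute: () => Iterator[Any]): Iterator[Any] = {
  val blockId = RDDBlockId(rddId, partition.index)
  SparkEnv.get.blockManager.getOrElseUpdate(blockId, StorageLevel.MEMORY_AND_DISK, compute) match {
    // The block was already cached, or we just computed and cached it.
    case Left(blockResult) => blockResult.data
    // The put failed (e.g. the data did not fit in memory and could not be dropped to disk),
    // so we receive the computed values back and consume them without caching.
    case Right(iter) => iter
  }
}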

This pull request replaces / subsumes #10748.

/cc andrewor14 and nongli for review. Note that this changes the locking semantics of a couple of internal BlockManager methods (`doPut()` and `lockNewBlockForWriting()`), so please pay attention to the Scaladoc changes and new test cases for those methods.
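
To illustrate the changed `lockNewBlockForWriting()` contract (a sketch only; `writeBlock` and `readExistingBlock` are hypothetical placeholders): a return value of true means the caller holds a write lock on the new block, while false now means the block already exists and the caller holds a read lock on it rather than no locks at all.

// Sketch of writer-side handling under the new lockNewBlockForWriting() semantics.
if (blockInfoManager.lockNewBlockForWriting(blockId, newBlockInfo)) {
  // We won the race to write the block and hold a write lock on it.
  writeBlock(blockId)                       // hypothetical: actually store the block's data
  blockInfoManager.downgradeLock(blockId)   // keep a read lock, as getOrElseUpdate does
} else {
  // Another task already wrote (or was writing) the block; the call waited for that write
  // to finish and handed us a read lock on the existing block instead.
  readExistingBlock(blockId)                // hypothetical: consume the already-cached block
  blockInfoManager.unlock(blockId)          // release the read lock when finished
}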

Author: Josh Rosen <joshrosen@databricks.com>

Closes #11436 from JoshRosen/remove-cachemanager.
Josh Rosen 2016-03-02 10:26:47 -08:00 committed by Andrew Or
parent 8f8d8a2315
commit d6969ffc0f
16 changed files with 365 additions and 597 deletions

View file

@ -1,179 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import scala.collection.mutable
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
import org.apache.spark.util.CompletionIterator
/**
* Spark class responsible for passing RDD partition contents to the BlockManager and making
* sure a node doesn't load two copies of an RDD at once.
*/
private[spark] class CacheManager(blockManager: BlockManager) extends Logging {
/** Keys of RDD partitions that are being computed/loaded. */
private val loading = new mutable.HashSet[RDDBlockId]
/** Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached. */
def getOrCompute[T](
rdd: RDD[T],
partition: Partition,
context: TaskContext,
storageLevel: StorageLevel): Iterator[T] = {
val key = RDDBlockId(rdd.id, partition.index)
logDebug(s"Looking for partition $key")
blockManager.get(key) match {
case Some(blockResult) =>
// Partition is already materialized, so just return its values
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
existingMetrics.incBytesReadInternal(blockResult.bytes)
val iter = blockResult.data.asInstanceOf[Iterator[T]]
new InterruptibleIterator[T](context, iter) {
override def next(): T = {
existingMetrics.incRecordsReadInternal(1)
delegate.next()
}
}
case None =>
// Acquire a lock for loading this partition
// If another thread already holds the lock, wait for it to finish and return its results
val storedValues = acquireLockForPartition[T](key)
if (storedValues.isDefined) {
return new InterruptibleIterator[T](context, storedValues.get)
}
// Otherwise, we have to load the partition ourselves
try {
logInfo(s"Partition $key not found, computing it")
val computedValues = rdd.computeOrReadCheckpoint(partition, context)
val cachedValues = putInBlockManager(key, computedValues, storageLevel)
new InterruptibleIterator(context, cachedValues)
} finally {
loading.synchronized {
loading.remove(key)
loading.notifyAll()
}
}
}
}
/**
* Acquire a loading lock for the partition identified by the given block ID.
*
* If the lock is free, just acquire it and return None. Otherwise, another thread is already
* loading the partition, so we wait for it to finish and return the values loaded by the thread.
*/
private def acquireLockForPartition[T](id: RDDBlockId): Option[Iterator[T]] = {
loading.synchronized {
if (!loading.contains(id)) {
// If the partition is free, acquire its lock to compute its value
loading.add(id)
None
} else {
// Otherwise, wait for another thread to finish and return its result
logInfo(s"Another thread is loading $id, waiting for it to finish...")
while (loading.contains(id)) {
try {
loading.wait()
} catch {
case e: Exception =>
logWarning(s"Exception while waiting for another thread to load $id", e)
}
}
logInfo(s"Finished waiting for $id")
val values = blockManager.get(id)
if (!values.isDefined) {
/* The block is not guaranteed to exist even after the other thread has finished.
* For instance, the block could be evicted after it was put, but before our get.
* In this case, we still need to load the partition ourselves. */
logInfo(s"Whoever was loading $id failed; we'll try it ourselves")
loading.add(id)
}
values.map(_.data.asInstanceOf[Iterator[T]])
}
}
}
/**
* Cache the values of a partition, keeping track of any updates in the storage statuses of
* other blocks along the way.
*
* The effective storage level refers to the level that actually specifies BlockManager put
* behavior, not the level originally specified by the user. This is mainly for forcing a
* MEMORY_AND_DISK partition to disk if there is not enough room to unroll the partition,
* while preserving the original semantics of the RDD as specified by the application.
*/
private def putInBlockManager[T](
key: BlockId,
values: Iterator[T],
level: StorageLevel,
effectiveStorageLevel: Option[StorageLevel] = None): Iterator[T] = {
val putLevel = effectiveStorageLevel.getOrElse(level)
if (!putLevel.useMemory) {
/*
* This RDD is not to be cached in memory, so we can just pass the computed values as an
* iterator directly to the BlockManager rather than first fully unrolling it in memory.
*/
blockManager.putIterator(key, values, level, tellMaster = true, effectiveStorageLevel)
blockManager.get(key) match {
case Some(v) => v.data.asInstanceOf[Iterator[T]]
case None =>
logInfo(s"Failure to store $key")
throw new BlockException(key, s"Block manager failed to return cached value for $key!")
}
} else {
/*
* This RDD is to be cached in memory. In this case we cannot pass the computed values
* to the BlockManager as an iterator and expect to read it back later. This is because
* we may end up dropping a partition from memory store before getting it back.
*
* In addition, we must be careful to not unroll the entire partition in memory at once.
* Otherwise, we may cause an OOM exception if the JVM does not have enough space for this
* single partition. Instead, we unroll the values cautiously, potentially aborting and
* dropping the partition to disk if applicable.
*/
blockManager.memoryStore.unrollSafely(key, values) match {
case Left(arr) =>
// We have successfully unrolled the entire partition, so cache it in memory
blockManager.putArray(key, arr, level, tellMaster = true, effectiveStorageLevel)
CompletionIterator[T, Iterator[T]](
arr.iterator.asInstanceOf[Iterator[T]],
blockManager.releaseLock(key))
case Right(it) =>
// There is not enough space to cache this partition in memory
val returnValues = it.asInstanceOf[Iterator[T]]
if (putLevel.useDisk) {
logWarning(s"Persisting partition $key to disk instead.")
val diskOnlyLevel = StorageLevel(useDisk = true, useMemory = false,
useOffHeap = false, deserialized = false, putLevel.replication)
putInBlockManager[T](key, returnValues, level, Some(diskOnlyLevel))
} else {
returnValues
}
}
}
}
}

View file

@ -56,7 +56,6 @@ class SparkEnv (
private[spark] val rpcEnv: RpcEnv,
val serializer: Serializer,
val closureSerializer: Serializer,
val cacheManager: CacheManager,
val mapOutputTracker: MapOutputTracker,
val shuffleManager: ShuffleManager,
val broadcastManager: BroadcastManager,
@ -333,8 +332,6 @@ object SparkEnv extends Logging {
val broadcastManager = new BroadcastManager(isDriver, conf, securityManager)
val cacheManager = new CacheManager(blockManager)
val metricsSystem = if (isDriver) {
// Don't start metrics system right now for Driver.
// We need to wait for the task scheduler to give us an app ID.
@ -371,7 +368,6 @@ object SparkEnv extends Logging {
rpcEnv,
serializer,
closureSerializer,
cacheManager,
mapOutputTracker,
shuffleManager,
broadcastManager,

View file

@ -99,18 +99,14 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store a copy of the broadcast variable in the driver so that tasks run on the driver
// do not create a duplicate copy of the broadcast variable's value.
val blockManager = SparkEnv.get.blockManager
if (blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
blockManager.releaseLock(broadcastId)
} else {
if (!blockManager.putSingle(broadcastId, value, MEMORY_AND_DISK, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
val blocks =
TorrentBroadcast.blockifyObject(value, blockSize, SparkEnv.get.serializer, compressionCodec)
blocks.zipWithIndex.foreach { case (block, i) =>
val pieceId = BroadcastBlockId(id, "piece" + i)
if (blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
blockManager.releaseLock(pieceId)
} else {
if (!blockManager.putBytes(pieceId, block, MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
}
@ -130,22 +126,24 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// First try getLocalBytes because there is a chance that previous attempts to fetch the
// broadcast blocks have already fetched some of the blocks. In that case, some blocks
// would be available locally (on this executor).
def getLocal: Option[ByteBuffer] = bm.getLocalBytes(pieceId)
def getRemote: Option[ByteBuffer] = bm.getRemoteBytes(pieceId).map { block =>
// If we found the block from remote executors/driver's BlockManager, put the block
// in this executor's BlockManager.
if (!bm.putBytes(pieceId, block, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(
s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
block
bm.getLocalBytes(pieceId) match {
case Some(block) =>
blocks(pid) = block
releaseLock(pieceId)
case None =>
bm.getRemoteBytes(pieceId) match {
case Some(b) =>
// We found the block from remote executors/driver's BlockManager, so put the block
// in this executor's BlockManager.
if (!bm.putBytes(pieceId, b, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)) {
throw new SparkException(
s"Failed to store $pieceId of $broadcastId in local BlockManager")
}
blocks(pid) = b
case None =>
throw new SparkException(s"Failed to get $pieceId of $broadcastId")
}
}
val block: ByteBuffer = getLocal.orElse(getRemote).getOrElse(
throw new SparkException(s"Failed to get $pieceId of $broadcastId"))
// At this point we are guaranteed to hold a read lock, since we either got the block locally
// or stored the remotely-fetched block and automatically downgraded the write lock.
blocks(pid) = block
releaseLock(pieceId)
}
blocks
}
@ -191,9 +189,7 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
// Store the merged copy in BlockManager so other tasks on this executor don't
// need to re-fetch it.
val storageLevel = StorageLevel.MEMORY_AND_DISK
if (blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
releaseLock(broadcastId)
} else {
if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
throw new SparkException(s"Failed to store $broadcastId in BlockManager")
}
obj

View file

@ -292,11 +292,8 @@ private[spark] class Executor(
ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
} else if (resultSize >= maxRpcMessageSize) {
val blockId = TaskResultBlockId(taskId)
val putSucceeded = env.blockManager.putBytes(
env.blockManager.putBytes(
blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
if (putSucceeded) {
env.blockManager.releaseLock(blockId)
}
logInfo(
s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))

View file

@ -66,10 +66,7 @@ class NettyBlockRpcServer(
serializer.newInstance().deserialize(ByteBuffer.wrap(uploadBlock.metadata))
val data = new NioManagedBuffer(ByteBuffer.wrap(uploadBlock.blockData))
val blockId = BlockId(uploadBlock.blockId)
val putSucceeded = blockManager.putBlockData(blockId, data, level)
if (putSucceeded) {
blockManager.releaseLock(blockId)
}
blockManager.putBlockData(blockId, data, level)
responseContext.onSuccess(ByteBuffer.allocate(0))
}
}

View file

@ -37,7 +37,7 @@ import org.apache.spark.partial.BoundedDouble
import org.apache.spark.partial.CountEvaluator
import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.storage.{RDDBlockId, StorageLevel}
import org.apache.spark.util.{BoundedPriorityQueue, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliCellSampler, BernoulliSampler, PoissonSampler,
@ -272,7 +272,7 @@ abstract class RDD[T: ClassTag](
*/
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
if (storageLevel != StorageLevel.NONE) {
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
getOrCompute(split, context)
} else {
computeOrReadCheckpoint(split, context)
}
@ -314,6 +314,35 @@ abstract class RDD[T: ClassTag](
}
}
/**
* Gets or computes an RDD partition. Used by RDD.iterator() when an RDD is cached.
*/
private[spark] def getOrCompute(partition: Partition, context: TaskContext): Iterator[T] = {
val blockId = RDDBlockId(id, partition.index)
var readCachedBlock = true
// This method is called on executors, so we need to call SparkEnv.get instead of sc.env.
SparkEnv.get.blockManager.getOrElseUpdate(blockId, storageLevel, () => {
readCachedBlock = false
computeOrReadCheckpoint(partition, context)
}) match {
case Left(blockResult) =>
if (readCachedBlock) {
val existingMetrics = context.taskMetrics().registerInputMetrics(blockResult.readMethod)
existingMetrics.incBytesReadInternal(blockResult.bytes)
new InterruptibleIterator[T](context, blockResult.data.asInstanceOf[Iterator[T]]) {
override def next(): T = {
existingMetrics.incRecordsReadInternal(1)
delegate.next()
}
}
} else {
new InterruptibleIterator(context, blockResult.data.asInstanceOf[Iterator[T]])
}
case Right(iter) =>
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[T]])
}
}
/**
* Execute a block of code in a scope such that all new RDDs created in this body will
* be part of the same scope. For more detail, see {{org.apache.spark.rdd.RDDOperationScope}}.

View file

@ -71,27 +71,13 @@ private[storage] class BlockInfo(val level: StorageLevel, val tellMaster: Boolea
_writerTask = t
checkInvariants()
}
private[this] var _writerTask: Long = 0
/**
* True if this block has been removed from the BlockManager and false otherwise.
* This field is used to communicate block deletion to blocked readers / writers (see its usage
* in [[BlockInfoManager]]).
*/
def removed: Boolean = _removed
def removed_=(r: Boolean): Unit = {
_removed = r
checkInvariants()
}
private[this] var _removed: Boolean = false
private[this] var _writerTask: Long = BlockInfo.NO_WRITER
private def checkInvariants(): Unit = {
// A block's reader count must be non-negative:
assert(_readerCount >= 0)
// A block is either locked for reading or for writing, but not for both at the same time:
assert(_readerCount == 0 || _writerTask == BlockInfo.NO_WRITER)
// If a block is removed then it is not locked:
assert(!_removed || (_readerCount == 0 && _writerTask == BlockInfo.NO_WRITER))
}
checkInvariants()
@ -195,16 +181,22 @@ private[storage] class BlockInfoManager extends Logging {
blockId: BlockId,
blocking: Boolean = true): Option[BlockInfo] = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire read lock for $blockId")
infos.get(blockId).map { info =>
while (info.writerTask != BlockInfo.NO_WRITER) {
if (blocking) wait() else return None
do {
infos.get(blockId) match {
case None => return None
case Some(info) =>
if (info.writerTask == BlockInfo.NO_WRITER) {
info.readerCount += 1
readLocksByTask(currentTaskAttemptId).add(blockId)
logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
return Some(info)
}
}
if (info.removed) return None
info.readerCount += 1
readLocksByTask(currentTaskAttemptId).add(blockId)
logTrace(s"Task $currentTaskAttemptId acquired read lock for $blockId")
info
}
if (blocking) {
wait()
}
} while (blocking)
None
}
/**
@ -226,21 +218,25 @@ private[storage] class BlockInfoManager extends Logging {
blockId: BlockId,
blocking: Boolean = true): Option[BlockInfo] = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to acquire write lock for $blockId")
infos.get(blockId).map { info =>
if (info.writerTask == currentTaskAttemptId) {
throw new IllegalStateException(
s"Task $currentTaskAttemptId has already locked $blockId for writing")
} else {
while (info.writerTask != BlockInfo.NO_WRITER || info.readerCount != 0) {
if (blocking) wait() else return None
}
if (info.removed) return None
do {
infos.get(blockId) match {
case None => return None
case Some(info) =>
if (info.writerTask == currentTaskAttemptId) {
throw new IllegalStateException(
s"Task $currentTaskAttemptId has already locked $blockId for writing")
} else if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) {
info.writerTask = currentTaskAttemptId
writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
return Some(info)
}
}
info.writerTask = currentTaskAttemptId
writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId acquired write lock for $blockId")
info
}
if (blocking) {
wait()
}
} while (blocking)
None
}
/**
@ -306,29 +302,30 @@ private[storage] class BlockInfoManager extends Logging {
}
/**
* Atomically create metadata for a block and acquire a write lock for it, if it doesn't already
* exist.
* Attempt to acquire the appropriate lock for writing a new block.
*
* This enforces the first-writer-wins semantics. If we are the first to write the block,
* then just go ahead and acquire the write lock. Otherwise, if another thread is already
* writing the block, then we wait for the write to finish before acquiring the read lock.
*
* @param blockId the block id.
* @param newBlockInfo the block info for the new block.
* @return true if the block did not already exist, false otherwise. If this returns false, then
* no new locks are acquired. If this returns true, a write lock on the new block will
* be held.
* a read lock on the existing block will be held. If this returns true, a write lock on
* the new block will be held.
*/
def lockNewBlockForWriting(
blockId: BlockId,
newBlockInfo: BlockInfo): Boolean = synchronized {
logTrace(s"Task $currentTaskAttemptId trying to put $blockId")
if (!infos.contains(blockId)) {
infos(blockId) = newBlockInfo
newBlockInfo.writerTask = currentTaskAttemptId
writeLocksByTask.addBinding(currentTaskAttemptId, blockId)
logTrace(s"Task $currentTaskAttemptId successfully locked new block $blockId")
true
} else {
logTrace(s"Task $currentTaskAttemptId did not create and lock block $blockId " +
s"because that block already exists")
false
lockForReading(blockId) match {
case Some(info) =>
// Block already exists. This could happen if another thread races with us to compute
// the same block. In this case, just keep the read lock and return.
false
case None =>
// Block does not yet exist or is removed, so we are free to acquire the write lock
infos(blockId) = newBlockInfo
lockForWriting(blockId)
true
}
}
@ -418,7 +415,6 @@ private[storage] class BlockInfoManager extends Logging {
infos.remove(blockId)
blockInfo.readerCount = 0
blockInfo.writerTask = BlockInfo.NO_WRITER
blockInfo.removed = true
}
case None =>
throw new IllegalArgumentException(
@ -434,7 +430,6 @@ private[storage] class BlockInfoManager extends Logging {
infos.valuesIterator.foreach { blockInfo =>
blockInfo.readerCount = 0
blockInfo.writerTask = BlockInfo.NO_WRITER
blockInfo.removed = true
}
infos.clear()
readLocksByTask.clear()

View file

@ -44,8 +44,7 @@ import org.apache.spark.util._
private[spark] sealed trait BlockValues
private[spark] case class ByteBufferValues(buffer: ByteBuffer) extends BlockValues
private[spark] case class IteratorValues(iterator: Iterator[Any]) extends BlockValues
private[spark] case class ArrayValues(buffer: Array[Any]) extends BlockValues
private[spark] case class IteratorValues(iterator: () => Iterator[Any]) extends BlockValues
/* Class for returning a fetched block and associated metrics. */
private[spark] class BlockResult(
@ -648,8 +647,38 @@ private[spark] class BlockManager(
}
/**
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
* Retrieve the given block if it exists, otherwise call the provided `makeIterator` method
* to compute the block, persist it, and return its values.
*
* @return either a BlockResult if the block was successfully cached, or an iterator if the block
* could not be cached.
*/
def getOrElseUpdate(
blockId: BlockId,
level: StorageLevel,
makeIterator: () => Iterator[Any]): Either[BlockResult, Iterator[Any]] = {
// Initially we hold no locks on this block.
doPut(blockId, IteratorValues(makeIterator), level, keepReadLock = true) match {
case None =>
// doPut() didn't hand work back to us, so the block already existed or was successfully
// stored. Therefore, we now hold a read lock on the block.
val blockResult = get(blockId).getOrElse {
// Since we held a read lock between the doPut() and get() calls, the block should not
// have been evicted, so get() not returning the block indicates some internal error.
releaseLock(blockId)
throw new SparkException(s"get() failed for block $blockId even though we held a lock")
}
Left(blockResult)
case Some(failedPutResult) =>
// The put failed, likely because the data was too large to fit in memory and could not be
// dropped to disk. Therefore, we need to pass the input iterator back to the caller so
// that they can decide what to do with the values (e.g. process them without caching).
Right(failedPutResult.data.left.get)
}
}
/**
* @return true if the block was stored or false if an error occurred.
*/
def putIterator(
blockId: BlockId,
@ -658,7 +687,7 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
doPut(blockId, IteratorValues(values), level, tellMaster, effectiveStorageLevel)
doPut(blockId, IteratorValues(() => values), level, tellMaster, effectiveStorageLevel).isEmpty
}
/**
@ -678,27 +707,10 @@ private[spark] class BlockManager(
syncWrites, writeMetrics, blockId)
}
/**
* Put a new block of values to the block manager.
*
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
*/
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(values != null, "Values is null")
doPut(blockId, ArrayValues(values), level, tellMaster, effectiveStorageLevel)
}
/**
* Put a new block of serialized bytes to the block manager.
*
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
* @return true if the block was stored or false if an error occurred.
*/
def putBytes(
blockId: BlockId,
@ -707,26 +719,32 @@ private[spark] class BlockManager(
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
require(bytes != null, "Bytes is null")
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel)
doPut(blockId, ByteBufferValues(bytes), level, tellMaster, effectiveStorageLevel).isEmpty
}
/**
* Put the given block according to the given level in one of the block stores, replicating
* the values if necessary.
*
* The effective storage level refers to the level according to which the block will actually be
* handled. This allows the caller to specify an alternate behavior of doPut while preserving
* the original level specified by the user.
* If the block already exists, this method will not overwrite it.
*
* @return true if the block was stored or false if the block was already stored or an
* error occurred.
* @param effectiveStorageLevel the level according to which the block will actually be handled.
* This allows the caller to specify an alternate behavior of doPut
* while preserving the original level specified by the user.
* @param keepReadLock if true, this method will hold the read lock when it returns (even if the
* block already exists). If false, this method will hold no locks when it
* returns.
* @return `Some(PutResult)` if the block did not exist and could not be successfully cached,
* or None if the block already existed or was successfully stored (fully consuming
* the input data / input iterator).
*/
private def doPut(
blockId: BlockId,
data: BlockValues,
level: StorageLevel,
tellMaster: Boolean = true,
effectiveStorageLevel: Option[StorageLevel] = None): Boolean = {
effectiveStorageLevel: Option[StorageLevel] = None,
keepReadLock: Boolean = false): Option[PutResult] = {
require(blockId != null, "BlockId is null")
require(level != null && level.isValid, "StorageLevel is null or invalid")
@ -743,7 +761,11 @@ private[spark] class BlockManager(
newInfo
} else {
logWarning(s"Block $blockId already exists on this machine; not re-adding it")
return false
if (!keepReadLock) {
// lockNewBlockForWriting returned a read lock on the existing block, so we must free it:
releaseLock(blockId)
}
return None
}
}
@ -779,6 +801,7 @@ private[spark] class BlockManager(
}
var blockWasSuccessfullyStored = false
var result: PutResult = null
putBlockInfo.synchronized {
logTrace("Put for block %s took %s to get into synchronized block"
@ -803,11 +826,9 @@ private[spark] class BlockManager(
}
// Actually put the values
val result = data match {
result = data match {
case IteratorValues(iterator) =>
blockStore.putIterator(blockId, iterator, putLevel, returnValues)
case ArrayValues(array) =>
blockStore.putArray(blockId, array, putLevel, returnValues)
blockStore.putIterator(blockId, iterator(), putLevel, returnValues)
case ByteBufferValues(bytes) =>
bytes.rewind()
blockStore.putBytes(blockId, bytes, putLevel)
@ -834,7 +855,11 @@ private[spark] class BlockManager(
}
} finally {
if (blockWasSuccessfullyStored) {
blockInfoManager.downgradeLock(blockId)
if (keepReadLock) {
blockInfoManager.downgradeLock(blockId)
} else {
blockInfoManager.unlock(blockId)
}
} else {
blockInfoManager.removeBlock(blockId)
logWarning(s"Putting block $blockId failed")
@ -852,18 +877,20 @@ private[spark] class BlockManager(
Await.ready(replicationFuture, Duration.Inf)
}
case _ =>
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
if (blockWasSuccessfullyStored) {
val remoteStartTime = System.currentTimeMillis
// Serialize the block if not already done
if (bytesAfterPut == null) {
if (valuesAfterPut == null) {
throw new SparkException(
"Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
}
bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
replicate(blockId, bytesAfterPut, putLevel)
logDebug("Put block %s remotely took %s"
.format(blockId, Utils.getUsedTimeMs(remoteStartTime)))
}
}
@ -877,7 +904,11 @@ private[spark] class BlockManager(
.format(blockId, Utils.getUsedTimeMs(startTimeMs)))
}
blockWasSuccessfullyStored
if (blockWasSuccessfullyStored) {
None
} else {
Some(result)
}
}
/**
@ -1033,7 +1064,7 @@ private[spark] class BlockManager(
logInfo(s"Writing block $blockId to disk")
data() match {
case Left(elements) =>
diskStore.putArray(blockId, elements, level, returnValues = false)
diskStore.putIterator(blockId, elements.toIterator, level, returnValues = false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}

View file

@ -19,8 +19,6 @@ package org.apache.spark.storage
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.Logging
/**
@ -43,12 +41,6 @@ private[spark] abstract class BlockStore(val blockManager: BlockManager) extends
level: StorageLevel,
returnValues: Boolean): PutResult
def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult
/**
* Return the size of a block in bytes.
*/

View file

@ -58,14 +58,6 @@ private[spark] class DiskStore(blockManager: BlockManager, diskManager: DiskBloc
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
override def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
putIterator(blockId, values.toIterator, level, returnValues)
}
override def putIterator(
blockId: BlockId,
values: Iterator[Any],

View file

@ -120,22 +120,6 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
PutResult(size, data)
}
override def putArray(
blockId: BlockId,
values: Array[Any],
level: StorageLevel,
returnValues: Boolean): PutResult = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(values.asInstanceOf[AnyRef])
tryToPut(blockId, () => values, sizeEstimate, deserialized = true)
PutResult(sizeEstimate, Left(values.iterator))
} else {
val bytes = blockManager.dataSerialize(blockId, values.iterator)
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
override def putIterator(
blockId: BlockId,
values: Iterator[Any],
@ -166,7 +150,17 @@ private[spark] class MemoryStore(blockManager: BlockManager, memoryManager: Memo
unrolledValues match {
case Left(arrayValues) =>
// Values are fully unrolled in memory, so store them as an array
val res = putArray(blockId, arrayValues, level, returnValues)
val res = {
if (level.deserialized) {
val sizeEstimate = SizeEstimator.estimate(arrayValues.asInstanceOf[AnyRef])
tryToPut(blockId, () => arrayValues, sizeEstimate, deserialized = true)
PutResult(sizeEstimate, Left(arrayValues.iterator))
} else {
val bytes = blockManager.dataSerialize(blockId, arrayValues.iterator)
tryToPut(blockId, () => bytes, bytes.limit, deserialized = false)
PutResult(bytes.limit(), Right(bytes.duplicate()))
}
}
PutResult(res.size, res.data)
case Right(iteratorValues) =>
// Not enough space to unroll this block; drop to disk if applicable

View file

@ -1,97 +0,0 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark
import org.mockito.Mockito._
import org.scalatest.BeforeAndAfter
import org.scalatest.mock.MockitoSugar
import org.apache.spark.executor.{DataReadMethod, TaskMetrics}
import org.apache.spark.rdd.RDD
import org.apache.spark.storage._
// TODO: Test the CacheManager's thread-safety aspects
class CacheManagerSuite extends SparkFunSuite with LocalSparkContext with BeforeAndAfter
with MockitoSugar {
var blockManager: BlockManager = _
var cacheManager: CacheManager = _
var split: Partition = _
/** An RDD which returns the values [1, 2, 3, 4]. */
var rdd: RDD[Int] = _
var rdd2: RDD[Int] = _
var rdd3: RDD[Int] = _
before {
sc = new SparkContext("local", "test")
blockManager = mock[BlockManager]
cacheManager = new CacheManager(blockManager)
split = new Partition { override def index: Int = 0 }
rdd = new RDD[Int](sc, Nil) {
override def getPartitions: Array[Partition] = Array(split)
override val getDependencies = List[Dependency[_]]()
override def compute(split: Partition, context: TaskContext): Iterator[Int] =
Array(1, 2, 3, 4).iterator
}
rdd2 = new RDD[Int](sc, List(new OneToOneDependency(rdd))) {
override def getPartitions: Array[Partition] = firstParent[Int].partitions
override def compute(split: Partition, context: TaskContext): Iterator[Int] =
firstParent[Int].iterator(split, context)
}.cache()
rdd3 = new RDD[Int](sc, List(new OneToOneDependency(rdd2))) {
override def getPartitions: Array[Partition] = firstParent[Int].partitions
override def compute(split: Partition, context: TaskContext): Iterator[Int] =
firstParent[Int].iterator(split, context)
}.cache()
}
test("get uncached rdd") {
// Do not mock this test, because attempting to match Array[Any], which is not covariant,
// in blockManager.put is a losing battle. You have been warned.
blockManager = sc.env.blockManager
cacheManager = sc.env.cacheManager
val context = TaskContext.empty()
val computeValue = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
val getValue = blockManager.get(RDDBlockId(rdd.id, split.index))
assert(computeValue.toList === List(1, 2, 3, 4))
assert(getValue.isDefined, "Block cached from getOrCompute is not found!")
assert(getValue.get.data.toList === List(1, 2, 3, 4))
}
test("get cached rdd") {
val result = new BlockResult(Array(5, 6, 7).iterator, DataReadMethod.Memory, 12)
when(blockManager.get(RDDBlockId(0, 0))).thenReturn(Some(result))
val context = TaskContext.empty()
val value = cacheManager.getOrCompute(rdd, split, context, StorageLevel.MEMORY_ONLY)
assert(value.toList === List(5, 6, 7))
}
test("verify task metrics updated correctly") {
cacheManager = sc.env.cacheManager
val context = TaskContext.empty()
try {
TaskContext.setTaskContext(context)
sc.env.blockManager.registerTask(0)
cacheManager.getOrCompute(rdd3, split, context, StorageLevel.MEMORY_ONLY)
assert(context.taskMetrics.updatedBlockStatuses.size === 2)
} finally {
TaskContext.unset()
}
}
}

View file

@ -80,10 +80,18 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", blockInfo))
assert(blockInfoManager.get("block").get eq blockInfo)
assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
assert(blockInfoManager.get("block").get eq blockInfo)
assert(blockInfo.readerCount === 0)
assert(blockInfo.writerTask === 1)
// Downgrade lock so that second call doesn't block:
blockInfoManager.downgradeLock("block")
assert(blockInfo.readerCount === 1)
assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
assert(!blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
assert(blockInfo.readerCount === 2)
assert(blockInfoManager.get("block").get eq blockInfo)
assert(blockInfo.readerCount === 2)
assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
blockInfoManager.unlock("block")
blockInfoManager.unlock("block")
assert(blockInfo.readerCount === 0)
assert(blockInfo.writerTask === BlockInfo.NO_WRITER)
@ -92,6 +100,67 @@ class BlockInfoManagerSuite extends SparkFunSuite with BeforeAndAfterEach {
assert(blockInfoManager.getNumberOfMapEntries === initialNumMapEntries + 1)
}
test("lockNewBlockForWriting blocks while write lock is held, then returns false after release") {
withTaskId(0) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
}
val lock1Future = Future {
withTaskId(1) {
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
}
}
val lock2Future = Future {
withTaskId(2) {
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
}
}
Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
withTaskId(0) {
blockInfoManager.downgradeLock("block")
}
// After downgrading to a read lock, both threads should wake up and acquire the shared
// read lock.
assert(!Await.result(lock1Future, 1.seconds))
assert(!Await.result(lock2Future, 1.seconds))
assert(blockInfoManager.get("block").get.readerCount === 3)
}
test("lockNewBlockForWriting blocks while write lock is held, then returns true after removal") {
withTaskId(0) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))
}
val lock1Future = Future {
withTaskId(1) {
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
}
}
val lock2Future = Future {
withTaskId(2) {
blockInfoManager.lockNewBlockForWriting("block", newBlockInfo())
}
}
Thread.sleep(300) // Hack to try to ensure that both future tasks are waiting
withTaskId(0) {
blockInfoManager.removeBlock("block")
}
// After removing the block, the write lock is released. Both threads should wake up but only
// one should acquire the write lock. The second thread should block until the winner of the
// write race releases its lock.
val winningFuture: Future[Boolean] =
Await.ready(Future.firstCompletedOf(Seq(lock1Future, lock2Future)), 1.seconds)
assert(winningFuture.value.get.get)
val winningTID = blockInfoManager.get("block").get.writerTask
assert(winningTID === 1 || winningTID === 2)
val losingFuture: Future[Boolean] = if (winningTID == 1) lock2Future else lock1Future
assert(!losingFuture.isCompleted)
// Once the writer releases its lock, the blocked future should wake up again and complete.
withTaskId(winningTID) {
blockInfoManager.unlock("block")
}
assert(!Await.result(losingFuture, 1.seconds))
assert(blockInfoManager.get("block").get.readerCount === 1)
}
test("read locks are reentrant") {
withTaskId(1) {
assert(blockInfoManager.lockNewBlockForWriting("block", newBlockInfo()))

View file

@ -190,7 +190,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def putBlockAndGetLocations(blockId: String, level: StorageLevel): Set[BlockManagerId] = {
stores.head.putSingle(blockId, new Array[Byte](blockSize), level)
stores.head.releaseLock(blockId)
val locations = master.getLocations(blockId).sortBy { _.executorId }.toSet
stores.foreach { _.removeBlock(blockId) }
master.removeBlock(blockId)
@ -252,7 +251,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
// Insert a block with 2x replication and return the number of copies of the block
def replicateAndGetNumCopies(blockId: String): Int = {
store.putSingle(blockId, new Array[Byte](1000), StorageLevel.MEMORY_AND_DISK_2)
store.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@ -290,7 +288,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
def replicateAndGetNumCopies(blockId: String, replicationFactor: Int): Int = {
val storageLevel = StorageLevel(true, true, false, true, replicationFactor)
initialStores.head.putSingle(blockId, new Array[Byte](blockSize), storageLevel)
initialStores.head.releaseLock(blockId)
val numLocations = master.getLocations(blockId).size
allStores.foreach { _.removeBlock(blockId) }
numLocations
@ -358,7 +355,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
val blockId = new TestBlockId(
"block-with-" + storageLevel.description.replace(" ", "-").toLowerCase)
stores(0).putSingle(blockId, new Array[Byte](blockSize), storageLevel)
stores(0).releaseLock(blockId)
// Assert that master knows two locations for the block
val blockLocations = master.getLocations(blockId).map(_.executorId).toSet
@ -397,7 +393,6 @@ class BlockManagerReplicationSuite extends SparkFunSuite with Matchers with Befo
(1 to 10).foreach {
i =>
testStore.putSingle(s"dummy-block-$i", new Array[Byte](1000), MEMORY_ONLY_SER)
testStore.releaseLock(s"dummy-block-$i")
}
(1 to 10).foreach {
i => testStore.removeBlock(s"dummy-block-$i")

View file

@ -172,9 +172,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
@ -205,7 +205,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_2)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_2)
store2.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_2)
assert(master.getLocations("a1").size === 2, "master did not report 2 locations for a1")
assert(master.getLocations("a2").size === 2, "master did not report 2 locations for a2")
@ -218,9 +218,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory and telling master only about a1 and a2
store.putSingleAndReleaseLock("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
store.putSingle("a1-to-remove", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a2-to-remove", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a3-to-remove", a3, StorageLevel.MEMORY_ONLY, tellMaster = false)
// Checking whether blocks are in memory and memory size
val memStatus = master.getMemoryStatus.head._2
@ -265,9 +265,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
// Putting a1, a2 and a3 in memory.
store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
store.putSingle("nonrddblock", a3, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = false)
eventually(timeout(1000 milliseconds), interval(10 milliseconds)) {
@ -283,8 +283,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.getLocations("nonrddblock") should have size (1)
}
store.putSingleAndReleaseLock(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 0), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), a2, StorageLevel.MEMORY_ONLY)
master.removeRdd(0, blocking = true)
store.getSingleAndReleaseLock(rdd(0, 0)) should be (None)
master.getLocations(rdd(0, 0)) should have size 0
@ -308,10 +308,10 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// insert broadcast blocks in both the stores
Seq(driverStore, executorStore).foreach { case s =>
s.putSingleAndReleaseLock(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
s.putSingleAndReleaseLock(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
s.putSingleAndReleaseLock(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
s.putSingleAndReleaseLock(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
s.putSingle(broadcast0BlockId, a1, StorageLevel.DISK_ONLY)
s.putSingle(broadcast1BlockId, a2, StorageLevel.DISK_ONLY)
s.putSingle(broadcast2BlockId, a3, StorageLevel.DISK_ONLY)
s.putSingle(broadcast2BlockId2, a4, StorageLevel.DISK_ONLY)
}
// verify whether the blocks exist in both the stores
@ -366,7 +366,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(2000)
val a1 = new Array[Byte](400)
store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(master.getLocations("a1").size > 0, "master was not told about a1")
@ -384,13 +384,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
assert(master.getLocations("a1").size > 0, "master was not told about a1")
master.removeExecutor(store.blockManagerId.executorId)
assert(master.getLocations("a1").size == 0, "a1 was not removed from master")
store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY)
store.waitForAsyncReregister()
assert(master.getLocations("a1").size > 0, "a1 was not reregistered with master")
@ -407,13 +407,13 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
master.removeExecutor(store.blockManagerId.executorId)
val t1 = new Thread {
override def run() {
store.putIteratorAndReleaseLock(
store.putIterator(
"a2", a2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
}
val t2 = new Thread {
override def run() {
store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
}
}
val t3 = new Thread {
@ -441,11 +441,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list2 = List(new Array[Byte](500), new Array[Byte](1000), new Array[Byte](1500))
val list1SizeEstimate = SizeEstimator.estimate(list1.iterator.toArray)
val list2SizeEstimate = SizeEstimator.estimate(list2.iterator.toArray)
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list2memory", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list2disk", list2.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val list1Get = store.get("list1")
assert(list1Get.isDefined, "list1 expected to be in store")
@ -486,9 +486,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store2 = makeBlockManager(8000, "executor2")
store3 = makeBlockManager(8000, "executor3")
val list1 = List(new Array[Byte](4000))
store2.putIteratorAndReleaseLock(
store2.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store3.putIteratorAndReleaseLock(
store3.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getRemoteBytes("list1").isDefined, "list1Get expected to be fetched")
store2.stop()
@ -515,15 +515,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingleAndReleaseLock("a1", a1, storageLevel)
store.putSingleAndReleaseLock("a2", a2, storageLevel)
store.putSingleAndReleaseLock("a3", a3, storageLevel)
store.putSingle("a1", a1, storageLevel)
store.putSingle("a2", a2, storageLevel)
store.putSingle("a3", a3, storageLevel)
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
// At this point a2 was gotten last, so LRU will get rid of a3
store.putSingleAndReleaseLock("a1", a1, storageLevel)
store.putSingle("a1", a1, storageLevel)
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3") === None, "a3 was in store")
@ -534,9 +534,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingleAndReleaseLock(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), a1, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), a2, StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 3), a3, StorageLevel.MEMORY_ONLY)
// Even though we accessed rdd_0_3 last, it should not have replaced partitions 1 and 2
// from the same RDD
assert(store.getSingleAndReleaseLock(rdd(0, 3)) === None, "rdd_0_3 was in store")
@ -550,9 +550,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("in-memory LRU for partitions of multiple RDDs") {
store = makeBlockManager(12000)
store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 2), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// At this point rdd_1_1 should've replaced rdd_0_1
assert(store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was not in store")
assert(!store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was in store")
@ -560,8 +560,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Do a get() on rdd_0_2 so that it is the most recently used item
assert(store.getSingleAndReleaseLock(rdd(0, 2)).isDefined, "rdd_0_2 was not in store")
// Put in more partitions from RDD 0; they should replace rdd_1_1
store.putSingleAndReleaseLock(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 3), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 4), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
// when we try to add rdd_0_4.
assert(!store.memoryStore.contains(rdd(1, 1)), "rdd_1_1 was in store")
@ -576,9 +576,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](400)
val a2 = new Array[Byte](400)
val a3 = new Array[Byte](400)
store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
store.putSingleAndReleaseLock("a2", a2, StorageLevel.DISK_ONLY)
store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
store.putSingle("a2", a2, StorageLevel.DISK_ONLY)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was in store")
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was in store")
@ -607,9 +607,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a1 = new Array[Byte](4000)
val a2 = new Array[Byte](4000)
val a3 = new Array[Byte](4000)
store.putSingleAndReleaseLock("a1", a1, storageLevel)
store.putSingleAndReleaseLock("a2", a2, storageLevel)
store.putSingleAndReleaseLock("a3", a3, storageLevel)
store.putSingle("a1", a1, storageLevel)
store.putSingle("a2", a2, storageLevel)
store.putSingle("a3", a3, storageLevel)
assert(accessMethod(store)("a2").isDefined, "a2 was not in store")
assert(accessMethod(store)("a3").isDefined, "a3 was not in store")
assert(store.memoryStore.getValues("a1").isEmpty, "a1 was in memory store")
@ -624,15 +624,15 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val a3 = new Array[Byte](4000)
val a4 = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
store.putSingleAndReleaseLock("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingleAndReleaseLock("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingleAndReleaseLock("a3", a3, StorageLevel.DISK_ONLY)
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", a2, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", a3, StorageLevel.DISK_ONLY)
// At this point LRU should not kick in because a3 is only on disk
assert(store.getSingleAndReleaseLock("a1").isDefined, "a1 was not in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
// Now let's add in a4, which uses both disk and memory; a1 should drop out
store.putSingleAndReleaseLock("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
store.putSingle("a4", a4, StorageLevel.MEMORY_AND_DISK_SER)
assert(store.getSingleAndReleaseLock("a1") == None, "a1 was in store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
assert(store.getSingleAndReleaseLock("a3").isDefined, "a3 was not in store")
@ -644,11 +644,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list1 = List(new Array[Byte](2000), new Array[Byte](2000))
val list2 = List(new Array[Byte](2000), new Array[Byte](2000))
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list2", list2.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list3", list3.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
@ -658,7 +658,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
assert(store.get("list2").get.data.size === 2)
// At this point list2 was gotten last, so LRU will get rid of list3
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
assert(store.getAndReleaseLock("list1").isDefined, "list1 was not in store")
assert(store.get("list1").get.data.size === 2)
@ -674,11 +674,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list3 = List(new Array[Byte](2000), new Array[Byte](2000))
val list4 = List(new Array[Byte](2000), new Array[Byte](2000))
// First store list1 and list2, both in memory, and list3, on disk only
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list1.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list2", list2.iterator, StorageLevel.MEMORY_ONLY_SER, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list3", list3.iterator, StorageLevel.DISK_ONLY, tellMaster = true)
val listForSizeEstimate = new ArrayBuffer[Any]
listForSizeEstimate ++= list1.iterator
@ -697,7 +697,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.getAndReleaseLock("list3").isDefined, "list3 was not in store")
assert(store.get("list3").get.data.size === 2)
// Now let's add in list4, which uses both disk and memory; list1 should drop out
store.putIteratorAndReleaseLock(
store.putIterator(
"list4", list4.iterator, StorageLevel.MEMORY_AND_DISK_SER, tellMaster = true)
assert(store.getAndReleaseLock("list1") === None, "list1 was in store")
assert(store.getAndReleaseLock("list2").isDefined, "list2 was not in store")
@ -722,9 +722,9 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("overly large block") {
store = makeBlockManager(5000)
store.putSingleAndReleaseLock("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
store.putSingle("a1", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.getSingleAndReleaseLock("a1") === None, "a1 was in store")
store.putSingleAndReleaseLock("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
store.putSingle("a2", new Array[Byte](10000), StorageLevel.MEMORY_AND_DISK)
assert(store.memoryStore.getValues("a2") === None, "a2 was in memory store")
assert(store.getSingleAndReleaseLock("a2").isDefined, "a2 was not in store")
}
@ -733,7 +733,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
try {
conf.set("spark.shuffle.compress", "true")
store = makeBlockManager(20000, "exec1")
store.putSingleAndReleaseLock(
store.putSingle(
ShuffleBlockId(0, 0, 0), new Array[Byte](1000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) <= 100,
"shuffle_0_0_0 was not compressed")
@ -742,7 +742,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.shuffle.compress", "false")
store = makeBlockManager(20000, "exec2")
store.putSingleAndReleaseLock(
store.putSingle(
ShuffleBlockId(0, 0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(ShuffleBlockId(0, 0, 0)) >= 10000,
"shuffle_0_0_0 was compressed")
@ -751,7 +751,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "true")
store = makeBlockManager(20000, "exec3")
store.putSingleAndReleaseLock(
store.putSingle(
BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) <= 1000,
"broadcast_0 was not compressed")
@ -760,7 +760,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.broadcast.compress", "false")
store = makeBlockManager(20000, "exec4")
store.putSingleAndReleaseLock(
store.putSingle(
BroadcastBlockId(0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(BroadcastBlockId(0)) >= 10000, "broadcast_0 was compressed")
store.stop()
@ -768,21 +768,21 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
conf.set("spark.rdd.compress", "true")
store = makeBlockManager(20000, "exec5")
store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) <= 1000, "rdd_0_0 was not compressed")
store.stop()
store = null
conf.set("spark.rdd.compress", "false")
store = makeBlockManager(20000, "exec6")
store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
store.putSingle(rdd(0, 0), new Array[Byte](10000), StorageLevel.MEMORY_ONLY_SER)
assert(store.memoryStore.getSize(rdd(0, 0)) >= 10000, "rdd_0_0 was compressed")
store.stop()
store = null
// Check that any other block types are also kept uncompressed
store = makeBlockManager(20000, "exec7")
store.putSingleAndReleaseLock("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
store.putSingle("other_block", new Array[Byte](10000), StorageLevel.MEMORY_ONLY)
assert(store.memoryStore.getSize("other_block") >= 10000, "other_block was compressed")
store.stop()
store = null
@ -810,7 +810,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
class UnserializableClass
val a1 = new UnserializableClass
intercept[java.io.NotSerializableException] {
store.putSingleAndReleaseLock("a1", a1, StorageLevel.DISK_ONLY)
store.putSingle("a1", a1, StorageLevel.DISK_ONLY)
}
// Make sure a subsequent get of a1 doesn't hang and returns None.
@ -882,7 +882,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list1)
val updatedBlocks1 = getUpdatedBlocks {
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks1.size === 1)
@ -891,7 +891,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 1 updated block (i.e. list2)
val updatedBlocks2 = getUpdatedBlocks {
store.putIteratorAndReleaseLock(
store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
}
assert(updatedBlocks2.size === 1)
@ -900,7 +900,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list1 is kicked out of memory while list3 is added
val updatedBlocks3 = getUpdatedBlocks {
store.putIteratorAndReleaseLock(
store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks3.size === 2)
@ -915,7 +915,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// 2 updated blocks - list2 is kicked out of memory (but put on disk) while list4 is added
val updatedBlocks4 = getUpdatedBlocks {
store.putIteratorAndReleaseLock(
store.putIterator(
"list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks4.size === 2)
@ -931,7 +931,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// No updated blocks - list5 is too big to fit in store and nothing is kicked out
val updatedBlocks5 = getUpdatedBlocks {
store.putIteratorAndReleaseLock(
store.putIterator(
"list5", bigList.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
assert(updatedBlocks5.size === 0)
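The block-status hunks above all go through the suite's getUpdatedBlocks helper, which is not shown in this diff; assuming it simply runs the body and returns the block-status updates the put reported to the master, the pattern being exercised is roughly the sketch below (the block id "listX" is hypothetical):

val updates = getUpdatedBlocks {
  store.putIterator("listX", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
// One update is expected for listX itself, plus one for every block this put displaced,
// whether dropped entirely or pushed from memory to disk under MEMORY_AND_DISK.
assert(updates.nonEmpty)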
@ -956,11 +956,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](2000))
// Tell master. By LRU, only list2 and list3 remain.
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
@ -975,11 +975,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(store.master.getBlockStatus("list3", askSlaves = true).size === 1)
// This time don't tell master and see what happens. By LRU, only list5 and list6 remain.
store.putIteratorAndReleaseLock(
store.putIterator(
"list4", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
store.putIteratorAndReleaseLock(
store.putIterator(
"list5", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
store.putIteratorAndReleaseLock(
store.putIterator(
"list6", list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = false)
// getLocations should return nothing because the master is not informed
@ -1001,11 +1001,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val list = List.fill(2)(new Array[Byte](100))
// insert some blocks
store.putIteratorAndReleaseLock(
store.putIterator(
"list1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"list3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
// getLocations and getBlockStatus should yield the same locations
@ -1015,11 +1015,11 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
=== 1)
// insert some more blocks
store.putIteratorAndReleaseLock(
store.putIterator(
"newlist1", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = true)
store.putIteratorAndReleaseLock(
store.putIterator(
"newlist2", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
store.putIteratorAndReleaseLock(
store.putIterator(
"newlist3", list.iterator, StorageLevel.MEMORY_AND_DISK, tellMaster = false)
// getLocations and getBlockStatus should yield the same locations
@ -1030,7 +1030,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
val blockIds = Seq(RDDBlockId(1, 0), RDDBlockId(1, 1), RDDBlockId(2, 0))
blockIds.foreach { blockId =>
store.putIteratorAndReleaseLock(
store.putIterator(
blockId, list.iterator, StorageLevel.MEMORY_ONLY, tellMaster = true)
}
val matchedBlockIds = store.master.getMatchingBlockIds(_ match {
@ -1042,12 +1042,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
test("SPARK-1194 regression: fix the same-RDD rule for cache replacement") {
store = makeBlockManager(12000)
store.putSingleAndReleaseLock(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingleAndReleaseLock(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(1, 0), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// Access rdd_1_0 to ensure it's not least recently used.
assert(store.getSingleAndReleaseLock(rdd(1, 0)).isDefined, "rdd_1_0 was not in store")
// According to the same-RDD rule, rdd_1_0 should be replaced here.
store.putSingleAndReleaseLock(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
store.putSingle(rdd(0, 1), new Array[Byte](4000), StorageLevel.MEMORY_ONLY)
// rdd_1_0 should have been replaced, even though it's not the least recently used block.
assert(store.memoryStore.contains(rdd(0, 0)), "rdd_0_0 was not in store")
assert(store.memoryStore.contains(rdd(0, 1)), "rdd_0_1 was not in store")
@ -1126,8 +1126,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
memoryStore.releasePendingUnrollMemoryForThisTask()
// Unroll with not enough space. This should succeed after kicking out someBlock1.
store.putIteratorAndReleaseLock("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
store.putIteratorAndReleaseLock("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
store.putIterator("someBlock1", smallList.iterator, StorageLevel.MEMORY_ONLY)
store.putIterator("someBlock2", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", smallList.iterator)
verifyUnroll(smallList.iterator, unrollResult, shouldBeArray = true)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
@ -1138,7 +1138,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// Unroll huge block with not enough space. Even after ensuring free space of 12000 * 0.4 =
// 4800 bytes, there is still not enough room to unroll this block. This returns an iterator.
// In the meantime, however, we kicked out someBlock2 before giving up.
store.putIteratorAndReleaseLock("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
store.putIterator("someBlock3", smallList.iterator, StorageLevel.MEMORY_ONLY)
unrollResult = memoryStore.unrollSafely("unroll", bigList.iterator)
verifyUnroll(bigList.iterator, unrollResult, shouldBeArray = false)
assert(memoryStore.currentUnrollMemoryForThisTask > 0) // we returned an iterator
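The two unroll hunks above exercise both outcomes of MemoryStore.unrollSafely. A condensed sketch of the two cases, assuming the Either-shaped return type that verifyUnroll's shouldBeArray flag implies (fully unrolled values on the left, a partially unrolled iterator on the right):

memoryStore.unrollSafely("unroll", smallList.iterator) match {
  case Left(fullyUnrolledValues) =>
    // The block fit in memory: it comes back fully materialized, and the task's
    // unroll memory can be released afterwards.
    assert(fullyUnrolledValues.nonEmpty)
  case Right(partiallyUnrolledIterator) =>
    // Not enough room even after evicting other blocks: an iterator is handed back
    // and some unroll memory stays reserved for this task (as asserted above).
    assert(partiallyUnrolledIterator.hasNext)
}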
@ -1170,8 +1170,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
// would not know how to drop them from memory later.
memoryStore.remove("b1")
memoryStore.remove("b2")
store.putIteratorAndReleaseLock("b1", smallIterator, memOnly)
store.putIteratorAndReleaseLock("b2", smallIterator, memOnly)
store.putIterator("b1", smallIterator, memOnly)
store.putIterator("b2", smallIterator, memOnly)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
val result3 = memoryStore.putIterator("b3", smallIterator, memOnly, returnValues = true)
@ -1182,7 +1182,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(memoryStore.contains("b3"))
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
memoryStore.remove("b3")
store.putIteratorAndReleaseLock("b3", smallIterator, memOnly)
store.putIterator("b3", smallIterator, memOnly)
// Unroll huge block with not enough space. This should fail and kick out b2 in the process.
val result4 = memoryStore.putIterator("b4", bigIterator, memOnly, returnValues = true)
@ -1209,8 +1209,8 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
def bigIterator: Iterator[Any] = bigList.iterator.asInstanceOf[Iterator[Any]]
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
store.putIteratorAndReleaseLock("b1", smallIterator, memAndDisk)
store.putIteratorAndReleaseLock("b2", smallIterator, memAndDisk)
store.putIterator("b1", smallIterator, memAndDisk)
store.putIterator("b2", smallIterator, memAndDisk)
// Unroll with not enough space. This should succeed but kick out b1 in the process.
// Memory store should contain b2 and b3, while disk store should contain only b1
@ -1223,7 +1223,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
assert(!diskStore.contains("b2"))
assert(!diskStore.contains("b3"))
memoryStore.remove("b3")
store.putIteratorAndReleaseLock("b3", smallIterator, StorageLevel.MEMORY_ONLY)
store.putIterator("b3", smallIterator, StorageLevel.MEMORY_ONLY)
assert(memoryStore.currentUnrollMemoryForThisTask === 0)
// Unroll huge block with not enough space. This should fail and drop the new block to disk
@ -1310,12 +1310,12 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store = makeBlockManager(12000)
val arr = new Array[Byte](4000)
// First store a1 and a2, both in memory, and a3, on disk only
store.putSingleAndReleaseLock("a1", arr, StorageLevel.MEMORY_ONLY_SER)
store.putSingleAndReleaseLock("a2", arr, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a1", arr, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a2", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
// This put should fail because both a1 and a2 should be read-locked:
store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a3").isEmpty, "a3 was in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a2").isDefined, "a2 was not in store")
@ -1324,7 +1324,7 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with BeforeAndAfterE
store.releaseLock("a2")
// Block a1 is the least-recently accessed, so an LRU eviction policy would evict it before
// block a2. However, a1 is still pinned so this put of a3 should evict a2 instead:
store.putSingleAndReleaseLock("a3", arr, StorageLevel.MEMORY_ONLY_SER)
store.putSingle("a3", arr, StorageLevel.MEMORY_ONLY_SER)
assert(store.getSingle("a2").isEmpty, "a2 was in store")
assert(store.getSingle("a1").isDefined, "a1 was not in store")
assert(store.getSingle("a3").isDefined, "a3 was not in store")
@ -1335,41 +1335,6 @@ private object BlockManagerSuite {
private implicit class BlockManagerTestUtils(store: BlockManager) {
def putSingleAndReleaseLock(
block: BlockId,
value: Any,
storageLevel: StorageLevel,
tellMaster: Boolean): Unit = {
if (store.putSingle(block, value, storageLevel, tellMaster)) {
store.releaseLock(block)
}
}
def putSingleAndReleaseLock(block: BlockId, value: Any, storageLevel: StorageLevel): Unit = {
if (store.putSingle(block, value, storageLevel)) {
store.releaseLock(block)
}
}
def putIteratorAndReleaseLock(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel): Unit = {
if (store.putIterator(blockId, values, level)) {
store.releaseLock(blockId)
}
}
def putIteratorAndReleaseLock(
blockId: BlockId,
values: Iterator[Any],
level: StorageLevel,
tellMaster: Boolean): Unit = {
if (store.putIterator(blockId, values, level, tellMaster)) {
store.releaseLock(blockId)
}
}
def dropFromMemoryIfExists(
blockId: BlockId,
data: () => Either[Array[Any], ByteBuffer]): Unit = {

View file

@ -91,8 +91,6 @@ private[streaming] class BlockManagerBasedBlockHandler(
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
} else {
blockManager.releaseLock(blockId)
}
BlockManagerBasedStoreResult(blockId, numRecords)
}
@ -191,8 +189,6 @@ private[streaming] class WriteAheadLogBasedBlockHandler(
if (!putSucceeded) {
throw new SparkException(
s"Could not store $blockId to block manager with storage level $storageLevel")
} else {
blockManager.releaseLock(blockId)
}
}
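With the success-path releaseLock call gone, both receiver-side handlers reduce to the same shape. A condensed sketch of the remaining logic, using the identifiers visible in the hunks; the put call itself is illustrative, since the real handlers dispatch on the received block type:

val putSucceeded = blockManager.putIterator(blockId, iterator, storageLevel, tellMaster = true)
if (!putSucceeded) {
  // Failure is still fatal; only the lock-release branch on the success path was removed.
  throw new SparkException(
    s"Could not store $blockId to block manager with storage level $storageLevel")
}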