Fixed cache replacement behavior of BlockManager:
- Partitions that get dropped to disk will now be loaded back into RAM after they're accessed again - Same-RDD rule for cache replacement is now implemented (don't drop partitions from an RDD to make room for other partitions from itself) - Items stored as MEMORY_AND_DISK go into memory only first, instead of being eagerly written out to disk - MemoryStore.ensureFreeSpace is called within a lock on the writer thread to prevent race conditions (this can still be optimized to allow multiple concurrent calls to it but it's a start) - MemoryStore does not accept blocks larger than its limit
This commit is contained in:
parent
6112b1a83c
commit
6098f7e87a
|
@ -59,15 +59,31 @@ class BlockLocker(numLockers: Int) {
|
||||||
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
|
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
|
||||||
extends Logging {
|
extends Logging {
|
||||||
|
|
||||||
case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
|
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) {
|
||||||
|
def waitForReady() {
|
||||||
|
if (pending) {
|
||||||
|
synchronized {
|
||||||
|
while (pending) this.wait()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def markReady() {
|
||||||
|
pending = false
|
||||||
|
synchronized {
|
||||||
|
this.notifyAll()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private val NUM_LOCKS = 337
|
private val NUM_LOCKS = 337
|
||||||
private val locker = new BlockLocker(NUM_LOCKS)
|
private val locker = new BlockLocker(NUM_LOCKS)
|
||||||
|
|
||||||
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
|
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
|
||||||
|
|
||||||
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
|
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
|
||||||
private[storage] val diskStore: BlockStore = new DiskStore(this,
|
private[storage] val diskStore: BlockStore =
|
||||||
System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
|
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
|
||||||
|
|
||||||
val connectionManager = new ConnectionManager(0)
|
val connectionManager = new ConnectionManager(0)
|
||||||
implicit val futureExecContext = connectionManager.futureExecContext
|
implicit val futureExecContext = connectionManager.futureExecContext
|
||||||
|
@ -79,7 +95,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
var cacheTracker: CacheTracker = null
|
var cacheTracker: CacheTracker = null
|
||||||
|
|
||||||
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
|
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
|
||||||
|
|
||||||
val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
|
val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
|
||||||
|
|
||||||
initialize()
|
initialize()
|
||||||
|
@ -110,45 +125,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Change the storage level for a local block in the block info meta data, and
|
* Tell the master about the current storage status of a block. This will send a heartbeat
|
||||||
* tell the master if necessary. Note that this is only a meta data change and
|
* message reflecting the current status, *not* the desired storage level in its block info.
|
||||||
* does NOT actually change the storage of the block. If the new level is
|
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
|
||||||
* invalid, then block info (if exists) will be silently removed.
|
|
||||||
*/
|
*/
|
||||||
private[spark] def setLevelAndTellMaster(
|
def reportBlockStatus(blockId: String) {
|
||||||
blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
|
locker.getLock(blockId).synchronized {
|
||||||
|
val curLevel = blockInfo.get(blockId) match {
|
||||||
if (level == null) {
|
case null =>
|
||||||
throw new IllegalArgumentException("Storage level is null")
|
StorageLevel.NONE
|
||||||
}
|
case info =>
|
||||||
|
info.level match {
|
||||||
// If there was earlier info about the block, then use earlier tellMaster
|
case null =>
|
||||||
val oldInfo = blockInfo.get(blockId)
|
StorageLevel.NONE
|
||||||
val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
|
case level =>
|
||||||
if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
|
val inMem = level.useMemory && memoryStore.contains(blockId)
|
||||||
logWarning("Ignoring tellMaster setting as it is different from earlier setting")
|
val onDisk = level.useDisk && diskStore.contains(blockId)
|
||||||
}
|
new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
|
||||||
|
}
|
||||||
// If level is valid, store the block info, else remove the block info
|
}
|
||||||
if (level.isValid) {
|
|
||||||
blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
|
|
||||||
logDebug("Info for block " + blockId + " updated with new level as " + level)
|
|
||||||
} else {
|
|
||||||
blockInfo.remove(blockId)
|
|
||||||
logDebug("Info for block " + blockId + " removed as new level is null or invalid")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Tell master if necessary
|
|
||||||
if (newTellMaster) {
|
|
||||||
master.mustHeartBeat(HeartBeat(
|
master.mustHeartBeat(HeartBeat(
|
||||||
blockManagerId,
|
blockManagerId,
|
||||||
blockId,
|
blockId,
|
||||||
level,
|
curLevel,
|
||||||
if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
|
if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
|
||||||
if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
|
if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
|
||||||
logDebug("Told master about block " + blockId)
|
logDebug("Told master about block " + blockId)
|
||||||
} else {
|
|
||||||
logDebug("Did not tell master about block " + blockId)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -180,36 +182,59 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
def getLocal(blockId: String): Option[Iterator[Any]] = {
|
def getLocal(blockId: String): Option[Iterator[Any]] = {
|
||||||
logDebug("Getting local block " + blockId)
|
logDebug("Getting local block " + blockId)
|
||||||
locker.getLock(blockId).synchronized {
|
locker.getLock(blockId).synchronized {
|
||||||
// Check storage level of block
|
val info = blockInfo.get(blockId)
|
||||||
val level = getLevel(blockId)
|
if (info != null) {
|
||||||
if (level != null) {
|
info.waitForReady() // In case the block is still being put() by another thread
|
||||||
logDebug("Level for block " + blockId + " is " + level + " on local machine")
|
val level = info.level
|
||||||
|
logDebug("Level for block " + blockId + " is " + level)
|
||||||
|
|
||||||
// Look for the block in memory
|
// Look for the block in memory
|
||||||
if (level.useMemory) {
|
if (level.useMemory) {
|
||||||
logDebug("Getting block " + blockId + " from memory")
|
logDebug("Getting block " + blockId + " from memory")
|
||||||
memoryStore.getValues(blockId) match {
|
memoryStore.getValues(blockId) match {
|
||||||
case Some(iterator) => {
|
case Some(iterator) =>
|
||||||
logDebug("Block " + blockId + " found in memory")
|
|
||||||
return Some(iterator)
|
return Some(iterator)
|
||||||
}
|
case None =>
|
||||||
case None => {
|
|
||||||
logDebug("Block " + blockId + " not found in memory")
|
logDebug("Block " + blockId + " not found in memory")
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look for block on disk
|
// Look for block on disk, potentially loading it back into memory if required
|
||||||
if (level.useDisk) {
|
if (level.useDisk) {
|
||||||
logDebug("Getting block " + blockId + " from disk")
|
logDebug("Getting block " + blockId + " from disk")
|
||||||
diskStore.getValues(blockId) match {
|
if (level.useMemory && level.deserialized) {
|
||||||
case Some(iterator) => {
|
diskStore.getValues(blockId) match {
|
||||||
logDebug("Block " + blockId + " found in disk")
|
case Some(iterator) =>
|
||||||
return Some(iterator)
|
// Put the block back in memory before returning it
|
||||||
|
memoryStore.putValues(blockId, iterator, level, true) match {
|
||||||
|
case Left(iterator2) =>
|
||||||
|
return Some(iterator2)
|
||||||
|
case _ =>
|
||||||
|
throw new Exception("Memory store did not return back an iterator")
|
||||||
|
}
|
||||||
|
case None =>
|
||||||
|
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
||||||
}
|
}
|
||||||
case None => {
|
} else if (level.useMemory && !level.deserialized) {
|
||||||
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
// Read it as a byte buffer into memory first, then return it
|
||||||
return None
|
diskStore.getBytes(blockId) match {
|
||||||
|
case Some(bytes) =>
|
||||||
|
// Put a copy of the block back in memory before returning it. Note that we can't
|
||||||
|
// put the ByteBuffer returned by the disk store as that's a memory-mapped file.
|
||||||
|
val copyForMemory = ByteBuffer.allocate(bytes.limit)
|
||||||
|
copyForMemory.put(bytes)
|
||||||
|
memoryStore.putBytes(blockId, copyForMemory, level)
|
||||||
|
bytes.rewind()
|
||||||
|
return Some(dataDeserialize(bytes))
|
||||||
|
case None =>
|
||||||
|
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
diskStore.getValues(blockId) match {
|
||||||
|
case Some(iterator) =>
|
||||||
|
return Some(iterator)
|
||||||
|
case None =>
|
||||||
|
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -224,39 +249,46 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
* Get block from the local block manager as serialized bytes.
|
* Get block from the local block manager as serialized bytes.
|
||||||
*/
|
*/
|
||||||
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
|
def getLocalBytes(blockId: String): Option[ByteBuffer] = {
|
||||||
|
// TODO: This whole thing is very similar to getLocal; we need to refactor it somehow
|
||||||
logDebug("Getting local block " + blockId + " as bytes")
|
logDebug("Getting local block " + blockId + " as bytes")
|
||||||
locker.getLock(blockId).synchronized {
|
locker.getLock(blockId).synchronized {
|
||||||
// Check storage level of block
|
val info = blockInfo.get(blockId)
|
||||||
val level = getLevel(blockId)
|
if (info != null) {
|
||||||
if (level != null) {
|
info.waitForReady() // In case the block is still being put() by another thread
|
||||||
logDebug("Level for block " + blockId + " is " + level + " on local machine")
|
val level = info.level
|
||||||
|
logDebug("Level for block " + blockId + " is " + level)
|
||||||
|
|
||||||
// Look for the block in memory
|
// Look for the block in memory
|
||||||
if (level.useMemory) {
|
if (level.useMemory) {
|
||||||
logDebug("Getting block " + blockId + " from memory")
|
logDebug("Getting block " + blockId + " from memory")
|
||||||
memoryStore.getBytes(blockId) match {
|
memoryStore.getBytes(blockId) match {
|
||||||
case Some(bytes) => {
|
case Some(bytes) =>
|
||||||
logDebug("Block " + blockId + " found in memory")
|
|
||||||
return Some(bytes)
|
return Some(bytes)
|
||||||
}
|
case None =>
|
||||||
case None => {
|
|
||||||
logDebug("Block " + blockId + " not found in memory")
|
logDebug("Block " + blockId + " not found in memory")
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Look for block on disk
|
// Look for block on disk
|
||||||
if (level.useDisk) {
|
if (level.useDisk) {
|
||||||
logDebug("Getting block " + blockId + " from disk")
|
// Read it as a byte buffer into memory first, then return it
|
||||||
diskStore.getBytes(blockId) match {
|
diskStore.getBytes(blockId) match {
|
||||||
case Some(bytes) => {
|
case Some(bytes) =>
|
||||||
logDebug("Block " + blockId + " found in disk")
|
if (level.useMemory) {
|
||||||
|
if (level.deserialized) {
|
||||||
|
memoryStore.putBytes(blockId, bytes, level)
|
||||||
|
} else {
|
||||||
|
// The memory store will hang onto the ByteBuffer, so give it a copy instead of
|
||||||
|
// the memory-mapped file buffer we got from the disk store
|
||||||
|
val copyForMemory = ByteBuffer.allocate(bytes.limit)
|
||||||
|
copyForMemory.put(bytes)
|
||||||
|
memoryStore.putBytes(blockId, copyForMemory, level)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
bytes.rewind()
|
||||||
return Some(bytes)
|
return Some(bytes)
|
||||||
}
|
case None =>
|
||||||
case None => {
|
|
||||||
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
throw new Exception("Block " + blockId + " not found on disk, though it should be")
|
||||||
return None
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
@ -431,6 +463,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
throw new IllegalArgumentException("Storage level is null or invalid")
|
throw new IllegalArgumentException("Storage level is null or invalid")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (blockInfo.containsKey(blockId)) {
|
||||||
|
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remember the block's storage level so that we can correctly drop it to disk if it needs
|
||||||
|
// to be dropped right after it got put into memory. Note, however, that other threads will
|
||||||
|
// not be able to get() this block until we call markReady on its BlockInfo.
|
||||||
|
val myInfo = new BlockInfo(level, tellMaster)
|
||||||
|
blockInfo.put(blockId, myInfo)
|
||||||
|
|
||||||
val startTimeMs = System.currentTimeMillis
|
val startTimeMs = System.currentTimeMillis
|
||||||
var bytes: ByteBuffer = null
|
var bytes: ByteBuffer = null
|
||||||
|
|
||||||
|
@ -444,32 +487,15 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
|
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
|
||||||
+ " to get into synchronized block")
|
+ " to get into synchronized block")
|
||||||
|
|
||||||
// Check and warn if block with same id already exists
|
if (level.useMemory) {
|
||||||
if (getLevel(blockId) != null) {
|
// Save it just to memory first, even if it also has useDisk set to true; we will later
|
||||||
logWarning("Block " + blockId + " already exists in local machine")
|
// drop it to disk if the memory store can't hold it.
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (level.useMemory && level.useDisk) {
|
|
||||||
// If saving to both memory and disk, then serialize only once
|
|
||||||
memoryStore.putValues(blockId, values, level, true) match {
|
|
||||||
case Left(newValues) =>
|
|
||||||
diskStore.putValues(blockId, newValues, level, true) match {
|
|
||||||
case Right(newBytes) => bytes = newBytes
|
|
||||||
case _ => throw new Exception("Unexpected return value")
|
|
||||||
}
|
|
||||||
case Right(newBytes) =>
|
|
||||||
bytes = newBytes
|
|
||||||
diskStore.putBytes(blockId, newBytes, level)
|
|
||||||
}
|
|
||||||
} else if (level.useMemory) {
|
|
||||||
// If only save to memory
|
|
||||||
memoryStore.putValues(blockId, values, level, true) match {
|
memoryStore.putValues(blockId, values, level, true) match {
|
||||||
case Right(newBytes) => bytes = newBytes
|
case Right(newBytes) => bytes = newBytes
|
||||||
case Left(newIterator) => valuesAfterPut = newIterator
|
case Left(newIterator) => valuesAfterPut = newIterator
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// If only save to disk
|
// Save directly to disk.
|
||||||
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
|
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
|
||||||
diskStore.putValues(blockId, values, level, askForBytes) match {
|
diskStore.putValues(blockId, values, level, askForBytes) match {
|
||||||
case Right(newBytes) => bytes = newBytes
|
case Right(newBytes) => bytes = newBytes
|
||||||
|
@ -477,8 +503,12 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the storage level
|
// Now that the block is in either the memory or disk store, let other threads read it,
|
||||||
setLevelAndTellMaster(blockId, level, tellMaster)
|
// and tell the master about it.
|
||||||
|
myInfo.markReady()
|
||||||
|
if (tellMaster) {
|
||||||
|
reportBlockStatus(blockId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
|
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
|
||||||
|
|
||||||
|
@ -521,6 +551,17 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
throw new IllegalArgumentException("Storage level is null or invalid")
|
throw new IllegalArgumentException("Storage level is null or invalid")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (blockInfo.containsKey(blockId)) {
|
||||||
|
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remember the block's storage level so that we can correctly drop it to disk if it needs
|
||||||
|
// to be dropped right after it got put into memory. Note, however, that other threads will
|
||||||
|
// not be able to get() this block until we call markReady on its BlockInfo.
|
||||||
|
val myInfo = new BlockInfo(level, tellMaster)
|
||||||
|
blockInfo.put(blockId, myInfo)
|
||||||
|
|
||||||
val startTimeMs = System.currentTimeMillis
|
val startTimeMs = System.currentTimeMillis
|
||||||
|
|
||||||
// Initiate the replication before storing it locally. This is faster as
|
// Initiate the replication before storing it locally. This is faster as
|
||||||
|
@ -537,22 +578,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
locker.getLock(blockId).synchronized {
|
locker.getLock(blockId).synchronized {
|
||||||
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
|
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
|
||||||
+ " to get into synchronized block")
|
+ " to get into synchronized block")
|
||||||
if (getLevel(blockId) != null) {
|
|
||||||
logWarning("Block " + blockId + " already exists")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (level.useMemory) {
|
if (level.useMemory) {
|
||||||
|
// Store it only in memory at first, even if useDisk is also set to true
|
||||||
bytes.rewind()
|
bytes.rewind()
|
||||||
memoryStore.putBytes(blockId, bytes, level)
|
memoryStore.putBytes(blockId, bytes, level)
|
||||||
}
|
} else {
|
||||||
if (level.useDisk) {
|
|
||||||
bytes.rewind()
|
bytes.rewind()
|
||||||
diskStore.putBytes(blockId, bytes, level)
|
diskStore.putBytes(blockId, bytes, level)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Store the storage level
|
// Now that the block is in either the memory or disk store, let other threads read it,
|
||||||
setLevelAndTellMaster(blockId, level, tellMaster)
|
// and tell the master about it.
|
||||||
|
myInfo.markReady()
|
||||||
|
if (tellMaster) {
|
||||||
|
reportBlockStatus(blockId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This code will be removed when CacheTracker is gone.
|
// TODO: This code will be removed when CacheTracker is gone.
|
||||||
|
@ -604,11 +645,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
|
|
||||||
// TODO: This code will be removed when CacheTracker is gone.
|
// TODO: This code will be removed when CacheTracker is gone.
|
||||||
private def notifyTheCacheTracker(key: String) {
|
private def notifyTheCacheTracker(key: String) {
|
||||||
val rddInfo = key.split("_")
|
if (cacheTracker != null) {
|
||||||
val rddId: Int = rddInfo(1).toInt
|
val rddInfo = key.split("_")
|
||||||
val splitIndex: Int = rddInfo(2).toInt
|
val rddId: Int = rddInfo(1).toInt
|
||||||
val host = System.getProperty("spark.hostname", Utils.localHostName())
|
val partition: Int = rddInfo(2).toInt
|
||||||
cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, splitIndex, host))
|
val host = System.getProperty("spark.hostname", Utils.localHostName())
|
||||||
|
cacheTracker.notifyTheCacheTrackerFromBlockManager(spark.AddedToCache(rddId, partition, host))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -626,22 +669,31 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Drop block from memory (called when memory store has reached it limit)
|
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory
|
||||||
|
* store reaches its limit and needs to free up space.
|
||||||
*/
|
*/
|
||||||
def dropFromMemory(blockId: String) {
|
def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) {
|
||||||
|
logInfo("Dropping block " + blockId + " from memory")
|
||||||
locker.getLock(blockId).synchronized {
|
locker.getLock(blockId).synchronized {
|
||||||
val level = getLevel(blockId)
|
val info = blockInfo.get(blockId)
|
||||||
if (level == null) {
|
val level = info.level
|
||||||
logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
|
if (level.useDisk && !diskStore.contains(blockId)) {
|
||||||
return
|
logInfo("Writing block " + blockId + " to disk")
|
||||||
}
|
data match {
|
||||||
if (!level.useMemory) {
|
case Left(iterator) =>
|
||||||
logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
|
diskStore.putValues(blockId, iterator, level, false)
|
||||||
return
|
case Right(bytes) =>
|
||||||
|
diskStore.putBytes(blockId, bytes, level)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
memoryStore.remove(blockId)
|
memoryStore.remove(blockId)
|
||||||
val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
|
if (info.tellMaster) {
|
||||||
setLevelAndTellMaster(blockId, newLevel)
|
reportBlockStatus(blockId)
|
||||||
|
}
|
||||||
|
if (!level.useDisk) {
|
||||||
|
// The block is completely gone from this node; forget it so we can put() it again later.
|
||||||
|
blockInfo.remove(blockId)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -31,5 +31,7 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
|
||||||
|
|
||||||
def remove(blockId: String)
|
def remove(blockId: String)
|
||||||
|
|
||||||
|
def contains(blockId: String): Boolean
|
||||||
|
|
||||||
def clear() { }
|
def clear() { }
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
||||||
addShutdownHook()
|
addShutdownHook()
|
||||||
|
|
||||||
override def getSize(blockId: String): Long = {
|
override def getSize(blockId: String): Long = {
|
||||||
getFile(blockId).length
|
getFile(blockId).length()
|
||||||
}
|
}
|
||||||
|
|
||||||
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
|
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
|
||||||
|
@ -93,6 +93,10 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
override def contains(blockId: String): Boolean = {
|
||||||
|
getFile(blockId).exists()
|
||||||
|
}
|
||||||
|
|
||||||
private def createFile(blockId: String): File = {
|
private def createFile(blockId: String): File = {
|
||||||
val file = getFile(blockId)
|
val file = getFile(blockId)
|
||||||
if (file.exists()) {
|
if (file.exists()) {
|
||||||
|
|
|
@ -18,29 +18,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
|
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
|
||||||
private var currentMemory = 0L
|
private var currentMemory = 0L
|
||||||
|
|
||||||
//private val blockDropper = Executors.newSingleThreadExecutor()
|
|
||||||
private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
|
|
||||||
private val blockDropper = new Thread("memory store - block dropper") {
|
|
||||||
override def run() {
|
|
||||||
try {
|
|
||||||
while (true) {
|
|
||||||
val blockId = blocksToDrop.take()
|
|
||||||
logDebug("Block " + blockId + " ready to be dropped")
|
|
||||||
blockManager.dropFromMemory(blockId)
|
|
||||||
}
|
|
||||||
} catch {
|
|
||||||
case ie: InterruptedException =>
|
|
||||||
logInfo("Shutting down block dropper")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
blockDropper.start()
|
|
||||||
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
|
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
|
||||||
|
|
||||||
def freeMemory: Long = maxMemory - currentMemory
|
def freeMemory: Long = maxMemory - currentMemory
|
||||||
|
|
||||||
override def getSize(blockId: String): Long = {
|
override def getSize(blockId: String): Long = {
|
||||||
entries.synchronized {
|
synchronized {
|
||||||
entries.get(blockId).size
|
entries.get(blockId).size
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -52,19 +35,12 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
val elements = new ArrayBuffer[Any]
|
val elements = new ArrayBuffer[Any]
|
||||||
elements ++= values
|
elements ++= values
|
||||||
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
|
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
|
||||||
ensureFreeSpace(sizeEstimate)
|
tryToPut(blockId, elements, sizeEstimate, true)
|
||||||
val entry = new Entry(elements, sizeEstimate, true)
|
|
||||||
entries.synchronized { entries.put(blockId, entry) }
|
|
||||||
currentMemory += sizeEstimate
|
|
||||||
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
|
|
||||||
blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
|
|
||||||
} else {
|
} else {
|
||||||
val entry = new Entry(bytes, bytes.limit, false)
|
val entry = new Entry(bytes, bytes.limit, false)
|
||||||
ensureFreeSpace(bytes.limit)
|
ensureFreeSpace(blockId, bytes.limit)
|
||||||
entries.synchronized { entries.put(blockId, entry) }
|
synchronized { entries.put(blockId, entry) }
|
||||||
currentMemory += bytes.limit
|
tryToPut(blockId, bytes, bytes.limit, false)
|
||||||
logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
|
|
||||||
blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,27 +55,17 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
val elements = new ArrayBuffer[Any]
|
val elements = new ArrayBuffer[Any]
|
||||||
elements ++= values
|
elements ++= values
|
||||||
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
|
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
|
||||||
ensureFreeSpace(sizeEstimate)
|
tryToPut(blockId, elements, sizeEstimate, true)
|
||||||
val entry = new Entry(elements, sizeEstimate, true)
|
|
||||||
entries.synchronized { entries.put(blockId, entry) }
|
|
||||||
currentMemory += sizeEstimate
|
|
||||||
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
|
|
||||||
blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
|
|
||||||
Left(elements.iterator)
|
Left(elements.iterator)
|
||||||
} else {
|
} else {
|
||||||
val bytes = blockManager.dataSerialize(values)
|
val bytes = blockManager.dataSerialize(values)
|
||||||
ensureFreeSpace(bytes.limit)
|
tryToPut(blockId, bytes, bytes.limit, false)
|
||||||
val entry = new Entry(bytes, bytes.limit, false)
|
|
||||||
entries.synchronized { entries.put(blockId, entry) }
|
|
||||||
currentMemory += bytes.limit
|
|
||||||
logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
|
|
||||||
blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
|
|
||||||
Right(bytes)
|
Right(bytes)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def getBytes(blockId: String): Option[ByteBuffer] = {
|
override def getBytes(blockId: String): Option[ByteBuffer] = {
|
||||||
val entry = entries.synchronized {
|
val entry = synchronized {
|
||||||
entries.get(blockId)
|
entries.get(blockId)
|
||||||
}
|
}
|
||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
|
@ -112,7 +78,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def getValues(blockId: String): Option[Iterator[Any]] = {
|
override def getValues(blockId: String): Option[Iterator[Any]] = {
|
||||||
val entry = entries.synchronized {
|
val entry = synchronized {
|
||||||
entries.get(blockId)
|
entries.get(blockId)
|
||||||
}
|
}
|
||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
|
@ -126,7 +92,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def remove(blockId: String) {
|
override def remove(blockId: String) {
|
||||||
entries.synchronized {
|
synchronized {
|
||||||
val entry = entries.get(blockId)
|
val entry = entries.get(blockId)
|
||||||
if (entry != null) {
|
if (entry != null) {
|
||||||
entries.remove(blockId)
|
entries.remove(blockId)
|
||||||
|
@ -134,54 +100,118 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
|
||||||
logInfo("Block %s of size %d dropped from memory (free %d)".format(
|
logInfo("Block %s of size %d dropped from memory (free %d)".format(
|
||||||
blockId, entry.size, freeMemory))
|
blockId, entry.size, freeMemory))
|
||||||
} else {
|
} else {
|
||||||
logWarning("Block " + blockId + " could not be removed as it doesnt exist")
|
logWarning("Block " + blockId + " could not be removed as it does not exist")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def clear() {
|
override def clear() {
|
||||||
entries.synchronized {
|
synchronized {
|
||||||
entries.clear()
|
entries.clear()
|
||||||
}
|
}
|
||||||
blockDropper.interrupt()
|
|
||||||
logInfo("MemoryStore cleared")
|
logInfo("MemoryStore cleared")
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: This should be able to return false if the space is larger than our total memory,
|
/**
|
||||||
// or if adding this block would require evicting another one from the same RDD
|
* Return the RDD ID that a given block ID is from, or null if it is not an RDD block.
|
||||||
private def ensureFreeSpace(space: Long) {
|
*/
|
||||||
|
private def getRddId(blockId: String): String = {
|
||||||
|
if (blockId.startsWith("rdd_")) {
|
||||||
|
blockId.split('_')(1)
|
||||||
|
} else {
|
||||||
|
null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to put in a set of values, if we can free up enough space. The value should either be
|
||||||
|
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
|
||||||
|
* size must also be passed by the caller.
|
||||||
|
*/
|
||||||
|
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
|
||||||
|
synchronized {
|
||||||
|
if (ensureFreeSpace(blockId, size)) {
|
||||||
|
val entry = new Entry(value, size, deserialized)
|
||||||
|
entries.put(blockId, entry)
|
||||||
|
currentMemory += size
|
||||||
|
if (deserialized) {
|
||||||
|
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
|
||||||
|
blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
|
||||||
|
} else {
|
||||||
|
logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
|
||||||
|
blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
|
||||||
|
}
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
// Tell the block manager that we couldn't put it in memory so that it can drop it to
|
||||||
|
// disk if the block allows disk storage.
|
||||||
|
val data = if (deserialized) {
|
||||||
|
Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
|
||||||
|
} else {
|
||||||
|
Right(value.asInstanceOf[ByteBuffer].duplicate())
|
||||||
|
}
|
||||||
|
blockManager.dropFromMemory(blockId, data)
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tries to free up a given amount of space to store a particular block, but can fail and return
|
||||||
|
* false if either the block is bigger than our memory or it would require replacing another
|
||||||
|
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
|
||||||
|
* don't fit into memory that we want to avoid).
|
||||||
|
*
|
||||||
|
* Assumes that a lock on entries is held by the caller.
|
||||||
|
*/
|
||||||
|
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
|
||||||
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
|
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
|
||||||
space, currentMemory, maxMemory))
|
space, currentMemory, maxMemory))
|
||||||
|
|
||||||
if (maxMemory - currentMemory < space) {
|
if (space > maxMemory) {
|
||||||
|
logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if (maxMemory - currentMemory < space) {
|
||||||
|
val rddToAdd = getRddId(blockIdToAdd)
|
||||||
val selectedBlocks = new ArrayBuffer[String]()
|
val selectedBlocks = new ArrayBuffer[String]()
|
||||||
var selectedMemory = 0L
|
var selectedMemory = 0L
|
||||||
|
|
||||||
entries.synchronized {
|
val iterator = entries.entrySet().iterator()
|
||||||
val iter = entries.entrySet().iterator()
|
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
|
||||||
while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
|
val pair = iterator.next()
|
||||||
val pair = iter.next()
|
val blockId = pair.getKey
|
||||||
val blockId = pair.getKey
|
if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
|
||||||
val entry = pair.getValue
|
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
|
||||||
if (!entry.dropPending) {
|
"block from the same RDD")
|
||||||
selectedBlocks += blockId
|
return false
|
||||||
entry.dropPending = true
|
|
||||||
}
|
|
||||||
selectedMemory += pair.getValue.size
|
|
||||||
logInfo("Block " + blockId + " selected for dropping")
|
|
||||||
}
|
}
|
||||||
|
selectedBlocks += blockId
|
||||||
|
selectedMemory += pair.getValue.size
|
||||||
}
|
}
|
||||||
|
|
||||||
logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
|
if (maxMemory - (currentMemory - selectedMemory) >= space) {
|
||||||
blocksToDrop.size + " blocks pending")
|
logInfo(selectedBlocks.size + " blocks selected for dropping")
|
||||||
var i = 0
|
for (blockId <- selectedBlocks) {
|
||||||
while (i < selectedBlocks.size) {
|
val entry = entries.get(blockId)
|
||||||
blocksToDrop.add(selectedBlocks(i))
|
val data = if (entry.deserialized) {
|
||||||
i += 1
|
Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
|
||||||
|
} else {
|
||||||
|
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
|
||||||
|
}
|
||||||
|
blockManager.dropFromMemory(blockId, data)
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
} else {
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
selectedBlocks.clear()
|
|
||||||
}
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
override def contains(blockId: String): Boolean = {
|
||||||
|
synchronized { entries.containsKey(blockId) }
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
|
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
|
||||||
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
|
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
|
||||||
|
|
||||||
// Setting storage level of a1 and a2 to invalid; they should be removed from store and master
|
// Drop a1 and a2 from memory; this should be reported back to the master
|
||||||
store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
|
store.dropFromMemory("a1", null)
|
||||||
store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
|
store.dropFromMemory("a2", null)
|
||||||
assert(store.getSingle("a1") === None, "a1 not removed from store")
|
assert(store.getSingle("a1") === None, "a1 not removed from store")
|
||||||
assert(store.getSingle("a2") === None, "a2 not removed from store")
|
assert(store.getSingle("a2") === None, "a2 not removed from store")
|
||||||
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
|
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
|
||||||
|
@ -113,11 +113,56 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
assert(store.getSingle("a1") === None, "a1 was in store")
|
assert(store.getSingle("a1") === None, "a1 was in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2") != None, "a2 was not in store")
|
||||||
// At this point a2 was gotten last, so LRU will getSingle rid of a3
|
// At this point a2 was gotten last, so LRU will getSingle rid of a3
|
||||||
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY)
|
store.putSingle("a1", a1, StorageLevel.MEMORY_ONLY_SER)
|
||||||
Thread.sleep(100)
|
Thread.sleep(100)
|
||||||
assert(store.getSingle("a1") != None, "a1 was not in store")
|
assert(store.getSingle("a1") != None, "a1 was not in store")
|
||||||
assert(store.getSingle("a2") != None, "a2 was not in store")
|
assert(store.getSingle("a2") != None, "a2 was not in store")
|
||||||
assert(store.getSingle("a3") === None, "a1 was in store")
|
assert(store.getSingle("a3") === None, "a3 was in store")
|
||||||
|
}
|
||||||
|
|
||||||
|
test("in-memory LRU for partitions of same RDD") {
|
||||||
|
val store = new BlockManager(master, new KryoSerializer, 1200)
|
||||||
|
val a1 = new Array[Byte](400)
|
||||||
|
val a2 = new Array[Byte](400)
|
||||||
|
val a3 = new Array[Byte](400)
|
||||||
|
store.putSingle("rdd_0_1", a1, StorageLevel.MEMORY_ONLY)
|
||||||
|
store.putSingle("rdd_0_2", a2, StorageLevel.MEMORY_ONLY)
|
||||||
|
store.putSingle("rdd_0_3", a3, StorageLevel.MEMORY_ONLY)
|
||||||
|
Thread.sleep(100)
|
||||||
|
// Even though we accessed rdd_0_3 last, it should not have replaced partitiosn 1 and 2
|
||||||
|
// from the same RDD
|
||||||
|
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
|
||||||
|
assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
|
||||||
|
assert(store.getSingle("rdd_0_1") != None, "rdd_0_1 was not in store")
|
||||||
|
// Check that rdd_0_3 doesn't replace them even after further accesses
|
||||||
|
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
|
||||||
|
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
|
||||||
|
assert(store.getSingle("rdd_0_3") === None, "rdd_0_3 was in store")
|
||||||
|
}
|
||||||
|
|
||||||
|
test("in-memory LRU for partitions of multiple RDDs") {
|
||||||
|
val store = new BlockManager(master, new KryoSerializer, 1200)
|
||||||
|
store.putSingle("rdd_0_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
|
store.putSingle("rdd_0_2", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
|
store.putSingle("rdd_1_1", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
|
Thread.sleep(100)
|
||||||
|
// At this point rdd_1_1 should've replaced rdd_0_1
|
||||||
|
assert(store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was not in store")
|
||||||
|
assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
|
||||||
|
assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
|
||||||
|
// Do a get() on rdd_0_2 so that it is the most recently used item
|
||||||
|
assert(store.getSingle("rdd_0_2") != None, "rdd_0_2 was not in store")
|
||||||
|
// Put in more partitions from RDD 0; they should replace rdd_1_1
|
||||||
|
store.putSingle("rdd_0_3", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
|
store.putSingle("rdd_0_4", new Array[Byte](400), StorageLevel.MEMORY_ONLY)
|
||||||
|
Thread.sleep(100)
|
||||||
|
// Now rdd_1_1 should be dropped to add rdd_0_3, but then rdd_0_2 should *not* be dropped
|
||||||
|
// when we try to add rdd_0_4.
|
||||||
|
assert(!store.memoryStore.contains("rdd_1_1"), "rdd_1_1 was in store")
|
||||||
|
assert(!store.memoryStore.contains("rdd_0_1"), "rdd_0_1 was in store")
|
||||||
|
assert(!store.memoryStore.contains("rdd_0_4"), "rdd_0_4 was in store")
|
||||||
|
assert(store.memoryStore.contains("rdd_0_2"), "rdd_0_2 was not in store")
|
||||||
|
assert(store.memoryStore.contains("rdd_0_3"), "rdd_0_3 was not in store")
|
||||||
}
|
}
|
||||||
|
|
||||||
test("on-disk storage") {
|
test("on-disk storage") {
|
||||||
|
@ -149,6 +194,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("disk and memory storage with getLocalBytes") {
|
||||||
|
val store = new BlockManager(master, new KryoSerializer, 1200)
|
||||||
|
val a1 = new Array[Byte](400)
|
||||||
|
val a2 = new Array[Byte](400)
|
||||||
|
val a3 = new Array[Byte](400)
|
||||||
|
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK)
|
||||||
|
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK)
|
||||||
|
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK)
|
||||||
|
Thread.sleep(100)
|
||||||
|
assert(store.getLocalBytes("a2") != None, "a2 was not in store")
|
||||||
|
assert(store.getLocalBytes("a3") != None, "a3 was not in store")
|
||||||
|
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
||||||
|
assert(store.getLocalBytes("a1") != None, "a1 was not in store")
|
||||||
|
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
||||||
|
}
|
||||||
|
|
||||||
test("disk and memory storage with serialization") {
|
test("disk and memory storage with serialization") {
|
||||||
val store = new BlockManager(master, new KryoSerializer, 1200)
|
val store = new BlockManager(master, new KryoSerializer, 1200)
|
||||||
val a1 = new Array[Byte](400)
|
val a1 = new Array[Byte](400)
|
||||||
|
@ -165,6 +226,22 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
|
||||||
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test("disk and memory storage with serialization and getLocalBytes") {
|
||||||
|
val store = new BlockManager(master, new KryoSerializer, 1200)
|
||||||
|
val a1 = new Array[Byte](400)
|
||||||
|
val a2 = new Array[Byte](400)
|
||||||
|
val a3 = new Array[Byte](400)
|
||||||
|
store.putSingle("a1", a1, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
|
store.putSingle("a2", a2, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
|
store.putSingle("a3", a3, StorageLevel.MEMORY_AND_DISK_SER)
|
||||||
|
Thread.sleep(100)
|
||||||
|
assert(store.getLocalBytes("a2") != None, "a2 was not in store")
|
||||||
|
assert(store.getLocalBytes("a3") != None, "a3 was not in store")
|
||||||
|
assert(store.memoryStore.getValues("a1") == None, "a1 was in memory store")
|
||||||
|
assert(store.getLocalBytes("a1") != None, "a1 was not in store")
|
||||||
|
assert(store.memoryStore.getValues("a1") != None, "a1 was not in memory store")
|
||||||
|
}
|
||||||
|
|
||||||
test("LRU with mixed storage levels") {
|
test("LRU with mixed storage levels") {
|
||||||
val store = new BlockManager(master, new KryoSerializer, 1200)
|
val store = new BlockManager(master, new KryoSerializer, 1200)
|
||||||
val a1 = new Array[Byte](400)
|
val a1 = new Array[Byte](400)
|
||||||
|
|
Loading…
Reference in a new issue