Merge branch 'dev' of https://github.com/mesos/spark into dev

This commit is contained in:
Reynold Xin 2012-10-02 13:08:01 -07:00
commit 0898a21b95
5 changed files with 179 additions and 229 deletions

View file

@ -59,31 +59,15 @@ class BlockLocker(numLockers: Int) {
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long) class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging { extends Logging {
class BlockInfo(val level: StorageLevel, val tellMaster: Boolean, var pending: Boolean = true) { case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
def waitForReady() {
if (pending) {
synchronized {
while (pending) this.wait()
}
}
}
def markReady() {
pending = false
synchronized {
this.notifyAll()
}
}
}
private val NUM_LOCKS = 337 private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS) private val locker = new BlockLocker(NUM_LOCKS)
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]() private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory) private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore = private[storage] val diskStore: BlockStore = new DiskStore(this,
new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir"))) System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))
val connectionManager = new ConnectionManager(0) val connectionManager = new ConnectionManager(0)
implicit val futureExecContext = connectionManager.futureExecContext implicit val futureExecContext = connectionManager.futureExecContext
@ -95,6 +79,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
var cacheTracker: CacheTracker = null var cacheTracker: CacheTracker = null
val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties val numParallelFetches = BlockManager.getNumParallelFetchesFromSystemProperties
val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean val compress = System.getProperty("spark.blockManager.compress", "false").toBoolean
initialize() initialize()
@ -125,32 +110,45 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
/** /**
* Tell the master about the current storage status of a block. This will send a heartbeat * Change the storage level for a local block in the block info meta data, and
* message reflecting the current status, *not* the desired storage level in its block info. * tell the master if necessary. Note that this is only a meta data change and
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk. * does NOT actually change the storage of the block. If the new level is
* invalid, then block info (if exists) will be silently removed.
*/ */
def reportBlockStatus(blockId: String) { private[spark] def setLevelAndTellMaster(
locker.getLock(blockId).synchronized { blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
val curLevel = blockInfo.get(blockId) match {
case null => if (level == null) {
StorageLevel.NONE throw new IllegalArgumentException("Storage level is null")
case info =>
info.level match {
case null =>
StorageLevel.NONE
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
} }
// If there was earlier info about the block, then use earlier tellMaster
val oldInfo = blockInfo.get(blockId)
val newTellMaster = if (oldInfo != null) oldInfo.tellMaster else tellMaster
if (oldInfo != null && oldInfo.tellMaster != tellMaster) {
logWarning("Ignoring tellMaster setting as it is different from earlier setting")
} }
// If level is valid, store the block info, else remove the block info
if (level.isValid) {
blockInfo.put(blockId, new BlockInfo(level, newTellMaster))
logDebug("Info for block " + blockId + " updated with new level as " + level)
} else {
blockInfo.remove(blockId)
logDebug("Info for block " + blockId + " removed as new level is null or invalid")
}
// Tell master if necessary
if (newTellMaster) {
master.mustHeartBeat(HeartBeat( master.mustHeartBeat(HeartBeat(
blockManagerId, blockManagerId,
blockId, blockId,
curLevel, level,
if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L, if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
if (curLevel.useDisk) diskStore.getSize(blockId) else 0L)) if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
logDebug("Told master about block " + blockId) logDebug("Told master about block " + blockId)
} else {
logDebug("Did not tell master about block " + blockId)
} }
} }
@ -182,36 +180,39 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocal(blockId: String): Option[Iterator[Any]] = { def getLocal(blockId: String): Option[Iterator[Any]] = {
logDebug("Getting local block " + blockId) logDebug("Getting local block " + blockId)
locker.getLock(blockId).synchronized { locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId) // Check storage level of block
if (info != null) { val level = getLevel(blockId)
info.waitForReady() // In case the block is still being put() by another thread if (level != null) {
val level = info.level logDebug("Level for block " + blockId + " is " + level + " on local machine")
logDebug("Level for block " + blockId + " is " + level)
// Look for the block in memory // Look for the block in memory
if (level.useMemory) { if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory") logDebug("Getting block " + blockId + " from memory")
memoryStore.getValues(blockId) match { memoryStore.getValues(blockId) match {
case Some(iterator) => case Some(iterator) => {
logDebug("Block " + blockId + " found in memory") logDebug("Block " + blockId + " found in memory")
return Some(iterator) return Some(iterator)
case None => }
case None => {
logDebug("Block " + blockId + " not found in memory") logDebug("Block " + blockId + " not found in memory")
} }
} }
}
// Look for block on disk // Look for block on disk
if (level.useDisk) { if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk") logDebug("Getting block " + blockId + " from disk")
diskStore.getValues(blockId) match { diskStore.getValues(blockId) match {
case Some(iterator) => case Some(iterator) => {
logDebug("Block " + blockId + " found in disk") logDebug("Block " + blockId + " found in disk")
return Some(iterator) return Some(iterator)
case None => }
case None => {
throw new Exception("Block " + blockId + " not found on disk, though it should be") throw new Exception("Block " + blockId + " not found on disk, though it should be")
return None return None
} }
} }
}
} else { } else {
logDebug("Block " + blockId + " not registered locally") logDebug("Block " + blockId + " not registered locally")
} }
@ -225,36 +226,39 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
def getLocalBytes(blockId: String): Option[ByteBuffer] = { def getLocalBytes(blockId: String): Option[ByteBuffer] = {
logDebug("Getting local block " + blockId + " as bytes") logDebug("Getting local block " + blockId + " as bytes")
locker.getLock(blockId).synchronized { locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId) // Check storage level of block
if (info != null) { val level = getLevel(blockId)
info.waitForReady() // In case the block is still being put() by another thread if (level != null) {
val level = info.level
logDebug("Level for block " + blockId + " is " + level + " on local machine") logDebug("Level for block " + blockId + " is " + level + " on local machine")
// Look for the block in memory // Look for the block in memory
if (level.useMemory) { if (level.useMemory) {
logDebug("Getting block " + blockId + " from memory") logDebug("Getting block " + blockId + " from memory")
memoryStore.getBytes(blockId) match { memoryStore.getBytes(blockId) match {
case Some(bytes) => case Some(bytes) => {
logDebug("Block " + blockId + " found in memory") logDebug("Block " + blockId + " found in memory")
return Some(bytes) return Some(bytes)
case None => }
case None => {
logDebug("Block " + blockId + " not found in memory") logDebug("Block " + blockId + " not found in memory")
} }
} }
}
// Look for block on disk // Look for block on disk
if (level.useDisk) { if (level.useDisk) {
logDebug("Getting block " + blockId + " from disk") logDebug("Getting block " + blockId + " from disk")
diskStore.getBytes(blockId) match { diskStore.getBytes(blockId) match {
case Some(bytes) => case Some(bytes) => {
logDebug("Block " + blockId + " found in disk") logDebug("Block " + blockId + " found in disk")
return Some(bytes) return Some(bytes)
case None => }
case None => {
throw new Exception("Block " + blockId + " not found on disk, though it should be") throw new Exception("Block " + blockId + " not found on disk, though it should be")
return None return None
} }
} }
}
} else { } else {
logDebug("Block " + blockId + " not registered locally") logDebug("Block " + blockId + " not registered locally")
} }
@ -427,17 +431,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid") throw new IllegalArgumentException("Storage level is null or invalid")
} }
if (blockInfo.containsKey(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
// Remember the block's storage level so that we can correctly drop it to disk if it needs
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = new BlockInfo(level, tellMaster)
blockInfo.put(blockId, myInfo)
val startTimeMs = System.currentTimeMillis val startTimeMs = System.currentTimeMillis
var bytes: ByteBuffer = null var bytes: ByteBuffer = null
@ -451,15 +444,32 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block") + " to get into synchronized block")
if (level.useMemory) { // Check and warn if block with same id already exists
// Save it just to memory first, even if it also has useDisk set to true; we will later if (getLevel(blockId) != null) {
// drop it to disk if the memory store can't hold it. logWarning("Block " + blockId + " already exists in local machine")
return
}
if (level.useMemory && level.useDisk) {
// If saving to both memory and disk, then serialize only once
memoryStore.putValues(blockId, values, level, true) match {
case Left(newValues) =>
diskStore.putValues(blockId, newValues, level, true) match {
case Right(newBytes) => bytes = newBytes
case _ => throw new Exception("Unexpected return value")
}
case Right(newBytes) =>
bytes = newBytes
diskStore.putBytes(blockId, newBytes, level)
}
} else if (level.useMemory) {
// If only save to memory
memoryStore.putValues(blockId, values, level, true) match { memoryStore.putValues(blockId, values, level, true) match {
case Right(newBytes) => bytes = newBytes case Right(newBytes) => bytes = newBytes
case Left(newIterator) => valuesAfterPut = newIterator case Left(newIterator) => valuesAfterPut = newIterator
} }
} else { } else {
// Save directly to disk. // If only save to disk
val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them val askForBytes = level.replication > 1 // Don't get back the bytes unless we replicate them
diskStore.putValues(blockId, values, level, askForBytes) match { diskStore.putValues(blockId, values, level, askForBytes) match {
case Right(newBytes) => bytes = newBytes case Right(newBytes) => bytes = newBytes
@ -467,12 +477,8 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
} }
// Now that the block is in either the memory or disk store, let other threads read it, // Store the storage level
// and tell the master about it. setLevelAndTellMaster(blockId, level, tellMaster)
myInfo.markReady()
if (tellMaster) {
reportBlockStatus(blockId)
}
} }
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs)) logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@ -515,17 +521,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new IllegalArgumentException("Storage level is null or invalid") throw new IllegalArgumentException("Storage level is null or invalid")
} }
if (blockInfo.containsKey(blockId)) {
logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
return
}
// Remember the block's storage level so that we can correctly drop it to disk if it needs
// to be dropped right after it got put into memory. Note, however, that other threads will
// not be able to get() this block until we call markReady on its BlockInfo.
val myInfo = new BlockInfo(level, tellMaster)
blockInfo.put(blockId, myInfo)
val startTimeMs = System.currentTimeMillis val startTimeMs = System.currentTimeMillis
// Initiate the replication before storing it locally. This is faster as // Initiate the replication before storing it locally. This is faster as
@ -542,22 +537,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
locker.getLock(blockId).synchronized { locker.getLock(blockId).synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs) logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block") + " to get into synchronized block")
if (getLevel(blockId) != null) {
logWarning("Block " + blockId + " already exists")
return
}
if (level.useMemory) { if (level.useMemory) {
// Store it only in memory at first, even if useDisk is also set to true
bytes.rewind() bytes.rewind()
memoryStore.putBytes(blockId, bytes, level) memoryStore.putBytes(blockId, bytes, level)
} else { }
if (level.useDisk) {
bytes.rewind() bytes.rewind()
diskStore.putBytes(blockId, bytes, level) diskStore.putBytes(blockId, bytes, level)
} }
// Now that the block is in either the memory or disk store, let other threads read it, // Store the storage level
// and tell the master about it. setLevelAndTellMaster(blockId, level, tellMaster)
myInfo.markReady()
if (tellMaster) {
reportBlockStatus(blockId)
}
} }
// TODO: This code will be removed when CacheTracker is gone. // TODO: This code will be removed when CacheTracker is gone.
@ -631,31 +626,22 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
} }
/** /**
* Drop a block from memory, possibly putting it on disk if applicable. Called when the memory * Drop block from memory (called when memory store has reached it limit)
* store reaches its limit and needs to free up space.
*/ */
def dropFromMemory(blockId: String, data: Either[Iterator[_], ByteBuffer]) { def dropFromMemory(blockId: String) {
logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized { locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId) val level = getLevel(blockId)
val level = info.level if (level == null) {
if (level.useDisk && !diskStore.contains(blockId)) { logWarning("Block " + blockId + " cannot be removed from memory as it does not exist")
logInfo("Writing block " + blockId + " to disk") return
data match {
case Left(iterator) =>
diskStore.putValues(blockId, iterator, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
} }
if (!level.useMemory) {
logWarning("Block " + blockId + " cannot be removed from memory as it is not in memory")
return
} }
memoryStore.remove(blockId) memoryStore.remove(blockId)
if (info.tellMaster) { val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
reportBlockStatus(blockId) setLevelAndTellMaster(blockId, newLevel)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
} }
} }

View file

@ -31,7 +31,5 @@ abstract class BlockStore(val blockManager: BlockManager) extends Logging {
def remove(blockId: String) def remove(blockId: String)
def contains(blockId: String): Boolean
def clear() { } def clear() { }
} }

View file

@ -26,7 +26,7 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
addShutdownHook() addShutdownHook()
override def getSize(blockId: String): Long = { override def getSize(blockId: String): Long = {
getFile(blockId).length() getFile(blockId).length
} }
override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) { override def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
@ -93,10 +93,6 @@ private class DiskStore(blockManager: BlockManager, rootDirs: String)
} }
} }
override def contains(blockId: String): Boolean = {
getFile(blockId).exists()
}
private def createFile(blockId: String): File = { private def createFile(blockId: String): File = {
val file = getFile(blockId) val file = getFile(blockId)
if (file.exists()) { if (file.exists()) {

View file

@ -18,12 +18,29 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true) private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L private var currentMemory = 0L
//private val blockDropper = Executors.newSingleThreadExecutor()
private val blocksToDrop = new ArrayBlockingQueue[String](10000, true)
private val blockDropper = new Thread("memory store - block dropper") {
override def run() {
try {
while (true) {
val blockId = blocksToDrop.take()
logDebug("Block " + blockId + " ready to be dropped")
blockManager.dropFromMemory(blockId)
}
} catch {
case ie: InterruptedException =>
logInfo("Shutting down block dropper")
}
}
}
blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory))) logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = { override def getSize(blockId: String): Long = {
synchronized { entries.synchronized {
entries.get(blockId).size entries.get(blockId).size
} }
} }
@ -35,12 +52,19 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any] val elements = new ArrayBuffer[Any]
elements ++= values elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true) ensureFreeSpace(sizeEstimate)
val entry = new Entry(elements, sizeEstimate, true)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
} else { } else {
val entry = new Entry(bytes, bytes.limit, false) val entry = new Entry(bytes, bytes.limit, false)
ensureFreeSpace(blockId, bytes.limit) ensureFreeSpace(bytes.limit)
synchronized { entries.put(blockId, entry) } entries.synchronized { entries.put(blockId, entry) }
tryToPut(blockId, bytes, bytes.limit, false) currentMemory += bytes.limit
logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
} }
} }
@ -55,17 +79,27 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val elements = new ArrayBuffer[Any] val elements = new ArrayBuffer[Any]
elements ++= values elements ++= values
val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef]) val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
tryToPut(blockId, elements, sizeEstimate, true) ensureFreeSpace(sizeEstimate)
val entry = new Entry(elements, sizeEstimate, true)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += sizeEstimate
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.memoryBytesToString(sizeEstimate), Utils.memoryBytesToString(freeMemory)))
Left(elements.iterator) Left(elements.iterator)
} else { } else {
val bytes = blockManager.dataSerialize(values) val bytes = blockManager.dataSerialize(values)
tryToPut(blockId, bytes, bytes.limit, false) ensureFreeSpace(bytes.limit)
val entry = new Entry(bytes, bytes.limit, false)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += bytes.limit
logInfo("Block %s stored as serialized bytes to memory (size %s, free %s)".format(
blockId, Utils.memoryBytesToString(bytes.limit), Utils.memoryBytesToString(freeMemory)))
Right(bytes) Right(bytes)
} }
} }
override def getBytes(blockId: String): Option[ByteBuffer] = { override def getBytes(blockId: String): Option[ByteBuffer] = {
val entry = synchronized { val entry = entries.synchronized {
entries.get(blockId) entries.get(blockId)
} }
if (entry == null) { if (entry == null) {
@ -78,7 +112,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def getValues(blockId: String): Option[Iterator[Any]] = { override def getValues(blockId: String): Option[Iterator[Any]] = {
val entry = synchronized { val entry = entries.synchronized {
entries.get(blockId) entries.get(blockId)
} }
if (entry == null) { if (entry == null) {
@ -92,7 +126,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
} }
override def remove(blockId: String) { override def remove(blockId: String) {
synchronized { entries.synchronized {
val entry = entries.get(blockId) val entry = entries.get(blockId)
if (entry != null) { if (entry != null) {
entries.remove(blockId) entries.remove(blockId)
@ -100,118 +134,54 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
logInfo("Block %s of size %d dropped from memory (free %d)".format( logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory)) blockId, entry.size, freeMemory))
} else { } else {
logWarning("Block " + blockId + " could not be removed as it does not exist") logWarning("Block " + blockId + " could not be removed as it doesnt exist")
} }
} }
} }
override def clear() { override def clear() {
synchronized { entries.synchronized {
entries.clear() entries.clear()
} }
blockDropper.interrupt()
logInfo("MemoryStore cleared") logInfo("MemoryStore cleared")
} }
/** // TODO: This should be able to return false if the space is larger than our total memory,
* Return the RDD ID that a given block ID is from, or null if it is not an RDD block. // or if adding this block would require evicting another one from the same RDD
*/ private def ensureFreeSpace(space: Long) {
private def getRddId(blockId: String): String = {
if (blockId.startsWith("rdd_")) {
blockId.split('_')(1)
} else {
null
}
}
/**
* Try to put in a set of values, if we can free up enough space. The value should either be
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller.
*/
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
entries.put(blockId, entry)
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
} else {
logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
blockId, Utils.memoryBytesToString(size), Utils.memoryBytesToString(freeMemory)))
}
true
} else {
// Tell the block manager that we couldn't put it in memory so that it can drop it to
// disk if the block allows disk storage.
val data = if (deserialized) {
Left(value.asInstanceOf[ArrayBuffer[Any]].iterator)
} else {
Right(value.asInstanceOf[ByteBuffer].duplicate())
}
blockManager.dropFromMemory(blockId, data)
false
}
}
}
/**
* Tries to free up a given amount of space to store a particular block, but can fail and return
* false if either the block is bigger than our memory or it would require replacing another
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
* Assumes that a lock on entries is held by the caller.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format( logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory)) space, currentMemory, maxMemory))
if (space > maxMemory) {
logInfo("Will not store " + blockIdToAdd + " as it is larger than our memory limit")
return false
}
if (maxMemory - currentMemory < space) { if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]() val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L var selectedMemory = 0L
val iterator = entries.entrySet().iterator() entries.synchronized {
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) { val iter = entries.entrySet().iterator()
val pair = iterator.next() while (maxMemory - (currentMemory - selectedMemory) < space && iter.hasNext) {
val pair = iter.next()
val blockId = pair.getKey val blockId = pair.getKey
if (rddToAdd != null && rddToAdd == getRddId(blockId)) { val entry = pair.getValue
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " + if (!entry.dropPending) {
"block from the same RDD")
return false
}
selectedBlocks += blockId selectedBlocks += blockId
entry.dropPending = true
}
selectedMemory += pair.getValue.size selectedMemory += pair.getValue.size
} logInfo("Block " + blockId + " selected for dropping")
}
if (maxMemory - (currentMemory - selectedMemory) >= space) { }
logInfo(selectedBlocks.size + " blocks selected for dropping")
for (blockId <- selectedBlocks) { logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
val entry = entries.get(blockId) blocksToDrop.size + " blocks pending")
val data = if (entry.deserialized) { var i = 0
Left(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator) while (i < selectedBlocks.size) {
} else { blocksToDrop.add(selectedBlocks(i))
Right(entry.value.asInstanceOf[ByteBuffer].duplicate()) i += 1
} }
blockManager.dropFromMemory(blockId, data) selectedBlocks.clear()
} }
return true
} else {
return false
}
}
return true
}
override def contains(blockId: String): Boolean = {
synchronized { entries.containsKey(blockId) }
} }
} }

View file

@ -69,9 +69,9 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2") assert(master.mustGetLocations(GetLocations("a2")).size > 0, "master was not told about a2")
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3") assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
// Drop a1 and a2 from memory; this should be reported back to the master // Setting storage level of a1 and a2 to invalid; they should be removed from store and master
store.dropFromMemory("a1", null) store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
store.dropFromMemory("a2", null) store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
assert(store.getSingle("a1") === None, "a1 not removed from store") assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store") assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1") assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")