Merge pull request #190 from rxin/dev

Log cache add/remove messages in block manager.
commit 53a5681c8a
Matei Zaharia, 2012-09-05 16:41:52 -07:00
9 changed files with 143 additions and 125 deletions

core/src/main/scala/spark/CacheTracker.scala

@@ -43,8 +43,6 @@ class CacheTrackerActor extends Actor with Logging {
def receive = {
case SlaveCacheStarted(host: String, size: Long) =>
logInfo("Started slave cache (size %s) on %s".format(
Utils.memoryBytesToString(size), host))
slaveCapacity.put(host, size)
slaveUsage.put(host, 0)
sender ! true
@@ -56,22 +54,12 @@ class CacheTrackerActor extends Actor with Logging {
case AddedToCache(rddId, partition, host, size) =>
slaveUsage.put(host, getCacheUsage(host) + size)
logInfo("Cache entry added: (%s, %s) on %s (size added: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
locs(rddId)(partition) = host :: locs(rddId)(partition)
sender ! true
case DroppedFromCache(rddId, partition, host, size) =>
logInfo("Cache entry removed: (%s, %s) on %s (size dropped: %s, available: %s)".format(
rddId, partition, host, Utils.memoryBytesToString(size),
Utils.memoryBytesToString(getCacheAvailable(host))))
slaveUsage.put(host, getCacheUsage(host) - size)
// Do a sanity check to make sure usage is not negative.
val usage = getCacheUsage(host)
if (usage < 0) {
logError("Cache usage on %s is negative (%d)".format(host, usage))
}
locs(rddId)(partition) = locs(rddId)(partition).filterNot(_ == host)
sender ! true
@@ -223,7 +211,7 @@ class CacheTracker(actorSystem: ActorSystem, isMaster: Boolean, blockManager: Bl
logInfo("Computing partition " + split)
try {
// BlockManager will iterate over results from compute to create RDD
blockManager.put(key, rdd.compute(split), storageLevel, false)
blockManager.put(key, rdd.compute(split), storageLevel, true)
//future.apply() // Wait for the reply from the cache tracker
blockManager.get(key) match {
case Some(values) =>

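Two things change in the CacheTracker hunks above: the per-entry cache log lines move out of the actor (equivalent messages reappear on the master side in BlockManagerMaster below), and the fourth argument of blockManager.put flips from false to true (apparently the tellMaster flag, mirroring putBytes later in this diff). For readers tracking the accounting, a minimal sketch of the per-host bookkeeping behind getCacheUsage and getCacheAvailable follows; the map-backed helpers are assumptions inferred from the slaveCapacity and slaveUsage fields above, not the actual CacheTracker code.

```scala
import scala.collection.mutable.HashMap

// Sketch only: per-host capacity/usage maps as suggested by the fields
// above. getCacheUsage/getCacheAvailable are assumed to be simple lookups.
object CacheUsageSketch {
  private val slaveCapacity = new HashMap[String, Long] // host -> total bytes
  private val slaveUsage = new HashMap[String, Long]    // host -> used bytes

  def getCacheUsage(host: String): Long = slaveUsage.getOrElse(host, 0L)

  def getCacheAvailable(host: String): Long =
    slaveCapacity.getOrElse(host, 0L) - getCacheUsage(host)

  // Mirrors the AddedToCache handler: bump usage, then report availability.
  def addedToCache(host: String, size: Long) {
    slaveUsage.put(host, getCacheUsage(host) + size)
    println("available on %s: %d bytes".format(host, getCacheAvailable(host)))
  }
}
```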
core/src/main/scala/spark/storage/BlockManager.scala

@@ -1,29 +1,21 @@
package spark.storage
import java.io._
import java.nio._
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.LinkedBlockingQueue
import java.util.Collections
import akka.dispatch.{Await, Future}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.{HashMap, HashSet}
import scala.collection.mutable.Queue
import akka.util.Duration
import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream
import java.io.{Externalizable, ObjectInput, ObjectOutput}
import java.nio.ByteBuffer
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Queue}
import scala.collection.JavaConversions._
import it.unimi.dsi.fastutil.io._
import spark.CacheTracker
import spark.Logging
import spark.Serializer
import spark.SizeEstimator
import spark.SparkEnv
import spark.SparkException
import spark.Utils
import spark.util.ByteBufferInputStream
import spark.{CacheTracker, Logging, Serializer, SizeEstimator, SparkException, Utils}
import spark.network._
import akka.util.Duration
import spark.util.ByteBufferInputStream
class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
def this() = this(null, 0)
@@ -49,7 +41,8 @@ class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
}
case class BlockException(blockId: String, message: String, ex: Exception = null) extends Exception(message)
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
class BlockLocker(numLockers: Int) {
@@ -115,10 +108,14 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
/**
* Change storage level for a local block and tell master is necesary.
* If new level is invalid, then block info (if it exists) will be silently removed.
* Change the storage level for a local block in the block info meta data, and
* tell the master if necessary. Note that this is only a meta data change and
* does NOT actually change the storage of the block. If the new level is
* invalid, then block info (if exists) will be silently removed.
*/
def setLevel(blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
private[spark] def setLevelAndTellMaster(
blockId: String, level: StorageLevel, tellMaster: Boolean = true) {
if (level == null) {
throw new IllegalArgumentException("Storage level is null")
}
@@ -141,8 +138,13 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Tell master if necessary
if (newTellMaster) {
master.mustHeartBeat(HeartBeat(
blockManagerId,
blockId,
level,
if (level.isValid && level.useMemory) memoryStore.getSize(blockId) else 0,
if (level.isValid && level.useDisk) diskStore.getSize(blockId) else 0))
logDebug("Told master about block " + blockId)
notifyMaster(HeartBeat(blockManagerId, blockId, level, 0, 0))
} else {
logDebug("Did not tell master about block " + blockId)
}
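The new heartbeat above reports memory and disk sizes only when the level is valid. For reference, a sketch of the StorageLevel flag layout these checks rely on; the constructor order matches this diff, while the isValid rule is an assumption inferred from how the code and the test file use it:

```scala
// Sketch, not the real spark.storage.StorageLevel: constructor order
// (useDisk, useMemory, deserialized, replication) as used in this diff.
class StorageLevelSketch(
    val useDisk: Boolean,
    val useMemory: Boolean,
    val deserialized: Boolean,
    val replication: Int = 1) {
  // Assumed rule: a level is valid only if it stores the block somewhere
  // and keeps at least one replica (see the invalid levels in the test file).
  def isValid: Boolean = (useMemory || useDisk) && replication > 0
}
```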
@@ -431,9 +433,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
case _ => throw new Exception("Unexpected return value")
}
}
// Store the storage level
setLevel(blockId, level, tellMaster)
setLevelAndTellMaster(blockId, level, tellMaster)
}
logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))
@@ -461,7 +463,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
/**
* Put a new block of serialized bytes to the block manager.
*/
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
def putBytes(
blockId: String, bytes: ByteBuffer, level: StorageLevel, tellMaster: Boolean = true) {
if (blockId == null) {
throw new IllegalArgumentException("Block Id is null")
}
@@ -500,7 +504,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
// Store the storage level
setLevel(blockId, level, tellMaster)
setLevelAndTellMaster(blockId, level, tellMaster)
}
// TODO: This code will be removed when CacheTracker is gone.
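A hypothetical call site for the rewrapped putBytes signature above, relying on its tellMaster = true default; blockManager is assumed in scope, and the block id and payload are invented:

```scala
import java.nio.ByteBuffer

// Hypothetical call: store serialized bytes memory-only. tellMaster
// defaults to true, so the block is reported via the heartbeat path above.
val payload = ByteBuffer.wrap("example payload".getBytes("UTF-8"))
blockManager.putBytes("block-1", payload, new StorageLevel(false, true, false, 1))
```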
@@ -587,7 +591,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
memoryStore.remove(blockId)
val newLevel = new StorageLevel(level.useDisk, false, level.deserialized, level.replication)
setLevel(blockId, newLevel)
setLevelAndTellMaster(blockId, newLevel)
}
}
@@ -606,10 +610,6 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
return ser.deserializeStream(new ByteBufferInputStream(bytes)).toIterator
}
private def notifyMaster(heartBeat: HeartBeat) {
master.mustHeartBeat(heartBeat)
}
def stop() {
connectionManager.stop()
blockInfo.clear()

core/src/main/scala/spark/storage/BlockManagerMaster.scala

@@ -3,22 +3,18 @@ package spark.storage
import java.io._
import java.util.{HashMap => JHashMap}
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import akka.actor._
import akka.dispatch._
import akka.pattern.ask
import akka.remote._
import akka.util.Duration
import akka.util.Timeout
import akka.util.{Duration, Timeout}
import akka.util.duration._
import spark.Logging
import spark.SparkException
import spark.Utils
import spark.{Logging, SparkException, Utils}
sealed trait ToBlockManagerMaster
@@ -27,13 +23,13 @@ case class RegisterBlockManager(
maxMemSize: Long,
maxDiskSize: Long)
extends ToBlockManagerMaster
class HeartBeat(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
var deserializedSize: Long,
var size: Long)
var memSize: Long,
var diskSize: Long)
extends ToBlockManagerMaster
with Externalizable {
@@ -43,8 +39,8 @@ class HeartBeat(
blockManagerId.writeExternal(out)
out.writeUTF(blockId)
storageLevel.writeExternal(out)
out.writeInt(deserializedSize.toInt)
out.writeInt(size.toInt)
out.writeInt(memSize.toInt)
out.writeInt(diskSize.toInt)
}
override def readExternal(in: ObjectInput) {
@@ -53,8 +49,8 @@ class HeartBeat(
blockId = in.readUTF()
storageLevel = new StorageLevel()
storageLevel.readExternal(in)
deserializedSize = in.readInt()
size = in.readInt()
memSize = in.readInt()
diskSize = in.readInt()
}
}
@@ -62,15 +58,14 @@ object HeartBeat {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
deserializedSize: Long,
size: Long): HeartBeat = {
new HeartBeat(blockManagerId, blockId, storageLevel, deserializedSize, size)
memSize: Long,
diskSize: Long): HeartBeat = {
new HeartBeat(blockManagerId, blockId, storageLevel, memSize, diskSize)
}
// For pattern-matching
def unapply(h: HeartBeat): Option[(BlockManagerId, String, StorageLevel, Long, Long)] = {
Some((h.blockManagerId, h.blockId, h.storageLevel, h.deserializedSize, h.size))
Some((h.blockManagerId, h.blockId, h.storageLevel, h.memSize, h.diskSize))
}
}
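A hypothetical round trip through the renamed HeartBeat companion: build a heartbeat with the new memSize/diskSize fields, then recover them by pattern matching through unapply. All concrete values here are invented:

```scala
// Hypothetical values; BlockManagerId(ip, port) and the StorageLevel
// constructor order (useDisk, useMemory, deserialized, replication)
// follow the definitions elsewhere in this diff.
val hb = HeartBeat(
  new BlockManagerId("10.0.0.1", 7077),
  "rdd_0_0",
  new StorageLevel(false, true, true, 1),
  1024L, // memSize
  0L)    // diskSize

hb match {
  case HeartBeat(_, blockId, _, memSize, diskSize) =>
    println("%s: %d bytes in memory, %d bytes on disk".format(blockId, memSize, diskSize))
}
```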
@@ -88,49 +83,64 @@ case object StopBlockManagerMaster extends ToBlockManagerMaster
class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
class BlockManagerInfo(
val blockManagerId: BlockManagerId,
timeMs: Long,
maxMem: Long,
maxDisk: Long) {
val maxMem: Long,
val maxDisk: Long) {
private var lastSeenMs = timeMs
private var remainedMem = maxMem
private var remainedDisk = maxDisk
private val blocks = new JHashMap[String, StorageLevel]
logInfo("Registering block manager (%s:%d, ram: %d, disk: %d)".format(
blockManagerId.ip, blockManagerId.port, maxMem, maxDisk))
def updateLastSeenMs() {
lastSeenMs = System.currentTimeMillis() / 1000
}
def addBlock(blockId: String, storageLevel: StorageLevel, deserializedSize: Long, size: Long) =
synchronized {
def updateBlockInfo(
blockId: String, storageLevel: StorageLevel, memSize: Long, diskSize: Long) = synchronized {
updateLastSeenMs()
if (blocks.containsKey(blockId)) {
val oriLevel: StorageLevel = blocks.get(blockId)
// The block exists on the slave already.
val originalLevel: StorageLevel = blocks.get(blockId)
if (oriLevel.deserialized) {
remainedMem += deserializedSize
if (originalLevel.useMemory) {
remainedMem += memSize
}
if (oriLevel.useMemory) {
remainedMem += size
}
if (oriLevel.useDisk) {
remainedDisk += size
if (originalLevel.useDisk) {
remainedDisk += diskSize
}
}
if (storageLevel.isValid) {
if (storageLevel.isValid) {
// isValid means it is either stored in-memory or on-disk.
blocks.put(blockId, storageLevel)
if (storageLevel.deserialized) {
remainedMem -= deserializedSize
}
if (storageLevel.useMemory) {
remainedMem -= size
remainedMem -= memSize
logInfo("Added %s in memory on %s:%d (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
}
if (storageLevel.useDisk) {
remainedDisk -= size
remainedDisk -= diskSize
logInfo("Added %s on disk on %s:%d (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
}
} else {
} else if (blocks.containsKey(blockId)) {
// If isValid is not true, drop the block.
val originalLevel: StorageLevel = blocks.get(blockId)
blocks.remove(blockId)
if (originalLevel.useMemory) {
logInfo("Removed %s on %s:%d in memory (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, memSize, remainedMem))
}
if (originalLevel.useDisk) {
logInfo("Removed %s on %s:%d on disk (size: %d, free: %d)".format(
blockId, blockManagerId.ip, blockManagerId.port, diskSize, remainedDisk))
}
}
}
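updateBlockInfo above first credits any existing entry's sizes back to remainedMem/remainedDisk, then either debits the new sizes (valid level) or drops the entry (invalid level), so re-reporting the same block leaves the totals unchanged. A simplified, runnable sketch of that credit-then-debit flow, memory side only; the plain size map is a stand-in to keep the sketch self-contained:

```scala
import scala.collection.mutable.HashMap

// Simplified sketch of updateBlockInfo's memory accounting. The real code
// tracks a StorageLevel per block; here a plain size map stands in.
object UpdateBlockInfoSketch {
  var remainedMem = 100L
  private val blockMemSize = new HashMap[String, Long]

  def updateBlockInfo(blockId: String, memSize: Long, isValid: Boolean) {
    // Credit: the block's previous entry, if any, gives its memory back.
    for (old <- blockMemSize.remove(blockId)) remainedMem += old
    if (isValid) {
      // Debit: the new entry takes memory again.
      blockMemSize.put(blockId, memSize)
      remainedMem -= memSize
    } // invalid level: block stays removed, memory stays credited
  }

  def main(args: Array[String]) {
    updateBlockInfo("b", 30, isValid = true)  // 100 -> 70
    updateBlockInfo("b", 30, isValid = true)  // credit 30, debit 30 -> 70
    updateBlockInfo("b", 0, isValid = false)  // dropped -> 100
    println(remainedMem)                      // prints 100
  }
}
```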
@@ -204,12 +214,11 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " "
logDebug("Got in register 0" + tmp + Utils.getUsedTimeMs(startTimeMs))
logInfo("Got Register Msg from " + blockManagerId)
if (blockManagerId.ip == Utils.localHostName() && !isLocal) {
logInfo("Got Register Msg from master node, don't register it")
} else {
blockManagerInfo += (blockManagerId -> new BlockManagerInfo(
System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
blockManagerId, System.currentTimeMillis() / 1000, maxMemSize, maxDiskSize))
}
logDebug("Got in register 1" + tmp + Utils.getUsedTimeMs(startTimeMs))
sender ! true
@@ -219,8 +228,8 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
deserializedSize: Long,
size: Long) {
memSize: Long,
diskSize: Long) {
val startTimeMs = System.currentTimeMillis()
val tmp = " " + blockManagerId + " " + blockId + " "
@@ -231,7 +240,7 @@ class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
sender ! true
}
blockManagerInfo(blockManagerId).addBlock(blockId, storageLevel, deserializedSize, size)
blockManagerInfo(blockManagerId).updateBlockInfo(blockId, storageLevel, memSize, diskSize)
var locations: HashSet[BlockManagerId] = null
if (blockInfo.containsKey(blockId)) {

core/src/main/scala/spark/storage/BlockManagerWorker.scala

@@ -1,23 +1,21 @@
package spark.storage
import java.nio._
import java.nio.ByteBuffer
import scala.actors._
import scala.actors.Actor._
import scala.actors.remote._
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.mutable.HashSet
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet}
import scala.util.Random
import spark.Logging
import spark.Utils
import spark.SparkEnv
import spark.{Logging, Utils, SparkEnv}
import spark.network._
/**
* This should be changed to use event model late.
* A network interface for BlockManager. Each slave should have one
* BlockManagerWorker.
*
* TODO: Use event model.
*/
class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
initLogging()
@@ -32,7 +30,7 @@ class BlockManagerWorker(val blockManager: BlockManager) extends Logging {
logDebug("Handling as a buffer message " + bufferMessage)
val blockMessages = BlockMessageArray.fromBufferMessage(bufferMessage)
logDebug("Parsed as a block message array")
val responseMessages = blockMessages.map(processBlockMessage _).filter(_ != None).map(_.get)
val responseMessages = blockMessages.map(processBlockMessage).filter(_ != None).map(_.get)
/*logDebug("Processed block messages")*/
return Some(new BlockMessageArray(responseMessages).toBufferMessage)
} catch {

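A side note on the cleaned-up line above: since processBlockMessage returns an Option, the map/filter/get chain can be collapsed into one flatMap with identical behavior; a one-line sketch under that assumption:

```scala
// Equivalent to .map(processBlockMessage).filter(_ != None).map(_.get),
// assuming processBlockMessage: BlockMessage => Option[BlockMessage].
val responseMessages = blockMessages.flatMap(processBlockMessage)
```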
core/src/main/scala/spark/storage/BlockMessage.scala

@@ -1,6 +1,6 @@
package spark.storage
import java.nio._
import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer

core/src/main/scala/spark/storage/BlockMessageArray.scala

@@ -1,5 +1,6 @@
package spark.storage
import java.nio._
import java.nio.ByteBuffer
import scala.collection.mutable.StringBuilder
import scala.collection.mutable.ArrayBuffer

core/src/main/scala/spark/storage/BlockStore.scala

@@ -1,15 +1,14 @@
package spark.storage
import spark.{Utils, Logging, Serializer, SizeEstimator}
import scala.collection.mutable.ArrayBuffer
import java.io.{File, RandomAccessFile}
import java.nio.ByteBuffer
import java.nio.channels.FileChannel.MapMode
import java.util.{UUID, LinkedHashMap}
import java.util.concurrent.Executors
import java.util.concurrent.ConcurrentHashMap
import it.unimi.dsi.fastutil.io._
import java.util.concurrent.ArrayBlockingQueue
import java.util.{LinkedHashMap, UUID}
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
import scala.collection.mutable.ArrayBuffer
import spark.{Utils, Logging, Serializer, SizeEstimator}
/**
* Abstract class to store blocks
@@ -19,7 +18,13 @@ abstract class BlockStore(blockManager: BlockManager) extends Logging {
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel)
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer]
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer]
/**
* Return the size of a block.
*/
def getSize(blockId: String): Long
def getBytes(blockId: String): Option[ByteBuffer]
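putValues returns an Either so callers get something useful back whichever way the block was stored: Left carries the deserialized values, Right the serialized bytes (the MemoryStore implementation below returns exactly these). A hypothetical caller, sketched against the types in this diff:

```scala
// Hypothetical caller of BlockStore.putValues; the Either handling is
// the point. BlockStore and StorageLevel are the types from this diff.
def putAndConsume(store: BlockStore, blockId: String,
                  values: Iterator[Any], level: StorageLevel) {
  store.putValues(blockId, values, level) match {
    case Left(iterator) =>
      // Stored deserialized: the values come back without a copy.
      iterator.foreach(println)
    case Right(bytes) =>
      // Stored serialized: the bytes are handy for replication.
      println("stored " + bytes.limit() + " serialized bytes")
  }
}
```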
@@ -62,6 +67,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
}
blockDropper.start()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
def getSize(blockId: String): Long = memoryStore.synchronized { memoryStore.get(blockId).size }
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
if (level.deserialized) {
@@ -74,17 +84,20 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
logDebug("Block " + blockId + " stored as values to memory")
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
} else {
val entry = new Entry(bytes, bytes.array().length, false)
ensureFreeSpace(bytes.array.length)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.array().length
logDebug("Block " + blockId + " stored as " + bytes.array().length + " bytes to memory")
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.array().length, freeMemory))
}
}
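One caveat on the branches above: bytes.array() only works for array-backed buffers, and a direct ByteBuffer would throw UnsupportedOperationException. A defensive size computation covering both cases; this is a java.nio observation, not something this commit adds:

```scala
// Sketch: size of a ByteBuffer without assuming it is array-backed.
def bufferSize(bytes: java.nio.ByteBuffer): Long =
  if (bytes.hasArray) bytes.array().length else bytes.limit()
```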
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
if (level.deserialized) {
val elements = new ArrayBuffer[Any]
elements ++= values
@@ -93,7 +106,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(elements, sizeEstimate, true)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += sizeEstimate
logDebug("Block " + blockId + " stored as values to memory")
logInfo("Block %s stored as values to memory (estimated size %d, free %d)".format(
blockId, sizeEstimate, freeMemory))
return Left(elements.iterator)
} else {
val bytes = dataSerialize(values)
@@ -101,7 +115,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
val entry = new Entry(bytes, bytes.array().length, false)
memoryStore.synchronized { memoryStore.put(blockId, entry) }
currentMemory += bytes.array().length
logDebug("Block " + blockId + " stored as " + bytes.array.length + " bytes to memory")
logInfo("Block %s stored as %d bytes to memory (free %d)".format(
blockId, bytes.array.length, freeMemory))
return Right(bytes)
}
}
@@ -128,7 +143,8 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
if (entry != null) {
memoryStore.remove(blockId)
currentMemory -= entry.size
logDebug("Block " + blockId + " of size " + entry.size + " dropped from memory")
logInfo("Block %s of size %d dropped from memory (free %d)".format(
blockId, entry.size, freeMemory))
} else {
logWarning("Block " + blockId + " could not be removed as it doesn't exist")
}
@@ -164,11 +180,11 @@ class MemoryStore(blockManager: BlockManager, maxMemory: Long)
entry.dropPending = true
}
selectedMemory += pair.getValue.size
logDebug("Block " + blockId + " selected for dropping")
logInfo("Block " + blockId + " selected for dropping")
}
}
logDebug("" + selectedBlocks.size + " new blocks selected for dropping, " +
logInfo("" + selectedBlocks.size + " new blocks selected for dropping, " +
blocksToDrop.size + " blocks pending")
var i = 0
while (i < selectedBlocks.size) {
@@ -192,7 +208,11 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
var lastLocalDirUsed = 0
addShutdownHook()
def getSize(blockId: String): Long = {
getFile(blockId).length
}
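The new getSize simply stats the block's file. For context, a sketch of the kind of blockId-to-file mapping a helper like getFile presumably performs; the layout here is invented, not DiskStore's actual scheme:

```scala
import java.io.File

// Hypothetical: choose a root dir by hashing the block id, then name the
// file after the id. DiskStore's real getFile may lay files out differently.
def getFileSketch(blockId: String, localDirs: Array[File]): File = {
  val dirIndex = (blockId.hashCode & Int.MaxValue) % localDirs.length
  new File(localDirs(dirIndex), blockId)
}
```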
def putBytes(blockId: String, bytes: ByteBuffer, level: StorageLevel) {
logDebug("Attempting to put block " + blockId)
val startTime = System.currentTimeMillis
@@ -203,13 +223,15 @@ class DiskStore(blockManager: BlockManager, rootDirs: String)
buffer.put(bytes.array)
channel.close()
val finishTime = System.currentTimeMillis
logDebug("Block " + blockId + " stored to file of " + bytes.array.length + " bytes to disk in " + (finishTime - startTime) + " ms")
logDebug("Block %s stored to file of %d bytes to disk in %d ms".format(
blockId, bytes.array.length, (finishTime - startTime)))
} else {
logError("File not created for block " + blockId)
}
}
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel): Either[Iterator[Any], ByteBuffer] = {
def putValues(blockId: String, values: Iterator[Any], level: StorageLevel)
: Either[Iterator[Any], ByteBuffer] = {
val bytes = dataSerialize(values)
logDebug("Converted block " + blockId + " to " + bytes.array.length + " bytes")
putBytes(blockId, bytes, level)

core/src/main/scala/spark/storage/StorageLevel.scala

@@ -1,6 +1,6 @@
package spark.storage
import java.io._
import java.io.{Externalizable, ObjectInput, ObjectOutput}
class StorageLevel(
var useDisk: Boolean,

core/src/test/scala/spark/storage/BlockManagerSuite.scala

@@ -70,8 +70,8 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodT
assert(master.mustGetLocations(GetLocations("a3")).size === 0, "master was told about a3")
// Setting storage level of a1 and a2 to invalid; they should be removed from store and master
store.setLevel("a1", new StorageLevel(false, false, false, 1))
store.setLevel("a2", new StorageLevel(true, false, false, 0))
store.setLevelAndTellMaster("a1", new StorageLevel(false, false, false, 1))
store.setLevelAndTellMaster("a2", new StorageLevel(true, false, false, 0))
assert(store.getSingle("a1") === None, "a1 not removed from store")
assert(store.getSingle("a2") === None, "a2 not removed from store")
assert(master.mustGetLocations(GetLocations("a1")).size === 0, "master did not remove a1")
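Both storage levels set in the test above are invalid, but for different reasons, which is what makes the removal assertions meaningful; under the validity rule sketched earlier (an assumption, not quoted from StorageLevel):

```scala
// Under the assumed rule isValid = (useMemory || useDisk) && replication > 0:
new StorageLevel(false, false, false, 1) // invalid: stores nowhere
new StorageLevel(true,  false, false, 0) // invalid: disk, but zero replicas
```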