Merge branch 'master' into blockmanagerUI

Denny 2012-11-11 14:00:02 -08:00
commit 68e0a88282
5 changed files with 207 additions and 104 deletions


@ -50,16 +50,6 @@ private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
private[spark] class BlockLocker(numLockers: Int) {
private val hashLocker = Array.fill(numLockers)(new Object())
def getLock(blockId: String): Object = {
return hashLocker(math.abs(blockId.hashCode % numLockers))
}
}
private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
@ -87,10 +77,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
private val NUM_LOCKS = 337
private val locker = new BlockLocker(NUM_LOCKS)
private val blockInfo = new ConcurrentHashMap[String, BlockInfo]()
private val blockInfo = new ConcurrentHashMap[String, BlockInfo](1000)
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: BlockStore =
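The hunks above remove the BlockLocker striping (hashing every block id onto one of 337 shared lock objects) in favor of locking each block's own BlockInfo entry, stored in a ConcurrentHashMap. A minimal sketch of the two patterns, using simplified stand-in types rather than the real Spark classes:

import java.util.concurrent.ConcurrentHashMap

// Old pattern (removed above): unrelated blocks can contend on the same pooled lock.
class StripedLocks(numLockers: Int) {
  private val locks = Array.fill(numLockers)(new Object())
  def getLock(blockId: String): Object = locks(math.abs(blockId.hashCode % numLockers))
}

// New pattern: one entry per block; callers synchronize on the entry itself.
class InfoStub { var ready = false }

object PerBlockLocking {
  val blockInfo = new ConcurrentHashMap[String, InfoStub](1000)
  def withBlock(blockId: String)(body: InfoStub => Unit) {
    val info = blockInfo.get(blockId)
    if (info != null) {
      info.synchronized { body(info) }
    }
  }
}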
@ -110,7 +97,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
val maxBytesInFlight =
System.getProperty("spark.reducer.maxMbInFlight", "48").toLong * 1024 * 1024
// Whether to compress broadcast variables that are stored
val compressBroadcast = System.getProperty("spark.broadcast.compress", "true").toBoolean
// Whether to compress shuffle output that is stored
val compressShuffle = System.getProperty("spark.shuffle.compress", "true").toBoolean
// Whether to compress RDD partitions that are stored serialized
val compressRdds = System.getProperty("spark.rdd.compress", "false").toBoolean
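These settings are read from JVM system properties when the BlockManager is constructed, so they have to be set before Spark starts on the driver. A minimal, hypothetical driver-side example using only the property names and defaults shown above:

// Hypothetical setup; property names and defaults are the ones read above.
System.setProperty("spark.shuffle.compress", "true")    // compress shuffle output (default: true)
System.setProperty("spark.rdd.compress", "true")        // compress serialized RDD partitions (default: false)
System.setProperty("spark.reducer.maxMbInFlight", "24") // MB of block data fetched in parallel (default: 48)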
@ -150,28 +139,28 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
* For example, a block with MEMORY_AND_DISK set might have fallen out to be only on disk.
*/
def reportBlockStatus(blockId: String) {
locker.getLock(blockId).synchronized {
val curLevel = blockInfo.get(blockId) match {
case null =>
StorageLevel.NONE
case info =>
val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
case null =>
(StorageLevel.NONE, 0L, 0L)
case info =>
info.synchronized {
info.level match {
case null =>
StorageLevel.NONE
(StorageLevel.NONE, 0L, 0L)
case level =>
val inMem = level.useMemory && memoryStore.contains(blockId)
val onDisk = level.useDisk && diskStore.contains(blockId)
new StorageLevel(onDisk, inMem, level.deserialized, level.replication)
(
new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
if (inMem) memoryStore.getSize(blockId) else 0L,
if (onDisk) diskStore.getSize(blockId) else 0L
)
}
}
master.mustHeartBeat(HeartBeat(
blockManagerId,
blockId,
curLevel,
if (curLevel.useMemory) memoryStore.getSize(blockId) else 0L,
if (curLevel.useDisk) diskStore.getSize(blockId) else 0L))
logDebug("Told master about block " + blockId)
}
}
master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
logDebug("Told master about block " + blockId)
}
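Pulling the new pieces together: reportBlockStatus no longer takes a per-block lock from the removed BlockLocker; it computes the current level and sizes under the block's own info lock and then sends a single heartbeat. Reconstructed from the hunks above, the method after this change reads roughly as:

def reportBlockStatus(blockId: String) {
  val (curLevel, inMemSize, onDiskSize) = blockInfo.get(blockId) match {
    case null =>
      (StorageLevel.NONE, 0L, 0L)
    case info =>
      info.synchronized {
        info.level match {
          case null =>
            (StorageLevel.NONE, 0L, 0L)
          case level =>
            val inMem = level.useMemory && memoryStore.contains(blockId)
            val onDisk = level.useDisk && diskStore.contains(blockId)
            (
              new StorageLevel(onDisk, inMem, level.deserialized, level.replication),
              if (inMem) memoryStore.getSize(blockId) else 0L,
              if (onDisk) diskStore.getSize(blockId) else 0L
            )
        }
      }
  }
  master.mustHeartBeat(HeartBeat(blockManagerId, blockId, curLevel, inMemSize, onDiskSize))
  logDebug("Told master about block " + blockId)
}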
/**
@ -213,9 +202,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
if (info != null) {
val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@ -273,9 +262,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
}
} else {
logDebug("Block " + blockId + " not registered locally")
}
} else {
logDebug("Block " + blockId + " not registered locally")
}
return None
}
@ -298,9 +287,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
}
}
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
if (info != null) {
val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
info.waitForReady() // In case the block is still being put() by another thread
val level = info.level
logDebug("Level for block " + blockId + " is " + level)
@ -338,9 +327,9 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
throw new Exception("Block " + blockId + " not found on disk, though it should be")
}
}
} else {
logDebug("Block " + blockId + " not registered locally")
}
} else {
logDebug("Block " + blockId + " not registered locally")
}
return None
}
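Both get paths wait on info.waitForReady() before reading the level, so a reader racing with a put() blocks until the writer finishes. BlockInfo itself is not part of this diff; a plausible shape for that guard, shown only as an assumption, is a plain wait/notify latch:

// Hypothetical stand-in for the waitForReady/ready-marking pair on BlockInfo (not from this diff).
class ReadyLatch {
  private var ready = false
  def markReady(): Unit = synchronized { ready = true; notifyAll() }
  def waitForReady(): Unit = synchronized { while (!ready) wait() }
}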
@ -583,7 +572,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
// Size of the block in bytes (to return to caller)
var size = 0L
locker.getLock(blockId).synchronized {
myInfo.synchronized {
logDebug("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@ -681,7 +670,7 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
null
}
locker.getLock(blockId).synchronized {
myInfo.synchronized {
logDebug("PutBytes for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
+ " to get into synchronized block")
@ -779,26 +768,30 @@ class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, m
*/
def dropFromMemory(blockId: String, data: Either[ArrayBuffer[Any], ByteBuffer]) {
logInfo("Dropping block " + blockId + " from memory")
locker.getLock(blockId).synchronized {
val info = blockInfo.get(blockId)
val level = info.level
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
case Left(elements) =>
diskStore.putValues(blockId, elements, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
val info = blockInfo.get(blockId)
if (info != null) {
info.synchronized {
val level = info.level
if (level.useDisk && !diskStore.contains(blockId)) {
logInfo("Writing block " + blockId + " to disk")
data match {
case Left(elements) =>
diskStore.putValues(blockId, elements, level, false)
case Right(bytes) =>
diskStore.putBytes(blockId, bytes, level)
}
}
memoryStore.remove(blockId)
if (info.tellMaster) {
reportBlockStatus(blockId)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
}
memoryStore.remove(blockId)
if (info.tellMaster) {
reportBlockStatus(blockId)
}
if (!level.useDisk) {
// The block is completely gone from this node; forget it so we can put() it again later.
blockInfo.remove(blockId)
}
} else {
// The block has already been dropped
}
}


@ -18,12 +18,16 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
private var currentMemory = 0L
// Object used to ensure that only one thread is putting blocks and if necessary, dropping
// blocks from the memory store.
private val putLock = new Object()
logInfo("MemoryStore started with capacity %s.".format(Utils.memoryBytesToString(maxMemory)))
def freeMemory: Long = maxMemory - currentMemory
override def getSize(blockId: String): Long = {
synchronized {
entries.synchronized {
entries.get(blockId).size
}
}
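The entries map declared above is a java.util.LinkedHashMap with accessOrder = true, so iterating it visits the least-recently-accessed blocks first; that ordering is what makes the eviction loop in ensureFreeSpace below behave as LRU. A tiny self-contained illustration of that standard-library behavior:

import java.util.LinkedHashMap

object AccessOrderDemo {
  def main(args: Array[String]) {
    // The third constructor argument selects access order instead of insertion order.
    val m = new LinkedHashMap[String, Int](32, 0.75f, true)
    m.put("a", 1); m.put("b", 2); m.put("c", 3)
    m.get("a")                                  // touching "a" makes it most recently used
    val it = m.keySet().iterator()
    while (it.hasNext) print(it.next() + " ")   // prints: b c a
    println()
  }
}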
@ -60,7 +64,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getBytes(blockId: String): Option[ByteBuffer] = {
val entry = synchronized {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@ -73,7 +77,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def getValues(blockId: String): Option[Iterator[Any]] = {
val entry = synchronized {
val entry = entries.synchronized {
entries.get(blockId)
}
if (entry == null) {
@ -87,7 +91,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def remove(blockId: String) {
synchronized {
entries.synchronized {
val entry = entries.get(blockId)
if (entry != null) {
entries.remove(blockId)
@ -101,7 +105,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def clear() {
synchronized {
entries.synchronized {
entries.clear()
}
logInfo("MemoryStore cleared")
@ -122,12 +126,22 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* Try to put in a set of values, if we can free up enough space. The value should either be
* an ArrayBuffer if deserialized is true or a ByteBuffer otherwise. Its (possibly estimated)
* size must also be passed by the caller.
*
* Locks on the object putLock to ensure that all put requests and their associated block
* dropping are done by only one thread at a time. Otherwise, while one thread is dropping
* blocks to free memory for one block, another thread may use up the freed space for
* another block.
*/
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
synchronized {
// TODO: It's possible to optimize the locking by locking entries only when selecting blocks
// to be dropped. Once the to-be-dropped blocks have been selected and the lock on entries has
// been released, it must be ensured that those to-be-dropped blocks are not double counted for
// freeing up more space for another block that needs to be put. Only then can the actual
// dropping of blocks (and writing to disk if necessary) proceed in parallel.
putLock.synchronized {
if (ensureFreeSpace(blockId, size)) {
val entry = new Entry(value, size, deserialized)
entries.put(blockId, entry)
entries.synchronized { entries.put(blockId, entry) }
currentMemory += size
if (deserialized) {
logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
@ -157,10 +171,11 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
* block from the same RDD (which leads to a wasteful cyclic replacement pattern for RDDs that
* don't fit into memory that we want to avoid).
*
* Assumes that a lock on the MemoryStore is held by the caller. (Otherwise, the freed space
* might fill up before the caller puts in their new value.)
* Assumes that a lock is held by the caller to ensure only one thread is dropping blocks.
* Otherwise, the freed space may fill up before the caller puts in their new value.
*/
private def ensureFreeSpace(blockIdToAdd: String, space: Long): Boolean = {
logInfo("ensureFreeSpace(%d) called with curMem=%d, maxMem=%d".format(
space, currentMemory, maxMemory))
@ -169,36 +184,44 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
return false
}
// TODO: This should relinquish the lock on the MemoryStore while flushing out old blocks
// in order to allow parallelism in writing to disk
if (maxMemory - currentMemory < space) {
val rddToAdd = getRddId(blockIdToAdd)
val selectedBlocks = new ArrayBuffer[String]()
var selectedMemory = 0L
val iterator = entries.entrySet().iterator()
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
"block from the same RDD")
return false
// This is synchronized to ensure that the set of entries is not changed
// (because of getValues or getBytes) while traversing the iterator, as that
// can lead to exceptions.
entries.synchronized {
val iterator = entries.entrySet().iterator()
while (maxMemory - (currentMemory - selectedMemory) < space && iterator.hasNext) {
val pair = iterator.next()
val blockId = pair.getKey
if (rddToAdd != null && rddToAdd == getRddId(blockId)) {
logInfo("Will not store " + blockIdToAdd + " as it would require dropping another " +
"block from the same RDD")
return false
}
selectedBlocks += blockId
selectedMemory += pair.getValue.size
}
selectedBlocks += blockId
selectedMemory += pair.getValue.size
}
if (maxMemory - (currentMemory - selectedMemory) >= space) {
logInfo(selectedBlocks.size + " blocks selected for dropping")
for (blockId <- selectedBlocks) {
val entry = entries.get(blockId)
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
val entry = entries.synchronized { entries.get(blockId) }
// This should never be null as only one thread should be dropping
// blocks and removing entries. However the check is still here for
// future safety.
if (entry != null) {
val data = if (entry.deserialized) {
Left(entry.value.asInstanceOf[ArrayBuffer[Any]])
} else {
Right(entry.value.asInstanceOf[ByteBuffer].duplicate())
}
blockManager.dropFromMemory(blockId, data)
}
blockManager.dropFromMemory(blockId, data)
}
return true
} else {
@ -209,7 +232,7 @@ private class MemoryStore(blockManager: BlockManager, maxMemory: Long)
}
override def contains(blockId: String): Boolean = {
synchronized { entries.containsKey(blockId) }
entries.synchronized { entries.containsKey(blockId) }
}
}
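Taken together, the locking in this file now has two levels plus the per-block info lock: putLock serializes the decide-and-evict path in tryToPut/ensureFreeSpace, entries.synchronized guards only reads and writes of the map itself, and the actual drop goes back through BlockManager.dropFromMemory, which synchronizes on the block's info. A simplified, self-contained sketch of the first two levels, with stand-in names and types rather than the real Spark classes:

import java.util.LinkedHashMap
import scala.collection.mutable.ArrayBuffer

object PutLockSketch {
  private val entries = new LinkedHashMap[String, Long](32, 0.75f, true)
  private val putLock = new Object()    // serializes put + eviction decisions
  private var currentMemory = 0L
  private val maxMemory = 100L

  def tryToPut(blockId: String, size: Long): Boolean = putLock.synchronized {
    if (ensureFreeSpace(size)) {
      entries.synchronized { entries.put(blockId, size) }    // map mutation under the map lock
      currentMemory += size
      true
    } else {
      false
    }
  }

  // Caller must hold putLock, so the space freed here cannot be taken by another put.
  private def ensureFreeSpace(space: Long): Boolean = {
    if (maxMemory - currentMemory >= space) return true
    val selected = new ArrayBuffer[String]()
    var freed = 0L
    entries.synchronized {    // keep the iterator safe from concurrent map updates
      val iterator = entries.entrySet().iterator()
      while (maxMemory - (currentMemory - freed) < space && iterator.hasNext) {
        val pair = iterator.next()
        selected += pair.getKey
        freed += pair.getValue
      }
    }
    if (maxMemory - (currentMemory - freed) >= space) {
      for (id <- selected) entries.synchronized { entries.remove(id) }
      currentMemory -= freed
      true
    } else {
      false
    }
  }

  def main(args: Array[String]) {
    (1 to 20).foreach(i => tryToPut("block-" + i, 10L))
    println(entries.synchronized { entries.size() })    // prints 10: bounded by maxMemory / block size
  }
}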


@ -0,0 +1,91 @@
package spark.storage
import akka.actor._
import spark.KryoSerializer
import java.util.concurrent.ArrayBlockingQueue
import util.Random
/**
* This class tests the BlockManager and MemoryStore for thread safety and
* deadlocks. It spawns a number of producer and consumer threads. Producer
* threads continuously push blocks into the BlockManager, and consumer
* threads continuously retrieve the blocks from the BlockManager and test
* whether each block is correct.
*/
private[spark] object ThreadingTest {
val numProducers = 5
val numBlocksPerProducer = 20000
private[spark] class ProducerThread(manager: BlockManager, id: Int) extends Thread {
val queue = new ArrayBlockingQueue[(String, Seq[Int])](100)
override def run() {
for (i <- 1 to numBlocksPerProducer) {
val blockId = "b-" + id + "-" + i
val blockSize = Random.nextInt(1000)
val block = (1 to blockSize).map(_ => Random.nextInt())
val level = randomLevel()
val startTime = System.currentTimeMillis()
manager.put(blockId, block.iterator, level, true)
println("Pushed block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
queue.add((blockId, block))
}
println("Producer thread " + id + " terminated")
}
def randomLevel(): StorageLevel = {
math.abs(Random.nextInt()) % 4 match {
case 0 => StorageLevel.MEMORY_ONLY
case 1 => StorageLevel.MEMORY_ONLY_SER
case 2 => StorageLevel.MEMORY_AND_DISK
case 3 => StorageLevel.MEMORY_AND_DISK_SER
}
}
}
private[spark] class ConsumerThread(
manager: BlockManager,
queue: ArrayBlockingQueue[(String, Seq[Int])]
) extends Thread {
var numBlockConsumed = 0
override def run() {
println("Consumer thread started")
while(numBlockConsumed < numBlocksPerProducer) {
val (blockId, block) = queue.take()
val startTime = System.currentTimeMillis()
manager.get(blockId) match {
case Some(retrievedBlock) =>
assert(retrievedBlock.toList.asInstanceOf[List[Int]] == block.toList, "Block " + blockId + " did not match")
println("Got block " + blockId + " in " + (System.currentTimeMillis - startTime) + " ms")
case None =>
assert(false, "Block " + blockId + " could not be retrieved")
}
numBlockConsumed += 1
}
println("Consumer thread terminated")
}
}
def main(args: Array[String]) {
System.setProperty("spark.kryoserializer.buffer.mb", "1")
val actorSystem = ActorSystem("test")
val serializer = new KryoSerializer
val blockManagerMaster = new BlockManagerMaster(actorSystem, true, true)
val blockManager = new BlockManager(blockManagerMaster, serializer, 1024 * 1024)
val producers = (1 to numProducers).map(i => new ProducerThread(blockManager, i))
val consumers = producers.map(p => new ConsumerThread(blockManager, p.queue))
producers.foreach(_.start)
consumers.foreach(_.start)
producers.foreach(_.join)
consumers.foreach(_.join)
blockManager.stop()
blockManagerMaster.stop()
actorSystem.shutdown()
actorSystem.awaitTermination()
println("Everything stopped.")
println("It will take sometime for the JVM to clean all temporary files and shutdown. Sit tight.")
}
}


@ -49,7 +49,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
return ans
}
def +=(other: Vector) {
def += (other: Vector): Vector = {
if (length != other.length)
throw new IllegalArgumentException("Vectors of different length")
var ans = 0.0
@ -58,6 +58,7 @@ class Vector(val elements: Array[Double]) extends Serializable {
elements(i) += other(i)
i += 1
}
this
}
def * (scale: Double): Vector = Vector(length, i => this(i) * scale)
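The change above turns += from a statement into an expression: the vector mutates in place and returns itself, so the result can be reused or chained. A minimal sketch of that pattern with a stripped-down stand-in class, not the real spark.util.Vector:

// Stand-in for the returning-this pattern only; the real class has many more operations.
class Vec(val elements: Array[Double]) {
  def length = elements.length
  def apply(i: Int) = elements(i)
  def += (other: Vec): Vec = {
    if (length != other.length)
      throw new IllegalArgumentException("Vectors of different length")
    var i = 0
    while (i < length) {
      elements(i) += other(i)
      i += 1
    }
    this    // returning `this` lets callers use the updated vector directly
  }
  override def toString = elements.mkString("(", ", ", ")")
}

object VecDemo {
  def main(args: Array[String]) {
    val sum = new Vec(Array(0.0, 0.0))
    val updates = Seq(new Vec(Array(1.0, 2.0)), new Vec(Array(3.0, 4.0)))
    updates.foreach(sum += _)    // each += mutates sum and returns it
    println(sum)                 // (4.0, 6.0)
  }
}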


@ -15,14 +15,13 @@ object SparkKMeans {
return new Vector(line.split(' ').map(_.toDouble))
}
def closestPoint(p: Vector, centers: HashMap[Int, Vector]): Int = {
def closestPoint(p: Vector, centers: Array[Vector]): Int = {
var index = 0
var bestIndex = 0
var closest = Double.PositiveInfinity
for (i <- 1 to centers.size) {
val vCurr = centers.get(i).get
val tempDist = p.squaredDist(vCurr)
for (i <- 0 until centers.length) {
val tempDist = p.squaredDist(centers(i))
if (tempDist < closest) {
closest = tempDist
bestIndex = i
@ -43,32 +42,28 @@ object SparkKMeans {
val K = args(2).toInt
val convergeDist = args(3).toDouble
var points = data.takeSample(false, K, 42)
var kPoints = new HashMap[Int, Vector]
var kPoints = data.takeSample(false, K, 42).toArray
var tempDist = 1.0
for (i <- 1 to points.size) {
kPoints.put(i, points(i-1))
}
while(tempDist > convergeDist) {
var closest = data.map (p => (closestPoint(p, kPoints), (p, 1)))
var pointStats = closest.reduceByKey {case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
var pointStats = closest.reduceByKey{case ((x1, y1), (x2, y2)) => (x1 + x2, y1 + y2)}
var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collect()
var newPoints = pointStats.map {pair => (pair._1, pair._2._1 / pair._2._2)}.collectAsMap()
tempDist = 0.0
for (pair <- newPoints) {
tempDist += kPoints.get(pair._1).get.squaredDist(pair._2)
for (i <- 0 until K) {
tempDist += kPoints(i).squaredDist(newPoints(i))
}
for (newP <- newPoints) {
kPoints.put(newP._1, newP._2)
kPoints(newP._1) = newP._2
}
}
println("Final centers: " + kPoints)
println("Final centers:")
kPoints.foreach(println)
System.exit(0)
}
}
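The net effect of the SparkKMeans changes is that cluster centers live in a plain Array[Vector] indexed 0 until K, and the per-iteration statistics come back as a map keyed by center index (collectAsMap), so both the convergence check and the center update address centers by index. A small self-contained sketch of that indexing pattern without the RDD machinery, where Vec is a stand-in for spark.util.Vector:

object KMeansIndexingSketch {
  type Vec = Array[Double]

  def squaredDist(a: Vec, b: Vec): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Same shape as the updated closestPoint: centers are an Array addressed by index.
  def closestPoint(p: Vec, centers: Array[Vec]): Int = {
    var bestIndex = 0
    var closest = Double.PositiveInfinity
    for (i <- 0 until centers.length) {
      val tempDist = squaredDist(p, centers(i))
      if (tempDist < closest) {
        closest = tempDist
        bestIndex = i
      }
    }
    bestIndex
  }

  def main(args: Array[String]) {
    val centers: Array[Vec] = Array(Array(0.0, 0.0), Array(5.0, 5.0))
    // A newPoints-style map keyed by center index, as collectAsMap would return it.
    val newPoints = Map(0 -> Array(0.5, 0.5), 1 -> Array(4.5, 5.5))
    var tempDist = 0.0
    for (i <- 0 until centers.length) {
      tempDist += squaredDist(centers(i), newPoints(i))
    }
    for ((i, c) <- newPoints) centers(i) = c
    println("moved by " + tempDist)    // 0.5 + 0.5 = 1.0
  }
}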