Modified BlockManagerId API to ensure zero duplicate objects. Fixed the BlockManagerId test case in BlockManagerSuite.

Tathagata Das 2013-01-22 22:55:26 -08:00
parent 151c47eef5
commit bacade6caf
6 changed files with 51 additions and 29 deletions
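
The change below replaces every direct `new BlockManagerId(...)` call with a companion-object factory that consults a cache, so any two ids with the same host and port resolve to one shared instance. The following is a minimal, self-contained sketch of that interning pattern; the class and member names here (InternedId, cache) are illustrative stand-ins, not the exact Spark code:

import java.util.concurrent.ConcurrentHashMap

// Illustrative stand-in: the constructor is private, so callers must go
// through the companion object's apply factory.
class InternedId private (val ip: String, val port: Int) {
  override def equals(other: Any): Boolean = other match {
    case that: InternedId => that.ip == ip && that.port == port
    case _ => false
  }
  override def hashCode: Int = 41 * ip.hashCode + port
}

object InternedId {
  private val cache = new ConcurrentHashMap[InternedId, InternedId]()

  // Factory: return the cached instance if an equal id already exists,
  // otherwise register this new one and return it.
  def apply(ip: String, port: Int): InternedId = {
    val id = new InternedId(ip, port)
    val prev = cache.putIfAbsent(id, id)
    if (prev == null) id else prev
  }
}

// Two lookups with equal fields now yield the same object, so reference
// equality (eq) holds in addition to ==:
//   assert(InternedId("hostA", 1000) eq InternedId("hostA", 1000))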

View file

@@ -20,7 +20,7 @@ private[spark] class MapStatus(var address: BlockManagerId, var compressedSizes:
   }

   def readExternal(in: ObjectInput) {
-    address = new BlockManagerId(in)
+    address = BlockManagerId(in)
     compressedSizes = new Array[Byte](in.readInt())
     in.readFully(compressedSizes)
   }

View file

@@ -69,7 +69,7 @@ class BlockManager(
   implicit val futureExecContext = connectionManager.futureExecContext

   val connectionManagerId = connectionManager.id
-  val blockManagerId = new BlockManagerId(connectionManagerId.host, connectionManagerId.port)
+  val blockManagerId = BlockManagerId(connectionManagerId.host, connectionManagerId.port)

   // TODO: This will be removed after cacheTracker is removed from the code base.
   var cacheTracker: CacheTracker = null

View file

@@ -3,20 +3,35 @@ package spark.storage
 import java.io.{Externalizable, IOException, ObjectInput, ObjectOutput}
 import java.util.concurrent.ConcurrentHashMap

+/**
+ * This class represents a unique identifier for a BlockManager.
+ * The first two constructors of this class are made private to ensure that
+ * BlockManagerId objects can be created only using the factory methods in
+ * [[spark.storage.BlockManagerId$]]. This allows de-duplication of id objects.
+ * Also, the constructor parameters are private to ensure that they cannot
+ * be modified from outside this class.
+ */
+private[spark] class BlockManagerId private (
+    private var ip_ : String,
+    private var port_ : Int
+  ) extends Externalizable {
+
+  private def this(in: ObjectInput) = this(in.readUTF(), in.readInt())
-private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {

   def this() = this(null, 0)  // For deserialization only
-  def this(in: ObjectInput) = this(in.readUTF(), in.readInt())

+  def ip = ip_
+  def port = port_

   override def writeExternal(out: ObjectOutput) {
-    out.writeUTF(ip)
-    out.writeInt(port)
+    out.writeUTF(ip_)
+    out.writeInt(port_)
   }

   override def readExternal(in: ObjectInput) {
-    ip = in.readUTF()
-    port = in.readInt()
+    ip_ = in.readUTF()
+    port_ = in.readInt()
   }

   @throws(classOf[IOException])
@@ -35,6 +50,12 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Externalizable {
 private[spark] object BlockManagerId {

+  def apply(ip: String, port: Int) =
+    getCachedBlockManagerId(new BlockManagerId(ip, port))
+
+  def apply(in: ObjectInput) =
+    getCachedBlockManagerId(new BlockManagerId(in))
+
   val blockManagerIdCache = new ConcurrentHashMap[BlockManagerId, BlockManagerId]()

   def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
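
The hunk above cuts off at the opening brace of getCachedBlockManagerId. Given the ConcurrentHashMap declared just before it, a plausible body for that lookup would look like the following sketch (an assumption for illustration, not necessarily the committed implementation):

  def getCachedBlockManagerId(id: BlockManagerId): BlockManagerId = {
    // If an equal id is already registered, reuse it; otherwise register this
    // one. Either way the caller gets back the single cached instance.
    val cached = blockManagerIdCache.putIfAbsent(id, id)
    if (cached == null) id else cached
  }

(The @throws(classOf[IOException]) line at the end of the first hunk is plausibly the annotation on a readResolve hook that routes deserialized instances through this same cache, though its body is not shown in this excerpt.)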

View file

@@ -54,8 +54,7 @@ class UpdateBlockInfo(
   }

   override def readExternal(in: ObjectInput) {
-    blockManagerId = new BlockManagerId()
-    blockManagerId.readExternal(in)
+    blockManagerId = BlockManagerId(in)
     blockId = in.readUTF()
     storageLevel = new StorageLevel()
     storageLevel.readExternal(in)

View file

@@ -47,13 +47,13 @@ class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
-    tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
         Array(compressedSize1000, compressedSize10000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
         Array(compressedSize10000, compressedSize1000)))
     val statuses = tracker.getServerStatuses(10, 0)
-    assert(statuses.toSeq === Seq((new BlockManagerId("hostA", 1000), size1000),
-                                  (new BlockManagerId("hostB", 1000), size10000)))
+    assert(statuses.toSeq === Seq((BlockManagerId("hostA", 1000), size1000),
+                                  (BlockManagerId("hostB", 1000), size10000)))
     tracker.stop()
   }
@@ -65,14 +65,14 @@ class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
     val compressedSize10000 = MapOutputTracker.compressSize(10000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     val size10000 = MapOutputTracker.decompressSize(compressedSize10000)
-    tracker.registerMapOutput(10, 0, new MapStatus(new BlockManagerId("hostA", 1000),
+    tracker.registerMapOutput(10, 0, new MapStatus(BlockManagerId("hostA", 1000),
         Array(compressedSize1000, compressedSize1000, compressedSize1000)))
-    tracker.registerMapOutput(10, 1, new MapStatus(new BlockManagerId("hostB", 1000),
+    tracker.registerMapOutput(10, 1, new MapStatus(BlockManagerId("hostB", 1000),
         Array(compressedSize10000, compressedSize1000, compressedSize1000)))

     // As if we had two simultaneous fetch failures
-    tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
-    tracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+    tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
+    tracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))

     // The remaining reduce task might try to grab the output despite the shuffle failure;
     // this should cause it to fail, and the scheduler will ignore the failure due to the
@@ -95,13 +95,13 @@ class MapOutputTrackerSuite extends FunSuite with BeforeAndAfter {
     val compressedSize1000 = MapOutputTracker.compressSize(1000L)
     val size1000 = MapOutputTracker.decompressSize(compressedSize1000)
     masterTracker.registerMapOutput(10, 0, new MapStatus(
-        new BlockManagerId("hostA", 1000), Array(compressedSize1000)))
+        BlockManagerId("hostA", 1000), Array(compressedSize1000)))
     masterTracker.incrementGeneration()
     slaveTracker.updateGeneration(masterTracker.getGeneration)
     assert(slaveTracker.getServerStatuses(10, 0).toSeq ===
-           Seq((new BlockManagerId("hostA", 1000), size1000)))
+           Seq((BlockManagerId("hostA", 1000), size1000)))

-    masterTracker.unregisterMapOutput(10, 0, new BlockManagerId("hostA", 1000))
+    masterTracker.unregisterMapOutput(10, 0, BlockManagerId("hostA", 1000))
     masterTracker.incrementGeneration()
     slaveTracker.updateGeneration(masterTracker.getGeneration)
     intercept[FetchFailedException] { slaveTracker.getServerStatuses(10, 0) }

View file

@@ -82,16 +82,18 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   }

   test("BlockManagerId object caching") {
-    val id1 = new StorageLevel(false, false, false, 3)
-    val id2 = new StorageLevel(false, false, false, 3)
+    val id1 = BlockManagerId("XXX", 1)
+    val id2 = BlockManagerId("XXX", 1) // this should return the same object as id1
     assert(id2 === id1, "id2 is not same as id1")
     assert(id2.eq(id1), "id2 is not the same object as id1")
     val bytes1 = spark.Utils.serialize(id1)
-    val id1_ = spark.Utils.deserialize[StorageLevel](bytes1)
+    val id1_ = spark.Utils.deserialize[BlockManagerId](bytes1)
     val bytes2 = spark.Utils.serialize(id2)
-    val id2_ = spark.Utils.deserialize[StorageLevel](bytes2)
-    assert(id1_ === id1, "Deserialized id1 not same as original id1")
-    assert(id2_ === id2, "Deserialized id2 not same as original id1")
-    assert(id1_ === id2_, "Deserialized id1 not same as deserialized id2")
-    assert(id2_.eq(id1_), "Deserialized id2 not the same object as deserialized level1")
+    val id2_ = spark.Utils.deserialize[BlockManagerId](bytes2)
+    assert(id1_ === id1, "Deserialized id1 is not same as original id1")
+    assert(id1_.eq(id1), "Deserialized id1 is not the same object as original id1")
+    assert(id2_ === id2, "Deserialized id2 is not same as original id2")
+    assert(id2_.eq(id1), "Deserialized id2 is not the same object as original id1")
   }

   test("master + 1 manager interaction") {