Added hasBlocksBitVector to keep track of which blocks a peer actually have.

This commit is contained in:
Mosharaf Chowdhury 2010-10-18 14:34:33 -07:00
parent b02ad04560
commit 0791e20bfc

View file

@ -2,7 +2,7 @@ package spark
import java.io._
import java.net._
import java.util.{UUID, PriorityQueue, Comparator}
import java.util.{UUID, PriorityQueue, Comparator, BitSet}
import com.google.common.collect.MapMaker
@ -46,6 +46,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@transient var pqOfSources = new PriorityQueue[SourceInfo]
@transient var hasBlocksBitVector: BitSet = null
@transient var serveMR: ServeMultipleRequests = null
@transient var guideMR: GuideMultipleRequests = null
@ -91,6 +93,10 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
totalBlocks = variableInfo.totalBlocks
hasBlocks = variableInfo.totalBlocks
hasBlocksBitVector = new BitSet (totalBlocks)
hasBlocksBitVector.set (0, totalBlocks)
while (listenPort == -1) {
listenPortLock.synchronized {
listenPortLock.wait
@ -98,9 +104,9 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
pqOfSources = new PriorityQueue[SourceInfo]
val masterSource_0 =
val masterSource =
new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0)
pqOfSources.add (masterSource_0)
pqOfSources.add (masterSource)
// Register with the Tracker
while (guidePort == -1) {
@ -151,6 +157,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
private def initializeSlaveVariables = {
arrayOfBlocks = null
hasBlocksBitVector = null
totalBytes = -1
totalBlocks = -1
hasBlocks = 0
@ -282,6 +289,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo]
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock] (totalBlocks)
hasBlocksBitVector = new BitSet (totalBlocks)
totalBlocksLock.synchronized {
totalBlocksLock.notifyAll
}
@ -341,6 +349,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
for (i <- hasBlocks until totalBlocks) {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
arrayOfBlocks(hasBlocks) = bcBlock
hasBlocksBitVector.set (bcBlock.blockID)
hasBlocks += 1
// Set to true if at least one block is received
receptionSucceeded = true