Put synchronized blocks around BitSets.

Added getLocalSourceInfo method that returns a SourceInfo based on local info.
This commit is contained in:
Mosharaf Chowdhury 2010-10-21 17:58:42 -07:00
parent 22a47a9d16
commit 8e6ed77724

View file

@ -81,16 +81,6 @@ extends BroadcastRecipe with Logging {
// Create a variableInfo object and store it in valueInfos
var variableInfo = blockifyObject (value_, BroadcastBT.blockSize)
guideMR = new GuideMultipleRequests
guideMR.setDaemon (true)
guideMR.start
logInfo ("GuideMultipleRequests started")
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
logInfo ("ServeMultipleRequests started")
// Prepare the value being broadcasted
// TODO: Refactoring and clean-up required here
arrayOfBlocks = variableInfo.arrayOfBlocks
@ -99,8 +89,7 @@ extends BroadcastRecipe with Logging {
hasBlocks = variableInfo.totalBlocks
hasBlocksBitVector = new BitSet (totalBlocks)
hasBlocksBitVector.set (0, totalBlocks)
hasBlocksBitVector.set (0, totalBlocks)
while (listenPort == -1) {
listenPortLock.synchronized {
@ -117,7 +106,18 @@ extends BroadcastRecipe with Logging {
guidePortLock.synchronized {
guidePortLock.wait
}
}
}
guideMR = new GuideMultipleRequests
guideMR.setDaemon (true)
guideMR.start
logInfo ("GuideMultipleRequests started")
serveMR = new ServeMultipleRequests
serveMR.setDaemon (true)
serveMR.start
logInfo ("ServeMultipleRequests started")
BroadcastBT.registerValue (uuid,
SourceInfo (hostAddress, guidePort, totalBlocks, totalBytes))
}
@ -223,13 +223,37 @@ extends BroadcastRecipe with Logging {
return retVal
}
private def getLocalSourceInfo: SourceInfo = {
// Wait till hostName and listenPort are OK
while (listenPort == -1) {
listenPortLock.synchronized {
listenPortLock.wait
}
}
// Wait till totalBlocks and totalBytes are OK
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
totalBlocksLock.wait
}
}
var localSourceInfo = SourceInfo (hostAddress, listenPort, totalBlocks,
totalBytes)
hasBlocksBitVector.synchronized {
localSourceInfo.hasBlocksBitVector = hasBlocksBitVector
}
return localSourceInfo
}
class TalkToGuide (gInfo: SourceInfo)
extends Thread with Logging {
override def run = {
// Connect to Guide and send this worker's information
val clientSocketToGuide =
new Socket(gInfo.hostAddress, gInfo.listenPort)
logInfo ("Sending local information to the Guide")
val oosGuide =
new ObjectOutputStream (clientSocketToGuide.getOutputStream)
oosGuide.flush
@ -238,11 +262,12 @@ extends BroadcastRecipe with Logging {
// TODO: Do we need a breaking mechanism out of this infinite loop?
while (true) {
oosGuide.writeObject(SourceInfo (hostAddress, listenPort,
gInfo.totalBlocks, gInfo.totalBytes))
// Send local information
oosGuide.writeObject(getLocalSourceInfo)
oosGuide.flush
logInfo ("Sent local SourceInfo to Guide")
// Receive source information from Master
// Receive source information from Guide
var suitableSources =
oisGuide.readObject.asInstanceOf[ListBuffer[SourceInfo]]
@ -250,6 +275,8 @@ extends BroadcastRecipe with Logging {
// TODO: There might be some contradiciton on the use of listOfSources
listOfSources.synchronized {
suitableSources.foreach { srcInfo =>
// Removing old copy of srcInfo to be replaced with a new one
// It works because case clases are compared by constructor params
if (listOfSources.contains(srcInfo))
{ listOfSources = listOfSources - srcInfo }
listOfSources = listOfSources + srcInfo
@ -380,7 +407,10 @@ extends BroadcastRecipe with Logging {
}
peersNotInUse.foreach { eachSource =>
var tempHasBlocksBitVector = hasBlocksBitVector
var tempHasBlocksBitVector: BitSet = null
hasBlocksBitVector.synchronized {
tempHasBlocksBitVector = hasBlocksBitVector
}
tempHasBlocksBitVector.flip (0, tempHasBlocksBitVector.size)
tempHasBlocksBitVector.and (eachSource.hasBlocksBitVector)
@ -428,7 +458,9 @@ extends BroadcastRecipe with Logging {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
arrayOfBlocks(hasBlocks) = bcBlock
hasBlocksBitVector.set (bcBlock.blockID)
hasBlocksBitVector.synchronized {
hasBlocksBitVector.set (bcBlock.blockID)
}
hasBlocks += 1
hasBlocksLock.synchronized {
hasBlocksLock.notifyAll
@ -650,12 +682,11 @@ extends BroadcastRecipe with Logging {
private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Boolean = {
// Figure out which blocks to send
rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size)
hasBlocksBitVector.synchronized {
hasBlocksBitVector.synchronized {
rxHasBlocksBitVector.and (hasBlocksBitVector)
}
var nextIndex = rxHasBlocksBitVector.nextSetBit (0)
var nextIndex = rxHasBlocksBitVector.nextSetBit (0)
if (nextIndex == -1) { return false }
try {