Added a synchronized block around numCopiesSent manipulation.

This commit is contained in:
Mosharaf Chowdhury 2010-10-26 08:52:56 -07:00
parent dba92f7dbe
commit e659efca88

View file

@ -409,12 +409,14 @@ extends BroadcastRecipe with Logging {
Executors.newFixedThreadPool(MAX_PEERS).asInstanceOf[ThreadPoolExecutor]
while (hasBlocks < totalBlocks) {
val maxPeers = Math.min (listOfSources.size, MAX_PEERS)
while(threadPool.getActiveCount < maxPeers && hasBlocks < totalBlocks) {
var numThreadsToCreate = Math.min (listOfSources.size, MAX_PEERS) -
threadPool.getActiveCount
while(numThreadsToCreate > 0 && hasBlocks < totalBlocks) {
var peerToTalkTo = pickPeerToTalkTo
if (peerToTalkTo != null) {
threadPool.execute (new TalkToPeer (peerToTalkTo))
}
numThreadsToCreate = numThreadsToCreate - 1
// Sleep for a while before starting some more threads
Thread.sleep(500)
}
@ -739,14 +741,16 @@ extends BroadcastRecipe with Logging {
}
// Traverse over all the blocks
do {
nextIndex = rxHasBlocksBitVector.nextSetBit(nextIndex + 1)
if (nextIndex != -1 && numCopiesSent(nextIndex) < minCopies) {
minCopies = numCopiesSent(nextIndex)
numCopiesSent(nextIndex) = numCopiesSent(nextIndex) + 1
blockIndex = nextIndex
}
} while (nextIndex != -1)
numCopiesSent.synchronized {
do {
nextIndex = rxHasBlocksBitVector.nextSetBit(nextIndex + 1)
if (nextIndex != -1 && numCopiesSent(nextIndex) < minCopies) {
minCopies = numCopiesSent(nextIndex)
numCopiesSent(nextIndex) = numCopiesSent(nextIndex) + 1
blockIndex = nextIndex
}
} while (nextIndex != -1)
}
if (blockIndex == -1) { return false }