Picking peers weighted by the number of rare blocks they have. A block is rare if there are at most 2 copies in the neighborhood. Better number can be used (some function of neighborhood size)

This commit is contained in:
Mosharaf Chowdhury 2011-02-15 16:27:44 -08:00
parent cf81da9485
commit 0416cc22d2

View file

@ -444,7 +444,13 @@ extends Broadcast[T] with Logging {
threadPool.getActiveCount
while (hasBlocks < totalBlocks && numThreadsToCreate > 0) {
var peerToTalkTo = pickPeerToTalkTo
var peerToTalkTo = pickPeerToTalkToRarestFirst
if (peerToTalkTo != null)
logInfo ("Peer chosen: " + peerToTalkTo + " with " + peerToTalkTo.hasBlocksBitVector)
else
logInfo ("No peer chosen...")
if (peerToTalkTo != null) {
threadPool.execute (new TalkToPeer (peerToTalkTo))
@ -467,7 +473,7 @@ extends Broadcast[T] with Logging {
// Right now picking the one that has the most blocks this peer wants
// Also picking peer randomly if no one has anything interesting
private def pickPeerToTalkTo: SourceInfo = {
private def pickPeerToTalkToRandom: SourceInfo = {
var curPeer: SourceInfo = null
var curMax = 0
@ -475,10 +481,13 @@ extends Broadcast[T] with Logging {
// Find peers that are not connected right now
var peersNotInUse = ListBuffer[SourceInfo] ()
synchronized {
peersNotInUse = listOfSources -- peersNowTalking
listOfSources.synchronized {
peersNowTalking.synchronized {
peersNotInUse = listOfSources -- peersNowTalking
}
}
// Select the peer that has the most blocks that this receiver does not
peersNotInUse.foreach { eachSource =>
var tempHasBlocksBitVector: BitSet = null
hasBlocksBitVector.synchronized {
@ -493,7 +502,7 @@ extends Broadcast[T] with Logging {
}
}
// Always pick randomly or randomly pick randomly?
// TODO: Always pick randomly or randomly pick randomly?
// Now always picking randomly
if (curPeer == null && peersNotInUse.size > 0) {
// Pick uniformly the i'th required peer
@ -508,14 +517,86 @@ extends Broadcast[T] with Logging {
}
}
if (curPeer != null)
logInfo ("Peer chosen: " + curPeer + " with " + curPeer.hasBlocksBitVector)
else
logInfo ("No peer chosen...")
return curPeer
}
// Picking peer with the weight of rare blocks it has
private def pickPeerToTalkToRarestFirst: SourceInfo = {
// Find peers that are not connected right now
var peersNotInUse = ListBuffer[SourceInfo] ()
listOfSources.synchronized {
peersNowTalking.synchronized {
peersNotInUse = listOfSources -- peersNowTalking
}
}
// Count the number of copies of each block in the neighborhood
var numCopiesPerBlock = Array.tabulate [Int] (totalBlocks) (_ => 0)
listOfSources.synchronized {
listOfSources.foreach { eachSource =>
for (i <- 0 until totalBlocks) {
numCopiesPerBlock(i) +=
( if (eachSource.hasBlocksBitVector.get (i)) 1 else 0 )
}
}
}
// TODO: A block is rare if there are at most 2 copies of that block
// TODO: This CONSTANT could be a function of the neighborhood size
var rareBlocksIndices = ListBuffer[Int] ()
for (i <- 0 until totalBlocks) {
if (numCopiesPerBlock(i) > 0 && numCopiesPerBlock(i) <= 2) {
rareBlocksIndices += i
}
}
// Find peers with rare blocks
var peersWithRareBlocks = ListBuffer[(SourceInfo, Int)] ()
var totalRareBlocks = 0
peersNotInUse.foreach { eachPeer =>
var hasRareBlocks = 0
rareBlocksIndices.foreach { rareBlock =>
if (eachPeer.hasBlocksBitVector.get (rareBlock)) {
hasRareBlocks += 1
}
}
if (hasRareBlocks > 0) {
peersWithRareBlocks += ((eachPeer, hasRareBlocks))
}
totalRareBlocks += hasRareBlocks
}
// Select a peer from peersWithRareBlocks based on weight calculated from
// unique rare blocks
var selectedPeerToTalkTo: SourceInfo = null
if (peersWithRareBlocks.size > 0) {
// Sort the peers based on how many rare blocks they have
peersWithRareBlocks.sortBy(_._2)
var randomNumber = BitTorrentBroadcast.ranGen.nextDouble
var tempSum = 0.0
var i = 0
do {
tempSum += (1.0 * peersWithRareBlocks(i)._2 / totalRareBlocks)
if (tempSum >= randomNumber) {
selectedPeerToTalkTo = peersWithRareBlocks(i)._1
}
i += 1
} while (i < peersWithRareBlocks.size && selectedPeerToTalkTo == null)
}
if (selectedPeerToTalkTo == null) {
selectedPeerToTalkTo = pickPeerToTalkToRandom
}
return selectedPeerToTalkTo
}
class TalkToPeer (peerToTalkTo: SourceInfo)
extends Thread with Logging {
private var peerSocketToSource: Socket = null