- Added TalkToPeer class which will allow peers to communicate between them.

- Still need the controller class that will decide which peers to communicate with
This commit is contained in:
Mosharaf Chowdhury 2010-10-20 17:27:02 -07:00
parent 05bca235a7
commit 53bd64afe7

View file

@ -334,64 +334,65 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean)
logInfo ("TalkToGuide started")
// TODO:
return (hasBlocks == totalBlocks)
}
// Tries to receive broadcast from the source and returns Boolean status.
// This might be called multiple times to retry a defined number of times.
private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = {
var clientSocketToSource: Socket = null
var oosSource: ObjectOutputStream = null
var oisSource: ObjectInputStream = null
var receptionSucceeded = false
try {
// Connect to the source to get the object itself
clientSocketToSource =
new Socket (sourceInfo.hostAddress, sourceInfo.listenPort)
oosSource =
new ObjectOutputStream (clientSocketToSource.getOutputStream)
oosSource.flush
oisSource =
new ObjectInputStream (clientSocketToSource.getInputStream)
logInfo ("Inside receiveSingleTransmission")
logInfo ("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
class TalkToPeer (sourceInfo: SourceInfo)
extends Thread with Logging {
override def run = {
var peerSocketToSource: Socket = null
var oosSource: ObjectOutputStream = null
var oisSource: ObjectInputStream = null
// Send hasBlocksBitVector
oosSource.writeObject(hasBlocksBitVector)
oosSource.flush
// Send the range
oosSource.writeObject((hasBlocks, totalBlocks))
oosSource.flush
for (i <- hasBlocks until totalBlocks) {
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
arrayOfBlocks(hasBlocks) = bcBlock
hasBlocksBitVector.set (bcBlock.blockID)
hasBlocks += 1
// Set to true if at least one block is received
receptionSucceeded = true
hasBlocksLock.synchronized {
hasBlocksLock.notifyAll
}
logInfo ("Received block: " + i + " " + bcBlock)
}
logInfo ("After the receive loop")
} catch {
case e: Exception => {
logInfo ("receiveSingleTransmission had a " + e)
}
} finally {
if (oisSource != null) { oisSource.close }
if (oosSource != null) { oosSource.close }
if (clientSocketToSource != null) { clientSocketToSource.close }
}
try {
// Connect to the source
peerSocketToSource =
new Socket (sourceInfo.hostAddress, sourceInfo.listenPort)
oosSource =
new ObjectOutputStream (peerSocketToSource.getOutputStream)
oosSource.flush
oisSource =
new ObjectInputStream (peerSocketToSource.getInputStream)
return receptionSucceeded
}
// TODO: Who decides which blocks to move back and forth?
// TODO: Letting the source decide for now
while (true) {
// Send hasBlocksBitVector
oosSource.writeObject(hasBlocksBitVector)
oosSource.flush
// Receive hasBlocksBitVector
// TODO: Need to update this information in the listOfSources
var txHasBlocksBitVector = oisSource.readObject.asInstanceOf[BitSet]
val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock]
arrayOfBlocks(hasBlocks) = bcBlock
hasBlocksBitVector.set (bcBlock.blockID)
hasBlocks += 1
hasBlocksLock.synchronized {
hasBlocksLock.notifyAll
}
logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock)
}
} catch {
case e: Exception => {
// TODO: Right now assuming an exception == the other end is dead
// Remove this pInfo from listOfSources
listOfSources.synchronized {
listOfSources = listOfSources - sourceInfo
}
}
} finally {
if (oisSource != null) { oisSource.close }
if (oosSource != null) { oosSource.close }
if (peerSocketToSource != null) { peerSocketToSource.close }
}
}
}
class GuideMultipleRequests extends Thread with Logging {
override def run = {
@ -547,16 +548,24 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean)
oos.flush
private val ois = new ObjectInputStream (clientSocket.getInputStream)
var keepServing = true
def run = {
try {
logInfo ("new ServeSingleRequest is running")
do {
// Receive hasBlocksBitVector from the receiver
var rxHasBlocksBitVector = ois.readObject.asInstanceOf[BitSet]
// TODO: Update this info in listOfSources
// Receive range to send
var sendRange = ois.readObject.asInstanceOf[(Int, Int)]
sendObject (sendRange._1, sendRange._2)
// Send hasBlocksBitVector to the receiver
oos.writeObject(hasBlocksBitVector)
oos.flush
keepServing = pickAndSendBlock (rxHasBlocksBitVector)
// TODO: Perhaps we shouldn't close connection after only one try
} while (keepServing)
} catch {
// TODO: Need to add better exception handling here
// If something went wrong, e.g., the worker at the other end died etc.
@ -572,28 +581,24 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
private def sendObject (sendFrom: Int, sendUntil: Int) = {
// Wait till receiving the SourceInfo from Master
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
totalBlocksLock.wait
}
}
for (i <- sendFrom until sendUntil) {
while (i == hasBlocks) {
hasBlocksLock.synchronized {
hasBlocksLock.wait
}
}
try {
oos.writeObject (arrayOfBlocks(i))
oos.flush
} catch {
case e: Exception => { }
}
logInfo ("Send block: " + i + " " + arrayOfBlocks(i))
// TODO: Right now picking the first block that matches
private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Boolean = {
// Figure out which blocks to send
rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size)
hasBlocksBitVector.synchronized {
rxHasBlocksBitVector.and (hasBlocksBitVector)
}
var nextIndex = rxHasBlocksBitVector.nextSetBit (0)
if (nextIndex == -1) { return false }
try {
oos.writeObject (arrayOfBlocks(nextIndex))
oos.flush
} catch { case e: Exception => { } }
logInfo ("Sent block: " + nextIndex + " " + arrayOfBlocks(nextIndex))
return true
}
}
}