diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 5666ba999d..b36d813fdc 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -334,64 +334,65 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean) logInfo ("TalkToGuide started") // TODO: + + return (hasBlocks == totalBlocks) } - // Tries to receive broadcast from the source and returns Boolean status. - // This might be called multiple times to retry a defined number of times. - private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = { - var clientSocketToSource: Socket = null - var oosSource: ObjectOutputStream = null - var oisSource: ObjectInputStream = null - - var receptionSucceeded = false - try { - // Connect to the source to get the object itself - clientSocketToSource = - new Socket (sourceInfo.hostAddress, sourceInfo.listenPort) - oosSource = - new ObjectOutputStream (clientSocketToSource.getOutputStream) - oosSource.flush - oisSource = - new ObjectInputStream (clientSocketToSource.getInputStream) - - logInfo ("Inside receiveSingleTransmission") - logInfo ("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks) + class TalkToPeer (sourceInfo: SourceInfo) + extends Thread with Logging { + override def run = { + var peerSocketToSource: Socket = null + var oosSource: ObjectOutputStream = null + var oisSource: ObjectInputStream = null - // Send hasBlocksBitVector - oosSource.writeObject(hasBlocksBitVector) - oosSource.flush - - // Send the range - oosSource.writeObject((hasBlocks, totalBlocks)) - oosSource.flush - - for (i <- hasBlocks until totalBlocks) { - val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] - arrayOfBlocks(hasBlocks) = bcBlock - hasBlocksBitVector.set (bcBlock.blockID) - hasBlocks += 1 - // Set to true if at least one block is received - receptionSucceeded = true - hasBlocksLock.synchronized { - hasBlocksLock.notifyAll - } - logInfo ("Received block: " + i + " " + bcBlock) - } - logInfo ("After the receive loop") - } catch { - case e: Exception => { - logInfo ("receiveSingleTransmission had a " + e) - } - } finally { - if (oisSource != null) { oisSource.close } - if (oosSource != null) { oosSource.close } - if (clientSocketToSource != null) { clientSocketToSource.close } - } + try { + // Connect to the source + peerSocketToSource = + new Socket (sourceInfo.hostAddress, sourceInfo.listenPort) + oosSource = + new ObjectOutputStream (peerSocketToSource.getOutputStream) + oosSource.flush + oisSource = + new ObjectInputStream (peerSocketToSource.getInputStream) - return receptionSucceeded - } + // TODO: Who decides which blocks to move back and forth? + // TODO: Letting the source decide for now + + while (true) { + // Send hasBlocksBitVector + oosSource.writeObject(hasBlocksBitVector) + oosSource.flush + + // Receive hasBlocksBitVector + // TODO: Need to update this information in the listOfSources + var txHasBlocksBitVector = oisSource.readObject.asInstanceOf[BitSet] + + val bcBlock = oisSource.readObject.asInstanceOf[BroadcastBlock] + arrayOfBlocks(hasBlocks) = bcBlock + hasBlocksBitVector.set (bcBlock.blockID) + hasBlocks += 1 + hasBlocksLock.synchronized { + hasBlocksLock.notifyAll + } + logInfo ("Received block: " + bcBlock.blockID + " " + bcBlock) + } + } catch { + case e: Exception => { + // TODO: Right now assuming an exception == the other end is dead + // Remove this pInfo from listOfSources + listOfSources.synchronized { + listOfSources = listOfSources - sourceInfo + } + } + } finally { + if (oisSource != null) { oisSource.close } + if (oosSource != null) { oosSource.close } + if (peerSocketToSource != null) { peerSocketToSource.close } + } + } + } class GuideMultipleRequests extends Thread with Logging { override def run = { @@ -547,16 +548,24 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean) oos.flush private val ois = new ObjectInputStream (clientSocket.getInputStream) + var keepServing = true + def run = { try { logInfo ("new ServeSingleRequest is running") + do { // Receive hasBlocksBitVector from the receiver var rxHasBlocksBitVector = ois.readObject.asInstanceOf[BitSet] + // TODO: Update this info in listOfSources - // Receive range to send - var sendRange = ois.readObject.asInstanceOf[(Int, Int)] - sendObject (sendRange._1, sendRange._2) + // Send hasBlocksBitVector to the receiver + oos.writeObject(hasBlocksBitVector) + oos.flush + + keepServing = pickAndSendBlock (rxHasBlocksBitVector) + // TODO: Perhaps we shouldn't close connection after only one try + } while (keepServing) } catch { // TODO: Need to add better exception handling here // If something went wrong, e.g., the worker at the other end died etc. @@ -572,28 +581,24 @@ class BitTorrentBroadcast[T] (@transient var value_ : T, local: Boolean) } } - private def sendObject (sendFrom: Int, sendUntil: Int) = { - // Wait till receiving the SourceInfo from Master - while (totalBlocks == -1) { - totalBlocksLock.synchronized { - totalBlocksLock.wait - } - } - - for (i <- sendFrom until sendUntil) { - while (i == hasBlocks) { - hasBlocksLock.synchronized { - hasBlocksLock.wait - } - } - try { - oos.writeObject (arrayOfBlocks(i)) - oos.flush - } catch { - case e: Exception => { } - } - logInfo ("Send block: " + i + " " + arrayOfBlocks(i)) + // TODO: Right now picking the first block that matches + private def pickAndSendBlock (rxHasBlocksBitVector: BitSet): Boolean = { + // Figure out which blocks to send + rxHasBlocksBitVector.flip (0, rxHasBlocksBitVector.size) + hasBlocksBitVector.synchronized { + rxHasBlocksBitVector.and (hasBlocksBitVector) } + + var nextIndex = rxHasBlocksBitVector.nextSetBit (0) + + if (nextIndex == -1) { return false } + + try { + oos.writeObject (arrayOfBlocks(nextIndex)) + oos.flush + } catch { case e: Exception => { } } + logInfo ("Sent block: " + nextIndex + " " + arrayOfBlocks(nextIndex)) + return true } } }