Cleaned TreeBroadcast

This commit is contained in:
Mosharaf Chowdhury 2012-07-13 00:54:25 -07:00
parent 34999d97f5
commit 8ccffe21da
3 changed files with 50 additions and 119 deletions

View file

@ -262,10 +262,7 @@ extends Broadcast[T] with Logging with Serializable {
def receiveBroadcast(variableUUID: UUID): Boolean = {
val gInfo = MultiTracker.getGuideInfo(variableUUID)
if (gInfo.listenPort == SourceInfo.TxOverGoToDefault ||
gInfo.listenPort == SourceInfo.TxNotStartedRetry) {
// TODO: SourceInfo.TxNotStartedRetry is not really in use because we go
// to the default mechanism anyway when receiveBroadcast returns false
if (gInfo.listenPort == SourceInfo.TxOverGoToDefault) {
return false
}
@ -798,13 +795,10 @@ extends Broadcast[T] with Logging with Serializable {
try {
// Connect to the source
guideSocketToSource =
new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
gosSource =
new ObjectOutputStream(guideSocketToSource.getOutputStream)
guideSocketToSource = new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
gosSource = new ObjectOutputStream(guideSocketToSource.getOutputStream)
gosSource.flush()
gisSource =
new ObjectInputStream(guideSocketToSource.getInputStream)
gisSource = new ObjectInputStream(guideSocketToSource.getInputStream)
// Throw away whatever comes in
gisSource.readObject.asInstanceOf[SourceInfo]
@ -991,7 +985,6 @@ extends Broadcast[T] with Logging with Serializable {
if (rxSourceInfo.listenPort == SourceInfo.StopBroadcast) {
stopBroadcast = true
} else {
// Carry on
addToListOfSources(rxSourceInfo)
}
@ -1029,12 +1022,7 @@ extends Broadcast[T] with Logging with Serializable {
}
}
} catch {
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
// Exception can happen if the receiver stops receiving
case e: Exception => {
logInfo("ServeSingleRequest had a " + e)
}
case e: Exception => logInfo("ServeSingleRequest had a " + e)
} finally {
logInfo("ServeSingleRequest is closing streams and sockets")
ois.close()
@ -1048,9 +1036,7 @@ extends Broadcast[T] with Logging with Serializable {
oos.writeObject(arrayOfBlocks(blockToSend))
oos.flush()
} catch {
case e: Exception => {
logInfo("sendBlock had a " + e)
}
case e: Exception => logInfo("sendBlock had a " + e)
}
logInfo("Sent block: " + blockToSend + " to " + clientSocket)
}

View file

@ -16,7 +16,6 @@ extends Logging {
val REGISTER_BROADCAST_TRACKER = 0
val UNREGISTER_BROADCAST_TRACKER = 1
val FIND_BROADCAST_TRACKER = 2
val GET_UPDATED_SHARE = 3
// Map to keep track of guides of ongoing broadcasts
var valueToGuideMap = Map[UUID, SourceInfo]()
@ -197,8 +196,6 @@ extends Logging {
// Send reply back
oos.writeObject(gInfo)
oos.flush()
} else if (messageType == GET_UPDATED_SHARE) {
// TODO: Not implemented
} else {
throw new SparkException("Undefined messageType at TrackMultipleValues")
}
@ -255,9 +252,7 @@ extends Logging {
oosTracker.flush()
gInfo = oisTracker.readObject.asInstanceOf[SourceInfo]
} catch {
case e: Exception => {
logInfo("getGuideInfo had a " + e)
}
case e: Exception => logInfo("getGuideInfo had a " + e)
} finally {
if (oisTracker != null) {
oisTracker.close()

View file

@ -63,9 +63,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER guideMR is created
while (guidePort == -1) {
guidePortLock.synchronized {
guidePortLock.wait()
}
guidePortLock.synchronized { guidePortLock.wait() }
}
serveMR = new ServeMultipleRequests
@ -75,9 +73,7 @@ extends Broadcast[T] with Logging with Serializable {
// Must always come AFTER serveMR is created
while (listenPort == -1) {
listenPortLock.synchronized {
listenPortLock.wait()
}
listenPortLock.synchronized { listenPortLock.wait() }
}
// Must always come AFTER listenPort is created
@ -145,19 +141,14 @@ extends Broadcast[T] with Logging with Serializable {
def receiveBroadcast(variableUUID: UUID): Boolean = {
val gInfo = MultiTracker.getGuideInfo(variableUUID)
if (gInfo.listenPort == SourceInfo.TxOverGoToDefault ||
gInfo.listenPort == SourceInfo.TxNotStartedRetry) {
// TODO: SourceInfo.TxNotStartedRetry is not really in use because we go
// to the default mechanism anyway when receiveBroadcast returns false
if (gInfo.listenPort == SourceInfo.TxOverGoToDefault) {
return false
}
// Wait until hostAddress and listenPort are created by the
// ServeMultipleRequests thread
while (listenPort == -1) {
listenPortLock.synchronized {
listenPortLock.wait()
}
listenPortLock.synchronized { listenPortLock.wait() }
}
var clientSocketToMaster: Socket = null
@ -169,14 +160,10 @@ extends Broadcast[T] with Logging with Serializable {
var retriesLeft = MultiTracker.MaxRetryCount
do {
// Connect to Master and send this worker's Information
clientSocketToMaster =
new Socket(Broadcast.MasterHostAddress, gInfo.listenPort)
// TODO: Guiding object connection is reusable
oosMaster =
new ObjectOutputStream(clientSocketToMaster.getOutputStream)
clientSocketToMaster = new Socket(Broadcast.MasterHostAddress, gInfo.listenPort)
oosMaster = new ObjectOutputStream(clientSocketToMaster.getOutputStream)
oosMaster.flush()
oisMaster =
new ObjectInputStream(clientSocketToMaster.getInputStream)
oisMaster = new ObjectInputStream(clientSocketToMaster.getInputStream)
logInfo("Connected to Master's guiding object")
@ -188,9 +175,7 @@ extends Broadcast[T] with Logging with Serializable {
var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo]
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock](totalBlocks)
totalBlocksLock.synchronized {
totalBlocksLock.notifyAll()
}
totalBlocksLock.synchronized { totalBlocksLock.notifyAll() }
totalBytes = sourceInfo.totalBytes
logInfo("Received SourceInfo from Master:" + sourceInfo + " My Port: " + listenPort)
@ -223,8 +208,10 @@ extends Broadcast[T] with Logging with Serializable {
return (hasBlocks == totalBlocks)
}
// Tries to receive broadcast from the source and returns Boolean status.
// This might be called multiple times to retry a defined number of times.
/**
* Tries to receive broadcast from the source and returns Boolean status.
* This might be called multiple times to retry a defined number of times.
*/
private def receiveSingleTransmission(sourceInfo: SourceInfo): Boolean = {
var clientSocketToSource: Socket = null
var oosSource: ObjectOutputStream = null
@ -233,13 +220,10 @@ extends Broadcast[T] with Logging with Serializable {
var receptionSucceeded = false
try {
// Connect to the source to get the object itself
clientSocketToSource =
new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
oosSource =
new ObjectOutputStream(clientSocketToSource.getOutputStream)
clientSocketToSource = new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
oosSource = new ObjectOutputStream(clientSocketToSource.getOutputStream)
oosSource.flush()
oisSource =
new ObjectInputStream(clientSocketToSource.getInputStream)
oisSource = new ObjectInputStream(clientSocketToSource.getInputStream)
logInfo("Inside receiveSingleTransmission")
logInfo("totalBlocks: "+ totalBlocks + " " + "hasBlocks: " + hasBlocks)
@ -257,16 +241,13 @@ extends Broadcast[T] with Logging with Serializable {
arrayOfBlocks(hasBlocks) = bcBlock
hasBlocks += 1
// Set to true if at least one block is received
receptionSucceeded = true
hasBlocksLock.synchronized {
hasBlocksLock.notifyAll()
}
hasBlocksLock.synchronized { hasBlocksLock.notifyAll() }
}
} catch {
case e: Exception => {
logInfo("receiveSingleTransmission had a " + e)
}
case e: Exception => logInfo("receiveSingleTransmission had a " + e)
} finally {
if (oisSource != null) {
oisSource.close()
@ -295,12 +276,9 @@ extends Broadcast[T] with Logging with Serializable {
guidePort = serverSocket.getLocalPort
logInfo("GuideMultipleRequests => " + serverSocket + " " + guidePort)
guidePortLock.synchronized {
guidePortLock.notifyAll()
}
guidePortLock.synchronized { guidePortLock.notifyAll() }
try {
// Don't stop until there is a copy in HDFS
while (!stopBroadcast) {
var clientSocket: Socket = null
try {
@ -311,8 +289,9 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("GuideMultipleRequests Timeout.")
// Stop broadcast if at least one worker has connected and
// everyone connected so far are done. Comparing with
// listOfSources.size - 1, because it includes the Guide itself
// everyone connected so far are done.
// Comparing with listOfSources.size - 1, because the Guide itself
// is included
if (listOfSources.size > 1 &&
setOfCompletedSources.size == listOfSources.size - 1) {
stopBroadcast = true
@ -340,7 +319,6 @@ extends Broadcast[T] with Logging with Serializable {
serverSocket.close()
}
}
// Shutdown the thread pool
threadPool.shutdown()
}
@ -357,17 +335,13 @@ extends Broadcast[T] with Logging with Serializable {
try {
// Connect to the source
guideSocketToSource =
new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
gosSource =
new ObjectOutputStream(guideSocketToSource.getOutputStream)
guideSocketToSource = new Socket(sourceInfo.hostAddress, sourceInfo.listenPort)
gosSource = new ObjectOutputStream(guideSocketToSource.getOutputStream)
gosSource.flush()
gisSource =
new ObjectInputStream(guideSocketToSource.getInputStream)
gisSource = new ObjectInputStream(guideSocketToSource.getInputStream)
// Send stopBroadcast signal. Range = SourceInfo.StopBroadcast*2
gosSource.writeObject((SourceInfo.StopBroadcast,
SourceInfo.StopBroadcast))
// Send stopBroadcast signal
gosSource.writeObject((SourceInfo.StopBroadcast, SourceInfo.StopBroadcast))
gosSource.flush()
} catch {
case e: Exception => {
@ -426,9 +400,9 @@ extends Broadcast[T] with Logging with Serializable {
// This should work since SourceInfo is a case class
assert(listOfSources.contains(selectedSourceInfo))
// Remove first
// Remove first
// (Currently removing a source based on just one failure notification!)
listOfSources = listOfSources - selectedSourceInfo
// TODO: Removing a source based on just one failure notification!
// Update sourceInfo and put it back in, IF reception succeeded
if (!sourceInfo.receptionFailed) {
@ -437,17 +411,13 @@ extends Broadcast[T] with Logging with Serializable {
setOfCompletedSources += thisWorkerInfo
}
// Update leecher count and put it back in
selectedSourceInfo.currentLeechers -= 1
// Put it back
listOfSources += selectedSourceInfo
}
}
} catch {
// If something went wrong, e.g., the worker at the other end died etc.
// then close() everything up
case e: Exception => {
// Assuming that exception caused due to receiver worker failure.
// Remove failed worker from listOfSources and update leecherCount of
// corresponding source worker
listOfSources.synchronized {
@ -472,20 +442,15 @@ extends Broadcast[T] with Logging with Serializable {
}
}
// FIXME: Caller must have a synchronized block on listOfSources
// FIXME: If a worker fails to get the broadcasted variable from a source
// and comes back to the Master, this function might choose the worker
// itself as a source to create a dependency cycle (this worker was put
// into listOfSources as a streming source when it first arrived). The
// length of this cycle can be arbitrarily long.
// Assuming the caller to have a synchronized block on listOfSources
// Select one with the most leechers. This will level-wise fill the tree
private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
// Select one with the most leechers. This will level-wise fill the tree
var maxLeechers = -1
var selectedSource: SourceInfo = null
listOfSources.foreach { source =>
if (source != skipSourceInfo &&
if ((source.hostAddress != skipSourceInfo.hostAddress ||
source.listenPort != skipSourceInfo.listenPort) &&
source.currentLeechers < MultiTracker.MaxDegree &&
source.currentLeechers > maxLeechers) {
selectedSource = source
@ -495,7 +460,6 @@ extends Broadcast[T] with Logging with Serializable {
// Update leecher count
selectedSource.currentLeechers += 1
return selectedSource
}
}
@ -512,9 +476,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("ServeMultipleRequests started with " + serverSocket)
listenPortLock.synchronized {
listenPortLock.notifyAll()
}
listenPortLock.synchronized { listenPortLock.notifyAll() }
try {
while (!stopBroadcast) {
@ -523,10 +485,9 @@ extends Broadcast[T] with Logging with Serializable {
serverSocket.setSoTimeout(MultiTracker.ServerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
logInfo("ServeMultipleRequests Timeout.")
}
case e: Exception => logInfo("ServeMultipleRequests Timeout.")
}
if (clientSocket != null) {
logInfo("Serve: Accepted new client connection: " + clientSocket)
try {
@ -565,19 +526,14 @@ extends Broadcast[T] with Logging with Serializable {
sendFrom = rangeToSend._1
sendUntil = rangeToSend._2
if (sendFrom == SourceInfo.StopBroadcast &&
sendUntil == SourceInfo.StopBroadcast) {
// If not a valid range, stop broadcast
if (sendFrom == SourceInfo.StopBroadcast && sendUntil == SourceInfo.StopBroadcast) {
stopBroadcast = true
} else {
// Carry on
sendObject
}
} catch {
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
case e: Exception => {
logInfo("ServeSingleRequest had a " + e)
}
case e: Exception => logInfo("ServeSingleRequest had a " + e)
} finally {
logInfo("ServeSingleRequest is closing streams and sockets")
ois.close()
@ -589,24 +545,18 @@ extends Broadcast[T] with Logging with Serializable {
private def sendObject() {
// Wait till receiving the SourceInfo from Master
while (totalBlocks == -1) {
totalBlocksLock.synchronized {
totalBlocksLock.wait()
}
totalBlocksLock.synchronized { totalBlocksLock.wait() }
}
for (i <- sendFrom until sendUntil) {
while (i == hasBlocks) {
hasBlocksLock.synchronized {
hasBlocksLock.wait()
}
hasBlocksLock.synchronized { hasBlocksLock.wait() }
}
try {
oos.writeObject(arrayOfBlocks(i))
oos.flush()
} catch {
case e: Exception => {
logInfo("sendObject had a " + e)
}
case e: Exception => logInfo("sendObject had a " + e)
}
logInfo("Sent block: " + i + " to " + clientSocket)
}