- SplitStream working in local machine for single variable broadcast

- Removed delays before publishing and receiving.
- Commented out some prints.
This commit is contained in:
Mosharaf Chowdhury 2010-05-04 15:32:30 -07:00
parent d0a92571dd
commit 53a2367c9c

View file

@ -154,7 +154,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
val time = (System.nanoTime - start) / 1e9
println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
}
}
}
@ -656,11 +656,13 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
} else {
val start = System.nanoTime
// Thread.sleep (5000) // TODO:
val receptionSucceeded = BroadcastSS.receiveVariable (uuid)
// If does not succeed, then get from HDFS copy
if (receptionSucceeded) {
value_ = BroadcastSS.values.get(uuid).asInstanceOf[T]
} else {
// println (System.currentTimeMillis + ": " + "Reading from HDFS")
val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
value_ = fileIn.readObject.asInstanceOf[T]
BroadcastSS.values.put(uuid, value_)
@ -669,10 +671,6 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
val time = (System.nanoTime - start) / 1e9
println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
// TODO: Do something
Thread.sleep (10000)
}
}
}
@ -680,7 +678,8 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
class PublishThread extends Thread {
override def run = {
// TODO: Put some delay here to give time others to register
Thread.sleep (10000)
// Thread.sleep (5000)
// println (System.currentTimeMillis + ": " + "Waited. Now sending...")
BroadcastSS.synchronized {
BroadcastSS.publishVariable[T] (uuid, value)
}
@ -1076,11 +1075,14 @@ private object BroadcastSS {
def receiveVariable[A] (uuid: UUID): Boolean = {
// TODO: Things will change if out-of-order variable recepetion is supported
// println (System.currentTimeMillis + ": " + "In receiveVariable")
// Check in valueBytes
if (xferValueBytesToValues[A] (uuid)) { return true }
// Check if its in progress
for (i <- 0 until maxRetryCount) {
// println (System.currentTimeMillis + ": " + uuid + " " + curUUID)
while (uuid == curUUID) { Thread.sleep (100) } // TODO: How long to sleep
if (xferValueBytesToValues[A] (uuid)) { return true }
@ -1196,7 +1198,7 @@ private object BroadcastSS {
curBlockBitmap = new Array[Boolean] (curTotalBlocks)
curArrayOfBytes = new Array[Byte] (curTotalBytes)
println (curUUID + " " + curTotalBlocks + " " + curTotalBytes)
// println (System.currentTimeMillis + ": " + curUUID + " " + curTotalBlocks + " " + curTotalBytes)
}
case DATA_MSG => {
val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] (
@ -1215,7 +1217,9 @@ private object BroadcastSS {
curBlockBitmap(blockIndex) = true
System.arraycopy (blockData, 0, curArrayOfBytes,
blockIndex * blockSize, blockData.length)
// println (System.currentTimeMillis + ": " + "Got stuff for: " + blockUUID)
// Done receiving
if (curHasBlocks == curTotalBlocks) {
// Store as a Array[Byte]
@ -1223,6 +1227,8 @@ private object BroadcastSS {
valueBytes.put (curUUID, curArrayOfBytes)
}
// println (System.currentTimeMillis + ": " + "Finished reading. Stored in valueBytes")
// RESET
curUUID = null
}
@ -1232,7 +1238,6 @@ private object BroadcastSS {
}
}
}
println(endPoint.getId + " deliver(" + s + "):seq:" + data(0) + " stripe:" + data(1) + " " + data + ")")
}
}