diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 65c0a9095c..747b643f25 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -154,7 +154,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } val time = (System.nanoTime - start) / 1e9 - println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s") + println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s") } } } @@ -656,11 +656,13 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) } else { val start = System.nanoTime + // Thread.sleep (5000) // TODO: val receptionSucceeded = BroadcastSS.receiveVariable (uuid) // If does not succeed, then get from HDFS copy if (receptionSucceeded) { value_ = BroadcastSS.values.get(uuid).asInstanceOf[T] } else { + // println (System.currentTimeMillis + ": " + "Reading from HDFS") val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid)) value_ = fileIn.readObject.asInstanceOf[T] BroadcastSS.values.put(uuid, value_) @@ -669,10 +671,6 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) val time = (System.nanoTime - start) / 1e9 println( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s") - - - // TODO: Do something - Thread.sleep (10000) } } } @@ -680,7 +678,8 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) class PublishThread extends Thread { override def run = { // TODO: Put some delay here to give time others to register - Thread.sleep (10000) + // Thread.sleep (5000) + // println (System.currentTimeMillis + ": " + "Waited. Now sending...") BroadcastSS.synchronized { BroadcastSS.publishVariable[T] (uuid, value) } @@ -1076,11 +1075,14 @@ private object BroadcastSS { def receiveVariable[A] (uuid: UUID): Boolean = { // TODO: Things will change if out-of-order variable recepetion is supported + // println (System.currentTimeMillis + ": " + "In receiveVariable") + // Check in valueBytes if (xferValueBytesToValues[A] (uuid)) { return true } // Check if its in progress for (i <- 0 until maxRetryCount) { + // println (System.currentTimeMillis + ": " + uuid + " " + curUUID) while (uuid == curUUID) { Thread.sleep (100) } // TODO: How long to sleep if (xferValueBytesToValues[A] (uuid)) { return true } @@ -1196,7 +1198,7 @@ private object BroadcastSS { curBlockBitmap = new Array[Boolean] (curTotalBlocks) curArrayOfBytes = new Array[Byte] (curTotalBytes) - println (curUUID + " " + curTotalBlocks + " " + curTotalBytes) + // println (System.currentTimeMillis + ": " + curUUID + " " + curTotalBlocks + " " + curTotalBytes) } case DATA_MSG => { val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] ( @@ -1215,7 +1217,9 @@ private object BroadcastSS { curBlockBitmap(blockIndex) = true System.arraycopy (blockData, 0, curArrayOfBytes, blockIndex * blockSize, blockData.length) - + + // println (System.currentTimeMillis + ": " + "Got stuff for: " + blockUUID) + // Done receiving if (curHasBlocks == curTotalBlocks) { // Store as a Array[Byte] @@ -1223,6 +1227,8 @@ private object BroadcastSS { valueBytes.put (curUUID, curArrayOfBytes) } + // println (System.currentTimeMillis + ": " + "Finished reading. Stored in valueBytes") + // RESET curUUID = null } @@ -1232,7 +1238,6 @@ private object BroadcastSS { } } } - println(endPoint.getId + " deliver(" + s + "):seq:" + data(0) + " stripe:" + data(1) + " " + data + ")") } }