diff --git a/src/examples/BroadcastTest.scala b/src/examples/BroadcastTest.scala index 05778668bd..6d31ade95b 100644 --- a/src/examples/BroadcastTest.scala +++ b/src/examples/BroadcastTest.scala @@ -14,11 +14,14 @@ object BroadcastTest { for (i <- 0 until arr.length) arr(i) = i + val start = System.nanoTime val barr = spark.broadcast(arr) spark.parallelize(1 to 10, slices).foreach { println("in task: barr = " + barr) i => println(barr.value.size) } + val time = (System.nanoTime - start) / 1e9 + println("BroadcastTest took " + time + " s") } } diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index 0c1aa43285..b2114eb151 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -67,13 +67,19 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) @transient var hasCopyInHDFS = false // Must call this after all the variables have been created/initialized - if (!local) { sendBroadcast } + if (!local) { + val start = System.nanoTime + sendBroadcast + val time = (System.nanoTime - start) / 1e9 + println("sendBroadcast took " + time + " s") + } - def sendBroadcast () { + def sendBroadcast () { // Store a persistent copy in HDFS - val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) - out.writeObject (value_) - out.close + // TODO: Turned OFF for now + // val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) + // out.writeObject (value_) + // out.close hasCopyInHDFS = true // Create a variableInfo object and store it in valueInfos @@ -438,17 +444,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // be listening to. ReplicaID is 0 and other fields are invalid (-1) var sourceInfo = ois.readObject.asInstanceOf[SourceInfo] - // Select a suitable source and send it back to the worker - selectedSourceInfo = selectSuitableSource (sourceInfo) - // println (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo) - oos.writeObject (selectedSourceInfo) - oos.flush - - // Add this new (if it can finish) source to the PQ of sources - thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress, - sourceInfo.listenPort, totalBlocks, totalBytes, 0) - // println (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo) pqOfSources.synchronized { + // Select a suitable source and send it back to the worker + selectedSourceInfo = selectSuitableSource (sourceInfo) + // println (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo) + oos.writeObject (selectedSourceInfo) + oos.flush + + // Add this new (if it can finish) source to the PQ of sources + thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress, + sourceInfo.listenPort, totalBlocks, totalBytes, 0) + // println (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo) pqOfSources.add (thisWorkerInfo) } @@ -509,23 +515,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) } } + // TODO: Caller must have a synchronized block on pqOfSources // TODO: If a worker fails to get the broadcasted variable from a source and // comes back to Master, this function might choose the worker itself as a // source tp create a dependency cycle (this worker was put into pqOfSources // as a streming source when it first arrived). The length of this cycle can // be arbitrarily long. private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = { - // Select one with the lowest number of leechers - pqOfSources.synchronized { - // take is a blocking call removing the element from PQ - var selectedSource = pqOfSources.poll - assert (selectedSource != null) - // Update leecher count - selectedSource.currentLeechers += 1 - // Add it back and then return - pqOfSources.add (selectedSource) - return selectedSource - } + // Select one based on the ordering strategy (e.g., least leechers etc.) + // take is a blocking call removing the element from PQ + var selectedSource = pqOfSources.poll + assert (selectedSource != null) + // Update leecher count + selectedSource.currentLeechers += 1 + // Add it back and then return + pqOfSources.add (selectedSource) + return selectedSource } } } @@ -744,7 +749,7 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int, var MBps: Double = BroadcastCS.MaxMBps // Ascending sort based on leecher count - // def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers) + def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers) // Descending sort based on speed // def compareTo (o: SourceInfo): Int = { @@ -754,14 +759,14 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int, // } // Descending sort based on globally stored speed - def compareTo (o: SourceInfo): Int = { - val mySpeed = BroadcastCS.getSourceSpeed (hostAddress) - val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress) + // def compareTo (o: SourceInfo): Int = { + // val mySpeed = BroadcastCS.getSourceSpeed (hostAddress) + // val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress) - if (mySpeed > urSpeed) -1 - else if (mySpeed < urSpeed) 1 - else 0 - } + // if (mySpeed > urSpeed) -1 + // else if (mySpeed < urSpeed) 1 + // else 0 + // } } @serializable