Added timers around BroadcastTest and sendBroadcast.
Turned OFF saving to HDFS for now for stress tests. pqOfSources is ordered by least leechers again.
This commit is contained in:
parent
2d381c974e
commit
7ab703117a
|
@ -14,11 +14,14 @@ object BroadcastTest {
|
|||
for (i <- 0 until arr.length)
|
||||
arr(i) = i
|
||||
|
||||
val start = System.nanoTime
|
||||
val barr = spark.broadcast(arr)
|
||||
spark.parallelize(1 to 10, slices).foreach {
|
||||
println("in task: barr = " + barr)
|
||||
i => println(barr.value.size)
|
||||
}
|
||||
val time = (System.nanoTime - start) / 1e9
|
||||
println("BroadcastTest took " + time + " s")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -67,13 +67,19 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
|
|||
@transient var hasCopyInHDFS = false
|
||||
|
||||
// Must call this after all the variables have been created/initialized
|
||||
if (!local) { sendBroadcast }
|
||||
if (!local) {
|
||||
val start = System.nanoTime
|
||||
sendBroadcast
|
||||
val time = (System.nanoTime - start) / 1e9
|
||||
println("sendBroadcast took " + time + " s")
|
||||
}
|
||||
|
||||
def sendBroadcast () {
|
||||
// Store a persistent copy in HDFS
|
||||
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
|
||||
out.writeObject (value_)
|
||||
out.close
|
||||
// TODO: Turned OFF for now
|
||||
// val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
|
||||
// out.writeObject (value_)
|
||||
// out.close
|
||||
hasCopyInHDFS = true
|
||||
|
||||
// Create a variableInfo object and store it in valueInfos
|
||||
|
@ -438,17 +444,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
|
|||
// be listening to. ReplicaID is 0 and other fields are invalid (-1)
|
||||
var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
|
||||
|
||||
// Select a suitable source and send it back to the worker
|
||||
selectedSourceInfo = selectSuitableSource (sourceInfo)
|
||||
// println (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo)
|
||||
oos.writeObject (selectedSourceInfo)
|
||||
oos.flush
|
||||
|
||||
// Add this new (if it can finish) source to the PQ of sources
|
||||
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
|
||||
sourceInfo.listenPort, totalBlocks, totalBytes, 0)
|
||||
// println (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo)
|
||||
pqOfSources.synchronized {
|
||||
// Select a suitable source and send it back to the worker
|
||||
selectedSourceInfo = selectSuitableSource (sourceInfo)
|
||||
// println (System.currentTimeMillis + ": " + "Sending selectedSourceInfo:" + selectedSourceInfo)
|
||||
oos.writeObject (selectedSourceInfo)
|
||||
oos.flush
|
||||
|
||||
// Add this new (if it can finish) source to the PQ of sources
|
||||
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
|
||||
sourceInfo.listenPort, totalBlocks, totalBytes, 0)
|
||||
// println (System.currentTimeMillis + ": " + "Adding possible new source to pqOfSources: " + thisWorkerInfo)
|
||||
pqOfSources.add (thisWorkerInfo)
|
||||
}
|
||||
|
||||
|
@ -509,23 +515,22 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: Caller must have a synchronized block on pqOfSources
|
||||
// TODO: If a worker fails to get the broadcasted variable from a source and
|
||||
// comes back to Master, this function might choose the worker itself as a
|
||||
// source tp create a dependency cycle (this worker was put into pqOfSources
|
||||
// as a streming source when it first arrived). The length of this cycle can
|
||||
// be arbitrarily long.
|
||||
private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
|
||||
// Select one with the lowest number of leechers
|
||||
pqOfSources.synchronized {
|
||||
// take is a blocking call removing the element from PQ
|
||||
var selectedSource = pqOfSources.poll
|
||||
assert (selectedSource != null)
|
||||
// Update leecher count
|
||||
selectedSource.currentLeechers += 1
|
||||
// Add it back and then return
|
||||
pqOfSources.add (selectedSource)
|
||||
return selectedSource
|
||||
}
|
||||
// Select one based on the ordering strategy (e.g., least leechers etc.)
|
||||
// take is a blocking call removing the element from PQ
|
||||
var selectedSource = pqOfSources.poll
|
||||
assert (selectedSource != null)
|
||||
// Update leecher count
|
||||
selectedSource.currentLeechers += 1
|
||||
// Add it back and then return
|
||||
pqOfSources.add (selectedSource)
|
||||
return selectedSource
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -744,7 +749,7 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int,
|
|||
var MBps: Double = BroadcastCS.MaxMBps
|
||||
|
||||
// Ascending sort based on leecher count
|
||||
// def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
|
||||
def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
|
||||
|
||||
// Descending sort based on speed
|
||||
// def compareTo (o: SourceInfo): Int = {
|
||||
|
@ -754,14 +759,14 @@ case class SourceInfo (val hostAddress: String, val listenPort: Int,
|
|||
// }
|
||||
|
||||
// Descending sort based on globally stored speed
|
||||
def compareTo (o: SourceInfo): Int = {
|
||||
val mySpeed = BroadcastCS.getSourceSpeed (hostAddress)
|
||||
val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress)
|
||||
// def compareTo (o: SourceInfo): Int = {
|
||||
// val mySpeed = BroadcastCS.getSourceSpeed (hostAddress)
|
||||
// val urSpeed = BroadcastCS.getSourceSpeed (o.hostAddress)
|
||||
|
||||
if (mySpeed > urSpeed) -1
|
||||
else if (mySpeed < urSpeed) 1
|
||||
else 0
|
||||
}
|
||||
// if (mySpeed > urSpeed) -1
|
||||
// else if (mySpeed < urSpeed) 1
|
||||
// else 0
|
||||
// }
|
||||
}
|
||||
|
||||
@serializable
|
||||
|
|
Loading…
Reference in a new issue