- Replaced pqOfSources with a vector: listOfSources

- Guide now returns back multiple Sources instead of just one
- Receiver part is not updated though. So it wont work
- Added BroadcastCS.MaxPeersInGuideResponse
- Changed Source selection to simple round-robin
This commit is contained in:
Mosharaf Chowdhury 2010-10-19 17:02:55 -07:00
parent 905745707c
commit 365012f586

@ -2,7 +2,7 @@ package spark
import java.io._
import java.net._
import java.util.{UUID, PriorityQueue, Comparator, BitSet}
import java.util.{UUID, Vector, Comparator, BitSet}
import com.google.common.collect.MapMaker
@ -44,7 +44,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@transient var totalBlocksLock = new Object
@transient var hasBlocksLock = new Object
@transient var pqOfSources = new PriorityQueue[SourceInfo]
@transient var listOfSources = new Vector[SourceInfo]
@transient var hasBlocksBitVector: BitSet = null
@ -103,10 +103,10 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
pqOfSources = new PriorityQueue[SourceInfo]
listOfSources = new Vector[SourceInfo]
val masterSource =
new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes)
pqOfSources.add (masterSource)
listOfSources.add (masterSource)
// Register with the Tracker
while (guidePort == -1) {
@ -267,11 +267,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Connect and receive broadcast from the specified source, retrying the
// specified number of times in case of failures
var retriesLeft = BroadcastCS.maxRetryCount
do {
// Connect to Master and send this worker's Information
// Connect to Master and send this worker's information
val clientSocketToMaster =
new Socket(gInfo.hostAddress, gInfo.listenPort)
logInfo ("Connected to Master's guiding object")
@ -285,7 +286,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
oosMaster.writeObject(new SourceInfo (hostAddress, listenPort, -1, -1))
// Receive source information from Master
// Receive source information from Master
var sourceInfo = oisMaster.readObject.asInstanceOf[SourceInfo]
totalBlocks = sourceInfo.totalBlocks
arrayOfBlocks = new Array[BroadcastBlock] (totalBlocks)
@ -426,9 +428,11 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
private val ois = new ObjectInputStream (clientSocket.getInputStream)
private var selectedSourceInfo: SourceInfo = null
private var selectedSources: Vector[SourceInfo] = null
private var thisWorkerInfo:SourceInfo = null
private var rollOverIndex = 0
def run = {
try {
logInfo ("new GuideSingleRequest is running")
@ -436,62 +440,27 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// will be listening to. Other fields are invalid (-1)
var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
pqOfSources.synchronized {
listOfSources.synchronized {
// Select a suitable source and send it back to the worker
selectedSourceInfo = selectSuitableSource (sourceInfo)
logInfo ("Sending selectedSourceInfo:" + selectedSourceInfo)
oos.writeObject (selectedSourceInfo)
selectedSources = selectSuitableSources (sourceInfo)
logInfo ("Sending selectedSources:" + selectedSources)
oos.writeObject (selectedSources)
// Add this new (if it can finish) source to the PQ of sources
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
sourceInfo.listenPort, totalBlocks, totalBytes)
logInfo ("Adding possible new source to pqOfSources: " + thisWorkerInfo)
pqOfSources.add (thisWorkerInfo)
logInfo ("Adding possible new source to listOfSources: " + thisWorkerInfo)
listOfSources.add (thisWorkerInfo)
// Wait till the whole transfer is done. Then receive and update source
// statistics in pqOfSources
sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
pqOfSources.synchronized {
// This should work since SourceInfo is a case class
assert (pqOfSources.contains (selectedSourceInfo))
// Remove first
pqOfSources.remove (selectedSourceInfo)
// TODO: Removing a source based on just one failure notification!
// Update sourceInfo and put it back in, IF reception succeeded
if (!sourceInfo.receptionFailed) {
selectedSourceInfo.currentLeechers -= 1
selectedSourceInfo.MBps = sourceInfo.MBps
// Put it back
pqOfSources.add (selectedSourceInfo)
// Update global source speed statistics
BroadcastCS.setSourceSpeed (
sourceInfo.hostAddress, sourceInfo.MBps)
} catch {
// If something went wrong, e.g., the worker at the other end died etc.
// then close everything up
case e: Exception => {
// Assuming that exception caused due to receiver worker failure.
// Remove failed worker from pqOfSources and update leecherCount of
// corresponding source worker
pqOfSources.synchronized {
if (selectedSourceInfo != null) {
// Remove first
pqOfSources.remove (selectedSourceInfo)
// Update leecher count and put it back in
selectedSourceInfo.currentLeechers -= 1
pqOfSources.add (selectedSourceInfo)
listOfSources.synchronized {
// Remove thisWorkerInfo
if (pqOfSources != null) { pqOfSources.remove (thisWorkerInfo) }
if (listOfSources != null) { listOfSources.remove (thisWorkerInfo) }
} finally {
@ -501,22 +470,20 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// TODO: Caller must have a synchronized block on pqOfSources
// TODO: If a worker fails to get the broadcasted variable from a source and
// comes back to Master, this function might choose the worker itself as a
// source tp create a dependency cycle (this worker was put into pqOfSources
// as a streming source when it first arrived). The length of this cycle can
// be arbitrarily long.
private def selectSuitableSource(skipSourceInfo: SourceInfo): SourceInfo = {
// Select one based on the ordering strategy (e.g., least leechers etc.)
// take is a blocking call removing the element from PQ
var selectedSource = pqOfSources.poll
assert (selectedSource != null)
// Update leecher count
selectedSource.currentLeechers += 1
// Add it back and then return
pqOfSources.add (selectedSource)
return selectedSource
// TODO: Randomly select some sources to send back.
// Right now just rolls over the list to send back
// BroadcastCS.MaxPeersInGuideResponse
private def selectSuitableSources(skipSourceInfo: SourceInfo): Vector[SourceInfo] = {
var curIndex = rollOverIndex
var selectedSources: Vector[SourceInfo] = new Vector[SourceInfo]
do {
if (listOfSources.get(curIndex) != skipSourceInfo)
{ selectedSources.add (listOfSources.get(curIndex)) }
curIndex = (curIndex + 1) % listOfSources.size
} while (curIndex != rollOverIndex &&
selectedSources.size != BroadcastCS.MaxPeersInGuideResponse)
rollOverIndex = curIndex
return selectedSources
@ -662,14 +629,11 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
case class SourceInfo (val hostAddress: String, val listenPort: Int,
val totalBlocks: Int, val totalBytes: Int)
extends Comparable [SourceInfo] with Logging {
extends Logging {
var currentLeechers = 0
var receptionFailed = false
var MBps: Double = BroadcastCS.MaxMBps
// Ascending sort based on leecher count
def compareTo (o: SourceInfo): Int = (currentLeechers - o.currentLeechers)
@ -731,6 +695,8 @@ private object BroadcastCS extends Logging {
private val ALPHA = 0.7
// 125.0 MBps = 1 Gbps link
private val MaxMBps_ = 125.0
private var MaxPeersInGuideResponse_ = 4
def initialize (isMaster__ : Boolean) {
synchronized {
@ -744,7 +710,9 @@ private object BroadcastCS extends Logging {
maxRetryCount_ =
System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
serverSocketTimout_ =
System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
System.getProperty ("spark.broadcast.serverSocketTimout", "50000").toInt
MaxPeersInGuideResponse_ =
System.getProperty ("spark.broadcast.MaxPeersInGuideResponse", "4").toInt
isMaster_ = isMaster__
@ -770,6 +738,8 @@ private object BroadcastCS extends Logging {
def MaxMBps = MaxMBps_
def MaxPeersInGuideResponse = MaxPeersInGuideResponse_
def registerValue (uuid: UUID, gInfo: GuideInfo) = {
valueToGuideMap.synchronized {
valueToGuideMap += (uuid -> gInfo)
@ -787,19 +757,19 @@ private object BroadcastCS extends Logging {
// def startMultiTracker
// def stopMultiTracker
def getSourceSpeed (hostAddress: String): Double = {
sourceToSpeedMap.synchronized {
sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
// def getSourceSpeed (hostAddress: String): Double = {
// sourceToSpeedMap.synchronized {
// sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
// }
// }
def setSourceSpeed (hostAddress: String, MBps: Double) = {
sourceToSpeedMap.synchronized {
var oldSpeed = sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
var newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * MBps
sourceToSpeedMap.update (hostAddress, newSpeed)
// def setSourceSpeed (hostAddress: String, MBps: Double) = {
// sourceToSpeedMap.synchronized {
// var oldSpeed = sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
// var newSpeed = ALPHA * oldSpeed + (1 - ALPHA) * MBps
// sourceToSpeedMap.update (hostAddress, newSpeed)
// }
// }
class TrackMultipleValues extends Thread with Logging {
var keepAccepting = true