diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index aafbb73821..a112cbff3b 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -105,7 +105,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) pqOfSources = new PriorityQueue[SourceInfo] val masterSource = - new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0) + new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes) pqOfSources.add (masterSource) // Register with the Tracker @@ -282,7 +282,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) val oisMaster = new ObjectInputStream (clientSocketToMaster.getInputStream) - oosMaster.writeObject(new SourceInfo (hostAddress, listenPort, -1, -1, 0)) + oosMaster.writeObject(new SourceInfo (hostAddress, listenPort, -1, -1)) oosMaster.flush // Receive source information from Master @@ -432,8 +432,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) def run = { try { logInfo ("new GuideSingleRequest is running") - // Connecting worker is sending in its hostAddress and listenPort it will - // be listening to. ReplicaID is 0 and other fields are invalid (-1) + // Connecting worker is sending in its hostAddress and listenPort it + // will be listening to. Other fields are invalid (-1) var sourceInfo = ois.readObject.asInstanceOf[SourceInfo] pqOfSources.synchronized { @@ -445,7 +445,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) // Add this new (if it can finish) source to the PQ of sources thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress, - sourceInfo.listenPort, totalBlocks, totalBytes, 0) + sourceInfo.listenPort, totalBlocks, totalBytes) logInfo ("Adding possible new source to pqOfSources: " + thisWorkerInfo) pqOfSources.add (thisWorkerInfo) } @@ -661,7 +661,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean) @serializable case class SourceInfo (val hostAddress: String, val listenPort: Int, - val totalBlocks: Int, val totalBytes: Int, val replicaID: Int) + val totalBlocks: Int, val totalBytes: Int) extends Comparable [SourceInfo] with Logging { var currentLeechers = 0