Removed ReplicaID from SourceInfo.

This commit is contained in:
Mosharaf Chowdhury 2010-10-19 15:35:54 -07:00
parent 4ad6c5218e
commit 905745707c

View file

@ -105,7 +105,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
pqOfSources = new PriorityQueue[SourceInfo]
val masterSource =
new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes, 0)
new SourceInfo (hostAddress, listenPort, totalBlocks, totalBytes)
pqOfSources.add (masterSource)
// Register with the Tracker
@ -282,7 +282,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
val oisMaster =
new ObjectInputStream (clientSocketToMaster.getInputStream)
oosMaster.writeObject(new SourceInfo (hostAddress, listenPort, -1, -1, 0))
oosMaster.writeObject(new SourceInfo (hostAddress, listenPort, -1, -1))
oosMaster.flush
// Receive source information from Master
@ -432,8 +432,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
def run = {
try {
logInfo ("new GuideSingleRequest is running")
// Connecting worker is sending in its hostAddress and listenPort it will
// be listening to. ReplicaID is 0 and other fields are invalid (-1)
// Connecting worker is sending in its hostAddress and listenPort it
// will be listening to. Other fields are invalid (-1)
var sourceInfo = ois.readObject.asInstanceOf[SourceInfo]
pqOfSources.synchronized {
@ -445,7 +445,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
// Add this new (if it can finish) source to the PQ of sources
thisWorkerInfo = new SourceInfo(sourceInfo.hostAddress,
sourceInfo.listenPort, totalBlocks, totalBytes, 0)
sourceInfo.listenPort, totalBlocks, totalBytes)
logInfo ("Adding possible new source to pqOfSources: " + thisWorkerInfo)
pqOfSources.add (thisWorkerInfo)
}
@ -661,7 +661,7 @@ class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
@serializable
case class SourceInfo (val hostAddress: String, val listenPort: Int,
val totalBlocks: Int, val totalBytes: Int, val replicaID: Int)
val totalBlocks: Int, val totalBytes: Int)
extends Comparable [SourceInfo] with Logging {
var currentLeechers = 0