- Changed guidePort to GuideInfo, which now contains the hostAddress
as well as the port. This will allow anyone other than the master
to be a guide.
- The GuideInfo object now contains the constants related to the
tracker response (see the sketch below).
Mosharaf Chowdhury 2010-10-13 16:26:18 -07:00
parent 8690be8f5a
commit 38194e5731
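
Below is a minimal, self-contained Scala sketch (not part of this commit) of how a worker can act on the GuideInfo returned by the tracker, following the listenPort legend used in this patch: a negative value means the transmission hasn't started yet (wait and retry), 0 means the broadcast is over (read from HDFS), and a positive value is the port of the guide to connect to. The GuideInfo case class and its constants mirror the ones added in the diff; the NextStep type, the interpretResponse helper, and the example host/port values are hypothetical illustrations.

// Sketch only -- mirrors the GuideInfo added in this commit, but the
// NextStep ADT and interpretResponse helper are illustrative, not from the patch.
object GuideInfoSketch {
  case class GuideInfo(hostAddress: String, listenPort: Int)

  object GuideInfo {
    val TxNotStartedRetry = -1 // transmission hasn't started yet: wait and retry
    val TxOverGoToHDFS    = 0  // broadcast is over: fall back to the HDFS copy
  }

  sealed trait NextStep
  case object RetryLater extends NextStep
  case object ReadFromHDFS extends NextStep
  case class ConnectToGuide(host: String, port: Int) extends NextStep

  // Map a tracker response to the action the worker should take next.
  def interpretResponse(gInfo: GuideInfo): NextStep =
    gInfo.listenPort match {
      case p if p < 0 => RetryLater                           // GuideInfo.TxNotStartedRetry
      case 0          => ReadFromHDFS                         // GuideInfo.TxOverGoToHDFS
      case p          => ConnectToGuide(gInfo.hostAddress, p) // a real guide is listening
    }

  def main(args: Array[String]): Unit = {
    // Example values only; any real host/port comes from the tracker at runtime.
    println(interpretResponse(GuideInfo("", GuideInfo.TxNotStartedRetry))) // RetryLater
    println(interpretResponse(GuideInfo("", GuideInfo.TxOverGoToHDFS)))    // ReadFromHDFS
    println(interpretResponse(GuideInfo("10.0.0.5", 33233)))               // ConnectToGuide(10.0.0.5,33233)
  }
}

Because the worker now connects to gInfo.hostAddress rather than the fixed BroadcastCS.masterHostAddress, any node that registers a GuideInfo with the tracker can act as the guide, which is exactly the point of this change.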


@@ -15,17 +15,6 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark.compress.lzf.{LZFInputStream, LZFOutputStream}
//import rice.environment.Environment
//import rice.p2p.commonapi._
//import rice.p2p.commonapi.rawserialization.RawMessage
//import rice.pastry._
//import rice.pastry.commonapi.PastryIdFactory
//import rice.pastry.direct._
//import rice.pastry.socket.SocketPastryNodeFactory
//import rice.pastry.standard.RandomNodeIdFactory
//import rice.p2p.scribe._
//import rice.p2p.splitstream._
@serializable
trait BroadcastRecipe {
val uuid = UUID.randomUUID
@@ -125,7 +114,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
guidePortLock.wait
}
}
BroadcastCS.registerValue (uuid, guidePort)
BroadcastCS.registerValue (uuid, GuideInfo (hostAddress, guidePort))
}
private def readObject (in: ObjectInputStream) {
@@ -227,21 +216,17 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
return retVal
}
// masterListenPort aka guidePort value legend
// 0 = missed the broadcast, read from HDFS;
// <0 = hasn't started yet, wait & retry;
// >0 = Read from this port
def getMasterListenPort (variableUUID: UUID): Int = {
def getGuideInfo (variableUUID: UUID): GuideInfo = {
var clientSocketToTracker: Socket = null
var oosTracker: ObjectOutputStream = null
var oisTracker: ObjectInputStream = null
var masterListenPort: Int = -1
var gInfo: GuideInfo = GuideInfo ("", GuideInfo.TxNotStartedRetry)
var retriesLeft = BroadcastCS.maxRetryCount
do {
try {
// Connect to the tracker to find out the guide
// Connect to the tracker to find out GuideInfo
val clientSocketToTracker =
new Socket(BroadcastCS.masterHostAddress, BroadcastCS.masterTrackerPort)
val oosTracker =
@@ -250,13 +235,12 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
val oisTracker =
new ObjectInputStream (clientSocketToTracker.getInputStream)
// Send UUID and receive masterListenPort
// Send UUID and receive GuideInfo
oosTracker.writeObject (uuid)
oosTracker.flush
masterListenPort = oisTracker.readObject.asInstanceOf[Int]
gInfo = oisTracker.readObject.asInstanceOf[GuideInfo]
} catch {
// In case of any failure, set masterListenPort = 0 to read from HDFS
case e: Exception => (masterListenPort = 0)
case e: Exception => (gInfo = GuideInfo("", GuideInfo.TxOverGoToHDFS))
} finally {
if (oisTracker != null) { oisTracker.close }
if (oosTracker != null) { oosTracker.close }
@@ -264,16 +248,16 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
retriesLeft -= 1
// TODO: Should wait before retrying
} while (retriesLeft > 0 && masterListenPort < 0)
logInfo (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + masterListenPort)
return masterListenPort
} while (retriesLeft > 0 && gInfo.listenPort < 0)
logInfo (System.currentTimeMillis + ": " + "Got this guidePort from Tracker: " + gInfo.listenPort)
return gInfo
}
def receiveBroadcast (variableUUID: UUID): Boolean = {
// Get masterListenPort for this variable from the Tracker
val masterListenPort = getMasterListenPort (variableUUID)
// Get GuideInfo for this variable from the Tracker
val gInfo = getGuideInfo (variableUUID)
// If Tracker says that there is no guide for this object, read from HDFS
if (masterListenPort == 0) { return false }
if (gInfo.listenPort == 0) { return false }
// Wait until hostAddress and listenPort are created by the
// ServeMultipleRequests thread
@@ -289,7 +273,7 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
do {
// Connect to Master and send this worker's Information
val clientSocketToMaster =
new Socket(BroadcastCS.masterHostAddress, masterListenPort)
new Socket(gInfo.hostAddress, gInfo.listenPort)
logInfo (System.currentTimeMillis + ": " + "Connected to Master's guiding object")
// TODO: Guiding object connection is reusable
val oosMaster =
@@ -636,70 +620,6 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
}
}
//@serializable
//class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
// extends BroadcastRecipe with Logging {
// def value = value_
// BroadcastSS.synchronized { BroadcastSS.values.put (uuid, value_) }
//
// if (!local) { sendBroadcast }
//
// @transient var publishThread: PublishThread = null
// @transient var hasCopyInHDFS = false
//
// def sendBroadcast () {
// // Store a persistent copy in HDFS
// val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
// out.writeObject (value_)
// out.close
// hasCopyInHDFS = true
//
// publishThread = new PublishThread
// publishThread.start
// }
//
// private def readObject (in: ObjectInputStream) {
// in.defaultReadObject
// BroadcastSS.synchronized {
// val cachedVal = BroadcastSS.values.get(uuid)
// if (cachedVal != null) {
// value_ = cachedVal.asInstanceOf[T]
// } else {
// val start = System.nanoTime
// // Thread.sleep (5000) // TODO:
// val receptionSucceeded = BroadcastSS.receiveVariable (uuid)
// // If does not succeed, then get from HDFS copy
// if (receptionSucceeded) {
// value_ = BroadcastSS.values.get(uuid).asInstanceOf[T]
// } else {
// logInfo (System.currentTimeMillis + ": " + "Reading from HDFS")
// val fileIn = new ObjectInputStream(BroadcastCH.openFileForReading(uuid))
// value_ = fileIn.readObject.asInstanceOf[T]
// BroadcastSS.values.put(uuid, value_)
// fileIn.close
// }
//
// val time = (System.nanoTime - start) / 1e9
// logInfo( System.currentTimeMillis + ": " + "Reading Broadcasted variable " + uuid + " took " + time + " s")
// }
// }
// }
//
// class PublishThread extends Thread with Logging {
// override def run = {
// // TODO: Put some delay here to give time others to register
// // Thread.sleep (5000)
// logInfo (System.currentTimeMillis + ": " + "Waited. Now sending...")
// BroadcastSS.synchronized {
// BroadcastSS.publishVariable[T] (uuid, value)
// }
// }
// }
//}
@serializable
class CentralizedHDFSBroadcast[T](@transient var value_ : T, local: Boolean)
extends BroadcastRecipe with Logging {
@@ -774,11 +694,19 @@ case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { }
@serializable
case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
val totalBlocks: Int, val totalBytes: Int) {
val totalBlocks: Int, val totalBytes: Int) {
@transient var hasBlocks = 0
}
@serializable
case class GuideInfo (val hostAddress: String, val listenPort: Int) { }
object GuideInfo {
// Constant values for special values of listenPort
val TxNotStartedRetry = -1
val TxOverGoToHDFS = 0
}
private object Broadcast extends Logging {
private var initialized = false
@@ -804,7 +732,7 @@ private object Broadcast extends Logging {
private object BroadcastCS extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]
var valueToGuidePortMap = Map[UUID, Int] ()
var valueToGuideMap = Map[UUID, GuideInfo] ()
var sourceToSpeedMap = Map[String, Double] ()
@@ -866,21 +794,23 @@ private object BroadcastCS extends Logging {
def MaxMBps = MaxMBps_
def registerValue (uuid: UUID, guidePort: Int) = {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)
logInfo (System.currentTimeMillis + ": " + "New value registered with the Tracker " + valueToGuidePortMap)
def registerValue (uuid: UUID, gInfo: GuideInfo) = {
valueToGuideMap.synchronized {
valueToGuideMap += (uuid -> gInfo)
logInfo (System.currentTimeMillis + ": " + "New value registered with the Tracker " + valueToGuideMap)
}
}
def unregisterValue (uuid: UUID) = {
valueToGuidePortMap.synchronized {
// Set to 0 to make sure that people read it from HDFS
valueToGuidePortMap (uuid) = 0
logInfo (System.currentTimeMillis + ": " + "Value unregistered from the Tracker " + valueToGuidePortMap)
valueToGuideMap.synchronized {
valueToGuideMap (uuid) = GuideInfo ("", GuideInfo.TxOverGoToHDFS)
logInfo (System.currentTimeMillis + ": " + "Value unregistered from the Tracker " + valueToGuideMap)
}
}
// def startMultiTracker
// def stopMultiTracker
def getSourceSpeed (hostAddress: String): Double = {
sourceToSpeedMap.synchronized {
sourceToSpeedMap.getOrElseUpdate(hostAddress, MaxMBps)
@@ -929,16 +859,12 @@ private object BroadcastCS extends Logging {
val ois = new ObjectInputStream (clientSocket.getInputStream)
try {
val uuid = ois.readObject.asInstanceOf[UUID]
// masterListenPort/guidePort value legend
// 0 = missed the broadcast, read from HDFS;
// <0 = hasn't started yet, wait & retry;
// >0 = Read from this port
var guidePort =
if (valueToGuidePortMap.contains (uuid)) {
valueToGuidePortMap (uuid)
} else -1
logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
oos.writeObject (guidePort)
var gInfo =
if (valueToGuideMap.contains (uuid)) {
valueToGuideMap (uuid)
} else GuideInfo ("", GuideInfo.TxNotStartedRetry)
logInfo (System.currentTimeMillis + ": " + "TrackMultipleValues:Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
oos.writeObject (gInfo)
} catch {
case e: Exception => { }
} finally {
@@ -961,359 +887,6 @@ private object BroadcastCS extends Logging {
}
}
//private object BroadcastSS {
// val values = new MapMaker ().softValues ().makeMap[UUID, Any]
// private val valueBytes = new MapMaker().softValues().makeMap[UUID,Array[Byte]]
// private var initialized = false
// private var isMaster_ = false
//
// private var masterBootHost_ = "127.0.0.1"
// private var masterBootPort_ : Int = 22222
// private var blockSize_ : Int = 512 * 1024
// private var maxRetryCount_ : Int = 2
//
// private var masterBootAddress_ : InetSocketAddress = null
// private var localBindPort_ : Int = -1
//
// private var pEnvironment_ : Environment = null
// private var pastryNode_ : PastryNode = null
// private var ssClient: SSClient = null
//
// // Current transmission state variables
// private var curUUID: UUID = null
// private var curTotalBlocks = -1
// private var curTotalBytes = -1
// private var curHasBlocks = -1
// private var curBlockBitmap: Array[Boolean] = null
// private var curArrayOfBytes: Array[Byte] = null
//
// // TODO: Add stuff so that we can handle out of order variable broadcast
// def initialize (isMaster__ : Boolean) {
// synchronized {
// if (!initialized) {
// masterBootHost_ =
// System.getProperty ("spark.broadcast.masterHostAddress", "127.0.0.1")
// masterBootPort_ =
// System.getProperty ("spark.broadcast.masterBootPort", "22222").toInt
//
// masterBootAddress_ = new InetSocketAddress(masterBootHost_,
// masterBootPort_)
//
// blockSize_ =
// System.getProperty ("spark.broadcast.blockSize", "512").toInt * 1024
// maxRetryCount_ =
// System.getProperty ("spark.broadcast.maxRetryCount", "2").toInt
//
// isMaster_ = isMaster__
//
// // Initialize the SplitStream tree
// initializeSplitStream
//
// initialized = true
// }
// }
// }
//
// def masterBootAddress = masterBootAddress_
// def blockSize = blockSize_
// def maxRetryCount = maxRetryCount_
//
// def pEnvironment: Environment = {
// if (pEnvironment_ == null) { initializeSplitStream }
// pEnvironment_
// }
//
// def pastryNode: PastryNode = {
// if (pastryNode_ == null) { initializeSplitStream }
// pastryNode_
// }
//
// def localBindPort = {
// if (localBindPort_ == -1) {
// if (isMaster) { localBindPort_ = masterBootPort_ }
// else {
// // TODO: What's the best way of finding a free port?
// val sSocket = new ServerSocket (0)
// val sPort = sSocket.getLocalPort
// sSocket.close
// localBindPort_ = sPort
// }
// }
// localBindPort_
// }
// def isMaster = isMaster_
//
// private def initializeSplitStream = {
// pEnvironment_ = new Environment
//
// // Generate the NodeIds Randomly
// val nidFactory = new RandomNodeIdFactory (pEnvironment)
//
// // Construct the PastryNodeFactory
// val pastryNodeFactory = new SocketPastryNodeFactory (nidFactory,
// localBindPort, pEnvironment)
//
// // Construct a Pastry node
// pastryNode_ = pastryNodeFactory.newNode
//
// // Boot the node.
// pastryNode.boot (masterBootAddress)
// // TODO: Some unknown messages are dropped in slaves at this point
//
// // The node may require sending several messages to fully boot into the ring
// pastryNode.synchronized {
// while(!pastryNode.isReady && !pastryNode.joinFailed) {
// // Delay so we don't busy-wait
// pastryNode.wait (500)
//
// // Abort if can't join
// if (pastryNode.joinFailed()) {
// // TODO: throw new IOException("Join failed " + node.joinFailedReason)
// }
// }
// }
//
// // Create the SplitStream client and subscribe
// ssClient = new SSClient (BroadcastSS.pastryNode)
// ssClient.subscribe
// }
//
// def publishVariable[A] (uuid: UUID, obj: A) = {
// ssClient.synchronized {
// ssClient.publish[A] (uuid, obj)
// }
// }
//
// // Return status of the reception
// def receiveVariable[A] (uuid: UUID): Boolean = {
// // TODO: Things will change if out-of-order variable recepetion is supported
//
// logInfo (System.currentTimeMillis + ": " + "In receiveVariable")
//
// // Check in valueBytes
// if (xferValueBytesToValues[A] (uuid)) { return true }
//
// // Check if its in progress
// for (i <- 0 until maxRetryCount) {
// logInfo (System.currentTimeMillis + ": " + uuid + " " + curUUID)
// while (uuid == curUUID) { Thread.sleep (100) } // TODO: How long to sleep
// if (xferValueBytesToValues[A] (uuid)) { return true }
//
// // Wait for a while to see if we've reached here before xmission started
// Thread.sleep (100)
// }
// return false
// }
//
// private def xferValueBytesToValues[A] (uuid: UUID): Boolean = {
// var cachedValueBytes: Array[Byte] = null
// valueBytes.synchronized { cachedValueBytes = valueBytes.get (uuid) }
// if (cachedValueBytes != null) {
// val cachedValue = byteArrayToObject[A] (cachedValueBytes)
// values.synchronized { values.put (uuid, cachedValue) }
// return true
// }
// return false
// }
//
// private def objectToByteArray[A] (obj: A): Array[Byte] = {
// val baos = new ByteArrayOutputStream
// val oos = new ObjectOutputStream (baos)
// oos.writeObject (obj)
// oos.close
// baos.close
// return baos.toByteArray
// }
// private def byteArrayToObject[A] (bytes: Array[Byte]): A = {
// val in = new ObjectInputStream (new ByteArrayInputStream (bytes))
// val retVal = in.readObject.asInstanceOf[A]
// in.close
// return retVal
// }
// private def intToByteArray (value: Int): Array[Byte] = {
// var retVal = new Array[Byte] (4)
// for (i <- 0 until 4)
// retVal(i) = (value >> ((4 - 1 - i) * 8)).toByte
// return retVal
// }
// private def byteArrayToInt (arr: Array[Byte], offset: Int): Int = {
// var retVal = 0
// for (i <- 0 until 4)
// retVal += ((arr(i + offset).toInt & 0x000000FF) << ((4 - 1 - i) * 8))
// return retVal
// }
// class SSClient (pastryNode: PastryNode) extends SplitStreamClient
// with Application {
// // Magic bits: 11111100001100100100110000111111
// val magicBits = 0xFC324C3F
//
// // Message Types
// val INFO_MSG = 1
// val DATA_MSG = 2
//
// // The Endpoint represents the underlying node. By making calls on the
// // Endpoint, it assures that the message will be delivered to the App on
// // whichever node the message is intended for.
// protected val endPoint = pastryNode.buildEndpoint (this, "myInstance")
// // Handle to a SplitStream implementation
// val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream")
// // The ChannelId is constructed from a normal PastryId based on the UUID
// val myChannelId = new ChannelId (new PastryIdFactory
// (pastryNode.getEnvironment).buildId ("myChannel"))
//
// // The channel
// var myChannel: Channel = null
//
// // The stripes. Acquired from myChannel.
// var myStripes: Array[Stripe] = null
// // Now we can receive messages
// endPoint.register
//
// // Subscribes to all stripes in myChannelId.
// def subscribe = {
// // Attaching makes you part of the Channel, and volunteers to be an
// // internal node of one of the trees
// myChannel = mySplitStream.attachChannel (myChannelId)
//
// // Subscribing notifies your application when data comes through the tree
// myStripes = myChannel.getStripes
// for (curStripe <- myStripes) { curStripe.subscribe (this) }
// }
//
// // Part of SplitStreamClient. Called when a published message is received.
// def deliver (s: Stripe, data: Array[Byte]) = {
// // Unpack and verify magicBits
// val topLevelInfo = byteArrayToObject[(Int, Int, Array[Byte])] (data)
//
// // Process only if magicBits are OK
// if (topLevelInfo._1 == magicBits) {
// // Process only for slaves
// if (!BroadcastSS.isMaster) {
// // Match on Message Type
// topLevelInfo._2 match {
// case INFO_MSG => {
// val realInfo = byteArrayToObject[(UUID, Int, Int)] (
// topLevelInfo._3)
//
// // Setup states for impending transmission
// curUUID = realInfo._1 // TODO:
// curTotalBlocks = realInfo._2
// curTotalBytes = realInfo._3
//
// curHasBlocks = 0
// curBlockBitmap = new Array[Boolean] (curTotalBlocks)
// curArrayOfBytes = new Array[Byte] (curTotalBytes)
//
// logInfo (System.currentTimeMillis + ": " + curUUID + " " + curTotalBlocks + " " + curTotalBytes)
// }
// case DATA_MSG => {
// val realInfo = byteArrayToObject[(UUID, Int, Array[Byte])] (
// topLevelInfo._3)
// val blockUUID = realInfo._1
// val blockIndex = realInfo._2
// val blockData = realInfo._3
//
// // TODO: Will change in future implementation. Right now we
// // require broadcast in order on the variable level. Blocks can
// // come out of order though
// assert (blockUUID == curUUID)
//
// // Update everything
// curHasBlocks += 1
// curBlockBitmap(blockIndex) = true
// System.arraycopy (blockData, 0, curArrayOfBytes,
// blockIndex * blockSize, blockData.length)
//
// logInfo (System.currentTimeMillis + ": " + "Got stuff for: " + blockUUID)
//
// // Done receiving
// if (curHasBlocks == curTotalBlocks) {
// // Store as a Array[Byte]
// valueBytes.synchronized {
// valueBytes.put (curUUID, curArrayOfBytes)
// }
//
// logInfo (System.currentTimeMillis + ": " + "Finished reading. Stored in valueBytes")
//
// // RESET
// curUUID = null
// }
// }
// case _ => {
// // Should never happen
// }
// }
// }
// }
// }
// // Multicasts data.
// def publish[A] (uuid: UUID, obj: A) = {
// val byteArray = objectToByteArray[A] (obj)
//
// var blockNum = (byteArray.length / blockSize)
// if (byteArray.length % blockSize != 0)
// blockNum += 1
//
// // -------------------------------------
// // INFO_MSG: | UUID | Total Blocks | Total Bytes |
// // -------------------------------------
// var infoByteArray = objectToByteArray[(UUID, Int, Int)] ((uuid, blockNum,
// byteArray.length))
// doPublish (0, INFO_MSG, infoByteArray)
//
// // -------------------------------------
// // DATA_MSG: | UUID | Block Index | Single Block |
// // -------------------------------------
// var blockID = 0
// for (i <- 0 until (byteArray.length, blockSize)) {
// val thisBlockSize = Math.min (blockSize, byteArray.length - i)
// var thisBlockData = new Array[Byte] (thisBlockSize)
// System.arraycopy (byteArray, i * blockSize, thisBlockData, 0,
// thisBlockSize)
// var dataByteArray = objectToByteArray[(UUID, Int, Array[Byte])] ((uuid,
// blockID, thisBlockData))
// doPublish (blockID % myStripes.length, DATA_MSG, dataByteArray)
// blockID += 1
// }
// }
//
// // --------------------------------
// // Message Format: | MagicBits | Type | Real Data |
// // --------------------------------
// private def doPublish (stripeID: Int, msgType: Int, data: Array[Byte]) = {
// val bytesToSend = objectToByteArray[(Int, Int, Array[Byte])] ((magicBits,
// msgType, data))
// myStripes(stripeID).publish (bytesToSend)
// }
// /* class PublishContent extends Message {
// def getPriority: Int = { Message.MEDIUM_PRIORITY }
// } */
//
// // Error handling
// def joinFailed(s: Stripe) = { logInfo ("joinFailed(" + s + ")") }
// // Rest of the Application interface. NOT USED.
// def deliver (id: rice.p2p.commonapi.Id, message: Message) = { }
// def forward (message: RouteMessage): Boolean = false
// def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { }
// }
//}
private object BroadcastCH extends Logging {
val values = new MapMaker ().softValues ().makeMap[UUID, Any]