diff --git a/src/scala/spark/Broadcast.scala b/src/scala/spark/Broadcast.scala index bed2991713..3edf54edb7 100644 --- a/src/scala/spark/Broadcast.scala +++ b/src/scala/spark/Broadcast.scala @@ -45,6 +45,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) BroadcastCS.synchronized { BroadcastCS.values.put (uuid, value_) } + if (!local) { sendBroadcast } + @transient var arrayOfBlocks: Array[BroadcastBlock] = null @transient var totalBytes = -1 @transient var totalBlocks = -1 @@ -65,8 +67,6 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean) @transient var guidePort = -1 @transient var hasCopyInHDFS = false - - if (!local) { sendBroadcast } def sendBroadcast () { // Store a persistent copy in HDFS @@ -634,6 +634,7 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) if (!local) { sendBroadcast } + @transient var publishThread: PublishThread = null @transient var hasCopyInHDFS = false def sendBroadcast () { @@ -642,10 +643,9 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) out.writeObject (value_) out.close hasCopyInHDFS = true - - val ssClient = new SSClient (BroadcastSS.pastryNode) - ssClient.subscribe - ssClient.publish[T] (value, BroadcastSS.blockSize) + + publishThread = new PublishThread + publishThread.start } private def readObject (in: ObjectInputStream) { @@ -655,103 +655,20 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean) if (cachedVal != null) { value_ = cachedVal.asInstanceOf[T] } else { - val ssClient = new SSClient (BroadcastSS.pastryNode) - ssClient.subscribe + // TODO: Do something + Thread.sleep (10000) } } } - class PublishThread (ssClient: SSClient) extends Runnable { - def run = { - - } - } - - class SSClient (pastryNode: PastryNode) extends SplitStreamClient - with Application { - // Bytes reserved before each published block. 8 byte = 2 integer - val preAmbleSize = 8 - - // The Endpoint represents the underlying node. By making calls on the - // Endpoint, it assures that the message will be delivered to the App on - // whichever node the message is intended for. - protected val endPoint = pastryNode.buildEndpoint (this, "myInstance") - - // Handle to a SplitStream implementation - val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream") - - // The ChannelId is constructed from a normal PastryId based on the UUID - val myChannelId = new ChannelId (new PastryIdFactory - (pastryNode.getEnvironment).buildId ("myChannel")) - - // The channel - var myChannel: Channel = null - - // The stripes. Acquired from myChannel. - var myStripes: Array[Stripe] = null - - // Now we can receive messages - endPoint.register - - // Subscribes to all stripes in myChannelId. - def subscribe = { - // Attaching makes you part of the Channel, and volunteers to be an - // internal node of one of the trees - myChannel = mySplitStream.attachChannel (myChannelId) - - // Subscribing notifies your application when data comes through the tree - myStripes = myChannel.getStripes - for (curStripe <- myStripes) { curStripe.subscribe (this) } - } - - // Part of SplitStreamClient. Called when a published message is received. - def deliver (s: Stripe, data: Array[Byte]) = { - // TODO: Do real work here. - if (!BroadcastSS.isMaster) { - } - println(endPoint.getId + " deliver(" + s + "):seq:" + data(0) + " stripe:" + data(1) + " " + data + ")") - } - - private def objectToByteArray[A] (obj: A): Array[Byte] = { - val baos = new ByteArrayOutputStream - val oos = new ObjectOutputStream (baos) - oos.writeObject (obj) - oos.close - baos.close - return baos.toByteArray - } - - // Multicasts data. - def publish[A] (obj: A, blockSize: Int) = { - val byteArray = objectToByteArray[A] (obj) - - var blockNum = (byteArray.length / blockSize) - if (byteArray.length % blockSize != 0) - blockNum += 1 - - var blockID = 0 - for (i <- 0 until (byteArray.length, blockSize)) { - val thisBlockSize = Math.min (blockSize, byteArray.length - i) - var tempByteArray = new Array[Byte] (thisBlockSize + preAmbleSize) - System.arraycopy (byteArray, i * blockSize, - tempByteArray, preAmbleSize, thisBlockSize) - - myStripes(blockID % myStripes.length).publish (tempByteArray) - blockID += 1 + class PublishThread extends Thread { + override def run = { + // TODO: Put some delay here to give time others to register + Thread.sleep (10000) + BroadcastSS.synchronized { + BroadcastSS.publish[T] (value) } } - - /* class PublishContent extends Message { - def getPriority: Int = { Message.MEDIUM_PRIORITY } - } */ - - // Error handling - def joinFailed(s: Stripe) = { println ("joinFailed(" + s + ")") } - - // Rest of the Application interface. NOT USED. - def deliver (id: rice.p2p.commonapi.Id, message: Message) = { } - def forward (message: RouteMessage): Boolean = false - def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { } } } @@ -1034,6 +951,7 @@ private object BroadcastSS { private var pEnvironment_ : Environment = null private var pastryNode_ : PastryNode = null + private var ssClient: SSClient = null def initialize (isMaster__ : Boolean) { synchronized { @@ -1065,8 +983,15 @@ private object BroadcastSS { def blockSize = blockSize_ def maxRetryCount = maxRetryCount_ - def pEnvironment = pEnvironment_ - def pastryNode = pastryNode_ + def pEnvironment: Environment = { + if (pEnvironment_ == null) { initializeSplitStream } + pEnvironment_ + } + + def pastryNode: PastryNode = { + if (pastryNode_ == null) { initializeSplitStream } + pastryNode_ + } def localBindPort = { if (localBindPort_ == -1) { @@ -1099,6 +1024,7 @@ private object BroadcastSS { // Boot the node. pastryNode.boot (masterBootAddress) + // TODO: Some unknown messages are dropped in slaves at this point // The node may require sending several messages to fully boot into the ring pastryNode.synchronized { @@ -1112,6 +1038,101 @@ private object BroadcastSS { } } } + + // Create the SplitStream client and subscribe + ssClient = new SSClient (BroadcastSS.pastryNode) + ssClient.subscribe + } + + def publish[A] (obj: A) = { + ssClient.publish[A] (obj) + } + + class SSClient (pastryNode: PastryNode) extends SplitStreamClient + with Application { + // Bytes reserved before each published block. 8 byte = 2 integer + val preAmbleSize = 8 + + // The Endpoint represents the underlying node. By making calls on the + // Endpoint, it assures that the message will be delivered to the App on + // whichever node the message is intended for. + protected val endPoint = pastryNode.buildEndpoint (this, "myInstance") + + // Handle to a SplitStream implementation + val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream") + + // The ChannelId is constructed from a normal PastryId based on the UUID + val myChannelId = new ChannelId (new PastryIdFactory + (pastryNode.getEnvironment).buildId ("myChannel")) + + // The channel + var myChannel: Channel = null + + // The stripes. Acquired from myChannel. + var myStripes: Array[Stripe] = null + + // Now we can receive messages + endPoint.register + + // Subscribes to all stripes in myChannelId. + def subscribe = { + // Attaching makes you part of the Channel, and volunteers to be an + // internal node of one of the trees + myChannel = mySplitStream.attachChannel (myChannelId) + + // Subscribing notifies your application when data comes through the tree + myStripes = myChannel.getStripes + for (curStripe <- myStripes) { curStripe.subscribe (this) } + } + + // Part of SplitStreamClient. Called when a published message is received. + def deliver (s: Stripe, data: Array[Byte]) = { + // TODO: Do real work here. + if (!BroadcastSS.isMaster) { + } + println(endPoint.getId + " deliver(" + s + "):seq:" + data(0) + " stripe:" + data(1) + " " + data + ")") + } + + private def objectToByteArray[A] (obj: A): Array[Byte] = { + val baos = new ByteArrayOutputStream + val oos = new ObjectOutputStream (baos) + oos.writeObject (obj) + oos.close + baos.close + return baos.toByteArray + } + + // Multicasts data. + def publish[A] (obj: A) = { + val byteArray = objectToByteArray[A] (obj) + + var blockNum = (byteArray.length / blockSize) + if (byteArray.length % blockSize != 0) + blockNum += 1 + + var blockID = 0 + for (i <- 0 until (byteArray.length, blockSize)) { + val thisBlockSize = Math.min (blockSize, byteArray.length - i) + var tempByteArray = new Array[Byte] (thisBlockSize + preAmbleSize) + System.arraycopy (byteArray, i * blockSize, + tempByteArray, preAmbleSize, thisBlockSize) + + myStripes(blockID % myStripes.length).publish (tempByteArray) + blockID += 1 + } + } + + /* class PublishContent extends Message { + def getPriority: Int = { Message.MEDIUM_PRIORITY } + } */ + + // Error handling + def joinFailed(s: Stripe) = { println ("joinFailed(" + s + ")") } + + // Rest of the Application interface. NOT USED. + def deliver (id: rice.p2p.commonapi.Id, message: Message) = { } + def forward (message: RouteMessage): Boolean = false + def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { } } }