Moved SplitStreamClient inside the BroadcastSS object with the decision that

there should be only a single SSClient for the whole Spark program instead of
one for each broadcasted variable.

It's still working well though.
This commit is contained in:
Mosharaf Chowdhury 2010-04-20 19:16:27 -07:00
parent d2f1d0151a
commit e2f21279be

View file

@ -45,6 +45,8 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
BroadcastCS.synchronized { BroadcastCS.values.put (uuid, value_) } BroadcastCS.synchronized { BroadcastCS.values.put (uuid, value_) }
if (!local) { sendBroadcast }
@transient var arrayOfBlocks: Array[BroadcastBlock] = null @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@transient var totalBytes = -1 @transient var totalBytes = -1
@transient var totalBlocks = -1 @transient var totalBlocks = -1
@ -66,8 +68,6 @@ class ChainedStreamingBroadcast[T] (@transient var value_ : T, local: Boolean)
@transient var hasCopyInHDFS = false @transient var hasCopyInHDFS = false
if (!local) { sendBroadcast }
def sendBroadcast () { def sendBroadcast () {
// Store a persistent copy in HDFS // Store a persistent copy in HDFS
val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid)) val out = new ObjectOutputStream (BroadcastCH.openFileForWriting(uuid))
@ -634,6 +634,7 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
if (!local) { sendBroadcast } if (!local) { sendBroadcast }
@transient var publishThread: PublishThread = null
@transient var hasCopyInHDFS = false @transient var hasCopyInHDFS = false
def sendBroadcast () { def sendBroadcast () {
@ -643,9 +644,8 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
out.close out.close
hasCopyInHDFS = true hasCopyInHDFS = true
val ssClient = new SSClient (BroadcastSS.pastryNode) publishThread = new PublishThread
ssClient.subscribe publishThread.start
ssClient.publish[T] (value, BroadcastSS.blockSize)
} }
private def readObject (in: ObjectInputStream) { private def readObject (in: ObjectInputStream) {
@ -655,103 +655,20 @@ class SplitStreamBroadcast[T] (@transient var value_ : T, local: Boolean)
if (cachedVal != null) { if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T] value_ = cachedVal.asInstanceOf[T]
} else { } else {
val ssClient = new SSClient (BroadcastSS.pastryNode) // TODO: Do something
ssClient.subscribe Thread.sleep (10000)
} }
} }
} }
class PublishThread (ssClient: SSClient) extends Runnable { class PublishThread extends Thread {
def run = { override def run = {
// TODO: Put some delay here to give time others to register
Thread.sleep (10000)
BroadcastSS.synchronized {
BroadcastSS.publish[T] (value)
} }
} }
class SSClient (pastryNode: PastryNode) extends SplitStreamClient
with Application {
// Bytes reserved before each published block. 8 byte = 2 integer
val preAmbleSize = 8
// The Endpoint represents the underlying node. By making calls on the
// Endpoint, it assures that the message will be delivered to the App on
// whichever node the message is intended for.
protected val endPoint = pastryNode.buildEndpoint (this, "myInstance")
// Handle to a SplitStream implementation
val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream")
// The ChannelId is constructed from a normal PastryId based on the UUID
val myChannelId = new ChannelId (new PastryIdFactory
(pastryNode.getEnvironment).buildId ("myChannel"))
// The channel
var myChannel: Channel = null
// The stripes. Acquired from myChannel.
var myStripes: Array[Stripe] = null
// Now we can receive messages
endPoint.register
// Subscribes to all stripes in myChannelId.
def subscribe = {
// Attaching makes you part of the Channel, and volunteers to be an
// internal node of one of the trees
myChannel = mySplitStream.attachChannel (myChannelId)
// Subscribing notifies your application when data comes through the tree
myStripes = myChannel.getStripes
for (curStripe <- myStripes) { curStripe.subscribe (this) }
}
// Part of SplitStreamClient. Called when a published message is received.
def deliver (s: Stripe, data: Array[Byte]) = {
// TODO: Do real work here.
if (!BroadcastSS.isMaster) {
}
println(endPoint.getId + " deliver(" + s + "):seq:" + data(0) + " stripe:" + data(1) + " " + data + ")")
}
private def objectToByteArray[A] (obj: A): Array[Byte] = {
val baos = new ByteArrayOutputStream
val oos = new ObjectOutputStream (baos)
oos.writeObject (obj)
oos.close
baos.close
return baos.toByteArray
}
// Multicasts data.
def publish[A] (obj: A, blockSize: Int) = {
val byteArray = objectToByteArray[A] (obj)
var blockNum = (byteArray.length / blockSize)
if (byteArray.length % blockSize != 0)
blockNum += 1
var blockID = 0
for (i <- 0 until (byteArray.length, blockSize)) {
val thisBlockSize = Math.min (blockSize, byteArray.length - i)
var tempByteArray = new Array[Byte] (thisBlockSize + preAmbleSize)
System.arraycopy (byteArray, i * blockSize,
tempByteArray, preAmbleSize, thisBlockSize)
myStripes(blockID % myStripes.length).publish (tempByteArray)
blockID += 1
}
}
/* class PublishContent extends Message {
def getPriority: Int = { Message.MEDIUM_PRIORITY }
} */
// Error handling
def joinFailed(s: Stripe) = { println ("joinFailed(" + s + ")") }
// Rest of the Application interface. NOT USED.
def deliver (id: rice.p2p.commonapi.Id, message: Message) = { }
def forward (message: RouteMessage): Boolean = false
def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { }
} }
} }
@ -1034,6 +951,7 @@ private object BroadcastSS {
private var pEnvironment_ : Environment = null private var pEnvironment_ : Environment = null
private var pastryNode_ : PastryNode = null private var pastryNode_ : PastryNode = null
private var ssClient: SSClient = null
def initialize (isMaster__ : Boolean) { def initialize (isMaster__ : Boolean) {
synchronized { synchronized {
@ -1065,8 +983,15 @@ private object BroadcastSS {
def blockSize = blockSize_ def blockSize = blockSize_
def maxRetryCount = maxRetryCount_ def maxRetryCount = maxRetryCount_
def pEnvironment = pEnvironment_ def pEnvironment: Environment = {
def pastryNode = pastryNode_ if (pEnvironment_ == null) { initializeSplitStream }
pEnvironment_
}
def pastryNode: PastryNode = {
if (pastryNode_ == null) { initializeSplitStream }
pastryNode_
}
def localBindPort = { def localBindPort = {
if (localBindPort_ == -1) { if (localBindPort_ == -1) {
@ -1099,6 +1024,7 @@ private object BroadcastSS {
// Boot the node. // Boot the node.
pastryNode.boot (masterBootAddress) pastryNode.boot (masterBootAddress)
// TODO: Some unknown messages are dropped in slaves at this point
// The node may require sending several messages to fully boot into the ring // The node may require sending several messages to fully boot into the ring
pastryNode.synchronized { pastryNode.synchronized {
@ -1112,6 +1038,101 @@ private object BroadcastSS {
} }
} }
} }
// Create the SplitStream client and subscribe
ssClient = new SSClient (BroadcastSS.pastryNode)
ssClient.subscribe
}
def publish[A] (obj: A) = {
ssClient.publish[A] (obj)
}
class SSClient (pastryNode: PastryNode) extends SplitStreamClient
with Application {
// Bytes reserved before each published block. 8 byte = 2 integer
val preAmbleSize = 8
// The Endpoint represents the underlying node. By making calls on the
// Endpoint, it assures that the message will be delivered to the App on
// whichever node the message is intended for.
protected val endPoint = pastryNode.buildEndpoint (this, "myInstance")
// Handle to a SplitStream implementation
val mySplitStream = new SplitStreamImpl (pastryNode, "mySplitStream")
// The ChannelId is constructed from a normal PastryId based on the UUID
val myChannelId = new ChannelId (new PastryIdFactory
(pastryNode.getEnvironment).buildId ("myChannel"))
// The channel
var myChannel: Channel = null
// The stripes. Acquired from myChannel.
var myStripes: Array[Stripe] = null
// Now we can receive messages
endPoint.register
// Subscribes to all stripes in myChannelId.
def subscribe = {
// Attaching makes you part of the Channel, and volunteers to be an
// internal node of one of the trees
myChannel = mySplitStream.attachChannel (myChannelId)
// Subscribing notifies your application when data comes through the tree
myStripes = myChannel.getStripes
for (curStripe <- myStripes) { curStripe.subscribe (this) }
}
// Part of SplitStreamClient. Called when a published message is received.
def deliver (s: Stripe, data: Array[Byte]) = {
// TODO: Do real work here.
if (!BroadcastSS.isMaster) {
}
println(endPoint.getId + " deliver(" + s + "):seq:" + data(0) + " stripe:" + data(1) + " " + data + ")")
}
private def objectToByteArray[A] (obj: A): Array[Byte] = {
val baos = new ByteArrayOutputStream
val oos = new ObjectOutputStream (baos)
oos.writeObject (obj)
oos.close
baos.close
return baos.toByteArray
}
// Multicasts data.
def publish[A] (obj: A) = {
val byteArray = objectToByteArray[A] (obj)
var blockNum = (byteArray.length / blockSize)
if (byteArray.length % blockSize != 0)
blockNum += 1
var blockID = 0
for (i <- 0 until (byteArray.length, blockSize)) {
val thisBlockSize = Math.min (blockSize, byteArray.length - i)
var tempByteArray = new Array[Byte] (thisBlockSize + preAmbleSize)
System.arraycopy (byteArray, i * blockSize,
tempByteArray, preAmbleSize, thisBlockSize)
myStripes(blockID % myStripes.length).publish (tempByteArray)
blockID += 1
}
}
/* class PublishContent extends Message {
def getPriority: Int = { Message.MEDIUM_PRIORITY }
} */
// Error handling
def joinFailed(s: Stripe) = { println ("joinFailed(" + s + ")") }
// Rest of the Application interface. NOT USED.
def deliver (id: rice.p2p.commonapi.Id, message: Message) = { }
def forward (message: RouteMessage): Boolean = false
def update (handle: rice.p2p.commonapi.NodeHandle, joined: Boolean) = { }
} }
} }