Refactored TrackMultipleValues out.

This commit is contained in:
Mosharaf Chowdhury 2012-07-09 21:35:39 -07:00
parent 654576ef1a
commit ca02a92332
3 changed files with 210 additions and 328 deletions

View file

@ -2,7 +2,7 @@ package spark.broadcast
import java.io._ import java.io._
import java.net._ import java.net._
import java.util.{BitSet, Comparator, Random, Timer, TimerTask, UUID} import java.util.{BitSet, Comparator, Timer, TimerTask, UUID}
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.{ListBuffer, Map, Set} import scala.collection.mutable.{ListBuffer, Map, Set}
@ -15,8 +15,8 @@ extends Broadcast[T] with Logging with Serializable {
def value = value_ def value = value_
BitTorrentBroadcast.synchronized { Broadcast.synchronized {
BitTorrentBroadcast.values.put(uuid, 0, value_) Broadcast.values.put(uuid, 0, value_)
} }
@transient var arrayOfBlocks: Array[BroadcastBlock] = null @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@ -109,14 +109,14 @@ extends Broadcast[T] with Logging with Serializable {
listOfSources += masterSource listOfSources += masterSource
// Register with the Tracker // Register with the Tracker
registerBroadcast(uuid, Broadcast.registerBroadcast(uuid,
SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes)) SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes))
} }
private def readObject(in: ObjectInputStream) { private def readObject(in: ObjectInputStream) {
in.defaultReadObject() in.defaultReadObject()
BitTorrentBroadcast.synchronized { Broadcast.synchronized {
val cachedVal = BitTorrentBroadcast.values.get(uuid, 0) val cachedVal = Broadcast.values.get(uuid, 0)
if (cachedVal != null) { if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T] value_ = cachedVal.asInstanceOf[T]
@ -137,7 +137,7 @@ extends Broadcast[T] with Logging with Serializable {
val receptionSucceeded = receiveBroadcast(uuid) val receptionSucceeded = receiveBroadcast(uuid)
if (receptionSucceeded) { if (receptionSucceeded) {
value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
BitTorrentBroadcast.values.put(uuid, 0, value_) Broadcast.values.put(uuid, 0, value_)
} else { } else {
logError("Reading Broadcasted variable " + uuid + " failed") logError("Reading Broadcasted variable " + uuid + " failed")
} }
@ -171,58 +171,6 @@ extends Broadcast[T] with Logging with Serializable {
stopBroadcast = false stopBroadcast = false
} }
private def registerBroadcast(uuid: UUID, gInfo: SourceInfo) {
val socket = new Socket(Broadcast.MasterHostAddress,
Broadcast.MasterTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
// Send messageType/intention
oosST.writeObject(Broadcast.REGISTER_BROADCAST_TRACKER)
oosST.flush()
// Send UUID of this broadcast
oosST.writeObject(uuid)
oosST.flush()
// Send this tracker's information
oosST.writeObject(gInfo)
oosST.flush()
// Receive ACK and throw it away
oisST.readObject.asInstanceOf[Int]
// Shut stuff down
oisST.close()
oosST.close()
socket.close()
}
private def unregisterBroadcast(uuid: UUID) {
val socket = new Socket(Broadcast.MasterHostAddress,
Broadcast.MasterTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
// Send messageType/intention
oosST.writeObject(Broadcast.UNREGISTER_BROADCAST_TRACKER)
oosST.flush()
// Send UUID of this broadcast
oosST.writeObject(uuid)
oosST.flush()
// Receive ACK and throw it away
oisST.readObject.asInstanceOf[Int]
// Shut stuff down
oisST.close()
oosST.close()
socket.close()
}
private def getLocalSourceInfo: SourceInfo = { private def getLocalSourceInfo: SourceInfo = {
// Wait till hostName and listenPort are OK // Wait till hostName and listenPort are OK
while (listenPort == -1) { while (listenPort == -1) {
@ -274,7 +222,7 @@ extends Broadcast[T] with Logging with Serializable {
// Keep exchaning information until all blocks have been received // Keep exchaning information until all blocks have been received
while (hasBlocks.get < totalBlocks) { while (hasBlocks.get < totalBlocks) {
talkOnce talkOnce
Thread.sleep(BitTorrentBroadcast.ranGen.nextInt( Thread.sleep(Broadcast.ranGen.nextInt(
Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) + Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) +
Broadcast.MinKnockInterval) Broadcast.MinKnockInterval)
} }
@ -354,7 +302,7 @@ extends Broadcast[T] with Logging with Serializable {
} }
} }
Thread.sleep(BitTorrentBroadcast.ranGen.nextInt( Thread.sleep(Broadcast.ranGen.nextInt(
Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) + Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) +
Broadcast.MinKnockInterval) Broadcast.MinKnockInterval)
@ -492,7 +440,7 @@ extends Broadcast[T] with Logging with Serializable {
// Now always picking randomly // Now always picking randomly
if (curPeer == null && peersNotInUse.size > 0) { if (curPeer == null && peersNotInUse.size > 0) {
// Pick uniformly the i'th required peer // Pick uniformly the i'th required peer
var i = BitTorrentBroadcast.ranGen.nextInt(peersNotInUse.size) var i = Broadcast.ranGen.nextInt(peersNotInUse.size)
var peerIter = peersNotInUse.iterator var peerIter = peersNotInUse.iterator
curPeer = peerIter.next curPeer = peerIter.next
@ -563,7 +511,7 @@ extends Broadcast[T] with Logging with Serializable {
// Sort the peers based on how many rare blocks they have // Sort the peers based on how many rare blocks they have
peersWithRareBlocks.sortBy(_._2) peersWithRareBlocks.sortBy(_._2)
var randomNumber = BitTorrentBroadcast.ranGen.nextDouble var randomNumber = Broadcast.ranGen.nextDouble
var tempSum = 0.0 var tempSum = 0.0
var i = 0 var i = 0
@ -732,7 +680,7 @@ extends Broadcast[T] with Logging with Serializable {
return -1 return -1
} else { } else {
// Pick uniformly the i'th required block // Pick uniformly the i'th required block
var i = BitTorrentBroadcast.ranGen.nextInt(needBlocksBitVector.cardinality) var i = Broadcast.ranGen.nextInt(needBlocksBitVector.cardinality)
var pickedBlockIndex = needBlocksBitVector.nextSetBit(0) var pickedBlockIndex = needBlocksBitVector.nextSetBit(0)
while (i > 0) { while (i > 0) {
@ -804,7 +752,7 @@ extends Broadcast[T] with Logging with Serializable {
return -1 return -1
} else { } else {
// Pick uniformly the i'th index // Pick uniformly the i'th index
var i = BitTorrentBroadcast.ranGen.nextInt(minBlocksIndices.size) var i = Broadcast.ranGen.nextInt(minBlocksIndices.size)
return minBlocksIndices(i) return minBlocksIndices(i)
} }
} }
@ -885,7 +833,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("Sending stopBroadcast notifications...") logInfo("Sending stopBroadcast notifications...")
sendStopBroadcastNotifications sendStopBroadcastNotifications
unregisterBroadcast(uuid) Broadcast.unregisterBroadcast(uuid)
} finally { } finally {
if (serverSocket != null) { if (serverSocket != null) {
logInfo("GuideMultipleRequests now stopping...") logInfo("GuideMultipleRequests now stopping...")
@ -1000,7 +948,7 @@ extends Broadcast[T] with Logging with Serializable {
var i = -1 var i = -1
do { do {
i = BitTorrentBroadcast.ranGen.nextInt(listOfSources.size) i = Broadcast.ranGen.nextInt(listOfSources.size)
} while (alreadyPicked.get(i)) } while (alreadyPicked.get(i))
var peerIter = listOfSources.iterator var peerIter = listOfSources.iterator
@ -1114,7 +1062,7 @@ extends Broadcast[T] with Logging with Serializable {
// If it is master AND at least one copy of each block has not been // If it is master AND at least one copy of each block has not been
// sent out already, MODIFY blockToSend // sent out already, MODIFY blockToSend
if (BitTorrentBroadcast.isMaster && sentBlocks.get < totalBlocks) { if (Broadcast.isMaster && sentBlocks.get < totalBlocks) {
blockToSend = sentBlocks.getAndIncrement blockToSend = sentBlocks.getAndIncrement
} }
@ -1170,150 +1118,10 @@ extends Broadcast[T] with Logging with Serializable {
class BitTorrentBroadcastFactory class BitTorrentBroadcastFactory
extends BroadcastFactory { extends BroadcastFactory {
def initialize(isMaster: Boolean) { def initialize(isMaster: Boolean) {
BitTorrentBroadcast.initialize(isMaster) // BitTorrentBroadcast.initialize(isMaster)
} }
def newBroadcast[T](value_ : T, isLocal: Boolean) = { def newBroadcast[T](value_ : T, isLocal: Boolean) = {
new BitTorrentBroadcast[T](value_, isLocal) new BitTorrentBroadcast[T](value_, isLocal)
} }
} }
private object BitTorrentBroadcast
extends Logging {
val values = SparkEnv.get.cache.newKeySpace()
var valueToGuideMap = Map[UUID, SourceInfo]()
// Random number generator
var ranGen = new Random
private var initialized = false
private var isMaster_ = false
private var trackMV: TrackMultipleValues = null
def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
isMaster_ = isMaster__
if (isMaster) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
}
initialized = true
}
}
}
def isMaster = isMaster_
class TrackMultipleValues
extends Thread with Logging {
override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
logInfo("TrackMultipleValues" + serverSocket)
try {
while (true) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout(Broadcast.TrackerSocketTimeout)
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
logInfo("TrackMultipleValues Timeout. Stopping listening...")
}
}
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
try {
// First, read message type
val messageType = ois.readObject.asInstanceOf[Int]
if (messageType == Broadcast.REGISTER_BROADCAST_TRACKER) {
// Receive UUID
val uuid = ois.readObject.asInstanceOf[UUID]
// Receive hostAddress and listenPort
val gInfo = ois.readObject.asInstanceOf[SourceInfo]
// Add to the map
valueToGuideMap.synchronized {
valueToGuideMap += (uuid -> gInfo)
}
logInfo ("New broadcast registered with TrackMultipleValues " + uuid + " " + valueToGuideMap)
// Send dummy ACK
oos.writeObject(-1)
oos.flush()
} else if (messageType == Broadcast.UNREGISTER_BROADCAST_TRACKER) {
// Receive UUID
val uuid = ois.readObject.asInstanceOf[UUID]
// Remove from the map
valueToGuideMap.synchronized {
valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToDefault)
logInfo("Value unregistered from the Tracker " + valueToGuideMap)
}
logInfo ("Broadcast unregistered from TrackMultipleValues " + uuid + " " + valueToGuideMap)
// Send dummy ACK
oos.writeObject(-1)
oos.flush()
} else if (messageType == Broadcast.FIND_BROADCAST_TRACKER) {
// Receive UUID
val uuid = ois.readObject.asInstanceOf[UUID]
var gInfo =
if (valueToGuideMap.contains(uuid)) valueToGuideMap(uuid)
else SourceInfo("", SourceInfo.TxNotStartedRetry)
logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
// Send reply back
oos.writeObject(gInfo)
oos.flush()
} else if (messageType == Broadcast.GET_UPDATED_SHARE) {
// TODO: Not implemented
} else {
throw new SparkException("Undefined messageType at TrackMultipleValues")
}
} catch {
case e: Exception => {
logInfo("TrackMultipleValues had a " + e)
}
} finally {
ois.close()
oos.close()
clientSocket.close()
}
}
})
} catch {
// In failure, close socket here; else, client thread will close
case ioe: IOException => {
clientSocket.close()
}
}
}
}
} finally {
serverSocket.close()
}
// Shutdown the thread pool
threadPool.shutdown()
}
}
}

View file

@ -2,9 +2,11 @@ package spark.broadcast
import java.io._ import java.io._
import java.net._ import java.net._
import java.util.{BitSet, UUID} import java.util.{BitSet, UUID, Random}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor} import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import scala.collection.mutable.Map
import spark._ import spark._
trait Broadcast[T] extends Serializable { trait Broadcast[T] extends Serializable {
@ -30,6 +32,18 @@ object Broadcast extends Logging with Serializable {
private var isMaster_ = false private var isMaster_ = false
private var broadcastFactory: BroadcastFactory = null private var broadcastFactory: BroadcastFactory = null
// Cache of broadcasted objects
val values = SparkEnv.get.cache.newKeySpace()
// Map to keep track of guides of ongoing broadcasts
var valueToGuideMap = Map[UUID, SourceInfo]()
// Random number generator
var ranGen = new Random
// Tracker object
private var trackMV: TrackMultipleValues = null
// Called by SparkContext or Executor before using Broadcast // Called by SparkContext or Executor before using Broadcast
def initialize(isMaster__ : Boolean) { def initialize(isMaster__ : Boolean) {
synchronized { synchronized {
@ -46,6 +60,11 @@ object Broadcast extends Logging with Serializable {
// Set masterHostAddress to the master's IP address for the slaves to read // Set masterHostAddress to the master's IP address for the slaves to read
if (isMaster) { if (isMaster) {
System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress) System.setProperty("spark.broadcast.masterHostAddress", Utils.localIpAddress)
// Start the tracker
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
} }
// Initialize appropriate BroadcastFactory and BroadcastObject // Initialize appropriate BroadcastFactory and BroadcastObject
@ -127,6 +146,167 @@ object Broadcast extends Logging with Serializable {
def EndGameFraction = EndGameFraction_ def EndGameFraction = EndGameFraction_
class TrackMultipleValues
extends Thread with Logging {
override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
logInfo("TrackMultipleValues" + serverSocket)
try {
while (true) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout(Broadcast.TrackerSocketTimeout)
clientSocket = serverSocket.accept()
} catch {
case e: Exception => {
logInfo("TrackMultipleValues Timeout. Stopping listening...")
}
}
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
try {
// First, read message type
val messageType = ois.readObject.asInstanceOf[Int]
if (messageType == Broadcast.REGISTER_BROADCAST_TRACKER) {
// Receive UUID
val uuid = ois.readObject.asInstanceOf[UUID]
// Receive hostAddress and listenPort
val gInfo = ois.readObject.asInstanceOf[SourceInfo]
// Add to the map
valueToGuideMap.synchronized {
valueToGuideMap += (uuid -> gInfo)
}
logInfo ("New broadcast registered with TrackMultipleValues " + uuid + " " + valueToGuideMap)
// Send dummy ACK
oos.writeObject(-1)
oos.flush()
} else if (messageType == Broadcast.UNREGISTER_BROADCAST_TRACKER) {
// Receive UUID
val uuid = ois.readObject.asInstanceOf[UUID]
// Remove from the map
valueToGuideMap.synchronized {
valueToGuideMap(uuid) = SourceInfo("", SourceInfo.TxOverGoToDefault)
logInfo("Value unregistered from the Tracker " + valueToGuideMap)
}
logInfo ("Broadcast unregistered from TrackMultipleValues " + uuid + " " + valueToGuideMap)
// Send dummy ACK
oos.writeObject(-1)
oos.flush()
} else if (messageType == Broadcast.FIND_BROADCAST_TRACKER) {
// Receive UUID
val uuid = ois.readObject.asInstanceOf[UUID]
var gInfo =
if (valueToGuideMap.contains(uuid)) valueToGuideMap(uuid)
else SourceInfo("", SourceInfo.TxNotStartedRetry)
logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + gInfo.listenPort)
// Send reply back
oos.writeObject(gInfo)
oos.flush()
} else if (messageType == Broadcast.GET_UPDATED_SHARE) {
// TODO: Not implemented
} else {
throw new SparkException("Undefined messageType at TrackMultipleValues")
}
} catch {
case e: Exception => {
logInfo("TrackMultipleValues had a " + e)
}
} finally {
ois.close()
oos.close()
clientSocket.close()
}
}
})
} catch {
// In failure, close socket here; else, client thread will close
case ioe: IOException => {
clientSocket.close()
}
}
}
}
} finally {
serverSocket.close()
}
// Shutdown the thread pool
threadPool.shutdown()
}
}
def registerBroadcast(uuid: UUID, gInfo: SourceInfo) {
val socket = new Socket(Broadcast.MasterHostAddress,
Broadcast.MasterTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
// Send messageType/intention
oosST.writeObject(Broadcast.REGISTER_BROADCAST_TRACKER)
oosST.flush()
// Send UUID of this broadcast
oosST.writeObject(uuid)
oosST.flush()
// Send this tracker's information
oosST.writeObject(gInfo)
oosST.flush()
// Receive ACK and throw it away
oisST.readObject.asInstanceOf[Int]
// Shut stuff down
oisST.close()
oosST.close()
socket.close()
}
def unregisterBroadcast(uuid: UUID) {
val socket = new Socket(Broadcast.MasterHostAddress,
Broadcast.MasterTrackerPort)
val oosST = new ObjectOutputStream(socket.getOutputStream)
oosST.flush()
val oisST = new ObjectInputStream(socket.getInputStream)
// Send messageType/intention
oosST.writeObject(Broadcast.UNREGISTER_BROADCAST_TRACKER)
oosST.flush()
// Send UUID of this broadcast
oosST.writeObject(uuid)
oosST.flush()
// Receive ACK and throw it away
oisST.readObject.asInstanceOf[Int]
// Shut stuff down
oisST.close()
oosST.close()
socket.close()
}
// Helper method to convert an object to Array[BroadcastBlock] // Helper method to convert an object to Array[BroadcastBlock]
def blockifyObject[IN](obj: IN): VariableInfo = { def blockifyObject[IN](obj: IN): VariableInfo = {
val baos = new ByteArrayOutputStream val baos = new ByteArrayOutputStream

View file

@ -14,8 +14,8 @@ extends Broadcast[T] with Logging with Serializable {
def value = value_ def value = value_
TreeBroadcast.synchronized { Broadcast.synchronized {
TreeBroadcast.values.put(uuid, 0, value_) Broadcast.values.put(uuid, 0, value_)
} }
@transient var arrayOfBlocks: Array[BroadcastBlock] = null @transient var arrayOfBlocks: Array[BroadcastBlock] = null
@ -87,13 +87,14 @@ extends Broadcast[T] with Logging with Serializable {
listOfSources += masterSource listOfSources += masterSource
// Register with the Tracker // Register with the Tracker
TreeBroadcast.registerValue(uuid, guidePort) Broadcast.registerBroadcast(uuid,
SourceInfo(hostAddress, guidePort, totalBlocks, totalBytes))
} }
private def readObject(in: ObjectInputStream) { private def readObject(in: ObjectInputStream) {
in.defaultReadObject() in.defaultReadObject()
TreeBroadcast.synchronized { Broadcast.synchronized {
val cachedVal = TreeBroadcast.values.get(uuid, 0) val cachedVal = Broadcast.values.get(uuid, 0)
if (cachedVal != null) { if (cachedVal != null) {
value_ = cachedVal.asInstanceOf[T] value_ = cachedVal.asInstanceOf[T]
} else { } else {
@ -112,7 +113,7 @@ extends Broadcast[T] with Logging with Serializable {
val receptionSucceeded = receiveBroadcast(uuid) val receptionSucceeded = receiveBroadcast(uuid)
if (receptionSucceeded) { if (receptionSucceeded) {
value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks) value_ = Broadcast.unBlockifyObject[T](arrayOfBlocks, totalBytes, totalBlocks)
TreeBroadcast.values.put(uuid, 0, value_) Broadcast.values.put(uuid, 0, value_)
} else { } else {
logError("Reading Broadcasted variable " + uuid + " failed") logError("Reading Broadcasted variable " + uuid + " failed")
} }
@ -181,7 +182,7 @@ extends Broadcast[T] with Logging with Serializable {
} }
retriesLeft -= 1 retriesLeft -= 1
Thread.sleep(TreeBroadcast.ranGen.nextInt( Thread.sleep(Broadcast.ranGen.nextInt(
Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) + Broadcast.MaxKnockInterval - Broadcast.MinKnockInterval) +
Broadcast.MinKnockInterval) Broadcast.MinKnockInterval)
@ -382,7 +383,7 @@ extends Broadcast[T] with Logging with Serializable {
logInfo("Sending stopBroadcast notifications...") logInfo("Sending stopBroadcast notifications...")
sendStopBroadcastNotifications sendStopBroadcastNotifications
TreeBroadcast.unregisterValue(uuid) Broadcast.unregisterBroadcast(uuid)
} finally { } finally {
if (serverSocket != null) { if (serverSocket != null) {
logInfo("GuideMultipleRequests now stopping...") logInfo("GuideMultipleRequests now stopping...")
@ -666,116 +667,9 @@ extends Broadcast[T] with Logging with Serializable {
class TreeBroadcastFactory class TreeBroadcastFactory
extends BroadcastFactory { extends BroadcastFactory {
def initialize(isMaster: Boolean) = TreeBroadcast.initialize(isMaster) def initialize(isMaster: Boolean) {
// TreeBroadcast.initialize(isMaster)
}
def newBroadcast[T](value_ : T, isLocal: Boolean) = def newBroadcast[T](value_ : T, isLocal: Boolean) =
new TreeBroadcast[T](value_, isLocal) new TreeBroadcast[T](value_, isLocal)
} }
private object TreeBroadcast
extends Logging {
val values = SparkEnv.get.cache.newKeySpace()
var valueToGuidePortMap = Map[UUID, Int]()
// Random number generator
var ranGen = new Random
private var initialized = false
private var isMaster_ = false
private var trackMV: TrackMultipleValues = null
private var MaxDegree_ : Int = 2
def initialize(isMaster__ : Boolean) {
synchronized {
if (!initialized) {
isMaster_ = isMaster__
if (isMaster) {
trackMV = new TrackMultipleValues
trackMV.setDaemon(true)
trackMV.start()
}
initialized = true
}
}
}
def isMaster = isMaster_
def registerValue(uuid: UUID, guidePort: Int) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap += (uuid -> guidePort)
logInfo("New value registered with the Tracker " + valueToGuidePortMap)
}
}
def unregisterValue(uuid: UUID) {
valueToGuidePortMap.synchronized {
valueToGuidePortMap(uuid) = SourceInfo.TxOverGoToDefault
logInfo("Value unregistered from the Tracker " + valueToGuidePortMap)
}
}
class TrackMultipleValues
extends Thread with Logging {
override def run() {
var threadPool = Utils.newDaemonCachedThreadPool()
var serverSocket: ServerSocket = null
serverSocket = new ServerSocket(Broadcast.MasterTrackerPort)
logInfo("TrackMultipleValues" + serverSocket)
try {
while (true) {
var clientSocket: Socket = null
try {
serverSocket.setSoTimeout(Broadcast.TrackerSocketTimeout)
clientSocket = serverSocket.accept
} catch {
case e: Exception => {
logInfo("TrackMultipleValues Timeout. Stopping listening...")
}
}
if (clientSocket != null) {
try {
threadPool.execute(new Thread {
override def run() {
val oos = new ObjectOutputStream(clientSocket.getOutputStream)
oos.flush()
val ois = new ObjectInputStream(clientSocket.getInputStream)
try {
val uuid = ois.readObject.asInstanceOf[UUID]
var guidePort =
if (valueToGuidePortMap.contains(uuid)) {
valueToGuidePortMap(uuid)
} else SourceInfo.TxNotStartedRetry
logInfo("TrackMultipleValues: Got new request: " + clientSocket + " for " + uuid + " : " + guidePort)
oos.writeObject(guidePort)
} catch {
case e: Exception => {
logInfo("TrackMultipleValues had a " + e)
}
} finally {
ois.close()
oos.close()
clientSocket.close()
}
}
})
} catch {
// In failure, close() socket here; else, client thread will close()
case ioe: IOException => clientSocket.close()
}
}
}
} finally {
serverSocket.close()
}
// Shutdown the thread pool
threadPool.shutdown()
}
}
}