Config parameters are in place. Good to go (I think)

This commit is contained in:
Mosharaf Chowdhury 2010-12-04 10:59:06 -08:00
parent 476a216d9d
commit 3a671ce989

View file

@ -84,13 +84,13 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
splitsInRequestBitVector = new BitSet (totalSplits)
combiners = new HashMap[K, C]
// TODO: Fix config param
var threadPool = LocalFileShuffle.newDaemonFixedThreadPool (2)
var threadPool =
LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxTxPeers)
while (hasSplits < totalSplits) {
// TODO:
var numThreadsToCreate =
Math.min (totalSplits, 2) - threadPool.getActiveCount
var numThreadsToCreate =
Math.min (totalSplits, LocalFileShuffle.MaxTxPeers) -
threadPool.getActiveCount
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
// Select a random split to pull
@ -112,8 +112,8 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
numThreadsToCreate = numThreadsToCreate - 1
}
// TODO:
Thread.sleep (1000)
// Sleep for a while before creating new threads
Thread.sleep (LocalFileShuffle.MinKnockInterval)
}
threadPool.shutdown
@ -162,9 +162,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
var timeOutTimer = new Timer
// TODO: Set wait timer
// TODO: If its too small, things FAIL
timeOutTimer.schedule (timeOutTask, 5000)
timeOutTimer.schedule (timeOutTask, LocalFileShuffle.MaxKnockInterval)
logInfo ("ShuffleClient started... => %s:%d#%s".format(hostAddress, listenPort, requestPath))
@ -201,8 +199,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
}
logInfo ("Finished reading " + requestPath)
// Now add this to combiners
val inputStream = new ObjectInputStream (
new ByteArrayInputStream(byteArray))
@ -221,8 +217,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
inputStream.close
logInfo ("Finished combining " + requestPath)
// Reception completed. Update stats.
hasSplitsBitVector.synchronized {
hasSplitsBitVector.set (splitIndex)
@ -275,6 +269,14 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
}
object LocalFileShuffle extends Logging {
// Used thoughout the code for small and large waits/timeouts
private var MinKnockInterval_ = 1000
private var MaxKnockInterval_ = 5000
// Maximum number of receiving and sending threads
private var MaxRxPeers_ = 4
private var MaxTxPeers_ = 4
private var initialized = false
private var nextShuffleId = new AtomicLong(0)
@ -290,6 +292,17 @@ object LocalFileShuffle extends Logging {
private def initializeIfNeeded() = synchronized {
if (!initialized) {
// Load config parameters
MinKnockInterval_ =
System.getProperty ("spark.shuffle.MinKnockInterval", "1000").toInt
MaxKnockInterval_ =
System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt
MaxRxPeers_ =
System.getProperty ("spark.shuffle.MaxRxPeers", "4").toInt
MaxTxPeers_ =
System.getProperty ("spark.shuffle.MaxTxPeers", "4").toInt
// TODO: localDir should be created by some mechanism common to Spark
// so that it can be shared among shuffle, broadcast, etc
val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@ -330,6 +343,12 @@ object LocalFileShuffle extends Logging {
}
}
def MinKnockInterval = MinKnockInterval_
def MaxKnockInterval = MaxKnockInterval_
def MaxRxPeers = MaxRxPeers_
def MaxTxPeers = MaxTxPeers_
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
initializeIfNeeded()
val dir = new File(shuffleDir, shuffleId + "/" + inputId)
@ -365,8 +384,7 @@ object LocalFileShuffle extends Logging {
class ShuffleServer
extends Thread with Logging {
// TODO: Set config param
var threadPool = newDaemonFixedThreadPool(2)
var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxRxPeers)
var serverSocket: ServerSocket = null