From 3a671ce989b9813c16bec7a50e07a7d97d8fc011 Mon Sep 17 00:00:00 2001
From: Mosharaf Chowdhury
Date: Sat, 4 Dec 2010 10:59:06 -0800
Subject: [PATCH] Config parameters are in place. Good to go (I think)

---
 src/scala/spark/LocalFileShuffle.scala | 50 +++++++++++++++++---------
 1 file changed, 34 insertions(+), 16 deletions(-)

diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala
index 6ddd661288..eb9905b77f 100644
--- a/src/scala/spark/LocalFileShuffle.scala
+++ b/src/scala/spark/LocalFileShuffle.scala
@@ -84,13 +84,13 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       splitsInRequestBitVector = new BitSet (totalSplits)
       combiners = new HashMap[K, C]
 
-      // TODO: Fix config param
-      var threadPool = LocalFileShuffle.newDaemonFixedThreadPool (2)
+      var threadPool =
+        LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxTxPeers)
 
       while (hasSplits < totalSplits) {
-        // TODO:
-        var numThreadsToCreate =
-          Math.min (totalSplits, 2) - threadPool.getActiveCount
+        var numThreadsToCreate =
+          Math.min (totalSplits, LocalFileShuffle.MaxTxPeers) -
+          threadPool.getActiveCount
 
         while (hasSplits < totalSplits && numThreadsToCreate > 0) {
           // Select a random split to pull
@@ -112,8 +112,8 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
           numThreadsToCreate = numThreadsToCreate - 1
         }
 
-        // TODO:
-        Thread.sleep (1000)
+        // Sleep for a while before creating new threads
+        Thread.sleep (LocalFileShuffle.MinKnockInterval)
       }
 
       threadPool.shutdown
@@ -162,9 +162,7 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       }
 
       var timeOutTimer = new Timer
-      // TODO: Set wait timer
-      // TODO: If its too small, things FAIL
-      timeOutTimer.schedule (timeOutTask, 5000)
+      timeOutTimer.schedule (timeOutTask, LocalFileShuffle.MaxKnockInterval)
 
       logInfo ("ShuffleClient started... => %s:%d#%s".format(hostAddress,
         listenPort, requestPath))
@@ -201,8 +199,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
         }
       }
 
-      logInfo ("Finished reading " + requestPath)
-
       // Now add this to combiners
       val inputStream = new ObjectInputStream (
         new ByteArrayInputStream(byteArray))
@@ -221,8 +217,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
       }
       inputStream.close
 
-      logInfo ("Finished combining " + requestPath)
-
       // Reception completed. Update stats.
       hasSplitsBitVector.synchronized {
         hasSplitsBitVector.set (splitIndex)
@@ -275,6 +269,14 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
 }
 
 object LocalFileShuffle extends Logging {
+  // Used throughout the code for small and large waits/timeouts
+  private var MinKnockInterval_ = 1000
+  private var MaxKnockInterval_ = 5000
+
+  // Maximum number of receiving and sending threads
+  private var MaxRxPeers_ = 4
+  private var MaxTxPeers_ = 4
+
   private var initialized = false
   private var nextShuffleId = new AtomicLong(0)
 
@@ -290,6 +292,17 @@ object LocalFileShuffle extends Logging {
 
   private def initializeIfNeeded() = synchronized {
     if (!initialized) {
+      // Load config parameters
+      MinKnockInterval_ =
+        System.getProperty ("spark.shuffle.MinKnockInterval", "1000").toInt
+      MaxKnockInterval_ =
+        System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt
+
+      MaxRxPeers_ =
+        System.getProperty ("spark.shuffle.MaxRxPeers", "4").toInt
+      MaxTxPeers_ =
+        System.getProperty ("spark.shuffle.MaxTxPeers", "4").toInt
+
       // TODO: localDir should be created by some mechanism common to Spark
       // so that it can be shared among shuffle, broadcast, etc
       val localDirRoot = System.getProperty("spark.local.dir", "/tmp")
@@ -330,6 +343,12 @@ object LocalFileShuffle extends Logging {
     }
   }
 
+  def MinKnockInterval = MinKnockInterval_
+  def MaxKnockInterval = MaxKnockInterval_
+
+  def MaxRxPeers = MaxRxPeers_
+  def MaxTxPeers = MaxTxPeers_
+
   def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
     initializeIfNeeded()
     val dir = new File(shuffleDir, shuffleId + "/" + inputId)
@@ -365,8 +384,7 @@ object LocalFileShuffle extends Logging {
   }
 
   class ShuffleServer extends Thread with Logging {
-    // TODO: Set config param
-    var threadPool = newDaemonFixedThreadPool(2)
+    var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxRxPeers)
 
     var serverSocket: ServerSocket = null
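
Usage note: the four spark.shuffle.* properties above are read exactly once, inside
initializeIfNeeded(), so any override has to be in place before the first shuffle runs,
e.g. via -Dspark.shuffle.MaxTxPeers=8 on the JVM command line. Below is a minimal sketch
of doing the same thing programmatically; the property names and defaults come from the
patch, while the ShuffleTuning wrapper and the example values are purely illustrative:

object ShuffleTuning {
  // Call before the first shuffle; LocalFileShuffle reads these properties
  // only once and then caches the values.
  def set(): Unit = {
    System.setProperty("spark.shuffle.MinKnockInterval", "500")   // ms slept between rounds of creating fetch threads
    System.setProperty("spark.shuffle.MaxKnockInterval", "10000") // ms before a ShuffleClient fetch times out
    System.setProperty("spark.shuffle.MaxRxPeers", "8")           // size of the ShuffleServer thread pool
    System.setProperty("spark.shuffle.MaxTxPeers", "8")           // size of the split-fetching thread pool
  }
}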