diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala index e3998b7774..9a5b0ff7d5 100644 --- a/src/scala/spark/LocalFileShuffle.scala +++ b/src/scala/spark/LocalFileShuffle.scala @@ -85,11 +85,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { combiners = new HashMap[K, C] var threadPool = - LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxTxPeers) + LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxConnections) while (hasSplits < totalSplits) { var numThreadsToCreate = - Math.min (totalSplits, LocalFileShuffle.MaxTxPeers) - + Math.min (totalSplits, LocalFileShuffle.MaxConnections) - threadPool.getActiveCount while (hasSplits < totalSplits && numThreadsToCreate > 0) { @@ -259,9 +259,8 @@ object LocalFileShuffle extends Logging { private var MinKnockInterval_ = 1000 private var MaxKnockInterval_ = 5000 - // Maximum number of receiving and sending threads - private var MaxRxPeers_ = 4 - private var MaxTxPeers_ = 4 + // Maximum number of connections + private var MaxConnections_ = 4 private var initialized = false private var nextShuffleId = new AtomicLong(0) @@ -284,10 +283,8 @@ object LocalFileShuffle extends Logging { MaxKnockInterval_ = System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt - MaxRxPeers_ = - System.getProperty ("spark.shuffle.MaxRxPeers", "4").toInt - MaxTxPeers_ = - System.getProperty ("spark.shuffle.MaxTxPeers", "4").toInt + MaxConnections_ = + System.getProperty ("spark.shuffle.MaxConnections", "4").toInt // TODO: localDir should be created by some mechanism common to Spark // so that it can be shared among shuffle, broadcast, etc @@ -332,8 +329,7 @@ object LocalFileShuffle extends Logging { def MinKnockInterval = MinKnockInterval_ def MaxKnockInterval = MaxKnockInterval_ - def MaxRxPeers = MaxRxPeers_ - def MaxTxPeers = MaxTxPeers_ + def MaxConnections = MaxConnections_ def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = { initializeIfNeeded() @@ -370,7 +366,7 @@ object LocalFileShuffle extends Logging { class ShuffleServer extends Thread with Logging { - var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxRxPeers) + var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxConnections) var serverSocket: ServerSocket = null