Combined MaxRxPeers and MaxTxPeers to a single config parameter MaxConnections
This commit is contained in:
parent
b1745b3103
commit
7df20d681a
|
@ -85,11 +85,11 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
combiners = new HashMap[K, C]
|
||||
|
||||
var threadPool =
|
||||
LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxTxPeers)
|
||||
LocalFileShuffle.newDaemonFixedThreadPool (LocalFileShuffle.MaxConnections)
|
||||
|
||||
while (hasSplits < totalSplits) {
|
||||
var numThreadsToCreate =
|
||||
Math.min (totalSplits, LocalFileShuffle.MaxTxPeers) -
|
||||
Math.min (totalSplits, LocalFileShuffle.MaxConnections) -
|
||||
threadPool.getActiveCount
|
||||
|
||||
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
|
||||
|
@ -259,9 +259,8 @@ object LocalFileShuffle extends Logging {
|
|||
private var MinKnockInterval_ = 1000
|
||||
private var MaxKnockInterval_ = 5000
|
||||
|
||||
// Maximum number of receiving and sending threads
|
||||
private var MaxRxPeers_ = 4
|
||||
private var MaxTxPeers_ = 4
|
||||
// Maximum number of connections
|
||||
private var MaxConnections_ = 4
|
||||
|
||||
private var initialized = false
|
||||
private var nextShuffleId = new AtomicLong(0)
|
||||
|
@ -284,10 +283,8 @@ object LocalFileShuffle extends Logging {
|
|||
MaxKnockInterval_ =
|
||||
System.getProperty ("spark.shuffle.MaxKnockInterval", "5000").toInt
|
||||
|
||||
MaxRxPeers_ =
|
||||
System.getProperty ("spark.shuffle.MaxRxPeers", "4").toInt
|
||||
MaxTxPeers_ =
|
||||
System.getProperty ("spark.shuffle.MaxTxPeers", "4").toInt
|
||||
MaxConnections_ =
|
||||
System.getProperty ("spark.shuffle.MaxConnections", "4").toInt
|
||||
|
||||
// TODO: localDir should be created by some mechanism common to Spark
|
||||
// so that it can be shared among shuffle, broadcast, etc
|
||||
|
@ -332,8 +329,7 @@ object LocalFileShuffle extends Logging {
|
|||
def MinKnockInterval = MinKnockInterval_
|
||||
def MaxKnockInterval = MaxKnockInterval_
|
||||
|
||||
def MaxRxPeers = MaxRxPeers_
|
||||
def MaxTxPeers = MaxTxPeers_
|
||||
def MaxConnections = MaxConnections_
|
||||
|
||||
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
|
||||
initializeIfNeeded()
|
||||
|
@ -370,7 +366,7 @@ object LocalFileShuffle extends Logging {
|
|||
|
||||
class ShuffleServer
|
||||
extends Thread with Logging {
|
||||
var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxRxPeers)
|
||||
var threadPool = newDaemonFixedThreadPool(LocalFileShuffle.MaxConnections)
|
||||
|
||||
var serverSocket: ServerSocket = null
|
||||
|
||||
|
|
Loading…
Reference in a new issue