Added a tracker strategy that selects random mappers for reducers. This can be used to measure tracker overhead.
This commit is contained in:
parent
fd3fd37383
commit
5bf6369220
|
@ -1,7 +1,7 @@
|
|||
-Dspark.shuffle.class=spark.TrackedCustomBlockedInMemoryShuffle
|
||||
-Dspark.shuffle.masterHostAddress=127.0.0.1
|
||||
-Dspark.shuffle.masterTrackerPort=22222
|
||||
-Dspark.shuffle.trackerStrategy=spark.BalanceConnectionsShuffleTrackerStrategy
|
||||
-Dspark.shuffle.trackerStrategy=spark.SelectRandomShuffleTrackerStrategy
|
||||
-Dspark.shuffle.maxRxConnections=40
|
||||
-Dspark.shuffle.maxTxConnections=120
|
||||
-Dspark.shuffle.blockSize=4096
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
package spark
|
||||
|
||||
import java.util.Random
|
||||
|
||||
import scala.util.Sorting._
|
||||
|
||||
/**
|
||||
|
@ -92,6 +94,46 @@ extends ShuffleTrackerStrategy with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
/**
 * A simple ShuffleTrackerStrategy that picks, uniformly at random, a mapper
 * split that the requesting reducer has not received yet. It keeps no
 * per-connection bookkeeping, which makes it usable as a baseline for
 * measuring tracker overhead.
 */
class SelectRandomShuffleTrackerStrategy
extends ShuffleTrackerStrategy with Logging {
  // Number of mapper splits; stays at -1 until initialize() has been called
  private var numMappers = -1
  // Kept only as a reference to the array handed to initialize()
  private var outputLocs: Array[SplitInfo] = null

  private val ranGen = new Random

  /**
   * Records the mapper output locations.
   *
   * The order of elements in the outputLocs (splitIndex) is used to pass
   * information back and forth between the tracker, mappers, and reducers.
   *
   * @param outputLocs_ one SplitInfo per mapper, indexed by splitIndex
   */
  def initialize(outputLocs_ : Array[SplitInfo]): Unit = {
    outputLocs = outputLocs_
    numMappers = outputLocs.size
  }

  /**
   * Chooses a random split index whose bit is not set in the reducer's
   * hasSplitsBitVector.
   *
   * @param reducerSplitInfo the requesting reducer's SplitInfo
   * @return a split index in [0, numMappers), or -1 when every split is
   *         already marked as received or initialize() has not run yet
   */
  def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized {
    // Enumerate the splits still missing instead of rejection-sampling:
    // retrying random draws never terminates once every bit is set, and it
    // would spin while holding this object's monitor.
    val candidates = (0 until numMappers).filter { i =>
      !reducerSplitInfo.hasSplitsBitVector.get(i)
    }

    val splitIndex =
      if (candidates.isEmpty) -1
      else candidates(ranGen.nextInt(candidates.size))

    logInfo("%d".format(splitIndex))

    splitIndex
  }

  /**
   * No-op: this strategy does not track which reducer is connected to
   * which split.
   */
  def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized {
  }

  /**
   * No-op: there is no per-split connection state to roll back.
   */
  def deleteReducerFrom(reducerSplitInfo: SplitInfo,
    receptionStat: ReceptionStats): Unit = synchronized {
    // Nothing to update. The original TODO noted that a ShuffleClient may
    // time out while waiting for the tracker response and go to a random
    // server, so any connection-count assertion added here could legally fail.
  }
}
|
||||
|
||||
/**
|
||||
* Shuffle tracker strategy that tries to balance the percentage of blocks
|
||||
* remaining for each reducer
|
||||
|
|
Loading…
Reference in a new issue