Revamped tracker interface. Now tracker can send back multiple response at the same time.

Turned OFF timers in the reducers due to inconsistent behavior (sometimes they fire, sometimes they don't)
This commit is contained in:
Mosharaf Chowdhury 2011-01-14 00:08:58 -08:00
parent 36b21813fa
commit c71124ed3d
6 changed files with 150 additions and 138 deletions

View file

@ -1,10 +1,10 @@
-Dspark.shuffle.class=spark.TrackedCustomBlockedInMemoryShuffle
-Dspark.shuffle.masterHostAddress=127.0.0.1
-Dspark.shuffle.masterTrackerPort=22222
-Dspark.shuffle.trackerStrategy=spark.LimitConnectionsShuffleTrackerStrategy
-Dspark.shuffle.trackerStrategy=spark.BalanceRemainingShuffleTrackerStrategy
-Dspark.shuffle.maxRxConnections=40
-Dspark.shuffle.maxTxConnections=120
-Dspark.shuffle.blockSize=4096
-Dspark.shuffle.blockSize=512
-Dspark.shuffle.minKnockInterval=100
-Dspark.shuffle.maxKnockInterval=5000
-Dspark.shuffle.maxChatTime=500

View file

@ -23,6 +23,10 @@ trait Shuffle[K, V, C] {
*/
private object Shuffle
extends Logging {
// Tracker communication constants
val ReducerEntering = 0
val ReducerLeaving = 1
// ShuffleTracker info
private var MasterHostAddress_ = System.getProperty(
"spark.shuffle.masterHostAddress", InetAddress.getLocalHost.getHostAddress)
@ -54,7 +58,6 @@ extends Logging {
private var ThrottleFraction_ = System.getProperty(
"spark.shuffle.throttleFraction", "2.0").toDouble
def MasterHostAddress = MasterHostAddress_
def MasterTrackerPort = MasterTrackerPort_

View file

@ -1,7 +1,8 @@
package spark
import java.util.Random
import java.util.{BitSet, Random}
import scala.collection.mutable.ArrayBuffer
import scala.util.Sorting._
/**
@ -11,11 +12,11 @@ trait ShuffleTrackerStrategy {
// Initialize
def initialize(outputLocs_ : Array[SplitInfo]): Unit
// Select a split and send it back
def selectSplit(reducerSplitInfo: SplitInfo): Int
// Select a set of splits and send back
def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int]
// Update internal stats if things could be sent back successfully
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit
// A reducer is done. Update internal stats
def deleteReducerFrom(reducerSplitInfo: SplitInfo,
@ -50,9 +51,11 @@ extends ShuffleTrackerStrategy with Logging {
totalConnectionsPerLoc = Array.tabulate(numSources)(_ => 0)
}
def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized {
def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized {
var minConnections = Int.MaxValue
var splitIndex = -1
var minIndex = -1
var splitIndices = ArrayBuffer[Int]()
for (i <- 0 until numSources) {
// TODO: Use of MaxRxConnections instead of MaxTxConnections is
@ -62,15 +65,19 @@ extends ShuffleTrackerStrategy with Logging {
totalConnectionsPerLoc(i) < minConnections &&
!reducerSplitInfo.hasSplitsBitVector.get(i)) {
minConnections = totalConnectionsPerLoc(i)
splitIndex = i
minIndex = i
}
}
if (minIndex != -1) {
splitIndices += minIndex
}
return splitIndex
return splitIndices
}
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized {
if (splitIndex != -1) {
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized {
splitIndices.foreach { splitIndex =>
curConnectionsPerLoc(splitIndex) = curConnectionsPerLoc(splitIndex) + 1
totalConnectionsPerLoc(splitIndex) =
totalConnectionsPerLoc(splitIndex) + 1
@ -111,17 +118,17 @@ extends ShuffleTrackerStrategy with Logging {
numMappers = outputLocs.size
}
def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized {
def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized {
var splitIndex = -1
do {
splitIndex = ranGen.nextInt(numMappers)
} while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex))
return splitIndex
return ArrayBuffer(splitIndex)
}
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized {
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized {
}
def deleteReducerFrom(reducerSplitInfo: SplitInfo,
@ -176,7 +183,7 @@ extends ShuffleTrackerStrategy with Logging {
totalConnectionsPerLoc = Array.tabulate(numMappers)(_ => 0)
}
def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized {
def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized {
var splitIndex = -1
// Estimate time remaining to finish receiving for all reducer/mapper pairs
@ -260,11 +267,11 @@ extends ShuffleTrackerStrategy with Logging {
}
}
return splitIndex
return ArrayBuffer(splitIndex)
}
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized {
if (splitIndex != -1) {
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized {
splitIndices.foreach { splitIndex =>
curConnectionsPerLoc(splitIndex) += 1
totalConnectionsPerLoc(splitIndex) += 1
}
@ -345,8 +352,8 @@ extends ShuffleTrackerStrategy with Logging {
maxConnectionsPerReducer = Array.tabulate(numReducers)(_ => Shuffle.MaxRxConnections)
}
def selectSplit(reducerSplitInfo: SplitInfo): Int = synchronized {
var splitIndex = -1
def selectSplit(reducerSplitInfo: SplitInfo): ArrayBuffer[Int] = synchronized {
var splitIndices = ArrayBuffer[Int]()
// Estimate time remaining to finish receiving for all reducer/mapper pairs
// If speed is unknown or zero then make it 1 to give a large estimate
@ -389,17 +396,33 @@ extends ShuffleTrackerStrategy with Logging {
// Send back a splitIndex if this reducer is within its limit
if (curConnectionsPerReducer(reducerSplitInfo.splitId) <
maxConnectionsPerReducer(reducerSplitInfo.splitId)) {
var i = maxConnectionsPerReducer(reducerSplitInfo.splitId) -
curConnectionsPerReducer(reducerSplitInfo.splitId)
var temp = reducerSplitInfo.hasSplitsBitVector.clone.asInstanceOf[BitSet]
temp.flip(0, numMappers)
i = Math.min(i, temp.cardinality)
do {
splitIndex = ranGen.nextInt(numMappers)
} while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex))
while (i > 0) {
var splitIndex = -1
do {
splitIndex = ranGen.nextInt(numMappers)
} while (reducerSplitInfo.hasSplitsBitVector.get(splitIndex))
reducerSplitInfo.hasSplitsBitVector.set(splitIndex)
splitIndices += splitIndex
i -= 1
}
}
return splitIndex
return splitIndices
}
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndex: Int): Unit = synchronized {
if (splitIndex != -1) {
def AddReducerToSplit(reducerSplitInfo: SplitInfo, splitIndices: ArrayBuffer[Int]): Unit = synchronized {
splitIndices.foreach { splitIndex =>
curConnectionsPerReducer(reducerSplitInfo.splitId) += 1
}
}

View file

@ -188,20 +188,22 @@ extends Shuffle[K, V, C] with Logging {
// Receive which split to pull from the tracker
logInfo("Talking to tracker...")
val startTime = System.currentTimeMillis
val splitIndex = getTrackerSelectedSplit(myId)
logInfo("Got %d from tracker in %d millis".format(splitIndex, System.currentTimeMillis - startTime))
val splitIndices = getTrackerSelectedSplit(myId)
logInfo("Got %s from tracker in %d millis".format(splitIndices, System.currentTimeMillis - startTime))
if (splitIndex != -1) {
val selectedSplitInfo = outputLocs(splitIndex)
val requestSplit =
"%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId)
if (splitIndices.size > 0) {
splitIndices.foreach { splitIndex =>
val selectedSplitInfo = outputLocs(splitIndex)
val requestSplit =
"%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId)
threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo,
requestSplit, myId))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex)
threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo,
requestSplit, myId))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex)
}
}
} else {
// Tracker replied back with a NO. Sleep for a while.
@ -282,13 +284,13 @@ extends Shuffle[K, V, C] with Logging {
}
// Talks to the tracker and receives instruction
private def getTrackerSelectedSplit(myId: Int): Int = {
private def getTrackerSelectedSplit(myId: Int): ArrayBuffer[Int] = {
// Local status of hasSplitsBitVector and splitsInRequestBitVector
val localSplitInfo = getLocalSplitInfo(myId)
// DO NOT talk to the tracker if all the required splits are already busy
if (localSplitInfo.hasSplitsBitVector.cardinality == totalSplits) {
return -1
return ArrayBuffer[Int]()
}
val clientSocketToTracker = new Socket(Shuffle.MasterHostAddress,
@ -299,30 +301,30 @@ extends Shuffle[K, V, C] with Logging {
val oisTracker =
new ObjectInputStream(clientSocketToTracker.getInputStream)
var selectedSplitIndex = -1
var selectedSplitIndices = ArrayBuffer[Int]()
// Setup the timeout mechanism
var timeOutTask = new TimerTask {
override def run: Unit = {
logInfo("Waited enough for tracker response... Take random response...")
// sockets will be closed in finally
// sockets will be closed in finally
// TODO: Sometimes timer wont go off
// TODO: Selecting randomly here. Tracker won't know about it and get an
// asssertion failure when this thread leaves
selectedSplitIndex = selectRandomSplit
selectedSplitIndices = ArrayBuffer(selectRandomSplit)
}
}
var timeOutTimer = new Timer
// TODO: Which timeout to use?
timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval)
// timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval)
try {
// Send intention
oosTracker.writeObject(
TrackedCustomBlockedInMemoryShuffle.ReducerEntering)
oosTracker.writeObject(Shuffle.ReducerEntering)
oosTracker.flush()
// Send what this reducer has
@ -330,7 +332,7 @@ extends Shuffle[K, V, C] with Logging {
oosTracker.flush()
// Receive reply from the tracker
selectedSplitIndex = oisTracker.readObject.asInstanceOf[Int]
selectedSplitIndices = oisTracker.readObject.asInstanceOf[ArrayBuffer[Int]]
// Turn the timer OFF
timeOutTimer.cancel()
@ -344,7 +346,7 @@ extends Shuffle[K, V, C] with Logging {
clientSocketToTracker.close()
}
return selectedSplitIndex
return selectedSplitIndices
}
class ShuffleTracker(outputLocs: Array[SplitInfo])
@ -391,31 +393,29 @@ extends Shuffle[K, V, C] with Logging {
// Receive intention
val reducerIntention = ois.readObject.asInstanceOf[Int]
if (reducerIntention ==
TrackedCustomBlockedInMemoryShuffle.ReducerEntering) {
if (reducerIntention == Shuffle.ReducerEntering) {
// Receive what the reducer has
val reducerSplitInfo =
ois.readObject.asInstanceOf[SplitInfo]
// Select split and update stats if necessary
var selectedSplitIndex = -1
// Select splits and update stats if necessary
var selectedSplitIndices = ArrayBuffer[Int]()
trackerStrategy.synchronized {
selectedSplitIndex = trackerStrategy.selectSplit(
selectedSplitIndices = trackerStrategy.selectSplit(
reducerSplitInfo)
}
// Send reply back
oos.writeObject(selectedSplitIndex)
oos.writeObject(selectedSplitIndices)
oos.flush()
// Update internal stats, only if receiver got the reply
trackerStrategy.synchronized {
trackerStrategy.AddReducerToSplit(reducerSplitInfo,
selectedSplitIndex)
selectedSplitIndices)
}
}
else if (reducerIntention ==
TrackedCustomBlockedInMemoryShuffle.ReducerLeaving) {
else if (reducerIntention == Shuffle.ReducerLeaving) {
val reducerSplitInfo =
ois.readObject.asInstanceOf[SplitInfo]
@ -647,8 +647,7 @@ extends Shuffle[K, V, C] with Logging {
try {
// Send intention
oosTracker.writeObject(
TrackedCustomBlockedInMemoryShuffle.ReducerLeaving)
oosTracker.writeObject(Shuffle.ReducerLeaving)
oosTracker.flush()
// Send reducerSplitInfo
@ -698,10 +697,6 @@ extends Shuffle[K, V, C] with Logging {
}
object TrackedCustomBlockedInMemoryShuffle extends Logging {
// Tracker communication constants
val ReducerEntering = 0
val ReducerLeaving = 1
// Cache for keeping the splits around
val splitsCache = new HashMap[String, Array[Byte]]

View file

@ -176,20 +176,22 @@ extends Shuffle[K, V, C] with Logging {
// Receive which split to pull from the tracker
logInfo("Talking to tracker...")
val startTime = System.currentTimeMillis
val splitIndex = getTrackerSelectedSplit(myId)
logInfo("Got %d from tracker in %d millis".format(splitIndex, System.currentTimeMillis - startTime))
val splitIndices = getTrackerSelectedSplit(myId)
logInfo("Got %s from tracker in %d millis".format(splitIndices, System.currentTimeMillis - startTime))
if (splitIndex != -1) {
val selectedSplitInfo = outputLocs(splitIndex)
val requestSplit =
"%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId)
if (splitIndices.size > 0) {
splitIndices.foreach { splitIndex =>
val selectedSplitInfo = outputLocs(splitIndex)
val requestSplit =
"%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId)
threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo,
requestSplit, myId))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex)
threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo,
requestSplit, myId))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex)
}
}
} else {
// Tracker replied back with a NO. Sleep for a while.
@ -270,13 +272,13 @@ extends Shuffle[K, V, C] with Logging {
}
// Talks to the tracker and receives instruction
private def getTrackerSelectedSplit(myId: Int): Int = {
private def getTrackerSelectedSplit(myId: Int): ArrayBuffer[Int] = {
// Local status of hasSplitsBitVector and splitsInRequestBitVector
val localSplitInfo = getLocalSplitInfo(myId)
// DO NOT talk to the tracker if all the required splits are already busy
if (localSplitInfo.hasSplitsBitVector.cardinality == totalSplits) {
return -1
return ArrayBuffer[Int]()
}
val clientSocketToTracker = new Socket(Shuffle.MasterHostAddress,
@ -287,30 +289,30 @@ extends Shuffle[K, V, C] with Logging {
val oisTracker =
new ObjectInputStream(clientSocketToTracker.getInputStream)
var selectedSplitIndex = -1
var selectedSplitIndices = ArrayBuffer[Int]()
// Setup the timeout mechanism
var timeOutTask = new TimerTask {
override def run: Unit = {
logInfo("Waited enough for tracker response... Take random response...")
// sockets will be closed in finally
// sockets will be closed in finally
// TODO: Sometimes timer wont go off
// TODO: Selecting randomly here. Tracker won't know about it and get an
// asssertion failure when this thread leaves
selectedSplitIndex = selectRandomSplit
selectedSplitIndices = ArrayBuffer(selectRandomSplit)
}
}
var timeOutTimer = new Timer
// TODO: Which timeout to use?
timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval)
// timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval)
try {
// Send intention
oosTracker.writeObject(
TrackedCustomBlockedLocalFileShuffle.ReducerEntering)
oosTracker.writeObject(Shuffle.ReducerEntering)
oosTracker.flush()
// Send what this reducer has
@ -318,7 +320,7 @@ extends Shuffle[K, V, C] with Logging {
oosTracker.flush()
// Receive reply from the tracker
selectedSplitIndex = oisTracker.readObject.asInstanceOf[Int]
selectedSplitIndices = oisTracker.readObject.asInstanceOf[ArrayBuffer[Int]]
// Turn the timer OFF
timeOutTimer.cancel()
@ -332,7 +334,7 @@ extends Shuffle[K, V, C] with Logging {
clientSocketToTracker.close()
}
return selectedSplitIndex
return selectedSplitIndices
}
class ShuffleTracker(outputLocs: Array[SplitInfo])
@ -379,31 +381,29 @@ extends Shuffle[K, V, C] with Logging {
// Receive intention
val reducerIntention = ois.readObject.asInstanceOf[Int]
if (reducerIntention ==
TrackedCustomBlockedLocalFileShuffle.ReducerEntering) {
if (reducerIntention == Shuffle.ReducerEntering) {
// Receive what the reducer has
val reducerSplitInfo =
ois.readObject.asInstanceOf[SplitInfo]
// Select split and update stats if necessary
var selectedSplitIndex = -1
// Select splits and update stats if necessary
var selectedSplitIndices = ArrayBuffer[Int]()
trackerStrategy.synchronized {
selectedSplitIndex = trackerStrategy.selectSplit(
selectedSplitIndices = trackerStrategy.selectSplit(
reducerSplitInfo)
}
// Send reply back
oos.writeObject(selectedSplitIndex)
oos.writeObject(selectedSplitIndices)
oos.flush()
// Update internal stats, only if receiver got the reply
trackerStrategy.synchronized {
trackerStrategy.AddReducerToSplit(reducerSplitInfo,
selectedSplitIndex)
selectedSplitIndices)
}
}
else if (reducerIntention ==
TrackedCustomBlockedLocalFileShuffle.ReducerLeaving) {
else if (reducerIntention == Shuffle.ReducerLeaving) {
val reducerSplitInfo =
ois.readObject.asInstanceOf[SplitInfo]
@ -635,8 +635,7 @@ extends Shuffle[K, V, C] with Logging {
try {
// Send intention
oosTracker.writeObject(
TrackedCustomBlockedLocalFileShuffle.ReducerLeaving)
oosTracker.writeObject(Shuffle.ReducerLeaving)
oosTracker.flush()
// Send reducerSplitInfo
@ -686,10 +685,6 @@ extends Shuffle[K, V, C] with Logging {
}
object TrackedCustomBlockedLocalFileShuffle extends Logging {
// Tracker communication constants
val ReducerEntering = 0
val ReducerLeaving = 1
private var initialized = false
private var nextShuffleId = new AtomicLong(0)

View file

@ -113,20 +113,23 @@ extends Shuffle[K, V, C] with Logging {
while (hasSplits < totalSplits && numThreadsToCreate > 0) {
// Receive which split to pull from the tracker
logInfo("Talking to tracker...")
val splitIndex = getTrackerSelectedSplit(myId)
logInfo("Got %d from tracker...".format(splitIndex))
val startTime = System.currentTimeMillis
val splitIndices = getTrackerSelectedSplit(myId)
logInfo("Got %s from tracker in %d millis".format(splitIndices, System.currentTimeMillis - startTime))
if (splitIndex != -1) {
val selectedSplitInfo = outputLocs(splitIndex)
val requestSplit =
"%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId)
if (splitIndices.size > 0) {
splitIndices.foreach { splitIndex =>
val selectedSplitInfo = outputLocs(splitIndex)
val requestSplit =
"%d/%d/%d".format(shuffleId, selectedSplitInfo.splitId, myId)
threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo,
requestSplit, myId))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex)
threadPool.execute(new ShuffleClient(splitIndex, selectedSplitInfo,
requestSplit, myId))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex)
}
}
} else {
// Tracker replied back with a NO. Sleep for a while.
@ -200,13 +203,13 @@ extends Shuffle[K, V, C] with Logging {
}
// Talks to the tracker and receives instruction
private def getTrackerSelectedSplit(myId: Int): Int = {
private def getTrackerSelectedSplit(myId: Int): ArrayBuffer[Int] = {
// Local status of hasSplitsBitVector and splitsInRequestBitVector
val localSplitInfo = getLocalSplitInfo(myId)
// DO NOT talk to the tracker if all the required splits are already busy
if (localSplitInfo.hasSplitsBitVector.cardinality == totalSplits) {
return -1
return ArrayBuffer[Int]()
}
val clientSocketToTracker = new Socket(Shuffle.MasterHostAddress,
@ -217,30 +220,30 @@ extends Shuffle[K, V, C] with Logging {
val oisTracker =
new ObjectInputStream(clientSocketToTracker.getInputStream)
var selectedSplitIndex = -1
var selectedSplitIndices = ArrayBuffer[Int]()
// Setup the timeout mechanism
var timeOutTask = new TimerTask {
override def run: Unit = {
logInfo("Waited enough for tracker response... Take random response...")
// sockets will be closed in finally
// sockets will be closed in finally
// TODO: Sometimes timer wont go off
// TODO: Selecting randomly here. Tracker won't know about it and get an
// asssertion failure when this thread leaves
selectedSplitIndex = selectRandomSplit
selectedSplitIndices = ArrayBuffer(selectRandomSplit)
}
}
var timeOutTimer = new Timer
// TODO: Which timeout to use?
timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval)
// timeOutTimer.schedule(timeOutTask, Shuffle.MinKnockInterval)
try {
// Send intention
oosTracker.writeObject(
TrackedCustomParallelLocalFileShuffle.ReducerEntering)
oosTracker.writeObject(Shuffle.ReducerEntering)
oosTracker.flush()
// Send what this reducer has
@ -248,7 +251,7 @@ extends Shuffle[K, V, C] with Logging {
oosTracker.flush()
// Receive reply from the tracker
selectedSplitIndex = oisTracker.readObject.asInstanceOf[Int]
selectedSplitIndices = oisTracker.readObject.asInstanceOf[ArrayBuffer[Int]]
// Turn the timer OFF
timeOutTimer.cancel()
@ -262,7 +265,7 @@ extends Shuffle[K, V, C] with Logging {
clientSocketToTracker.close()
}
return selectedSplitIndex
return selectedSplitIndices
}
class ShuffleTracker(outputLocs: Array[SplitInfo])
@ -309,31 +312,29 @@ extends Shuffle[K, V, C] with Logging {
// Receive intention
val reducerIntention = ois.readObject.asInstanceOf[Int]
if (reducerIntention ==
TrackedCustomParallelLocalFileShuffle.ReducerEntering) {
if (reducerIntention == Shuffle.ReducerEntering) {
// Receive what the reducer has
val reducerSplitInfo =
ois.readObject.asInstanceOf[SplitInfo]
// Select split and update stats if necessary
var selectedSplitIndex = -1
var selectedSplitIndices = ArrayBuffer[Int]()
trackerStrategy.synchronized {
selectedSplitIndex = trackerStrategy.selectSplit(
selectedSplitIndices = trackerStrategy.selectSplit(
reducerSplitInfo)
}
// Send reply back
oos.writeObject(selectedSplitIndex)
oos.writeObject(selectedSplitIndices)
oos.flush()
// Update internal stats, only if receiver got the reply
trackerStrategy.synchronized {
trackerStrategy.AddReducerToSplit(reducerSplitInfo,
selectedSplitIndex)
selectedSplitIndices)
}
}
else if (reducerIntention ==
TrackedCustomParallelLocalFileShuffle.ReducerLeaving) {
else if (reducerIntention == Shuffle.ReducerLeaving) {
val reducerSplitInfo =
ois.readObject.asInstanceOf[SplitInfo]
@ -557,8 +558,7 @@ extends Shuffle[K, V, C] with Logging {
try {
// Send intention
oosTracker.writeObject(
TrackedCustomParallelLocalFileShuffle.ReducerLeaving)
oosTracker.writeObject(Shuffle.ReducerLeaving)
oosTracker.flush()
// Send reducerSplitInfo
@ -606,10 +606,6 @@ extends Shuffle[K, V, C] with Logging {
}
object TrackedCustomParallelLocalFileShuffle extends Logging {
// Tracker communication constants
val ReducerEntering = 0
val ReducerLeaving = 1
private var initialized = false
private var nextShuffleId = new AtomicLong(0)