Added a separate thread to deserialize (1 thread per reducer) in CustomParallelLocalFileShuffle

Upside: No synchronized blocking on "combiners" variable. 3x faster :)
Downside: Inefficient implementation. Requiring too much temporary data. Approx. 2x increase in memory requirement :( Should be fixed at some point.
This commit is contained in:
Mosharaf Chowdhury 2010-12-21 21:52:37 -08:00
parent f4d0e917a2
commit 5f0cdabd40
2 changed files with 84 additions and 31 deletions

View file

@ -1 +1 @@
-Dspark.shuffle.class=spark.CustomParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=4 -Dspark.parallelLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxKnockInterval=5000
-Dspark.shuffle.class=spark.CustomParallelLocalFileShuffle -Dspark.blockedLocalFileShuffle.maxRxConnections=2 -Dspark.blockedLocalFileShuffle.blockSize=256 -Dspark.blockedLocalFileShuffle.minKnockInterval=50 -Dspark.parallelLocalFileShuffle.maxRxConnections=2 -Dspark.parallelLocalFileShuffle.maxTxConnections=2 -Dspark.parallelLocalFileShuffle.minKnockInterval=1000 -Dspark.parallelLocalFileShuffle.maxKnockInterval=5000

View file

@ -4,7 +4,7 @@ import
import java.util.{BitSet, Random, Timer, TimerTask, UUID}
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.{Executors, ThreadPoolExecutor, ThreadFactory}
import java.util.concurrent.{LinkedBlockingQueue, Executors, ThreadPoolExecutor, ThreadFactory}
import scala.collection.mutable.{ArrayBuffer, HashMap}
@ -23,6 +23,7 @@ extends Shuffle[K, V, C] with Logging {
@transient var hasSplitsBitVector: BitSet = null
@transient var splitsInRequestBitVector: BitSet = null
@transient var receivedData: LinkedBlockingQueue[(Int, Array[Byte])] = null
@transient var combiners: HashMap[K,C] = null
override def compute(input: RDD[(K, V)],
@ -87,11 +88,19 @@ extends Shuffle[K, V, C] with Logging {
hasSplits = 0
hasSplitsBitVector = new BitSet(totalSplits)
splitsInRequestBitVector = new BitSet(totalSplits)
receivedData = new LinkedBlockingQueue[(Int, Array[Byte])]
combiners = new HashMap[K, C]
var threadPool = CustomParallelLocalFileShuffle.newDaemonFixedThreadPool(
// Start consumer
var shuffleConsumer = new ShuffleConsumer(mergeCombiners)
logInfo("ShuffleConsumer started...")
while (hasSplits < totalSplits) {
var numThreadsToCreate = Math.min(totalSplits,
CustomParallelLocalFileShuffle.MaxRxConnections) -
@ -106,7 +115,7 @@ extends Shuffle[K, V, C] with Logging {
val requestPath = "%d/%d/%d".format(shuffleId, inputId, myId)
threadPool.execute(new ShuffleClient(splitIndex, serverAddress,
serverPort, requestPath, mergeCombiners))
serverPort, requestPath))
// splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized {
@ -145,8 +154,57 @@ extends Shuffle[K, V, C] with Logging {
class ShuffleConsumer(mergeCombiners: (C, C) => C)
extends Thread with Logging {
override def run: Unit = {
// Run until all splits are here
while (hasSplits < totalSplits) {
var splitIndex = -1
var recvByteArray: Array[Byte] = null
try {
var tempPair = receivedData.take().asInstanceOf[(Int, Array[Byte])]
splitIndex = tempPair._1
recvByteArray = tempPair._2
} catch {
case e: Exception => {
logInfo("Exception during taking data from receivedData")
val inputStream =
new ObjectInputStream(new ByteArrayInputStream(recvByteArray))
while (true) {
val (k, c) = inputStream.readObject.asInstanceOf[(K, C)]
combiners(k) = combiners.get(k) match {
case Some(oldC) => mergeCombiners(oldC, c)
case None => c
} catch {
case e: EOFException => { }
// Consumption completed. Update stats.
hasSplitsBitVector.synchronized {
hasSplits += 1
// We have received splitIndex
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex, false)
class ShuffleClient(splitIndex: Int, hostAddress: String, listenPort: Int,
requestPath: String, mergeCombiners: (C, C) => C)
requestPath: String)
extends Thread with Logging {
private var peerSocketToSource: Socket = null
private var oosSource: ObjectOutputStream = null
@ -192,35 +250,30 @@ extends Shuffle[K, V, C] with Logging {
val readStartTime = System.currentTimeMillis
logInfo("BEGIN READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
// Add this to combiners
val inputStream = new ObjectInputStream(isSource)
while (true) {
val (k, c) = inputStream.readObject.asInstanceOf[(K, C)]
combiners.synchronized {
combiners(k) = combiners.get(k) match {
case Some(oldC) => mergeCombiners(oldC, c)
case None => c
} catch {
case e: EOFException => { }
// Receive data in an Array[Byte]
var recvByteArray = new Array[Byte](requestedFileLen)
var alreadyRead = 0
var bytesRead = 0
// Reception completed. Update stats.
hasSplitsBitVector.synchronized {
hasSplits += 1
// We have received splitIndex
splitsInRequestBitVector.synchronized {
splitsInRequestBitVector.set(splitIndex, false)
while (alreadyRead != requestedFileLen) {
bytesRead =, alreadyRead,
requestedFileLen - alreadyRead)
if (bytesRead > 0) {
alreadyRead = alreadyRead + bytesRead
// Make it available to the consumer
try {
receivedData.put((splitIndex, recvByteArray))
} catch {
case e: Exception => {
logInfo("Exception during putting data into receivedData")
// NOTE: Update of bitVectors are now done by the consumer.
receptionSucceeded = true
logInfo("END READ: http://%s:%d/shuffle/%s".format(hostAddress, listenPort, requestPath))
@ -329,7 +382,7 @@ object CustomParallelLocalFileShuffle extends Logging {
// Create and start the shuffleServer
shuffleServer = new ShuffleServer
logInfo("ShuffleServer started...")
initialized = true