UseHttpPipelining option is brought back in. It works!

This commit is contained in:
Mosharaf Chowdhury 2010-12-07 10:07:30 -08:00
parent 7e2d72c328
commit f82cc17bc5

View file

@ -68,10 +68,28 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
(myIndex, LocalFileShuffle.serverUri) (myIndex, LocalFileShuffle.serverUri)
}).collect() }).collect()
val splitsByUri = new ArrayBuffer[(String, Int)] // Load config option to decide whether or not to use HTTP pipelining
val UseHttpPipelining =
System.getProperty("spark.shuffle.UseHttpPipelining", "true").toBoolean
// Build a traversable list of pairs of server URI and split. Needs to be
// of type TraversableOnce[(String, ArrayBuffer[Int])]
val splitsByUri = if (UseHttpPipelining) {
// Build a hashmap from server URI to list of splits (to facillitate
// fetching all the URIs on a server within a single connection)
val splitsByUriHM = new HashMap[String, ArrayBuffer[Int]]
for ((inputId, serverUri) <- outputLocs) { for ((inputId, serverUri) <- outputLocs) {
splitsByUri += ((serverUri, inputId)) splitsByUriHM.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
} }
splitsByUriHM.toArray
} else {
// Don't use HTTP pipelining
val splitsByUriAB = new ArrayBuffer[(String, ArrayBuffer[Int])]
for ((inputId, serverUri) <- outputLocs) {
splitsByUriAB += ((serverUri, new ArrayBuffer[Int] += inputId))
}
splitsByUriAB.toArray
}
// TODO: Could broadcast splitsByUri // TODO: Could broadcast splitsByUri
@ -97,12 +115,10 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
val splitIndex = selectRandomSplit val splitIndex = selectRandomSplit
if (splitIndex != -1) { if (splitIndex != -1) {
val (serverUri, inputId) = splitsByUri (splitIndex) val (serverUri, inputIds) = splitsByUri (splitIndex)
val url =
"%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
threadPool.execute ( threadPool.execute ( new ShuffleClient (serverUri, shuffleId.toInt,
new ShuffleClient (url, splitIndex, mergeCombiners)) inputIds, myId, splitIndex, mergeCombiners))
// splitIndex is in transit. Will be unset in the ShuffleClient // splitIndex is in transit. Will be unset in the ShuffleClient
splitsInRequestBitVector.synchronized { splitsInRequestBitVector.synchronized {
@ -138,32 +154,42 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
} }
} }
class ShuffleClient (url: String, splitIndex: Int, class ShuffleClient (serverUri: String, shuffleId: Int,
inputIds: ArrayBuffer[Int], myId: Int, splitIndex: Int,
mergeCombiners: (C, C) => C) mergeCombiners: (C, C) => C)
extends Thread with Logging { extends Thread with Logging {
private var receptionSucceeded = false private var receptionSucceeded = false
override def run: Unit = { override def run: Unit = {
val readStartTime = System.currentTimeMillis
logInfo ("BEGIN READ: " + url)
try { try {
val inputStream = new ObjectInputStream(new URL(url).openStream()) for (inputId <- inputIds) {
try { val url =
while (true) { "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId)
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
combiners.synchronized { val readStartTime = System.currentTimeMillis
combiners(k) = combiners.get(k) match { logInfo ("BEGIN READ: " + url)
case Some(oldC) => mergeCombiners(oldC, c)
case None => c val inputStream = new ObjectInputStream(new URL(url).openStream())
try {
while (true) {
val (k, c) = inputStream.readObject().asInstanceOf[(K, C)]
combiners.synchronized {
combiners(k) = combiners.get(k) match {
case Some(oldC) => mergeCombiners(oldC, c)
case None => c
}
} }
} }
} catch {
case e: EOFException => {}
} }
} catch { inputStream.close()
case e: EOFException => {}
logInfo ("END READ: " + url)
val readTime = (System.currentTimeMillis - readStartTime)
logInfo ("Reading " + url + " took " + readTime + " millis.")
} }
inputStream.close()
// Reception completed. Update stats. // Reception completed. Update stats.
hasSplitsBitVector.synchronized { hasSplitsBitVector.synchronized {
hasSplitsBitVector.set (splitIndex) hasSplitsBitVector.set (splitIndex)
@ -176,10 +202,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
} }
receptionSucceeded = true receptionSucceeded = true
logInfo ("END READ: " + url)
val readTime = (System.currentTimeMillis - readStartTime)
logInfo ("Reading " + url + " took " + readTime + " millis.")
} catch { } catch {
// EOFException is expected to happen because sender can break // EOFException is expected to happen because sender can break
// connection due to timeout // connection due to timeout