diff --git a/src/scala/spark/LocalFileShuffle.scala b/src/scala/spark/LocalFileShuffle.scala index a015f53945..b70315deff 100644 --- a/src/scala/spark/LocalFileShuffle.scala +++ b/src/scala/spark/LocalFileShuffle.scala @@ -68,10 +68,28 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { (myIndex, LocalFileShuffle.serverUri) }).collect() - val splitsByUri = new ArrayBuffer[(String, Int)] + // Load config option to decide whether or not to use HTTP pipelining + val UseHttpPipelining = + System.getProperty("spark.shuffle.UseHttpPipelining", "true").toBoolean + + // Build a traversable list of pairs of server URI and split. Needs to be + // of type TraversableOnce[(String, ArrayBuffer[Int])] + val splitsByUri = if (UseHttpPipelining) { + // Build a hashmap from server URI to list of splits (to facilitate + // fetching all the URIs on a server within a single connection) + val splitsByUriHM = new HashMap[String, ArrayBuffer[Int]] for ((inputId, serverUri) <- outputLocs) { - splitsByUri += ((serverUri, inputId)) + splitsByUriHM.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId } + splitsByUriHM.toArray + } else { + // Don't use HTTP pipelining + val splitsByUriAB = new ArrayBuffer[(String, ArrayBuffer[Int])] + for ((inputId, serverUri) <- outputLocs) { + splitsByUriAB += ((serverUri, new ArrayBuffer[Int] += inputId)) + } + splitsByUriAB.toArray + } // TODO: Could broadcast splitsByUri @@ -97,12 +115,10 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { val splitIndex = selectRandomSplit if (splitIndex != -1) { - val (serverUri, inputId) = splitsByUri (splitIndex) - val url = - "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId) + val (serverUri, inputIds) = splitsByUri (splitIndex) - threadPool.execute ( - new ShuffleClient (url, splitIndex, mergeCombiners)) + threadPool.execute ( new ShuffleClient (serverUri, shuffleId.toInt, + inputIds, myId, splitIndex, mergeCombiners)) // splitIndex is in transit. 
Will be unset in the ShuffleClient splitsInRequestBitVector.synchronized { @@ -138,32 +154,42 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } } - class ShuffleClient (url: String, splitIndex: Int, + class ShuffleClient (serverUri: String, shuffleId: Int, + inputIds: ArrayBuffer[Int], myId: Int, splitIndex: Int, mergeCombiners: (C, C) => C) extends Thread with Logging { private var receptionSucceeded = false override def run: Unit = { - val readStartTime = System.currentTimeMillis - logInfo ("BEGIN READ: " + url) - try { - val inputStream = new ObjectInputStream(new URL(url).openStream()) - try { - while (true) { - val (k, c) = inputStream.readObject().asInstanceOf[(K, C)] - combiners.synchronized { - combiners(k) = combiners.get(k) match { - case Some(oldC) => mergeCombiners(oldC, c) - case None => c + for (inputId <- inputIds) { + val url = + "%s/shuffle/%d/%d/%d".format(serverUri, shuffleId, inputId, myId) + + val readStartTime = System.currentTimeMillis + logInfo ("BEGIN READ: " + url) + + val inputStream = new ObjectInputStream(new URL(url).openStream()) + try { + while (true) { + val (k, c) = inputStream.readObject().asInstanceOf[(K, C)] + combiners.synchronized { + combiners(k) = combiners.get(k) match { + case Some(oldC) => mergeCombiners(oldC, c) + case None => c + } } } + } catch { + case e: EOFException => {} } - } catch { - case e: EOFException => {} + inputStream.close() + + logInfo ("END READ: " + url) + val readTime = (System.currentTimeMillis - readStartTime) + logInfo ("Reading " + url + " took " + readTime + " millis.") } - inputStream.close() - + // Reception completed. Update stats. 
hasSplitsBitVector.synchronized { hasSplitsBitVector.set (splitIndex) @@ -176,10 +202,6 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging { } receptionSucceeded = true - - logInfo ("END READ: " + url) - val readTime = (System.currentTimeMillis - readStartTime) - logInfo ("Reading " + url + " took " + readTime + " millis.") } catch { // EOFException is expected to happen because sender can break // connection due to timeout