Enabling/disabling HTTP pipelining is a config option now. Performance tradeoffs are not obvious yet.
This commit is contained in:
parent
8494b3a4f9
commit
0de859fbe2
|
@ -1 +1 @@
|
|||
-Dspark.shuffle.class=spark.LocalFileShuffle
|
||||
-Dspark.shuffle.class=spark.LocalFileShuffle -Dspark.shuffle.UseHttpPipelining=false
|
||||
|
|
|
@ -60,11 +60,27 @@ class LocalFileShuffle[K, V, C] extends Shuffle[K, V, C] with Logging {
|
|||
(myIndex, LocalFileShuffle.serverUri)
|
||||
}).collect()
|
||||
|
||||
// Build a hashmap from server URI to list of splits (to facilitate
|
||||
// fetching all the URIs on a server within a single connection)
|
||||
val splitsByUri = new HashMap[String, ArrayBuffer[Int]]
|
||||
for ((inputId, serverUri) <- outputLocs) {
|
||||
splitsByUri.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
|
||||
// Load config option to decide whether or not to use HTTP pipelining
|
||||
val UseHttpPipelining =
|
||||
System.getProperty("spark.shuffle.UseHttpPipelining", "false").toBoolean
|
||||
|
||||
// Build a traversable list of pairs of server URI and splits. Needs to be
|
||||
// of type TraversableOnce[(String, ArrayBuffer[Int])]
|
||||
val splitsByUri = if (UseHttpPipelining) {
|
||||
// Build a hashmap from server URI to list of splits (to facilitate
|
||||
// fetching all the URIs on a server within a single connection)
|
||||
val splitsByUriHM = new HashMap[String, ArrayBuffer[Int]]
|
||||
for ((inputId, serverUri) <- outputLocs) {
|
||||
splitsByUriHM.getOrElseUpdate(serverUri, ArrayBuffer()) += inputId
|
||||
}
|
||||
splitsByUriHM
|
||||
} else {
|
||||
// Don't use HTTP pipelining
|
||||
val splitsByUriAB = new ArrayBuffer[(String, ArrayBuffer[Int])]
|
||||
for ((inputId, serverUri) <- outputLocs) {
|
||||
splitsByUriAB += ((serverUri, new ArrayBuffer[Int] += inputId))
|
||||
}
|
||||
splitsByUriAB
|
||||
}
|
||||
|
||||
// TODO: Could broadcast splitsByUri
|
||||
|
@ -110,7 +126,7 @@ object LocalFileShuffle extends Logging {
|
|||
private var shuffleDir: File = null
|
||||
private var server: HttpServer = null
|
||||
private var serverUri: String = null
|
||||
|
||||
|
||||
private def initializeIfNeeded() = synchronized {
|
||||
if (!initialized) {
|
||||
// TODO: localDir should be created by some mechanism common to Spark
|
||||
|
@ -162,7 +178,7 @@ object LocalFileShuffle extends Logging {
|
|||
logInfo ("Local URI: " + serverUri)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def getOutputFile(shuffleId: Long, inputId: Int, outputId: Int): File = {
|
||||
initializeIfNeeded()
|
||||
val dir = new File(shuffleDir, shuffleId + "/" + inputId)
|
||||
|
|
Loading…
Reference in a new issue