Made DFS shuffle's "reduce tasks" fetch inputs in a random order so they
don't all hit the same nodes at the same time.
Author: Matei Zaharia
Date:   2010-11-03 22:45:44 -07:00
Parent: 820dac5afe
Commit: 44530c310b
2 changed files with 21 additions and 5 deletions
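
The change addresses a load-imbalance problem: if every reduce task reads the map outputs in the same order (split 0, then split 1, and so on), all reducers hit the node holding split 0 at once, then all move together to split 1's node, serializing the whole fetch phase on one node at a time. Having each reducer fetch in its own random order spreads the requests across the cluster. A minimal sketch of the idea (the fetch helper and split count are illustrative, not from this commit):

import scala.util.Random

object FetchOrderSketch {
  def main(args: Array[String]): Unit = {
    val numInputSplits = 8

    // Stand-in for opening and reading one map output file from HDFS.
    def fetch(mapId: Int): Unit = println("fetching output of map " + mapId)

    // Sequential order: every reducer issues the same sequence and
    // converges on the same node at each step.
    // (0 until numInputSplits).foreach(fetch)

    // Randomized order: each reducer draws its own permutation first,
    // so concurrent reducers rarely target the same node at once.
    Random.shuffle((0 until numInputSplits).toList).foreach(fetch)
  }
}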

DfsShuffle.scala

@@ -80,8 +80,8 @@ extends Logging
     }
     val fs = DfsShuffle.getFileSystem()
     val outputStreams = (0 until numOutputSplits).map(i => {
-      val path = new Path(dir, "intermediate-%d-%d".format(myIndex, i))
-      new ObjectOutputStream(fs.create(path, 1.toShort))
+      val path = new Path(dir, "%d-to-%d".format(myIndex, i))
+      new ObjectOutputStream(fs.create(path, true))
     }).toArray
     for ((k, c) <- combiners) {
       val bucket = k.hashCode % numOutputSplits
@@ -96,8 +96,8 @@ extends Logging
       override def default(key: K) = createCombiner()
     }
     val fs = DfsShuffle.getFileSystem()
-    for (i <- 0 until numInputSplits) {
-      val path = new Path(dir, "intermediate-%d-%d".format(i, myIndex))
+    for (i <- Utils.shuffle(0 until numInputSplits)) {
+      val path = new Path(dir, "%d-to-%d".format(i, myIndex))
       val inputStream = new ObjectInputStream(fs.open(path))
       try {
         while (true) {
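
A subtle detail in the hunk above, beyond the file rename: the old call fs.create(path, 1.toShort) resolves to Hadoop's FileSystem.create(Path, short) overload, which pins the intermediate file's replication factor to 1, while the new fs.create(path, true) resolves to create(Path, boolean), which keeps the default replication and explicitly overwrites any existing file at that path. A hedged sketch of the two overloads (the path is made up; assumes the 2010-era org.apache.hadoop.fs.FileSystem API):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CreateOverloadSketch {
  def main(args: Array[String]): Unit = {
    val fs = FileSystem.get(new Configuration())
    val path = new Path("/tmp/shuffle-demo/0-to-1") // hypothetical location

    // Old overload: create(Path, short) sets the replication factor to 1.
    // val out = fs.create(path, 1.toShort)

    // New overload: create(Path, boolean) keeps the default replication;
    // overwrite = true replaces any file already at the path.
    val out = fs.create(path, true)
    out.close()
  }
}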

Utils.scala

@@ -4,13 +4,14 @@ import java.io._
 import java.util.UUID
 import scala.collection.mutable.ArrayBuffer
+import scala.util.Random
 
 /**
  * Various utility methods used by Spark.
  */
 object Utils {
   def serialize[T](o: T): Array[Byte] = {
-    val bos = new ByteArrayOutputStream
+    val bos = new ByteArrayOutputStream()
     val oos = new ObjectOutputStream(bos)
     oos.writeObject(o)
     oos.close
@@ -95,4 +96,19 @@ object Utils {
       out.close()
     }
   }
+
+  // Shuffle the elements of a collection into a random order, returning the
+  // result in a new collection. Unlike scala.util.Random.shuffle, this method
+  // uses a local random number generator, avoiding inter-thread contention.
+  def shuffle[T](seq: Seq[T]): Seq[T] = {
+    val buf = ArrayBuffer(seq: _*)
+    val rand = new Random()
+    for (i <- (buf.size - 1) to 1 by -1) {
+      val j = rand.nextInt(i)
+      val tmp = buf(j)
+      buf(j) = buf(i)
+      buf(i) = tmp
+    }
+    buf
+  }
 }
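
For reference, the new helper can be exercised on its own; the reduce loop in DfsShuffle.scala calls it as Utils.shuffle(0 until numInputSplits). A standalone copy, wrapped in a throwaway object so it runs as-is:

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

object ShuffleSketch {
  def shuffle[T](seq: Seq[T]): Seq[T] = {
    val buf = ArrayBuffer(seq: _*)
    val rand = new Random()
    for (i <- (buf.size - 1) to 1 by -1) {
      val j = rand.nextInt(i) // j is drawn from [0, i), so element i always moves
      val tmp = buf(j)
      buf(j) = buf(i)
      buf(i) = tmp
    }
    buf
  }

  def main(args: Array[String]): Unit = {
    // Prints a different permutation of 0..4 on each run.
    println(shuffle(0 until 5))
  }
}

One design note: because j is drawn from [0, i) rather than [0, i], the element at position i can never stay in place, so this is Sattolo's variant of the swap-based shuffle, which produces only cyclic permutations rather than all orderings uniformly. For spreading fetch load across nodes that bias is harmless; a textbook Fisher-Yates shuffle would use rand.nextInt(i + 1) instead.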