SPARK-1770: Load balance elements when repartitioning.

This patch adds better balancing when performing a repartition of an
RDD. Previously the elements in the RDD were hash-partitioned, so if
the RDD was skewed, certain output partitions could end up very large.

This commit adds load balancing of elements across the repartitioned
RDD splits. The load balancing is not perfect: a given output partition
can have up to N more elements than the average if there are N input
partitions. However, randomization is used to minimize the probability
that this happens.
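
To see why that bound holds, here is a standalone sketch (illustration only, not part of the patch; the simulate helper and the partition sizes are hypothetical): each input partition deals its elements round-robin into the output buckets starting from a seeded random offset, so it gives any single bucket at most one element above its fair share, and N input partitions can therefore overload a bucket by at most N.

    import scala.util.Random

    // Simulate the round-robin scheme: each input partition (identified by
    // its index) starts at a random offset and deals its elements one at a
    // time into numPartitions buckets.
    def simulate(partitionSizes: Seq[Int], numPartitions: Int): Seq[Int] = {
      val buckets = new Array[Int](numPartitions)
      for ((size, index) <- partitionSizes.zipWithIndex) {
        var position = new Random(index).nextInt(numPartitions)
        for (_ <- 1 to size) {
          position += 1
          buckets(position % numPartitions) += 1
        }
      }
      buckets.toSeq
    }

    // A heavily skewed input: one partition holds 991 of the 1000 elements.
    val counts = simulate(Seq(991, 1, 1, 1, 1, 1, 1, 1, 1, 1), numPartitions = 2)
    val avg = 1000 / 2
    // No bucket exceeds the average by more than the 10 input partitions.
    assert(counts.forall(_ <= avg + 10))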

Author: Patrick Wendell <pwendell@gmail.com>

Closes #727 from pwendell/load-balance and squashes the following commits:

f9da752 [Patrick Wendell] Response to Matei's feedback
acfa46a [Patrick Wendell] SPARK-1770: Load balance elements when repartitioning.

2 changed files with 46 additions and 2 deletions

@@ -328,11 +328,22 @@ abstract class RDD[T: ClassTag](
   def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
       : RDD[T] = {
     if (shuffle) {
+      /** Distributes elements evenly across output partitions, starting from a random partition. */
+      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+        var position = (new Random(index)).nextInt(numPartitions)
+        items.map { t =>
+          // Note that the hash code of the key will just be the key itself. The HashPartitioner
+          // will mod it with the number of total partitions.
+          position = position + 1
+          (position, t)
+        }
+      }
+
       // include a shuffle step so that our upstream tasks are still distributed
       new CoalescedRDD(
-        new ShuffledRDD[T, Null, (T, Null)](map(x => (x, null)),
+        new ShuffledRDD[Int, T, (Int, T)](mapPartitionsWithIndex(distributePartition),
         new HashPartitioner(numPartitions)),
-        numPartitions).keys
+        numPartitions).values
     } else {
       new CoalescedRDD(this, numPartitions)
     }
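
As a quick illustration of the behavioral change (a usage sketch, not part of the patch; it assumes a running SparkContext named `sc`): before this commit, the shuffle keyed each element by its own value, so identical elements all hashed to the same output partition; with distributePartition, they are spread round-robin.

    // 1000 identical elements, which the old hash-on-element scheme
    // would have sent to a single output partition.
    val skewed = sc.parallelize(Seq.fill(1000)("x"), 10)
    val balanced = skewed.repartition(4)
    // Each of the 4 output partitions now holds roughly 250 elements.
    println(balanced.glom().map(_.length).collect().mkString(", "))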

@@ -202,6 +202,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
     assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
   }
 
+  test("repartitioned RDDs perform load balancing") {
+    // Coalesce partitions
+    val input = Array.fill(1000)(1)
+    val initialPartitions = 10
+    val data = sc.parallelize(input, initialPartitions)
+
+    val repartitioned1 = data.repartition(2)
+    assert(repartitioned1.partitions.size == 2)
+    val partitions1 = repartitioned1.glom().collect()
+    // some noise in balancing is allowed due to randomization
+    assert(math.abs(partitions1(0).length - 500) < initialPartitions)
+    assert(math.abs(partitions1(1).length - 500) < initialPartitions)
+    assert(repartitioned1.collect() === input)
+
+    def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int) {
+      val data = sc.parallelize(input, initialPartitions)
+      val repartitioned = data.repartition(finalPartitions)
+      assert(repartitioned.partitions.size === finalPartitions)
+      val partitions = repartitioned.glom().collect()
+      // assert all elements are present
+      assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
+      // assert no bucket is overloaded
+      for (partition <- partitions) {
+        val avg = input.size / finalPartitions
+        val maxPossible = avg + initialPartitions
+        assert(partition.length <= maxPossible)
+      }
+    }
+
+    testSplitPartitions(Array.fill(100)(1), 10, 20)
+    testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+  }
+
   test("coalesced RDDs") {
     val data = sc.parallelize(1 to 10, 10)
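
For concreteness, a worked reading of the second testSplitPartitions call above (illustration only, not additional test code): with input.size = 20000 and finalPartitions = 100, avg is 200; with initialPartitions = 20, maxPossible is 200 + 20 = 220, so no output partition may hold more than 220 elements. Each input partition deals its elements round-robin from a random starting offset, contributing at most one element above its fair share to any single output partition, so summed over 20 input partitions no output partition can exceed the average by more than 20.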