Fixed a test that was getting extremely lucky before, and increased the

number of samples used for sorting
This commit is contained in:
Matei Zaharia 2012-09-26 00:25:34 -07:00
parent d51d5e0582
commit d71a358c46
3 changed files with 16 additions and 12 deletions

View file

@ -41,9 +41,9 @@ class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
Array()
} else {
val rddSize = rdd.count()
val maxSampleSize = partitions * 10.0
val maxSampleSize = partitions * 20.0
val frac = math.min(maxSampleSize / math.max(rddSize, 1), 1.0)
val rddSample = rdd.sample(true, frac, 1).map(_._1).collect().sortWith(_ < _)
val rddSample = rdd.sample(false, frac, 1).map(_._1).collect().sortWith(_ < _)
if (rddSample.length == 0) {
Array()
} else {

View file

@ -71,7 +71,11 @@ class ShuffledSortedRDD[K <% Ordered[K]: ClassManifest, V](
val buf = new ArrayBuffer[(K, V)]
def addTupleToBuffer(k: K, v: V) = { buf += Tuple(k, v) }
SparkEnv.get.shuffleFetcher.fetch[K, V](dep.shuffleId, split.index, addTupleToBuffer)
buf.sortWith((x, y) => if (ascending) x._1 < y._1 else x._1 > y._1).iterator
if (ascending) {
buf.sortWith((x, y) => x._1 < y._1).iterator
} else {
buf.sortWith((x, y) => x._1 > y._1).iterator
}
}
}

View file

@ -58,11 +58,11 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
val sorted = sc.parallelize(pairArr, 4).sortByKey()
assert(sorted.collect() === pairArr.sortBy(_._1))
val partitions = sorted.collectPartitions()
logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
partitions(0).length should be > 200
partitions(1).length should be > 200
partitions(2).length should be > 200
partitions(3).length should be > 200
logInfo("Partition lengths: " + partitions.map(_.length).mkString(", "))
partitions(0).length should be > 180
partitions(1).length should be > 180
partitions(2).length should be > 180
partitions(3).length should be > 180
partitions(0).last should be < partitions(1).head
partitions(1).last should be < partitions(2).head
partitions(2).last should be < partitions(3).head
@ -75,10 +75,10 @@ class SortingSuite extends FunSuite with BeforeAndAfter with ShouldMatchers with
assert(sorted.collect() === pairArr.sortBy(_._1).reverse)
val partitions = sorted.collectPartitions()
logInfo("partition lengths: " + partitions.map(_.length).mkString(", "))
partitions(0).length should be > 200
partitions(1).length should be > 200
partitions(2).length should be > 200
partitions(3).length should be > 200
partitions(0).length should be > 180
partitions(1).length should be > 180
partitions(2).length should be > 180
partitions(3).length should be > 180
partitions(0).last should be > partitions(1).head
partitions(1).last should be > partitions(2).head
partitions(2).last should be > partitions(3).head