Added error messages to the tests to make failed tests less cryptic
This commit is contained in:
parent
abcefb3858
commit
33a0f59354
|
@ -178,20 +178,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
// RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5
|
||||
val data = sc.makeRDD((1 to 9).map( i => (i, (i to (i+2)).map{ j => "m" + (j%6)} )))
|
||||
val coalesced1 = data.coalesce(3)
|
||||
assert(coalesced1.collect().toList.sorted === (1 to 9).toList) // no data lost
|
||||
assert(coalesced1.collect().toList.sorted === (1 to 9).toList, "Data got *lost* in coalescing")
|
||||
|
||||
val splits = coalesced1.glom().collect().map(_.toList).toList
|
||||
assert(splits.length === 3) // ensure it indeed created 3 partitions
|
||||
assert(splits.length == 3, "Supposed to coalesce to 3 but got " + splits.length)
|
||||
|
||||
assert(splits.foldLeft(true)
|
||||
( (x,y) => if (!x) false else y.length >= 1) === true) // (1+ balance)
|
||||
( (x,y) => if (!x) false else y.length >= 1) === true, "Some partitions were empty")
|
||||
|
||||
// If we try to coalesce into more partitions than the original RDD, it should just
|
||||
// keep the original number of partitions.
|
||||
val coalesced4 = data.coalesce(20)
|
||||
assert(coalesced4.glom().collect().map(_.toList).toList.sortWith(
|
||||
(x, y) => if (x.isEmpty) false else if (y.isEmpty) true else x(0) < y(0)) === (1 to 9).
|
||||
map(x => List(x)).toList)
|
||||
map(x => List(x)).toList, "Tried coalescing 9 partitions to 20 but didn't get 9 back")
|
||||
|
||||
|
||||
// large scale experiment
|
||||
|
@ -205,20 +205,20 @@ class RDDSuite extends FunSuite with SharedSparkContext {
|
|||
val blocks = (1 to partitions).map( i => (i, (i to (i+2))
|
||||
.map{ j => machines(rnd.nextInt(machines.size)) } ))
|
||||
|
||||
val data2 = sc.makeRDD(blocks)
|
||||
val data2 = sc.makeRDD(blocks) // .map( i => i*2 )
|
||||
val coalesced2 = data2.coalesce(numMachines*2)
|
||||
|
||||
// test that you get at least 90% locality in each group
|
||||
val minLocality = coalesced2.partitions
|
||||
.map( part => part.asInstanceOf[CoalescedRDDPartition].localFraction )
|
||||
.foldLeft(100.)( (perc, loc) => math.min(perc,loc) )
|
||||
assert(minLocality > 0.90)
|
||||
assert(minLocality >= 0.90, "Expected 90% locality but got " + (minLocality*100.).toInt + "%")
|
||||
|
||||
// test that the groups are load balanced with 100 +/- 20 elements in each
|
||||
val maxImbalance = coalesced2.partitions
|
||||
.map( part => part.asInstanceOf[CoalescedRDDPartition].parents.size )
|
||||
.foldLeft(0)((dev, curr) => math.max(math.abs(100-curr),dev))
|
||||
assert(maxImbalance < 20)
|
||||
assert(maxImbalance <= 20, "Expected 100 +/- 20 per partition, but got " + maxImbalance)
|
||||
}
|
||||
|
||||
test("zipped RDDs") {
|
||||
|
|
Loading…
Reference in a new issue