Refactored tests to share SparkContexts in some of them

Creating these seems to take a while and clutters the output with Akka
stuff, so it would be nice to share them.
This commit is contained in:
Matei Zaharia 2013-06-25 19:18:30 -04:00
parent 2bd04c3513
commit 9f0d913295
10 changed files with 373 additions and 425 deletions

View file

@ -27,6 +27,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
test("basic checkpointing") {
val parCollection = sc.makeRDD(1 to 4)
val flatMappedRDD = parCollection.flatMap(x => 1 to x)
assert(flatMappedRDD.dependencies.head.rdd == parCollection)
val result = flatMappedRDD.collect()
assert(flatMappedRDD.dependencies.head.rdd != parCollection)
assert(flatMappedRDD.collect() === result)
test("RDDs with one-to-one dependencies") {
testCheckpointing( => x.toString))
testCheckpointing(_.flatMap(x => 1 to x))

View file

@ -0,0 +1,287 @@
package spark
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashSet
import org.scalatest.FunSuite
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
import org.scalacheck.Prop._
import spark.rdd.ShuffledRDD
import spark.SparkContext._
class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
test("groupByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with duplicates") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with negative key hash codes") {
val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesForMinus1 = groups.find(_._1 == -1).get._2
assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with many output partitions") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey(10).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("reduceByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("reduceByKey with collectAsMap") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collectAsMap()
assert(sums.size === 2)
assert(sums(1) === 7)
assert(sums(2) === 1)
test("reduceByKey with many output partitons") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("reduceByKey with partitioner") {
val p = new Partitioner() {
def numPartitions = 2
def getPartition(key: Any) = key.asInstanceOf[Int]
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
val sums = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set((1, 4), (0, 1)))
assert(sums.partitioner === Some(p))
// count the dependencies to make sure there is only 1 ShuffledRDD
val deps = new HashSet[RDD[_]]()
def visit(r: RDD[_]) {
for (dep <- r.dependencies) {
deps += dep.rdd
assert(deps.size === 2) // ShuffledRDD, ParallelCollection
test("join") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
test("join all-to-all") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (1, 'y')),
(1, (2, 'x')),
(1, (2, 'y')),
(1, (3, 'x')),
(1, (3, 'y'))
test("leftOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(3, (1, None))
test("rightOuterJoin") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.rightOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (Some(1), 'x')),
(1, (Some(2), 'x')),
(2, (Some(1), 'y')),
(2, (Some(1), 'z')),
(4, (None, 'w'))
test("join with no matches") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 0)
test("join with many output partitions") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2, 10).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
test("groupWith") {
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
(2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
(3, (ArrayBuffer(1), ArrayBuffer())),
(4, (ArrayBuffer(), ArrayBuffer('w')))
test("zero-partition RDD") {
val emptyDir = Files.createTempDir()
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.partitions.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert( => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
test("keys and values") {
val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
test("default partitioner uses partition size") {
// specify 2000 partitions
val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
// do a map, which loses the partitioner
val b = => (a, (a * 2).toString))
// then a group by, and see we didn't revert to 2 partitions
val c = b.groupByKey()
assert(c.partitions.size === 2000)
test("default partitioner uses largest partitioner") {
val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
val c = a.join(b)
assert(c.partitions.size === 2000)
test("subtract") {
val a = sc.parallelize(Array(1, 2, 3), 2)
val b = sc.parallelize(Array(2, 3, 4), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set(1))
assert(c.partitions.size === a.partitions.size)
test("subtract with narrow dependency") {
// use a deterministic partitioner
val p = new Partitioner() {
def numPartitions = 5
def getPartition(key: Any) = key.asInstanceOf[Int]
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
// more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set((1, "a"), (3, "c")))
// Ideally we could keep the original partitioner...
assert(c.partitioner === None)
test("subtractByKey") {
val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
val c = a.subtractByKey(b)
assert(c.collect().toSet === Set((1, "a"), (1, "a")))
assert(c.partitions.size === a.partitions.size)
test("subtractByKey with narrow dependency") {
// use a deterministic partitioner
val p = new Partitioner() {
def numPartitions = 5
def getPartition(key: Any) = key.asInstanceOf[Int]
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
// more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtractByKey(b)
assert(c.collect().toSet === Set((1, "a"), (1, "a")))
assert(c.partitioner.get === p)
test("foldByKey") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.foldByKey(0)(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("foldByKey with mutable result type") {
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
// Fold the values using in-place mutation
val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
// Check that the mutable objects in the original RDD were not changed
assert(bufs.collect().toSet === Set(
(1, ArrayBuffer(1)),
(1, ArrayBuffer(2)),
(1, ArrayBuffer(3)),
(1, ArrayBuffer(1)),
(2, ArrayBuffer(1))))

View file

@ -6,8 +6,8 @@ import SparkContext._
import spark.util.StatCounter
import scala.math.abs
class PartitioningSuite extends FunSuite with LocalSparkContext {
class PartitioningSuite extends FunSuite with SharedSparkContext {
test("HashPartitioner equality") {
val p2 = new HashPartitioner(2)
val p4 = new HashPartitioner(4)
@ -21,8 +21,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
test("RangePartitioner equality") {
sc = new SparkContext("local", "test")
// Make an RDD where all the elements are the same so that the partition range bounds
// are deterministically all the same.
val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x))
@ -50,7 +48,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
test("HashPartitioner not equal to RangePartitioner") {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10).map(x => (x, x))
val rangeP2 = new RangePartitioner(2, rdd)
val hashP2 = new HashPartitioner(2)
@ -61,8 +58,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
test("partitioner preservation") {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
val grouped2 = rdd.groupByKey(2)
@ -101,7 +96,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
test("partitioning Java arrays should fail") {
sc = new SparkContext("local", "test")
val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x))
val arrPairs: RDD[(Array[Int], Int)] =
sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x))
@ -120,21 +114,20 @@ class PartitioningSuite extends FunSuite with LocalSparkContext {
assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array"))
assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array"))
test("Zero-length partitions should be correctly handled") {
test("zero-length partitions should be correctly handled") {
// Create RDD with some consecutive empty partitions (including the "first" one)
sc = new SparkContext("local", "test")
val rdd: RDD[Double] = sc
.parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8)
.filter(_ >= 0.0)
// Run the partitions, including the consecutive empty ones, through StatCounter
val stats: StatCounter = rdd.stats();
assert(abs(6.0 - stats.sum) < 0.01);
assert(abs(6.0/2 - rdd.mean) < 0.01);
assert(abs(1.0 - rdd.variance) < 0.01);
assert(abs(1.0 - rdd.stdev) < 0.01);
// Add other tests here for classes that should be able to handle empty partitions correctly

View file

@ -3,10 +3,9 @@ package spark
import org.scalatest.FunSuite
import SparkContext._
class PipedRDDSuite extends FunSuite with LocalSparkContext {
class PipedRDDSuite extends FunSuite with SharedSparkContext {
test("basic pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"))
@ -20,12 +19,11 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
test("advanced pipe") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val bl = sc.broadcast(List("0"))
val piped = nums.pipe(Seq("cat"),
Map[String, String](),
val piped = nums.pipe(Seq("cat"),
Map[String, String](),
(f: String => Unit) => {;f("\u0001")},
(i:Int, f: String=> Unit) => f(i + "_"))
@ -43,8 +41,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str=>str.split("\t")(0)).
Map[String, String](),
Map[String, String](),
(f: String => Unit) => {;f("\u0001")},
(i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect()
assert(d.size === 8)
@ -59,7 +57,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
test("pipe with env variable") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
val c = piped.collect()
@ -69,7 +66,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext {
test("pipe with non-zero exit status") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe("cat nonexistent_file")
intercept[SparkException] {

View file

@ -7,10 +7,9 @@ import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD}
class RDDSuite extends FunSuite with LocalSparkContext {
class RDDSuite extends FunSuite with SharedSparkContext {
test("basic operations") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2)
@ -46,7 +45,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("SparkContext.union") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(sc.union(nums).collect().toList === List(1, 2, 3, 4))
assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
@ -55,7 +53,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("aggregate") {
sc = new SparkContext("local", "test")
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
val emptyMap = new StringMap {
@ -75,57 +72,14 @@ class RDDSuite extends FunSuite with LocalSparkContext {
assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
test("basic checkpointing") {
val checkpointDir = File.createTempFile("temp", "")
sc = new SparkContext("local", "test")
val parCollection = sc.makeRDD(1 to 4)
val flatMappedRDD = parCollection.flatMap(x => 1 to x)
assert(flatMappedRDD.dependencies.head.rdd == parCollection)
val result = flatMappedRDD.collect()
assert(flatMappedRDD.dependencies.head.rdd != parCollection)
assert(flatMappedRDD.collect() === result)
test("basic caching") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(rdd.collect().toList === List(1, 2, 3, 4))
assert(rdd.collect().toList === List(1, 2, 3, 4))
assert(rdd.collect().toList === List(1, 2, 3, 4))
test("unpersist RDD") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(sc.persistentRdds.isEmpty === false)
assert(sc.persistentRdds.isEmpty === true)
failAfter(Span(3000, Millis)) {
try {
while (! sc.getRDDStorageInfo.isEmpty) {
} catch {
case _ => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
assert(sc.getRDDStorageInfo.isEmpty === true)
test("caching with failures") {
sc = new SparkContext("local", "test")
val onlySplit = new Partition { override def index: Int = 0 }
var shouldFail = true
val rdd = new RDD[Int](sc, Nil) {
@ -148,7 +102,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("empty RDD") {
sc = new SparkContext("local", "test")
val empty = new EmptyRDD[Int](sc)
assert(empty.count === 0)
assert(empty.collect().size === 0)
@ -168,37 +121,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("cogrouped RDDs") {
sc = new SparkContext("local", "test")
val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2)
val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2)
// Use cogroup function
val cogrouped = rdd1.cogroup(rdd2).collectAsMap()
assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1")))
assert(cogrouped(2) === (Seq("two"), Seq("two1")))
assert(cogrouped(3) === (Seq("three"), Seq()))
// Construct CoGroupedRDD directly, with map side combine enabled
val cogrouped1 = new CoGroupedRDD[Int](
Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
new HashPartitioner(3),
assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1")))
assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq()))
// Construct CoGroupedRDD directly, with map side combine disabled
val cogrouped2 = new CoGroupedRDD[Int](
Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]),
new HashPartitioner(3),
assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1")))
assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1")))
assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq()))
test("coalesced RDDs") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
val coalesced1 = data.coalesce(2)
@ -236,7 +158,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("zipped RDDs") {
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val zipped = + 1.0))
assert(zipped.glom().map(_.toList).collect().toList ===
@ -248,7 +169,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("partition pruning") {
sc = new SparkContext("local", "test")
val data = sc.parallelize(1 to 10, 10)
// Note that split number starts from 0, so > 8 means only 10th partition left.
val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8)
@ -260,7 +180,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("mapWith") {
import java.util.Random
sc = new SparkContext("local", "test")
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
val randoms = ones.mapWith(
(index: Int) => new Random(index + 42))
@ -279,7 +198,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("flatMapWith") {
import java.util.Random
sc = new SparkContext("local", "test")
val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2)
val randoms = ones.flatMapWith(
(index: Int) => new Random(index + 42))
@ -301,7 +219,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("filterWith") {
import java.util.Random
sc = new SparkContext("local", "test")
val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2)
val sample = ints.filterWith(
(index: Int) => new Random(index + 42))
@ -319,7 +236,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("top with predefined ordering") {
sc = new SparkContext("local", "test")
val nums = Array.range(1, 100000)
val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2)
val topK =
@ -328,7 +244,6 @@ class RDDSuite extends FunSuite with LocalSparkContext {
test("top with custom ordering") {
sc = new SparkContext("local", "test")
val words = Vector("a", "b", "c", "d")
implicit val ord = implicitly[Ordering[String]].reverse
val rdd = sc.makeRDD(words, 2)

View file

@ -0,0 +1,25 @@
package spark
import org.scalatest.Suite
import org.scalatest.BeforeAndAfterAll
/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */
trait SharedSparkContext extends BeforeAndAfterAll { self: Suite =>
@transient private var _sc: SparkContext = _
def sc: SparkContext = _sc
override def beforeAll() {
_sc = new SparkContext("local", "test")
override def afterAll() {
if (_sc != null) {
_sc = null

View file

@ -16,54 +16,9 @@ import spark.rdd.ShuffledRDD
import spark.SparkContext._
class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
test("groupByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with duplicates") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with negative key hash codes") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
val valuesForMinus1 = groups.find(_._1 == -1).get._2
assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with many output partitions") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey(10).collect()
assert(groups.size === 2)
val valuesFor1 = groups.find(_._1 == 1).get._2
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with compression") {
try {
System.setProperty("spark.blockManager.compress", "true")
System.setProperty("spark.shuffle.compress", "true")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
val groups = pairs.groupByKey(4).collect()
@ -77,234 +32,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
test("reduceByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("reduceByKey with collectAsMap") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collectAsMap()
assert(sums.size === 2)
assert(sums(1) === 7)
assert(sums(2) === 1)
test("reduceByKey with many output partitons") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("reduceByKey with partitioner") {
sc = new SparkContext("local", "test")
val p = new Partitioner() {
def numPartitions = 2
def getPartition(key: Any) = key.asInstanceOf[Int]
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
val sums = pairs.reduceByKey(_+_)
assert(sums.collect().toSet === Set((1, 4), (0, 1)))
assert(sums.partitioner === Some(p))
// count the dependencies to make sure there is only 1 ShuffledRDD
val deps = new HashSet[RDD[_]]()
def visit(r: RDD[_]) {
for (dep <- r.dependencies) {
deps += dep.rdd
assert(deps.size === 2) // ShuffledRDD, ParallelCollection
test("join") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
test("join all-to-all") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 6)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (1, 'y')),
(1, (2, 'x')),
(1, (2, 'y')),
(1, (3, 'x')),
(1, (3, 'y'))
test("leftOuterJoin") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.leftOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (1, Some('x'))),
(1, (2, Some('x'))),
(2, (1, Some('y'))),
(2, (1, Some('z'))),
(3, (1, None))
test("rightOuterJoin") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.rightOuterJoin(rdd2).collect()
assert(joined.size === 5)
assert(joined.toSet === Set(
(1, (Some(1), 'x')),
(1, (Some(2), 'x')),
(2, (Some(1), 'y')),
(2, (Some(1), 'z')),
(4, (None, 'w'))
test("join with no matches") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 0)
test("join with many output partitions") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2, 10).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (1, 'x')),
(1, (2, 'x')),
(2, (1, 'y')),
(2, (1, 'z'))
test("groupWith") {
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
assert(joined.size === 4)
assert(joined.toSet === Set(
(1, (ArrayBuffer(1, 2), ArrayBuffer('x'))),
(2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))),
(3, (ArrayBuffer(1), ArrayBuffer())),
(4, (ArrayBuffer(), ArrayBuffer('w')))
test("zero-partition RDD") {
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.partitions.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert( => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
test("keys and values") {
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
assert(rdd.keys.collect().toList === List(1, 2))
assert(rdd.values.collect().toList === List("a", "b"))
test("default partitioner uses partition size") {
sc = new SparkContext("local", "test")
// specify 2000 partitions
val a = sc.makeRDD(Array(1, 2, 3, 4), 2000)
// do a map, which loses the partitioner
val b = => (a, (a * 2).toString))
// then a group by, and see we didn't revert to 2 partitions
val c = b.groupByKey()
assert(c.partitions.size === 2000)
test("default partitioner uses largest partitioner") {
sc = new SparkContext("local", "test")
val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
val c = a.join(b)
assert(c.partitions.size === 2000)
test("subtract") {
sc = new SparkContext("local", "test")
val a = sc.parallelize(Array(1, 2, 3), 2)
val b = sc.parallelize(Array(2, 3, 4), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set(1))
assert(c.partitions.size === a.partitions.size)
test("subtract with narrow dependency") {
sc = new SparkContext("local", "test")
// use a deterministic partitioner
val p = new Partitioner() {
def numPartitions = 5
def getPartition(key: Any) = key.asInstanceOf[Int]
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
// more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtract(b)
assert(c.collect().toSet === Set((1, "a"), (3, "c")))
// Ideally we could keep the original partitioner...
assert(c.partitioner === None)
test("subtractByKey") {
sc = new SparkContext("local", "test")
val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
val c = a.subtractByKey(b)
assert(c.collect().toSet === Set((1, "a"), (1, "a")))
assert(c.partitions.size === a.partitions.size)
test("subtractByKey with narrow dependency") {
sc = new SparkContext("local", "test")
// use a deterministic partitioner
val p = new Partitioner() {
def numPartitions = 5
def getPartition(key: Any) = key.asInstanceOf[Int]
// partitionBy so we have a narrow dependency
val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
// more partitions/no partitioner so a shuffle dependency
val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
val c = a.subtractByKey(b)
assert(c.collect().toSet === Set((1, "a"), (1, "a")))
assert(c.partitioner.get === p)
test("shuffle non-zero block size") {
sc = new SparkContext("local-cluster[2,1,512]", "test")
val NUM_BLOCKS = 3
@ -391,29 +118,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext {
// We should have at most 4 non-zero sized partitions
assert(nonEmptyBlocks.size <= 4)
test("foldByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.foldByKey(0)(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("foldByKey with mutable result type") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
// Fold the values using in-place mutation
val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1))))
// Check that the mutable objects in the original RDD were not changed
assert(bufs.collect().toSet === Set(
(1, ArrayBuffer(1)),
(1, ArrayBuffer(2)),
(1, ArrayBuffer(3)),
(1, ArrayBuffer(1)),
(2, ArrayBuffer(1))))
object ShuffleSuite {

View file

@ -5,16 +5,14 @@ import org.scalatest.BeforeAndAfter
import org.scalatest.matchers.ShouldMatchers
import SparkContext._
class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging {
class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging {
test("sortByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
test("large array") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@ -24,7 +22,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
test("large array with one split") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@ -32,9 +29,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
assert(sorted.partitions.size === 1)
assert(sorted.collect() === pairArr.sortBy(_._1))
test("large array with many partitions") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@ -42,9 +38,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
assert(sorted.partitions.size === 20)
assert(sorted.collect() === pairArr.sortBy(_._1))
test("sort descending") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@ -52,15 +47,13 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
test("sort descending with one split") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 1)
assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
test("sort descending with many partitions") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 2)
@ -68,7 +61,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
test("more partitions than elements") {
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 30)
@ -76,14 +68,12 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
test("empty RDD") {
sc = new SparkContext("local", "test")
val pairArr = new Array[(Int, Int)](0)
val pairs = sc.parallelize(pairArr, 2)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
test("partition balancing") {
sc = new SparkContext("local", "test")
val pairArr = (1 to 1000).map(x => (x, x)).toArray
val sorted = sc.parallelize(pairArr, 4).sortByKey()
assert(sorted.collect() === pairArr.sortBy(_._1))
@ -99,7 +89,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w
test("partition balancing for descending sort") {
sc = new SparkContext("local", "test")
val pairArr = (1 to 1000).map(x => (x, x)).toArray
val sorted = sc.parallelize(pairArr, 4).sortByKey(false)
assert(sorted.collect() === pairArr.sortBy(_._1).reverse)

View file

@ -0,0 +1,30 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.concurrent.Timeouts._
import org.scalatest.time.{Span, Millis}
import spark.SparkContext._
class UnpersistSuite extends FunSuite with LocalSparkContext {
test("unpersist RDD") {
sc = new SparkContext("local", "test")
val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache()
assert(sc.persistentRdds.isEmpty === false)
assert(sc.persistentRdds.isEmpty === true)
failAfter(Span(3000, Millis)) {
try {
while (! sc.getRDDStorageInfo.isEmpty) {
} catch {
case _ => { Thread.sleep(10) }
// Do nothing. We might see exceptions because block manager
// is racing this thread to remove entries from the driver.
assert(sc.getRDDStorageInfo.isEmpty === true)

View file

@ -17,9 +17,8 @@ object ZippedPartitionsSuite {
class ZippedPartitionsSuite extends FunSuite with LocalSparkContext {
class ZippedPartitionsSuite extends FunSuite with SharedSparkContext {
test("print sizes") {
sc = new SparkContext("local", "test")
val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)