From 9f0d91329516c829c86fc8e95d02071ca7d1f186 Mon Sep 17 00:00:00 2001 From: Matei Zaharia Date: Tue, 25 Jun 2013 19:18:30 -0400 Subject: [PATCH] Refactored tests to share SparkContexts in some of them Creating these seems to take a while and clutters the output with Akka stuff, so it would be nice to share them. --- .../test/scala/spark/CheckpointSuite.scala | 10 + .../scala/spark/PairRDDFunctionsSuite.scala | 287 +++++++++++++++++ .../test/scala/spark/PartitioningSuite.scala | 19 +- core/src/test/scala/spark/PipedRDDSuite.scala | 16 +- core/src/test/scala/spark/RDDSuite.scala | 87 +---- .../test/scala/spark/SharedSparkContext.scala | 25 ++ core/src/test/scala/spark/ShuffleSuite.scala | 298 +----------------- core/src/test/scala/spark/SortingSuite.scala | 23 +- .../src/test/scala/spark/UnpersistSuite.scala | 30 ++ .../scala/spark/ZippedPartitionsSuite.scala | 3 +- 10 files changed, 373 insertions(+), 425 deletions(-) create mode 100644 core/src/test/scala/spark/PairRDDFunctionsSuite.scala create mode 100644 core/src/test/scala/spark/SharedSparkContext.scala create mode 100644 core/src/test/scala/spark/UnpersistSuite.scala diff --git a/core/src/test/scala/spark/CheckpointSuite.scala b/core/src/test/scala/spark/CheckpointSuite.scala index ca385972fb..28a7b21b92 100644 --- a/core/src/test/scala/spark/CheckpointSuite.scala +++ b/core/src/test/scala/spark/CheckpointSuite.scala @@ -27,6 +27,16 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { } } + test("basic checkpointing") { + val parCollection = sc.makeRDD(1 to 4) + val flatMappedRDD = parCollection.flatMap(x => 1 to x) + flatMappedRDD.checkpoint() + assert(flatMappedRDD.dependencies.head.rdd == parCollection) + val result = flatMappedRDD.collect() + assert(flatMappedRDD.dependencies.head.rdd != parCollection) + assert(flatMappedRDD.collect() === result) + } + test("RDDs with one-to-one dependencies") { testCheckpointing(_.map(x => x.toString)) testCheckpointing(_.flatMap(x => 1 to x)) diff --git a/core/src/test/scala/spark/PairRDDFunctionsSuite.scala b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala new file mode 100644 index 0000000000..682d2745bf --- /dev/null +++ b/core/src/test/scala/spark/PairRDDFunctionsSuite.scala @@ -0,0 +1,287 @@ +package spark + +import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.HashSet + +import org.scalatest.FunSuite +import org.scalatest.prop.Checkers +import org.scalacheck.Arbitrary._ +import org.scalacheck.Gen +import org.scalacheck.Prop._ + +import com.google.common.io.Files + +import spark.rdd.ShuffledRDD +import spark.SparkContext._ + +class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { + test("groupByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with duplicates") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with negative key hash codes") { + val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))) + val groups = pairs.groupByKey().collect() + assert(groups.size === 2) + val valuesForMinus1 = groups.find(_._1 == -1).get._2 + assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("groupByKey with many output partitions") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) + val groups = pairs.groupByKey(10).collect() + assert(groups.size === 2) + val valuesFor1 = groups.find(_._1 == 1).get._2 + assert(valuesFor1.toList.sorted === List(1, 2, 3)) + val valuesFor2 = groups.find(_._1 == 2).get._2 + assert(valuesFor2.toList.sorted === List(1)) + } + + test("reduceByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.reduceByKey(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with collectAsMap") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.reduceByKey(_+_).collectAsMap() + assert(sums.size === 2) + assert(sums(1) === 7) + assert(sums(2) === 1) + } + + test("reduceByKey with many output partitons") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.reduceByKey(_+_, 10).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("reduceByKey with partitioner") { + val p = new Partitioner() { + def numPartitions = 2 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p) + val sums = pairs.reduceByKey(_+_) + assert(sums.collect().toSet === Set((1, 4), (0, 1))) + assert(sums.partitioner === Some(p)) + // count the dependencies to make sure there is only 1 ShuffledRDD + val deps = new HashSet[RDD[_]]() + def visit(r: RDD[_]) { + for (dep <- r.dependencies) { + deps += dep.rdd + visit(dep.rdd) + } + } + visit(sums) + assert(deps.size === 2) // ShuffledRDD, ParallelCollection + } + + test("join") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("join all-to-all") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) + val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 6) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (1, 'y')), + (1, (2, 'x')), + (1, (2, 'y')), + (1, (3, 'x')), + (1, (3, 'y')) + )) + } + + test("leftOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.leftOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (1, Some('x'))), + (1, (2, Some('x'))), + (2, (1, Some('y'))), + (2, (1, Some('z'))), + (3, (1, None)) + )) + } + + test("rightOuterJoin") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.rightOuterJoin(rdd2).collect() + assert(joined.size === 5) + assert(joined.toSet === Set( + (1, (Some(1), 'x')), + (1, (Some(2), 'x')), + (2, (Some(1), 'y')), + (2, (Some(1), 'z')), + (4, (None, 'w')) + )) + } + + test("join with no matches") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) + val joined = rdd1.join(rdd2).collect() + assert(joined.size === 0) + } + + test("join with many output partitions") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.join(rdd2, 10).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (1, 'x')), + (1, (2, 'x')), + (2, (1, 'y')), + (2, (1, 'z')) + )) + } + + test("groupWith") { + val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) + val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) + val joined = rdd1.groupWith(rdd2).collect() + assert(joined.size === 4) + assert(joined.toSet === Set( + (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), + (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), + (3, (ArrayBuffer(1), ArrayBuffer())), + (4, (ArrayBuffer(), ArrayBuffer('w'))) + )) + } + + test("zero-partition RDD") { + val emptyDir = Files.createTempDir() + val file = sc.textFile(emptyDir.getAbsolutePath) + assert(file.partitions.size == 0) + assert(file.collect().toList === Nil) + // Test that a shuffle on the file works, because this used to be a bug + assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) + } + + test("keys and values") { + val rdd = sc.parallelize(Array((1, "a"), (2, "b"))) + assert(rdd.keys.collect().toList === List(1, 2)) + assert(rdd.values.collect().toList === List("a", "b")) + } + + test("default partitioner uses partition size") { + // specify 2000 partitions + val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) + // do a map, which loses the partitioner + val b = a.map(a => (a, (a * 2).toString)) + // then a group by, and see we didn't revert to 2 partitions + val c = b.groupByKey() + assert(c.partitions.size === 2000) + } + + test("default partitioner uses largest partitioner") { + val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) + val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) + val c = a.join(b) + assert(c.partitions.size === 2000) + } + + test("subtract") { + val a = sc.parallelize(Array(1, 2, 3), 2) + val b = sc.parallelize(Array(2, 3, 4), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set(1)) + assert(c.partitions.size === a.partitions.size) + } + + test("subtract with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtract(b) + assert(c.collect().toSet === Set((1, "a"), (3, "c"))) + // Ideally we could keep the original partitioner... + assert(c.partitioner === None) + } + + test("subtractByKey") { + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2) + val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitions.size === a.partitions.size) + } + + test("subtractByKey with narrow dependency") { + // use a deterministic partitioner + val p = new Partitioner() { + def numPartitions = 5 + def getPartition(key: Any) = key.asInstanceOf[Int] + } + // partitionBy so we have a narrow dependency + val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p) + // more partitions/no partitioner so a shuffle dependency + val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) + val c = a.subtractByKey(b) + assert(c.collect().toSet === Set((1, "a"), (1, "a"))) + assert(c.partitioner.get === p) + } + + test("foldByKey") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val sums = pairs.foldByKey(0)(_+_).collect() + assert(sums.toSet === Set((1, 7), (2, 1))) + } + + test("foldByKey with mutable result type") { + val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) + val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() + // Fold the values using in-place mutation + val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() + assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) + // Check that the mutable objects in the original RDD were not changed + assert(bufs.collect().toSet === Set( + (1, ArrayBuffer(1)), + (1, ArrayBuffer(2)), + (1, ArrayBuffer(3)), + (1, ArrayBuffer(1)), + (2, ArrayBuffer(1)))) + } +} diff --git a/core/src/test/scala/spark/PartitioningSuite.scala b/core/src/test/scala/spark/PartitioningSuite.scala index 16f93e71a3..99e433e3bd 100644 --- a/core/src/test/scala/spark/PartitioningSuite.scala +++ b/core/src/test/scala/spark/PartitioningSuite.scala @@ -6,8 +6,8 @@ import SparkContext._ import spark.util.StatCounter import scala.math.abs -class PartitioningSuite extends FunSuite with LocalSparkContext { - +class PartitioningSuite extends FunSuite with SharedSparkContext { + test("HashPartitioner equality") { val p2 = new HashPartitioner(2) val p4 = new HashPartitioner(4) @@ -21,8 +21,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("RangePartitioner equality") { - sc = new SparkContext("local", "test") - // Make an RDD where all the elements are the same so that the partition range bounds // are deterministically all the same. val rdd = sc.parallelize(Seq(1, 1, 1, 1)).map(x => (x, x)) @@ -50,7 +48,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("HashPartitioner not equal to RangePartitioner") { - sc = new SparkContext("local", "test") val rdd = sc.parallelize(1 to 10).map(x => (x, x)) val rangeP2 = new RangePartitioner(2, rdd) val hashP2 = new HashPartitioner(2) @@ -61,8 +58,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("partitioner preservation") { - sc = new SparkContext("local", "test") - val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x)) val grouped2 = rdd.groupByKey(2) @@ -101,7 +96,6 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { } test("partitioning Java arrays should fail") { - sc = new SparkContext("local", "test") val arrs: RDD[Array[Int]] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => Array(x)) val arrPairs: RDD[(Array[Int], Int)] = sc.parallelize(Array(1, 2, 3, 4), 2).map(x => (Array(x), x)) @@ -120,21 +114,20 @@ class PartitioningSuite extends FunSuite with LocalSparkContext { assert(intercept[SparkException]{ arrPairs.reduceByKeyLocally(_ + _) }.getMessage.contains("array")) assert(intercept[SparkException]{ arrPairs.reduceByKey(_ + _) }.getMessage.contains("array")) } - - test("Zero-length partitions should be correctly handled") { + + test("zero-length partitions should be correctly handled") { // Create RDD with some consecutive empty partitions (including the "first" one) - sc = new SparkContext("local", "test") val rdd: RDD[Double] = sc .parallelize(Array(-1.0, -1.0, -1.0, -1.0, 2.0, 4.0, -1.0, -1.0), 8) .filter(_ >= 0.0) - + // Run the partitions, including the consecutive empty ones, through StatCounter val stats: StatCounter = rdd.stats(); assert(abs(6.0 - stats.sum) < 0.01); assert(abs(6.0/2 - rdd.mean) < 0.01); assert(abs(1.0 - rdd.variance) < 0.01); assert(abs(1.0 - rdd.stdev) < 0.01); - + // Add other tests here for classes that should be able to handle empty partitions correctly } } diff --git a/core/src/test/scala/spark/PipedRDDSuite.scala b/core/src/test/scala/spark/PipedRDDSuite.scala index ed075f93ec..1c9ca50811 100644 --- a/core/src/test/scala/spark/PipedRDDSuite.scala +++ b/core/src/test/scala/spark/PipedRDDSuite.scala @@ -3,10 +3,9 @@ package spark import org.scalatest.FunSuite import SparkContext._ -class PipedRDDSuite extends FunSuite with LocalSparkContext { - +class PipedRDDSuite extends FunSuite with SharedSparkContext { + test("basic pipe") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val piped = nums.pipe(Seq("cat")) @@ -20,12 +19,11 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { } test("advanced pipe") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val bl = sc.broadcast(List("0")) - val piped = nums.pipe(Seq("cat"), - Map[String, String](), + val piped = nums.pipe(Seq("cat"), + Map[String, String](), (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, (i:Int, f: String=> Unit) => f(i + "_")) @@ -43,8 +41,8 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2) val d = nums1.groupBy(str=>str.split("\t")(0)). - pipe(Seq("cat"), - Map[String, String](), + pipe(Seq("cat"), + Map[String, String](), (f: String => Unit) => {bl.value.map(f(_));f("\u0001")}, (i:Tuple2[String, Seq[String]], f: String=> Unit) => {for (e <- i._2){ f(e + "_")}}).collect() assert(d.size === 8) @@ -59,7 +57,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { } test("pipe with env variable") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA")) val c = piped.collect() @@ -69,7 +66,6 @@ class PipedRDDSuite extends FunSuite with LocalSparkContext { } test("pipe with non-zero exit status") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val piped = nums.pipe("cat nonexistent_file") intercept[SparkException] { diff --git a/core/src/test/scala/spark/RDDSuite.scala b/core/src/test/scala/spark/RDDSuite.scala index 67f3332d44..d8db69b1c9 100644 --- a/core/src/test/scala/spark/RDDSuite.scala +++ b/core/src/test/scala/spark/RDDSuite.scala @@ -7,10 +7,9 @@ import org.scalatest.time.{Span, Millis} import spark.SparkContext._ import spark.rdd.{CoalescedRDD, CoGroupedRDD, EmptyRDD, PartitionPruningRDD, ShuffledRDD} -class RDDSuite extends FunSuite with LocalSparkContext { +class RDDSuite extends FunSuite with SharedSparkContext { test("basic operations") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(nums.collect().toList === List(1, 2, 3, 4)) val dups = sc.makeRDD(Array(1, 1, 2, 2, 3, 3, 4, 4), 2) @@ -46,7 +45,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("SparkContext.union") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) assert(sc.union(nums).collect().toList === List(1, 2, 3, 4)) assert(sc.union(nums, nums).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4)) @@ -55,7 +53,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("aggregate") { - sc = new SparkContext("local", "test") val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3))) type StringMap = HashMap[String, Int] val emptyMap = new StringMap { @@ -75,57 +72,14 @@ class RDDSuite extends FunSuite with LocalSparkContext { assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5))) } - test("basic checkpointing") { - import java.io.File - val checkpointDir = File.createTempFile("temp", "") - checkpointDir.delete() - - sc = new SparkContext("local", "test") - sc.setCheckpointDir(checkpointDir.toString) - val parCollection = sc.makeRDD(1 to 4) - val flatMappedRDD = parCollection.flatMap(x => 1 to x) - flatMappedRDD.checkpoint() - assert(flatMappedRDD.dependencies.head.rdd == parCollection) - val result = flatMappedRDD.collect() - Thread.sleep(1000) - assert(flatMappedRDD.dependencies.head.rdd != parCollection) - assert(flatMappedRDD.collect() === result) - - checkpointDir.deleteOnExit() - } - test("basic caching") { - sc = new SparkContext("local", "test") val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() assert(rdd.collect().toList === List(1, 2, 3, 4)) assert(rdd.collect().toList === List(1, 2, 3, 4)) assert(rdd.collect().toList === List(1, 2, 3, 4)) } - test("unpersist RDD") { - sc = new SparkContext("local", "test") - val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() - rdd.count - assert(sc.persistentRdds.isEmpty === false) - rdd.unpersist() - assert(sc.persistentRdds.isEmpty === true) - - failAfter(Span(3000, Millis)) { - try { - while (! sc.getRDDStorageInfo.isEmpty) { - Thread.sleep(200) - } - } catch { - case _ => { Thread.sleep(10) } - // Do nothing. We might see exceptions because block manager - // is racing this thread to remove entries from the driver. - } - } - assert(sc.getRDDStorageInfo.isEmpty === true) - } - test("caching with failures") { - sc = new SparkContext("local", "test") val onlySplit = new Partition { override def index: Int = 0 } var shouldFail = true val rdd = new RDD[Int](sc, Nil) { @@ -148,7 +102,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("empty RDD") { - sc = new SparkContext("local", "test") val empty = new EmptyRDD[Int](sc) assert(empty.count === 0) assert(empty.collect().size === 0) @@ -168,37 +121,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("cogrouped RDDs") { - sc = new SparkContext("local", "test") - val rdd1 = sc.makeRDD(Array((1, "one"), (1, "another one"), (2, "two"), (3, "three")), 2) - val rdd2 = sc.makeRDD(Array((1, "one1"), (1, "another one1"), (2, "two1")), 2) - - // Use cogroup function - val cogrouped = rdd1.cogroup(rdd2).collectAsMap() - assert(cogrouped(1) === (Seq("one", "another one"), Seq("one1", "another one1"))) - assert(cogrouped(2) === (Seq("two"), Seq("two1"))) - assert(cogrouped(3) === (Seq("three"), Seq())) - - // Construct CoGroupedRDD directly, with map side combine enabled - val cogrouped1 = new CoGroupedRDD[Int]( - Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]), - new HashPartitioner(3), - true).collectAsMap() - assert(cogrouped1(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1"))) - assert(cogrouped1(2).toSeq === Seq(Seq("two"), Seq("two1"))) - assert(cogrouped1(3).toSeq === Seq(Seq("three"), Seq())) - - // Construct CoGroupedRDD directly, with map side combine disabled - val cogrouped2 = new CoGroupedRDD[Int]( - Seq(rdd1.asInstanceOf[RDD[(Int, Any)]], rdd2.asInstanceOf[RDD[(Int, Any)]]), - new HashPartitioner(3), - false).collectAsMap() - assert(cogrouped2(1).toSeq === Seq(Seq("one", "another one"), Seq("one1", "another one1"))) - assert(cogrouped2(2).toSeq === Seq(Seq("two"), Seq("two1"))) - assert(cogrouped2(3).toSeq === Seq(Seq("three"), Seq())) - } - - test("coalesced RDDs") { - sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) val coalesced1 = data.coalesce(2) @@ -236,7 +158,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("zipped RDDs") { - sc = new SparkContext("local", "test") val nums = sc.makeRDD(Array(1, 2, 3, 4), 2) val zipped = nums.zip(nums.map(_ + 1.0)) assert(zipped.glom().map(_.toList).collect().toList === @@ -248,7 +169,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("partition pruning") { - sc = new SparkContext("local", "test") val data = sc.parallelize(1 to 10, 10) // Note that split number starts from 0, so > 8 means only 10th partition left. val prunedRdd = new PartitionPruningRDD(data, splitNum => splitNum > 8) @@ -260,7 +180,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("mapWith") { import java.util.Random - sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.mapWith( (index: Int) => new Random(index + 42)) @@ -279,7 +198,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("flatMapWith") { import java.util.Random - sc = new SparkContext("local", "test") val ones = sc.makeRDD(Array(1, 1, 1, 1, 1, 1), 2) val randoms = ones.flatMapWith( (index: Int) => new Random(index + 42)) @@ -301,7 +219,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { test("filterWith") { import java.util.Random - sc = new SparkContext("local", "test") val ints = sc.makeRDD(Array(1, 2, 3, 4, 5, 6), 2) val sample = ints.filterWith( (index: Int) => new Random(index + 42)) @@ -319,7 +236,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("top with predefined ordering") { - sc = new SparkContext("local", "test") val nums = Array.range(1, 100000) val ints = sc.makeRDD(scala.util.Random.shuffle(nums), 2) val topK = ints.top(5) @@ -328,7 +244,6 @@ class RDDSuite extends FunSuite with LocalSparkContext { } test("top with custom ordering") { - sc = new SparkContext("local", "test") val words = Vector("a", "b", "c", "d") implicit val ord = implicitly[Ordering[String]].reverse val rdd = sc.makeRDD(words, 2) diff --git a/core/src/test/scala/spark/SharedSparkContext.scala b/core/src/test/scala/spark/SharedSparkContext.scala new file mode 100644 index 0000000000..1da79f9824 --- /dev/null +++ b/core/src/test/scala/spark/SharedSparkContext.scala @@ -0,0 +1,25 @@ +package spark + +import org.scalatest.Suite +import org.scalatest.BeforeAndAfterAll + +/** Shares a local `SparkContext` between all tests in a suite and closes it at the end */ +trait SharedSparkContext extends BeforeAndAfterAll { self: Suite => + + @transient private var _sc: SparkContext = _ + + def sc: SparkContext = _sc + + override def beforeAll() { + _sc = new SparkContext("local", "test") + super.beforeAll() + } + + override def afterAll() { + if (_sc != null) { + LocalSparkContext.stop(_sc) + _sc = null + } + super.afterAll() + } +} diff --git a/core/src/test/scala/spark/ShuffleSuite.scala b/core/src/test/scala/spark/ShuffleSuite.scala index 0c1ec29f96..950218fa28 100644 --- a/core/src/test/scala/spark/ShuffleSuite.scala +++ b/core/src/test/scala/spark/ShuffleSuite.scala @@ -16,54 +16,9 @@ import spark.rdd.ShuffledRDD import spark.SparkContext._ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { - - test("groupByKey") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with duplicates") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with negative key hash codes") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1))) - val groups = pairs.groupByKey().collect() - assert(groups.size === 2) - val valuesForMinus1 = groups.find(_._1 == -1).get._2 - assert(valuesForMinus1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - - test("groupByKey with many output partitions") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1))) - val groups = pairs.groupByKey(10).collect() - assert(groups.size === 2) - val valuesFor1 = groups.find(_._1 == 1).get._2 - assert(valuesFor1.toList.sorted === List(1, 2, 3)) - val valuesFor2 = groups.find(_._1 == 2).get._2 - assert(valuesFor2.toList.sorted === List(1)) - } - test("groupByKey with compression") { try { - System.setProperty("spark.blockManager.compress", "true") + System.setProperty("spark.shuffle.compress", "true") sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4) val groups = pairs.groupByKey(4).collect() @@ -77,234 +32,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { } } - test("reduceByKey") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.reduceByKey(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with collectAsMap") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.reduceByKey(_+_).collectAsMap() - assert(sums.size === 2) - assert(sums(1) === 7) - assert(sums(2) === 1) - } - - test("reduceByKey with many output partitons") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.reduceByKey(_+_, 10).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("reduceByKey with partitioner") { - sc = new SparkContext("local", "test") - val p = new Partitioner() { - def numPartitions = 2 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p) - val sums = pairs.reduceByKey(_+_) - assert(sums.collect().toSet === Set((1, 4), (0, 1))) - assert(sums.partitioner === Some(p)) - // count the dependencies to make sure there is only 1 ShuffledRDD - val deps = new HashSet[RDD[_]]() - def visit(r: RDD[_]) { - for (dep <- r.dependencies) { - deps += dep.rdd - visit(dep.rdd) - } - } - visit(sums) - assert(deps.size === 2) // ShuffledRDD, ParallelCollection - } - - test("join") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("join all-to-all") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3))) - val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 6) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (1, 'y')), - (1, (2, 'x')), - (1, (2, 'y')), - (1, (3, 'x')), - (1, (3, 'y')) - )) - } - - test("leftOuterJoin") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.leftOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (1, Some('x'))), - (1, (2, Some('x'))), - (2, (1, Some('y'))), - (2, (1, Some('z'))), - (3, (1, None)) - )) - } - - test("rightOuterJoin") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.rightOuterJoin(rdd2).collect() - assert(joined.size === 5) - assert(joined.toSet === Set( - (1, (Some(1), 'x')), - (1, (Some(2), 'x')), - (2, (Some(1), 'y')), - (2, (Some(1), 'z')), - (4, (None, 'w')) - )) - } - - test("join with no matches") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w'))) - val joined = rdd1.join(rdd2).collect() - assert(joined.size === 0) - } - - test("join with many output partitions") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.join(rdd2, 10).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (1, 'x')), - (1, (2, 'x')), - (2, (1, 'y')), - (2, (1, 'z')) - )) - } - - test("groupWith") { - sc = new SparkContext("local", "test") - val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1))) - val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w'))) - val joined = rdd1.groupWith(rdd2).collect() - assert(joined.size === 4) - assert(joined.toSet === Set( - (1, (ArrayBuffer(1, 2), ArrayBuffer('x'))), - (2, (ArrayBuffer(1), ArrayBuffer('y', 'z'))), - (3, (ArrayBuffer(1), ArrayBuffer())), - (4, (ArrayBuffer(), ArrayBuffer('w'))) - )) - } - - test("zero-partition RDD") { - sc = new SparkContext("local", "test") - val emptyDir = Files.createTempDir() - val file = sc.textFile(emptyDir.getAbsolutePath) - assert(file.partitions.size == 0) - assert(file.collect().toList === Nil) - // Test that a shuffle on the file works, because this used to be a bug - assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil) - } - - test("keys and values") { - sc = new SparkContext("local", "test") - val rdd = sc.parallelize(Array((1, "a"), (2, "b"))) - assert(rdd.keys.collect().toList === List(1, 2)) - assert(rdd.values.collect().toList === List("a", "b")) - } - - test("default partitioner uses partition size") { - sc = new SparkContext("local", "test") - // specify 2000 partitions - val a = sc.makeRDD(Array(1, 2, 3, 4), 2000) - // do a map, which loses the partitioner - val b = a.map(a => (a, (a * 2).toString)) - // then a group by, and see we didn't revert to 2 partitions - val c = b.groupByKey() - assert(c.partitions.size === 2000) - } - - test("default partitioner uses largest partitioner") { - sc = new SparkContext("local", "test") - val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2) - val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000) - val c = a.join(b) - assert(c.partitions.size === 2000) - } - - test("subtract") { - sc = new SparkContext("local", "test") - val a = sc.parallelize(Array(1, 2, 3), 2) - val b = sc.parallelize(Array(2, 3, 4), 4) - val c = a.subtract(b) - assert(c.collect().toSet === Set(1)) - assert(c.partitions.size === a.partitions.size) - } - - test("subtract with narrow dependency") { - sc = new SparkContext("local", "test") - // use a deterministic partitioner - val p = new Partitioner() { - def numPartitions = 5 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - // partitionBy so we have a narrow dependency - val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p) - // more partitions/no partitioner so a shuffle dependency - val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) - val c = a.subtract(b) - assert(c.collect().toSet === Set((1, "a"), (3, "c"))) - // Ideally we could keep the original partitioner... - assert(c.partitioner === None) - } - - test("subtractByKey") { - sc = new SparkContext("local", "test") - val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2) - val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4) - val c = a.subtractByKey(b) - assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitions.size === a.partitions.size) - } - - test("subtractByKey with narrow dependency") { - sc = new SparkContext("local", "test") - // use a deterministic partitioner - val p = new Partitioner() { - def numPartitions = 5 - def getPartition(key: Any) = key.asInstanceOf[Int] - } - // partitionBy so we have a narrow dependency - val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p) - // more partitions/no partitioner so a shuffle dependency - val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4) - val c = a.subtractByKey(b) - assert(c.collect().toSet === Set((1, "a"), (1, "a"))) - assert(c.partitioner.get === p) - } - test("shuffle non-zero block size") { sc = new SparkContext("local-cluster[2,1,512]", "test") val NUM_BLOCKS = 3 @@ -391,29 +118,6 @@ class ShuffleSuite extends FunSuite with ShouldMatchers with LocalSparkContext { // We should have at most 4 non-zero sized partitions assert(nonEmptyBlocks.size <= 4) } - - test("foldByKey") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val sums = pairs.foldByKey(0)(_+_).collect() - assert(sums.toSet === Set((1, 7), (2, 1))) - } - - test("foldByKey with mutable result type") { - sc = new SparkContext("local", "test") - val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1))) - val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache() - // Fold the values using in-place mutation - val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect() - assert(sums.toSet === Set((1, ArrayBuffer(1, 2, 3, 1)), (2, ArrayBuffer(1)))) - // Check that the mutable objects in the original RDD were not changed - assert(bufs.collect().toSet === Set( - (1, ArrayBuffer(1)), - (1, ArrayBuffer(2)), - (1, ArrayBuffer(3)), - (1, ArrayBuffer(1)), - (2, ArrayBuffer(1)))) - } } object ShuffleSuite { diff --git a/core/src/test/scala/spark/SortingSuite.scala b/core/src/test/scala/spark/SortingSuite.scala index 495f957e53..f7bf207c68 100644 --- a/core/src/test/scala/spark/SortingSuite.scala +++ b/core/src/test/scala/spark/SortingSuite.scala @@ -5,16 +5,14 @@ import org.scalatest.BeforeAndAfter import org.scalatest.matchers.ShouldMatchers import SparkContext._ -class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers with Logging { - +class SortingSuite extends FunSuite with SharedSparkContext with ShouldMatchers with Logging { + test("sortByKey") { - sc = new SparkContext("local", "test") val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2) - assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0))) + assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0))) } test("large array") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -24,7 +22,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("large array with one split") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -32,9 +29,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w assert(sorted.partitions.size === 1) assert(sorted.collect() === pairArr.sortBy(_._1)) } - + test("large array with many partitions") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -42,9 +38,8 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w assert(sorted.partitions.size === 20) assert(sorted.collect() === pairArr.sortBy(_._1)) } - + test("sort descending") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -52,15 +47,13 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("sort descending with one split") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 1) assert(pairs.sortByKey(false, 1).collect() === pairArr.sortWith((x, y) => x._1 > y._1)) } - + test("sort descending with many partitions") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 2) @@ -68,7 +61,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("more partitions than elements") { - sc = new SparkContext("local", "test") val rand = new scala.util.Random() val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) } val pairs = sc.parallelize(pairArr, 30) @@ -76,14 +68,12 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("empty RDD") { - sc = new SparkContext("local", "test") val pairArr = new Array[(Int, Int)](0) val pairs = sc.parallelize(pairArr, 2) assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1)) } test("partition balancing") { - sc = new SparkContext("local", "test") val pairArr = (1 to 1000).map(x => (x, x)).toArray val sorted = sc.parallelize(pairArr, 4).sortByKey() assert(sorted.collect() === pairArr.sortBy(_._1)) @@ -99,7 +89,6 @@ class SortingSuite extends FunSuite with LocalSparkContext with ShouldMatchers w } test("partition balancing for descending sort") { - sc = new SparkContext("local", "test") val pairArr = (1 to 1000).map(x => (x, x)).toArray val sorted = sc.parallelize(pairArr, 4).sortByKey(false) assert(sorted.collect() === pairArr.sortBy(_._1).reverse) diff --git a/core/src/test/scala/spark/UnpersistSuite.scala b/core/src/test/scala/spark/UnpersistSuite.scala new file mode 100644 index 0000000000..94776e7572 --- /dev/null +++ b/core/src/test/scala/spark/UnpersistSuite.scala @@ -0,0 +1,30 @@ +package spark + +import org.scalatest.FunSuite +import org.scalatest.concurrent.Timeouts._ +import org.scalatest.time.{Span, Millis} +import spark.SparkContext._ + +class UnpersistSuite extends FunSuite with LocalSparkContext { + test("unpersist RDD") { + sc = new SparkContext("local", "test") + val rdd = sc.makeRDD(Array(1, 2, 3, 4), 2).cache() + rdd.count + assert(sc.persistentRdds.isEmpty === false) + rdd.unpersist() + assert(sc.persistentRdds.isEmpty === true) + + failAfter(Span(3000, Millis)) { + try { + while (! sc.getRDDStorageInfo.isEmpty) { + Thread.sleep(200) + } + } catch { + case _ => { Thread.sleep(10) } + // Do nothing. We might see exceptions because block manager + // is racing this thread to remove entries from the driver. + } + } + assert(sc.getRDDStorageInfo.isEmpty === true) + } +} diff --git a/core/src/test/scala/spark/ZippedPartitionsSuite.scala b/core/src/test/scala/spark/ZippedPartitionsSuite.scala index 5f60aa75d7..96cb295f45 100644 --- a/core/src/test/scala/spark/ZippedPartitionsSuite.scala +++ b/core/src/test/scala/spark/ZippedPartitionsSuite.scala @@ -17,9 +17,8 @@ object ZippedPartitionsSuite { } } -class ZippedPartitionsSuite extends FunSuite with LocalSparkContext { +class ZippedPartitionsSuite extends FunSuite with SharedSparkContext { test("print sizes") { - sc = new SparkContext("local", "test") val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2) val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2) val data3 = sc.makeRDD(Array(1.0, 2.0), 2)