Always destroy SparkContext in after block for the unit tests.


This commit is contained in:
Denny 2012-07-18 13:09:50 -07:00 committed by Matei Zaharia
parent 600e99728d
commit 866e6949df
11 changed files with 164 additions and 115 deletions

View file

@ -1,6 +1,6 @@
package spark.bagel
import org.scalatest.{FunSuite, Assertions}
import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
@ -13,9 +13,16 @@ import spark._
class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
class TestMessage(val targetId: String) extends Message[String] with Serializable
class BagelSuite extends FunSuite with Assertions {
class BagelSuite extends FunSuite with Assertions with BeforeAndAfter{
var sc: SparkContext = _
test("halting by voting") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
val msgs = sc.parallelize(Array[(String, TestMessage)]())
val numSupersteps = 5
@ -26,11 +33,10 @@ class BagelSuite extends FunSuite with Assertions {
for ((id, vert) <- result.collect)
assert(vert.age === numSupersteps)
test("halting by message silence") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(false, 0))))
val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
val numSupersteps = 5
@ -48,6 +54,5 @@ class BagelSuite extends FunSuite with Assertions {
for ((id, vert) <- result.collect)
assert(vert.age === numSupersteps)

View file

@ -1,23 +1,31 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
class BroadcastSuite extends FunSuite {
class BroadcastSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
test("basic broadcast") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val list = List(1, 2, 3, 4)
val listBroadcast = sc.broadcast(list)
val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
assert(results.collect.toSet === Set((1, 10), (2, 10)))
test("broadcast variables accessed in multiple threads") {
val sc = new SparkContext("local[10]", "test")
sc = new SparkContext("local[10]", "test")
val list = List(1, 2, 3, 4)
val listBroadcast = sc.broadcast(list)
val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)

View file

@ -1,6 +1,7 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.prop.Checkers
import scala.collection.mutable.ArrayBuffer
@ -22,11 +23,20 @@ object FailureSuiteState {
class FailureSuite extends FunSuite {
class FailureSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
// Run a 3-task map job in which task 1 deterministically fails once, and check
// whether the job completes successfully and we ran 4 tasks in total.
test("failure in a single-stage job") {
val sc = new SparkContext("local[1,1]", "test")
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3, 3).map { x =>
FailureSuiteState.synchronized {
FailureSuiteState.tasksRun += 1
@ -41,13 +51,12 @@ class FailureSuite extends FunSuite {
assert(FailureSuiteState.tasksRun === 4)
assert(results.toList === List(1,4,9))
// Run a map-reduce job in which a reduce task deterministically fails once.
test("failure in a two-stage job") {
val sc = new SparkContext("local[1,1]", "test")
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
case (k, v) =>
FailureSuiteState.synchronized {
@ -63,12 +72,11 @@ class FailureSuite extends FunSuite {
assert(FailureSuiteState.tasksRun === 4)
assert(results.toSet === Set((1, 1), (2, 4), (3, 9)))
test("failure because task results are not serializable") {
val sc = new SparkContext("local[1,1]", "test")
sc = new SparkContext("local[1,1]", "test")
val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
val thrown = intercept[spark.SparkException] {
@ -77,7 +85,6 @@ class FailureSuite extends FunSuite {
assert(thrown.getClass === classOf[spark.SparkException])

View file

@ -6,13 +6,23 @@ import
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import SparkContext._
class FileSuite extends FunSuite {
class FileSuite extends FunSuite with BeforeAndAfter{
var sc: SparkContext = _
if(sc != null){
test("text files") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
@ -23,11 +33,10 @@ class FileSuite extends FunSuite {
assert(content === "1\n2\n3\n4\n")
// Also try reading it in as a text file RDD
assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
test("SequenceFiles") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@ -35,11 +44,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
test("SequenceFile with writable key") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
@ -47,11 +55,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
test("SequenceFile with writable value") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
@ -59,11 +66,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
test("SequenceFile with writable key and value") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@ -71,11 +77,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as a SequenceFile
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
test("implicit conversions in reading SequenceFiles") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@ -89,11 +94,10 @@ class FileSuite extends FunSuite {
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
val output3 = sc.sequenceFile[IntWritable, String](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
test("object files of ints") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 4)
@ -101,11 +105,10 @@ class FileSuite extends FunSuite {
// Try reading the output back as an object file
val output = sc.objectFile[Int](outputDir)
assert(output.collect().toList === List(1, 2, 3, 4))
test("object files of complex types") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
@ -113,12 +116,11 @@ class FileSuite extends FunSuite {
// Try reading the output back as an object file
val output = sc.objectFile[(Int, String)](outputDir)
assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
test("write SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@ -126,12 +128,11 @@ class FileSuite extends FunSuite {
val output = sc.sequenceFile[IntWritable, Text](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))
test("read SequenceFile using new Hadoop API") {
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val tempDir = Files.createTempDir()
val outputDir = new File(tempDir, "output").getAbsolutePath
val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@ -139,6 +140,5 @@ class FileSuite extends FunSuite {
val output =
sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
assert( === List("(1,a)", "(2,aa)", "(3,aaa)"))

View file

@ -8,7 +8,8 @@ import com.esotericsoftware.kryo._
import SparkContext._
class KryoSerializerSuite extends FunSuite {
class KryoSerializerSuite extends FunSuite{
test("basic types") {
val ser = (new KryoSerializer).newInstance()
def check[T](t: T) {

View file

@ -1,12 +1,23 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import scala.collection.mutable.ArrayBuffer
import SparkContext._
class PartitioningSuite extends FunSuite {
class PartitioningSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
test("HashPartitioner equality") {
val p2 = new HashPartitioner(2)
val p4 = new HashPartitioner(4)
@ -20,7 +31,7 @@ class PartitioningSuite extends FunSuite {
test("RangePartitioner equality") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
// Make an RDD where all the elements are the same so that the partition range bounds
// are deterministically all the same.
@ -46,12 +57,10 @@ class PartitioningSuite extends FunSuite {
assert(p4 != descendingP4)
assert(descendingP2 != p2)
assert(descendingP4 != p4)
test("HashPartitioner not equal to RangePartitioner") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10).map(x => (x, x))
val rangeP2 = new RangePartitioner(2, rdd)
val hashP2 = new HashPartitioner(2)
@ -59,11 +68,10 @@ class PartitioningSuite extends FunSuite {
assert(hashP2 === hashP2)
assert(hashP2 != rangeP2)
assert(rangeP2 != hashP2)
test("partitioner preservation") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
@ -95,7 +103,5 @@ class PartitioningSuite extends FunSuite {
assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)

View file

@ -1,12 +1,21 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import SparkContext._
class PipedRDDSuite extends FunSuite {
class PipedRDDSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
test("basic pipe") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("cat"))
@ -18,18 +27,16 @@ class PipedRDDSuite extends FunSuite {
assert(c(1) === "2")
assert(c(2) === "3")
assert(c(3) === "4")
test("pipe with env variable") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
val c = piped.collect()
assert(c.size === 2)
assert(c(0) === "LALALA")
assert(c(1) === "LALALA")

View file

@ -2,11 +2,21 @@ package spark
import scala.collection.mutable.HashMap
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import SparkContext._
class RDDSuite extends FunSuite {
class RDDSuite extends FunSuite with BeforeAndAfter{
var sc: SparkContext = _
if(sc != null){
test("basic operations") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
assert(nums.collect().toList === List(1, 2, 3, 4))
assert(nums.reduce(_ + _) === 10)
@ -18,11 +28,10 @@ class RDDSuite extends FunSuite {
assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
assert(partitionSums.collect().toList === List(3, 7))
test("aggregate") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
val emptyMap = new StringMap {
@ -40,6 +49,5 @@ class RDDSuite extends FunSuite {
val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))

View file

@ -1,6 +1,7 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import org.scalatest.prop.Checkers
import org.scalacheck.Arbitrary._
import org.scalacheck.Gen
@ -12,9 +13,18 @@ import scala.collection.mutable.ArrayBuffer
import SparkContext._
class ShuffleSuite extends FunSuite {
class ShuffleSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
test("groupByKey") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
@ -22,11 +32,10 @@ class ShuffleSuite extends FunSuite {
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with duplicates") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
@ -34,11 +43,10 @@ class ShuffleSuite extends FunSuite {
assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with negative key hash codes") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
val groups = pairs.groupByKey().collect()
assert(groups.size === 2)
@ -46,11 +54,10 @@ class ShuffleSuite extends FunSuite {
assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("groupByKey with many output partitions") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
val groups = pairs.groupByKey(10).collect()
assert(groups.size === 2)
@ -58,37 +65,33 @@ class ShuffleSuite extends FunSuite {
assert(valuesFor1.toList.sorted === List(1, 2, 3))
val valuesFor2 = groups.find(_._1 == 2).get._2
assert(valuesFor2.toList.sorted === List(1))
test("reduceByKey") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("reduceByKey with collectAsMap") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_).collectAsMap()
assert(sums.size === 2)
assert(sums(1) === 7)
assert(sums(2) === 1)
test("reduceByKey with many output partitons") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
val sums = pairs.reduceByKey(_+_, 10).collect()
assert(sums.toSet === Set((1, 7), (2, 1)))
test("join") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2).collect()
@ -99,11 +102,10 @@ class ShuffleSuite extends FunSuite {
(2, (1, 'y')),
(2, (1, 'z'))
test("join all-to-all") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
val joined = rdd1.join(rdd2).collect()
@ -116,11 +118,10 @@ class ShuffleSuite extends FunSuite {
(1, (3, 'x')),
(1, (3, 'y'))
test("leftOuterJoin") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.leftOuterJoin(rdd2).collect()
@ -132,11 +133,10 @@ class ShuffleSuite extends FunSuite {
(2, (1, Some('z'))),
(3, (1, None))
test("rightOuterJoin") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.rightOuterJoin(rdd2).collect()
@ -148,20 +148,18 @@ class ShuffleSuite extends FunSuite {
(2, (Some(1), 'z')),
(4, (None, 'w'))
test("join with no matches") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
val joined = rdd1.join(rdd2).collect()
assert(joined.size === 0)
test("join with many output partitions") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.join(rdd2, 10).collect()
@ -172,11 +170,10 @@ class ShuffleSuite extends FunSuite {
(2, (1, 'y')),
(2, (1, 'z'))
test("groupWith") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
val joined = rdd1.groupWith(rdd2).collect()
@ -187,17 +184,15 @@ class ShuffleSuite extends FunSuite {
(3, (ArrayBuffer(1), ArrayBuffer())),
(4, (ArrayBuffer(), ArrayBuffer('w')))
test("zero-partition RDD") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val emptyDir = Files.createTempDir()
val file = sc.textFile(emptyDir.getAbsolutePath)
assert(file.splits.size == 0)
assert(file.collect().toList === Nil)
// Test that a shuffle on the file works, because this used to be a bug
assert( => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
assert( => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)

View file

@ -1,50 +1,55 @@
package spark
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import SparkContext._
class SortingSuite extends FunSuite {
test("sortByKey") {
val sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
class SortingSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
test("sortByKey") {
sc = new SparkContext("local", "test")
val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
test("sortLargeArray") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
test("sortDescending") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr)
assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
test("morePartitionsThanElements") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
val pairs = sc.parallelize(pairArr, 30)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
test("emptyRDD") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val rand = new scala.util.Random()
val pairArr = new Array[(Int, Int)](0)
val pairs = sc.parallelize(pairArr)
assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))

View file

@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import org.scalatest.FunSuite
import org.scalatest.BeforeAndAfter
import SparkContext._
@ -21,9 +22,19 @@ object ThreadingSuiteState {
class ThreadingSuite extends FunSuite {
class ThreadingSuite extends FunSuite with BeforeAndAfter {
var sc: SparkContext = _
if(sc != null){
test("accessing SparkContext form a different thread") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
val sem = new Semaphore(0)
@volatile var answer1: Int = 0
@ -38,11 +49,10 @@ class ThreadingSuite extends FunSuite {
assert(answer1 === 55)
assert(answer2 === 1)
test("accessing SparkContext form multiple threads") {
val sc = new SparkContext("local", "test")
sc = new SparkContext("local", "test")
val nums = sc.parallelize(1 to 10, 2)
val sem = new Semaphore(0)
@volatile var ok = true
@ -67,11 +77,10 @@ class ThreadingSuite extends FunSuite {
if (!ok) {
fail("One or more threads got the wrong answer from an RDD operation")
test("accessing multi-threaded SparkContext form multiple threads") {
val sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test")
val nums = sc.parallelize(1 to 10, 2)
val sem = new Semaphore(0)
@volatile var ok = true
@ -96,13 +105,12 @@ class ThreadingSuite extends FunSuite {
if (!ok) {
fail("One or more threads got the wrong answer from an RDD operation")
test("parallel job execution") {
// This test launches two jobs with two threads each on a 4-core local cluster. Each thread
// waits until there are 4 threads running at once, to test that both jobs have been launched.
val sc = new SparkContext("local[4]", "test")
sc = new SparkContext("local[4]", "test")
val nums = sc.parallelize(1 to 2, 2)
val sem = new Semaphore(0)
@ -132,6 +140,5 @@ class ThreadingSuite extends FunSuite {
if (ThreadingSuiteState.failed.get()) {
fail("One or more threads didn't see runningThreads = 4")