Always destroy SparkContext in after block for the unit tests.
parent 4259d37f84, commit 5559608e6f
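
The same pattern is applied to every suite below: the SparkContext is held in a var on the suite, each test assigns it instead of creating a local val, and ScalaTest's BeforeAndAfter after block stops it even when an assertion fails, so one failing test no longer leaks a running context into the next. A minimal sketch of the shape (the suite name, test name, and assertion here are illustrative, not taken from the diff; it assumes this codebase's spark.SparkContext):

import org.scalatest.{BeforeAndAfter, FunSuite}
import spark.SparkContext

class ExampleSuite extends FunSuite with BeforeAndAfter {
  var sc: SparkContext = _

  // Runs after every test, whether it passed or failed.
  after {
    if (sc != null) {
      sc.stop()
    }
  }

  test("example") {
    sc = new SparkContext("local", "test")  // assign the field, not a local val
    assert(sc.parallelize(1 to 4).collect().toList === List(1, 2, 3, 4))
  }
}

BagelSuite omits the null check because every test there assigns sc; the other suites guard with if (sc != null) so a test that never creates a context does not fail inside the after block.
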
@@ -1,6 +1,6 @@
 package spark.bagel
 
-import org.scalatest.{FunSuite, Assertions}
+import org.scalatest.{FunSuite, Assertions, BeforeAndAfter}
 import org.scalatest.prop.Checkers
 import org.scalacheck.Arbitrary._
 import org.scalacheck.Gen
@@ -13,9 +13,16 @@ import spark._
 class TestVertex(val active: Boolean, val age: Int) extends Vertex with Serializable
 class TestMessage(val targetId: String) extends Message[String] with Serializable
 
-class BagelSuite extends FunSuite with Assertions {
+class BagelSuite extends FunSuite with Assertions with BeforeAndAfter{
+
+  var sc: SparkContext = _
+
+  after{
+    sc.stop()
+  }
+
   test("halting by voting") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(true, 0))))
     val msgs = sc.parallelize(Array[(String, TestMessage)]())
     val numSupersteps = 5
@@ -26,11 +33,10 @@ class BagelSuite extends FunSuite with Assertions {
     }
     for ((id, vert) <- result.collect)
       assert(vert.age === numSupersteps)
-    sc.stop()
   }
 
   test("halting by message silence") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val verts = sc.parallelize(Array("a", "b", "c", "d").map(id => (id, new TestVertex(false, 0))))
     val msgs = sc.parallelize(Array("a" -> new TestMessage("a")))
     val numSupersteps = 5
@@ -48,6 +54,5 @@ class BagelSuite extends FunSuite with Assertions {
     }
     for ((id, vert) <- result.collect)
       assert(vert.age === numSupersteps)
-    sc.stop()
   }
 }

@@ -1,23 +1,31 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
-class BroadcastSuite extends FunSuite {
+class BroadcastSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
   test("basic broadcast") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val list = List(1, 2, 3, 4)
     val listBroadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 2).map(x => (x, listBroadcast.value.sum))
     assert(results.collect.toSet === Set((1, 10), (2, 10)))
-    sc.stop()
   }
 
   test("broadcast variables accessed in multiple threads") {
-    val sc = new SparkContext("local[10]", "test")
+    sc = new SparkContext("local[10]", "test")
     val list = List(1, 2, 3, 4)
     val listBroadcast = sc.broadcast(list)
     val results = sc.parallelize(1 to 10).map(x => (x, listBroadcast.value.sum))
     assert(results.collect.toSet === (1 to 10).map(x => (x, 10)).toSet)
-    sc.stop()
   }
 }

@@ -1,6 +1,7 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import org.scalatest.prop.Checkers
 
 import scala.collection.mutable.ArrayBuffer
@@ -20,11 +21,20 @@ object FailureSuiteState {
   }
 }
 
-class FailureSuite extends FunSuite {
+class FailureSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
   // Run a 3-task map job in which task 1 deterministically fails once, and check
   // whether the job completes successfully and we ran 4 tasks in total.
   test("failure in a single-stage job") {
-    val sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3, 3).map { x =>
       FailureSuiteState.synchronized {
         FailureSuiteState.tasksRun += 1
@@ -39,13 +49,12 @@ class FailureSuite extends FunSuite {
       assert(FailureSuiteState.tasksRun === 4)
     }
     assert(results.toList === List(1,4,9))
-    sc.stop()
     FailureSuiteState.clear()
   }
 
   // Run a map-reduce job in which a reduce task deterministically fails once.
   test("failure in a two-stage job") {
-    val sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => (x, x)).groupByKey(3).map {
       case (k, v) =>
         FailureSuiteState.synchronized {
@@ -61,12 +70,11 @@ class FailureSuite extends FunSuite {
       assert(FailureSuiteState.tasksRun === 4)
     }
     assert(results.toSet === Set((1, 1), (2, 4), (3, 9)))
-    sc.stop()
     FailureSuiteState.clear()
   }
 
   test("failure because task results are not serializable") {
-    val sc = new SparkContext("local[1,1]", "test")
+    sc = new SparkContext("local[1,1]", "test")
     val results = sc.makeRDD(1 to 3).map(x => new NonSerializable)
 
     val thrown = intercept[spark.SparkException] {
@@ -75,7 +83,6 @@ class FailureSuite extends FunSuite {
     assert(thrown.getClass === classOf[spark.SparkException])
     assert(thrown.getMessage.contains("NotSerializableException"))
 
-    sc.stop()
     FailureSuiteState.clear()
   }
 

@@ -6,13 +6,23 @@ import scala.io.Source
 
 import com.google.common.io.Files
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import org.apache.hadoop.io._
 
 import SparkContext._
 
-class FileSuite extends FunSuite {
+class FileSuite extends FunSuite with BeforeAndAfter{
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
   test("text files") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -23,11 +33,10 @@ class FileSuite extends FunSuite {
     assert(content === "1\n2\n3\n4\n")
     // Also try reading it in as a text file RDD
     assert(sc.textFile(outputDir).collect().toList === List("1", "2", "3", "4"))
-    sc.stop()
   }
 
   test("SequenceFiles") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@@ -35,11 +44,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("SequenceFile with writable key") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), "a" * x))
@@ -47,11 +55,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("SequenceFile with writable value") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, new Text("a" * x)))
@@ -59,11 +66,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("SequenceFile with writable key and value") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -71,11 +77,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as a SequenceFile
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("implicit conversions in reading SequenceFiles") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x)) // (1,a), (2,aa), (3,aaa)
@@ -89,11 +94,10 @@ class FileSuite extends FunSuite {
     assert(output2.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
     val output3 = sc.sequenceFile[IntWritable, String](outputDir)
     assert(output3.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("object files of ints") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 4)
@@ -101,11 +105,10 @@ class FileSuite extends FunSuite {
     // Try reading the output back as an object file
    val output = sc.objectFile[Int](outputDir)
     assert(output.collect().toList === List(1, 2, 3, 4))
-    sc.stop()
   }
 
   test("object files of complex types") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (x, "a" * x))
@@ -113,12 +116,11 @@ class FileSuite extends FunSuite {
     // Try reading the output back as an object file
     val output = sc.objectFile[(Int, String)](outputDir)
     assert(output.collect().toList === List((1, "a"), (2, "aa"), (3, "aaa")))
-    sc.stop()
   }
 
   test("write SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -126,12 +128,11 @@ class FileSuite extends FunSuite {
       outputDir)
     val output = sc.sequenceFile[IntWritable, Text](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 
   test("read SequenceFile using new Hadoop API") {
     import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val tempDir = Files.createTempDir()
     val outputDir = new File(tempDir, "output").getAbsolutePath
     val nums = sc.makeRDD(1 to 3).map(x => (new IntWritable(x), new Text("a" * x)))
@@ -139,6 +140,5 @@ class FileSuite extends FunSuite {
     val output =
       sc.newAPIHadoopFile[IntWritable, Text, SequenceFileInputFormat[IntWritable, Text]](outputDir)
     assert(output.map(_.toString).collect().toList === List("(1,a)", "(2,aa)", "(3,aaa)"))
-    sc.stop()
   }
 }

@@ -8,7 +8,8 @@ import com.esotericsoftware.kryo._
 
 import SparkContext._
 
-class KryoSerializerSuite extends FunSuite {
+class KryoSerializerSuite extends FunSuite{
+
   test("basic types") {
     val ser = (new KryoSerializer).newInstance()
     def check[T](t: T): Unit =

@@ -1,12 +1,23 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
 import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
 
-class PartitioningSuite extends FunSuite {
+class PartitioningSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
+
   test("HashPartitioner equality") {
     val p2 = new HashPartitioner(2)
     val p4 = new HashPartitioner(4)
@@ -20,7 +31,7 @@ class PartitioningSuite extends FunSuite {
   }
 
   test("RangePartitioner equality") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
 
     // Make an RDD where all the elements are the same so that the partition range bounds
     // are deterministically all the same.
@@ -46,12 +57,10 @@ class PartitioningSuite extends FunSuite {
     assert(p4 != descendingP4)
     assert(descendingP2 != p2)
     assert(descendingP4 != p4)
-
-    sc.stop()
   }
 
   test("HashPartitioner not equal to RangePartitioner") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd = sc.parallelize(1 to 10).map(x => (x, x))
     val rangeP2 = new RangePartitioner(2, rdd)
     val hashP2 = new HashPartitioner(2)
@@ -59,11 +68,10 @@ class PartitioningSuite extends FunSuite {
     assert(hashP2 === hashP2)
     assert(hashP2 != rangeP2)
     assert(rangeP2 != hashP2)
-    sc.stop()
   }
 
   test("partitioner preservation") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
 
     val rdd = sc.parallelize(1 to 10, 4).map(x => (x, x))
 
@@ -95,7 +103,5 @@ class PartitioningSuite extends FunSuite {
     assert(grouped2.leftOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.rightOuterJoin(reduced2).partitioner === grouped2.partitioner)
     assert(grouped2.cogroup(reduced2).partitioner === grouped2.partitioner)
-
-    sc.stop()
   }
 }

@@ -1,12 +1,21 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import SparkContext._
 
-class PipedRDDSuite extends FunSuite {
+class PipedRDDSuite extends FunSuite with BeforeAndAfter {
 
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
   test("basic pipe") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
 
     val piped = nums.pipe(Seq("cat"))
@@ -18,18 +27,16 @@ class PipedRDDSuite extends FunSuite {
     assert(c(1) === "2")
     assert(c(2) === "3")
     assert(c(3) === "4")
-    sc.stop()
   }
 
   test("pipe with env variable") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     val piped = nums.pipe(Seq("printenv", "MY_TEST_ENV"), Map("MY_TEST_ENV" -> "LALALA"))
     val c = piped.collect()
     assert(c.size === 2)
     assert(c(0) === "LALALA")
     assert(c(1) === "LALALA")
-    sc.stop()
   }
 
 }

@@ -2,11 +2,21 @@ package spark
 
 import scala.collection.mutable.HashMap
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import SparkContext._
 
-class RDDSuite extends FunSuite {
+class RDDSuite extends FunSuite with BeforeAndAfter{
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
   test("basic operations") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.makeRDD(Array(1, 2, 3, 4), 2)
     assert(nums.collect().toList === List(1, 2, 3, 4))
     assert(nums.reduce(_ + _) === 10)
@@ -18,11 +28,10 @@ class RDDSuite extends FunSuite {
     assert(nums.glom().map(_.toList).collect().toList === List(List(1, 2), List(3, 4)))
     val partitionSums = nums.mapPartitions(iter => Iterator(iter.reduceLeft(_ + _)))
     assert(partitionSums.collect().toList === List(3, 7))
-    sc.stop()
   }
 
   test("aggregate") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
     val emptyMap = new StringMap {
@@ -40,6 +49,5 @@ class RDDSuite extends FunSuite {
     }
     val result = pairs.aggregate(emptyMap)(mergeElement, mergeMaps)
     assert(result.toSet === Set(("a", 6), ("b", 2), ("c", 5)))
-    sc.stop()
   }
 }

@@ -1,6 +1,7 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import org.scalatest.prop.Checkers
 import org.scalacheck.Arbitrary._
 import org.scalacheck.Gen
@@ -12,9 +13,18 @@ import scala.collection.mutable.ArrayBuffer
 
 import SparkContext._
 
-class ShuffleSuite extends FunSuite {
+class ShuffleSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
   test("groupByKey") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
@@ -22,11 +32,10 @@ class ShuffleSuite extends FunSuite {
     assert(valuesFor1.toList.sorted === List(1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("groupByKey with duplicates") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
@@ -34,11 +43,10 @@ class ShuffleSuite extends FunSuite {
     assert(valuesFor1.toList.sorted === List(1, 1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("groupByKey with negative key hash codes") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
@@ -46,11 +54,10 @@ class ShuffleSuite extends FunSuite {
     assert(valuesForMinus1.toList.sorted === List(1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("groupByKey with many output partitions") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey(10).collect()
     assert(groups.size === 2)
@@ -58,37 +65,33 @@ class ShuffleSuite extends FunSuite {
     assert(valuesFor1.toList.sorted === List(1, 2, 3))
     val valuesFor2 = groups.find(_._1 == 2).get._2
     assert(valuesFor2.toList.sorted === List(1))
-    sc.stop()
   }
 
   test("reduceByKey") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_+_).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
-    sc.stop()
   }
 
   test("reduceByKey with collectAsMap") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_+_).collectAsMap()
     assert(sums.size === 2)
     assert(sums(1) === 7)
     assert(sums(2) === 1)
-    sc.stop()
   }
 
   test("reduceByKey with many output partitons") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_+_, 10).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
-    sc.stop()
   }
 
   test("join") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2).collect()
@@ -99,11 +102,10 @@ class ShuffleSuite extends FunSuite {
       (2, (1, 'y')),
       (2, (1, 'z'))
     ))
-    sc.stop()
   }
 
   test("join all-to-all") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
     val joined = rdd1.join(rdd2).collect()
@@ -116,11 +118,10 @@ class ShuffleSuite extends FunSuite {
       (1, (3, 'x')),
       (1, (3, 'y'))
     ))
-    sc.stop()
   }
 
   test("leftOuterJoin") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.leftOuterJoin(rdd2).collect()
@@ -132,11 +133,10 @@ class ShuffleSuite extends FunSuite {
       (2, (1, Some('z'))),
       (3, (1, None))
     ))
-    sc.stop()
   }
 
   test("rightOuterJoin") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.rightOuterJoin(rdd2).collect()
@@ -148,20 +148,18 @@ class ShuffleSuite extends FunSuite {
       (2, (Some(1), 'z')),
       (4, (None, 'w'))
     ))
-    sc.stop()
   }
 
   test("join with no matches") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 0)
-    sc.stop()
   }
 
   test("join with many output partitions") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2, 10).collect()
@@ -172,11 +170,10 @@ class ShuffleSuite extends FunSuite {
       (2, (1, 'y')),
       (2, (1, 'z'))
     ))
-    sc.stop()
   }
 
   test("groupWith") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.groupWith(rdd2).collect()
@@ -187,17 +184,15 @@ class ShuffleSuite extends FunSuite {
       (3, (ArrayBuffer(1), ArrayBuffer())),
       (4, (ArrayBuffer(), ArrayBuffer('w')))
     ))
-    sc.stop()
   }
 
   test("zero-partition RDD") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val emptyDir = Files.createTempDir()
     val file = sc.textFile(emptyDir.getAbsolutePath)
     assert(file.splits.size == 0)
     assert(file.collect().toList === Nil)
     // Test that a shuffle on the file works, because this used to be a bug
-    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
-    sc.stop()
+    assert(file.map(line => (line, 1)).reduceByKey(_ + _).collect().toList === Nil)
   }
 }

@@ -1,50 +1,55 @@
 package spark
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 import SparkContext._
 
-class SortingSuite extends FunSuite {
-  test("sortByKey") {
-    val sc = new SparkContext("local", "test")
-    val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
-    assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
+class SortingSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
+  test("sortByKey") {
+    sc = new SparkContext("local", "test")
+    val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)))
+    assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0)))
   }
 
   test("sortLargeArray") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr)
     assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-    sc.stop()
   }
 
   test("sortDescending") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(1000) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr)
     assert(pairs.sortByKey(false).collect() === pairArr.sortWith((x, y) => x._1 > y._1))
-    sc.stop()
   }
 
   test("morePartitionsThanElements") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = Array.fill(10) { (rand.nextInt(), rand.nextInt()) }
     val pairs = sc.parallelize(pairArr, 30)
     assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-    sc.stop()
   }
 
   test("emptyRDD") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val rand = new scala.util.Random()
     val pairArr = new Array[(Int, Int)](0)
     val pairs = sc.parallelize(pairArr)
     assert(pairs.sortByKey().collect() === pairArr.sortBy(_._1))
-    sc.stop()
   }
 }

@@ -5,6 +5,7 @@ import java.util.concurrent.atomic.AtomicBoolean
 import java.util.concurrent.atomic.AtomicInteger
 
 import org.scalatest.FunSuite
+import org.scalatest.BeforeAndAfter
 
 import SparkContext._
 
@@ -21,9 +22,19 @@ object ThreadingSuiteState {
   }
 }
 
-class ThreadingSuite extends FunSuite {
+class ThreadingSuite extends FunSuite with BeforeAndAfter {
+
+  var sc: SparkContext = _
+
+  after{
+    if(sc != null){
+      sc.stop()
+    }
+  }
+
+
   test("accessing SparkContext form a different thread") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
     val sem = new Semaphore(0)
     @volatile var answer1: Int = 0
@@ -38,11 +49,10 @@ class ThreadingSuite extends FunSuite {
     sem.acquire()
     assert(answer1 === 55)
     assert(answer2 === 1)
-    sc.stop()
   }
 
   test("accessing SparkContext form multiple threads") {
-    val sc = new SparkContext("local", "test")
+    sc = new SparkContext("local", "test")
     val nums = sc.parallelize(1 to 10, 2)
     val sem = new Semaphore(0)
     @volatile var ok = true
@@ -67,11 +77,10 @@ class ThreadingSuite extends FunSuite {
     if (!ok) {
       fail("One or more threads got the wrong answer from an RDD operation")
     }
-    sc.stop()
   }
 
   test("accessing multi-threaded SparkContext form multiple threads") {
-    val sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test")
     val nums = sc.parallelize(1 to 10, 2)
     val sem = new Semaphore(0)
     @volatile var ok = true
@@ -96,13 +105,12 @@ class ThreadingSuite extends FunSuite {
     if (!ok) {
       fail("One or more threads got the wrong answer from an RDD operation")
     }
-    sc.stop()
   }
 
   test("parallel job execution") {
     // This test launches two jobs with two threads each on a 4-core local cluster. Each thread
     // waits until there are 4 threads running at once, to test that both jobs have been launched.
-    val sc = new SparkContext("local[4]", "test")
+    sc = new SparkContext("local[4]", "test")
     val nums = sc.parallelize(1 to 2, 2)
     val sem = new Semaphore(0)
     ThreadingSuiteState.clear()
@@ -132,6 +140,5 @@ class ThreadingSuite extends FunSuite {
     if (ThreadingSuiteState.failed.get()) {
       fail("One or more threads didn't see runningThreads = 4")
     }
-    sc.stop()
   }
 }