diff --git a/core/src/test/scala/org/apache/spark/DistributedSuite.scala b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
index f1c2bc0677..8173a8e545 100644
--- a/core/src/test/scala/org/apache/spark/DistributedSuite.scala
+++ b/core/src/test/scala/org/apache/spark/DistributedSuite.scala
@@ -77,7 +77,7 @@ class DistributedSuite extends SparkFunSuite with Matchers with LocalSparkContex
   test("simple groupByKey") {
     sc = new SparkContext(clusterUrl, "test")
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 5)
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)), 5)
     val groups = pairs.groupByKey(5).collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index c7ea195cc9..3446c03973 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -372,7 +372,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("prevent user from overwriting the empty directory (old Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    val randomRDD = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsTextFile(tempDir.getPath)
     }
@@ -380,7 +380,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("prevent user from overwriting the non-empty directory (old Hadoop API)") {
     sc = new SparkContext("local", "test")
-    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    val randomRDD = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
     randomRDD.saveAsTextFile(tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-00000").exists())
     intercept[FileAlreadyExistsException] {
@@ -392,7 +392,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     val conf = new SparkConf()
     conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
     sc = new SparkContext(conf)
-    val randomRDD = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
+    val randomRDD = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")), 1)
     randomRDD.saveAsTextFile(tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-00000").exists())
     randomRDD.saveAsTextFile(tempDir.getPath + "/output")
diff --git a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
index ca527ce6e1..1a3259c707 100644
--- a/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
+++ b/core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -262,11 +262,11 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
   test("defaultPartitioner") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-    val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd2 = sc.parallelize(Seq((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(10))
-    val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+    val rdd3 = sc.parallelize(Seq((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
       .partitionBy(new HashPartitioner(100))
-    val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+    val rdd4 = sc.parallelize(Seq((1, 2), (2, 3), (2, 4), (3, 4)))
       .partitionBy(new HashPartitioner(9))
     val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
@@ -289,14 +289,14 @@ class PartitioningSuite extends SparkFunSuite with SharedSparkContext with Priva
       sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 150)
-      val rdd2 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+      val rdd2 = sc.parallelize(Seq((1, 2), (2, 3), (2, 4), (3, 4)))
        .partitionBy(new HashPartitioner(10))
-      val rdd3 = sc.parallelize(Array((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
+      val rdd3 = sc.parallelize(Seq((1, 6), (7, 8), (3, 10), (5, 12), (13, 14)))
        .partitionBy(new HashPartitioner(100))
-      val rdd4 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+      val rdd4 = sc.parallelize(Seq((1, 2), (2, 3), (2, 4), (3, 4)))
        .partitionBy(new HashPartitioner(9))
       val rdd5 = sc.parallelize((1 to 10).map(x => (x, x)), 11)
-      val rdd6 = sc.parallelize(Array((1, 2), (2, 3), (2, 4), (3, 4)))
+      val rdd6 = sc.parallelize(Seq((1, 2), (2, 3), (2, 4), (3, 4)))
        .partitionBy(new HashPartitioner(3))
       val partitioner1 = Partitioner.defaultPartitioner(rdd1, rdd2)
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index c652f879cc..9e39271bdf 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -44,7 +44,7 @@ abstract class ShuffleSuite extends SparkFunSuite with Matchers with LocalSparkC
   test("groupByKey without compression") {
     val myConf = conf.clone().set(config.SHUFFLE_COMPRESS, false)
     sc = new SparkContext("local", "test", myConf)
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)), 4)
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)), 4)
     val groups = pairs.groupByKey(4).collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 01fe170073..135cfffa1a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -38,7 +38,7 @@ import org.apache.spark.util.Utils
 class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("aggregateByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2)
+    val pairs = sc.parallelize(Seq((1, 1), (1, 1), (3, 2), (5, 1), (5, 3)), 2)
     val sets = pairs.aggregateByKey(new HashSet[Int]())(_ += _, _ ++= _).collect()
     assert(sets.size === 3)
@@ -51,7 +51,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
@@ -61,7 +61,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupByKey with duplicates") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
@@ -71,7 +71,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupByKey with negative key hash codes") {
-    val pairs = sc.parallelize(Array((-1, 1), (-1, 2), (-1, 3), (2, 1)))
+    val pairs = sc.parallelize(Seq((-1, 1), (-1, 2), (-1, 3), (2, 1)))
     val groups = pairs.groupByKey().collect()
     assert(groups.size === 2)
     val valuesForMinus1 = groups.find(_._1 == -1).get._2
@@ -81,7 +81,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupByKey with many output partitions") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (2, 1)))
     val groups = pairs.groupByKey(10).collect()
     assert(groups.size === 2)
     val valuesFor1 = groups.find(_._1 == 1).get._2
@@ -170,13 +170,13 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("reduceByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_ + _).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }

   test("reduceByKey with collectAsMap") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_ + _).collectAsMap()
     assert(sums.size === 2)
     assert(sums(1) === 7)
@@ -184,7 +184,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("reduceByKey with many output partitions") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.reduceByKey(_ + _, 10).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }
@@ -194,7 +194,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       def numPartitions = 2
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 1), (0, 1))).partitionBy(p)
     val sums = pairs.reduceByKey(_ + _)
     assert(sums.collect().toSet === Set((1, 4), (0, 1)))
     assert(sums.partitioner === Some(p))
@@ -246,8 +246,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("join") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 4)
     assert(joined.toSet === Set(
@@ -259,8 +259,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("join all-to-all") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (1, 3)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (1, 'y')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (1, 3)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (1, 'y')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 6)
     assert(joined.toSet === Set(
@@ -274,8 +274,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("leftOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.leftOuterJoin(rdd2).collect()
     assert(joined.size === 5)
     assert(joined.toSet === Set(
@@ -292,7 +292,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     import scala.reflect.classTag
     val intPairCT = classTag[(Int, Int)]
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.emptyRDD[(Int, Int)](intPairCT)
     val joined = rdd1.cogroup(rdd2).collect()
@@ -304,7 +304,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     import scala.reflect.classTag
     val intCT = classTag[Int]
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
     val rdd2 = sc.emptyRDD[Int](intCT).groupBy((x) => 5)
     val joined = rdd1.cogroup(rdd2).collect()
     assert(joined.size > 0)
@@ -315,7 +315,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     "with an order of magnitude difference in number of partitions") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
     val rdd2 = sc
-      .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      .parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
       .partitionBy(new HashPartitioner(10))
     val joined = rdd1.cogroup(rdd2)
     assert(joined.getNumPartitions == rdd1.getNumPartitions)
@@ -325,7 +325,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("cogroup between multiple RDD with number of partitions similar in order of magnitude") {
     val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
     val rdd2 = sc
-      .parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      .parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
       .partitionBy(new HashPartitioner(10))
     val joined = rdd1.cogroup(rdd2)
     assert(joined.getNumPartitions == rdd2.getNumPartitions)
@@ -336,7 +336,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     try {
       sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
-      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)), 10)
+      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)), 10)
       val joined = rdd1.cogroup(rdd2)
       assert(joined.getNumPartitions == sc.defaultParallelism)
     } finally {
@@ -349,7 +349,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     try {
       sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 20)
-      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
        .partitionBy(new HashPartitioner(10))
       val joined = rdd1.cogroup(rdd2)
       assert(joined.getNumPartitions == rdd2.getNumPartitions)
@@ -364,7 +364,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
     try {
       sc.conf.set("spark.default.parallelism", "4")
       val rdd1 = sc.parallelize((1 to 1000).map(x => (x, x)), 1000)
-      val rdd2 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
+      val rdd2 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
        .partitionBy(new HashPartitioner(10))
       val joined = rdd1.cogroup(rdd2)
       assert(joined.getNumPartitions == rdd2.getNumPartitions)
@@ -374,8 +374,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("rightOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.rightOuterJoin(rdd2).collect()
     assert(joined.size === 5)
     assert(joined.toSet === Set(
@@ -388,8 +388,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("fullOuterJoin") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.fullOuterJoin(rdd2).collect()
     assert(joined.size === 6)
     assert(joined.toSet === Set(
@@ -403,15 +403,15 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("join with no matches") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((4, 'x'), (5, 'y'), (5, 'z'), (6, 'w')))
     val joined = rdd1.join(rdd2).collect()
     assert(joined.size === 0)
   }

   test("join with many output partitions") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.join(rdd2, 10).collect()
     assert(joined.size === 4)
     assert(joined.toSet === Set(
@@ -423,8 +423,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupWith") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
     val joined = rdd1.groupWith(rdd2).collect()
     assert(joined.size === 4)
     val joinedSet = joined.map(x => (x._1, (x._2._1.toList, x._2._2.toList))).toSet
@@ -437,9 +437,9 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupWith3") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
     val joined = rdd1.groupWith(rdd2, rdd3).collect()
     assert(joined.size === 4)
     val joinedSet = joined.map(x => (x._1,
@@ -453,10 +453,10 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("groupWith4") {
-    val rdd1 = sc.parallelize(Array((1, 1), (1, 2), (2, 1), (3, 1)))
-    val rdd2 = sc.parallelize(Array((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
-    val rdd3 = sc.parallelize(Array((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
-    val rdd4 = sc.parallelize(Array((2, '@')))
+    val rdd1 = sc.parallelize(Seq((1, 1), (1, 2), (2, 1), (3, 1)))
+    val rdd2 = sc.parallelize(Seq((1, 'x'), (2, 'y'), (2, 'z'), (4, 'w')))
+    val rdd3 = sc.parallelize(Seq((1, 'a'), (3, 'b'), (4, 'c'), (4, 'd')))
+    val rdd4 = sc.parallelize(Seq((2, '@')))
     val joined = rdd1.groupWith(rdd2, rdd3, rdd4).collect()
     assert(joined.size === 4)
     val joinedSet = joined.map(x => (x._1,
@@ -480,7 +480,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("keys and values") {
-    val rdd = sc.parallelize(Array((1, "a"), (2, "b")))
+    val rdd = sc.parallelize(Seq((1, "a"), (2, "b")))
     assert(rdd.keys.collect().toList === List(1, 2))
     assert(rdd.values.collect().toList === List("a", "b"))
   }
@@ -517,9 +517,9 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
     // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    val a = sc.parallelize(Seq((1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
     // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val b = sc.parallelize(Seq((2, "b"), (3, "cc"), (4, "d")), 4)
     val c = a.subtract(b)
     assert(c.collect().toSet === Set((1, "a"), (3, "c")))
     // Ideally we could keep the original partitioner...
@@ -527,8 +527,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("subtractByKey") {
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
-    val b = sc.parallelize(Array((2, 20), (3, 30), (4, 40)), 4)
+    val a = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c")), 2)
+    val b = sc.parallelize(Seq((2, 20), (3, 30), (4, 40)), 4)
     val c = a.subtractByKey(b)
     assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitions.size === a.partitions.size)
@@ -541,22 +541,22 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
       def getPartition(key: Any) = key.asInstanceOf[Int]
     }
     // partitionBy so we have a narrow dependency
-    val a = sc.parallelize(Array((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
+    val a = sc.parallelize(Seq((1, "a"), (1, "a"), (2, "b"), (3, "c"))).partitionBy(p)
     // more partitions/no partitioner so a shuffle dependency
-    val b = sc.parallelize(Array((2, "b"), (3, "cc"), (4, "d")), 4)
+    val b = sc.parallelize(Seq((2, "b"), (3, "cc"), (4, "d")), 4)
     val c = a.subtractByKey(b)
     assert(c.collect().toSet === Set((1, "a"), (1, "a")))
     assert(c.partitioner.get === p)
   }

   test("foldByKey") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val sums = pairs.foldByKey(0)(_ + _).collect()
     assert(sums.toSet === Set((1, 7), (2, 1)))
   }

   test("foldByKey with mutable result type") {
-    val pairs = sc.parallelize(Array((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
+    val pairs = sc.parallelize(Seq((1, 1), (1, 2), (1, 3), (1, 1), (2, 1)))
     val bufs = pairs.mapValues(v => ArrayBuffer(v)).cache()
     // Fold the values using in-place mutation
     val sums = bufs.foldByKey(new ArrayBuffer[Int])(_ ++= _).collect()
@@ -571,7 +571,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("saveNewAPIHadoopFile should call setConf if format is configurable") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(1))))
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(1))))
     // No error, non-configurable formats still work
     pairs.saveAsNewAPIHadoopFile[NewFakeFormat]("ignored")
@@ -587,7 +587,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("The JobId on the driver and executors should be the same during the commit") {
     // Create more than one rdd to mimic stageId not equal to rddId
-    val pairs = sc.parallelize(Array((1, 2), (2, 3)), 2)
+    val pairs = sc.parallelize(Seq((1, 2), (2, 3)), 2)
       .map { p => (Integer.valueOf(p._1 + 1), Integer.valueOf(p._2 + 1)) }
       .filter { p => p._1 > 0 }
     pairs.saveAsNewAPIHadoopFile[YetAnotherFakeFormat]("ignored")
@@ -595,7 +595,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("saveAsHadoopFile should respect configured output committers") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(1))))
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(1))))
     val conf = new JobConf()
     conf.setOutputCommitter(classOf[FakeOutputCommitter])
@@ -607,7 +607,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("failure callbacks should be called before calling writer.close() in saveNewAPIHadoopFile") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
     FakeWriterWithCallback.calledBy = ""
     FakeWriterWithCallback.exception = null
@@ -622,7 +622,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("failure callbacks should be called before calling writer.close() in saveAsHadoopFile") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
     val conf = new JobConf()
     FakeWriterWithCallback.calledBy = ""
@@ -640,7 +640,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("saveAsNewAPIHadoopDataset should support invalid output paths when " +
     "there are no files to be committed to an absolute output location") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
     def saveRddWithPath(path: String): Unit = {
       val job = NewJob.getInstance(new Configuration(sc.hadoopConfiguration))
@@ -668,7 +668,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   // for non-null invalid paths.
   test("saveAsHadoopDataset should respect empty output directory when " +
     "there are no files to be committed to an absolute output location") {
-    val pairs = sc.parallelize(Array((Integer.valueOf(1), Integer.valueOf(2))), 1)
+    val pairs = sc.parallelize(Seq((Integer.valueOf(1), Integer.valueOf(2))), 1)
     val conf = new JobConf()
     conf.setOutputKeyClass(classOf[Integer])
@@ -683,7 +683,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("lookup") {
-    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
+    val pairs = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (5, 7)))
     assert(pairs.partitioner === None)
     assert(pairs.lookup(1) === Seq(2))
@@ -693,7 +693,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("lookup with partitioner") {
-    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
+    val pairs = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (5, 7)))
     val p = new Partitioner {
       def numPartitions: Int = 2
@@ -709,7 +709,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }

   test("lookup with bad partitioner") {
-    val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7)))
+    val pairs = sc.parallelize(Seq((1, 2), (3, 4), (5, 6), (5, 7)))
     val p = new Partitioner {
       def numPartitions: Int = 2
diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
index 7f20206202..d5f7d30a25 100644
--- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala
@@ -25,7 +25,7 @@ import org.apache.spark.internal.Logging
 class SortingSuite extends SparkFunSuite with SharedSparkContext with Matchers with Logging {
   test("sortByKey") {
-    val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2)
+    val pairs = sc.parallelize(Seq((1, 0), (2, 0), (0, 0), (3, 0)), 2)
     assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0)))
   }
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index c96f7aaba8..903f8027cc 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -183,7 +183,7 @@ code constructs a graph from a collection of RDDs:
 val sc: SparkContext
 // Create an RDD for the vertices
 val users: RDD[(VertexId, (String, String))] =
-  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
+  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
 // Create an RDD for edges
 val relationships: RDD[Edge[String]] =
@@ -420,7 +420,7 @@ interest or eliminate broken links. For example in the following code we remove
 {% highlight scala %}
 // Create an RDD for the vertices
 val users: RDD[(VertexId, (String, String))] =
-  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
+  sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
                        (4L, ("peter", "student"))))
 // Create an RDD for edges
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index dede3b5d35..6f9670f2f2 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -218,7 +218,7 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
   test("reverse with join elimination") {
     withSpark { sc =>
-      val vertices: RDD[(VertexId, Int)] = sc.parallelize(Array((1L, 1), (2L, 2)))
+      val vertices: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 2)))
       val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
       val graph = Graph(vertices, edges).reverse
       val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index 1b81423563..d0231c885b 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -101,7 +101,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext {
     withSpark { sc =>
       // Create an RDD for the vertices
       val users: RDD[(VertexId, (String, String))] =
-        sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
+        sc.parallelize(Seq((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
          (5L, ("franklin", "prof")), (2L, ("istoica", "prof")),
          (4L, ("peter", "student"))))
       // Create an RDD for edges
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
index 5623142985..db17db9c85 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/rdd/MLPairRDDFunctionsSuite.scala
@@ -23,7 +23,7 @@ import org.apache.spark.mllib.util.MLlibTestSparkContext
 class MLPairRDDFunctionsSuite extends SparkFunSuite with MLlibTestSparkContext {
   test("topByKey") {
-    val topMap = sc.parallelize(Array((1, 7), (1, 3), (1, 6), (1, 1), (1, 2), (3, 2), (3, 7), (5,
+    val topMap = sc.parallelize(Seq((1, 7), (1, 3), (1, 6), (1, 1), (1, 2), (3, 2), (3, 7), (5,
       1), (3, 5)), 2)
       .topByKey(5)
       .collectAsMap()
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
index 48195ad6d7..52adf3ebdb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameSuite.scala
@@ -554,7 +554,7 @@ class DataFrameSuite extends QueryTest with SharedSparkSession {
   }

   test("replace column using withColumns") {
-    val df2 = sparkContext.parallelize(Array((1, 2), (2, 3), (3, 4))).toDF("x", "y")
+    val df2 = sparkContext.parallelize(Seq((1, 2), (2, 3), (3, 4))).toDF("x", "y")
     val df3 = df2.withColumns(Seq("x", "newCol1", "newCol2"),
       Seq(df2("x") + 1, df2("y"), df2("y") + 1))
     checkAnswer(