From 3b0bca42ac638f476729b6868875e68720b16c2b Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Wed, 9 Oct 2019 10:27:05 -0700
Subject: [PATCH] [SPARK-29401][FOLLOWUP] Additional cases where a .parallelize
 call with Array is ambiguous in 2.13

This is just a followup on https://github.com/apache/spark/pull/26062 -- see it
for more detail.

I think we will eventually find more cases of this. It's hard to get them all
at once, as there are many different types of compile errors in earlier
modules. I'm trying to address them in as big a chunk as possible.

Closes #26074 from srowen/SPARK-29401.2.

Authored-by: Sean Owen
Signed-off-by: Dongjoon Hyun
---
 .../scala/org/apache/spark/FileSuite.scala    | 22 +++++++++----------
 .../metrics/InputOutputMetricsSuite.scala     |  4 ++--
 .../spark/rdd/PairRDDFunctionsSuite.scala     |  5 ++---
 .../org/apache/spark/rdd/PipedRDDSuite.scala  |  2 +-
 .../scala/org/apache/spark/rdd/RDDSuite.scala |  2 +-
 .../spark/rdd/ZippedPartitionsSuite.scala     |  6 ++---
 .../serializer/KryoSerializerSuite.scala      | 10 ++++-----
 docs/graphx-programming-guide.md              |  4 ++--
 .../org/apache/spark/graphx/GraphSuite.scala  |  6 ++---
 .../graphx/lib/ConnectedComponentsSuite.scala |  2 +-
 .../spark/graphx/lib/TriangleCountSuite.scala | 16 +++++++-------
 11 files changed, 39 insertions(+), 40 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 3446c03973..0368d77e3d 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -402,7 +402,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("prevent user from overwriting the empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     intercept[FileAlreadyExistsException] {
       randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
     }
@@ -411,7 +411,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
       tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -425,7 +425,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
     sc = new SparkContext(conf)
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
       tempDir.getPath + "/output")
     assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -437,7 +437,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("save Hadoop Dataset through old Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = new JobConf()
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
@@ -450,7 +450,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
   test ("save Hadoop Dataset through new Hadoop API") {
     sc = new SparkContext("local", "test")
     val randomRDD = sc.parallelize(
-      Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
+      Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
     val job = Job.getInstance(sc.hadoopConfiguration)
     job.setOutputKeyClass(classOf[String])
     job.setOutputValueClass(classOf[String])
@@ -559,7 +559,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(conf)
 
     def testIgnoreEmptySplits(
-        data: Array[Tuple2[String, String]],
+        data: Seq[Tuple2[String, String]],
         actualPartitionNum: Int,
         expectedPartitionNum: Int): Unit = {
       val output = new File(tempDir, "output")
@@ -581,13 +581,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
     // Ensure that if no split is empty, we don't lose any splits
     testIgnoreEmptySplits(
-      data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
+      data = Seq(("key1", "a"), ("key2", "a"), ("key3", "b")),
       actualPartitionNum = 2,
       expectedPartitionNum = 2)
 
     // Ensure that if part of the splits are empty, we remove the splits correctly
     testIgnoreEmptySplits(
-      data = Array(("key1", "a"), ("key2", "a")),
+      data = Seq(("key1", "a"), ("key2", "a")),
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
@@ -600,7 +600,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     sc = new SparkContext(conf)
 
     def testIgnoreEmptySplits(
-        data: Array[Tuple2[String, String]],
+        data: Seq[Tuple2[String, String]],
         actualPartitionNum: Int,
         expectedPartitionNum: Int): Unit = {
       val output = new File(tempDir, "output")
@@ -624,13 +624,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
 
     // Ensure that if no split is empty, we don't lose any splits
     testIgnoreEmptySplits(
-      data = Array(("1", "a"), ("2", "a"), ("3", "b")),
+      data = Seq(("1", "a"), ("2", "a"), ("3", "b")),
       actualPartitionNum = 2,
       expectedPartitionNum = 2)
 
     // Ensure that if part of the splits are empty, we remove the splits correctly
     testIgnoreEmptySplits(
-      data = Array(("1", "a"), ("2", "b")),
+      data = Seq(("1", "a"), ("2", "b")),
       actualPartitionNum = 5,
       expectedPartitionNum = 2)
   }
diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
index dbcec647a3..330347299a 100644
--- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.metrics
 
-import java.io.{File, FileWriter, PrintWriter}
+import java.io.{File, PrintWriter}
 
 import scala.collection.mutable.ArrayBuffer
 
@@ -289,7 +289,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
       }
     })
 
-    val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
+    val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)
 
     try {
       rdd.saveAsTextFile(outPath.toString)
diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index 135cfffa1a..2de4b109e4 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -34,7 +34,6 @@ import org.scalatest.Assertions
 
 import org.apache.spark._
 import org.apache.spark.Partitioner
-import org.apache.spark.util.Utils
 
 class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   test("aggregateByKey") {
@@ -496,8 +495,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("default partitioner uses largest partitioner") {
-    val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
-    val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
+    val a = sc.makeRDD(Seq((1, "a"), (2, "b")), 2)
+    val b = sc.makeRDD(Seq((1, "a"), (2, "b")), 2000)
     val c = a.join(b)
     assert(c.partitions.size === 2000)
   }
diff --git a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
index 860cf4d7ed..2da2854dfb 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PipedRDDSuite.scala
@@ -138,7 +138,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall
     assert(c(6) === "3_")
     assert(c(7) === "4_")
 
-    val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
+    val nums1 = sc.makeRDD(Seq("a\t1", "b\t2", "a\t3", "b\t4"), 2)
     val d = nums1.groupBy(str => str.split("\t")(0)).
       pipe(Seq("cat"),
         Map[String, String](),
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 859c25ff03..18154d861a 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -236,7 +236,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
   }
 
   test("aggregate") {
-    val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
+    val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
     type StringMap = HashMap[String, Int]
     val emptyMap = new StringMap {
       override def default(key: String): Int = 0
diff --git a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
index 5d7b973fbd..7079b9ea8e 100644
--- a/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/ZippedPartitionsSuite.scala
@@ -27,9 +27,9 @@ object ZippedPartitionsSuite {
 
 class ZippedPartitionsSuite extends SparkFunSuite with SharedSparkContext {
   test("print sizes") {
-    val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
-    val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
-    val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
+    val data1 = sc.makeRDD(Seq(1, 2, 3, 4), 2)
+    val data2 = sc.makeRDD(Seq("1", "2", "3", "4", "5", "6"), 2)
+    val data3 = sc.makeRDD(Seq(1.0, 2.0), 2)
 
     val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)
 
diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
index b5313fc24c..d7c151209f 100644
--- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala
@@ -37,7 +37,7 @@ import org.apache.spark.internal.config.Kryo._
 import org.apache.spark.scheduler.HighlyCompressedMapStatus
 import org.apache.spark.serializer.KryoTest._
 import org.apache.spark.storage.BlockManagerId
-import org.apache.spark.util.{ThreadUtils, Utils}
+import org.apache.spark.util.ThreadUtils
 
 class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
@@ -274,19 +274,19 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
   }
 
   test("kryo with parallelize for specialized tuples") {
-    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).count === 3)
+    assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).count === 3)
   }
 
   test("kryo with parallelize for primitive arrays") {
-    assert (sc.parallelize( Array(1, 2, 3) ).count === 3)
+    assert(sc.parallelize(Array(1, 2, 3)).count === 3)
   }
 
   test("kryo with collect for specialized tuples") {
-    assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === ((1, 11)))
+    assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).collect().head === ((1, 11)))
   }
 
   test("kryo with SerializableHyperLogLog") {
-    assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
+    assert(sc.parallelize(Array(1, 2, 3, 2, 3, 3, 2, 3, 1)).countApproxDistinct(0.01) === 3)
   }
 
   test("kryo with reduce") {
diff --git a/docs/graphx-programming-guide.md b/docs/graphx-programming-guide.md
index 903f8027cc..167c44aa1b 100644
--- a/docs/graphx-programming-guide.md
+++ b/docs/graphx-programming-guide.md
@@ -187,7 +187,7 @@ val users: RDD[(VertexId, (String, String))] =
                        (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
 // Create an RDD for edges
 val relationships: RDD[Edge[String]] =
-  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
 // Define a default user in case there are relationship with missing user
 val defaultUser = ("John Doe", "Missing")
@@ -425,7 +425,7 @@ val users: RDD[(VertexId, (String, String))] =
                        (4L, ("peter", "student"))))
 // Create an RDD for edges
 val relationships: RDD[Edge[String]] =
-  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+  sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                        Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
                        Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
 // Define a default user in case there are relationship with missing user
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
index 6f9670f2f2..459cddb9a3 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/GraphSuite.scala
@@ -164,12 +164,12 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
 
   test("mapVertices changing type with same erased type") {
     withSpark { sc =>
-      val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
+      val vertices = sc.parallelize(Seq[(Long, Option[java.lang.Integer])](
         (1L, Some(1)),
         (2L, Some(2)),
         (3L, Some(3))
       ))
-      val edges = sc.parallelize(Array(
+      val edges = sc.parallelize(Seq(
         Edge(1L, 2L, 0),
         Edge(2L, 3L, 0),
         Edge(3L, 1L, 0)
@@ -219,7 +219,7 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
   test("reverse with join elimination") {
     withSpark { sc =>
       val vertices: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 2)))
-      val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
+      val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 0)))
       val graph = Graph(vertices, edges).reverse
       val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
         graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
index d0231c885b..baa1c42235 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ConnectedComponentsSuite.scala
@@ -106,7 +106,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext {
           (4L, ("peter", "student"))))
       // Create an RDD for edges
       val relationships: RDD[Edge[String]] =
-        sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
+        sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
           Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
           Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
       // Edges are:
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
index f19c3acdc8..abbd89b8ee 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/TriangleCountSuite.scala
@@ -26,7 +26,7 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Count a single triangle") {
     withSpark { sc =>
-      val rawEdges = sc.parallelize(Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ), 2)
+      val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
       val triangleCount = graph.triangleCount()
       val verts = triangleCount.vertices
@@ -36,8 +36,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Count two triangles") {
     withSpark { sc =>
-      val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+      val triangles = Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val rawEdges = sc.parallelize(triangles, 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
       val triangleCount = graph.triangleCount()
@@ -55,8 +55,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
   test("Count two triangles with bi-directed edges") {
     withSpark { sc =>
       val triangles =
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-          Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
+        Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+          Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
       val revTriangles = triangles.map { case (a, b) => (b, a) }
       val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
@@ -74,9 +74,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
 
   test("Count a single triangle with duplicate edges") {
     withSpark { sc =>
-      val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
-        Array(1L -> 0L, 1L -> 1L), 2)
+      val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
+        Seq(1L -> 0L, 1L -> 1L), 2)
       val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
       val triangleCount = graph.triangleCount()
      val verts = triangleCount.vertices
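
Note (added for illustration; not part of the original patch): a minimal, self-contained Scala sketch of the pattern the diff above applies everywhere: pass a Seq literal to parallelize/makeRDD instead of an Array literal, while primitive arrays such as Array(1, 2, 3) are left alone, as in the KryoSerializerSuite hunks. The object name and local master setting below are hypothetical; it assumes a standard Spark dependency on the classpath.

import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical example object, not part of the Spark codebase.
object ParallelizeSeqExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("parallelize-seq-example")
    val sc = new SparkContext(conf)
    try {
      // Pattern applied by this patch: a Seq literal, so no Array-to-Seq
      // implicit conversion is involved when calling SparkContext.parallelize.
      // Previously: sc.parallelize(Array(("key1", "a"), ("key2", "b")), 1)
      val pairs = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)
      assert(pairs.count() == 2)

      // Primitive arrays are kept as Array in the patch (see the
      // KryoSerializerSuite hunks above) and still compile as before.
      val nums = sc.parallelize(Array(1, 2, 3))
      assert(nums.count() == 3)
    } finally {
      sc.stop()
    }
  }
}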