[SPARK-29401][FOLLOWUP] Additional cases where a .parallelize call with Array is ambiguous in 2.13

This is just a follow-up to https://github.com/apache/spark/pull/26062 -- see it for more detail.

I think we will eventually find more cases of this. It's hard to get them all at once, as there are many different types of compile errors in earlier modules. I'm trying to address them in as big a chunk as possible.
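
For context, here is a minimal, self-contained sketch of the pattern this change applies. It is illustrative only -- the object name, local master URL, and sample data below are assumptions, not code from the patch; the point is simply that passing a `Seq` avoids the implicit Array-to-Seq conversion that Scala 2.13 can report as ambiguous for calls like these.

```scala
// Illustrative sketch only: the object name and local SparkContext setup are
// assumptions for this example, not code from the Spark test suites.
import org.apache.spark.{SparkConf, SparkContext}

object ParallelizeSeqExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("parallelize-seq-example")
    val sc = new SparkContext(conf)

    // Before: relied on an implicit Array-to-Seq conversion, which Scala 2.13
    // can report as ambiguous for element types like tuples.
    // val rdd = sc.parallelize(Array(("key1", "a"), ("key2", "b")), 1)

    // After: pass a Seq directly, which compiles the same way on 2.12 and 2.13.
    val rdd = sc.parallelize(Seq(("key1", "a"), ("key2", "b")), 1)

    assert(rdd.count() == 2)
    sc.stop()
  }
}
```

Note that the diff below leaves primitive-array calls such as `sc.parallelize(Array(1, 2, 3))` in KryoSerializerSuite untouched; only the cases that are ambiguous in 2.13 (mostly arrays of tuples here) are rewritten to `Seq`.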

Closes #26074 from srowen/SPARK-29401.2.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
Commit 3b0bca42ac (parent fa95a5c395), 2019-10-09 10:27:05 -07:00
11 changed files with 39 additions and 40 deletions


@@ -402,7 +402,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("prevent user from overwriting the empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
intercept[FileAlreadyExistsException] {
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](tempDir.getPath)
}
@@ -411,7 +411,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("prevent user from overwriting the non-empty directory (new Hadoop API)") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -425,7 +425,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
conf.setAppName("test").setMaster("local").set("spark.hadoop.validateOutputSpecs", "false")
sc = new SparkContext(conf)
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
randomRDD.saveAsNewAPIHadoopFile[NewTextOutputFormat[String, String]](
tempDir.getPath + "/output")
assert(new File(tempDir.getPath + "/output/part-r-00000").exists())
@@ -437,7 +437,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("save Hadoop Dataset through old Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = new JobConf()
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
@@ -450,7 +450,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
test ("save Hadoop Dataset through new Hadoop API") {
sc = new SparkContext("local", "test")
val randomRDD = sc.parallelize(
Array(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
Seq(("key1", "a"), ("key2", "a"), ("key3", "b"), ("key4", "c")), 1)
val job = Job.getInstance(sc.hadoopConfiguration)
job.setOutputKeyClass(classOf[String])
job.setOutputValueClass(classOf[String])
@@ -559,7 +559,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(conf)
def testIgnoreEmptySplits(
data: Array[Tuple2[String, String]],
data: Seq[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPartitionNum: Int): Unit = {
val output = new File(tempDir, "output")
@@ -581,13 +581,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
// Ensure that if no split is empty, we don't lose any splits
testIgnoreEmptySplits(
data = Array(("key1", "a"), ("key2", "a"), ("key3", "b")),
data = Seq(("key1", "a"), ("key2", "a"), ("key3", "b")),
actualPartitionNum = 2,
expectedPartitionNum = 2)
// Ensure that if part of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array(("key1", "a"), ("key2", "a")),
data = Seq(("key1", "a"), ("key2", "a")),
actualPartitionNum = 5,
expectedPartitionNum = 2)
}
@@ -600,7 +600,7 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
sc = new SparkContext(conf)
def testIgnoreEmptySplits(
data: Array[Tuple2[String, String]],
data: Seq[Tuple2[String, String]],
actualPartitionNum: Int,
expectedPartitionNum: Int): Unit = {
val output = new File(tempDir, "output")
@@ -624,13 +624,13 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
// Ensure that if no split is empty, we don't lose any splits
testIgnoreEmptySplits(
data = Array(("1", "a"), ("2", "a"), ("3", "b")),
data = Seq(("1", "a"), ("2", "a"), ("3", "b")),
actualPartitionNum = 2,
expectedPartitionNum = 2)
// Ensure that if part of the splits are empty, we remove the splits correctly
testIgnoreEmptySplits(
data = Array(("1", "a"), ("2", "b")),
data = Seq(("1", "a"), ("2", "b")),
actualPartitionNum = 5,
expectedPartitionNum = 2)
}


@@ -17,7 +17,7 @@
package org.apache.spark.metrics
import java.io.{File, FileWriter, PrintWriter}
import java.io.{File, PrintWriter}
import scala.collection.mutable.ArrayBuffer
@@ -289,7 +289,7 @@ class InputOutputMetricsSuite extends SparkFunSuite with SharedSparkContext
}
})
val rdd = sc.parallelize(Array("a", "b", "c", "d"), 2)
val rdd = sc.parallelize(Seq("a", "b", "c", "d"), 2)
try {
rdd.saveAsTextFile(outPath.toString)


@@ -34,7 +34,6 @@ import org.scalatest.Assertions
import org.apache.spark._
import org.apache.spark.Partitioner
import org.apache.spark.util.Utils
class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
test("aggregateByKey") {
@@ -496,8 +495,8 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext {
}
test("default partitioner uses largest partitioner") {
val a = sc.makeRDD(Array((1, "a"), (2, "b")), 2)
val b = sc.makeRDD(Array((1, "a"), (2, "b")), 2000)
val a = sc.makeRDD(Seq((1, "a"), (2, "b")), 2)
val b = sc.makeRDD(Seq((1, "a"), (2, "b")), 2000)
val c = a.join(b)
assert(c.partitions.size === 2000)
}


@@ -138,7 +138,7 @@ class PipedRDDSuite extends SparkFunSuite with SharedSparkContext with Eventuall
assert(c(6) === "3_")
assert(c(7) === "4_")
val nums1 = sc.makeRDD(Array("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val nums1 = sc.makeRDD(Seq("a\t1", "b\t2", "a\t3", "b\t4"), 2)
val d = nums1.groupBy(str => str.split("\t")(0)).
pipe(Seq("cat"),
Map[String, String](),


@@ -236,7 +236,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually {
}
test("aggregate") {
val pairs = sc.makeRDD(Array(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
val pairs = sc.makeRDD(Seq(("a", 1), ("b", 2), ("a", 2), ("c", 5), ("a", 3)))
type StringMap = HashMap[String, Int]
val emptyMap = new StringMap {
override def default(key: String): Int = 0


@@ -27,9 +27,9 @@ object ZippedPartitionsSuite {
class ZippedPartitionsSuite extends SparkFunSuite with SharedSparkContext {
test("print sizes") {
val data1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Array("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Array(1.0, 2.0), 2)
val data1 = sc.makeRDD(Seq(1, 2, 3, 4), 2)
val data2 = sc.makeRDD(Seq("1", "2", "3", "4", "5", "6"), 2)
val data3 = sc.makeRDD(Seq(1.0, 2.0), 2)
val zippedRDD = data1.zipPartitions(data2, data3)(ZippedPartitionsSuite.procZippedData)


@@ -37,7 +37,7 @@ import org.apache.spark.internal.config.Kryo._
import org.apache.spark.scheduler.HighlyCompressedMapStatus
import org.apache.spark.serializer.KryoTest._
import org.apache.spark.storage.BlockManagerId
import org.apache.spark.util.{ThreadUtils, Utils}
import org.apache.spark.util.ThreadUtils
class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
conf.set(SERIALIZER, "org.apache.spark.serializer.KryoSerializer")
@@ -274,19 +274,19 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
}
test("kryo with parallelize for specialized tuples") {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).count === 3)
assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).count === 3)
}
test("kryo with parallelize for primitive arrays") {
assert (sc.parallelize( Array(1, 2, 3) ).count === 3)
assert(sc.parallelize(Array(1, 2, 3)).count === 3)
}
test("kryo with collect for specialized tuples") {
assert (sc.parallelize( Array((1, 11), (2, 22), (3, 33)) ).collect().head === ((1, 11)))
assert(sc.parallelize(Seq((1, 11), (2, 22), (3, 33))).collect().head === ((1, 11)))
}
test("kryo with SerializableHyperLogLog") {
assert(sc.parallelize( Array(1, 2, 3, 2, 3, 3, 2, 3, 1) ).countApproxDistinct(0.01) === 3)
assert(sc.parallelize(Array(1, 2, 3, 2, 3, 3, 2, 3, 1)).countApproxDistinct(0.01) === 3)
}
test("kryo with reduce") {


@@ -187,7 +187,7 @@ val users: RDD[(VertexId, (String, String))] =
(5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationship with missing user
val defaultUser = ("John Doe", "Missing")
@@ -425,7 +425,7 @@ val users: RDD[(VertexId, (String, String))] =
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Define a default user in case there are relationship with missing user


@@ -164,12 +164,12 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
test("mapVertices changing type with same erased type") {
withSpark { sc =>
val vertices = sc.parallelize(Array[(Long, Option[java.lang.Integer])](
val vertices = sc.parallelize(Seq[(Long, Option[java.lang.Integer])](
(1L, Some(1)),
(2L, Some(2)),
(3L, Some(3))
))
val edges = sc.parallelize(Array(
val edges = sc.parallelize(Seq(
Edge(1L, 2L, 0),
Edge(2L, 3L, 0),
Edge(3L, 1L, 0)
@@ -219,7 +219,7 @@ class GraphSuite extends SparkFunSuite with LocalSparkContext {
test("reverse with join elimination") {
withSpark { sc =>
val vertices: RDD[(VertexId, Int)] = sc.parallelize(Seq((1L, 1), (2L, 2)))
val edges: RDD[Edge[Int]] = sc.parallelize(Array(Edge(1L, 2L, 0)))
val edges: RDD[Edge[Int]] = sc.parallelize(Seq(Edge(1L, 2L, 0)))
val graph = Graph(vertices, edges).reverse
val result = GraphXUtils.mapReduceTriplets[Int, Int, Int](
graph, et => Iterator((et.dstId, et.srcAttr)), _ + _)


@@ -106,7 +106,7 @@ class ConnectedComponentsSuite extends SparkFunSuite with LocalSparkContext {
(4L, ("peter", "student"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
sc.parallelize(Seq(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi"),
Edge(4L, 0L, "student"), Edge(5L, 0L, "colleague")))
// Edges are:


@@ -26,7 +26,7 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count a single triangle") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array( 0L -> 1L, 1L -> 2L, 2L -> 0L ), 2)
val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices
@@ -36,8 +36,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count two triangles") {
withSpark { sc =>
val triangles = Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
val triangles = Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
val rawEdges = sc.parallelize(triangles, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
val triangleCount = graph.triangleCount()
@@ -55,8 +55,8 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count two triangles with bi-directed edges") {
withSpark { sc =>
val triangles =
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> -1L, -1L -> -2L, -2L -> 0L)
Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(0L -> -1L, -1L -> -2L, -2L -> 0L)
val revTriangles = triangles.map { case (a, b) => (b, a) }
val rawEdges = sc.parallelize(triangles ++ revTriangles, 2)
val graph = Graph.fromEdgeTuples(rawEdges, true).cache()
@@ -74,9 +74,9 @@ class TriangleCountSuite extends SparkFunSuite with LocalSparkContext {
test("Count a single triangle with duplicate edges") {
withSpark { sc =>
val rawEdges = sc.parallelize(Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Array(1L -> 0L, 1L -> 1L), 2)
val rawEdges = sc.parallelize(Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(0L -> 1L, 1L -> 2L, 2L -> 0L) ++
Seq(1L -> 0L, 1L -> 1L), 2)
val graph = Graph.fromEdgeTuples(rawEdges, true, uniqueEdges = Some(RandomVertexCut)).cache()
val triangleCount = graph.triangleCount()
val verts = triangleCount.vertices