diff --git a/core/pom.xml b/core/pom.xml
index 1d95b2fb2a..38eb8adac5 100644
--- a/core/pom.xml
+++ b/core/pom.xml
@@ -556,6 +556,15 @@
       </dependencies>
     </profile>
 
+    <profile>
+      <id>scala-2.13</id>
+      <dependencies>
+        <dependency>
+          <groupId>org.scala-lang.modules</groupId>
+          <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
+        </dependency>
+      </dependencies>
+    </profile>
   </profiles>
 
   <build>
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index f592e1129c..63fa3c2487 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -21,6 +21,7 @@ import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
 import scala.collection.parallel.ForkJoinTaskSupport
+import scala.collection.parallel.immutable.ParVector
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
@@ -75,13 +76,13 @@ class UnionRDD[T: ClassTag](
 
   override def getPartitions: Array[Partition] = {
     val parRDDs = if (isPartitionListingParallel) {
-      val parArray = rdds.par
+      val parArray = new ParVector(rdds.toVector)
       parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
       parArray
     } else {
       rdds
     }
-    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)
diff --git a/pom.xml b/pom.xml
index e0742e678d..9c2aa9de85 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2992,6 +2992,19 @@
     <profile>
       <id>scala-2.12</id>
     </profile>
+
+    <profile>
+      <id>scala-2.13</id>
+      <dependencyManagement>
+        <dependencies>
+          <dependency>
+            <groupId>org.scala-lang.modules</groupId>
+            <artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
+            <version>0.2.0</version>
+          </dependency>
+        </dependencies>
+      </dependencyManagement>
+    </profile>
 
   </profiles>
 