From 7aca0dd658b8bda05574b3df3254aaf66eb2a174 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Thu, 3 Oct 2019 08:56:08 -0500 Subject: [PATCH] [SPARK-29296][BUILD][CORE] Remove use of .par to make 2.13 support easier; add scala-2.13 profile to enable pulling in par collections library separately, for the future ### What changes were proposed in this pull request? Scala 2.13 removes the parallel collections classes to a separate library, so first, this establishes a `scala-2.13` profile to bring it back, for future use. However the library enables use of `.par` implicit conversions via a new class that is not in 2.12, which makes cross-building hard. This implements a suggested workaround from https://github.com/scala/scala-parallel-collections/issues/22 to avoid `.par` entirely. ### Why are the changes needed? To compile for 2.13 and later to work with 2.13. ### Does this PR introduce any user-facing change? Should not, no. ### How was this patch tested? Existing tests. Closes #25980 from srowen/SPARK-29296. Authored-by: Sean Owen Signed-off-by: Sean Owen --- core/pom.xml | 9 +++++++++ .../main/scala/org/apache/spark/rdd/UnionRDD.scala | 5 +++-- pom.xml | 13 +++++++++++++ sql/catalyst/pom.xml | 12 ++++++++++++ .../spark/sql/catalyst/expressions/CastSuite.scala | 4 +++- sql/core/pom.xml | 12 ++++++++++++ .../apache/spark/sql/execution/command/ddl.scala | 3 ++- .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 5 ++++- .../spark/sql/execution/SQLExecutionSuite.scala | 4 ++-- streaming/pom.xml | 12 ++++++++++++ .../org/apache/spark/streaming/DStreamGraph.scala | 7 ++++--- .../streaming/util/FileBasedWriteAheadLog.scala | 3 ++- 12 files changed, 78 insertions(+), 11 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 1d95b2fb2a..38eb8adac5 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -556,6 +556,15 @@ + + scala-2.13 + + + org.scala-lang.modules + scala-parallel-collections_${scala.binary.version} + + + diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index f592e1129c..63fa3c2487 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -21,6 +21,7 @@ import java.io.{IOException, ObjectOutputStream} import scala.collection.mutable.ArrayBuffer import scala.collection.parallel.ForkJoinTaskSupport +import scala.collection.parallel.immutable.ParVector import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} @@ -75,13 +76,13 @@ class UnionRDD[T: ClassTag]( override def getPartitions: Array[Partition] = { val parRDDs = if (isPartitionListingParallel) { - val parArray = rdds.par + val parArray = new ParVector(rdds.toVector) parArray.tasksupport = UnionRDD.partitionEvalTaskSupport parArray } else { rdds } - val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum) + val array = new Array[Partition](parRDDs.map(_.partitions.length).sum) var pos = 0 for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) { array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index) diff --git a/pom.xml b/pom.xml index e0742e678d..9c2aa9de85 100644 --- a/pom.xml +++ b/pom.xml @@ -2992,6 +2992,19 @@ scala-2.12 + + + scala-2.13 + + + + org.scala-lang.modules + scala-parallel-collections_${scala.binary.version} + 0.2.0 + + + +