[SPARK-29296][BUILD][CORE] Remove use of .par to make 2.13 support easier; add scala-2.13 profile to enable pulling in par collections library separately, for the future

### What changes were proposed in this pull request?

Scala 2.13 moves the parallel collections classes out of the standard library into a separate module, so, first, this establishes a `scala-2.13` profile that pulls that module back in, for future use.

However, that library provides the `.par` implicit conversion via a new class that does not exist in 2.12, which makes cross-building hard. This implements a workaround suggested in https://github.com/scala/scala-parallel-collections/issues/22: avoid `.par` entirely and construct the parallel collections directly.
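
As a rough, self-contained sketch of the workaround pattern (not Spark code; the object name and pool size below are made up for illustration), the idea is to build a `ParVector` explicitly instead of relying on the `.par` conversion, optionally attaching a `ForkJoinTaskSupport` to bound parallelism. On 2.12 these classes come from the standard library; on 2.13 the same package names are provided by the `scala-parallel-collections` module that the new profile pulls in, so the code cross-builds unchanged:

```scala
import java.util.concurrent.ForkJoinPool

import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector

object ParWorkaroundSketch {
  def main(args: Array[String]): Unit = {
    val items = Seq("a", "b", "c", "d")

    // Cross-buildable replacement for `items.par`: construct the parallel
    // collection directly rather than going through the implicit conversion,
    // which lives in different classes on 2.12 and 2.13.
    val parItems = new ParVector(items.toVector)

    // Optionally bound parallelism with an explicit task support, as the
    // UnionRDD and FileBasedWriteAheadLog changes below do.
    parItems.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(4))

    parItems.foreach(item => println(s"processing $item"))
  }
}
```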

### Why are the changes needed?

So the code compiles against Scala 2.13 now, as a step toward fully working with 2.13 later.

### Does this PR introduce any user-facing change?

Should not, no.

### How was this patch tested?

Existing tests.

Closes #25980 from srowen/SPARK-29296.

Authored-by: Sean Owen <sean.owen@databricks.com>
Signed-off-by: Sean Owen <sean.owen@databricks.com>
Sean Owen 2019-10-03 08:56:08 -05:00
parent 2bc3fff13b
commit 7aca0dd658
12 changed files with 78 additions and 11 deletions


@@ -556,6 +556,15 @@
</plugins>
</build>
</profile>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>


@@ -21,6 +21,7 @@ import java.io.{IOException, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.reflect.ClassTag
import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
@@ -75,13 +76,13 @@ class UnionRDD[T: ClassTag](
override def getPartitions: Array[Partition] = {
val parRDDs = if (isPartitionListingParallel) {
val parArray = rdds.par
val parArray = new ParVector(rdds.toVector)
parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
parArray
} else {
rdds
}
val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
val array = new Array[Partition](parRDDs.map(_.partitions.length).sum)
var pos = 0
for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)

pom.xml

@@ -2992,6 +2992,19 @@
<profile>
<id>scala-2.12</id>
</profile>
<profile>
<id>scala-2.13</id>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
<version>0.2.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</profile>
<!--
This is a profile to enable the use of the ASF snapshot and staging repositories


@@ -170,4 +170,16 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>


@@ -21,6 +21,8 @@ import java.sql.{Date, Timestamp}
import java.util.{Calendar, TimeZone}
import java.util.concurrent.TimeUnit._
import scala.collection.parallel.immutable.ParVector
import org.apache.spark.SparkFunSuite
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
@@ -113,7 +115,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper {
}
test("cast string to timestamp") {
ALL_TIMEZONES.par.foreach { tz =>
new ParVector(ALL_TIMEZONES.toVector).foreach { tz =>
def checkCastStringToTimestamp(str: String, expected: Timestamp): Unit = {
checkEvaluation(cast(Literal(str), TimestampType, Option(tz.getID)), expected)
}


@@ -215,4 +215,16 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>


@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit._
import scala.collection.{GenMap, GenSeq}
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.util.control.NonFatal
import org.apache.hadoop.conf.Configuration
@@ -684,7 +685,7 @@ case class AlterTableRecoverPartitionsCommand(
val statusPar: GenSeq[FileStatus] =
if (partitionNames.length > 1 && statuses.length > threshold || partitionNames.length > 2) {
// parallelize the list of partitions here, then we can have better parallelism later.
val parArray = statuses.par
val parArray = new ParVector(statuses.toVector)
parArray.tasksupport = evalTaskSupport
parArray
} else {


@@ -22,6 +22,8 @@ import java.net.{MalformedURLException, URL}
import java.sql.{Date, Timestamp}
import java.util.concurrent.atomic.AtomicBoolean
import scala.collection.parallel.immutable.ParVector
import org.apache.spark.{AccumulatorSuite, SparkException}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
@@ -169,7 +171,8 @@ class SQLQuerySuite extends QueryTest with SharedSparkSession {
"org.apache.spark.sql.catalyst.expressions.CallMethodViaReflection")
withSQLConf(SQLConf.UTC_TIMESTAMP_FUNC_ENABLED.key -> "true") {
spark.sessionState.functionRegistry.listFunction().par.foreach { funcId =>
val parFuncs = new ParVector(spark.sessionState.functionRegistry.listFunction().toVector)
parFuncs.foreach { funcId =>
// Examples can change settings. We clone the session to prevent tests clashing.
val clonedSpark = spark.cloneSession()
val info = clonedSpark.sessionState.catalog.lookupFunctionInfo(funcId)


@@ -17,7 +17,7 @@
package org.apache.spark.sql.execution
import java.util.Properties
import scala.collection.parallel.immutable.ParRange
import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite}
import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
@@ -46,7 +46,7 @@ class SQLExecutionSuite extends SparkFunSuite {
import spark.implicits._
try {
// Should not throw IllegalArgumentException
(1 to 100).par.foreach { _ =>
new ParRange(1 to 100).foreach { _ =>
spark.sparkContext.parallelize(1 to 5).map { i => (i, i) }.toDF("a", "b").count()
}
} finally {


@@ -131,4 +131,16 @@
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>scala-2.13</id>
<dependencies>
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-parallel-collections_${scala.binary.version}</artifactId>
</dependency>
</dependencies>
</profile>
</profiles>
</project>


@@ -20,6 +20,7 @@ package org.apache.spark.streaming
import java.io.{IOException, ObjectInputStream, ObjectOutputStream}
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.immutable.ParVector
import org.apache.spark.internal.Logging
import org.apache.spark.streaming.dstream.{DStream, InputDStream, ReceiverInputDStream}
@@ -50,8 +51,8 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
outputStreams.foreach(_.remember(rememberDuration))
outputStreams.foreach(_.validateAtStart())
numReceivers = inputStreams.count(_.isInstanceOf[ReceiverInputDStream[_]])
inputStreamNameAndID = inputStreams.map(is => (is.name, is.id))
inputStreams.par.foreach(_.start())
inputStreamNameAndID = inputStreams.map(is => (is.name, is.id)).toSeq
new ParVector(inputStreams.toVector).foreach(_.start())
}
}
@@ -61,7 +62,7 @@ final private[streaming] class DStreamGraph extends Serializable with Logging {
def stop(): Unit = {
this.synchronized {
inputStreams.par.foreach(_.stop())
new ParVector(inputStreams.toVector).foreach(_.stop())
}
}


@@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.collection.parallel.ExecutionContextTaskSupport
import scala.collection.parallel.immutable.ParVector
import scala.concurrent.{Await, ExecutionContext, Future}
import org.apache.hadoop.conf.Configuration
@@ -313,7 +314,7 @@ private[streaming] object FileBasedWriteAheadLog {
val groupSize = taskSupport.parallelismLevel.max(8)
source.grouped(groupSize).flatMap { group =>
val parallelCollection = group.par
val parallelCollection = new ParVector(group.toVector)
parallelCollection.tasksupport = taskSupport
parallelCollection.map(handler)
}.flatten