From 8bc304f97ee693b57f33fa6708eb63e2d641c609 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Mon, 25 Mar 2019 10:46:42 -0500
Subject: [PATCH] [SPARK-26132][BUILD][CORE] Remove support for Scala 2.11 in Spark 3.0.0

## What changes were proposed in this pull request?

Remove Scala 2.11 support in build files and docs, and in various parts of code that accommodated 2.11. See some targeted comments below.

## How was this patch tested?

Existing tests.

Closes #23098 from srowen/SPARK-26132.

Authored-by: Sean Owen
Signed-off-by: Sean Owen
---
 R/pkg/R/sparkR.R                              |   2 +-
 R/pkg/tests/fulltests/test_client.R           |   4 +-
 R/pkg/vignettes/sparkr-vignettes.Rmd          |   2 +-
 bin/load-spark-env.cmd                        |  49 +--
 bin/load-spark-env.sh                         |  42 +--
 .../scala/org/apache/spark/FutureAction.scala |  63 +---
 .../scala/org/apache/spark/rdd/UnionRDD.scala |   5 +-
 .../apache/spark/util/ClosureCleaner.scala    |   5 -
 .../org/apache/spark/util/ThreadUtils.scala   |  13 +-
 .../spark/deploy/SparkSubmitUtilsSuite.scala  |   8 +-
 .../spark/util/ClosureCleanerSuite.scala      |  73 +---
 .../spark/util/ClosureCleanerSuite2.scala     | 311 +-----------------
 dev/change-scala-version.sh                   |   6 +-
 dev/create-release/release-build.sh           |  89 ++---
 docs/building-spark.md                        |  15 +-
 docs/index.md                                 |   3 +-
 docs/storage-openstack-swift.md               |   2 +-
 external/docker/spark-test/base/Dockerfile    |   2 +-
 .../graphx/util/BytecodeUtilsSuite.scala      | 119 -------
 .../launcher/AbstractCommandBuilder.java      |  23 +-
 pom.xml                                       |  36 --
 project/MimaExcludes.scala                    |   4 +
 project/SparkBuild.scala                      |   8 +-
 python/run-tests.py                           |   3 +-
 .../org/apache/spark/repl/SparkILoop.scala    |  50 +--
 .../kubernetes/integration-tests/README.md    |   2 +-
 .../sql/execution/WholeStageCodegenExec.scala |   5 +-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala |   3 +-
 28 files changed, 163 insertions(+), 784 deletions(-)
 delete mode 100644 graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala

diff --git a/R/pkg/R/sparkR.R b/R/pkg/R/sparkR.R
index ac289d38d0..31b986c326 100644
--- a/R/pkg/R/sparkR.R
+++ b/R/pkg/R/sparkR.R
@@ -269,7 +269,7 @@ sparkR.sparkContext <- function(
 #' sparkR.session("yarn-client", "SparkR", "/home/spark",
 #'                list(spark.executor.memory="4g"),
 #'                c("one.jar", "two.jar", "three.jar"),
-#'                c("com.databricks:spark-avro_2.11:2.0.1"))
+#'                c("com.databricks:spark-avro_2.12:2.0.1"))
 #' sparkR.session(spark.master = "yarn-client", spark.executor.memory = "4g")
 #'}
 #' @note sparkR.session since 2.0.0
diff --git a/R/pkg/tests/fulltests/test_client.R b/R/pkg/tests/fulltests/test_client.R
index de624b572c..9798627ffc 100644
--- a/R/pkg/tests/fulltests/test_client.R
+++ b/R/pkg/tests/fulltests/test_client.R
@@ -37,7 +37,7 @@ test_that("multiple packages don't produce a warning", {
 
 test_that("sparkJars sparkPackages as character vectors", {
   args <- generateSparkSubmitArgs("", "", c("one.jar", "two.jar", "three.jar"), "",
-                                  c("com.databricks:spark-avro_2.11:2.0.1"))
+                                  c("com.databricks:spark-avro_2.12:2.0.1"))
   expect_match(args, "--jars one.jar,two.jar,three.jar")
-  expect_match(args, "--packages com.databricks:spark-avro_2.11:2.0.1")
+  expect_match(args, "--packages com.databricks:spark-avro_2.12:2.0.1")
 })
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd b/R/pkg/vignettes/sparkr-vignettes.Rmd
index cbe8c61725..9e48ae3463 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -219,7 +219,7 @@ SparkR supports operating on a variety of data sources through the `SparkDataFra
 The general method for creating `SparkDataFrame` from data sources is `read.df`.
This method takes in the path for the file to load and the type of data source, and the currently active Spark Session will be used automatically. SparkR supports reading CSV, JSON and Parquet files natively and through Spark Packages you can find data source connectors for popular file formats like Avro. These packages can be added with `sparkPackages` parameter when initializing SparkSession using `sparkR.session`. ```{r, eval=FALSE} -sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0") +sparkR.session(sparkPackages = "com.databricks:spark-avro_2.12:3.0.0") ``` We can see how to use data sources using an example CSV input file. For more information please refer to SparkR [read.df](https://spark.apache.org/docs/latest/api/R/read.df.html) API documentation. diff --git a/bin/load-spark-env.cmd b/bin/load-spark-env.cmd index cefa513b6f..5f98cc34b6 100644 --- a/bin/load-spark-env.cmd +++ b/bin/load-spark-env.cmd @@ -21,6 +21,7 @@ rem This script loads spark-env.cmd if it exists, and ensures it is only loaded rem spark-env.cmd is loaded from SPARK_CONF_DIR if set, or within the current directory's rem conf\ subdirectory. +set SPARK_ENV_CMD=spark-env.cmd if [%SPARK_ENV_LOADED%] == [] ( set SPARK_ENV_LOADED=1 @@ -28,30 +29,34 @@ if [%SPARK_ENV_LOADED%] == [] ( set SPARK_CONF_DIR=%~dp0..\conf ) - call :LoadSparkEnv + set SPARK_ENV_CMD=%SPARK_CONF_DIR%\%SPARK_ENV_CMD% + if exist %SPARK_ENV_CMD% ( + call %SPARK_ENV_CMD% + ) ) rem Setting SPARK_SCALA_VERSION if not already set. -set ASSEMBLY_DIR2="%SPARK_HOME%\assembly\target\scala-2.11" -set ASSEMBLY_DIR1="%SPARK_HOME%\assembly\target\scala-2.12" - -if [%SPARK_SCALA_VERSION%] == [] ( - - if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% ( - echo "Presence of build for multiple Scala versions detected." - echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd." - exit 1 - ) - if exist %ASSEMBLY_DIR2% ( - set SPARK_SCALA_VERSION=2.11 - ) else ( - set SPARK_SCALA_VERSION=2.12 - ) -) +rem TODO: revisit for Scala 2.13 support +set SPARK_SCALA_VERSION=2.12 +rem if [%SPARK_SCALA_VERSION%] == [] ( +rem set SCALA_VERSION_1=2.12 +rem set SCALA_VERSION_2=2.11 +rem +rem set ASSEMBLY_DIR1=%SPARK_HOME%\assembly\target\scala-%SCALA_VERSION_1% +rem set ASSEMBLY_DIR2=%SPARK_HOME%\assembly\target\scala-%SCALA_VERSION_2% +rem set ENV_VARIABLE_DOC=https://spark.apache.org/docs/latest/configuration.html#environment-variables +rem if exist %ASSEMBLY_DIR2% if exist %ASSEMBLY_DIR1% ( +rem echo "Presence of build for multiple Scala versions detected (%ASSEMBLY_DIR1% and %ASSEMBLY_DIR2%)." +rem echo "Remove one of them or, set SPARK_SCALA_VERSION=%SCALA_VERSION_1% in %SPARK_ENV_CMD%." +rem echo "Visit %ENV_VARIABLE_DOC% for more details about setting environment variables in spark-env.cmd." +rem echo "Either clean one of them or, set SPARK_SCALA_VERSION in spark-env.cmd." +rem exit 1 +rem ) +rem if exist %ASSEMBLY_DIR1% ( +rem set SPARK_SCALA_VERSION=%SCALA_VERSION_1% +rem ) else ( +rem set SPARK_SCALA_VERSION=%SCALA_VERSION_2% +rem ) +rem ) exit /b 0 - -:LoadSparkEnv -if exist "%SPARK_CONF_DIR%\spark-env.cmd" ( - call "%SPARK_CONF_DIR%\spark-env.cmd" -) diff --git a/bin/load-spark-env.sh b/bin/load-spark-env.sh index 0ada5d8d0f..107e7991c2 100644 --- a/bin/load-spark-env.sh +++ b/bin/load-spark-env.sh @@ -43,23 +43,25 @@ fi # Setting SPARK_SCALA_VERSION if not already set. 
-if [ -z "$SPARK_SCALA_VERSION" ]; then - SCALA_VERSION_1=2.12 - SCALA_VERSION_2=2.11 - - ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}" - ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}" - ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables" - if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then - echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2 - echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2 - echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2 - exit 1 - fi - - if [[ -d "$ASSEMBLY_DIR_1" ]]; then - export SPARK_SCALA_VERSION=${SCALA_VERSION_1} - else - export SPARK_SCALA_VERSION=${SCALA_VERSION_2} - fi -fi +# TODO: revisit for Scala 2.13 support +export SPARK_SCALA_VERSION=2.12 +#if [ -z "$SPARK_SCALA_VERSION" ]; then +# SCALA_VERSION_1=2.12 +# SCALA_VERSION_2=2.11 +# +# ASSEMBLY_DIR_1="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_1}" +# ASSEMBLY_DIR_2="${SPARK_HOME}/assembly/target/scala-${SCALA_VERSION_2}" +# ENV_VARIABLE_DOC="https://spark.apache.org/docs/latest/configuration.html#environment-variables" +# if [[ -d "$ASSEMBLY_DIR_1" && -d "$ASSEMBLY_DIR_2" ]]; then +# echo "Presence of build for multiple Scala versions detected ($ASSEMBLY_DIR_1 and $ASSEMBLY_DIR_2)." 1>&2 +# echo "Remove one of them or, export SPARK_SCALA_VERSION=$SCALA_VERSION_1 in ${SPARK_ENV_SH}." 1>&2 +# echo "Visit ${ENV_VARIABLE_DOC} for more details about setting environment variables in spark-env.sh." 1>&2 +# exit 1 +# fi +# +# if [[ -d "$ASSEMBLY_DIR_1" ]]; then +# export SPARK_SCALA_VERSION=${SCALA_VERSION_1} +# else +# export SPARK_SCALA_VERSION=${SCALA_VERSION_2} +# fi +#fi diff --git a/core/src/main/scala/org/apache/spark/FutureAction.scala b/core/src/main/scala/org/apache/spark/FutureAction.scala index 036c9a6063..8230533f9d 100644 --- a/core/src/main/scala/org/apache/spark/FutureAction.scala +++ b/core/src/main/scala/org/apache/spark/FutureAction.scala @@ -89,18 +89,6 @@ trait FutureAction[T] extends Future[T] { */ override def value: Option[Try[T]] - // These two methods must be implemented in Scala 2.12. They're implemented as a no-op here - // and then filled in with a real implementation in the two subclasses below. The no-op exists - // here so that those implementations can declare "override", necessary in 2.12, while working - // in 2.11, where the method doesn't exist in the superclass. - // After 2.11 support goes away, remove these two: - - def transform[S](f: (Try[T]) => Try[S])(implicit executor: ExecutionContext): Future[S] = - throw new UnsupportedOperationException() - - def transformWith[S](f: (Try[T]) => Future[S])(implicit executor: ExecutionContext): Future[S] = - throw new UnsupportedOperationException() - /** * Blocks and returns the result of this job. */ @@ -117,43 +105,6 @@ trait FutureAction[T] extends Future[T] { } -/** - * Scala 2.12 defines the two new transform/transformWith methods mentioned above. Impementing - * these for 2.12 in the Spark class here requires delegating to these same methods in an - * underlying Future object. But that only exists in 2.12. But these methods are only called - * in 2.12. So define helper shims to access these methods on a Future by reflection. 
- */ -private[spark] object FutureAction { - - private val transformTryMethod = - try { - classOf[Future[_]].getMethod("transform", classOf[(_) => _], classOf[ExecutionContext]) - } catch { - case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11 - } - - private val transformWithTryMethod = - try { - classOf[Future[_]].getMethod("transformWith", classOf[(_) => _], classOf[ExecutionContext]) - } catch { - case _: NoSuchMethodException => null // Would fail later in 2.11, but not called in 2.11 - } - - private[spark] def transform[T, S]( - future: Future[T], - f: (Try[T]) => Try[S], - executor: ExecutionContext): Future[S] = - transformTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]] - - private[spark] def transformWith[T, S]( - future: Future[T], - f: (Try[T]) => Future[S], - executor: ExecutionContext): Future[S] = - transformWithTryMethod.invoke(future, f, executor).asInstanceOf[Future[S]] - -} - - /** * A [[FutureAction]] holding the result of an action that triggers a single job. Examples include * count, collect, reduce. @@ -195,16 +146,10 @@ class SimpleFutureAction[T] private[spark](jobWaiter: JobWaiter[_], resultFunc: def jobIds: Seq[Int] = Seq(jobWaiter.jobId) override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] = - FutureAction.transform( - jobWaiter.completionFuture, - (u: Try[Unit]) => f(u.map(_ => resultFunc)), - e) + jobWaiter.completionFuture.transform((u: Try[Unit]) => f(u.map(_ => resultFunc))) override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] = - FutureAction.transformWith( - jobWaiter.completionFuture, - (u: Try[Unit]) => f(u.map(_ => resultFunc)), - e) + jobWaiter.completionFuture.transformWith((u: Try[Unit]) => f(u.map(_ => resultFunc))) } @@ -299,10 +244,10 @@ class ComplexFutureAction[T](run : JobSubmitter => Future[T]) def jobIds: Seq[Int] = subActions.flatMap(_.jobIds) override def transform[S](f: (Try[T]) => Try[S])(implicit e: ExecutionContext): Future[S] = - FutureAction.transform(p.future, f, e) + p.future.transform(f) override def transformWith[S](f: (Try[T]) => Future[S])(implicit e: ExecutionContext): Future[S] = - FutureAction.transformWith(p.future, f, e) + p.future.transformWith(f) } diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala index 6480e87c47..36589e93a1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala @@ -21,13 +21,12 @@ import java.io.{IOException, ObjectOutputStream} import scala.collection.mutable.ArrayBuffer import scala.collection.parallel.ForkJoinTaskSupport -import scala.concurrent.forkjoin.ForkJoinPool import scala.reflect.ClassTag import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext} import org.apache.spark.annotation.DeveloperApi import org.apache.spark.internal.config.RDD_PARALLEL_LISTING_THRESHOLD -import org.apache.spark.util.Utils +import org.apache.spark.util.{ThreadUtils, Utils} /** * Partition for UnionRDD. 
@@ -61,7 +60,7 @@ private[spark] class UnionPartition[T: ClassTag]( object UnionRDD { private[spark] lazy val partitionEvalTaskSupport = - new ForkJoinTaskSupport(new ForkJoinPool(8)) + new ForkJoinTaskSupport(ThreadUtils.newForkJoinPool("partition-eval-task-support", 8)) } @DeveloperApi diff --git a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index 5f725d8697..df696a3127 100644 --- a/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -34,8 +34,6 @@ import org.apache.spark.internal.Logging */ private[spark] object ClosureCleaner extends Logging { - private val isScala2_11 = scala.util.Properties.versionString.contains("2.11") - // Get an ASM class reader for a given class from the JAR that loaded it private[util] def getClassReader(cls: Class[_]): ClassReader = { // Copy data over, before delegating to ClassReader - else we can run out of open file handles. @@ -168,9 +166,6 @@ private[spark] object ClosureCleaner extends Logging { * @param closure the closure to check. */ private def getSerializedLambda(closure: AnyRef): Option[SerializedLambda] = { - if (isScala2_11) { - return None - } val isClosureCandidate = closure.getClass.isSynthetic && closure diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala index cb0c20541d..04b0b4c37d 100644 --- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala @@ -26,7 +26,6 @@ import scala.language.higherKinds import com.google.common.util.concurrent.{MoreExecutors, ThreadFactoryBuilder} import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor, Future} import scala.concurrent.duration.{Duration, FiniteDuration} -import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, ForkJoinWorkerThread => SForkJoinWorkerThread} import scala.util.control.NonFatal import org.apache.spark.SparkException @@ -181,17 +180,17 @@ private[spark] object ThreadUtils { } /** - * Construct a new Scala ForkJoinPool with a specified max parallelism and name prefix. + * Construct a new ForkJoinPool with a specified max parallelism and name prefix. 
*/ - def newForkJoinPool(prefix: String, maxThreadNumber: Int): SForkJoinPool = { + def newForkJoinPool(prefix: String, maxThreadNumber: Int): ForkJoinPool = { // Custom factory to set thread names - val factory = new SForkJoinPool.ForkJoinWorkerThreadFactory { - override def newThread(pool: SForkJoinPool) = - new SForkJoinWorkerThread(pool) { + val factory = new ForkJoinPool.ForkJoinWorkerThreadFactory { + override def newThread(pool: ForkJoinPool) = + new ForkJoinWorkerThread(pool) { setName(prefix + "-" + super.getName) } } - new SForkJoinPool(maxThreadNumber, factory, + new ForkJoinPool(maxThreadNumber, factory, null, // handler false // asyncMode ) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index a0f0989178..8e1a519e18 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -94,8 +94,8 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { test("add dependencies works correctly") { val md = SparkSubmitUtils.getModuleDescriptor - val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.11:0.1," + - "com.databricks:spark-avro_2.11:0.1") + val artifacts = SparkSubmitUtils.extractMavenCoordinates("com.databricks:spark-csv_2.12:0.1," + + "com.databricks:spark-avro_2.12:0.1") SparkSubmitUtils.addDependenciesToIvy(md, artifacts, "default") assert(md.getDependencies.length === 2) @@ -189,7 +189,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { test("neglects Spark and Spark's dependencies") { val coordinates = SparkSubmitUtils.IVY_DEFAULT_EXCLUDES - .map(comp => s"org.apache.spark:spark-${comp}2.11:2.1.1") + .map(comp => s"org.apache.spark:spark-${comp}2.12:2.4.0") .mkString(",") + ",org.apache.spark:spark-core_fake:1.2.0" val path = SparkSubmitUtils.resolveMavenCoordinates( @@ -197,7 +197,7 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { SparkSubmitUtils.buildIvySettings(None, None), isTest = true) assert(path === "", "should return empty path") - val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.11", "1.2.0") + val main = MavenCoordinate("org.apache.spark", "spark-streaming-kafka-assembly_2.12", "1.2.0") IvyTestUtils.withRepository(main, None, None) { repo => val files = SparkSubmitUtils.resolveMavenCoordinates( coordinates + "," + main.toString, diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala index 3c6660800f..b7032e81a9 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala @@ -74,7 +74,7 @@ class ClosureCleanerSuite extends SparkFunSuite { try { body } catch { - case rse: ReturnStatementInClosureException => // Success! + case _: ReturnStatementInClosureException => // Success! 
case e @ (_: NotSerializableException | _: SparkException) => fail(s"Expected ReturnStatementInClosureException, but got $e.\n" + "This means the closure provided by user is not actually cleaned.") @@ -122,65 +122,6 @@ class ClosureCleanerSuite extends SparkFunSuite { new TestCreateNullValue().run() } - test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 1") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val concreteObject = new TestAbstractClass { - val n2 = 222 - val s2 = "bbb" - val d2 = 2.0d - - def run(): Seq[(Int, Int, String, String, Double, Double)] = { - withSpark(new SparkContext("local", "test")) { sc => - val rdd = sc.parallelize(1 to 1) - body(rdd) - } - } - - def body(rdd: RDD[Int]): Seq[(Int, Int, String, String, Double, Double)] = rdd.map { _ => - (n1, n2, s1, s2, d1, d2) - }.collect() - } - assert(concreteObject.run() === Seq((111, 222, "aaa", "bbb", 1.0d, 2.0d))) - } - - test("SPARK-22328: ClosureCleaner misses referenced superclass fields: case 2") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val concreteObject = new TestAbstractClass2 { - val n2 = 222 - val s2 = "bbb" - val d2 = 2.0d - def getData: Int => (Int, Int, String, String, Double, Double) = _ => (n1, n2, s1, s2, d1, d2) - } - withSpark(new SparkContext("local", "test")) { sc => - val rdd = sc.parallelize(1 to 1).map(concreteObject.getData) - assert(rdd.collect() === Seq((111, 222, "aaa", "bbb", 1.0d, 2.0d))) - } - } - - test("SPARK-22328: multiple outer classes have the same parent class") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val concreteObject = new TestAbstractClass2 { - - val innerObject = new TestAbstractClass2 { - override val n1 = 222 - override val s1 = "bbb" - } - - val innerObject2 = new TestAbstractClass2 { - override val n1 = 444 - val n3 = 333 - val s3 = "ccc" - val d3 = 3.0d - - def getData: Int => (Int, Int, String, String, Double, Double, Int, String) = - _ => (n1, n3, s1, s3, d1, d3, innerObject.n1, innerObject.s1) - } - } - withSpark(new SparkContext("local", "test")) { sc => - val rdd = sc.parallelize(1 to 1).map(concreteObject.innerObject2.getData) - assert(rdd.collect() === Seq((444, 333, "aaa", "ccc", 1.0d, 3.0d, 222, "bbb"))) - } - } } // A non-serializable class we create in closures to make sure that we aren't @@ -328,13 +269,13 @@ private object TestUserClosuresActuallyCleaned { rdd.mapPartitionsWithIndex { (_, it) => return; it }.count() } def testZipPartitions2(rdd: RDD[Int]): Unit = { - rdd.zipPartitions(rdd) { case (it1, it2) => return; it1 }.count() + rdd.zipPartitions(rdd) { case (it1, _) => return; it1 }.count() } def testZipPartitions3(rdd: RDD[Int]): Unit = { - rdd.zipPartitions(rdd, rdd) { case (it1, it2, it3) => return; it1 }.count() + rdd.zipPartitions(rdd, rdd) { case (it1, _, _) => return; it1 }.count() } def testZipPartitions4(rdd: RDD[Int]): Unit = { - rdd.zipPartitions(rdd, rdd, rdd) { case (it1, it2, it3, it4) => return; it1 }.count() + rdd.zipPartitions(rdd, rdd, rdd) { case (it1, _, _, _) => return; it1 }.count() } def testForeach(rdd: RDD[Int]): Unit = { rdd.foreach { _ => return } } def testForeachPartition(rdd: RDD[Int]): Unit = { rdd.foreachPartition { _ => return } } @@ -374,17 +315,17 @@ private object TestUserClosuresActuallyCleaned { // Test SparkContext runJob def testRunJob1(sc: SparkContext): Unit = { val rdd = sc.parallelize(1 to 10, 10) - sc.runJob(rdd, { (ctx: TaskContext, iter: Iterator[Int]) => return; 1 } ) + sc.runJob(rdd, { (_: TaskContext, _: Iterator[Int]) => return; 1 } ) } def testRunJob2(sc: 
SparkContext): Unit = { val rdd = sc.parallelize(1 to 10, 10) - sc.runJob(rdd, { iter: Iterator[Int] => return; 1 } ) + sc.runJob(rdd, { _: Iterator[Int] => return; 1 } ) } def testRunApproximateJob(sc: SparkContext): Unit = { val rdd = sc.parallelize(1 to 10, 10) val evaluator = new CountEvaluator(1, 0.5) sc.runApproximateJob( - rdd, { (ctx: TaskContext, iter: Iterator[Int]) => return; 1L }, evaluator, 1000) + rdd, { (_: TaskContext, _: Iterator[Int]) => return; 1L }, evaluator, 1000) } def testSubmitJob(sc: SparkContext): Unit = { val rdd = sc.parallelize(1 to 10, 10) diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala index 96da8ec3b2..0635b4a358 100644 --- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala +++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite2.scala @@ -19,8 +19,6 @@ package org.apache.spark.util import java.io.NotSerializableException -import scala.collection.mutable - import org.scalatest.{BeforeAndAfterAll, PrivateMethodTester} import org.apache.spark.{SparkContext, SparkException, SparkFunSuite} @@ -107,271 +105,6 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri assertSerializable(closure, serializableAfter) } - /** - * Return the fields accessed by the given closure by class. - * This also optionally finds the fields transitively referenced through methods invocations. - */ - private def findAccessedFields( - closure: AnyRef, - outerClasses: Seq[Class[_]], - findTransitively: Boolean): Map[Class[_], Set[String]] = { - val fields = new mutable.HashMap[Class[_], mutable.Set[String]] - outerClasses.foreach { c => fields(c) = new mutable.HashSet[String] } - val cr = ClosureCleaner.getClassReader(closure.getClass) - if (cr == null) { - Map.empty - } else { - cr.accept(new FieldAccessFinder(fields, findTransitively), 0) - fields.mapValues(_.toSet).toMap - } - } - - // Accessors for private methods - private val _isClosure = PrivateMethod[Boolean]('isClosure) - private val _getInnerClosureClasses = PrivateMethod[List[Class[_]]]('getInnerClosureClasses) - private val _getOuterClassesAndObjects = - PrivateMethod[(List[Class[_]], List[AnyRef])]('getOuterClassesAndObjects) - - private def isClosure(obj: AnyRef): Boolean = { - ClosureCleaner invokePrivate _isClosure(obj) - } - - private def getInnerClosureClasses(closure: AnyRef): List[Class[_]] = { - ClosureCleaner invokePrivate _getInnerClosureClasses(closure) - } - - private def getOuterClassesAndObjects(closure: AnyRef): (List[Class[_]], List[AnyRef]) = { - ClosureCleaner invokePrivate _getOuterClassesAndObjects(closure) - } - - test("get inner closure classes") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val closure1 = () => 1 - val closure2 = () => { () => 1 } - val closure3 = (i: Int) => { - (1 to i).map { x => x + 1 }.filter { x => x > 5 } - } - val closure4 = (j: Int) => { - (1 to j).flatMap { x => - (1 to x).flatMap { y => - (1 to y).map { z => z + 1 } - } - } - } - val inner1 = getInnerClosureClasses(closure1) - val inner2 = getInnerClosureClasses(closure2) - val inner3 = getInnerClosureClasses(closure3) - val inner4 = getInnerClosureClasses(closure4) - assert(inner1.isEmpty) - assert(inner2.size === 1) - assert(inner3.size === 2) - assert(inner4.size === 3) - assert(inner2.forall(isClosure)) - assert(inner3.forall(isClosure)) - assert(inner4.forall(isClosure)) - } - - test("get outer classes and objects") { - 
assume(!ClosureCleanerSuite2.supportsLMFs) - val localValue = someSerializableValue - val closure1 = () => 1 - val closure2 = () => localValue - val closure3 = () => someSerializableValue - val closure4 = () => someSerializableMethod() - - val (outerClasses1, outerObjects1) = getOuterClassesAndObjects(closure1) - val (outerClasses2, outerObjects2) = getOuterClassesAndObjects(closure2) - val (outerClasses3, outerObjects3) = getOuterClassesAndObjects(closure3) - val (outerClasses4, outerObjects4) = getOuterClassesAndObjects(closure4) - - // The classes and objects should have the same size - assert(outerClasses1.size === outerObjects1.size) - assert(outerClasses2.size === outerObjects2.size) - assert(outerClasses3.size === outerObjects3.size) - assert(outerClasses4.size === outerObjects4.size) - - // These do not have $outer pointers because they reference only local variables - assert(outerClasses1.isEmpty) - assert(outerClasses2.isEmpty) - - // These closures do have $outer pointers because they ultimately reference `this` - // The first $outer pointer refers to the closure defines this test (see FunSuite#test) - // The second $outer pointer refers to ClosureCleanerSuite2 - assert(outerClasses3.size === 2) - assert(outerClasses4.size === 2) - assert(isClosure(outerClasses3(0))) - assert(isClosure(outerClasses4(0))) - assert(outerClasses3(0) === outerClasses4(0)) // part of the same "FunSuite#test" scope - assert(outerClasses3(1) === this.getClass) - assert(outerClasses4(1) === this.getClass) - assert(outerObjects3(1) === this) - assert(outerObjects4(1) === this) - } - - test("get outer classes and objects with nesting") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val localValue = someSerializableValue - - val test1 = () => { - val x = 1 - val closure1 = () => 1 - val closure2 = () => x - val (outerClasses1, outerObjects1) = getOuterClassesAndObjects(closure1) - val (outerClasses2, outerObjects2) = getOuterClassesAndObjects(closure2) - assert(outerClasses1.size === outerObjects1.size) - assert(outerClasses2.size === outerObjects2.size) - // These inner closures only reference local variables, and so do not have $outer pointers - assert(outerClasses1.isEmpty) - assert(outerClasses2.isEmpty) - } - - val test2 = () => { - def y = 1 - val closure1 = () => 1 - val closure2 = () => y - val closure3 = () => localValue - val (outerClasses1, outerObjects1) = getOuterClassesAndObjects(closure1) - val (outerClasses2, outerObjects2) = getOuterClassesAndObjects(closure2) - val (outerClasses3, outerObjects3) = getOuterClassesAndObjects(closure3) - assert(outerClasses1.size === outerObjects1.size) - assert(outerClasses2.size === outerObjects2.size) - assert(outerClasses3.size === outerObjects3.size) - // Same as above, this closure only references local variables - assert(outerClasses1.isEmpty) - // This closure references the "test2" scope because it needs to find the method `y` - // Scope hierarchy: "test2" < "FunSuite#test" < ClosureCleanerSuite2 - assert(outerClasses2.size === 3) - // This closure references the "test2" scope because it needs to find the `localValue` - // defined outside of this scope - assert(outerClasses3.size === 3) - assert(isClosure(outerClasses2(0))) - assert(isClosure(outerClasses3(0))) - assert(isClosure(outerClasses2(1))) - assert(isClosure(outerClasses3(1))) - assert(outerClasses2(0) === outerClasses3(0)) // part of the same "test2" scope - assert(outerClasses2(1) === outerClasses3(1)) // part of the same "FunSuite#test" scope - assert(outerClasses2(2) === 
this.getClass) - assert(outerClasses3(2) === this.getClass) - assert(outerObjects2(2) === this) - assert(outerObjects3(2) === this) - } - - test1() - test2() - } - - test("find accessed fields") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val localValue = someSerializableValue - val closure1 = () => 1 - val closure2 = () => localValue - val closure3 = () => someSerializableValue - val (outerClasses1, _) = getOuterClassesAndObjects(closure1) - val (outerClasses2, _) = getOuterClassesAndObjects(closure2) - val (outerClasses3, _) = getOuterClassesAndObjects(closure3) - - val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) - val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) - val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) - assert(fields1.isEmpty) - assert(fields2.isEmpty) - assert(fields3.size === 2) - // This corresponds to the "FunSuite#test" closure. This is empty because the - // `someSerializableValue` belongs to its parent (i.e. ClosureCleanerSuite2). - assert(fields3(outerClasses3(0)).isEmpty) - // This corresponds to the ClosureCleanerSuite2. This is also empty, however, - // because accessing a `ClosureCleanerSuite2#someSerializableValue` actually involves a - // method call. Since we do not find fields transitively, we will not recursively trace - // through the fields referenced by this method. - assert(fields3(outerClasses3(1)).isEmpty) - - val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true) - val fields2t = findAccessedFields(closure2, outerClasses2, findTransitively = true) - val fields3t = findAccessedFields(closure3, outerClasses3, findTransitively = true) - assert(fields1t.isEmpty) - assert(fields2t.isEmpty) - assert(fields3t.size === 2) - // Because we find fields transitively now, we are able to detect that we need the - // $outer pointer to get the field from the ClosureCleanerSuite2 - assert(fields3t(outerClasses3(0)).size === 1) - assert(fields3t(outerClasses3(0)).head === "$outer") - assert(fields3t(outerClasses3(1)).size === 1) - assert(fields3t(outerClasses3(1)).head.contains("someSerializableValue")) - } - - test("find accessed fields with nesting") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val localValue = someSerializableValue - - val test1 = () => { - def a = localValue + 1 - val closure1 = () => 1 - val closure2 = () => a - val closure3 = () => localValue - val closure4 = () => someSerializableValue - val (outerClasses1, _) = getOuterClassesAndObjects(closure1) - val (outerClasses2, _) = getOuterClassesAndObjects(closure2) - val (outerClasses3, _) = getOuterClassesAndObjects(closure3) - val (outerClasses4, _) = getOuterClassesAndObjects(closure4) - - // First, find only fields accessed directly, not transitively, by these closures - val fields1 = findAccessedFields(closure1, outerClasses1, findTransitively = false) - val fields2 = findAccessedFields(closure2, outerClasses2, findTransitively = false) - val fields3 = findAccessedFields(closure3, outerClasses3, findTransitively = false) - val fields4 = findAccessedFields(closure4, outerClasses4, findTransitively = false) - assert(fields1.isEmpty) - // Note that the size here represents the number of outer classes, not the number of fields - // "test1" < parameter of "FunSuite#test" < ClosureCleanerSuite2 - assert(fields2.size === 3) - // Since we do not find fields transitively here, we do not look into what `def a` references - assert(fields2(outerClasses2(0)).isEmpty) 
// This corresponds to the "test1" scope - assert(fields2(outerClasses2(1)).isEmpty) // This corresponds to the "FunSuite#test" scope - assert(fields2(outerClasses2(2)).isEmpty) // This corresponds to the ClosureCleanerSuite2 - assert(fields3.size === 3) - // Note that `localValue` is a field of the "test1" scope because `def a` references it, - // but NOT a field of the "FunSuite#test" scope because it is only a local variable there - assert(fields3(outerClasses3(0)).size === 1) - assert(fields3(outerClasses3(0)).head.contains("localValue")) - assert(fields3(outerClasses3(1)).isEmpty) - assert(fields3(outerClasses3(2)).isEmpty) - assert(fields4.size === 3) - // Because `val someSerializableValue` is an instance variable, even an explicit reference - // here actually involves a method call to access the underlying value of the variable. - // Because we are not finding fields transitively here, we do not consider the fields - // accessed by this "method" (i.e. the val's accessor). - assert(fields4(outerClasses4(0)).isEmpty) - assert(fields4(outerClasses4(1)).isEmpty) - assert(fields4(outerClasses4(2)).isEmpty) - - // Now do the same, but find fields that the closures transitively reference - val fields1t = findAccessedFields(closure1, outerClasses1, findTransitively = true) - val fields2t = findAccessedFields(closure2, outerClasses2, findTransitively = true) - val fields3t = findAccessedFields(closure3, outerClasses3, findTransitively = true) - val fields4t = findAccessedFields(closure4, outerClasses4, findTransitively = true) - assert(fields1t.isEmpty) - assert(fields2t.size === 3) - assert(fields2t(outerClasses2(0)).size === 1) // `def a` references `localValue` - assert(fields2t(outerClasses2(0)).head.contains("localValue")) - assert(fields2t(outerClasses2(1)).isEmpty) - assert(fields2t(outerClasses2(2)).isEmpty) - assert(fields3t.size === 3) - assert(fields3t(outerClasses3(0)).size === 1) // as before - assert(fields3t(outerClasses3(0)).head.contains("localValue")) - assert(fields3t(outerClasses3(1)).isEmpty) - assert(fields3t(outerClasses3(2)).isEmpty) - assert(fields4t.size === 3) - // Through a series of method calls, we are able to detect that we ultimately access - // ClosureCleanerSuite2's field `someSerializableValue`. Along the way, we also accessed - // a few $outer parent pointers to get to the outermost object. 
- assert(fields4t(outerClasses4(0)) === Set("$outer")) - assert(fields4t(outerClasses4(1)) === Set("$outer")) - assert(fields4t(outerClasses4(2)).size === 1) - assert(fields4t(outerClasses4(2)).head.contains("someSerializableValue")) - } - - test1() - } - test("clean basic serializable closures") { val localValue = someSerializableValue val closure1 = () => 1 @@ -452,12 +185,12 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri val closure2 = (j: Int) => { (1 to j).map { x => x + someSerializableMethod() } } val closure4 = (k: Int) => { (1 to k).map { x => x + localSerializableMethod() } } // This closure references a local non-serializable value - val closure3 = (l: Int) => { (1 to l).map { x => localNonSerializableValue } } + val closure3 = (l: Int) => { (1 to l).map { _ => localNonSerializableValue } } // This is non-serializable no matter how many levels we nest it val closure5 = (m: Int) => { (1 to m).foreach { x => (1 to x).foreach { y => - (1 to y).foreach { z => + (1 to y).foreach { _ => someSerializableValue } } @@ -542,23 +275,8 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri // As before, this closure is neither serializable nor cleanable verifyCleaning(inner1, serializableBefore = false, serializableAfter = false) - - if (ClosureCleanerSuite2.supportsLMFs) { - verifyCleaning( - inner2, serializableBefore = true, serializableAfter = true) - } else { - // This closure is no longer serializable because it now has a pointer to the outer closure, - // which is itself not serializable because it has a pointer to the ClosureCleanerSuite2. - // If we do not clean transitively, we will not null out this indirect reference. - verifyCleaning( - inner2, serializableBefore = false, serializableAfter = false, transitive = false) - - // If we clean transitively, we will find that method `a` does not actually reference the - // outer closure's parent (i.e. the ClosureCleanerSuite), so we can additionally null out - // the outer closure's parent pointer. This will make `inner2` serializable. - verifyCleaning( - inner2, serializableBefore = false, serializableAfter = true, transitive = true) - } + verifyCleaning( + inner2, serializableBefore = true, serializableAfter = true) } // Same as above, but with more levels of nesting @@ -575,25 +293,4 @@ class ClosureCleanerSuite2 extends SparkFunSuite with BeforeAndAfterAll with Pri test6()()() } - test("verify nested non-LMF closures") { - assume(ClosureCleanerSuite2.supportsLMFs) - class A1(val f: Int => Int) - class A2(val f: Int => Int => Int) - class B extends A1(x => x*x) - class C extends A2(x => new B().f ) - val closure1 = new B().f - val closure2 = new C().f - // serializable already - verifyCleaning(closure1, serializableBefore = true, serializableAfter = true) - // brings in deps that can't be cleaned - verifyCleaning(closure2, serializableBefore = false, serializableAfter = false) - } -} - -object ClosureCleanerSuite2 { - // Scala 2.12 allows better interop with Java 8 via lambda syntax. This is supported - // by implementing FunctionN classes in Scala’s standard library as Single Abstract - // Method (SAM) types. Lambdas are implemented via the invokedynamic instruction and - // the use of the LambdaMwtaFactory (LMF) machanism. 
- val supportsLMFs = scala.util.Properties.versionString.contains("2.12") } diff --git a/dev/change-scala-version.sh b/dev/change-scala-version.sh index 022e68c279..4054d530d0 100755 --- a/dev/change-scala-version.sh +++ b/dev/change-scala-version.sh @@ -19,7 +19,7 @@ set -e -VALID_VERSIONS=( 2.11 2.12 ) +VALID_VERSIONS=( 2.12 ) usage() { echo "Usage: $(basename $0) [-h|--help] @@ -44,10 +44,10 @@ check_scala_version() { check_scala_version "$TO_VERSION" -if [ $TO_VERSION = "2.11" ]; then +if [ $TO_VERSION = "2.13" ]; then FROM_VERSION="2.12" else - FROM_VERSION="2.11" + FROM_VERSION="2.13" fi sed_i() { diff --git a/dev/create-release/release-build.sh b/dev/create-release/release-build.sh index 35deadfbbf..f35bc4f486 100755 --- a/dev/create-release/release-build.sh +++ b/dev/create-release/release-build.sh @@ -110,16 +110,20 @@ fi # Depending on the version being built, certain extra profiles need to be activated, and # different versions of Scala are supported. BASE_PROFILES="-Pmesos -Pyarn" -PUBLISH_SCALA_2_10=0 -SCALA_2_10_PROFILES="-Pscala-2.10" -SCALA_2_11_PROFILES= if [[ $SPARK_VERSION > "2.3" ]]; then BASE_PROFILES="$BASE_PROFILES -Pkubernetes" +fi + +# TODO: revisit for Scala 2.13 + +PUBLISH_SCALA_2_11=1 +SCALA_2_11_PROFILES="-Pscala-2.11" +if [[ $SPARK_VERSION > "2.3" ]]; then if [[ $SPARK_VERSION < "3.0." ]]; then - SCALA_2_11_PROFILES="-Pkafka-0-8 -Pflume" + SCALA_2_11_PROFILES="-Pkafka-0-8 -Pflume $SCALA_2_11_PROFILES" + else + PUBLISH_SCALA_2_11=0 fi -else - PUBLISH_SCALA_2_10=1 fi PUBLISH_SCALA_2_12=0 @@ -138,22 +142,10 @@ PUBLISH_PROFILES="$BASE_PROFILES $HIVE_PROFILES -Pspark-ganglia-lgpl -Pkinesis-a # Profiles for building binary releases BASE_RELEASE_PROFILES="$BASE_PROFILES -Psparkr" -if [[ ! $SPARK_VERSION < "2.2." ]]; then - if [[ $JAVA_VERSION < "1.8." ]]; then - echo "Java version $JAVA_VERSION is less than required 1.8 for 2.2+" - echo "Please set JAVA_HOME correctly." - exit 1 - fi -else - if ! [[ $JAVA_VERSION =~ 1\.7\..* ]]; then - if [ -z "$JAVA_7_HOME" ]; then - echo "Java version $JAVA_VERSION is higher than required 1.7 for pre-2.2" - echo "Please set JAVA_HOME correctly." - exit 1 - else - export JAVA_HOME="$JAVA_7_HOME" - fi - fi +if [[ $JAVA_VERSION < "1.8." ]]; then + echo "Java version $JAVA_VERSION is less than required 1.8 for 2.2+" + echo "Please set JAVA_HOME correctly." + exit 1 fi # This is a band-aid fix to avoid the failure of Maven nightly snapshot in some Jenkins @@ -221,9 +213,7 @@ if [[ "$1" == "package" ]]; then cp -r spark spark-$SPARK_VERSION-bin-$NAME cd spark-$SPARK_VERSION-bin-$NAME - if [[ "$SCALA_VERSION" != "2.11" ]]; then - ./dev/change-scala-version.sh $SCALA_VERSION - fi + ./dev/change-scala-version.sh $SCALA_VERSION export ZINC_PORT=$ZINC_PORT echo "Creating distribution: $NAME ($FLAGS)" @@ -292,31 +282,29 @@ if [[ "$1" == "package" ]]; then if [[ $SPARK_VERSION < "3.0." ]]; then BINARY_PKGS_ARGS["hadoop2.6"]="-Phadoop-2.6 $HIVE_PROFILES" fi - if [[ $SPARK_VERSION < "2.2." ]]; then - BINARY_PKGS_ARGS["hadoop2.4"]="-Phadoop-2.4 $HIVE_PROFILES" - BINARY_PKGS_ARGS["hadoop2.3"]="-Phadoop-2.3 $HIVE_PROFILES" - fi fi declare -A BINARY_PKGS_EXTRA BINARY_PKGS_EXTRA["hadoop2.7"]="withpip,withr" - echo "Packages to build: ${!BINARY_PKGS_ARGS[@]}" - for key in ${!BINARY_PKGS_ARGS[@]}; do - args=${BINARY_PKGS_ARGS[$key]} - extra=${BINARY_PKGS_EXTRA[$key]} + if [[ $PUBLISH_SCALA_2_11 = 1 ]]; then + key="without-hadoop-scala-2.11" + args="-Phadoop-provided" + extra="" if ! 
make_binary_release "$key" "$SCALA_2_11_PROFILES $args" "$extra" "2.11"; then
       error "Failed to build $key package. Check logs for details."
     fi
-  done
+  fi
 
   if [[ $PUBLISH_SCALA_2_12 = 1 ]]; then
-    key="without-hadoop-scala-2.12"
-    args="-Phadoop-provided"
-    extra=""
-    if ! make_binary_release "$key" "$SCALA_2_12_PROFILES $args" "$extra" "2.12"; then
-      error "Failed to build $key package. Check logs for details."
-    fi
+    echo "Packages to build: ${!BINARY_PKGS_ARGS[@]}"
+    for key in ${!BINARY_PKGS_ARGS[@]}; do
+      args=${BINARY_PKGS_ARGS[$key]}
+      extra=${BINARY_PKGS_EXTRA[$key]}
+      if ! make_binary_release "$key" "$SCALA_2_12_PROFILES $args" "$extra" "2.12"; then
+        error "Failed to build $key package. Check logs for details."
+      fi
+    done
   fi
 
   rm -rf spark-$SPARK_VERSION-bin-*/
@@ -391,10 +379,7 @@ if [[ "$1" == "publish-snapshot" ]]; then
   # Generate random point for Zinc
   export ZINC_PORT=$(python -S -c "import random; print random.randrange(3030,4030)")
 
-  $MVN -DzincPort=$ZINC_PORT --settings $tmp_settings -DskipTests $SCALA_2_11_PROFILES $PUBLISH_PROFILES deploy
-  #./dev/change-scala-version.sh 2.12
-  #$MVN -DzincPort=$ZINC_PORT --settings $tmp_settings \
-  #  -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean deploy
+  $MVN -DzincPort=$ZINC_PORT --settings $tmp_settings -DskipTests $SCALA_2_12_PROFILES $PUBLISH_PROFILES deploy
 
   rm $tmp_settings
   cd ..
@@ -426,22 +411,20 @@ if [[ "$1" == "publish-release" ]]; then
   # Generate random point for Zinc
   export ZINC_PORT=$(python -S -c "import random; print random.randrange(3030,4030)")
 
-  $MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -DskipTests $SCALA_2_11_PROFILES $PUBLISH_PROFILES clean install
+  # TODO: revisit for Scala 2.13 support
 
-  if ! is_dry_run && [[ $PUBLISH_SCALA_2_10 = 1 ]]; then
-    ./dev/change-scala-version.sh 2.10
-    $MVN -DzincPort=$((ZINC_PORT + 1)) -Dmaven.repo.local=$tmp_repo -Dscala-2.10 \
-      -DskipTests $PUBLISH_PROFILES $SCALA_2_10_PROFILES clean install
+  if ! is_dry_run && [[ $PUBLISH_SCALA_2_11 = 1 ]]; then
+    ./dev/change-scala-version.sh 2.11
+    $MVN -DzincPort=$ZINC_PORT -Dmaven.repo.local=$tmp_repo -DskipTests \
+      $SCALA_2_11_PROFILES $PUBLISH_PROFILES clean install
   fi
 
   if ! is_dry_run && [[ $PUBLISH_SCALA_2_12 = 1 ]]; then
     ./dev/change-scala-version.sh 2.12
-    $MVN -DzincPort=$((ZINC_PORT + 2)) -Dmaven.repo.local=$tmp_repo -Dscala-2.12 \
-      -DskipTests $PUBLISH_PROFILES $SCALA_2_12_PROFILES clean install
+    $MVN -DzincPort=$((ZINC_PORT + 2)) -Dmaven.repo.local=$tmp_repo -DskipTests \
+      $SCALA_2_12_PROFILES $PUBLISH_PROFILES clean install
   fi
 
-  ./dev/change-scala-version.sh 2.11
-
   pushd $tmp_repo/org/apache/spark
 
   # Remove any extra files generated during install
diff --git a/docs/building-spark.md b/docs/building-spark.md
index b16083f9f8..ef2f2bf231 100644
--- a/docs/building-spark.md
+++ b/docs/building-spark.md
@@ -13,7 +13,7 @@ redirect_from: "building-with-maven.html"
 
 The Maven-based build is the build of reference for Apache Spark.
 Building Spark using Maven requires Maven 3.6.0 and Java 8.
-Note that support for Java 7 was removed as of Spark 2.2.0.
+Spark requires Scala 2.12; support for Scala 2.11 was removed in Spark 3.0.0.
 
 ### Setting up Maven's Memory Usage
 
@@ -246,17 +246,18 @@ or
 
 ## Change Scala Version
 
-To build Spark using another supported Scala version, please change the major Scala version using (e.g. 2.11):
+When other versions of Scala like 2.13 are supported, it will be possible to build for that version.
+Change the major Scala version using (e.g. 
2.13): - ./dev/change-scala-version.sh 2.11 + ./dev/change-scala-version.sh 2.13 -For Maven, please enable the profile (e.g. 2.11): +For Maven, please enable the profile (e.g. 2.13): - ./build/mvn -Pscala-2.11 compile + ./build/mvn -Pscala-2.13 compile -For SBT, specify a complete scala version using (e.g. 2.11.12): +For SBT, specify a complete scala version using (e.g. 2.13.0): - ./build/sbt -Dscala.version=2.11.12 + ./build/sbt -Dscala.version=2.13.0 Otherwise, the sbt-pom-reader plugin will use the `scala.version` specified in the spark-parent pom. diff --git a/docs/index.md b/docs/index.md index a85dd9e553..59cd3f6faa 100644 --- a/docs/index.md +++ b/docs/index.md @@ -31,7 +31,8 @@ Spark runs on both Windows and UNIX-like systems (e.g. Linux, Mac OS). It's easy locally on one machine --- all you need is to have `java` installed on your system `PATH`, or the `JAVA_HOME` environment variable pointing to a Java installation. -Spark runs on Java 8+, Python 2.7+/3.4+ and R 3.1+. R prior to version 3.4 support is deprecated as of Spark 3.0.0. +Spark runs on Java 8+, Scala 2.12, Python 2.7+/3.4+ and R 3.1+. +R prior to version 3.4 support is deprecated as of Spark 3.0.0. For the Scala API, Spark {{site.SPARK_VERSION}} uses Scala {{site.SCALA_BINARY_VERSION}}. You will need to use a compatible Scala version ({{site.SCALA_BINARY_VERSION}}.x). diff --git a/docs/storage-openstack-swift.md b/docs/storage-openstack-swift.md index dacaa3438d..3264711df7 100644 --- a/docs/storage-openstack-swift.md +++ b/docs/storage-openstack-swift.md @@ -29,7 +29,7 @@ For example, for Maven support, add the following to the pom.xml fi ... org.apache.spark - hadoop-cloud_2.11 + hadoop-cloud_2.12 ${spark.version} ... diff --git a/external/docker/spark-test/base/Dockerfile b/external/docker/spark-test/base/Dockerfile index c70cd71367..224e7031fd 100644 --- a/external/docker/spark-test/base/Dockerfile +++ b/external/docker/spark-test/base/Dockerfile @@ -25,7 +25,7 @@ RUN apt-get update && \ apt-get install -y less openjdk-8-jre-headless iproute2 vim-tiny sudo openssh-server && \ rm -rf /var/lib/apt/lists/* -ENV SCALA_VERSION 2.11.8 +ENV SCALA_VERSION 2.12.8 ENV CDH_VERSION cdh4 ENV SCALA_HOME /opt/scala-$SCALA_VERSION ENV SPARK_HOME /opt/spark diff --git a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala deleted file mode 100644 index 5325978a0a..0000000000 --- a/graphx/src/test/scala/org/apache/spark/graphx/util/BytecodeUtilsSuite.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.graphx.util - -import org.apache.spark.SparkFunSuite -import org.apache.spark.util.ClosureCleanerSuite2 - - -// scalastyle:off println -class BytecodeUtilsSuite extends SparkFunSuite { - - import BytecodeUtilsSuite.TestClass - - test("closure invokes a method") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } - assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) - assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) - assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) - - val c2 = {e: TestClass => println(e.foo); println(e.bar); } - assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) - assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar")) - assert(!BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz")) - - val c3 = {e: TestClass => println(e.foo); } - assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo")) - assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar")) - assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz")) - } - - test("closure inside a closure invokes a method") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val c1 = {e: TestClass => println(e.foo); println(e.bar); println(e.baz); } - val c2 = {e: TestClass => c1(e); println(e.foo); } - assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "foo")) - assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "bar")) - assert(BytecodeUtils.invokedMethod(c2, classOf[TestClass], "baz")) - } - - test("closure inside a closure inside a closure invokes a method") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val c1 = {e: TestClass => println(e.baz); } - val c2 = {e: TestClass => c1(e); println(e.foo); } - val c3 = {e: TestClass => c2(e) } - assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "foo")) - assert(!BytecodeUtils.invokedMethod(c3, classOf[TestClass], "bar")) - assert(BytecodeUtils.invokedMethod(c3, classOf[TestClass], "baz")) - } - - test("closure calling a function that invokes a method") { - assume(!ClosureCleanerSuite2.supportsLMFs) - def zoo(e: TestClass) { - println(e.baz) - } - val c1 = {e: TestClass => zoo(e)} - assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) - assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) - assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) - } - - test("closure calling a function that invokes a method which uses another closure") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val c2 = {e: TestClass => println(e.baz)} - def zoo(e: TestClass) { - c2(e) - } - val c1 = {e: TestClass => zoo(e)} - assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) - assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) - assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) - } - - test("nested closure") { - assume(!ClosureCleanerSuite2.supportsLMFs) - val c2 = {e: TestClass => println(e.baz)} - def zoo(e: TestClass, c: TestClass => Unit) { - c(e) - } - val c1 = {e: TestClass => zoo(e, c2)} - assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) - assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) - assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) - } - - // The following doesn't work yet, because the byte code doesn't contain any information - // about what exactly "c" is. 
-// test("invoke interface") { -// val c1 = {e: TestClass => c(e)} -// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "foo")) -// assert(!BytecodeUtils.invokedMethod(c1, classOf[TestClass], "bar")) -// assert(BytecodeUtils.invokedMethod(c1, classOf[TestClass], "baz")) -// } - - private val c = {e: TestClass => println(e.baz)} -} - -// scalastyle:on println - -object BytecodeUtilsSuite { - class TestClass(val foo: Int, val bar: Long) { - def baz: Boolean = false - } -} diff --git a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java index 56edceb17b..3ae4633c79 100644 --- a/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java +++ b/launcher/src/main/java/org/apache/spark/launcher/AbstractCommandBuilder.java @@ -235,17 +235,20 @@ abstract class AbstractCommandBuilder { return scala; } String sparkHome = getSparkHome(); + // TODO: revisit for Scala 2.13 support File scala212 = new File(sparkHome, "launcher/target/scala-2.12"); - File scala211 = new File(sparkHome, "launcher/target/scala-2.11"); - checkState(!scala212.isDirectory() || !scala211.isDirectory(), - "Presence of build for multiple Scala versions detected.\n" + - "Either clean one of them or set SPARK_SCALA_VERSION in your environment."); - if (scala212.isDirectory()) { - return "2.12"; - } else { - checkState(scala211.isDirectory(), "Cannot find any build directories."); - return "2.11"; - } + // File scala211 = new File(sparkHome, "launcher/target/scala-2.11"); + // checkState(!scala212.isDirectory() || !scala211.isDirectory(), + // "Presence of build for multiple Scala versions detected.\n" + + // "Either clean one of them or set SPARK_SCALA_VERSION in your environment."); + // if (scala212.isDirectory()) { + // return "2.12"; + // } else { + // checkState(scala211.isDirectory(), "Cannot find any build directories."); + // return "2.11"; + // } + checkState(scala212.isDirectory(), "Cannot find any build directories."); + return "2.12"; } String getSparkHome() { diff --git a/pom.xml b/pom.xml index 19775050de..0cce66472b 100644 --- a/pom.xml +++ b/pom.xml @@ -774,7 +774,6 @@ scala-parser-combinators_${scala.binary.version} 1.1.0 - jline jline @@ -2719,41 +2718,6 @@ scala-2.12 - - scala-2.11 - - 2.11.12 - 2.11 - - - - - org.apache.maven.plugins - maven-enforcer-plugin - - - enforce-versions - - enforce - - - - - - org.jboss.netty - org.codehaus.groovy - *:*_2.10 - - - - - - - - - - -
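From a downstream user's point of view the change is mechanical: every published Spark artifact coordinate now carries a `_2.12` suffix, as the updated R docs and `SparkSubmitUtilsSuite` coordinates above show. A minimal, hypothetical downstream `build.sbt` sketch against the new artifacts (the Spark and Scala patch versions here are illustrative assumptions, not part of this patch):

```scala
// Hypothetical downstream build.sbt once only _2.12 Spark artifacts are published.
// 2.12.8 mirrors the SCALA_VERSION set in the updated test Dockerfile; any 2.12.x works.
scalaVersion := "2.12.8"

// "%%" appends the Scala binary version, so these resolve to spark-core_2.12 / spark-sql_2.12.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0" % "provided",
  "org.apache.spark" %% "spark-sql"  % "3.0.0" % "provided"
)
```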