From 576c43fb4226e4efa12189b41c3bc862019862c6 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Sun, 24 Sep 2017 09:40:13 +0100
Subject: [PATCH] [SPARK-22087][SPARK-14650][WIP][BUILD][REPL][CORE] Compile Spark REPL for Scala 2.12 + other 2.12 fixes

## What changes were proposed in this pull request?

Enable Scala 2.12 REPL. Fix most remaining issues with 2.12 compilation and warnings, including:

- Selecting Kafka 0.10.1+ for Scala 2.12 and patching over a minor API difference
- Fixing lots of "eta expansion of zero arg method deprecated" warnings
- Resolving the SparkContext.sequenceFile implicits compile problem
- Fixing an odd but valid jetty-server missing dependency in hive-thriftserver

## How was this patch tested?

Existing tests

Author: Sean Owen

Closes #19307 from srowen/Scala212.
---
 .../scala/org/apache/spark/SparkContext.scala      |  36 +++++
 .../deploy/history/FsHistoryProvider.scala         |   8 +-
 .../apache/spark/deploy/worker/Worker.scala        |  14 +-
 .../CoarseGrainedExecutorBackend.scala             |   4 +-
 .../spark/memory/UnifiedMemoryManager.scala        |   2 +-
 .../apache/spark/rdd/DoubleRDDFunctions.scala      |   6 +-
 .../apache/spark/rpc/netty/NettyRpcEnv.scala       |   2 +-
 .../apache/spark/scheduler/DAGScheduler.scala      |  14 +-
 .../CoarseGrainedSchedulerBackend.scala            |  10 +-
 .../spark/serializer/KryoSerializer.scala          |   4 +-
 .../spark/storage/BlockManagerMaster.scala         |  23 ++-
 .../storage/BlockManagerSlaveEndpoint.scala        |  10 +-
 .../apache/spark/ui/UIWorkloadGenerator.scala      |  14 +-
 .../spark/deploy/SparkSubmitUtilsSuite.scala       |  11 +-
 .../history/ApplicationCacheSuite.scala            |   4 +-
 .../spark/rdd/AsyncRDDActionsSuite.scala           |   8 +-
 .../scheduler/SchedulerIntegrationSuite.scala      |   2 +
 .../spark/serializer/KryoBenchmark.scala           |  16 +--
 examples/pom.xml                                   |   2 +-
 .../flume/FlumePollingStreamSuite.scala            |   4 +-
 external/kafka-0-10-sql/pom.xml                    |  11 ++
 .../spark/sql/kafka010/KafkaTestUtils.scala        |   5 +-
 external/kafka-0-10/pom.xml                        |  11 ++
 .../scala/org/apache/spark/ml/Pipeline.scala       |   5 +-
 .../spark/ml/classification/LinearSVC.scala        |   2 +-
 .../classification/LogisticRegression.scala        |   2 +-
 .../spark/ml/classification/OneVsRest.scala        |   2 +-
 .../ml/regression/LinearRegression.scala           |   2 +-
 .../spark/ml/tree/impl/RandomForest.scala          |   6 +-
 .../spark/mllib/tree/impurity/Impurity.scala       |   2 +-
 .../DifferentiableLossAggregatorSuite.scala        |   4 +-
 .../ml/tree/impl/RandomForestSuite.scala           |   6 +-
 repl/pom.xml                                       |   2 -
 .../org/apache/spark/repl/SparkILoop.scala         | 134 ++++++++++++++++++
 .../scala/org/apache/spark/repl/Main.scala         |   0
 .../org/apache/spark/repl/ReplSuite.scala          |   0
 .../spark/repl/SingletonReplSuite.scala            |   0
 .../expressions/aggregate/Percentile.scala         |   2 +-
 .../org/apache/spark/sql/types/Metadata.scala      |   2 +-
 .../spark/sql/execution/GenerateExec.scala         |   4 +-
 .../datasources/InMemoryFileIndex.scala            |   1 +
 .../datasources/csv/UnivocityParser.scala          |   2 +-
 .../sql/execution/joins/HashedRelation.scala       |   8 +-
 .../apache/spark/sql/JavaDatasetSuite.java         |   4 +
 .../apache/spark/sql/DataFrameStatSuite.scala      |   3 -
 .../StreamingQueryManagerSuite.scala               |   2 +-
 sql/hive-thriftserver/pom.xml                      |  10 ++
 .../spark/streaming/StreamingContext.scala         |   2 +-
 48 files changed, 316 insertions(+), 112 deletions(-)
 create mode 100644 repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala
 rename repl/{scala-2.11 => }/src/main/scala/org/apache/spark/repl/Main.scala (100%)
 rename repl/{scala-2.11 => }/src/test/scala/org/apache/spark/repl/ReplSuite.scala (100%)
 rename repl/{scala-2.11 => }/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala (100%)

diff --git
a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 1821bc87bf..cec61d85cc 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -2826,6 +2826,42 @@ object WritableConverter { // them automatically. However, we still keep the old functions in SparkContext for backward // compatibility and forward to the following functions directly. + // The following implicit declarations have been added on top of the very similar ones + // below in order to enable compatibility with Scala 2.12. Scala 2.12 deprecates eta + // expansion of zero-arg methods and thus won't match a no-arg method where it expects + // an implicit that is a function of no args. + + implicit val intWritableConverterFn: () => WritableConverter[Int] = + () => simpleWritableConverter[Int, IntWritable](_.get) + + implicit val longWritableConverterFn: () => WritableConverter[Long] = + () => simpleWritableConverter[Long, LongWritable](_.get) + + implicit val doubleWritableConverterFn: () => WritableConverter[Double] = + () => simpleWritableConverter[Double, DoubleWritable](_.get) + + implicit val floatWritableConverterFn: () => WritableConverter[Float] = + () => simpleWritableConverter[Float, FloatWritable](_.get) + + implicit val booleanWritableConverterFn: () => WritableConverter[Boolean] = + () => simpleWritableConverter[Boolean, BooleanWritable](_.get) + + implicit val bytesWritableConverterFn: () => WritableConverter[Array[Byte]] = { + () => simpleWritableConverter[Array[Byte], BytesWritable] { bw => + // getBytes method returns array which is longer then data to be returned + Arrays.copyOfRange(bw.getBytes, 0, bw.getLength) + } + } + + implicit val stringWritableConverterFn: () => WritableConverter[String] = + () => simpleWritableConverter[String, Text](_.toString) + + implicit def writableWritableConverterFn[T <: Writable : ClassTag]: () => WritableConverter[T] = + () => new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T]) + + // These implicits remain included for backwards-compatibility. They fulfill the + // same role as those above. + implicit def intWritableConverter(): WritableConverter[Int] = simpleWritableConverter[Int, IntWritable](_.get) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 20fe911f2d..910121e987 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -218,11 +218,13 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) if (!conf.contains("spark.testing")) { // A task that periodically checks for event log updates on disk. logDebug(s"Scheduling update thread every $UPDATE_INTERVAL_S seconds") - pool.scheduleWithFixedDelay(getRunner(checkForLogs), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) + pool.scheduleWithFixedDelay( + getRunner(() => checkForLogs()), 0, UPDATE_INTERVAL_S, TimeUnit.SECONDS) if (conf.getBoolean("spark.history.fs.cleaner.enabled", false)) { // A task that periodically cleans event logs on disk. 
- pool.scheduleWithFixedDelay(getRunner(cleanLogs), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) + pool.scheduleWithFixedDelay( + getRunner(() => cleanLogs()), 0, CLEAN_INTERVAL_S, TimeUnit.SECONDS) } } else { logDebug("Background update thread disabled for testing") @@ -268,7 +270,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) appListener.adminAclsGroups.getOrElse("") ui.getSecurityManager.setAdminAclsGroups(adminAclsGroups) ui.getSecurityManager.setViewAclsGroups(appListener.viewAclsGroups.getOrElse("")) - Some(LoadedAppUI(ui, updateProbe(appId, attemptId, attempt.fileSize))) + Some(LoadedAppUI(ui, () => updateProbe(appId, attemptId, attempt.fileSize))) } else { None } diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 29a810fe7a..ed5fa4b839 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -450,10 +450,9 @@ private[deploy] class Worker( } }(cleanupThreadExecutor) - cleanupFuture.onFailure { - case e: Throwable => - logError("App dir cleanup failed: " + e.getMessage, e) - }(cleanupThreadExecutor) + cleanupFuture.failed.foreach(e => + logError("App dir cleanup failed: " + e.getMessage, e) + )(cleanupThreadExecutor) case MasterChanged(masterRef, masterWebUiUrl) => logInfo("Master has changed, new master is at " + masterRef.address.toSparkURL) @@ -622,10 +621,9 @@ private[deploy] class Worker( dirList.foreach { dir => Utils.deleteRecursively(new File(dir)) } - }(cleanupThreadExecutor).onFailure { - case e: Throwable => - logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) - }(cleanupThreadExecutor) + }(cleanupThreadExecutor).failed.foreach(e => + logError(s"Clean up app dir $dirList failed: ${e.getMessage}", e) + )(cleanupThreadExecutor) } shuffleService.applicationRemoved(id) } diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala index ed893cd1e9..d27362ae85 100644 --- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala @@ -163,9 +163,9 @@ private[spark] class CoarseGrainedExecutorBackend( if (notifyDriver && driver.nonEmpty) { driver.get.ask[Boolean]( RemoveExecutor(executorId, new ExecutorLossReason(reason)) - ).onFailure { case e => + ).failed.foreach(e => logWarning(s"Unable to notify the driver due to " + e.getMessage, e) - }(ThreadUtils.sameThread) + )(ThreadUtils.sameThread) } System.exit(code) diff --git a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala index df193552be..78edd2c4d7 100644 --- a/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala +++ b/core/src/main/scala/org/apache/spark/memory/UnifiedMemoryManager.scala @@ -143,7 +143,7 @@ private[spark] class UnifiedMemoryManager private[memory] ( } executionPool.acquireMemory( - numBytes, taskAttemptId, maybeGrowExecutionPool, computeMaxExecutionPoolSize) + numBytes, taskAttemptId, maybeGrowExecutionPool, () => computeMaxExecutionPoolSize) } override def acquireStorageMemory( diff --git a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala index 
57782c0ebf..943abae17a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/DoubleRDDFunctions.scala @@ -128,9 +128,9 @@ class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable { } // Compute the minimum and the maximum val (max: Double, min: Double) = self.mapPartitions { items => - Iterator(items.foldRight(Double.NegativeInfinity, - Double.PositiveInfinity)((e: Double, x: (Double, Double)) => - (x._1.max(e), x._2.min(e)))) + Iterator( + items.foldRight((Double.NegativeInfinity, Double.PositiveInfinity) + )((e: Double, x: (Double, Double)) => (x._1.max(e), x._2.min(e)))) }.reduce { (maxmin1, maxmin2) => (maxmin1._1.max(maxmin2._1), maxmin1._2.min(maxmin2._2)) } diff --git a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala index 1777e7a539..f951591e02 100644 --- a/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala +++ b/core/src/main/scala/org/apache/spark/rpc/netty/NettyRpcEnv.scala @@ -232,7 +232,7 @@ private[netty] class NettyRpcEnv( onFailure, (client, response) => onSuccess(deserialize[Any](client, response))) postToOutbox(message.receiver, rpcMessage) - promise.future.onFailure { + promise.future.failed.foreach { case _: TimeoutException => rpcMessage.onTimeout() case _ => }(ThreadUtils.sameThread) diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index 562dd1da4f..9153751d03 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger import scala.annotation.tailrec import scala.collection.Map -import scala.collection.mutable.{HashMap, HashSet, Stack} +import scala.collection.mutable.{ArrayStack, HashMap, HashSet} import scala.concurrent.duration._ import scala.language.existentials import scala.language.postfixOps @@ -396,12 +396,12 @@ class DAGScheduler( /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ private def getMissingAncestorShuffleDependencies( - rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = { - val ancestors = new Stack[ShuffleDependency[_, _, _]] + rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = { + val ancestors = new ArrayStack[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() @@ -434,7 +434,7 @@ class DAGScheduler( rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = { val parents = new HashSet[ShuffleDependency[_, _, _]] val visited = new HashSet[RDD[_]] - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] waitingForVisit.push(rdd) while (waitingForVisit.nonEmpty) { val toVisit = waitingForVisit.pop() @@ -456,7 +456,7 @@ class DAGScheduler( val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd @@ -1633,7 
+1633,7 @@ class DAGScheduler( val visitedRdds = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting - val waitingForVisit = new Stack[RDD[_]] + val waitingForVisit = new ArrayStack[RDD[_]] def visit(rdd: RDD[_]) { if (!visitedRdds(rdd)) { visitedRdds += rdd diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index a0ef209779..424e43b25c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -471,15 +471,13 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp */ protected def removeExecutor(executorId: String, reason: ExecutorLossReason): Unit = { // Only log the failure since we don't care about the result. - driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).onFailure { - case t => logError(t.getMessage, t) - }(ThreadUtils.sameThread) + driverEndpoint.ask[Boolean](RemoveExecutor(executorId, reason)).failed.foreach(t => + logError(t.getMessage, t))(ThreadUtils.sameThread) } protected def removeWorker(workerId: String, host: String, message: String): Unit = { - driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).onFailure { - case t => logError(t.getMessage, t) - }(ThreadUtils.sameThread) + driverEndpoint.ask[Boolean](RemoveWorker(workerId, host, message)).failed.foreach(t => + logError(t.getMessage, t))(ThreadUtils.sameThread) } def sufficientResourcesRegistered(): Boolean = true diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 4f03e54e30..58483c9577 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -501,8 +501,8 @@ private class JavaIterableWrapperSerializer private object JavaIterableWrapperSerializer extends Logging { // The class returned by JavaConverters.asJava // (scala.collection.convert.Wrappers$IterableWrapper). - val wrapperClass = - scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass + import scala.collection.JavaConverters._ + val wrapperClass = Seq(1).asJava.getClass // Get the underlying method so we can use it to get the Scala collection for serialization. private val underlyingMethodOpt = { diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala index ea5d8423a5..8b1dc0ba63 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMaster.scala @@ -118,10 +118,9 @@ class BlockManagerMaster( /** Remove all blocks belonging to the given RDD. 
*/ def removeRdd(rddId: Int, blocking: Boolean) { val future = driverEndpoint.askSync[Future[Seq[Int]]](RemoveRdd(rddId)) - future.onFailure { - case e: Exception => - logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e) - }(ThreadUtils.sameThread) + future.failed.foreach(e => + logWarning(s"Failed to remove RDD $rddId - ${e.getMessage}", e) + )(ThreadUtils.sameThread) if (blocking) { timeout.awaitResult(future) } @@ -130,10 +129,9 @@ class BlockManagerMaster( /** Remove all blocks belonging to the given shuffle. */ def removeShuffle(shuffleId: Int, blocking: Boolean) { val future = driverEndpoint.askSync[Future[Seq[Boolean]]](RemoveShuffle(shuffleId)) - future.onFailure { - case e: Exception => - logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e) - }(ThreadUtils.sameThread) + future.failed.foreach(e => + logWarning(s"Failed to remove shuffle $shuffleId - ${e.getMessage}", e) + )(ThreadUtils.sameThread) if (blocking) { timeout.awaitResult(future) } @@ -143,11 +141,10 @@ class BlockManagerMaster( def removeBroadcast(broadcastId: Long, removeFromMaster: Boolean, blocking: Boolean) { val future = driverEndpoint.askSync[Future[Seq[Int]]]( RemoveBroadcast(broadcastId, removeFromMaster)) - future.onFailure { - case e: Exception => - logWarning(s"Failed to remove broadcast $broadcastId" + - s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e) - }(ThreadUtils.sameThread) + future.failed.foreach(e => + logWarning(s"Failed to remove broadcast $broadcastId" + + s" with removeFromMaster = $removeFromMaster - ${e.getMessage}", e) + )(ThreadUtils.sameThread) if (blocking) { timeout.awaitResult(future) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 1aaa42459d..742cf4fe39 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -85,13 +85,13 @@ class BlockManagerSlaveEndpoint( logDebug(actionMessage) body } - future.onSuccess { case response => - logDebug("Done " + actionMessage + ", response is " + response) + future.foreach { response => + logDebug(s"Done $actionMessage, response is $response") context.reply(response) - logDebug("Sent response: " + response + " to " + context.senderAddress) + logDebug(s"Sent response: $response to ${context.senderAddress}") } - future.onFailure { case t: Throwable => - logError("Error in " + actionMessage, t) + future.failed.foreach { t => + logError(s"Error in $actionMessage", t) context.sendFailure(t) } } diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 094953f2f5..6229e80095 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -66,11 +66,11 @@ private[spark] object UIWorkloadGenerator { def nextFloat(): Float = new Random().nextFloat() val jobs = Seq[(String, () => Long)]( - ("Count", baseData.count), - ("Cache and Count", baseData.map(x => x).cache().count), - ("Single Shuffle", baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count), - ("Entirely failed phase", baseData.map(x => throw new Exception).count), - ("Partially failed phase", { + ("Count", () => baseData.count), + ("Cache and Count", () => baseData.map(x => x).cache().count), + ("Single Shuffle", () => 
baseData.map(x => (x % 10, x)).reduceByKey(_ + _).count), + ("Entirely failed phase", () => baseData.map { x => throw new Exception(); 1 }.count), + ("Partially failed phase", () => { baseData.map{x => val probFailure = (4.0 / NUM_PARTITIONS) if (nextFloat() < probFailure) { @@ -79,7 +79,7 @@ private[spark] object UIWorkloadGenerator { 1 }.count }), - ("Partially failed phase (longer tasks)", { + ("Partially failed phase (longer tasks)", () => { baseData.map{x => val probFailure = (4.0 / NUM_PARTITIONS) if (nextFloat() < probFailure) { @@ -89,7 +89,7 @@ private[spark] object UIWorkloadGenerator { 1 }.count }), - ("Job with delays", baseData.map(x => Thread.sleep(100)).count) + ("Job with delays", () => baseData.map(x => Thread.sleep(100)).count) ) val barrier = new Semaphore(-nJobSet * jobs.size + 1) diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala index 88b77e5143..eb8c203ae7 100644 --- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitUtilsSuite.scala @@ -83,11 +83,12 @@ class SparkSubmitUtilsSuite extends SparkFunSuite with BeforeAndAfterAll { val resolver = settings.getDefaultResolver.asInstanceOf[ChainResolver] assert(resolver.getResolvers.size() === 4) val expected = repos.split(",").map(r => s"$r/") - resolver.getResolvers.toArray.zipWithIndex.foreach { case (resolver: AbstractResolver, i) => - if (1 < i && i < 3) { - assert(resolver.getName === s"repo-$i") - assert(resolver.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1)) - } + resolver.getResolvers.toArray.map(_.asInstanceOf[AbstractResolver]).zipWithIndex.foreach { + case (r, i) => + if (1 < i && i < 3) { + assert(r.getName === s"repo-$i") + assert(r.asInstanceOf[IBiblioResolver].getRoot === expected(i - 1)) + } } } diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index c175ed3fb6..6e50e84549 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -78,7 +78,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar logDebug(s"getAppUI($appId, $attemptId)") getAppUICount += 1 instances.get(CacheKey(appId, attemptId)).map( e => - LoadedAppUI(e.ui, updateProbe(appId, attemptId, e.probeTime))) + LoadedAppUI(e.ui, () => updateProbe(appId, attemptId, e.probeTime))) } override def attachSparkUI( @@ -122,7 +122,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar completed: Boolean, timestamp: Long): Unit = { instances += (CacheKey(appId, attemptId) -> - new CacheEntry(ui, completed, updateProbe(appId, attemptId, timestamp), timestamp)) + new CacheEntry(ui, completed, () => updateProbe(appId, attemptId, timestamp), timestamp)) } /** diff --git a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala index f4be8eaef7..de0e71a332 100644 --- a/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/AsyncRDDActionsSuite.scala @@ -130,10 +130,10 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim info("Should not have reached this code path (onComplete matching 
Failure)") throw new Exception("Task should succeed") } - f.onSuccess { case a: Any => + f.foreach { a => sem.release() } - f.onFailure { case t => + f.failed.foreach { t => info("Should not have reached this code path (onFailure)") throw new Exception("Task should succeed") } @@ -164,11 +164,11 @@ class AsyncRDDActionsSuite extends SparkFunSuite with BeforeAndAfterAll with Tim case scala.util.Failure(e) => sem.release() } - f.onSuccess { case a: Any => + f.foreach { a => info("Should not have reached this code path (onSuccess)") throw new Exception("Task should fail") } - f.onFailure { case t => + f.failed.foreach { t => sem.release() } intercept[SparkException] { diff --git a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala index a8249e123f..75ea409e16 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SchedulerIntegrationSuite.scala @@ -625,6 +625,8 @@ class BasicSchedulerIntegrationSuite extends SchedulerIntegrationSuite[SingleCor backend.taskFailed(taskDescription, fetchFailed) case (1, _, partition) => backend.taskSuccess(taskDescription, 42 + partition) + case unmatched => + fail(s"Unexpected shuffle output $unmatched") } } withBackend(runBackend _) { diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala index 64be966276..a1cf3570a7 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoBenchmark.scala @@ -78,10 +78,10 @@ class KryoBenchmark extends SparkFunSuite { sum } } - basicTypes("Int", Random.nextInt) - basicTypes("Long", Random.nextLong) - basicTypes("Float", Random.nextFloat) - basicTypes("Double", Random.nextDouble) + basicTypes("Int", () => Random.nextInt()) + basicTypes("Long", () => Random.nextLong()) + basicTypes("Float", () => Random.nextFloat()) + basicTypes("Double", () => Random.nextDouble()) // Benchmark Array of Primitives val arrayCount = 10000 @@ -101,10 +101,10 @@ class KryoBenchmark extends SparkFunSuite { sum } } - basicTypeArray("Int", Random.nextInt) - basicTypeArray("Long", Random.nextLong) - basicTypeArray("Float", Random.nextFloat) - basicTypeArray("Double", Random.nextDouble) + basicTypeArray("Int", () => Random.nextInt()) + basicTypeArray("Long", () => Random.nextLong()) + basicTypeArray("Float", () => Random.nextFloat()) + basicTypeArray("Double", () => Random.nextDouble()) // Benchmark Maps val mapsCount = 1000 diff --git a/examples/pom.xml b/examples/pom.xml index 33eca48645..52a6764ae2 100644 --- a/examples/pom.xml +++ b/examples/pom.xml @@ -114,7 +114,7 @@ com.github.scopt scopt_${scala.binary.version} - 3.3.0 + 3.7.0 com.twitter diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index 1c93079497..4324cc6d0f 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -61,11 +61,11 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfterAll with } test("flume polling test") { - testMultipleTimes(testFlumePolling) + testMultipleTimes(() => testFlumePolling()) 
} test("flume polling test multiple hosts") { - testMultipleTimes(testFlumePollingMultipleHost) + testMultipleTimes(() => testFlumePollingMultipleHost()) } /** diff --git a/external/kafka-0-10-sql/pom.xml b/external/kafka-0-10-sql/pom.xml index 0f61a10e9b..0c9f0aa765 100644 --- a/external/kafka-0-10-sql/pom.xml +++ b/external/kafka-0-10-sql/pom.xml @@ -102,8 +102,19 @@ + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + + scala-2.12 + + 0.10.1.1 + + + + diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index 066a68a5dd..2df8352f48 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -173,7 +173,10 @@ class KafkaTestUtils(withBrokerProps: Map[String, Object] = Map.empty) extends L AdminUtils.createTopic(zkUtils, topic, partitions, 1) created = true } catch { - case e: kafka.common.TopicExistsException if overwrite => deleteTopic(topic) + // Workaround fact that TopicExistsException is in kafka.common in 0.10.0 and + // org.apache.kafka.common.errors in 0.10.1 (!) + case e: Exception if (e.getClass.getSimpleName == "TopicExistsException") && overwrite => + deleteTopic(topic) } } // wait until metadata is propagated diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml index 4d9861af1c..6eb7ba5f00 100644 --- a/external/kafka-0-10/pom.xml +++ b/external/kafka-0-10/pom.xml @@ -87,8 +87,19 @@ + target/scala-${scala.binary.version}/classes target/scala-${scala.binary.version}/test-classes + + + + scala-2.12 + + 0.10.1.1 + + + + diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala index b76dc5f931..103082b7b9 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala @@ -250,8 +250,9 @@ object Pipeline extends MLReadable[Pipeline] { // Save stages val stagesDir = new Path(path, "stages").toString - stages.zipWithIndex.foreach { case (stage: MLWritable, idx: Int) => - stage.write.save(getStagePath(stage.uid, idx, stages.length, stagesDir)) + stages.zipWithIndex.foreach { case (stage, idx) => + stage.asInstanceOf[MLWritable].write.save( + getStagePath(stage.uid, idx, stages.length, stagesDir)) } } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala index 1c97d77d38..ce400f4f1f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LinearSVC.scala @@ -184,7 +184,7 @@ class LinearSVC @Since("2.2.0") ( (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( - new MultivariateOnlineSummarizer, new MultiClassSummarizer + (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp, $(aggregationDepth)) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index cbc8f4a2d8..fa19160421 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala 
@@ -514,7 +514,7 @@ class LogisticRegression @Since("1.2.0") ( (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( - new MultivariateOnlineSummarizer, new MultiClassSummarizer + (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp, $(aggregationDepth)) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala index 92a7742f6c..3ab99b35ec 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/OneVsRest.scala @@ -235,7 +235,7 @@ object OneVsRestModel extends MLReadable[OneVsRestModel] { val extraJson = ("labelMetadata" -> instance.labelMetadata.json) ~ ("numClasses" -> instance.models.length) OneVsRestParams.saveImpl(path, instance, sc, Some(extraJson)) - instance.models.zipWithIndex.foreach { case (model: MLWritable, idx) => + instance.models.map(_.asInstanceOf[MLWritable]).zipWithIndex.foreach { case (model, idx) => val modelPath = new Path(path, s"model_$idx").toString model.save(modelPath) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala index b2a968118d..df1aa609c1 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/regression/LinearRegression.scala @@ -265,7 +265,7 @@ class LinearRegression @Since("1.3.0") (@Since("1.3.0") override val uid: String (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.treeAggregate( - new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer + (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer) )(seqOp, combOp, $(aggregationDepth)) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index f7d969f4ca..acfc6399c5 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -169,7 +169,7 @@ private[spark] object RandomForest extends Logging { training the same tree in the next iteration. This focus allows us to send fewer trees to workers on each iteration; see topNodesForGroup below. */ - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] val rng = new Random() rng.setSeed(seed) @@ -367,7 +367,7 @@ private[spark] object RandomForest extends Logging { nodesForGroup: Map[Int, Array[LearningNode]], treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]], splits: Array[Array[Split]], - nodeStack: mutable.Stack[(Int, LearningNode)], + nodeStack: mutable.ArrayStack[(Int, LearningNode)], timer: TimeTracker = new TimeTracker, nodeIdCache: Option[NodeIdCache] = None): Unit = { @@ -1076,7 +1076,7 @@ private[spark] object RandomForest extends Logging { * The feature indices are None if not subsampling features. 
*/ private[tree] def selectNodesToSplit( - nodeStack: mutable.Stack[(Int, LearningNode)], + nodeStack: mutable.ArrayStack[(Int, LearningNode)], maxMemoryUsage: Long, metadata: DecisionTreeMetadata, rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = { diff --git a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala index 4c7746869d..f151a6a01b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/tree/impurity/Impurity.scala @@ -162,7 +162,7 @@ private[spark] abstract class ImpurityCalculator(val stats: Array[Double]) exten * Fails if the array is empty. */ protected def indexOfLargestArrayElement(array: Array[Double]): Int = { - val result = array.foldLeft(-1, Double.MinValue, 0) { + val result = array.foldLeft((-1, Double.MinValue, 0)) { case ((maxIndex, maxValue, currentIndex), currentValue) => if (currentValue > maxValue) { (currentIndex, currentValue, currentIndex + 1) diff --git a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala index d7cdeae30b..9fddf09bab 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/optim/aggregator/DifferentiableLossAggregatorSuite.scala @@ -174,7 +174,7 @@ object DifferentiableLossAggregatorSuite { (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.aggregate( - new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer + (new MultivariateOnlineSummarizer, new MultivariateOnlineSummarizer) )(seqOp, combOp) } @@ -191,7 +191,7 @@ object DifferentiableLossAggregatorSuite { (c1._1.merge(c2._1), c1._2.merge(c2._2)) instances.aggregate( - new MultivariateOnlineSummarizer, new MultiClassSummarizer + (new MultivariateOnlineSummarizer, new MultiClassSummarizer) )(seqOp, combOp) } } diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala index df155b464c..dbe2ea931f 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala @@ -324,7 +324,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val treeToNodeToIndexInfo = Map((0, Map( (topNode.id, new RandomForest.NodeIndexInfo(0, None)) ))) - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode), nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack) @@ -366,7 +366,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val treeToNodeToIndexInfo = Map((0, Map( (topNode.id, new RandomForest.NodeIndexInfo(0, None)) ))) - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode), nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack) @@ -478,7 +478,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val failString = s"Failed on test with:" + s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," 
+ s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed" - val nodeStack = new mutable.Stack[(Int, LearningNode)] + val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees) Range(0, numTrees).foreach { treeIndex => topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1) diff --git a/repl/pom.xml b/repl/pom.xml index 51eb9b60dd..bd2cfc465a 100644 --- a/repl/pom.xml +++ b/repl/pom.xml @@ -171,7 +171,6 @@ - diff --git a/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala new file mode 100644 index 0000000000..4135940219 --- /dev/null +++ b/repl/scala-2.12/src/main/scala/org/apache/spark/repl/SparkILoop.scala @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.repl + +import java.io.BufferedReader + +// scalastyle:off println +import scala.Predef.{println => _, _} +// scalastyle:on println +import scala.tools.nsc.Settings +import scala.tools.nsc.interpreter.{ILoop, JPrintWriter} +import scala.tools.nsc.util.stringFromStream +import scala.util.Properties.{javaVersion, javaVmName, versionString} + +/** + * A Spark-specific interactive shell. 
+ */ +class SparkILoop(in0: Option[BufferedReader], out: JPrintWriter) + extends ILoop(in0, out) { + def this(in0: BufferedReader, out: JPrintWriter) = this(Some(in0), out) + def this() = this(None, new JPrintWriter(Console.out, true)) + + def initializeSpark() { + intp.beQuietDuring { + processLine(""" + @transient val spark = if (org.apache.spark.repl.Main.sparkSession != null) { + org.apache.spark.repl.Main.sparkSession + } else { + org.apache.spark.repl.Main.createSparkSession() + } + @transient val sc = { + val _sc = spark.sparkContext + if (_sc.getConf.getBoolean("spark.ui.reverseProxy", false)) { + val proxyUrl = _sc.getConf.get("spark.ui.reverseProxyUrl", null) + if (proxyUrl != null) { + println( + s"Spark Context Web UI is available at ${proxyUrl}/proxy/${_sc.applicationId}") + } else { + println(s"Spark Context Web UI is available at Spark Master Public URL") + } + } else { + _sc.uiWebUrl.foreach { + webUrl => println(s"Spark context Web UI available at ${webUrl}") + } + } + println("Spark context available as 'sc' " + + s"(master = ${_sc.master}, app id = ${_sc.applicationId}).") + println("Spark session available as 'spark'.") + _sc + } + """) + processLine("import org.apache.spark.SparkContext._") + processLine("import spark.implicits._") + processLine("import spark.sql") + processLine("import org.apache.spark.sql.functions._") + } + } + + /** Print a welcome message */ + override def printWelcome() { + import org.apache.spark.SPARK_VERSION + echo("""Welcome to + ____ __ + / __/__ ___ _____/ /__ + _\ \/ _ \/ _ `/ __/ '_/ + /___/ .__/\_,_/_/ /_/\_\ version %s + /_/ + """.format(SPARK_VERSION)) + val welcomeMsg = "Using Scala %s (%s, Java %s)".format( + versionString, javaVmName, javaVersion) + echo(welcomeMsg) + echo("Type in expressions to have them evaluated.") + echo("Type :help for more information.") + } + + /** Available commands */ + override def commands: List[LoopCommand] = standardCommands + + /** + * We override `createInterpreter` because we need to initialize Spark *before* the REPL + * sees any files, so that the Spark context is visible in those files. This is a bit of a + * hack, but there isn't another hook available to us at this point. + */ + override def createInterpreter(): Unit = { + super.createInterpreter() + initializeSpark() + } + + override def resetCommand(line: String): Unit = { + super.resetCommand(line) + initializeSpark() + echo("Note that after :reset, state of SparkSession and SparkContext is unchanged.") + } +} + +object SparkILoop { + + /** + * Creates an interpreter loop with default settings and feeds + * the given code to it as input. 
+ */ + def run(code: String, sets: Settings = new Settings): String = { + import java.io.{ BufferedReader, StringReader, OutputStreamWriter } + + stringFromStream { ostream => + Console.withOut(ostream) { + val input = new BufferedReader(new StringReader(code)) + val output = new JPrintWriter(new OutputStreamWriter(ostream), true) + val repl = new SparkILoop(input, output) + + if (sets.classpath.isDefault) { + sets.classpath.value = sys.props("java.class.path") + } + repl process sets + } + } + } + def run(lines: List[String]): String = run(lines.map(_ + "\n").mkString) +} diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala b/repl/src/main/scala/org/apache/spark/repl/Main.scala similarity index 100% rename from repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala rename to repl/src/main/scala/org/apache/spark/repl/Main.scala diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala similarity index 100% rename from repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala rename to repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala diff --git a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala b/repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala similarity index 100% rename from repl/scala-2.11/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala rename to repl/src/test/scala/org/apache/spark/repl/SingletonReplSuite.scala diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala index 8761ae4020..4894036e27 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Percentile.scala @@ -179,7 +179,7 @@ case class Percentile( val sortedCounts = buffer.toSeq.sortBy(_._1)( child.dataType.asInstanceOf[NumericType].ordering.asInstanceOf[Ordering[AnyRef]]) - val accumlatedCounts = sortedCounts.scanLeft(sortedCounts.head._1, 0L) { + val accumlatedCounts = sortedCounts.scanLeft((sortedCounts.head._1, 0L)) { case ((key1, count1), (key2, count2)) => (key2, count1 + count2) }.tail val maxPosition = accumlatedCounts.last._2 - 1 diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala index 3aa4bf619f..352fb545f4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Metadata.scala @@ -177,7 +177,7 @@ object Metadata { private def toJsonValue(obj: Any): JValue = { obj match { case map: Map[_, _] => - val fields = map.toList.map { case (k: String, v) => (k, toJsonValue(v)) } + val fields = map.toList.map { case (k, v) => (k.toString, toJsonValue(v)) } JObject(fields) case arr: Array[_] => val values = arr.toList.map(toJsonValue) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala index c35e5638e9..65ca37491b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/GenerateExec.scala @@ -96,7 +96,7 @@ case class GenerateExec( } else { 
outputRows.map(joinedRow.withRight) } - } ++ LazyIterator(boundGenerator.terminate).map { row => + } ++ LazyIterator(() => boundGenerator.terminate()).map { row => // we leave the left side as the last element of its child output // keep it the same as Hive does joinedRow.withRight(row) @@ -109,7 +109,7 @@ case class GenerateExec( } else { outputRows } - } ++ LazyIterator(boundGenerator.terminate) + } ++ LazyIterator(() => boundGenerator.terminate()) } // Convert the rows to unsafe rows. diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala index d74aae3525..203d449717 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InMemoryFileIndex.scala @@ -119,6 +119,7 @@ class InMemoryFileIndex( case None => pathsToFetch += path } + Unit // for some reasons scalac 2.12 needs this; return type doesn't matter } val filter = FileInputFormat.getInputPathFilter(new JobConf(hadoopConf, this.getClass)) val discovered = InMemoryFileIndex.bulkListLeafFiles( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala index 0e41f3c7aa..7d6d7e7eef 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/csv/UnivocityParser.scala @@ -205,7 +205,7 @@ class UnivocityParser( } throw BadRecordException( () => getCurrentInput, - getPartialResult, + () => getPartialResult(), new RuntimeException("Malformed CSV record")) } else { try { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala index 1b6a28cde2..f8058b2f78 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala @@ -216,7 +216,7 @@ private[joins] class UnsafeHashedRelation( } override def readExternal(in: ObjectInput): Unit = Utils.tryOrIOException { - read(in.readInt, in.readLong, in.readFully) + read(() => in.readInt(), () => in.readLong(), in.readFully) } private def read( @@ -277,7 +277,7 @@ private[joins] class UnsafeHashedRelation( } override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException { - read(in.readInt, in.readLong, in.readBytes) + read(() => in.readInt(), () => in.readLong(), in.readBytes) } override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup @@ -766,11 +766,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap } override def readExternal(in: ObjectInput): Unit = { - read(in.readBoolean, in.readLong, in.readFully) + read(() => in.readBoolean(), () => in.readLong(), in.readFully) } override def read(kryo: Kryo, in: Input): Unit = { - read(in.readBoolean, in.readLong, in.readBytes) + read(() => in.readBoolean(), () => in.readLong(), in.readBytes) } /** diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java index 13b006fc48..c132cab1b3 100644 --- 
a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDatasetSuite.java @@ -1334,6 +1334,10 @@ public class JavaDatasetSuite implements Serializable { return "BeanWithEnum(" + enumField + ", " + regularField + ")"; } + public int hashCode() { + return Objects.hashCode(enumField, regularField); + } + public boolean equals(Object other) { if (other instanceof BeanWithEnum) { BeanWithEnum beanWithEnum = (BeanWithEnum) other; diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 09502d05f7..247c30e2ee 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -230,11 +230,9 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { val resNaN1 = dfNaN.stat.approxQuantile("input1", Array(q1, q2), epsilon) assert(resNaN1.count(_.isNaN) === 0) - assert(resNaN1.count(_ == null) === 0) val resNaN2 = dfNaN.stat.approxQuantile("input2", Array(q1, q2), epsilon) assert(resNaN2.count(_.isNaN) === 0) - assert(resNaN2.count(_ == null) === 0) val resNaN3 = dfNaN.stat.approxQuantile("input3", Array(q1, q2), epsilon) assert(resNaN3.isEmpty) @@ -242,7 +240,6 @@ class DataFrameStatSuite extends QueryTest with SharedSQLContext { val resNaNAll = dfNaN.stat.approxQuantile(Array("input1", "input2", "input3"), Array(q1, q2), epsilon) assert(resNaNAll.flatten.count(_.isNaN) === 0) - assert(resNaNAll.flatten.count(_ == null) === 0) assert(resNaN1(0) === resNaNAll(0)(0)) assert(resNaN1(1) === resNaNAll(0)(1)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala index 2986b7f1ee..46eec736d4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala @@ -289,7 +289,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter { } } - AwaitTerminationTester.test(expectedBehavior, awaitTermFunc, testBehaviorFor) + AwaitTerminationTester.test(expectedBehavior, () => awaitTermFunc(), testBehaviorFor) } /** Stop a random active query either with `stop()` or with an error */ diff --git a/sql/hive-thriftserver/pom.xml b/sql/hive-thriftserver/pom.xml index a5a8e26405..3135a8a275 100644 --- a/sql/hive-thriftserver/pom.xml +++ b/sql/hive-thriftserver/pom.xml @@ -63,6 +63,16 @@ ${hive.group} hive-beeline + + org.eclipse.jetty + jetty-server + provided + + + org.eclipse.jetty + jetty-servlet + provided + org.seleniumhq.selenium diff --git a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala index 8c7418ec7a..027403816f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/StreamingContext.scala @@ -596,7 +596,7 @@ class StreamingContext private[streaming] ( } logDebug("Adding shutdown hook") // force eager creation of logger shutdownHookRef = ShutdownHookManager.addShutdownHook( - StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown) + StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown()) // Registering Streaming Metrics at the start of the 
StreamingContext assert(env.metricsSystem != null) env.metricsSystem.registerSource(streamingSource)