diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 8bf7795b7b..8f1425fbb8 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -240,7 +240,7 @@ private[spark] class SparkSubmit extends Logging {
     }

     // Set the deploy mode; default is client mode
-    var deployMode: Int = args.deployMode match {
+    val deployMode: Int = args.deployMode match {
       case "client" | null => CLIENT
       case "cluster" => CLUSTER
       case _ =>
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index a3819fee98..acfb1da1f4 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -125,7 +125,7 @@ private[spark] class TaskSetManager(
   val weight = 1
   val minShare = 0
   var priority = taskSet.priority
-  var stageId = taskSet.stageId
+  val stageId = taskSet.stageId
   val name = "TaskSet_" + taskSet.id
   var parent: Pool = null
   private var totalResultSize = 0L
diff --git a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
index 38f1f25f2f..3105e7b5e7 100644
--- a/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
+++ b/core/src/main/scala/org/apache/spark/status/LiveEntity.scala
@@ -430,7 +430,7 @@ private class LiveStage extends LiveEntity {

   // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
   @volatile var cleaning = false
-  var savedTasks = new AtomicInteger(0)
+  val savedTasks = new AtomicInteger(0)

   def executorSummary(executorId: String): LiveExecutorStageSummary = {
     executorSummaries.getOrElseUpdate(executorId,
diff --git a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala
index 6e57c3c5be..f1a3932bb0 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/MedianHeap.scala
@@ -37,13 +37,13 @@ private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
    * Stores all the numbers less than the current median in a smallerHalf,
    * i.e median is the maximum, at the root.
    */
-  private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
+  private[this] val smallerHalf = PriorityQueue.empty[Double](ord)

   /**
    * Stores all the numbers greater than the current median in a largerHalf,
    * i.e median is the minimum, at the root.
    */
-  private[this] var largerHalf = PriorityQueue.empty[Double](ord.reverse)
+  private[this] val largerHalf = PriorityQueue.empty[Double](ord.reverse)

   def isEmpty(): Boolean = {
     smallerHalf.isEmpty && largerHalf.isEmpty
diff --git a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
index bdeb631878..7106a780b3 100644
--- a/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
+++ b/core/src/test/scala/org/apache/spark/SharedSparkContext.scala
@@ -27,7 +27,7 @@ trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { sel

   def sc: SparkContext = _sc

-  var conf = new SparkConf(false)
+  val conf = new SparkConf(false)

   /**
    * Initialize the [[SparkContext]].  Generally, this is just called from beforeAll; however, in
diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
index 72e7ee0214..7779fb2aea 100644
--- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala
@@ -461,7 +461,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     val conf = new SparkConf()
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(TASK_FPGA_ID.amountConf, "0")
-    var taskResourceRequirement =
+    val taskResourceRequirement =
       parseResourceRequirements(conf, SPARK_TASK_PREFIX)
         .map(req => (req.resourceName, req.amount)).toMap

diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
index f36789c5a1..21bb47dd0c 100644
--- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala
@@ -939,7 +939,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
       .setAppName("test-cluster")
     conf.set(TASK_GPU_ID.amountConf, "1")

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()

@@ -954,7 +954,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "1")

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()

@@ -970,7 +970,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
     conf.set(TASK_GPU_ID.amountConf, "2")
     conf.set(EXECUTOR_GPU_ID.amountConf, "4")

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       sc = new SparkContext(conf)
     }.getMessage()

diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
index 566eb9cf3c..7cf533e58b 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala
@@ -55,7 +55,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
     var getAppUICount = 0L
     var attachCount = 0L
     var detachCount = 0L
-    var updateProbeCount = 0L
+    val updateProbeCount = 0L

     override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
       logDebug(s"getAppUI($appId, $attemptId)")
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
index 8eab2da1a3..7db30548fd 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/EventLogFileReadersSuite.scala
@@ -216,7 +216,7 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite {

     Utils.tryWithResource(new ZipInputStream(
         new ByteArrayInputStream(underlyingStream.toByteArray))) { is =>
-      var entry = is.getNextEntry
+      val entry = is.getNextEntry
       assert(entry != null)
       val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8)
       val expected = Files.toString(new File(logPath.toString), StandardCharsets.UTF_8)
diff --git a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index b1b97a61ed..022654139e 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -58,7 +58,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
   val id = seq.toString
   override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
     conf, new SecurityManager(conf))
-  var apps = new mutable.HashMap[String, String]()
+  val apps = new mutable.HashMap[String, String]()
   val driverIdToAppId = new mutable.HashMap[String, String]()
   def newDriver(driverId: String): RpcEndpointRef = {
     val name = s"driver_${drivers.size}"
diff --git a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
index 810dcf0e61..4909a586d3 100644
--- a/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/executor/CoarseGrainedExecutorBackendSuite.scala
@@ -61,7 +61,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val testResourceArgs: JObject = ("" -> "")
     val ja = JArray(List(testResourceArgs))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)
-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

@@ -146,7 +146,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)

-    var error = intercept[IllegalArgumentException] {
+    val error = intercept[IllegalArgumentException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

@@ -160,7 +160,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val ja = Extraction.decompose(Seq(fpga))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

@@ -199,7 +199,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
     val ja = Extraction.decompose(Seq(gpuArgs))
     val f1 = createTempJsonFile(tmpDir, "resources", ja)

-    var error = intercept[IllegalArgumentException] {
+    val error = intercept[IllegalArgumentException] {
       val parsedResources = backend.parseOrFindResources(Some(f1))
     }.getMessage()

diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
index 36a5620729..65e41986ff 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceProfileManagerSuite.scala
@@ -110,7 +110,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
     val gpuExecReq = new ExecutorResourceRequests().resource("gpu", 2, "someScript")
     val immrprof = rprof.require(gpuExecReq).build

-    var error = intercept[SparkException] {
+    val error = intercept[SparkException] {
       rpmanager.isSupported(immrprof)
     }.getMessage()

diff --git a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
index eac45e6ac5..ffe5ff5787 100644
--- a/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/resource/ResourceUtilsSuite.scala
@@ -178,7 +178,7 @@ class ResourceUtilsSuite extends SparkFunSuite
   test("list resource ids") {
     val conf = new SparkConf
     conf.set(DRIVER_GPU_ID.amountConf, "2")
-    var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+    val resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
     assert(resources.size === 1, "should only have GPU for resource")
     assert(resources(0).resourceName == GPU, "name should be gpu")

diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
index 7a74dd877a..3ce4ccf8d6 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala
@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
       Thread.sleep(5000)
       iter
     }
-    var taskStarted = new AtomicBoolean(false)
-    var taskEnded = new AtomicBoolean(false)
+    val taskStarted = new AtomicBoolean(false)
+    val taskEnded = new AtomicBoolean(false)
     val listener = new SparkListener() {
       override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
         taskStarted.set(true)
@@ -239,11 +239,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
     var execResources = backend.getExecutorAvailableResources("1")
     assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))

-    var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+    val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
     assert(exec3ResourceProfileId === rp.id)

     val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
-    var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
+    val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
       "t1", 0, 1, mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
       mutable.Map.empty[String, Long], new Properties(), taskResources, bytebuffer)))

diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 194e0dfe31..6b332ec129 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3336,7 +3336,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val ereqs3 = new ExecutorResourceRequests().cores(3)
     val treqs3 = new TaskResourceRequests().cpus(2)
     val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
-    var mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
+    val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
     assert(mergedRp.getTaskCpus.get == 2)
     assert(mergedRp.getExecutorCores.get == 4)

diff --git a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
index b6a59c8bbd..6552da8fdc 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/TaskSchedulerImplSuite.scala
@@ -1007,7 +1007,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     val tsm = stageToMockTaskSetManager(0)

     // submit an offer with one executor
-    var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+    val taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
       WorkerOffer("executor0", "host0", 1)
     )).flatten

@@ -1763,7 +1763,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
     taskScheduler.submitTasks(taskSet)

     // Launch tasks on executor that satisfies resource requirements.
-    var taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+    val taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
     assert(3 === taskDescriptions.length)
     assert(!failedTaskSet)
     assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses)
diff --git a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
index abccb8e9bb..c3d96e7c42 100644
--- a/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/security/CryptoStreamUtilsSuite.scala
@@ -54,14 +54,14 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {

   test("shuffle encryption key length should be 128 by default") {
     val conf = createConf()
-    var key = CryptoStreamUtils.createKey(conf)
+    val key = CryptoStreamUtils.createKey(conf)
     val actual = key.length * (java.lang.Byte.SIZE)
     assert(actual === 128)
   }

   test("create 256-bit key") {
     val conf = createConf(IO_ENCRYPTION_KEY_SIZE_BITS.key -> "256")
-    var key = CryptoStreamUtils.createKey(conf)
+    val key = CryptoStreamUtils.createKey(conf)
     val actual = key.length * (java.lang.Byte.SIZE)
     assert(actual === 256)
   }
diff --git a/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala b/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
index fec7007279..98d528b621 100644
--- a/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/ElementTrackingStoreSuite.scala
@@ -35,8 +35,8 @@ class ElementTrackingStoreSuite extends SparkFunSuite with Eventually {
     val tracking = new ElementTrackingStore(store, new SparkConf()
       .set(ASYNC_TRACKING_ENABLED, true))

-    var done = new AtomicBoolean(false)
-    var type1 = new AtomicInteger(0)
+    val done = new AtomicBoolean(false)
+    val type1 = new AtomicInteger(0)
     var queued0: WriteQueueResult = null
     var queued1: WriteQueueResult = null
     var queued2: WriteQueueResult = null
diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
index fc16fe3628..e719c722d0 100644
--- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala
@@ -37,7 +37,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
       flatMap( x => Stream.range(0, expand_size))
-    var persisted = data.persist(StorageLevel.DISK_ONLY)
+    val persisted = data.persist(StorageLevel.DISK_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
   }
@@ -48,7 +48,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
     val expand_size = 100
     val data = sc.parallelize((1 to 5).toSeq).
       flatMap(x => Stream.range(0, expand_size))
-    var persisted = data.persist(StorageLevel.MEMORY_ONLY)
+    val persisted = data.persist(StorageLevel.MEMORY_ONLY)
     assert(persisted.count()===500)
     assert(persisted.filter(_==1).count()===5)
   }
diff --git a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
index c46ab2d199..d6a4e5bb2b 100644
--- a/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
+++ b/core/src/test/scala/org/apache/spark/storage/MemoryStoreSuite.scala
@@ -39,7 +39,7 @@ class MemoryStoreSuite
   with BeforeAndAfterEach
   with ResetSystemProperties {

-  var conf: SparkConf = new SparkConf(false)
+  val conf: SparkConf = new SparkConf(false)
     .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)

   // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test
diff --git a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
index b0520c7ab1..cef0d8c1de 100644
--- a/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/ClosureCleanerSuite.scala
@@ -147,7 +147,7 @@ object TestObject {
 }

 class TestClass extends Serializable {
-  var x = 5
+  val x = 5

   def getX: Int = x

@@ -179,7 +179,7 @@ class TestClassWithoutFieldAccess {

   def run(): Int = {
     var nonSer2 = new NonSerializable
-    var x = 5
+    val x = 5
     withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       nums.map(_ + x).reduce(_ + _)
@@ -218,10 +218,10 @@ object TestObjectWithNesting {
     var answer = 0
     withSpark(new SparkContext("local", "test")) { sc =>
       val nums = sc.parallelize(Array(1, 2, 3, 4))
-      var y = 1
+      val y = 1
       for (i <- 1 to 4) {
         var nonSer2 = new NonSerializable
-        var x = i
+        val x = i
         answer += nums.map(_ + x + y).reduce(_ + _)
       }
       answer
@@ -239,7 +239,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
       val nums = sc.parallelize(Array(1, 2, 3, 4))
       for (i <- 1 to 4) {
         var nonSer2 = new NonSerializable
-        var x = i
+        val x = i
         answer += nums.map(_ + x + getY).reduce(_ + _)
       }
       answer
@@ -339,7 +339,7 @@ private object TestUserClosuresActuallyCleaned {
 }

 class TestCreateNullValue {
-  var x = 5
+  val x = 5

   def getX: Int = x

diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index b3f54ff186..eb1aab645f 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -323,7 +323,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
     RandomSampler.defaultMaxGapSamplingFraction should be (0.4)

     var d: Double = 0.0
-    var sampler = new BernoulliSampler[Int](0.1)
+    val sampler = new BernoulliSampler[Int](0.1)
     sampler.setSeed(rngSeed.nextLong)

     // Array iterator (indexable type)
@@ -547,7 +547,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
     RandomSampler.defaultMaxGapSamplingFraction should be (0.4)

     var d: Double = 0.0
-    var sampler = new PoissonSampler[Int](0.1)
+    val sampler = new PoissonSampler[Int](0.1)
     sampler.setSeed(rngSeed.nextLong)

     // Array iterator (indexable type)
diff --git a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
index 4a73466841..f649c310b3 100644
--- a/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/LocalKMeans.scala
@@ -101,7 +101,7 @@ object LocalKMeans {
         }
       }

-      var newPoints = pointStats.map {mapping =>
+      val newPoints = pointStats.map { mapping =>
         (mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}

       tempDist = 0.0
diff --git a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
index 2053d3655d..b9ef16fb58 100644
--- a/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
+++ b/external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
@@ -236,7 +236,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {

     // this is the "lots of messages" case
     kafkaTestUtils.sendMessages(topic, sent)
-    var sentCount = sent.values.sum
+    val sentCount = sent.values.sum

     val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams,
       Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts)
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
index c7c1a5404e..4c71cd6496 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/EigenValueDecomposition.scala
@@ -67,7 +67,7 @@ private[mllib] object EigenValueDecomposition {
     // "LM" : compute the NEV largest (in magnitude) eigenvalues
     val which = "LM"

-    var iparam = new Array[Int](11)
+    val iparam = new Array[Int](11)
     // use exact shift in each iteration
     iparam(0) = 1
     // maximum number of Arnoldi update iterations, or the actual number of iterations on output
diff --git a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
index 6b3970def1..0ffef986fb 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/clustering/PowerIterationClusteringSuite.scala
@@ -224,7 +224,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
       (3, 4)
     )).toDF("src", "dst").repartition(1)

-    var assignments2 = new PowerIterationClustering()
+    val assignments2 = new PowerIterationClustering()
       .setInitMode("random")
       .setK(2)
       .assignClusters(data2)
@@ -238,7 +238,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite

     assert(Set(predictions2(0).size, predictions2(1).size) !== Set(2, 3))

-    var assignments3 = new PowerIterationClustering()
+    val assignments3 = new PowerIterationClustering()
       .setInitMode("degree")
       .setK(2)
       .assignClusters(data2)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index a91cc0782e..e25b9dd47e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -219,7 +219,7 @@ private[joins] class UnsafeHashedRelation(
   var resultRow = new UnsafeRow(numFields)

   // re-used in getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
-  var valueRowWithKeyIndex = new ValueRowWithKeyIndex
+  val valueRowWithKeyIndex = new ValueRowWithKeyIndex

   override def get(key: InternalRow): Iterator[InternalRow] = {
     val unsafeKey = key.asInstanceOf[UnsafeRow]
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
index 09abc51c69..55aecc0611 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/ForeachWriterTable.scala
@@ -56,7 +56,7 @@ case class ForeachWriterTable[T](

   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
     new WriteBuilder with SupportsTruncate with SupportsStreamingUpdateAsAppend {
-      private var inputSchema: StructType = info.schema()
+      private val inputSchema: StructType = info.schema()

       // Do nothing for truncate. Foreach sink is special and it just forwards all the
       // records to ForeachWriter.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index bdc714d49f..5e62db0834 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -430,8 +430,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       .format("org.apache.spark.sql.streaming.test")
       .load()

-    var w = df.writeStream
-    var e = intercept[IllegalArgumentException](w.foreach(null))
+    val w = df.writeStream
+    val e = intercept[IllegalArgumentException](w.foreach(null))
     Seq("foreach", "null").foreach { s =>
       assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
     }
@@ -447,8 +447,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
       override def process(value: Row): Unit = {}
       override def close(errorOrNull: Throwable): Unit = {}
     }
-    var w = df.writeStream.partitionBy("value")
-    var e = intercept[AnalysisException](w.foreach(foreachWriter).start())
+    val w = df.writeStream.partitionBy("value")
+    val e = intercept[AnalysisException](w.foreach(foreachWriter).start())
     Seq("foreach", "partitioning").foreach { s =>
       assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 4e61dba495..273658fcfa 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -380,7 +380,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
     withTable("t") {
       sql("create table t(i int, d double) using parquet")
       // Calling `saveAsTable` to an existing table with append mode results in table insertion.
-      var msg = intercept[AnalysisException] {
+      val msg = intercept[AnalysisException] {
         Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t")
       }.getMessage
       assert(msg.contains("Cannot safely cast 'i': bigint to int"))
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
index 21cc6af398..dc2ff26a8a 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveQuerySuite.scala
@@ -208,7 +208,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
       |FROM src LIMIT 1""".stripMargin)

   test("constant null testing timestamp") {
-    var r1 = sql(
+    val r1 = sql(
       """
         |SELECT IF(FALSE, CAST(NULL AS TIMESTAMP),
         |CAST('1969-12-31 16:00:01' AS TIMESTAMP)) AS COL20
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/LocalStreamingContext.scala b/streaming/src/test/scala/org/apache/spark/streaming/LocalStreamingContext.scala
index 5bf24a9705..99ad8ede78 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/LocalStreamingContext.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/LocalStreamingContext.scala
@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 trait LocalStreamingContext extends BeforeAndAfterEach { self: Suite =>

   @transient var ssc: StreamingContext = _
-  @transient var stopSparkContext: Boolean = true
+  @transient val stopSparkContext: Boolean = true

   override def afterEach(): Unit = {
     try {
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
index 9d735a32f7..4fc9d13cdd 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/MasterFailureTest.scala
@@ -332,7 +332,7 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
   override def run(): Unit = {
     try {
       // If it is the first killing, then allow the first checkpoint to be created
-      var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
+      val minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
       val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
       logInfo("Kill wait time = " + killWaitTime)
       Thread.sleep(killWaitTime)
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
index 86c20f5a46..913ab1f46d 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/ReceivedBlockHandlerSuite.scala
@@ -75,7 +75,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
   val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
   val shuffleManager = new SortShuffleManager(conf)
   val serializer = new KryoSerializer(conf)
-  var serializerManager = new SerializerManager(serializer, conf, encryptionKey)
+  val serializerManager = new SerializerManager(serializer, conf, encryptionKey)
   val manualClock = new ManualClock
   val blockManagerSize = 10000000
   val blockManagerBuffer = new ArrayBuffer[BlockManager]()
diff --git a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
index 1a0154600b..5a8941de3f 100644
--- a/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
+++ b/streaming/src/test/scala/org/apache/spark/streaming/scheduler/ReceiverTrackerSuite.scala
@@ -203,7 +203,7 @@ private[streaming] object RateTestReceiver {
  */
 class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {

-  var receivingThreadOption: Option[Thread] = None
+  val receivingThreadOption: Option[Thread] = None

   def onStart(): Unit = {
     val thread = new Thread() {
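
Editor's note (not part of the patch): every hunk above converts a `var` that is never reassigned into a `val`. The sketch below, which is illustrative only and borrows names from MedianHeap and LiveStage, shows why this is behavior-preserving even when the referenced object is itself mutable: `val` freezes the binding, not the object, so in-place mutation still works while accidental reassignment becomes a compile error.

// Minimal, hypothetical demo of the val-over-var guideline applied in this patch.
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.PriorityQueue

object ValVsVarDemo {
  def main(args: Array[String]): Unit = {
    // A val holding a mutable priority queue (as in MedianHeap) can still be updated in place.
    val smallerHalf = PriorityQueue.empty[Double]
    smallerHalf.enqueue(1.0, 2.0)
    // smallerHalf = PriorityQueue.empty[Double]  // would no longer compile: reassignment to val

    // Same for an AtomicInteger field (as in LiveStage): the counter mutates, the binding does not.
    val savedTasks = new AtomicInteger(0)
    savedTasks.incrementAndGet()

    println(s"head of queue: ${smallerHalf.head}, saved tasks: ${savedTasks.get()}")
  }
}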