[SPARK-33346][CORE][SQL][MLLIB][DSTREAM][K8S] Change the never changed 'var' to 'val'

### What changes were proposed in this pull request?
Some local variables are declared as `var` but are never reassigned, so they should be declared as `val`. This PR changes them from `var` to `val`, except for `mockito`-related cases.
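
As a minimal, hypothetical sketch of the pattern being cleaned up (the object and variable names below are made up for illustration and do not appear in the diff):

```scala
object ValOverVarExample {
  def main(args: Array[String]): Unit = {
    val items = Seq("a", "bb", "ccc")

    // Before this kind of cleanup: declared mutable although never reassigned.
    // var totalLength: Int = items.map(_.length).sum

    // After: the same local as an immutable `val`. Behavior is unchanged,
    // and the compiler now rejects any accidental reassignment.
    val totalLength: Int = items.map(_.length).sum

    println(totalLength) // prints 6
  }
}
```

Since none of the touched bindings are ever mutated, the change is purely declarative.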

### Why are the changes needed?
Use `val` instead of `var` when possible.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass the Jenkins or GitHub Actions tests.

Closes #31142 from LuciferYang/SPARK-33346.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Sean Owen <srowen@gmail.com>
yangjie01 authored on 2021-01-15 08:47:02 -06:00; committed by Sean Owen
parent 6cd0092150
commit 9e33d49b5b
35 changed files with 58 additions and 58 deletions

@@ -240,7 +240,7 @@ private[spark] class SparkSubmit extends Logging {
 }
 // Set the deploy mode; default is client mode
-var deployMode: Int = args.deployMode match {
+val deployMode: Int = args.deployMode match {
 case "client" | null => CLIENT
 case "cluster" => CLUSTER
 case _ =>

@@ -125,7 +125,7 @@ private[spark] class TaskSetManager(
 val weight = 1
 val minShare = 0
 var priority = taskSet.priority
-var stageId = taskSet.stageId
+val stageId = taskSet.stageId
 val name = "TaskSet_" + taskSet.id
 var parent: Pool = null
 private var totalResultSize = 0L

@@ -430,7 +430,7 @@ private class LiveStage extends LiveEntity {
 // Used for cleanup of tasks after they reach the configured limit. Not written to the store.
 @volatile var cleaning = false
-var savedTasks = new AtomicInteger(0)
+val savedTasks = new AtomicInteger(0)
 def executorSummary(executorId: String): LiveExecutorStageSummary = {
 executorSummaries.getOrElseUpdate(executorId,

@@ -37,13 +37,13 @@ private[spark] class MedianHeap(implicit val ord: Ordering[Double]) {
 * Stores all the numbers less than the current median in a smallerHalf,
 * i.e median is the maximum, at the root.
 */
-private[this] var smallerHalf = PriorityQueue.empty[Double](ord)
+private[this] val smallerHalf = PriorityQueue.empty[Double](ord)
 /**
 * Stores all the numbers greater than the current median in a largerHalf,
 * i.e median is the minimum, at the root.
 */
-private[this] var largerHalf = PriorityQueue.empty[Double](ord.reverse)
+private[this] val largerHalf = PriorityQueue.empty[Double](ord.reverse)
 def isEmpty(): Boolean = {
 smallerHalf.isEmpty && largerHalf.isEmpty

@@ -27,7 +27,7 @@ trait SharedSparkContext extends BeforeAndAfterAll with BeforeAndAfterEach { sel
 def sc: SparkContext = _sc
-var conf = new SparkConf(false)
+val conf = new SparkConf(false)
 /**
 * Initialize the [[SparkContext]]. Generally, this is just called from beforeAll; however, in

@@ -461,7 +461,7 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
 val conf = new SparkConf()
 conf.set(TASK_GPU_ID.amountConf, "2")
 conf.set(TASK_FPGA_ID.amountConf, "0")
-var taskResourceRequirement =
+val taskResourceRequirement =
 parseResourceRequirements(conf, SPARK_TASK_PREFIX)
 .map(req => (req.resourceName, req.amount)).toMap

@@ -939,7 +939,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 .setAppName("test-cluster")
 conf.set(TASK_GPU_ID.amountConf, "1")
-var error = intercept[SparkException] {
+val error = intercept[SparkException] {
 sc = new SparkContext(conf)
 }.getMessage()
@@ -954,7 +954,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 conf.set(TASK_GPU_ID.amountConf, "2")
 conf.set(EXECUTOR_GPU_ID.amountConf, "1")
-var error = intercept[SparkException] {
+val error = intercept[SparkException] {
 sc = new SparkContext(conf)
 }.getMessage()
@@ -970,7 +970,7 @@ class SparkContextSuite extends SparkFunSuite with LocalSparkContext with Eventu
 conf.set(TASK_GPU_ID.amountConf, "2")
 conf.set(EXECUTOR_GPU_ID.amountConf, "4")
-var error = intercept[SparkException] {
+val error = intercept[SparkException] {
 sc = new SparkContext(conf)
 }.getMessage()

@@ -55,7 +55,7 @@ class ApplicationCacheSuite extends SparkFunSuite with Logging with MockitoSugar
 var getAppUICount = 0L
 var attachCount = 0L
 var detachCount = 0L
-var updateProbeCount = 0L
+val updateProbeCount = 0L
 override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
 logDebug(s"getAppUI($appId, $attemptId)")

@@ -216,7 +216,7 @@ class SingleFileEventLogFileReaderSuite extends EventLogFileReadersSuite {
 Utils.tryWithResource(new ZipInputStream(
 new ByteArrayInputStream(underlyingStream.toByteArray))) { is =>
-var entry = is.getNextEntry
+val entry = is.getNextEntry
 assert(entry != null)
 val actual = new String(ByteStreams.toByteArray(is), StandardCharsets.UTF_8)
 val expected = Files.toString(new File(logPath.toString), StandardCharsets.UTF_8)

@@ -58,7 +58,7 @@ class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) extend
 val id = seq.toString
 override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
 conf, new SecurityManager(conf))
-var apps = new mutable.HashMap[String, String]()
+val apps = new mutable.HashMap[String, String]()
 val driverIdToAppId = new mutable.HashMap[String, String]()
 def newDriver(driverId: String): RpcEndpointRef = {
 val name = s"driver_${drivers.size}"

@@ -61,7 +61,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 val testResourceArgs: JObject = ("" -> "")
 val ja = JArray(List(testResourceArgs))
 val f1 = createTempJsonFile(tmpDir, "resources", ja)
-var error = intercept[SparkException] {
+val error = intercept[SparkException] {
 val parsedResources = backend.parseOrFindResources(Some(f1))
 }.getMessage()
@@ -146,7 +146,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 val ja = Extraction.decompose(Seq(gpuArgs))
 val f1 = createTempJsonFile(tmpDir, "resources", ja)
-var error = intercept[IllegalArgumentException] {
+val error = intercept[IllegalArgumentException] {
 val parsedResources = backend.parseOrFindResources(Some(f1))
 }.getMessage()
@@ -160,7 +160,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 val ja = Extraction.decompose(Seq(fpga))
 val f1 = createTempJsonFile(tmpDir, "resources", ja)
-var error = intercept[SparkException] {
+val error = intercept[SparkException] {
 val parsedResources = backend.parseOrFindResources(Some(f1))
 }.getMessage()
@@ -199,7 +199,7 @@ class CoarseGrainedExecutorBackendSuite extends SparkFunSuite
 val ja = Extraction.decompose(Seq(gpuArgs))
 val f1 = createTempJsonFile(tmpDir, "resources", ja)
-var error = intercept[IllegalArgumentException] {
+val error = intercept[IllegalArgumentException] {
 val parsedResources = backend.parseOrFindResources(Some(f1))
 }.getMessage()

@@ -110,7 +110,7 @@ class ResourceProfileManagerSuite extends SparkFunSuite {
 val gpuExecReq =
 new ExecutorResourceRequests().resource("gpu", 2, "someScript")
 val immrprof = rprof.require(gpuExecReq).build
-var error = intercept[SparkException] {
+val error = intercept[SparkException] {
 rpmanager.isSupported(immrprof)
 }.getMessage()

@@ -178,7 +178,7 @@ class ResourceUtilsSuite extends SparkFunSuite
 test("list resource ids") {
 val conf = new SparkConf
 conf.set(DRIVER_GPU_ID.amountConf, "2")
-var resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
+val resources = listResourceIds(conf, SPARK_DRIVER_PREFIX)
 assert(resources.size === 1, "should only have GPU for resource")
 assert(resources(0).resourceName == GPU, "name should be gpu")

@@ -99,8 +99,8 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
 Thread.sleep(5000)
 iter
 }
-var taskStarted = new AtomicBoolean(false)
-var taskEnded = new AtomicBoolean(false)
+val taskStarted = new AtomicBoolean(false)
+val taskEnded = new AtomicBoolean(false)
 val listener = new SparkListener() {
 override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
 taskStarted.set(true)
@@ -239,11 +239,11 @@ class CoarseGrainedSchedulerBackendSuite extends SparkFunSuite with LocalSparkCo
 var execResources = backend.getExecutorAvailableResources("1")
 assert(execResources(GPU).availableAddrs.sorted === Array("0", "1", "3"))
-var exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
+val exec3ResourceProfileId = backend.getExecutorResourceProfileId("3")
 assert(exec3ResourceProfileId === rp.id)
 val taskResources = Map(GPU -> new ResourceInformation(GPU, Array("0")))
-var taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
+val taskDescs: Seq[Seq[TaskDescription]] = Seq(Seq(new TaskDescription(1, 0, "1",
 "t1", 0, 1, mutable.Map.empty[String, Long],
 mutable.Map.empty[String, Long], mutable.Map.empty[String, Long],
 new Properties(), taskResources, bytebuffer)))

@@ -3336,7 +3336,7 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
 val ereqs3 = new ExecutorResourceRequests().cores(3)
 val treqs3 = new TaskResourceRequests().cpus(2)
 val rp3 = new ResourceProfile(ereqs3.requests, treqs3.requests)
-var mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
+val mergedRp = scheduler.mergeResourceProfilesForStage(HashSet(rp1, rp2, rp3))
 assert(mergedRp.getTaskCpus.get == 2)
 assert(mergedRp.getExecutorCores.get == 4)

@@ -1007,7 +1007,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 val tsm = stageToMockTaskSetManager(0)
 // submit an offer with one executor
-var taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
+val taskAttempts = taskScheduler.resourceOffers(IndexedSeq(
 WorkerOffer("executor0", "host0", 1)
 )).flatten
@@ -1763,7 +1763,7 @@ class TaskSchedulerImplSuite extends SparkFunSuite with LocalSparkContext with B
 taskScheduler.submitTasks(taskSet)
 // Launch tasks on executor that satisfies resource requirements.
-var taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
+val taskDescriptions = taskScheduler.resourceOffers(singleCoreWorkerOffers).flatten
 assert(3 === taskDescriptions.length)
 assert(!failedTaskSet)
 assert(ArrayBuffer("0") === taskDescriptions(0).resources.get(GPU).get.addresses)

@@ -54,14 +54,14 @@ class CryptoStreamUtilsSuite extends SparkFunSuite {
 test("shuffle encryption key length should be 128 by default") {
 val conf = createConf()
-var key = CryptoStreamUtils.createKey(conf)
+val key = CryptoStreamUtils.createKey(conf)
 val actual = key.length * (java.lang.Byte.SIZE)
 assert(actual === 128)
 }
 test("create 256-bit key") {
 val conf = createConf(IO_ENCRYPTION_KEY_SIZE_BITS.key -> "256")
-var key = CryptoStreamUtils.createKey(conf)
+val key = CryptoStreamUtils.createKey(conf)
 val actual = key.length * (java.lang.Byte.SIZE)
 assert(actual === 256)
 }

@@ -35,8 +35,8 @@ class ElementTrackingStoreSuite extends SparkFunSuite with Eventually {
 val tracking = new ElementTrackingStore(store, new SparkConf()
 .set(ASYNC_TRACKING_ENABLED, true))
-var done = new AtomicBoolean(false)
-var type1 = new AtomicInteger(0)
+val done = new AtomicBoolean(false)
+val type1 = new AtomicInteger(0)
 var queued0: WriteQueueResult = null
 var queued1: WriteQueueResult = null
 var queued2: WriteQueueResult = null

@@ -37,7 +37,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
 val expand_size = 100
 val data = sc.parallelize((1 to 5).toSeq).
 flatMap( x => Stream.range(0, expand_size))
-var persisted = data.persist(StorageLevel.DISK_ONLY)
+val persisted = data.persist(StorageLevel.DISK_ONLY)
 assert(persisted.count()===500)
 assert(persisted.filter(_==1).count()===5)
 }
@@ -48,7 +48,7 @@ class FlatmapIteratorSuite extends SparkFunSuite with LocalSparkContext {
 val expand_size = 100
 val data = sc.parallelize((1 to 5).toSeq).
 flatMap(x => Stream.range(0, expand_size))
-var persisted = data.persist(StorageLevel.MEMORY_ONLY)
+val persisted = data.persist(StorageLevel.MEMORY_ONLY)
 assert(persisted.count()===500)
 assert(persisted.filter(_==1).count()===5)
 }

@@ -39,7 +39,7 @@ class MemoryStoreSuite
 with BeforeAndAfterEach
 with ResetSystemProperties {
-var conf: SparkConf = new SparkConf(false)
+val conf: SparkConf = new SparkConf(false)
 .set(STORAGE_UNROLL_MEMORY_THRESHOLD, 512L)
 // Reuse a serializer across tests to avoid creating a new thread-local buffer on each test

@@ -147,7 +147,7 @@ object TestObject {
 }
 class TestClass extends Serializable {
-var x = 5
+val x = 5
 def getX: Int = x
@@ -179,7 +179,7 @@ class TestClassWithoutFieldAccess {
 def run(): Int = {
 var nonSer2 = new NonSerializable
-var x = 5
+val x = 5
 withSpark(new SparkContext("local", "test")) { sc =>
 val nums = sc.parallelize(Array(1, 2, 3, 4))
 nums.map(_ + x).reduce(_ + _)
@@ -218,10 +218,10 @@ object TestObjectWithNesting {
 var answer = 0
 withSpark(new SparkContext("local", "test")) { sc =>
 val nums = sc.parallelize(Array(1, 2, 3, 4))
-var y = 1
+val y = 1
 for (i <- 1 to 4) {
 var nonSer2 = new NonSerializable
-var x = i
+val x = i
 answer += nums.map(_ + x + y).reduce(_ + _)
 }
 answer
@@ -239,7 +239,7 @@ class TestClassWithNesting(val y: Int) extends Serializable {
 val nums = sc.parallelize(Array(1, 2, 3, 4))
 for (i <- 1 to 4) {
 var nonSer2 = new NonSerializable
-var x = i
+val x = i
 answer += nums.map(_ + x + getY).reduce(_ + _)
 }
 answer
@@ -339,7 +339,7 @@ private object TestUserClosuresActuallyCleaned {
 class TestCreateNullValue {
-var x = 5
+val x = 5
 def getX: Int = x

@@ -323,7 +323,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
 RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
 var d: Double = 0.0
-var sampler = new BernoulliSampler[Int](0.1)
+val sampler = new BernoulliSampler[Int](0.1)
 sampler.setSeed(rngSeed.nextLong)
 // Array iterator (indexable type)
@@ -547,7 +547,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
 RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
 var d: Double = 0.0
-var sampler = new PoissonSampler[Int](0.1)
+val sampler = new PoissonSampler[Int](0.1)
 sampler.setSeed(rngSeed.nextLong)
 // Array iterator (indexable type)

@@ -101,7 +101,7 @@ object LocalKMeans {
 }
 }
-var newPoints = pointStats.map {mapping =>
+val newPoints = pointStats.map { mapping =>
 (mapping._1, mapping._2._1 * (1.0 / mapping._2._2))}
 tempDist = 0.0

@@ -236,7 +236,7 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll {
 // this is the "lots of messages" case
 kafkaTestUtils.sendMessages(topic, sent)
-var sentCount = sent.values.sum
+val sentCount = sent.values.sum
 val rdd = KafkaUtils.createRDD[String, String](sc, kafkaParams,
 Array(OffsetRange(topic, 0, 0, sentCount)), preferredHosts)

@@ -67,7 +67,7 @@ private[mllib] object EigenValueDecomposition {
 // "LM" : compute the NEV largest (in magnitude) eigenvalues
 val which = "LM"
-var iparam = new Array[Int](11)
+val iparam = new Array[Int](11)
 // use exact shift in each iteration
 iparam(0) = 1
 // maximum number of Arnoldi update iterations, or the actual number of iterations on output

@@ -224,7 +224,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
 (3, 4)
 )).toDF("src", "dst").repartition(1)
-var assignments2 = new PowerIterationClustering()
+val assignments2 = new PowerIterationClustering()
 .setInitMode("random")
 .setK(2)
 .assignClusters(data2)
@@ -238,7 +238,7 @@ class PowerIterationClusteringSuite extends SparkFunSuite
 assert(Set(predictions2(0).size, predictions2(1).size) !== Set(2, 3))
-var assignments3 = new PowerIterationClustering()
+val assignments3 = new PowerIterationClustering()
 .setInitMode("degree")
 .setK(2)
 .assignClusters(data2)

@@ -219,7 +219,7 @@ private[joins] class UnsafeHashedRelation(
 var resultRow = new UnsafeRow(numFields)
 // re-used in getWithKeyIndex()/getValueWithKeyIndex()/valuesWithKeyIndex()
-var valueRowWithKeyIndex = new ValueRowWithKeyIndex
+val valueRowWithKeyIndex = new ValueRowWithKeyIndex
 override def get(key: InternalRow): Iterator[InternalRow] = {
 val unsafeKey = key.asInstanceOf[UnsafeRow]

@@ -56,7 +56,7 @@ case class ForeachWriterTable[T](
 override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
 new WriteBuilder with SupportsTruncate with SupportsStreamingUpdateAsAppend {
-private var inputSchema: StructType = info.schema()
+private val inputSchema: StructType = info.schema()
 // Do nothing for truncate. Foreach sink is special and it just forwards all the
 // records to ForeachWriter.

@@ -430,8 +430,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 .format("org.apache.spark.sql.streaming.test")
 .load()
-var w = df.writeStream
-var e = intercept[IllegalArgumentException](w.foreach(null))
+val w = df.writeStream
+val e = intercept[IllegalArgumentException](w.foreach(null))
 Seq("foreach", "null").foreach { s =>
 assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
 }
@@ -447,8 +447,8 @@ class DataStreamReaderWriterSuite extends StreamTest with BeforeAndAfter {
 override def process(value: Row): Unit = {}
 override def close(errorOrNull: Throwable): Unit = {}
 }
-var w = df.writeStream.partitionBy("value")
-var e = intercept[AnalysisException](w.foreach(foreachWriter).start())
+val w = df.writeStream.partitionBy("value")
+val e = intercept[AnalysisException](w.foreach(foreachWriter).start())
 Seq("foreach", "partitioning").foreach { s =>
 assert(e.getMessage.toLowerCase(Locale.ROOT).contains(s.toLowerCase(Locale.ROOT)))
 }

@@ -380,7 +380,7 @@ class DataFrameReaderWriterSuite extends QueryTest with SharedSparkSession with
 withTable("t") {
 sql("create table t(i int, d double) using parquet")
 // Calling `saveAsTable` to an existing table with append mode results in table insertion.
-var msg = intercept[AnalysisException] {
+val msg = intercept[AnalysisException] {
 Seq((1L, 2.0)).toDF("i", "d").write.mode("append").saveAsTable("t")
 }.getMessage
 assert(msg.contains("Cannot safely cast 'i': bigint to int"))

@@ -208,7 +208,7 @@ class HiveQuerySuite extends HiveComparisonTest with SQLTestUtils with BeforeAnd
 |FROM src LIMIT 1""".stripMargin)
 test("constant null testing timestamp") {
-var r1 = sql(
+val r1 = sql(
 """
 |SELECT IF(FALSE, CAST(NULL AS TIMESTAMP),
 |CAST('1969-12-31 16:00:01' AS TIMESTAMP)) AS COL20

@@ -30,7 +30,7 @@ import org.apache.spark.internal.Logging
 trait LocalStreamingContext extends BeforeAndAfterEach { self: Suite =>
 @transient var ssc: StreamingContext = _
-@transient var stopSparkContext: Boolean = true
+@transient val stopSparkContext: Boolean = true
 override def afterEach(): Unit = {
 try {

@@ -332,7 +332,7 @@ class KillingThread(ssc: StreamingContext, maxKillWaitTime: Long) extends Thread
 override def run(): Unit = {
 try {
 // If it is the first killing, then allow the first checkpoint to be created
-var minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
+val minKillWaitTime = if (MasterFailureTest.killCount == 0) 5000 else 2000
 val killWaitTime = minKillWaitTime + math.abs(Random.nextLong % maxKillWaitTime)
 logInfo("Kill wait time = " + killWaitTime)
 Thread.sleep(killWaitTime)

@@ -75,7 +75,7 @@ abstract class BaseReceivedBlockHandlerSuite(enableEncryption: Boolean)
 val mapOutputTracker = new MapOutputTrackerMaster(conf, broadcastManager, true)
 val shuffleManager = new SortShuffleManager(conf)
 val serializer = new KryoSerializer(conf)
-var serializerManager = new SerializerManager(serializer, conf, encryptionKey)
+val serializerManager = new SerializerManager(serializer, conf, encryptionKey)
 val manualClock = new ManualClock
 val blockManagerSize = 10000000
 val blockManagerBuffer = new ArrayBuffer[BlockManager]()

@@ -203,7 +203,7 @@ private[streaming] object RateTestReceiver {
 */
 class StoppableReceiver extends Receiver[Int](StorageLevel.MEMORY_ONLY) {
-var receivingThreadOption: Option[Thread] = None
+val receivingThreadOption: Option[Thread] = None
 def onStart(): Unit = {
 val thread = new Thread() {