From 7f7505d8db7759ea46e904f767c23130eff1104a Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Thu, 28 May 2015 20:15:52 -0700 Subject: [PATCH] [SPARK-7927] whitespace fixes for core. So we can enable a whitespace enforcement rule in the style checker to save code review time. Author: Reynold Xin Closes #6473 from rxin/whitespace-core and squashes the following commits: 058195d [Reynold Xin] Fixed tests. fce11e9 [Reynold Xin] [SPARK-7927] whitespace fixes for core. --- .../scala/org/apache/spark/Accumulators.scala | 2 +- .../scala/org/apache/spark/Aggregator.scala | 4 +-- .../scala/org/apache/spark/Partitioner.scala | 8 ++--- .../scala/org/apache/spark/SparkConf.scala | 2 +- .../scala/org/apache/spark/SparkContext.scala | 10 +++--- .../scala/org/apache/spark/SparkEnv.scala | 4 +-- .../org/apache/spark/SparkHadoopWriter.scala | 10 +++--- .../apache/spark/api/java/JavaRDDLike.scala | 2 +- .../apache/spark/api/python/PythonRDD.scala | 2 +- .../apache/spark/api/r/RBackendHandler.scala | 4 +-- .../scala/org/apache/spark/api/r/RRDD.scala | 2 +- .../spark/broadcast/HttpBroadcast.scala | 4 +-- .../spark/deploy/FaultToleranceTest.scala | 2 +- .../org/apache/spark/deploy/SparkSubmit.scala | 2 +- .../apache/spark/deploy/worker/Worker.scala | 2 +- .../apache/spark/executor/TaskMetrics.scala | 2 +- .../mapreduce/SparkHadoopMapReduceUtil.scala | 2 +- .../spark/network/nio/BlockMessage.scala | 2 +- .../spark/network/nio/BlockMessageArray.scala | 4 +-- .../spark/network/nio/SecurityMessage.scala | 2 +- .../spark/partial/GroupedCountEvaluator.scala | 6 ++-- .../org/apache/spark/rdd/CheckpointRDD.scala | 2 +- .../org/apache/spark/rdd/CoalescedRDD.scala | 4 +-- .../apache/spark/rdd/PairRDDFunctions.scala | 6 ++-- .../main/scala/org/apache/spark/rdd/RDD.scala | 12 +++---- .../spark/rdd/SequenceFileRDDFunctions.scala | 6 ++-- .../org/apache/spark/rdd/SubtractedRDD.scala | 2 +- .../spark/rdd/ZippedPartitionsRDD.scala | 2 +- .../apache/spark/scheduler/DAGScheduler.scala | 4 +-- .../spark/scheduler/DAGSchedulerSource.scala | 3 +- .../spark/scheduler/SchedulingAlgorithm.scala | 2 +- .../spark/scheduler/SparkListener.scala | 6 ++-- .../spark/scheduler/TaskSetManager.scala | 4 +-- .../cluster/CoarseGrainedClusterMessage.scala | 3 +- .../cluster/YarnSchedulerBackend.scala | 2 +- .../mesos/CoarseMesosSchedulerBackend.scala | 2 +- .../cluster/mesos/MesosSchedulerBackend.scala | 2 +- .../mesos/MesosSchedulerBackendUtil.scala | 2 +- .../status/api/v1/AllStagesResource.scala | 4 +-- .../spark/status/api/v1/ApiRootResource.scala | 8 ++--- .../spark/status/api/v1/OneRDDResource.scala | 2 +- .../org/apache/spark/status/api/v1/api.scala | 2 +- .../storage/BlockManagerSlaveEndpoint.scala | 2 +- .../spark/storage/BlockManagerSource.scala | 3 +- .../scala/org/apache/spark/ui/SparkUI.scala | 2 +- .../scala/org/apache/spark/ui/UIUtils.scala | 2 +- .../apache/spark/ui/UIWorkloadGenerator.scala | 2 +- .../org/apache/spark/ui/storage/RDDPage.scala | 2 +- .../org/apache/spark/util/AkkaUtils.scala | 2 +- .../spark/util/CompletionIterator.scala | 2 +- .../org/apache/spark/util/Distribution.scala | 4 +-- .../apache/spark/util/MetadataCleaner.scala | 2 +- .../org/apache/spark/util/MutablePair.scala | 2 +- .../org/apache/spark/util/SizeEstimator.scala | 16 ++++----- .../scala/org/apache/spark/util/Utils.scala | 2 +- .../apache/spark/util/collection/BitSet.scala | 2 +- .../util/collection/SortDataFormat.scala | 4 +-- .../util/random/StratifiedSamplingUtils.scala | 2 +- .../org/apache/spark/AccumulatorSuite.scala | 2 +- .../org/apache/spark/CheckpointSuite.scala | 4 +-- .../apache/spark/ContextCleanerSuite.scala | 6 ++-- .../scala/org/apache/spark/FailureSuite.scala | 2 +- .../org/apache/spark/FileServerSuite.scala | 20 +++++------ .../scala/org/apache/spark/FileSuite.scala | 2 +- .../apache/spark/ImplicitOrderingSuite.scala | 4 +-- .../org/apache/spark/SparkConfSuite.scala | 12 +++---- .../org/apache/spark/SparkContextSuite.scala | 14 ++++---- .../spark/broadcast/BroadcastSuite.scala | 2 +- .../deploy/worker/WorkerArgumentsTest.scala | 2 +- .../spark/deploy/worker/WorkerSuite.scala | 2 +- .../metrics/InputOutputMetricsSuite.scala | 6 ++-- .../spark/rdd/PairRDDFunctionsSuite.scala | 10 +++--- .../scala/org/apache/spark/rdd/RDDSuite.scala | 34 +++++++++---------- .../org/apache/spark/rdd/RDDSuiteUtils.scala | 6 ++-- .../org/apache/spark/rdd/SortingSuite.scala | 4 +-- .../org/apache/spark/rpc/RpcEnvSuite.scala | 6 ++-- .../CoarseGrainedSchedulerBackendSuite.scala | 4 +-- .../spark/scheduler/DAGSchedulerSuite.scala | 6 ++-- .../apache/spark/scheduler/PoolSuite.scala | 4 +-- .../serializer/KryoSerializerSuite.scala | 4 +-- .../ProactiveClosureSerializationSuite.scala | 10 +++--- .../spark/serializer/TestSerializer.scala | 11 +++--- .../spark/storage/FlatmapIteratorSuite.scala | 6 ++-- .../org/apache/spark/ui/UISeleniumSuite.scala | 8 ++--- .../spark/ui/storage/StorageTabSuite.scala | 2 +- .../apache/spark/util/AkkaUtilsSuite.scala | 2 +- .../org/apache/spark/util/UtilsSuite.scala | 2 +- .../spark/util/collection/BitSetSuite.scala | 2 +- 88 files changed, 205 insertions(+), 203 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/Accumulators.scala b/core/src/main/scala/org/apache/spark/Accumulators.scala index 330df1d59a..5a8d17bd99 100644 --- a/core/src/main/scala/org/apache/spark/Accumulators.scala +++ b/core/src/main/scala/org/apache/spark/Accumulators.scala @@ -228,7 +228,7 @@ GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializa * @tparam T result type */ class Accumulator[T](@transient initialValue: T, param: AccumulatorParam[T], name: Option[String]) - extends Accumulable[T,T](initialValue, param, name) { + extends Accumulable[T, T](initialValue, param, name) { def this(initialValue: T, param: AccumulatorParam[T]) = this(initialValue, param, None) } diff --git a/core/src/main/scala/org/apache/spark/Aggregator.scala b/core/src/main/scala/org/apache/spark/Aggregator.scala index af9765d313..b8a5f50168 100644 --- a/core/src/main/scala/org/apache/spark/Aggregator.scala +++ b/core/src/main/scala/org/apache/spark/Aggregator.scala @@ -45,7 +45,7 @@ case class Aggregator[K, V, C] ( def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]], context: TaskContext): Iterator[(K, C)] = { if (!isSpillEnabled) { - val combiners = new AppendOnlyMap[K,C] + val combiners = new AppendOnlyMap[K, C] var kv: Product2[K, V] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeValue(oldValue, kv._2) else createCombiner(kv._2) @@ -76,7 +76,7 @@ case class Aggregator[K, V, C] ( : Iterator[(K, C)] = { if (!isSpillEnabled) { - val combiners = new AppendOnlyMap[K,C] + val combiners = new AppendOnlyMap[K, C] var kc: Product2[K, C] = null val update = (hadValue: Boolean, oldValue: C) => { if (hadValue) mergeCombiners(oldValue, kc._2) else kc._2 diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala index b8d244408b..82889bcd30 100644 --- a/core/src/main/scala/org/apache/spark/Partitioner.scala +++ b/core/src/main/scala/org/apache/spark/Partitioner.scala @@ -103,7 +103,7 @@ class HashPartitioner(partitions: Int) extends Partitioner { */ class RangePartitioner[K : Ordering : ClassTag, V]( @transient partitions: Int, - @transient rdd: RDD[_ <: Product2[K,V]], + @transient rdd: RDD[_ <: Product2[K, V]], private var ascending: Boolean = true) extends Partitioner { @@ -185,7 +185,7 @@ class RangePartitioner[K : Ordering : ClassTag, V]( } override def equals(other: Any): Boolean = other match { - case r: RangePartitioner[_,_] => + case r: RangePartitioner[_, _] => r.rangeBounds.sameElements(rangeBounds) && r.ascending == ascending case _ => false @@ -249,7 +249,7 @@ private[spark] object RangePartitioner { * @param sampleSizePerPartition max sample size per partition * @return (total number of items, an array of (partitionId, number of items, sample)) */ - def sketch[K:ClassTag]( + def sketch[K : ClassTag]( rdd: RDD[K], sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = { val shift = rdd.id @@ -272,7 +272,7 @@ private[spark] object RangePartitioner { * @param partitions number of partitions * @return selected bounds */ - def determineBounds[K:Ordering:ClassTag]( + def determineBounds[K : Ordering : ClassTag]( candidates: ArrayBuffer[(K, Float)], partitions: Int): Array[K] = { val ordering = implicitly[Ordering[K]] diff --git a/core/src/main/scala/org/apache/spark/SparkConf.scala b/core/src/main/scala/org/apache/spark/SparkConf.scala index b5e5d6f146..4b5bcb54aa 100644 --- a/core/src/main/scala/org/apache/spark/SparkConf.scala +++ b/core/src/main/scala/org/apache/spark/SparkConf.scala @@ -481,7 +481,7 @@ private[spark] object SparkConf extends Logging { "are no longer accepted. To specify the equivalent now, one may use '64k'.") ) - Map(configs.map { cfg => (cfg.key -> cfg) }:_*) + Map(configs.map { cfg => (cfg.key -> cfg) } : _*) } /** diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index ea6c0dea08..a453c9bf48 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -389,7 +389,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER) - _jars =_conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten + _jars = _conf.getOption("spark.jars").map(_.split(",")).map(_.filter(_.size != 0)).toSeq.flatten _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.size != 0)) .toSeq.flatten @@ -438,7 +438,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli _ui = if (conf.getBoolean("spark.ui.enabled", true)) { Some(SparkUI.createLiveUI(this, _conf, listenerBus, _jobProgressListener, - _env.securityManager,appName, startTime = startTime)) + _env.securityManager, appName, startTime = startTime)) } else { // For tests, do not enable the UI None @@ -917,7 +917,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli classOf[FixedLengthBinaryInputFormat], classOf[LongWritable], classOf[BytesWritable], - conf=conf) + conf = conf) val data = br.map { case (k, v) => val bytes = v.getBytes assert(bytes.length == recordLength, "Byte array does not have correct length") @@ -1267,7 +1267,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli */ def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable: ClassTag, T] (initialValue: R): Accumulable[R, T] = { - val param = new GrowableAccumulableParam[R,T] + val param = new GrowableAccumulableParam[R, T] val acc = new Accumulable(initialValue, param) cleaner.foreach(_.registerAccumulatorForCleanup(acc)) acc @@ -1316,7 +1316,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli val uri = new URI(path) val schemeCorrectedPath = uri.getScheme match { case null | "local" => new File(path).getCanonicalFile.toURI.toString - case _ => path + case _ => path } val hadoopPath = new Path(schemeCorrectedPath) diff --git a/core/src/main/scala/org/apache/spark/SparkEnv.scala b/core/src/main/scala/org/apache/spark/SparkEnv.scala index 3271145428..a185954089 100644 --- a/core/src/main/scala/org/apache/spark/SparkEnv.scala +++ b/core/src/main/scala/org/apache/spark/SparkEnv.scala @@ -298,7 +298,7 @@ object SparkEnv extends Logging { } } - val mapOutputTracker = if (isDriver) { + val mapOutputTracker = if (isDriver) { new MapOutputTrackerMaster(conf) } else { new MapOutputTrackerWorker(conf) @@ -348,7 +348,7 @@ object SparkEnv extends Logging { val fileServerPort = conf.getInt("spark.fileserver.port", 0) val server = new HttpFileServer(conf, securityManager, fileServerPort) server.initialize() - conf.set("spark.fileserver.uri", server.serverUri) + conf.set("spark.fileserver.uri", server.serverUri) server } else { null diff --git a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala index 2ec42d3aea..59ac82ccec 100644 --- a/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala +++ b/core/src/main/scala/org/apache/spark/SparkHadoopWriter.scala @@ -50,8 +50,8 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private var jID: SerializableWritable[JobID] = null private var taID: SerializableWritable[TaskAttemptID] = null - @transient private var writer: RecordWriter[AnyRef,AnyRef] = null - @transient private var format: OutputFormat[AnyRef,AnyRef] = null + @transient private var writer: RecordWriter[AnyRef, AnyRef] = null + @transient private var format: OutputFormat[AnyRef, AnyRef] = null @transient private var committer: OutputCommitter = null @transient private var jobContext: JobContext = null @transient private var taskContext: TaskAttemptContext = null @@ -114,10 +114,10 @@ class SparkHadoopWriter(@transient jobConf: JobConf) // ********* Private Functions ********* - private def getOutputFormat(): OutputFormat[AnyRef,AnyRef] = { + private def getOutputFormat(): OutputFormat[AnyRef, AnyRef] = { if (format == null) { format = conf.value.getOutputFormat() - .asInstanceOf[OutputFormat[AnyRef,AnyRef]] + .asInstanceOf[OutputFormat[AnyRef, AnyRef]] } format } @@ -138,7 +138,7 @@ class SparkHadoopWriter(@transient jobConf: JobConf) private def getTaskContext(): TaskAttemptContext = { if (taskContext == null) { - taskContext = newTaskAttemptContext(conf.value, taID.value) + taskContext = newTaskAttemptContext(conf.value, taID.value) } taskContext } diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index 74db764322..b8e15f38a2 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -96,7 +96,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsWithIndex[R]( f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = - new JavaRDD(rdd.mapPartitionsWithIndex(((a,b) => f(a,asJavaIterator(b))), + new JavaRDD(rdd.mapPartitionsWithIndex(((a, b) => f(a, asJavaIterator(b))), preservesPartitioning)(fakeClassTag))(fakeClassTag) /** diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 2d92f6a42b..a77bf42ce1 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -723,7 +723,7 @@ private[spark] object PythonRDD extends Logging { val converted = convertRDD(rdd, keyConverterClass, valueConverterClass, new JavaToWritableConverter) val fc = Utils.classForName(outputFormatClass).asInstanceOf[Class[F]] - converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec=codec) + converted.saveAsHadoopFile(path, kc, vc, fc, new JobConf(mergedConf), codec = codec) } /** diff --git a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala index 0075d96371..026a1b9380 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RBackendHandler.scala @@ -124,7 +124,7 @@ private[r] class RBackendHandler(server: RBackend) } throw new Exception(s"No matched method found for $cls.$methodName") } - val ret = methods.head.invoke(obj, args:_*) + val ret = methods.head.invoke(obj, args : _*) // Write status bit writeInt(dos, 0) @@ -135,7 +135,7 @@ private[r] class RBackendHandler(server: RBackend) matchMethod(numArgs, args, x.getParameterTypes) }.head - val obj = ctor.newInstance(args:_*) + val obj = ctor.newInstance(args : _*) writeInt(dos, 0) writeObject(dos, obj.asInstanceOf[AnyRef]) diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 06247f7e8b..e020458888 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -309,7 +309,7 @@ private class StringRRDD[T: ClassTag]( } private object SpecialLengths { - val TIMING_DATA = -1 + val TIMING_DATA = -1 } private[r] class BufferedStreamThread( diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index 4457c75e8b..b69af639f7 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -125,7 +125,7 @@ private[broadcast] object HttpBroadcast extends Logging { securityManager = securityMgr if (isDriver) { createServer(conf) - conf.set("spark.httpBroadcast.uri", serverUri) + conf.set("spark.httpBroadcast.uri", serverUri) } serverUri = conf.get("spark.httpBroadcast.uri") cleaner = new MetadataCleaner(MetadataCleanerType.HTTP_BROADCAST, cleanup, conf) @@ -187,7 +187,7 @@ private[broadcast] object HttpBroadcast extends Logging { } private def read[T: ClassTag](id: Long): T = { - logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id) + logDebug("broadcast read server: " + serverUri + " id: broadcast-" + id) val url = serverUri + "/" + BroadcastBlockId(id).name var uc: URLConnection = null diff --git a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala index c048b78910..b4edb6109e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala +++ b/core/src/main/scala/org/apache/spark/deploy/FaultToleranceTest.scala @@ -65,7 +65,7 @@ private object FaultToleranceTest extends App with Logging { private val workers = ListBuffer[TestWorkerInfo]() private var sc: SparkContext = _ - private val zk = SparkCuratorUtil.newClient(conf) + private val zk = SparkCuratorUtil.newClient(conf) private var numPassed = 0 private var numFailed = 0 diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala index 198371b70f..92bb5059a0 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala @@ -361,7 +361,7 @@ object SparkSubmit { pyArchives = pythonPath.mkString(",") } - pyArchives = pyArchives.split(",").map { localPath=> + pyArchives = pyArchives.split(",").map { localPath => val localURI = Utils.resolveURI(localPath) if (localURI.getScheme != "local") { args.files = mergeFileLists(args.files, localURI.toString) diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index c8df024dda..ebc6cd76c6 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -554,7 +554,7 @@ private[deploy] object Worker extends Logging { conf = conf, securityManager = securityMgr) val masterAkkaUrls = masterUrls.map(Master.toAkkaUrl(_, AkkaUtils.protocol(actorSystem))) actorSystem.actorOf(Props(classOf[Worker], host, boundPort, webUiPort, cores, memory, - masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) + masterAkkaUrls, systemName, actorName, workDir, conf, securityMgr), name = actorName) (actorSystem, boundPort) } diff --git a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala index 06152f16ae..d90ae405a0 100644 --- a/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala +++ b/core/src/main/scala/org/apache/spark/executor/TaskMetrics.scala @@ -261,7 +261,7 @@ case class InputMetrics(readMethod: DataReadMethod.Value) { */ private var _recordsRead: Long = _ def recordsRead: Long = _recordsRead - def incRecordsRead(records: Long): Unit = _recordsRead += records + def incRecordsRead(records: Long): Unit = _recordsRead += records /** * Invoke the bytesReadCallback and mutate bytesRead. diff --git a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala index cfd20392d1..390d148bc9 100644 --- a/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala +++ b/core/src/main/scala/org/apache/spark/mapreduce/SparkHadoopMapReduceUtil.scala @@ -60,7 +60,7 @@ trait SparkHadoopMapReduceUtil { val taskTypeClass = Class.forName("org.apache.hadoop.mapreduce.TaskType") .asInstanceOf[Class[Enum[_]]] val taskType = taskTypeClass.getMethod("valueOf", classOf[String]).invoke( - taskTypeClass, if(isMap) "MAP" else "REDUCE") + taskTypeClass, if (isMap) "MAP" else "REDUCE") val ctor = klass.getDeclaredConstructor(classOf[String], classOf[Int], taskTypeClass, classOf[Int], classOf[Int]) ctor.newInstance(jtIdentifier, new JInteger(jobId), taskType, new JInteger(taskId), diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala index b573f1a8a5..1a92a799d0 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/BlockMessage.scala @@ -110,7 +110,7 @@ private[nio] class BlockMessage() { def getType: Int = typ def getId: BlockId = id def getData: ByteBuffer = data - def getLevel: StorageLevel = level + def getLevel: StorageLevel = level def toBufferMessage: BufferMessage = { val buffers = new ArrayBuffer[ByteBuffer]() diff --git a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala index 1ba25aa74a..7d0806f0c2 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/BlockMessageArray.scala @@ -114,8 +114,8 @@ private[nio] object BlockMessageArray { val blockMessages = (0 until 10).map { i => if (i % 2 == 0) { - val buffer = ByteBuffer.allocate(100) - buffer.clear + val buffer = ByteBuffer.allocate(100) + buffer.clear() BlockMessage.fromPutBlock(PutBlock(TestBlockId(i.toString), buffer, StorageLevel.MEMORY_ONLY_SER)) } else { diff --git a/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala b/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala index 747a2088a7..232c552f98 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/SecurityMessage.scala @@ -75,7 +75,7 @@ private[nio] class SecurityMessage extends Logging { for (i <- 1 to idLength) { idBuilder += buffer.getChar() } - connectionId = idBuilder.toString() + connectionId = idBuilder.toString() val tokenLength = buffer.getInt() token = new Array[Byte](tokenLength) diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index 3ef3cc219d..91b07ce3af 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -32,12 +32,12 @@ import org.apache.spark.util.collection.OpenHashMap * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval. */ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double) - extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] { + extends ApproximateEvaluator[OpenHashMap[T, Long], Map[T, BoundedDouble]] { var outputsMerged = 0 - var sums = new OpenHashMap[T,Long]() // Sum of counts for each key + var sums = new OpenHashMap[T, Long]() // Sum of counts for each key - override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) { + override def merge(outputId: Int, taskResult: OpenHashMap[T, Long]) { outputsMerged += 1 taskResult.foreach { case (key, value) => sums.changeValue(key, value, _ + value) diff --git a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala index 0d130dd4c7..a4715e3437 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CheckpointRDD.scala @@ -49,7 +49,7 @@ class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String) if (fs.exists(cpath)) { val dirContents = fs.listStatus(cpath).map(_.getPath) val partitionFiles = dirContents.filter(_.getName.startsWith("part-")).map(_.toString).sorted - val numPart = partitionFiles.length + val numPart = partitionFiles.length if (numPart > 0 && (! partitionFiles(0).endsWith(CheckpointRDD.splitIdToFile(0)) || ! partitionFiles(numPart-1).endsWith(CheckpointRDD.splitIdToFile(numPart-1)))) { throw new SparkException("Invalid checkpoint directory: " + checkpointPath) diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala index 0c1b02c07d..663eebb8e4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala @@ -310,11 +310,11 @@ private class PartitionCoalescer(maxPartitions: Int, prev: RDD[_], balanceSlack: def throwBalls() { if (noLocality) { // no preferredLocations in parent RDD, no randomization needed if (maxPartitions > groupArr.size) { // just return prev.partitions - for ((p,i) <- prev.partitions.zipWithIndex) { + for ((p, i) <- prev.partitions.zipWithIndex) { groupArr(i).arr += p } } else { // no locality available, then simply split partitions based on positions in array - for(i <- 0 until maxPartitions) { + for (i <- 0 until maxPartitions) { val rangeStart = ((i.toLong * prev.partitions.length) / maxPartitions).toInt val rangeEnd = (((i.toLong + 1) * prev.partitions.length) / maxPartitions).toInt (rangeStart until rangeEnd).foreach{ j => groupArr(i).arr += prev.partitions(j) } diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 8653cdee1a..004899f27b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -467,7 +467,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2 val bufs = combineByKey[CompactBuffer[V]]( - createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine=false) + createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false) bufs.asInstanceOf[RDD[(K, Iterable[V])]] } @@ -1011,7 +1011,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) jobFormat.checkOutputSpecs(job) } - val writeShard = (context: TaskContext, iter: Iterator[(K,V)]) => { + val writeShard = (context: TaskContext, iter: Iterator[(K, V)]) => { val config = wrappedConf.value /* "reduce task" */ val attemptId = newTaskAttemptID(jobtrackerID, stageId, isMap = false, context.partitionId, @@ -1027,7 +1027,7 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) val (outputMetrics, bytesWrittenCallback) = initHadoopOutputMetrics(context) - val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K,V]] + val writer = format.getRecordWriter(hadoopContext).asInstanceOf[NewRecordWriter[K, V]] require(writer != null, "Unable to obtain RecordWriter") var recordsWritten = 0L Utils.tryWithSafeFinally { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index d772f03f76..5fcef255e1 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -454,7 +454,7 @@ abstract class RDD[T: ClassTag]( withReplacement: Boolean, num: Int, seed: Long = Utils.random.nextLong): Array[T] = { - val numStDev = 10.0 + val numStDev = 10.0 if (num < 0) { throw new IllegalArgumentException("Negative number of elements requested") @@ -1138,8 +1138,8 @@ abstract class RDD[T: ClassTag]( if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } - val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) => - val map = new OpenHashMap[T,Long] + val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T, Long] = { (ctx, iter) => + val map = new OpenHashMap[T, Long] iter.foreach { t => map.changeValue(t, 1L, _ + 1L) } @@ -1585,15 +1585,15 @@ abstract class RDD[T: ClassTag]( case 0 => Seq.empty case 1 => val d = rdd.dependencies.head - debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]], true) + debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]], true) case _ => val frontDeps = rdd.dependencies.take(len - 1) val frontDepStrings = frontDeps.flatMap( - d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_,_,_]])) + d => debugString(d.rdd, prefix, d.isInstanceOf[ShuffleDependency[_, _, _]])) val lastDep = rdd.dependencies.last val lastDepStrings = - debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_,_,_]], true) + debugString(lastDep.rdd, prefix, lastDep.isInstanceOf[ShuffleDependency[_, _, _]], true) (frontDepStrings ++ lastDepStrings) } diff --git a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala index 3dfcf67f0e..4b5f15dd06 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SequenceFileRDDFunctions.scala @@ -104,13 +104,13 @@ class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag if (!convertKey && !convertValue) { self.saveAsHadoopFile(path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (!convertKey && convertValue) { - self.map(x => (x._1,anyToWritable(x._2))).saveAsHadoopFile( + self.map(x => (x._1, anyToWritable(x._2))).saveAsHadoopFile( path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (convertKey && !convertValue) { - self.map(x => (anyToWritable(x._1),x._2)).saveAsHadoopFile( + self.map(x => (anyToWritable(x._1), x._2)).saveAsHadoopFile( path, keyWritableClass, valueWritableClass, format, jobConf, codec) } else if (convertKey && convertValue) { - self.map(x => (anyToWritable(x._1),anyToWritable(x._2))).saveAsHadoopFile( + self.map(x => (anyToWritable(x._1), anyToWritable(x._2))).saveAsHadoopFile( path, keyWritableClass, valueWritableClass, format, jobConf, codec) } } diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index 633aeba3bb..f7cb1791d4 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -125,7 +125,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( integrate(0, t => getSeq(t._1) += t._2) // the second dep is rdd2; remove all of its keys integrate(1, t => map.remove(t._1)) - map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten + map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten } override def clearDependencies() { diff --git a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala index a96b6c3d23..81f40ad33a 100644 --- a/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsRDD.scala @@ -123,7 +123,7 @@ private[spark] class ZippedPartitionsRDD3 } private[spark] class ZippedPartitionsRDD4 - [A: ClassTag, B: ClassTag, C: ClassTag, D:ClassTag, V: ClassTag]( + [A: ClassTag, B: ClassTag, C: ClassTag, D: ClassTag, V: ClassTag]( sc: SparkContext, var f: (Iterator[A], Iterator[B], Iterator[C], Iterator[D]) => Iterator[V], var rdd1: RDD[A], diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index a2299e907c..75a567fb31 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1367,10 +1367,10 @@ class DAGScheduler( private def getPreferredLocsInternal( rdd: RDD[_], partition: Int, - visited: HashSet[(RDD[_],Int)]): Seq[TaskLocation] = { + visited: HashSet[(RDD[_], Int)]): Seq[TaskLocation] = { // If the partition has already been visited, no need to re-visit. // This avoids exponential path exploration. SPARK-695 - if (!visited.add((rdd,partition))) { + if (!visited.add((rdd, partition))) { // Nil has already been returned for previously visited partitions. return Nil } diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala index 12668b6c09..02c67073af 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerSource.scala @@ -17,9 +17,8 @@ package org.apache.spark.scheduler -import com.codahale.metrics.{Gauge,MetricRegistry} +import com.codahale.metrics.{Gauge, MetricRegistry} -import org.apache.spark.SparkContext import org.apache.spark.metrics.source.Source private[spark] class DAGSchedulerSource(val dagScheduler: DAGScheduler) diff --git a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala index 5e62c8468f..864941d468 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SchedulingAlgorithm.scala @@ -56,7 +56,7 @@ private[spark] class FairSchedulingAlgorithm extends SchedulingAlgorithm { val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0).toDouble val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble - var compare:Int = 0 + var compare: Int = 0 if (s1Needy && !s2Needy) { return true diff --git a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala index 863d0befbc..9620915f49 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/SparkListener.scala @@ -270,7 +270,7 @@ class StatsReportListener extends SparkListener with Logging { private[spark] object StatsReportListener extends Logging { // For profiling, the extremes are more interesting - val percentiles = Array[Int](0,5,10,25,50,75,90,95,100) + val percentiles = Array[Int](0, 5, 10, 25, 50, 75, 90, 95, 100) val probabilities = percentiles.map(_ / 100.0) val percentilesHeader = "\t" + percentiles.mkString("%\t") + "%" @@ -304,7 +304,7 @@ private[spark] object StatsReportListener extends Logging { dOpt.foreach { d => showDistribution(heading, d, formatNumber)} } - def showDistribution(heading: String, dOpt: Option[Distribution], format:String) { + def showDistribution(heading: String, dOpt: Option[Distribution], format: String) { def f(d: Double): String = format.format(d) showDistribution(heading, dOpt, f _) } @@ -318,7 +318,7 @@ private[spark] object StatsReportListener extends Logging { } def showBytesDistribution( - heading:String, + heading: String, getMetric: (TaskInfo, TaskMetrics) => Option[Long], taskInfoMetrics: Seq[(TaskInfo, TaskMetrics)]) { showBytesDistribution(heading, extractLongDistribution(taskInfoMetrics, getMetric)) diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala index c4487d5b37..d473e51aba 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala @@ -781,10 +781,10 @@ private[spark] class TaskSetManager( // that it's okay if we add a task to the same queue twice (if it had multiple preferred // locations), because dequeueTaskFromList will skip already-running tasks. for (index <- getPendingTasksForExecutor(execId)) { - addPendingTask(index, readding=true) + addPendingTask(index, readding = true) } for (index <- getPendingTasksForHost(host)) { - addPendingTask(index, readding=true) + addPendingTask(index, readding = true) } // Re-enqueue any tasks that ran on the failed executor if this is a shuffle map stage, diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala index 70364cea62..4be1eda2e9 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedClusterMessage.scala @@ -75,7 +75,8 @@ private[spark] object CoarseGrainedClusterMessages { case class SetupDriver(driver: RpcEndpointRef) extends CoarseGrainedClusterMessage // Exchanged between the driver and the AM in Yarn client mode - case class AddWebUIFilter(filterName:String, filterParams: Map[String, String], proxyBase: String) + case class AddWebUIFilter( + filterName: String, filterParams: Map[String, String], proxyBase: String) extends CoarseGrainedClusterMessage // Messages exchanged between the driver and the cluster manager for executor allocation diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala index 2a3a5d925d..190ff61d68 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/YarnSchedulerBackend.scala @@ -149,7 +149,7 @@ private[spark] abstract class YarnSchedulerBackend( } } - override def onStop(): Unit ={ + override def onStop(): Unit = { askAmThreadPool.shutdownNow() } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index aff086594c..6b8edca5aa 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -52,7 +52,7 @@ private[spark] class CoarseMesosSchedulerBackend( val MAX_SLAVE_FAILURES = 2 // Blacklist a slave after this many failures // Maximum number of cores to acquire (TODO: we'll need more flexible controls here) - val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt + val maxCores = conf.get("spark.cores.max", Int.MaxValue.toString).toInt // Cores we have acquired with each Mesos task ID val coresByTaskId = new HashMap[Int, Int] diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index db0a080b3b..49de85ef48 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -146,7 +146,7 @@ private[spark] class MesosSchedulerBackend( private def createExecArg(): Array[Byte] = { if (execArgs == null) { val props = new HashMap[String, String] - for ((key,value) <- sc.conf.getAll) { + for ((key, value) <- sc.conf.getAll) { props(key) = value } // Serialize the map as an array of (String, String) pairs diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala index 928c5cfed4..2f2934c249 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala @@ -108,7 +108,7 @@ private[mesos] object MesosSchedulerBackendUtil extends Logging { image: String, volumes: Option[List[Volume]] = None, network: Option[ContainerInfo.DockerInfo.Network] = None, - portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None):Unit = { + portmaps: Option[List[ContainerInfo.DockerInfo.PortMapping]] = None): Unit = { val docker = ContainerInfo.DockerInfo.newBuilder().setImage(image) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala index 50608588f0..390c136df7 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/AllStagesResource.scala @@ -169,7 +169,7 @@ private[v1] object AllStagesResource { val outputMetrics: Option[OutputMetricDistributions] = new MetricHelper[InternalOutputMetrics, OutputMetricDistributions](rawMetrics, quantiles) { - def getSubmetrics(raw:InternalTaskMetrics): Option[InternalOutputMetrics] = { + def getSubmetrics(raw: InternalTaskMetrics): Option[InternalOutputMetrics] = { raw.outputMetrics } def build: OutputMetricDistributions = new OutputMetricDistributions( @@ -284,7 +284,7 @@ private[v1] object AllStagesResource { * the options (returning None if the metrics are all empty), and extract the quantiles for each * metric. After creating an instance, call metricOption to get the result type. */ -private[v1] abstract class MetricHelper[I,O]( +private[v1] abstract class MetricHelper[I, O]( rawMetrics: Seq[InternalTaskMetrics], quantiles: Array[Double]) { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index bf2cc2e72f..f73c742732 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -101,7 +101,7 @@ private[v1] class ApiRootResource extends UIRootFromServletContext { @Path("applications/{appId}/stages") - def getStages(@PathParam("appId") appId: String): AllStagesResource= { + def getStages(@PathParam("appId") appId: String): AllStagesResource = { uiRoot.withSparkUI(appId, None) { ui => new AllStagesResource(ui) } @@ -110,14 +110,14 @@ private[v1] class ApiRootResource extends UIRootFromServletContext { @Path("applications/{appId}/{attemptId}/stages") def getStages( @PathParam("appId") appId: String, - @PathParam("attemptId") attemptId: String): AllStagesResource= { + @PathParam("attemptId") attemptId: String): AllStagesResource = { uiRoot.withSparkUI(appId, Some(attemptId)) { ui => new AllStagesResource(ui) } } @Path("applications/{appId}/stages/{stageId: \\d+}") - def getStage(@PathParam("appId") appId: String): OneStageResource= { + def getStage(@PathParam("appId") appId: String): OneStageResource = { uiRoot.withSparkUI(appId, None) { ui => new OneStageResource(ui) } @@ -171,7 +171,7 @@ private[spark] object ApiRootResource { def getServletHandler(uiRoot: UIRoot): ServletContextHandler = { val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS) jerseyContext.setContextPath("/api") - val holder:ServletHolder = new ServletHolder(classOf[ServletContainer]) + val holder: ServletHolder = new ServletHolder(classOf[ServletContainer]) holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass", "com.sun.jersey.api.core.PackagesResourceConfig") holder.setInitParameter("com.sun.jersey.config.property.packages", diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala index 07b224fac4..dfdc09c6ca 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/OneRDDResource.scala @@ -25,7 +25,7 @@ import org.apache.spark.ui.SparkUI private[v1] class OneRDDResource(ui: SparkUI) { @GET - def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = { + def rddData(@PathParam("rddId") rddId: Int): RDDStorageInfo = { AllRDDResource.getRDDStorageInfo(rddId, ui.storageListener, true).getOrElse( throw new NotFoundException(s"no rdd found w/ id $rddId") ) diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala index ef3c8570d8..2bec64f2ef 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala @@ -134,7 +134,7 @@ class StageData private[spark]( val accumulatorUpdates: Seq[AccumulableInfo], val tasks: Option[Map[Long, TaskData]], - val executorSummary:Option[Map[String,ExecutorStageSummary]]) + val executorSummary: Option[Map[String, ExecutorStageSummary]]) class TaskData private[spark]( val taskId: Long, diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala index 543df4e135..7478ab0fc2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSlaveEndpoint.scala @@ -40,7 +40,7 @@ class BlockManagerSlaveEndpoint( private implicit val asyncExecutionContext = ExecutionContext.fromExecutorService(asyncThreadPool) // Operations that involve removing blocks may be slow and should be done asynchronously - override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { + override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case RemoveBlock(blockId) => doAsync[Boolean]("removing block " + blockId, context) { blockManager.removeBlock(blockId) diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala index 8569c6f3cb..c5ba9af3e2 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerSource.scala @@ -17,9 +17,8 @@ package org.apache.spark.storage -import com.codahale.metrics.{Gauge,MetricRegistry} +import com.codahale.metrics.{Gauge, MetricRegistry} -import org.apache.spark.SparkContext import org.apache.spark.metrics.source.Source private[spark] class BlockManagerSource(val blockManager: BlockManager) diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 0b11e914bb..3788916cf3 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -137,7 +137,7 @@ private[spark] object SparkUI { jobProgressListener: JobProgressListener, securityManager: SecurityManager, appName: String, - startTime: Long): SparkUI = { + startTime: Long): SparkUI = { create(Some(sc), conf, listenerBus, securityManager, appName, jobProgressListener = Some(jobProgressListener), startTime = startTime) } diff --git a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala index 6194c50ec8..65162f4fdc 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIUtils.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIUtils.scala @@ -309,7 +309,7 @@ private[spark] object UIUtils extends Logging { started: Int, completed: Int, failed: Int, - skipped:Int, + skipped: Int, total: Int): Seq[Node] = { val completeWidth = "width: %s%%".format((completed.toDouble/total)*100) val startWidth = "width: %s%%".format((started.toDouble/total)*100) diff --git a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala index 5fbcd6bb8a..ba03acdb38 100644 --- a/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala +++ b/core/src/main/scala/org/apache/spark/ui/UIWorkloadGenerator.scala @@ -54,7 +54,7 @@ private[spark] object UIWorkloadGenerator { val sc = new SparkContext(conf) def setProperties(s: String): Unit = { - if(schedulingMode == SchedulingMode.FAIR) { + if (schedulingMode == SchedulingMode.FAIR) { sc.setLocalProperty("spark.scheduler.pool", s) } sc.setLocalProperty(SparkContext.SPARK_JOB_DESCRIPTION, s) diff --git a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala index fbce917a08..36943978ff 100644 --- a/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala +++ b/core/src/main/scala/org/apache/spark/ui/storage/RDDPage.scala @@ -33,7 +33,7 @@ private[ui] class RDDPage(parent: StorageTab) extends WebUIPage("rdd") { val parameterId = request.getParameter("id") require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") val rddId = parameterId.toInt - val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener,includeDetails = true) + val rddStorageInfo = AllRDDResource.getRDDStorageInfo(rddId, listener, includeDetails = true) .getOrElse { // Rather than crashing, render an "RDD Not Found" page return UIUtils.headerSparkPage("RDD Not Found", Seq[Node](), parent) diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 7513b1b795..96aa2fe164 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -63,7 +63,7 @@ private[spark] object AkkaUtils extends Logging { conf: SparkConf, securityManager: SecurityManager): (ActorSystem, Int) = { - val akkaThreads = conf.getInt("spark.akka.threads", 4) + val akkaThreads = conf.getInt("spark.akka.threads", 4) val akkaBatchSize = conf.getInt("spark.akka.batchSize", 15) val akkaTimeoutS = conf.getTimeAsSeconds("spark.akka.timeout", conf.get("spark.network.timeout", "120s")) diff --git a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala index 9044aaeef2..31d230d0fe 100644 --- a/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala +++ b/core/src/main/scala/org/apache/spark/util/CompletionIterator.scala @@ -42,7 +42,7 @@ abstract class CompletionIterator[ +A, +I <: Iterator[A]](sub: I) extends Iterat private[spark] object CompletionIterator { def apply[A, I <: Iterator[A]](sub: I, completionFunction: => Unit) : CompletionIterator[A, I] = { - new CompletionIterator[A,I](sub) { + new CompletionIterator[A, I](sub) { def completion(): Unit = completionFunction } } diff --git a/core/src/main/scala/org/apache/spark/util/Distribution.scala b/core/src/main/scala/org/apache/spark/util/Distribution.scala index 9aea8efa38..1bab707235 100644 --- a/core/src/main/scala/org/apache/spark/util/Distribution.scala +++ b/core/src/main/scala/org/apache/spark/util/Distribution.scala @@ -35,7 +35,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va java.util.Arrays.sort(data, startIdx, endIdx) val length = endIdx - startIdx - val defaultProbabilities = Array(0,0.25,0.5,0.75,1.0) + val defaultProbabilities = Array(0, 0.25, 0.5, 0.75, 1.0) /** * Get the value of the distribution at the given probabilities. Probabilities should be @@ -44,7 +44,7 @@ private[spark] class Distribution(val data: Array[Double], val startIdx: Int, va */ def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) : IndexedSeq[Double] = { - probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))} + probabilities.toIndexedSeq.map { p: Double => data(closestIndex(p)) } } private def closestIndex(p: Double) = { diff --git a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala index 2bbfc988a9..a8bbad0868 100644 --- a/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala +++ b/core/src/main/scala/org/apache/spark/util/MetadataCleaner.scala @@ -89,7 +89,7 @@ private[spark] object MetadataCleaner { conf: SparkConf, cleanerType: MetadataCleanerType.MetadataCleanerType, delay: Int) { - conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString) + conf.set(MetadataCleanerType.systemProperty(cleanerType), delay.toString) } /** diff --git a/core/src/main/scala/org/apache/spark/util/MutablePair.scala b/core/src/main/scala/org/apache/spark/util/MutablePair.scala index dad888548e..3d95b7869f 100644 --- a/core/src/main/scala/org/apache/spark/util/MutablePair.scala +++ b/core/src/main/scala/org/apache/spark/util/MutablePair.scala @@ -45,5 +45,5 @@ case class MutablePair[@specialized(Int, Long, Double, Char, Boolean/* , AnyRef override def toString: String = "(" + _1 + "," + _2 + ")" - override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_,_]] + override def canEqual(that: Any): Boolean = that.isInstanceOf[MutablePair[_, _]] } diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index f38949c3cb..f1f6b5e1f9 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -54,14 +54,14 @@ object SizeEstimator extends Logging { def estimate(obj: AnyRef): Long = estimate(obj, new IdentityHashMap[AnyRef, AnyRef]) // Sizes of primitive types - private val BYTE_SIZE = 1 + private val BYTE_SIZE = 1 private val BOOLEAN_SIZE = 1 - private val CHAR_SIZE = 2 - private val SHORT_SIZE = 2 - private val INT_SIZE = 4 - private val LONG_SIZE = 8 - private val FLOAT_SIZE = 4 - private val DOUBLE_SIZE = 8 + private val CHAR_SIZE = 2 + private val SHORT_SIZE = 2 + private val INT_SIZE = 4 + private val LONG_SIZE = 8 + private val FLOAT_SIZE = 4 + private val DOUBLE_SIZE = 8 // Fields can be primitive types, sizes are: 1, 2, 4, 8. Or fields can be pointers. The size of // a pointer is 4 or 8 depending on the JVM (32-bit or 64-bit) and UseCompressedOops flag. @@ -96,7 +96,7 @@ object SizeEstimator extends Logging { isCompressedOops = getIsCompressedOops objectSize = if (!is64bit) 8 else { - if(!isCompressedOops) { + if (!isCompressedOops) { 16 } else { 12 diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index b7a2473dfe..763d4db690 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -882,7 +882,7 @@ private[spark] object Utils extends Logging { // If not, we should change it to LRUCache or something. private val hostPortParseResults = new ConcurrentHashMap[String, (String, Int)]() - def parseHostPort(hostPort: String): (String, Int) = { + def parseHostPort(hostPort: String): (String, Int) = { // Check cache first. val cached = hostPortParseResults.get(hostPort) if (cached != null) { diff --git a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala index 41cb8cfe2a..9c15b1188d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/BitSet.scala @@ -161,7 +161,7 @@ class BitSet(numBits: Int) extends Serializable { override def hasNext: Boolean = ind >= 0 override def next(): Int = { val tmp = ind - ind = nextSetBit(ind + 1) + ind = nextSetBit(ind + 1) tmp } } diff --git a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala index 4f0bf8384a..9a7a5a4e74 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/SortDataFormat.scala @@ -90,9 +90,9 @@ class KVArraySortDataFormat[K, T <: AnyRef : ClassTag] extends SortDataFormat[K, override def swap(data: Array[T], pos0: Int, pos1: Int) { val tmpKey = data(2 * pos0) val tmpVal = data(2 * pos0 + 1) - data(2 * pos0) = data(2 * pos1) + data(2 * pos0) = data(2 * pos1) data(2 * pos0 + 1) = data(2 * pos1 + 1) - data(2 * pos1) = tmpKey + data(2 * pos1) = tmpKey data(2 * pos1 + 1) = tmpVal } diff --git a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala index 9e29bf9d61..effe6fa2ad 100644 --- a/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/random/StratifiedSamplingUtils.scala @@ -196,7 +196,7 @@ private[spark] object StratifiedSamplingUtils extends Logging { * * The sampling function has a unique seed per partition. */ - def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)], + def getBernoulliSamplingFunction[K, V](rdd: RDD[(K, V)], fractions: Map[K, Double], exact: Boolean, seed: Long): (Int, Iterator[(K, V)]) => Iterator[(K, V)] = { diff --git a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala index 75399461f2..746a40a21b 100644 --- a/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala +++ b/core/src/test/scala/org/apache/spark/AccumulatorSuite.scala @@ -103,7 +103,7 @@ class AccumulatorSuite extends FunSuite with Matchers with LocalSparkContext { sc = new SparkContext("local[" + nThreads + "]", "test") val setAcc = sc.accumulableCollection(mutable.HashSet[Int]()) val bufferAcc = sc.accumulableCollection(mutable.ArrayBuffer[Int]()) - val mapAcc = sc.accumulableCollection(mutable.HashMap[Int,String]()) + val mapAcc = sc.accumulableCollection(mutable.HashMap[Int, String]()) val d = sc.parallelize((1 to maxI) ++ (1 to maxI)) d.foreach { x => {setAcc += x; bufferAcc += x; mapAcc += (x -> x.toString)} diff --git a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala index e1faddeabe..91d8fdedbe 100644 --- a/core/src/test/scala/org/apache/spark/CheckpointSuite.scala +++ b/core/src/test/scala/org/apache/spark/CheckpointSuite.scala @@ -218,10 +218,10 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging { val pairRDD = generateFatPairRDD() pairRDD.checkpoint() val unionRDD = new PartitionerAwareUnionRDD(sc, Array(pairRDD)) - val partitionBeforeCheckpoint = serializeDeserialize( + val partitionBeforeCheckpoint = serializeDeserialize( unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition]) pairRDD.count() - val partitionAfterCheckpoint = serializeDeserialize( + val partitionAfterCheckpoint = serializeDeserialize( unionRDD.partitions.head.asInstanceOf[PartitionerAwareUnionRDDPartition]) assert( partitionBeforeCheckpoint.parents.head.getClass != diff --git a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala index 0922a2c359..4a48f6580c 100644 --- a/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala +++ b/core/src/test/scala/org/apache/spark/ContextCleanerSuite.scala @@ -158,7 +158,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { rdd.count() // Test that GC does not cause RDD cleanup due to a strong reference - val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id)) + val preGCTester = new CleanerTester(sc, rddIds = Seq(rdd.id)) runGC() intercept[Exception] { preGCTester.assertCleanup()(timeout(1000 millis)) @@ -195,7 +195,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { var broadcast = newBroadcast() // Test that GC does not cause broadcast cleanup due to a strong reference - val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id)) + val preGCTester = new CleanerTester(sc, broadcastIds = Seq(broadcast.id)) runGC() intercept[Exception] { preGCTester.assertCleanup()(timeout(1000 millis)) @@ -267,7 +267,7 @@ class ContextCleanerSuite extends ContextCleanerSuiteBase { val shuffleIds = 0 until sc.newShuffleId val broadcastIds = broadcastBuffer.map(_.id) - val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) + val preGCTester = new CleanerTester(sc, rddIds, shuffleIds, broadcastIds) runGC() intercept[Exception] { preGCTester.assertCleanup()(timeout(1000 millis)) diff --git a/core/src/test/scala/org/apache/spark/FailureSuite.scala b/core/src/test/scala/org/apache/spark/FailureSuite.scala index 1212d0b432..cade1fda2c 100644 --- a/core/src/test/scala/org/apache/spark/FailureSuite.scala +++ b/core/src/test/scala/org/apache/spark/FailureSuite.scala @@ -57,7 +57,7 @@ class FailureSuite extends FunSuite with LocalSparkContext { FailureSuiteState.synchronized { assert(FailureSuiteState.tasksRun === 4) } - assert(results.toList === List(1,4,9)) + assert(results.toList === List(1, 4, 9)) FailureSuiteState.clear() } diff --git a/core/src/test/scala/org/apache/spark/FileServerSuite.scala b/core/src/test/scala/org/apache/spark/FileServerSuite.scala index c0439f9348..bff2d10b99 100644 --- a/core/src/test/scala/org/apache/spark/FileServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileServerSuite.scala @@ -81,7 +81,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test("Distributing files locally") { sc = new SparkContext("local[4]", "test", newConf) sc.addFile(tmpFile.toString) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) val result = sc.parallelize(testData).reduceByKey { val path = SparkFiles.get("FileServerSuite.txt") val in = new BufferedReader(new FileReader(path)) @@ -89,7 +89,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { in.close() _ * fileVal + _ * fileVal }.collect() - assert(result.toSet === Set((1,200), (2,300), (3,500))) + assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) } test("Distributing files locally security On") { @@ -100,7 +100,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { sc.addFile(tmpFile.toString) assert(sc.env.securityManager.isAuthenticationEnabled() === true) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) val result = sc.parallelize(testData).reduceByKey { val path = SparkFiles.get("FileServerSuite.txt") val in = new BufferedReader(new FileReader(path)) @@ -108,14 +108,14 @@ class FileServerSuite extends FunSuite with LocalSparkContext { in.close() _ * fileVal + _ * fileVal }.collect() - assert(result.toSet === Set((1,200), (2,300), (3,500))) + assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) } test("Distributing files locally using URL as input") { // addFile("file:///....") sc = new SparkContext("local[4]", "test", newConf) sc.addFile(new File(tmpFile.toString).toURI.toString) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) val result = sc.parallelize(testData).reduceByKey { val path = SparkFiles.get("FileServerSuite.txt") val in = new BufferedReader(new FileReader(path)) @@ -123,7 +123,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { in.close() _ * fileVal + _ * fileVal }.collect() - assert(result.toSet === Set((1,200), (2,300), (3,500))) + assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) } test ("Dynamically adding JARS locally") { @@ -140,7 +140,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test("Distributing files on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test", newConf) sc.addFile(tmpFile.toString) - val testData = Array((1,1), (1,1), (2,1), (3,5), (2,2), (3,0)) + val testData = Array((1, 1), (1, 1), (2, 1), (3, 5), (2, 2), (3, 0)) val result = sc.parallelize(testData).reduceByKey { val path = SparkFiles.get("FileServerSuite.txt") val in = new BufferedReader(new FileReader(path)) @@ -148,13 +148,13 @@ class FileServerSuite extends FunSuite with LocalSparkContext { in.close() _ * fileVal + _ * fileVal }.collect() - assert(result.toSet === Set((1,200), (2,300), (3,500))) + assert(result.toSet === Set((1, 200), (2, 300), (3, 500))) } test ("Dynamically adding JARS on a standalone cluster") { sc = new SparkContext("local-cluster[1,1,512]", "test", newConf) sc.addJar(tmpJarUrl) - val testData = Array((1,1)) + val testData = Array((1, 1)) sc.parallelize(testData).foreach { x => if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { throw new SparkException("jar not added") @@ -165,7 +165,7 @@ class FileServerSuite extends FunSuite with LocalSparkContext { test ("Dynamically adding JARS on a standalone cluster using local: URL") { sc = new SparkContext("local-cluster[1,1,512]", "test", newConf) sc.addJar(tmpJarUrl.replace("file", "local")) - val testData = Array((1,1)) + val testData = Array((1, 1)) sc.parallelize(testData).foreach { x => if (Thread.currentThread.getContextClassLoader.getResource("FileServerSuite.txt") == null) { throw new SparkException("jar not added") diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala index c8f08eed47..d67de8692d 100644 --- a/core/src/test/scala/org/apache/spark/FileSuite.scala +++ b/core/src/test/scala/org/apache/spark/FileSuite.scala @@ -334,7 +334,7 @@ class FileSuite extends FunSuite with LocalSparkContext { } val copyRdd = mappedRdd.flatMap { curData: (String, PortableDataStream) => - for(i <- 1 to numOfCopies) yield (i, curData._2) + for (i <- 1 to numOfCopies) yield (i, curData._2) } val copyArr: Array[(Int, PortableDataStream)] = copyRdd.collect() diff --git a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala index 51348c039b..69314deda1 100644 --- a/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala +++ b/core/src/test/scala/org/apache/spark/ImplicitOrderingSuite.scala @@ -44,11 +44,11 @@ private object ImplicitOrderingSuite { class NonOrderedClass {} class ComparableClass extends Comparable[ComparableClass] { - override def compareTo(o: ComparableClass): Int = ??? + override def compareTo(o: ComparableClass): Int = throw new UnsupportedOperationException } class OrderedClass extends Ordered[OrderedClass] { - override def compare(o: OrderedClass): Int = ??? + override def compare(o: OrderedClass): Int = throw new UnsupportedOperationException } def basicMapExpectations(rdd: RDD[Int]): List[(Boolean, String)] = { diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index fafa4ed606..fafc9d4750 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -34,18 +34,18 @@ class SparkConfSuite extends FunSuite with LocalSparkContext with ResetSystemPro val conf = new SparkConf() // Simply exercise the API, we don't need a complete conversion test since that's handled in // UtilsSuite.scala - assert(conf.getSizeAsBytes("fake","1k") === ByteUnit.KiB.toBytes(1)) - assert(conf.getSizeAsKb("fake","1k") === ByteUnit.KiB.toKiB(1)) - assert(conf.getSizeAsMb("fake","1k") === ByteUnit.KiB.toMiB(1)) - assert(conf.getSizeAsGb("fake","1k") === ByteUnit.KiB.toGiB(1)) + assert(conf.getSizeAsBytes("fake", "1k") === ByteUnit.KiB.toBytes(1)) + assert(conf.getSizeAsKb("fake", "1k") === ByteUnit.KiB.toKiB(1)) + assert(conf.getSizeAsMb("fake", "1k") === ByteUnit.KiB.toMiB(1)) + assert(conf.getSizeAsGb("fake", "1k") === ByteUnit.KiB.toGiB(1)) } test("Test timeString conversion") { val conf = new SparkConf() // Simply exercise the API, we don't need a complete conversion test since that's handled in // UtilsSuite.scala - assert(conf.getTimeAsMs("fake","1ms") === TimeUnit.MILLISECONDS.toMillis(1)) - assert(conf.getTimeAsSeconds("fake","1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000)) + assert(conf.getTimeAsMs("fake", "1ms") === TimeUnit.MILLISECONDS.toMillis(1)) + assert(conf.getTimeAsSeconds("fake", "1000ms") === TimeUnit.MILLISECONDS.toSeconds(1000)) } test("loading from system properties") { diff --git a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala index 9049db7755..31ef5cd75b 100644 --- a/core/src/test/scala/org/apache/spark/SparkContextSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkContextSuite.scala @@ -222,8 +222,8 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { val dir1 = Utils.createTempDir() val dir2 = Utils.createTempDir() - val dirpath1=dir1.getAbsolutePath - val dirpath2=dir2.getAbsolutePath + val dirpath1 = dir1.getAbsolutePath + val dirpath2 = dir2.getAbsolutePath // file1 and file2 are placed inside dir1, they are also used for // textFile, hadoopFile, and newAPIHadoopFile @@ -235,11 +235,11 @@ class SparkContextSuite extends FunSuite with LocalSparkContext { val file4 = new File(dir2, "part-00001") val file5 = new File(dir2, "part-00002") - val filepath1=file1.getAbsolutePath - val filepath2=file2.getAbsolutePath - val filepath3=file3.getAbsolutePath - val filepath4=file4.getAbsolutePath - val filepath5=file5.getAbsolutePath + val filepath1 = file1.getAbsolutePath + val filepath2 = file2.getAbsolutePath + val filepath3 = file3.getAbsolutePath + val filepath4 = file4.getAbsolutePath + val filepath5 = file5.getAbsolutePath try { diff --git a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala index 06e5f1cf6b..c38e306b6a 100644 --- a/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala +++ b/core/src/test/scala/org/apache/spark/broadcast/BroadcastSuite.scala @@ -286,7 +286,7 @@ class BroadcastSuite extends FunSuite with LocalSparkContext { assert(statuses.size === expectedNumBlocks) } - testUnpersistBroadcast(distributed, numSlaves, torrentConf, afterCreation, + testUnpersistBroadcast(distributed, numSlaves, torrentConf, afterCreation, afterUsingBroadcast, afterUnpersist, removeFromDriver) } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala index 7cc2104281..e432b8e946 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerArgumentsTest.scala @@ -66,7 +66,7 @@ class WorkerArgumentsTest extends FunSuite { } } val conf = new MySparkConf() - val workerArgs = new WorkerArguments(args, conf) + val workerArgs = new WorkerArguments(args, conf) assert(workerArgs.memory === 5120) } diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala index 450fba21f4..93a779d5ce 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/WorkerSuite.scala @@ -25,7 +25,7 @@ import org.scalatest.{Matchers, FunSuite} class WorkerSuite extends FunSuite with Matchers { def cmd(javaOpts: String*): Command = { - Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts:_*)) + Command("", Seq.empty, Map.empty, Seq.empty, Seq.empty, Seq(javaOpts : _*)) } def conf(opts: (String, String)*): SparkConf = new SparkConf(loadDefaults = false).setAll(opts) diff --git a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala index ef3e213f1f..60dba3b2d6 100644 --- a/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala +++ b/core/src/test/scala/org/apache/spark/metrics/InputOutputMetricsSuite.scala @@ -263,7 +263,7 @@ class InputOutputMetricsSuite extends FunSuite with SharedSparkContext val tmpRdd = sc.textFile(tmpFilePath, numPartitions) - val firstSize= runAndReturnBytesRead { + val firstSize = runAndReturnBytesRead { aRdd.count() } val secondSize = runAndReturnBytesRead { @@ -433,10 +433,10 @@ class OldCombineTextRecordReaderWrapper( /** * Hadoop 2 has a version of this, but we can't use it for backwards compatibility */ -class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable,Text] { +class NewCombineTextInputFormat extends NewCombineFileInputFormat[LongWritable, Text] { def createRecordReader(split: NewInputSplit, context: TaskAttemptContext) : NewRecordReader[LongWritable, Text] = { - new NewCombineFileRecordReader[LongWritable,Text](split.asInstanceOf[NewCombineFileSplit], + new NewCombineFileRecordReader[LongWritable, Text](split.asInstanceOf[NewCombineFileSplit], context, classOf[NewCombineTextRecordReaderWrapper]) } } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index ca0d953d30..6564232986 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -512,17 +512,17 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { } test("lookup") { - val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) + val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7))) assert(pairs.partitioner === None) assert(pairs.lookup(1) === Seq(2)) - assert(pairs.lookup(5) === Seq(6,7)) + assert(pairs.lookup(5) === Seq(6, 7)) assert(pairs.lookup(-1) === Seq()) } test("lookup with partitioner") { - val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) + val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7))) val p = new Partitioner { def numPartitions: Int = 2 @@ -533,12 +533,12 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext { assert(shuffled.partitioner === Some(p)) assert(shuffled.lookup(1) === Seq(2)) - assert(shuffled.lookup(5) === Seq(6,7)) + assert(shuffled.lookup(5) === Seq(6, 7)) assert(shuffled.lookup(-1) === Seq()) } test("lookup with bad partitioner") { - val pairs = sc.parallelize(Array((1,2), (3,4), (5,6), (5,7))) + val pairs = sc.parallelize(Array((1, 2), (3, 4), (5, 6), (5, 7))) val p = new Partitioner { def numPartitions: Int = 2 diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index afc11bdc4d..8079d5dcae 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -338,10 +338,10 @@ class RDDSuite extends FunSuite with SharedSparkContext { } test("coalesced RDDs with locality") { - val data3 = sc.makeRDD(List((1,List("a","c")), (2,List("a","b","c")), (3,List("b")))) + val data3 = sc.makeRDD(List((1, List("a", "c")), (2, List("a", "b", "c")), (3, List("b")))) val coal3 = data3.coalesce(3) val list3 = coal3.partitions.flatMap(_.asInstanceOf[CoalescedRDDPartition].preferredLocation) - assert(list3.sorted === Array("a","b","c"), "Locality preferences are dropped") + assert(list3.sorted === Array("a", "b", "c"), "Locality preferences are dropped") // RDD with locality preferences spread (non-randomly) over 6 machines, m0 through m5 val data = sc.makeRDD((1 to 9).map(i => (i, (i to (i + 2)).map{ j => "m" + (j%6)}))) @@ -591,8 +591,8 @@ class RDDSuite extends FunSuite with SharedSparkContext { assert(sc.emptyRDD.isEmpty()) assert(sc.parallelize(Seq[Int]()).isEmpty()) assert(!sc.parallelize(Seq(1)).isEmpty()) - assert(sc.parallelize(Seq(1,2,3), 3).filter(_ < 0).isEmpty()) - assert(!sc.parallelize(Seq(1,2,3), 3).filter(_ > 1).isEmpty()) + assert(sc.parallelize(Seq(1, 2, 3), 3).filter(_ < 0).isEmpty()) + assert(!sc.parallelize(Seq(1, 2, 3), 3).filter(_ > 1).isEmpty()) } test("sample preserves partitioner") { @@ -609,49 +609,49 @@ class RDDSuite extends FunSuite with SharedSparkContext { val data = sc.parallelize(1 to n, 2) for (num <- List(5, 20, 100)) { - val sample = data.takeSample(withReplacement=false, num=num) + val sample = data.takeSample(withReplacement = false, num = num) assert(sample.size === num) // Got exactly num elements assert(sample.toSet.size === num) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { - val sample = data.takeSample(withReplacement=false, 20, seed) + val sample = data.takeSample(withReplacement = false, 20, seed) assert(sample.size === 20) // Got exactly 20 elements assert(sample.toSet.size === 20) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { - val sample = data.takeSample(withReplacement=false, 100, seed) + val sample = data.takeSample(withReplacement = false, 100, seed) assert(sample.size === 100) // Got only 100 elements assert(sample.toSet.size === 100) // Elements are distinct assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { - val sample = data.takeSample(withReplacement=true, 20, seed) + val sample = data.takeSample(withReplacement = true, 20, seed) assert(sample.size === 20) // Got exactly 20 elements assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } { - val sample = data.takeSample(withReplacement=true, num=20) + val sample = data.takeSample(withReplacement = true, num = 20) assert(sample.size === 20) // Got exactly 100 elements assert(sample.toSet.size <= 20, "sampling with replacement returned all distinct elements") assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } { - val sample = data.takeSample(withReplacement=true, num=n) + val sample = data.takeSample(withReplacement = true, num = n) assert(sample.size === n) // Got exactly 100 elements // Chance of getting all distinct elements is astronomically low, so test we got < 100 assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") assert(sample.forall(x => 1 <= x && x <= n), s"elements not in [1, $n]") } for (seed <- 1 to 5) { - val sample = data.takeSample(withReplacement=true, n, seed) + val sample = data.takeSample(withReplacement = true, n, seed) assert(sample.size === n) // Got exactly 100 elements // Chance of getting all distinct elements is astronomically low, so test we got < 100 assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") } for (seed <- 1 to 5) { - val sample = data.takeSample(withReplacement=true, 2 * n, seed) + val sample = data.takeSample(withReplacement = true, 2 * n, seed) assert(sample.size === 2 * n) // Got exactly 200 elements // Chance of getting all distinct elements is still quite low, so test we got < 100 assert(sample.toSet.size < n, "sampling with replacement returned all distinct elements") @@ -691,7 +691,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { } test("sortByKey") { - val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B")) + val data = sc.parallelize(Seq("5|50|A", "4|60|C", "6|40|B")) val col1 = Array("4|60|C", "5|50|A", "6|40|B") val col2 = Array("6|40|B", "5|50|A", "4|60|C") @@ -703,7 +703,7 @@ class RDDSuite extends FunSuite with SharedSparkContext { } test("sortByKey ascending parameter") { - val data = sc.parallelize(Seq("5|50|A","4|60|C", "6|40|B")) + val data = sc.parallelize(Seq("5|50|A", "4|60|C", "6|40|B")) val asc = Array("4|60|C", "5|50|A", "6|40|B") val desc = Array("6|40|B", "5|50|A", "4|60|C") @@ -764,9 +764,9 @@ class RDDSuite extends FunSuite with SharedSparkContext { } test("intersection strips duplicates in an input") { - val a = sc.parallelize(Seq(1,2,3,3)) - val b = sc.parallelize(Seq(1,1,2,3)) - val intersection = Array(1,2,3) + val a = sc.parallelize(Seq(1, 2, 3, 3)) + val b = sc.parallelize(Seq(1, 1, 2, 3)) + val intersection = Array(1, 2, 3) assert(a.intersection(b).collect().sorted === intersection) assert(b.intersection(a).collect().sorted === intersection) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala index fe695d85e2..194dc45d6e 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuiteUtils.scala @@ -21,11 +21,11 @@ object RDDSuiteUtils { case class Person(first: String, last: String, age: Int) object AgeOrdering extends Ordering[Person] { - def compare(a:Person, b:Person): Int = a.age.compare(b.age) + def compare(a: Person, b: Person): Int = a.age.compare(b.age) } object NameOrdering extends Ordering[Person] { - def compare(a:Person, b:Person): Int = - implicitly[Ordering[Tuple2[String,String]]].compare((a.last, a.first), (b.last, b.first)) + def compare(a: Person, b: Person): Int = + implicitly[Ordering[Tuple2[String, String]]].compare((a.last, a.first), (b.last, b.first)) } } diff --git a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala index 64b1c24c47..54fc914722 100644 --- a/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/SortingSuite.scala @@ -26,7 +26,7 @@ class SortingSuite extends FunSuite with SharedSparkContext with Matchers with L test("sortByKey") { val pairs = sc.parallelize(Array((1, 0), (2, 0), (0, 0), (3, 0)), 2) - assert(pairs.sortByKey().collect() === Array((0,0), (1,0), (2,0), (3,0))) + assert(pairs.sortByKey().collect() === Array((0, 0), (1, 0), (2, 0), (3, 0))) } test("large array") { @@ -136,7 +136,7 @@ class SortingSuite extends FunSuite with SharedSparkContext with Matchers with L test("get a range of elements in an array not partitioned by a range partitioner") { val pairArr = util.Random.shuffle((1 to 1000).toList).map(x => (x, x)) - val pairs = sc.parallelize(pairArr,10) + val pairs = sc.parallelize(pairArr, 10) val range = pairs.filterByRange(200, 800).collect() assert((800 to 200 by -1).toArray.sorted === range.map(_._1).sorted) } diff --git a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala index ae3339d80f..21eb71d9ac 100644 --- a/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala +++ b/core/src/test/scala/org/apache/spark/rpc/RpcEnvSuite.scala @@ -42,7 +42,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } override def afterAll(): Unit = { - if(env != null) { + if (env != null) { env.shutdown() } } @@ -75,7 +75,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { } }) - val anotherEnv = createRpcEnv(new SparkConf(), "remote" ,13345) + val anotherEnv = createRpcEnv(new SparkConf(), "remote", 13345) // Use anotherEnv to find out the RpcEndpointRef val rpcEndpointRef = anotherEnv.setupEndpointRef("local", env.address, "send-remotely") try { @@ -338,7 +338,7 @@ abstract class RpcEnvSuite extends FunSuite with BeforeAndAfterAll { test("call receive in sequence") { // If a RpcEnv implementation breaks the `receive` contract, hope this test can expose it - for(i <- 0 until 100) { + for (i <- 0 until 100) { @volatile var result = 0 val endpointRef = env.setupEndpoint(s"receive-in-sequence-$i", new ThreadSafeRpcEndpoint { override val rpcEnv = env diff --git a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala index f77661ccbd..3821166386 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/CoarseGrainedSchedulerBackendSuite.scala @@ -26,8 +26,8 @@ class CoarseGrainedSchedulerBackendSuite extends FunSuite with LocalSparkContext test("serialized task larger than akka frame size") { val conf = new SparkConf - conf.set("spark.akka.frameSize","1") - conf.set("spark.default.parallelism","1") + conf.set("spark.akka.frameSize", "1") + conf.set("spark.default.parallelism", "1") sc = new SparkContext("local-cluster[2 , 1 , 512]", "test", conf) val frameSize = AkkaUtils.maxFrameSizeBytes(sc.conf) val buffer = new SerializableBuffer(java.nio.ByteBuffer.allocate(2 * frameSize)) diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala index 46642236e4..eea7a60084 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala @@ -375,7 +375,7 @@ class DAGSchedulerSuite (1 to 30).foreach(_ => rdd = rdd.zip(rdd)) // getPreferredLocs runs quickly, indicating that exponential graph traversal is avoided. failAfter(10 seconds) { - val preferredLocs = scheduler.getPreferredLocs(rdd,0) + val preferredLocs = scheduler.getPreferredLocs(rdd, 0) // No preferred locations are returned. assert(preferredLocs.length === 0) } @@ -634,8 +634,8 @@ class DAGSchedulerSuite val listener1 = new FailureRecordingJobListener() val listener2 = new FailureRecordingJobListener() - submit(reduceRdd1, Array(0, 1), listener=listener1) - submit(reduceRdd2, Array(0, 1), listener=listener2) + submit(reduceRdd1, Array(0, 1), listener = listener1) + submit(reduceRdd2, Array(0, 1), listener = listener2) val stageFailureMessage = "Exception failure in map stage" failed(taskSets(0), stageFailureMessage) diff --git a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala index e8f461e2f5..456451b676 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/PoolSuite.scala @@ -97,9 +97,9 @@ class PoolSuite extends FunSuite with LocalSparkContext { assert(rootPool.getSchedulableByName("3").weight === 1) val properties1 = new Properties() - properties1.setProperty("spark.scheduler.pool","1") + properties1.setProperty("spark.scheduler.pool", "1") val properties2 = new Properties() - properties2.setProperty("spark.scheduler.pool","2") + properties2.setProperty("spark.scheduler.pool", "2") val taskSetManager10 = createTaskSetManager(0, 1, taskScheduler) val taskSetManager11 = createTaskSetManager(1, 1, taskScheduler) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index ef50bc9438..14c0172fa9 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -109,7 +109,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { check((1, 1)) check((1, 1L)) check((1L, 1)) - check((1L, 1L)) + check((1L, 1L)) check((1.0, 1)) check((1, 1.0)) check((1.0, 1.0)) @@ -147,7 +147,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { check(List(Some(mutable.HashMap(1->1, 2->2)), None, Some(mutable.HashMap(3->4)))) check(List( mutable.HashMap("one" -> 1, "two" -> 2), - mutable.HashMap(1->"one",2->"two",3->"three"))) + mutable.HashMap(1->"one", 2->"two", 3->"three"))) } test("ranges") { diff --git a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala index 433fd6bb4a..673948d84d 100644 --- a/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/ProactiveClosureSerializationSuite.scala @@ -66,18 +66,18 @@ class ProactiveClosureSerializationSuite extends FunSuite with SharedSparkContex } private def xmap(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.map(y=>uc.op(y)) + x.map(y => uc.op(y)) private def xflatMap(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.flatMap(y=>Seq(uc.op(y))) + x.flatMap(y => Seq(uc.op(y))) private def xfilter(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.filter(y=>uc.pred(y)) + x.filter(y => uc.pred(y)) private def xmapPartitions(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.mapPartitions(_.map(y=>uc.op(y))) + x.mapPartitions(_.map(y => uc.op(y))) private def xmapPartitionsWithIndex(x: RDD[String], uc: UnserializableClass): RDD[String] = - x.mapPartitionsWithIndex((_, it) => it.map(y=>uc.op(y))) + x.mapPartitionsWithIndex((_, it) => it.map(y => uc.op(y))) } diff --git a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala index 86fcf44728..c1e0a29a34 100644 --- a/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala +++ b/core/src/test/scala/org/apache/spark/serializer/TestSerializer.scala @@ -32,16 +32,19 @@ class TestSerializer extends Serializer { class TestSerializerInstance extends SerializerInstance { - override def serialize[T: ClassTag](t: T): ByteBuffer = ??? + override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException - override def serializeStream(s: OutputStream): SerializationStream = ??? + override def serializeStream(s: OutputStream): SerializationStream = + throw new UnsupportedOperationException override def deserializeStream(s: InputStream): TestDeserializationStream = new TestDeserializationStream - override def deserialize[T: ClassTag](bytes: ByteBuffer): T = ??? + override def deserialize[T: ClassTag](bytes: ByteBuffer): T = + throw new UnsupportedOperationException - override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = ??? + override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = + throw new UnsupportedOperationException } diff --git a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala index bcf138b5ee..47341b74e9 100644 --- a/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/FlatmapIteratorSuite.scala @@ -59,10 +59,10 @@ class FlatmapIteratorSuite extends FunSuite with LocalSparkContext { .set("spark.serializer.objectStreamReset", "10") sc = new SparkContext(sconf) val expand_size = 500 - val data = sc.parallelize(Seq(1,2)). + val data = sc.parallelize(Seq(1, 2)). flatMap(x => Stream.range(1, expand_size). - map(y => "%d: string test %d".format(y,x))) - var persisted = data.persist(StorageLevel.MEMORY_ONLY_SER) + map(y => "%d: string test %d".format(y, x))) + val persisted = data.persist(StorageLevel.MEMORY_ONLY_SER) assert(persisted.filter(_.startsWith("1:")).count()===2) } diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index b6f5accef0..a727a43f44 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -483,11 +483,11 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before val jobsJson = getJson(sc.ui.get, "jobs") jobsJson.children.size should be (expJobInfo.size) for { - (job @ JObject(_),idx) <- jobsJson.children.zipWithIndex + (job @ JObject(_), idx) <- jobsJson.children.zipWithIndex id = (job \ "jobId").extract[String] name = (job \ "name").extract[String] } { - withClue(s"idx = $idx; id = $id; name = ${name.substring(0,20)}") { + withClue(s"idx = $idx; id = $id; name = ${name.substring(0, 20)}") { id should be (expJobInfo(idx)._1) name should include (expJobInfo(idx)._2) } @@ -540,12 +540,12 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers with Before goToUi(sc, "/stages/stage/?id=12&attempt=0") find("no-info").get.text should be ("No information to display for Stage 12 (Attempt 0)") - val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/12/0")) + val badStage = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "stages/12/0")) badStage._1 should be (HttpServletResponse.SC_NOT_FOUND) badStage._2 should be (None) badStage._3 should be (Some("unknown stage: 12")) - val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get,"stages/19/15")) + val badAttempt = HistoryServerSuite.getContentAndCode(apiUrl(sc.ui.get, "stages/19/15")) badAttempt._1 should be (HttpServletResponse.SC_NOT_FOUND) badAttempt._2 should be (None) badAttempt._3 should be (Some("unknown attempt for stage 19. Found attempts: [0]")) diff --git a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala index 7b38e6d947..8778042e34 100644 --- a/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/storage/StorageTabSuite.scala @@ -169,7 +169,7 @@ class StorageTabSuite extends FunSuite with BeforeAndAfter { test("verify StorageTab contains all cached rdds") { val rddInfo0 = new RDDInfo(0, "rdd0", 1, memOnly, Seq(4)) - val rddInfo1 = new RDDInfo(1, "rdd1", 1 ,memOnly, Seq(4)) + val rddInfo1 = new RDDInfo(1, "rdd1", 1, memOnly, Seq(4)) val stageInfo0 = new StageInfo(0, 0, "stage0", 1, Seq(rddInfo0), Seq.empty, "details") val stageInfo1 = new StageInfo(1, 0, "stage1", 1, Seq(rddInfo1), Seq.empty, "details") val taskMetrics0 = new TaskMetrics diff --git a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala index bec79fc4dc..ccdb3f5714 100644 --- a/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/AkkaUtilsSuite.scala @@ -138,7 +138,7 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext with ResetSystemPro assert(securityManagerGood.isAuthenticationEnabled() === true) - val slaveRpcEnv =RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood) + val slaveRpcEnv = RpcEnv.create("spark-slave", hostname, 0, goodconf, securityManagerGood) val slaveTracker = new MapOutputTrackerWorker(conf) slaveTracker.trackerEndpoint = slaveRpcEnv.setupEndpointRef("spark", rpcEnv.address, MapOutputTracker.ENDPOINT_NAME) diff --git a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala index 61152c29a6..afa5cdc819 100644 --- a/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/UtilsSuite.scala @@ -551,7 +551,7 @@ class UtilsSuite extends FunSuite with ResetSystemProperties with Logging { test("fetch hcfs dir") { val tempDir = Utils.createTempDir() val sourceDir = new File(tempDir, "source-dir") - val innerSourceDir = Utils.createTempDir(root=sourceDir.getPath) + val innerSourceDir = Utils.createTempDir(root = sourceDir.getPath) val sourceFile = File.createTempFile("someprefix", "somesuffix", innerSourceDir) val targetDir = new File(tempDir, "target-dir") Files.write("some text", sourceFile, UTF_8) diff --git a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala index b85a409a4b..ffc2069919 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/BitSetSuite.scala @@ -94,7 +94,7 @@ class BitSetSuite extends FunSuite { test( "xor len(bitsetX) > len(bitsetY)" ) { val setBitsX = Seq( 0, 1, 3, 37, 38, 41, 85) - val setBitsY = Seq( 0, 2, 3, 37, 41 ) + val setBitsY = Seq( 0, 2, 3, 37, 41) val bitsetX = new BitSet(100) setBitsX.foreach( i => bitsetX.set(i)) val bitsetY = new BitSet(60)