Fix Scala Style

Any comments are welcome

Author: Sandeep <sandeep@techaddict.me>

Closes #531 from techaddict/stylefix-1 and squashes the following commits:

7492730 [Sandeep] Pass 4
98b2428 [Sandeep] fix rxin suggestions
b5e2e6f [Sandeep] Pass 3
05932d7 [Sandeep] fix if else styling 2
08690e5 [Sandeep] fix if else styling
This commit is contained in:
Sandeep 2014-04-24 15:07:23 -07:00 committed by Reynold Xin
parent c5c1916dd1
commit a03ac222d8
20 changed files with 109 additions and 83 deletions

View file

@ -104,8 +104,11 @@ class Accumulable[R, T] (
* Set the accumulator's value; only allowed on master.
*/
def value_= (newValue: R) {
if (!deserialized) value_ = newValue
else throw new UnsupportedOperationException("Can't assign accumulator value in task")
if (!deserialized) {
value_ = newValue
} else {
throw new UnsupportedOperationException("Can't assign accumulator value in task")
}
}
/**

View file

@ -66,8 +66,7 @@ private[spark] class SparkSubmitArguments(args: Array[String]) {
if (k.startsWith("spark")) {
defaultProperties(k) = v
if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
}
else {
} else {
SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
}
}

View file

@ -237,8 +237,7 @@ private[spark] class Master(
if (waitingDrivers.contains(d)) {
waitingDrivers -= d
self ! DriverStateChanged(driverId, DriverState.KILLED, None)
}
else {
} else {
// We just notify the worker to kill the driver here. The final bookkeeping occurs
// on the return path when the worker submits a state change back to the master
// to notify it that the driver was successfully killed.

View file

@ -91,9 +91,11 @@ private[spark] class DriverRunner(
}
val state =
if (killed) { DriverState.KILLED }
else if (finalException.isDefined) { DriverState.ERROR }
else {
if (killed) {
DriverState.KILLED
} else if (finalException.isDefined) {
DriverState.ERROR
} else {
finalExitCode match {
case Some(0) => DriverState.FINISHED
case _ => DriverState.FAILED

View file

@ -89,8 +89,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
Previous {Utils.bytesToString(math.min(byteLength, startByte))}
</button>
</a>
}
else {
} else {
<button type="button" class="btn btn-default" disabled="disabled">
Previous 0 B
</button>
@ -104,8 +103,7 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
Next {Utils.bytesToString(math.min(byteLength, logLength - endByte))}
</button>
</a>
}
else {
} else {
<button type="button" class="btn btn-default" disabled="disabled">
Next 0 B
</button>
@ -137,9 +135,13 @@ private[spark] class LogPage(parent: WorkerWebUI) extends WebUIPage("logPage") {
val logLength = file.length()
val getOffset = offset.getOrElse(logLength - defaultBytes)
val startByte =
if (getOffset < 0) 0L
else if (getOffset > logLength) logLength
else getOffset
if (getOffset < 0) {
0L
} else if (getOffset > logLength) {
logLength
} else {
getOffset
}
val logPageLength = math.min(byteLength, maxBytes)
val endByte = math.min(startByte + logPageLength, logLength)
(startByte, endByte)

View file

@ -281,7 +281,9 @@ private[spark] class BlockManager(
val onDiskSize = status.diskSize
master.updateBlockInfo(
blockManagerId, blockId, storageLevel, inMemSize, onDiskSize, inTachyonSize)
} else true
} else {
true
}
}
/**
@ -676,7 +678,7 @@ private[spark] class BlockManager(
tachyonStore.putValues(blockId, iterator, level, false)
case ArrayBufferValues(array) =>
tachyonStore.putValues(blockId, array, level, false)
case ByteBufferValues(bytes) =>
case ByteBufferValues(bytes) =>
bytes.rewind()
tachyonStore.putBytes(blockId, bytes, level)
}
@ -695,7 +697,7 @@ private[spark] class BlockManager(
diskStore.putValues(blockId, iterator, level, askForBytes)
case ArrayBufferValues(array) =>
diskStore.putValues(blockId, array, level, askForBytes)
case ByteBufferValues(bytes) =>
case ByteBufferValues(bytes) =>
bytes.rewind()
diskStore.putBytes(blockId, bytes, level)
}

View file

@ -43,8 +43,11 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
}
override def +=(elem: A): this.type = {
if (size < maxSize) underlying.offer(elem)
else maybeReplaceLowest(elem)
if (size < maxSize) {
underlying.offer(elem)
} else {
maybeReplaceLowest(elem)
}
this
}
@ -59,7 +62,8 @@ private[spark] class BoundedPriorityQueue[A](maxSize: Int)(implicit ord: Orderin
if (head != null && ord.gt(a, head)) {
underlying.poll()
underlying.offer(a)
} else false
} else {
false
}
}
}

View file

@ -113,7 +113,9 @@ private[spark] class FileLogger(
* @param withTime Whether to prepend message with a timestamp
*/
def log(msg: String, withTime: Boolean = false) {
val writeInfo = if (!withTime) msg else {
val writeInfo = if (!withTime) {
msg
} else {
val date = new Date(System.currentTimeMillis())
dateFormat.get.format(date) + ": " + msg
}

View file

@ -811,8 +811,7 @@ private[spark] object Utils extends Logging {
} else {
el.getMethodName
}
}
else {
} else {
firstUserLine = el.getLineNumber
firstUserFile = el.getFileName
firstUserClass = el.getClassName

View file

@ -381,8 +381,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
val prng42 = new Random(42)
val prng43 = new Random(43)
Array(1, 2, 3, 4, 5, 6).filter{i =>
if (i < 4) 0 == prng42.nextInt(3)
else 0 == prng43.nextInt(3)}
if (i < 4) 0 == prng42.nextInt(3) else 0 == prng43.nextInt(3)
}
}
assert(sample.size === checkSample.size)
for (i <- 0 until sample.size) assert(sample(i) === checkSample(i))

View file

@ -49,8 +49,7 @@ object LogQuery {
System.getenv("SPARK_HOME"), SparkContext.jarOfClass(this.getClass).toSeq)
val dataSet =
if (args.length == 2) sc.textFile(args(1))
else sc.parallelize(exampleApacheLogs)
if (args.length == 2) sc.textFile(args(1)) else sc.parallelize(exampleApacheLogs)
// scalastyle:off
val apacheLogRegex =
"""^([\d.]+) (\S+) (\S+) \[([\w\d:/]+\s[+\-]\d{4})\] "(.+?)" (\d{3}) ([\d\-]+) "([^"]+)" "([^"]+)".*""".r

View file

@ -69,8 +69,11 @@ object PageViewStream {
val normalCount = statuses.filter(_ == 200).size
val errorCount = statuses.size - normalCount
val errorRatio = errorCount.toFloat / statuses.size
if (errorRatio > 0.05) {"%s: **%s**".format(zip, errorRatio)}
else {"%s: %s".format(zip, errorRatio)}
if (errorRatio > 0.05) {
"%s: **%s**".format(zip, errorRatio)
} else {
"%s: %s".format(zip, errorRatio)
}
}
// Return the number unique users in last 15 seconds

View file

@ -165,8 +165,11 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
// not have any edges in the specified direction.
assert(edges.count === 50)
edges.collect.foreach {
case (vid, edges) => if (vid > 0 && vid < 49) assert(edges.size == 2)
else assert(edges.size == 1)
case (vid, edges) => if (vid > 0 && vid < 49) {
assert(edges.size == 2)
} else {
assert(edges.size == 1)
}
}
edges.collect.foreach {
case (vid, edges) =>

View file

@ -47,9 +47,13 @@ trait SparkExprTyper extends Logging {
var isIncomplete = false
reporter.withIncompleteHandler((_, _) => isIncomplete = true) {
val trees = codeParser.stmts(line)
if (reporter.hasErrors) Some(Nil)
else if (isIncomplete) None
else Some(trees)
if (reporter.hasErrors) {
Some(Nil)
} else if (isIncomplete) {
None
} else {
Some(trees)
}
}
}
// def parsesAsExpr(line: String) = {
@ -70,8 +74,7 @@ trait SparkExprTyper extends Logging {
val sym0 = symbolOfTerm(name)
// drop NullaryMethodType
val sym = sym0.cloneSymbol setInfo afterTyper(sym0.info.finalResultType)
if (sym.info.typeSymbol eq UnitClass) NoSymbol
else sym
if (sym.info.typeSymbol eq UnitClass) NoSymbol else sym
case _ => NoSymbol
}
}

View file

@ -203,8 +203,9 @@ case class InsertIntoParquetTable(
val stageId = sc.newRddId()
val taskIdOffset =
if (overwrite) 1
else {
if (overwrite) {
1
} else {
FileSystemHelper
.findMaxTaskId(NewFileOutputFormat.getOutputPath(job).toString, job.getConfiguration) + 1
}

View file

@ -158,8 +158,11 @@ private[parquet] class CatalystGroupConverter(
a => a.dataType match {
case ctype: NativeType =>
// note: for some reason matching for StringType fails so use this ugly if instead
if (ctype == StringType) new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
else new CatalystPrimitiveConverter(this, schema.indexOf(a))
if (ctype == StringType) {
new CatalystPrimitiveStringConverter(this, schema.indexOf(a))
} else {
new CatalystPrimitiveConverter(this, schema.indexOf(a))
}
case _ => throw new RuntimeException(
s"unable to convert datatype ${a.dataType.toString} in CatalystGroupConverter")
}

View file

@ -240,8 +240,7 @@ class ReceiverTracker(ssc: StreamingContext) extends Logging {
if (hasLocationPreferences) {
val receiversWithPreferences = receivers.map(r => (r, Seq(r.preferredLocation.get)))
ssc.sc.makeRDD[Receiver[_]](receiversWithPreferences)
}
else {
} else {
ssc.sc.makeRDD(receivers, receivers.size)
}

View file

@ -147,12 +147,12 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
// LOCAL_DIRS => 2.X, YARN_LOCAL_DIRS => 0.23.X
val localDirs = Option(System.getenv("YARN_LOCAL_DIRS"))
.orElse(Option(System.getenv("LOCAL_DIRS")))
localDirs match {
case None => throw new Exception("Yarn Local dirs can't be empty")
case Some(l) => l
}
}
}
private def getApplicationAttemptId(): ApplicationAttemptId = {
val envs = System.getenv()
@ -321,8 +321,9 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
logInfo("Allocating %d containers to make up for (potentially) lost containers".
format(missingExecutorCount))
yarnAllocator.allocateContainers(missingExecutorCount)
} else {
sendProgress()
}
else sendProgress()
Thread.sleep(sleepTime)
}
}
@ -361,7 +362,7 @@ class ApplicationMaster(args: ApplicationMasterArguments, conf: Configuration,
return
}
isFinished = true
logInfo("finishApplicationMaster with " + status)
if (registered) {
val finishReq = Records.newRecord(classOf[FinishApplicationMasterRequest])

View file

@ -243,8 +243,9 @@ class ExecutorLauncher(args: ApplicationMasterArguments, conf: Configuration, sp
logInfo("Allocating " + missingExecutorCount +
" containers to make up for (potentially ?) lost containers")
yarnAllocator.allocateContainers(missingExecutorCount)
} else {
sendProgress()
}
else sendProgress()
Thread.sleep(sleepTime)
}
}

View file

@ -60,12 +60,12 @@ object AllocationType extends Enumeration {
*/
private[yarn] class YarnAllocationHandler(
val conf: Configuration,
val resourceManager: AMRMProtocol,
val resourceManager: AMRMProtocol,
val appAttemptId: ApplicationAttemptId,
val maxExecutors: Int,
val executorMemory: Int,
val executorCores: Int,
val preferredHostToCount: Map[String, Int],
val preferredHostToCount: Map[String, Int],
val preferredRackToCount: Map[String, Int],
val sparkConf: SparkConf)
extends Logging {
@ -136,9 +136,10 @@ private[yarn] class YarnAllocationHandler(
val containers = hostToContainers.getOrElseUpdate(host, new ArrayBuffer[Container]())
containers += container
} else {
// Add all ignored containers to released list
releasedContainerList.add(container.getId())
}
// Add all ignored containers to released list
else releasedContainerList.add(container.getId())
}
// Find the appropriate containers to use. Slightly non trivial groupBy ...
@ -159,8 +160,7 @@ private[yarn] class YarnAllocationHandler(
dataLocalContainers.put(candidateHost, remainingContainers)
// all consumed
remainingContainers = null
}
else if (requiredHostCount > 0) {
} else if (requiredHostCount > 0) {
// Container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size -
// requiredHostCount) and rest as remainingContainer
@ -170,7 +170,7 @@ private[yarn] class YarnAllocationHandler(
// remainingContainers = remaining
// yarn has nasty habit of allocating a tonne of containers on a host - discourage this :
// add remaining to release list. If we have insufficient containers, next allocation
// add remaining to release list. If we have insufficient containers, next allocation
// cycle will reallocate (but wont treat it as data local)
for (container <- remaining) releasedContainerList.add(container.getId())
remainingContainers = null
@ -182,7 +182,7 @@ private[yarn] class YarnAllocationHandler(
if (rack != null){
val maxExpectedRackCount = preferredRackToCount.getOrElse(rack, 0)
val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
val requiredRackCount = maxExpectedRackCount - allocatedContainersOnRack(rack) -
rackLocalContainers.get(rack).getOrElse(List()).size
@ -191,8 +191,7 @@ private[yarn] class YarnAllocationHandler(
dataLocalContainers.put(rack, remainingContainers)
// All consumed
remainingContainers = null
}
else if (requiredRackCount > 0) {
} else if (requiredRackCount > 0) {
// container list has more containers than we need for data locality.
// Split into two : data local container count of (remainingContainers.size -
// requiredRackCount) and rest as remainingContainer
@ -213,7 +212,7 @@ private[yarn] class YarnAllocationHandler(
}
}
// Now that we have split the containers into various groups, go through them in order :
// Now that we have split the containers into various groups, go through them in order :
// first host local, then rack local and then off rack (everything else).
// Note that the list we create below tries to ensure that not all containers end up within a
// host if there are sufficiently large number of hosts/containers.
@ -238,8 +237,7 @@ private[yarn] class YarnAllocationHandler(
releasedContainerList.add(containerId)
// reset counter back to old value.
numExecutorsRunning.decrementAndGet()
}
else {
} else {
// Deallocate + allocate can result in reusing id's wrongly - so use a different counter
// (executorIdCounter)
val executorId = executorIdCounter.incrementAndGet().toString
@ -293,8 +291,7 @@ private[yarn] class YarnAllocationHandler(
// Was this released by us ? If yes, then simply remove from containerSet and move on.
if (pendingReleaseContainers.containsKey(containerId)) {
pendingReleaseContainers.remove(containerId)
}
else {
} else {
// Simply decrement count - next iteration of ReporterThread will take care of allocating.
numExecutorsRunning.decrementAndGet()
logInfo("Completed container %s (state: %s, exit status: %s)".format(
@ -319,8 +316,11 @@ private[yarn] class YarnAllocationHandler(
assert (containerSet != null)
containerSet -= containerId
if (containerSet.isEmpty) allocatedHostToContainersMap.remove(host)
else allocatedHostToContainersMap.update(host, containerSet)
if (containerSet.isEmpty) {
allocatedHostToContainersMap.remove(host)
} else {
allocatedHostToContainersMap.update(host, containerSet)
}
allocatedContainerToHostMap -= containerId
@ -328,8 +328,11 @@ private[yarn] class YarnAllocationHandler(
val rack = YarnAllocationHandler.lookupRack(conf, host)
if (rack != null) {
val rackCount = allocatedRackCount.getOrElse(rack, 0) - 1
if (rackCount > 0) allocatedRackCount.put(rack, rackCount)
else allocatedRackCount.remove(rack)
if (rackCount > 0) {
allocatedRackCount.put(rack, rackCount)
} else {
allocatedRackCount.remove(rack)
}
}
}
}
@ -365,10 +368,10 @@ private[yarn] class YarnAllocationHandler(
}
}
val requestedContainers: ArrayBuffer[ResourceRequest] =
val requestedContainers: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](rackToCounts.size)
for ((rack, count) <- rackToCounts){
requestedContainers +=
requestedContainers +=
createResourceRequest(AllocationType.RACK, rack, count, YarnAllocationHandler.PRIORITY)
}
@ -401,11 +404,10 @@ private[yarn] class YarnAllocationHandler(
preferredHostToCount.isEmpty)
resourceRequests = List(createResourceRequest(
AllocationType.ANY, null, numExecutors, YarnAllocationHandler.PRIORITY))
}
else {
// request for all hosts in preferred nodes and for numExecutors -
} else {
// request for all hosts in preferred nodes and for numExecutors -
// candidates.size, request by default allocation policy.
val hostContainerRequests: ArrayBuffer[ResourceRequest] =
val hostContainerRequests: ArrayBuffer[ResourceRequest] =
new ArrayBuffer[ResourceRequest](preferredHostToCount.size)
for ((candidateHost, candidateCount) <- preferredHostToCount) {
val requiredCount = candidateCount - allocatedContainersOnHost(candidateHost)
@ -449,8 +451,7 @@ private[yarn] class YarnAllocationHandler(
if (numExecutors > 0) {
logInfo("Allocating %d executor containers with %d of memory each.".format(numExecutors,
executorMemory + YarnAllocationHandler.MEMORY_OVERHEAD))
}
else {
} else {
logDebug("Empty allocation req .. release : " + releasedContainerList)
}
@ -467,7 +468,7 @@ private[yarn] class YarnAllocationHandler(
private def createResourceRequest(
requestType: AllocationType.AllocationType,
requestType: AllocationType.AllocationType,
resource:String,
numExecutors: Int,
priority: Int): ResourceRequest = {
@ -528,7 +529,7 @@ private[yarn] class YarnAllocationHandler(
if (! retval.isEmpty) {
releasedContainerList.removeAll(retval)
for (v <- retval) pendingReleaseContainers.put(v, true)
logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
logInfo("Releasing " + retval.size + " containers. pendingReleaseContainers : " +
pendingReleaseContainers)
}
@ -539,7 +540,7 @@ private[yarn] class YarnAllocationHandler(
object YarnAllocationHandler {
val ANY_HOST = "*"
// All requests are issued with same priority : we do not (yet) have any distinction between
// All requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
val PRIORITY = 1
@ -548,7 +549,7 @@ object YarnAllocationHandler {
// Host to rack map - saved from allocation requests
// We are expecting this not to change.
// Note that it is possible for this to change : and RM will indicate that to us via update
// Note that it is possible for this to change : and RM will indicate that to us via update
// response to allocate. But we are punting on handling that for now.
private val hostToRack = new ConcurrentHashMap[String, String]()
private val rackToHostSet = new ConcurrentHashMap[String, JSet[String]]()
@ -565,7 +566,7 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
args.numExecutors,
args.numExecutors,
args.executorMemory,
args.executorCores,
Map[String, Int](),
@ -587,7 +588,7 @@ object YarnAllocationHandler {
conf,
resourceManager,
appAttemptId,
args.numExecutors,
args.numExecutors,
args.executorMemory,
args.executorCores,
hostToCount,