[SPARK-7927] whitespace fixes for streaming.

So we can enable a whitespace enforcement rule in the style checker to save code review time.

Author: Reynold Xin <rxin@databricks.com>

Closes #6475 from rxin/whitespace-streaming and squashes the following commits:

810dae4 [Reynold Xin] Fixed tests.
89068ad [Reynold Xin] [SPARK-7927] whitespace fixes for streaming.
Reynold Xin 2015-05-28 17:55:22 -07:00
parent 1bd63e82fd
commit 3af0b3136e
19 changed files with 40 additions and 35 deletions
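For reference, the whitespace conventions these fixes normalize, and that a whitespace rule in Scalastyle (Spark's style checker) could then enforce automatically, look roughly like the following Scala sketch. This is illustrative only; the names below are made up and are not part of this commit.

// Single space after commas and after colons in type annotations:
val counts: Map[String, Int] = Map("streaming" -> 1)

// Single space around `=`, including in default and named arguments:
def poll(timeoutMs: Long = 100, blocking: Boolean = true): Unit = ()

// Space after control-flow keywords such as `if`, `while`, and `for`:
for (i <- 0 to 1) {
  if (i % 2 == 0) {
    println(i)
  }
}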


@@ -461,7 +461,7 @@ class StreamingContext private[streaming] (
val conf = sc_.hadoopConfiguration
conf.setInt(FixedLengthBinaryInputFormat.RECORD_LENGTH_PROPERTY, recordLength)
val br = fileStream[LongWritable, BytesWritable, FixedLengthBinaryInputFormat](
-directory, FileInputDStream.defaultFilter : Path => Boolean, newFilesOnly=true, conf)
+directory, FileInputDStream.defaultFilter: Path => Boolean, newFilesOnly = true, conf)
val data = br.map { case (k, v) =>
val bytes = v.getBytes
require(bytes.length == recordLength, "Byte array does not have correct length. " +


@@ -227,7 +227,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
-:JavaPairDStream[K, JIterable[V]] = {
+: JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
.mapValues(asJavaIterable _)
}
@@ -247,7 +247,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
-):JavaPairDStream[K, JIterable[V]] = {
+): JavaPairDStream[K, JIterable[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
.mapValues(asJavaIterable _)
}
@@ -262,7 +262,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* batching interval
*/
def reduceByKeyAndWindow(reduceFunc: JFunction2[V, V, V], windowDuration: Duration)
-:JavaPairDStream[K, V] = {
+: JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
@@ -281,7 +281,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
reduceFunc: JFunction2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
-):JavaPairDStream[K, V] = {
+): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}


@@ -659,7 +659,7 @@ abstract class DStream[T: ClassTag] (
// DStreams can't be serialized with closures, we can't proactively check
// it for serializability and so we pass the optional false to SparkContext.clean
val cleanedF = context.sparkContext.clean(transformFunc, false)
-val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
+val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
assert(rdds.length == 1)
cleanedF(rdds.head.asInstanceOf[RDD[T]], time)
}


@@ -69,7 +69,7 @@ import org.apache.spark.util.{TimeStampedHashMap, Utils}
* processing semantics are undefined.
*/
private[streaming]
-class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
+class FileInputDStream[K, V, F <: NewInputFormat[K, V]](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
@@ -251,7 +251,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
/** Generate one RDD from an array of files */
private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-val fileRDDs = files.map(file =>{
+val fileRDDs = files.map { file =>
val rdd = serializableConfOpt.map(_.value) match {
case Some(config) => context.sparkContext.newAPIHadoopFile(
file,
@@ -267,7 +267,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
"Refer to the streaming programming guide for more details.")
}
rdd
-})
+}
new UnionRDD(context.sparkContext, fileRDDs)
}
@@ -294,7 +294,7 @@ class FileInputDStream[K, V, F <: NewInputFormat[K,V]](
private def readObject(ois: ObjectInputStream): Unit = Utils.tryOrIOException {
logDebug(this.getClass().getSimpleName + ".readObject used")
ois.defaultReadObject()
-generatedRDDs = new mutable.HashMap[Time, RDD[(K,V)]] ()
+generatedRDDs = new mutable.HashMap[Time, RDD[(K, V)]]()
batchTimeToSelectedFiles =
new mutable.HashMap[Time, Array[String]] with mutable.SynchronizedMap[Time, Array[String]]
recentlySelectedFiles = new mutable.HashSet[String]()


@@ -32,7 +32,7 @@ import org.apache.spark.streaming.StreamingContext.rddToFileName
/**
* Extra functions available on DStream of (key, value) pairs through an implicit conversion.
*/
-class PairDStreamFunctions[K, V](self: DStream[(K,V)])
+class PairDStreamFunctions[K, V](self: DStream[(K, V)])
(implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K])
extends Serializable
{


@@ -38,7 +38,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
_windowDuration: Duration,
_slideDuration: Duration,
partitioner: Partitioner
-) extends DStream[(K,V)](parent.ssc) {
+) extends DStream[(K, V)](parent.ssc) {
require(_windowDuration.isMultipleOf(parent.slideDuration),
"The window duration of ReducedWindowedDStream (" + _windowDuration + ") " +
@@ -58,7 +58,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
super.persist(StorageLevel.MEMORY_ONLY_SER)
reducedStream.persist(StorageLevel.MEMORY_ONLY_SER)
-def windowDuration: Duration = _windowDuration
+def windowDuration: Duration = _windowDuration
override def dependencies: List[DStream[_]] = List(reducedStream)
@@ -68,7 +68,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
override def parentRememberDuration: Duration = rememberDuration + windowDuration
-override def persist(storageLevel: StorageLevel): DStream[(K,V)] = {
+override def persist(storageLevel: StorageLevel): DStream[(K, V)] = {
super.persist(storageLevel)
reducedStream.persist(storageLevel)
this
@@ -118,7 +118,7 @@ class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
// Get the RDD of the reduced value of the previous window
val previousWindowRDD =
-getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K,V)]()))
+getOrCompute(previousWindow.endTime).getOrElse(ssc.sc.makeRDD(Seq[(K, V)]()))
// Make the list of RDDs that needs to cogrouped together for reducing their reduced values
val allRDDs = new ArrayBuffer[RDD[(K, V)]]() += previousWindowRDD ++= oldRDDs ++= newRDDs


@@ -25,19 +25,19 @@ import scala.reflect.ClassTag
private[streaming]
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
-parent: DStream[(K,V)],
+parent: DStream[(K, V)],
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true
-) extends DStream[(K,C)] (parent.ssc) {
+) extends DStream[(K, C)] (parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
-override def compute(validTime: Time): Option[RDD[(K,C)]] = {
+override def compute(validTime: Time): Option[RDD[(K, C)]] = {
parent.getOrCompute(validTime) match {
case Some(rdd) => Some(rdd.combineByKey[C](
createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))


@@ -51,7 +51,7 @@ class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
val finalFunc = (iterator: Iterator[(K, (Iterable[V], Iterable[S]))]) => {
val i = iterator.map(t => {
val itr = t._2._2.iterator
-val headOption = if(itr.hasNext) Some(itr.next) else None
+val headOption = if (itr.hasNext) Some(itr.next()) else None
(t._1, t._2._1.toSeq, headOption)
})
updateFuncLocal(i)


@@ -44,7 +44,7 @@ class WindowedDStream[T: ClassTag](
// Persist parent level by default, as those RDDs are going to be obviously reused.
parent.persist(StorageLevel.MEMORY_ONLY_SER)
-def windowDuration: Duration = _windowDuration
+def windowDuration: Duration = _windowDuration
override def dependencies: List[DStream[_]] = List(parent)
@@ -68,7 +68,7 @@ class WindowedDStream[T: ClassTag](
new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow)
} else {
logDebug("Using normal union for windowing at " + validTime)
-new UnionRDD(ssc.sc,rddsInWindow)
+new UnionRDD(ssc.sc, rddsInWindow)
}
Some(windowRDD)
}


@@ -164,7 +164,7 @@ private[streaming] class BlockGenerator(
private def keepPushingBlocks() {
logInfo("Started block pushing thread")
try {
-while(!stopped) {
+while (!stopped) {
Option(blocksForPushing.poll(100, TimeUnit.MILLISECONDS)) match {
case Some(block) => pushBlock(block)
case None =>


@@ -17,8 +17,9 @@
package org.apache.spark.streaming.receiver
+import com.google.common.util.concurrent.{RateLimiter => GuavaRateLimiter}
import org.apache.spark.{Logging, SparkConf}
-import com.google.common.util.concurrent.{RateLimiter=>GuavaRateLimiter}
/** Provides waitToPush() method to limit the rate at which receivers consume data.
*


@@ -230,7 +230,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
class ReceiverLauncher {
@transient val env = ssc.env
@volatile @transient private var running = false
-@transient val thread = new Thread() {
+@transient val thread = new Thread() {
override def run() {
try {
SparkEnv.set(env)


@@ -200,7 +200,7 @@ private[streaming] class FileBasedWriteAheadLog(
/** Initialize the log directory or recover existing logs inside the directory */
private def initializeOrRecover(): Unit = synchronized {
val logDirectoryPath = new Path(logDirectory)
-val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
+val fileSystem = HdfsUtils.getFileSystemForPath(logDirectoryPath, hadoopConf)
if (fileSystem.exists(logDirectoryPath) && fileSystem.getFileStatus(logDirectoryPath).isDir) {
val logFileInfo = logFilesTologInfo(fileSystem.listStatus(logDirectoryPath).map { _.getPath })


@@ -27,7 +27,7 @@ object RawTextHelper {
* Splits lines and counts the words.
*/
def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = {
-val map = new OpenHashMap[String,Long]
+val map = new OpenHashMap[String, Long]
var i = 0
var j = 0
while (iter.hasNext) {
@@ -98,7 +98,7 @@ object RawTextHelper {
* before real workload starts.
*/
def warmUp(sc: SparkContext) {
-for(i <- 0 to 1) {
+for (i <- 0 to 1) {
sc.parallelize(1 to 200000, 1000)
.map(_ % 1331).map(_.toString)
.mapPartitions(splitAndCountPartitions).reduceByKey(_ + _, 10)


@@ -255,7 +255,7 @@ class BasicOperationsSuite extends TestSuiteBase {
Seq( )
)
val operation = (s1: DStream[String], s2: DStream[String]) => {
-s1.map(x => (x,1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
+s1.map(x => (x, 1)).cogroup(s2.map(x => (x, "x"))).mapValues(x => (x._1.toSeq, x._2.toSeq))
}
testOperation(inputData1, inputData2, operation, outputData, true)
}
@@ -427,9 +427,9 @@ class BasicOperationsSuite extends TestSuiteBase {
test("updateStateByKey - object lifecycle") {
val inputData =
Seq(
Seq("a","b"),
Seq("a", "b"),
null,
Seq("a","c","a"),
Seq("a", "c", "a"),
Seq("c"),
null,
null


@@ -418,7 +418,7 @@ class TestServer(portToBind: Int = 0) extends Logging {
val servingThread = new Thread() {
override def run() {
try {
-while(true) {
+while (true) {
logInfo("Accepting connections on port " + port)
val clientSocket = serverSocket.accept()
if (startLatch.getCount == 1) {


@@ -732,7 +732,9 @@ class SlowTestReceiver(totalRecords: Int, recordsPerSecond: Int)
def onStop() {
// Simulate slow receiver by waiting for all records to be produced
-while(!SlowTestReceiver.receivedAllRecords) Thread.sleep(100)
+while (!SlowTestReceiver.receivedAllRecords) {
+Thread.sleep(100)
+}
// no clean to be done, the receiving thread should stop on it own
}
}


@@ -133,8 +133,10 @@ class StreamingListenerSuite extends TestSuiteBase with Matchers {
/** Check if a sequence of numbers is in increasing order */
def isInIncreasingOrder(seq: Seq[Long]): Boolean = {
-for(i <- 1 until seq.size) {
-if (seq(i - 1) > seq(i)) return false
+for (i <- 1 until seq.size) {
+if (seq(i - 1) > seq(i)) {
+return false
+}
}
true
}


@@ -64,7 +64,7 @@ class StreamingJobProgressListenerSuite extends TestSuiteBase with Matchers {
listener.numTotalReceivedRecords should be (0)
// onBatchStarted
-val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
+val batchInfoStarted = BatchInfo(Time(1000), streamIdToNumRecords, 1000, Some(2000), None)
listener.onBatchStarted(StreamingListenerBatchStarted(batchInfoStarted))
listener.waitingBatches should be (Nil)
listener.runningBatches should be (List(BatchUIData(batchInfoStarted)))