SPARK-1095: Adding explicit return types to all public methods

Excluded the methods whose return types are self-evident, as well as the cases discussed on the mailing list.
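
The motivation, briefly: a Scala method without an explicit return type exposes whatever type the compiler infers from its body, so refactoring an implementation can silently change or narrow the public API. A minimal sketch of the before/after pattern, using hypothetical names (Stats, summarize) rather than code from this patch:

    class Stats {
      // Without an annotation, the public return type is whatever the compiler
      // infers from the body, so an implementation change can silently change the API.
      def summarize(xs: Seq[Double]) = (xs.min, xs.max)

      // With an explicit annotation, the return type is a stated part of the contract.
      def summarizeTyped(xs: Seq[Double]): (Double, Double) = (xs.min, xs.max)
    }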

Author: NirmalReddy <nirmal_reddy2000@yahoo.com>
Author: NirmalReddy <nirmal.reddy@imaginea.com>

Closes #168 from NirmalReddy/Spark-1095 and squashes the following commits:

ac54b29 [NirmalReddy] import misplaced
8c5ff3e [NirmalReddy] Changed syntax of unit returning methods
02d0778 [NirmalReddy] fixed explicit types in all the other packages
1c17773 [NirmalReddy] fixed explicit types in core package
NirmalReddy 2014-03-26 18:24:55 -07:00 committed by Patrick Wendell
parent be6d96c15b
commit 3e63d98f09
25 changed files with 97 additions and 57 deletions


@ -35,6 +35,7 @@ import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat, Job => NewHad
import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFormat}
import org.apache.mesos.MesosNativeLibrary
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
import org.apache.spark.rdd._
@ -230,7 +231,7 @@ class SparkContext(
postEnvironmentUpdate()
/** A default Hadoop Configuration for the Hadoop code (e.g. file systems) that we reuse. */
val hadoopConfiguration = {
val hadoopConfiguration: Configuration = {
val env = SparkEnv.get
val hadoopConf = SparkHadoopUtil.get.newConfiguration()
// Explicitly check for S3 environment variables
@ -630,7 +631,7 @@ class SparkContext(
* standard mutable collections. So you can use this with mutable Map, Set, etc.
*/
def accumulableCollection[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
(initialValue: R) = {
(initialValue: R): Accumulable[R, T] = {
val param = new GrowableAccumulableParam[R,T]
new Accumulable(initialValue, param)
}
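
As a usage note, a hedged sketch of the accumulableCollection method typed above; it assumes an existing SparkContext named sc and is not part of the diff:

    import scala.collection.mutable
    // Accumulate values from tasks into a mutable.Set on the driver; the declared
    // type of the result is Accumulable[mutable.Set[String], String].
    val seen = sc.accumulableCollection(mutable.Set[String]())
    sc.parallelize(Seq("a", "b", "a")).foreach(word => seen += word)
    println(seen.value)   // e.g. Set(a, b)
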
@ -640,7 +641,7 @@ class SparkContext(
* [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
* The variable will be sent to each cluster only once.
*/
def broadcast[T](value: T) = env.broadcastManager.newBroadcast[T](value, isLocal)
def broadcast[T](value: T): Broadcast[T] = env.broadcastManager.newBroadcast[T](value, isLocal)
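
Likewise for broadcast, a small usage sketch assuming a SparkContext named sc (illustrative only):

    import org.apache.spark.broadcast.Broadcast
    // Ship a read-only lookup table to the executors once instead of with every task.
    val lookup: Broadcast[Map[Int, String]] = sc.broadcast(Map(1 -> "one", 2 -> "two"))
    val names = sc.parallelize(Seq(1, 2, 1)).map(i => lookup.value.getOrElse(i, "?"))
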
/**
* Add a file to be downloaded with this Spark job on every node.
@ -1126,7 +1127,7 @@ object SparkContext extends Logging {
implicit def rddToAsyncRDDActions[T: ClassTag](rdd: RDD[T]) = new AsyncRDDActions(rdd)
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
@ -1163,27 +1164,33 @@ object SparkContext extends Logging {
}
// Helper objects for converting common types to Writable
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T)
: WritableConverter[T] = {
val wClass = classTag[W].runtimeClass.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
implicit def intWritableConverter() = simpleWritableConverter[Int, IntWritable](_.get)
implicit def intWritableConverter(): WritableConverter[Int] =
simpleWritableConverter[Int, IntWritable](_.get)
implicit def longWritableConverter() = simpleWritableConverter[Long, LongWritable](_.get)
implicit def longWritableConverter(): WritableConverter[Long] =
simpleWritableConverter[Long, LongWritable](_.get)
implicit def doubleWritableConverter() = simpleWritableConverter[Double, DoubleWritable](_.get)
implicit def doubleWritableConverter(): WritableConverter[Double] =
simpleWritableConverter[Double, DoubleWritable](_.get)
implicit def floatWritableConverter() = simpleWritableConverter[Float, FloatWritable](_.get)
implicit def floatWritableConverter(): WritableConverter[Float] =
simpleWritableConverter[Float, FloatWritable](_.get)
implicit def booleanWritableConverter() =
implicit def booleanWritableConverter(): WritableConverter[Boolean] =
simpleWritableConverter[Boolean, BooleanWritable](_.get)
implicit def bytesWritableConverter() = {
implicit def bytesWritableConverter(): WritableConverter[Array[Byte]] = {
simpleWritableConverter[Array[Byte], BytesWritable](_.getBytes)
}
implicit def stringWritableConverter() = simpleWritableConverter[String, Text](_.toString)
implicit def stringWritableConverter(): WritableConverter[String] =
simpleWritableConverter[String, Text](_.toString)
implicit def writableWritableConverter[T <: Writable]() =
new WritableConverter[T](_.runtimeClass.asInstanceOf[Class[T]], _.asInstanceOf[T])
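
These implicit WritableConverters are what let sequenceFile read plain Scala types; a hedged usage sketch with a hypothetical path:

    // IntWritable and Text in the SequenceFile are converted back to Int and String
    // by the implicit converters declared above.
    val pairs = sc.sequenceFile[Int, String]("hdfs:///tmp/example-seqfile")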


@ -391,19 +391,24 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
/**
* Save this RDD as a text file, using string representations of elements.
*/
def saveAsTextFile(path: String) = rdd.saveAsTextFile(path)
def saveAsTextFile(path: String): Unit = {
rdd.saveAsTextFile(path)
}
/**
* Save this RDD as a compressed text file, using string representations of elements.
*/
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]) =
def saveAsTextFile(path: String, codec: Class[_ <: CompressionCodec]): Unit = {
rdd.saveAsTextFile(path, codec)
}
/**
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
def saveAsObjectFile(path: String): Unit = {
rdd.saveAsObjectFile(path)
}
/**
* Creates tuples of the elements in this RDD by applying `f`.
@ -420,7 +425,9 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* executed on this RDD. It is strongly recommended that this RDD is persisted in
* memory, otherwise saving it on a file will require recomputation.
*/
def checkpoint() = rdd.checkpoint()
def checkpoint(): Unit = {
rdd.checkpoint()
}
/**
* Return whether this RDD has been checkpointed or not


@ -463,7 +463,7 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
sc.setCheckpointDir(dir)
}
def getCheckpointDir = JavaUtils.optionToOptional(sc.getCheckpointDir)
def getCheckpointDir: Optional[String] = JavaUtils.optionToOptional(sc.getCheckpointDir)
protected def checkpointFile[T](path: String): JavaRDD[T] = {
implicit val ctag: ClassTag[T] = fakeClassTag


@ -112,5 +112,5 @@ private[spark] class ClientArguments(args: Array[String]) {
}
object ClientArguments {
def isValidJarUrl(s: String) = s.matches("(.+):(.+)jar")
def isValidJarUrl(s: String): Boolean = s.matches("(.+):(.+)jar")
}
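
For reference, what the pattern "(.+):(.+)jar" accepts; the sample strings below are made up:

    // matches() requires a full-string match: some scheme, a ':', anything ending in "jar".
    "hdfs://host:9000/app.jar".matches("(.+):(.+)jar")   // true
    "/tmp/app.jar".matches("(.+):(.+)jar")               // false: no ':' separator
    "hdfs://host/app.jar.bak".matches("(.+):(.+)jar")    // false: does not end in "jar"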


@ -32,7 +32,7 @@ import scala.collection.JavaConversions._
* Contains util methods to interact with Hadoop from Spark.
*/
class SparkHadoopUtil {
val conf = newConfiguration()
val conf: Configuration = newConfiguration()
UserGroupInformation.setConfiguration(conf)
def runAsUser(user: String)(func: () => Unit) {


@ -20,6 +20,7 @@ package org.apache.spark.deploy.master
import scala.collection.JavaConversions._
import akka.serialization.Serialization
import org.apache.curator.framework.CuratorFramework
import org.apache.zookeeper.CreateMode
import org.apache.spark.{Logging, SparkConf}
@ -29,7 +30,7 @@ class ZooKeeperPersistenceEngine(serialization: Serialization, conf: SparkConf)
with Logging
{
val WORKING_DIR = conf.get("spark.deploy.zookeeper.dir", "/spark") + "/master_status"
val zk = SparkCuratorUtil.newClient(conf)
val zk: CuratorFramework = SparkCuratorUtil.newClient(conf)
SparkCuratorUtil.mkdir(zk, WORKING_DIR)


@ -38,7 +38,7 @@ class ConsoleSink(val property: Properties, val registry: MetricRegistry,
case None => CONSOLE_DEFAULT_PERIOD
}
val pollUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
val pollUnit: TimeUnit = Option(property.getProperty(CONSOLE_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CONSOLE_DEFAULT_UNIT)
}


@ -41,7 +41,7 @@ class CsvSink(val property: Properties, val registry: MetricRegistry,
case None => CSV_DEFAULT_PERIOD
}
val pollUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
val pollUnit: TimeUnit = Option(property.getProperty(CSV_KEY_UNIT)) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(CSV_DEFAULT_UNIT)
}


@ -39,7 +39,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
val GRAPHITE_KEY_UNIT = "unit"
val GRAPHITE_KEY_PREFIX = "prefix"
def propertyToOption(prop: String) = Option(property.getProperty(prop))
def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
throw new Exception("Graphite sink requires 'host' property.")
@ -57,7 +57,7 @@ class GraphiteSink(val property: Properties, val registry: MetricRegistry,
case None => GRAPHITE_DEFAULT_PERIOD
}
val pollUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
val pollUnit: TimeUnit = propertyToOption(GRAPHITE_KEY_UNIT) match {
case Some(s) => TimeUnit.valueOf(s.toUpperCase())
case None => TimeUnit.valueOf(GRAPHITE_DEFAULT_UNIT)
}


@ -103,7 +103,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part:
array
}
override val partitioner = Some(part)
override val partitioner: Some[Partitioner] = Some(part)
override def compute(s: Partition, context: TaskContext): Iterator[(K, CoGroupCombiner)] = {
val sparkConf = SparkEnv.get.conf


@ -171,7 +171,7 @@ class HadoopRDD[K, V](
array
}
override def compute(theSplit: Partition, context: TaskContext) = {
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new NextIterator[(K, V)] {
val split = theSplit.asInstanceOf[HadoopPartition]


@ -116,7 +116,7 @@ class JdbcRDD[T: ClassTag](
}
object JdbcRDD {
def resultSetToObjectArray(rs: ResultSet) = {
def resultSetToObjectArray(rs: ResultSet): Array[Object] = {
Array.tabulate[Object](rs.getMetaData.getColumnCount)(i => rs.getObject(i + 1))
}
}


@ -80,7 +80,7 @@ class NewHadoopRDD[K, V](
result
}
override def compute(theSplit: Partition, context: TaskContext) = {
override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
val iter = new Iterator[(K, V)] {
val split = theSplit.asInstanceOf[NewHadoopPartition]
logInfo("Input split: " + split.serializableHadoopSplit)


@ -121,7 +121,7 @@ abstract class RDD[T: ClassTag](
@transient var name: String = null
/** Assign a name to this RDD */
def setName(_name: String) = {
def setName(_name: String): RDD[T] = {
name = _name
this
}


@ -126,15 +126,16 @@ object StorageLevel {
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, 2)
/** Create a new StorageLevel object */
def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean, replication: Int = 1) =
def apply(useDisk: Boolean, useMemory: Boolean, deserialized: Boolean,
replication: Int = 1): StorageLevel =
getCachedStorageLevel(new StorageLevel(useDisk, useMemory, deserialized, replication))
/** Create a new StorageLevel object from its integer representation */
def apply(flags: Int, replication: Int) =
def apply(flags: Int, replication: Int): StorageLevel =
getCachedStorageLevel(new StorageLevel(flags, replication))
/** Read StorageLevel object from ObjectInput stream */
def apply(in: ObjectInput) = {
def apply(in: ObjectInput): StorageLevel = {
val obj = new StorageLevel()
obj.readExternal(in)
getCachedStorageLevel(obj)
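
A hedged sketch of the typed factory method in use; it assumes an RDD named data and mirrors StorageLevel.MEMORY_AND_DISK:

    import org.apache.spark.storage.StorageLevel
    // Built via the apply overload typed above; replication defaults to 1.
    val level: StorageLevel = StorageLevel(useDisk = true, useMemory = true, deserialized = true)
    data.persist(level)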


@ -19,6 +19,8 @@ package org.apache.spark.util
import java.io.PrintStream
import scala.collection.immutable.IndexedSeq
/**
* Util for getting some stats from a small sample of numeric values, with some handy
* summary functions.
@ -40,7 +42,8 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
* given from 0 to 1
* @param probabilities
*/
def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities) = {
def getQuantiles(probabilities: Traversable[Double] = defaultProbabilities)
: IndexedSeq[Double] = {
probabilities.toIndexedSeq.map{p:Double => data(closestIndex(p))}
}
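
A small usage sketch of the now explicitly typed getQuantiles; it assumes Distribution is constructed from an Array[Double] and is illustrative only:

    val d = new Distribution(Array(1.0, 2.0, 3.0, 4.0, 5.0))
    // Defaults to probabilities (0, 0.25, 0.5, 0.75, 1.0) and now declares IndexedSeq[Double].
    val quartiles: IndexedSeq[Double] = d.getQuantiles()
    d.showQuantiles()   // prints a min / 25% / 50% / 75% / max summary
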
@ -48,7 +51,7 @@ class Distribution(val data: Array[Double], val startIdx: Int, val endIdx: Int)
math.min((p * length).toInt + startIdx, endIdx - 1)
}
def showQuantiles(out: PrintStream = System.out) = {
def showQuantiles(out: PrintStream = System.out): Unit = {
out.println("min\t25%\t50%\t75%\tmax")
getQuantiles(defaultProbabilities).foreach{q => out.print(q + "\t")}
out.println


@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit
import com.codahale.metrics.MetricRegistry
import com.codahale.metrics.ganglia.GangliaReporter
import info.ganglia.gmetric4j.gmetric.GMetric
import info.ganglia.gmetric4j.gmetric.GMetric.UDPAddressingMode
import org.apache.spark.SecurityManager
import org.apache.spark.metrics.MetricsSystem
@ -33,10 +34,10 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val GANGLIA_DEFAULT_PERIOD = 10
val GANGLIA_KEY_UNIT = "unit"
val GANGLIA_DEFAULT_UNIT = TimeUnit.SECONDS
val GANGLIA_DEFAULT_UNIT: TimeUnit = TimeUnit.SECONDS
val GANGLIA_KEY_MODE = "mode"
val GANGLIA_DEFAULT_MODE = GMetric.UDPAddressingMode.MULTICAST
val GANGLIA_DEFAULT_MODE: UDPAddressingMode = GMetric.UDPAddressingMode.MULTICAST
// TTL for multicast messages. If listeners are X hops away in network, must be at least X.
val GANGLIA_KEY_TTL = "ttl"
@ -45,7 +46,7 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val GANGLIA_KEY_HOST = "host"
val GANGLIA_KEY_PORT = "port"
def propertyToOption(prop: String) = Option(property.getProperty(prop))
def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
if (!propertyToOption(GANGLIA_KEY_HOST).isDefined) {
throw new Exception("Ganglia sink requires 'host' property.")
@ -58,11 +59,12 @@ class GangliaSink(val property: Properties, val registry: MetricRegistry,
val host = propertyToOption(GANGLIA_KEY_HOST).get
val port = propertyToOption(GANGLIA_KEY_PORT).get.toInt
val ttl = propertyToOption(GANGLIA_KEY_TTL).map(_.toInt).getOrElse(GANGLIA_DEFAULT_TTL)
val mode = propertyToOption(GANGLIA_KEY_MODE)
val mode: UDPAddressingMode = propertyToOption(GANGLIA_KEY_MODE)
.map(u => GMetric.UDPAddressingMode.valueOf(u.toUpperCase)).getOrElse(GANGLIA_DEFAULT_MODE)
val pollPeriod = propertyToOption(GANGLIA_KEY_PERIOD).map(_.toInt)
.getOrElse(GANGLIA_DEFAULT_PERIOD)
val pollUnit = propertyToOption(GANGLIA_KEY_UNIT).map(u => TimeUnit.valueOf(u.toUpperCase))
val pollUnit: TimeUnit = propertyToOption(GANGLIA_KEY_UNIT)
.map(u => TimeUnit.valueOf(u.toUpperCase))
.getOrElse(GANGLIA_DEFAULT_UNIT)
MetricsSystem.checkMinimalPollingPeriod(pollUnit, pollPeriod)


@ -419,5 +419,6 @@ object Graph {
* All the convenience operations are defined in the [[GraphOps]] class which may be
* shared across multiple graph implementations.
*/
implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag](g: Graph[VD, ED]) = g.ops
implicit def graphToGraphOps[VD: ClassTag, ED: ClassTag]
(g: Graph[VD, ED]): GraphOps[VD, ED] = g.ops
} // end of Graph object


@ -197,7 +197,7 @@ class GraphImpl[VD: ClassTag, ED: ClassTag] protected (
override def mapReduceTriplets[A: ClassTag](
mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
reduceFunc: (A, A) => A,
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None) = {
activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A] = {
ClosureCleaner.clean(mapFunc)
ClosureCleaner.clean(reduceFunc)


@ -26,7 +26,7 @@ import org.apache.spark.graphx.PartitionStrategy._
*/
object Analytics extends Logging {
def main(args: Array[String]) = {
def main(args: Array[String]): Unit = {
val host = args(0)
val taskType = args(1)
val fname = args(2)


@ -431,7 +431,7 @@ class StreamingContext private[streaming] (
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean = true) = synchronized {
def stop(stopSparkContext: Boolean = true): Unit = synchronized {
scheduler.stop()
logInfo("StreamingContext stopped successfully")
waiter.notifyStop()
@ -489,7 +489,7 @@ object StreamingContext extends Logging {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls)
def jarOfClass(cls: Class[_]): Seq[String] = SparkContext.jarOfClass(cls)
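
A typical use of the now explicitly typed jarOfClass, with a hypothetical application class MyStreamingApp:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.StreamingContext
    // MyStreamingApp is a placeholder for the application's main class.
    // Locate the JAR that contains it and ship that JAR with the job.
    val jars: Seq[String] = StreamingContext.jarOfClass(classOf[MyStreamingApp])
    val conf = new SparkConf().setAppName("example").setJars(jars)
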
private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
// Set the default cleaner delay to an hour if not already set.


@ -49,7 +49,9 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and there materialized.
*/
def print() = dstream.print()
def print(): Unit = {
dstream.print()
}
/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
@ -401,7 +403,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* Enable periodic checkpointing of RDDs of this DStream.
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration) = {
def checkpoint(interval: Duration): DStream[T] = {
dstream.checkpoint(interval)
}
}


@ -477,31 +477,41 @@ class JavaStreamingContext(val ssc: StreamingContext) {
/**
* Start the execution of the streams.
*/
def start() = ssc.start()
def start(): Unit = {
ssc.start()
}
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
*/
def awaitTermination() = ssc.awaitTermination()
def awaitTermination(): Unit = {
ssc.awaitTermination()
}
/**
* Wait for the execution to stop. Any exceptions that occurs during the execution
* will be thrown in this thread.
* @param timeout time to wait in milliseconds
*/
def awaitTermination(timeout: Long) = ssc.awaitTermination(timeout)
def awaitTermination(timeout: Long): Unit = {
ssc.awaitTermination(timeout)
}
/**
* Stop the execution of the streams. Will stop the associated JavaSparkContext as well.
*/
def stop() = ssc.stop()
def stop(): Unit = {
ssc.stop()
}
/**
* Stop the execution of the streams.
* @param stopSparkContext Stop the associated SparkContext or not
*/
def stop(stopSparkContext: Boolean) = ssc.stop(stopSparkContext)
def stop(stopSparkContext: Boolean): Unit = {
ssc.stop(stopSparkContext)
}
}
/**
@ -579,7 +589,7 @@ object JavaStreamingContext {
* Find the JAR from which a given class was loaded, to make it easy for users to pass
* their JARs to StreamingContext.
*/
def jarOfClass(cls: Class[_]) = SparkContext.jarOfClass(cls).toArray
def jarOfClass(cls: Class[_]): Array[String] = SparkContext.jarOfClass(cls).toArray
}
/**


@ -503,14 +503,18 @@ abstract class DStream[T: ClassTag] (
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
def foreach(foreachFunc: RDD[T] => Unit) = this.foreachRDD(foreachFunc)
def foreach(foreachFunc: RDD[T] => Unit): Unit = {
this.foreachRDD(foreachFunc)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* 'this' DStream will be registered as an output stream and therefore materialized.
*/
@deprecated("use foreachRDD", "0.9.0")
def foreach(foreachFunc: (RDD[T], Time) => Unit) = this.foreachRDD(foreachFunc)
def foreach(foreachFunc: (RDD[T], Time) => Unit): Unit = {
this.foreachRDD(foreachFunc)
}
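
For reference, the non-deprecated operator these two methods delegate to, assuming a DStream[String] named lines (illustrative only):

    // foreachRDD is the supported output operator; foreach is deprecated in its favor.
    lines.foreachRDD { (rdd, time) =>
      println(s"Batch at $time contained ${rdd.count()} records")
    }
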
/**
* Apply a function to each RDD in this DStream. This is an output operator, so


@ -39,17 +39,19 @@ case class BatchInfo(
* was submitted to the streaming scheduler. Essentially, it is
* `processingStartTime` - `submissionTime`.
*/
def schedulingDelay = processingStartTime.map(_ - submissionTime)
def schedulingDelay: Option[Long] = processingStartTime.map(_ - submissionTime)
/**
* Time taken for the all jobs of this batch to finish processing from the time they started
* processing. Essentially, it is `processingEndTime` - `processingStartTime`.
*/
def processingDelay = processingEndTime.zip(processingStartTime).map(x => x._1 - x._2).headOption
def processingDelay: Option[Long] = processingEndTime.zip(processingStartTime)
.map(x => x._1 - x._2).headOption
/**
* Time taken for all the jobs of this batch to finish processing from the time they
* were submitted. Essentially, it is `processingDelay` + `schedulingDelay`.
*/
def totalDelay = schedulingDelay.zip(processingDelay).map(x => x._1 + x._2).headOption
def totalDelay: Option[Long] = schedulingDelay.zip(processingDelay)
.map(x => x._1 + x._2).headOption
}
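
To spell out the arithmetic of the three accessors above, with made-up timestamps in milliseconds:

    val submissionTime = 1000L
    val processingStartTime = 1200L
    val processingEndTime = 1700L
    val schedulingDelay = processingStartTime - submissionTime      // 200: time spent queued
    val processingDelay = processingEndTime - processingStartTime   // 500: time spent running the jobs
    val totalDelay = schedulingDelay + processingDelay              // 700 = schedulingDelay + processingDelay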