[SPARK-14630][BUILD][CORE][SQL][STREAMING] Code style: public abstract methods should have explicit return types

## What changes were proposed in this pull request?

Currently, many public abstract methods (in abstract classes as well as traits) don't declare their return types explicitly, such as in [o.a.s.streaming.dstream.InputDStream](https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/InputDStream.scala#L110):
```scala
def start() // should be: def start(): Unit
def stop()  // should be: def stop(): Unit
```

Such methods exist throughout core, sql, and streaming; this PR adds the explicit return types.
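
To make the intended style concrete, here is a small self-contained sketch (not taken from the patch; the trait and class names are made up) of abstract members declared with explicit `Unit` return types and an implementation matching those signatures:

```scala
// Hypothetical trait, shown only to illustrate the enforced style.
trait StartStoppable {
  def start(): Unit  // return type spelled out explicitly
  def stop(): Unit
}

class Worker extends StartStoppable {
  override def start(): Unit = { println("started") }
  override def stop(): Unit = { println("stopped") }
}
```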

## How was this patch tested?

N/A

## Which Scala style rule led to these changes?

The rule was added separately in https://github.com/apache/spark/pull/12396.

Author: Liwei Lin <lwlin7@gmail.com>

Closes #12389 from lw-lin/public-abstract-methods.
commit 3e27940a19 (parent de2ad52855)
Authored by Liwei Lin on 2016-04-14 10:14:38 -07:00; committed by Reynold Xin
27 changed files with 50 additions and 49 deletions


```diff
@@ -278,9 +278,9 @@ private object ContextCleaner {
  * Listener class used for testing when any item has been cleaned by the Cleaner class.
  */
 private[spark] trait CleanerListener {
-  def rddCleaned(rddId: Int)
-  def shuffleCleaned(shuffleId: Int)
-  def broadcastCleaned(broadcastId: Long)
-  def accumCleaned(accId: Long)
-  def checkpointCleaned(rddId: Long)
+  def rddCleaned(rddId: Int): Unit
+  def shuffleCleaned(shuffleId: Int): Unit
+  def broadcastCleaned(broadcastId: Long): Unit
+  def accumCleaned(accId: Long): Unit
+  def checkpointCleaned(rddId: Long): Unit
 }
```


```diff
@@ -41,7 +41,7 @@ trait FutureAction[T] extends Future[T] {
   /**
    * Cancels the execution of this action.
    */
-  def cancel()
+  def cancel(): Unit
 
   /**
   * Blocks until this action completes.
@@ -65,7 +65,7 @@ trait FutureAction[T] extends Future[T] {
    * When this action is completed, either through an exception, or a value, applies the provided
    * function.
    */
-  def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext)
+  def onComplete[U](func: (Try[T]) => U)(implicit executor: ExecutionContext): Unit
 
   /**
   * Returns whether the action has already been completed with a value or an exception.
```
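
Since `FutureAction` is user-facing, a brief usage sketch may help; the app name, local master, and toy dataset below are illustrative and not part of this patch:

```scala
import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

import org.apache.spark.{SparkConf, SparkContext}

object FutureActionExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("future-action-demo").setMaster("local[2]"))

    // countAsync() returns a FutureAction[Long]; onComplete now declares `: Unit` explicitly.
    val action = sc.parallelize(1 to 1000).countAsync()
    action.onComplete {
      case Success(count) => println(s"count = $count")
      case Failure(error) => println(s"count failed: $error")
    }

    // FutureAction extends scala.concurrent.Future, so we can wait for it before shutting down.
    // cancel(): Unit could instead be used to abort the job if the result is no longer needed.
    Await.ready(action, Duration.Inf)
    sc.stop()
  }
}
```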


```diff
@@ -33,7 +33,8 @@ private[spark] trait AppClientListener {
   /** An application death is an unrecoverable failure condition. */
   def dead(reason: String): Unit
 
-  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int)
+  def executorAdded(
+      fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int): Unit
 
   def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]): Unit
 }
```


```diff
@@ -32,8 +32,8 @@ trait LeaderElectionAgent {
 @DeveloperApi
 trait LeaderElectable {
-  def electedLeader()
-  def revokedLeadership()
+  def electedLeader(): Unit
+  def revokedLeadership(): Unit
 }
 
 /** Single-node implementation of LeaderElectionAgent -- we're initially and always the leader. */
```


```diff
@@ -40,12 +40,12 @@ abstract class PersistenceEngine {
    * Defines how the object is serialized and persisted. Implementation will
    * depend on the store used.
    */
-  def persist(name: String, obj: Object)
+  def persist(name: String, obj: Object): Unit
 
   /**
    * Defines how the object referred by its name is removed from the store.
    */
-  def unpersist(name: String)
+  def unpersist(name: String): Unit
 
   /**
   * Gives all objects, matching a prefix. This defines how objects are
```


```diff
@@ -218,7 +218,7 @@ private[deploy] class DriverRunner(
 }
 
 private[deploy] trait Sleeper {
-  def sleep(seconds: Int)
+  def sleep(seconds: Int): Unit
 }
 
 // Needed because ProcessBuilder is a final class and cannot be mocked
```


```diff
@@ -25,6 +25,6 @@ import org.apache.spark.TaskState.TaskState
  * A pluggable interface used by the Executor to send updates to the cluster scheduler.
  */
 private[spark] trait ExecutorBackend {
-  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
+  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): Unit
 }
```


```diff
@@ -36,7 +36,7 @@ abstract class BlockTransferService extends ShuffleClient with Closeable with Lo
    * Initialize the transfer service by giving it the BlockDataManager that can be used to fetch
    * local blocks or put local blocks.
    */
-  def init(blockDataManager: BlockDataManager)
+  def init(blockDataManager: BlockDataManager): Unit
 
   /**
   * Tear down the transfer service.
```


```diff
@@ -23,6 +23,6 @@ package org.apache.spark.scheduler
  * job fails (and no further taskSucceeded events will happen).
  */
 private[spark] trait JobListener {
-  def taskSucceeded(index: Int, result: Any)
-  def jobFailed(exception: Exception)
+  def taskSucceeded(index: Int, result: Any): Unit
+  def jobFailed(exception: Exception): Unit
 }
```


```diff
@@ -34,9 +34,9 @@ import org.apache.spark.util.Utils
 private[spark] trait SchedulableBuilder {
   def rootPool: Pool
 
-  def buildPools()
+  def buildPools(): Unit
 
-  def addTaskSetManager(manager: Schedulable, properties: Properties)
+  def addTaskSetManager(manager: Schedulable, properties: Properties): Unit
 }
 
 private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
```


```diff
@@ -51,7 +51,7 @@ private[spark] trait TaskScheduler {
   def submitTasks(taskSet: TaskSet): Unit
 
   // Cancel a stage.
-  def cancelTasks(stageId: Int, interruptThread: Boolean)
+  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
 
   // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
   def setDAGScheduler(dagScheduler: DAGScheduler): Unit
```


```diff
@@ -357,7 +357,7 @@ private[spark] class KryoSerializerInstance(ks: KryoSerializer) extends Serializ
  * serialization.
  */
 trait KryoRegistrator {
-  def registerClasses(kryo: Kryo)
+  def registerClasses(kryo: Kryo): Unit
 }
 
 private[serializer] object KryoSerializer {
```
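
Because `KryoRegistrator` is implemented by user code, here is a minimal registrator sketch against the updated signature; the `Point` class and the registrator name are hypothetical:

```scala
import com.esotericsoftware.kryo.Kryo

import org.apache.spark.serializer.KryoRegistrator

// Hypothetical domain class used only for this sketch.
case class Point(x: Double, y: Double)

class PointRegistrator extends KryoRegistrator {
  // With the explicit return type, the override matches the trait signature exactly.
  override def registerClasses(kryo: Kryo): Unit = {
    kryo.register(classOf[Point])
  }
}
```

A registrator like this would typically be wired up through the `spark.kryo.registrator` configuration key.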


```diff
@@ -35,7 +35,7 @@ private[spark] trait ShuffleWriterGroup {
   val writers: Array[DiskBlockObjectWriter]
 
   /** @param success Indicates all writes were successful. If false, no blocks will be recorded. */
-  def releaseWriters(success: Boolean)
+  def releaseWriters(success: Boolean): Unit
 }
 
 /**
```


```diff
@@ -129,7 +129,7 @@ private[spark] abstract class WebUI(
   }
 
   /** Initialize all components of the server. */
-  def initialize()
+  def initialize(): Unit
 
   /** Bind to the HTTP server behind this web interface. */
   def bind() {
```


```diff
@@ -32,10 +32,10 @@ private[spark] trait RollingPolicy {
   def shouldRollover(bytesToBeWritten: Long): Boolean
 
   /** Notify that rollover has occurred */
-  def rolledOver()
+  def rolledOver(): Unit
 
   /** Notify that bytes have been written */
-  def bytesWritten(bytes: Long)
+  def bytesWritten(bytes: Long): Unit
 
   /** Get the desired name of the rollover file */
   def generateRolledOverFileSuffix(): String
```


```diff
@@ -26,5 +26,5 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 trait Pseudorandom {
   /** Set random seed. */
-  def setSeed(seed: Long)
+  def setSeed(seed: Long): Unit
 }
```


```diff
@@ -62,7 +62,7 @@ import org.apache.spark.sql.types._
 abstract class MutableValue extends Serializable {
   var isNull: Boolean = true
 
   def boxed: Any
-  def update(v: Any)
+  def update(v: Any): Unit
   def copy(): MutableValue
 }
```


```diff
@@ -164,7 +164,7 @@ trait BaseGenericInternalRow extends InternalRow {
 abstract class MutableRow extends InternalRow {
   def setNullAt(i: Int): Unit
 
-  def update(i: Int, value: Any)
+  def update(i: Int, value: Any): Unit
 
   // default implementation (slow)
   def setBoolean(i: Int, value: Boolean): Unit = { update(i, value) }
```


```diff
@@ -38,7 +38,7 @@ private[columnar] trait ColumnAccessor {
   def hasNext: Boolean
 
-  def extractTo(row: MutableRow, ordinal: Int)
+  def extractTo(row: MutableRow, ordinal: Int): Unit
 
   protected def underlyingBuffer: ByteBuffer
 }
```


```diff
@@ -28,12 +28,12 @@ private[columnar] trait ColumnBuilder {
   /**
    * Initializes with an approximate lower bound on the expected number of elements in this column.
    */
-  def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false)
+  def initialize(initialSize: Int, columnName: String = "", useCompression: Boolean = false): Unit
 
   /**
    * Appends `row(ordinal)` to the column builder.
   */
-  def appendFrom(row: InternalRow, ordinal: Int)
+  def appendFrom(row: InternalRow, ordinal: Int): Unit
 
   /**
   * Column statistics information
```


```diff
@@ -50,7 +50,7 @@ trait StateStore {
   def get(key: UnsafeRow): Option[UnsafeRow]
 
   /** Put a new value for a key. */
-  def put(key: UnsafeRow, value: UnsafeRow)
+  def put(key: UnsafeRow, value: UnsafeRow): Unit
 
   /**
   * Remove keys that match the following condition.
```


```diff
@@ -37,7 +37,7 @@ abstract class ContinuousQueryListener {
    * `DataFrameWriter.startStream()` returns the corresponding [[ContinuousQuery]]. Please
    * don't block this method as it will block your query.
    */
-  def onQueryStarted(queryStarted: QueryStarted)
+  def onQueryStarted(queryStarted: QueryStarted): Unit
 
   /**
   * Called when there is some status update (ingestion rate updated, etc.)
@@ -47,10 +47,10 @@ abstract class ContinuousQueryListener {
    * may be changed before/when you process the event. E.g., you may find [[ContinuousQuery]]
    * is terminated when you are processing [[QueryProgress]].
    */
-  def onQueryProgress(queryProgress: QueryProgress)
+  def onQueryProgress(queryProgress: QueryProgress): Unit
 
   /** Called when a query is stopped, with or without error */
-  def onQueryTerminated(queryTerminated: QueryTerminated)
+  def onQueryTerminated(queryTerminated: QueryTerminated): Unit
 }
```


```diff
@@ -107,8 +107,8 @@ abstract class InputDStream[T: ClassTag](_ssc: StreamingContext)
   }
 
   /** Method called to start receiving data. Subclasses must implement this method. */
-  def start()
+  def start(): Unit
 
   /** Method called to stop receiving data. Subclasses must implement this method. */
-  def stop()
+  def stop(): Unit
 }
```
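
As a sketch of how implementers see these signatures, a minimal input stream that replays one fixed RDD every batch could look like the following (the class name and the fixed-RDD behavior are illustrative, not part of this patch):

```scala
import scala.reflect.ClassTag

import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{StreamingContext, Time}
import org.apache.spark.streaming.dstream.InputDStream

// Illustrative only: an input stream that returns the same RDD for every batch interval.
class FixedRDDInputDStream[T: ClassTag](ssc: StreamingContext, rdd: RDD[T])
  extends InputDStream[T](ssc) {

  // No receiver thread to launch or tear down, so these are no-ops.
  override def start(): Unit = {}
  override def stop(): Unit = {}

  override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
}
```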


```diff
@@ -37,7 +37,7 @@ private[streaming] trait BlockGeneratorListener {
    * that will be useful when a block is generated. Any long blocking operation in this callback
    * will hurt the throughput.
    */
-  def onAddData(data: Any, metadata: Any)
+  def onAddData(data: Any, metadata: Any): Unit
 
   /**
   * Called when a new block of data is generated by the block generator. The block generation
@@ -47,7 +47,7 @@ private[streaming] trait BlockGeneratorListener {
    * be useful when the block has been successfully stored. Any long blocking operation in this
    * callback will hurt the throughput.
    */
-  def onGenerateBlock(blockId: StreamBlockId)
+  def onGenerateBlock(blockId: StreamBlockId): Unit
 
   /**
   * Called when a new block is ready to be pushed. Callers are supposed to store the block into
@@ -55,13 +55,13 @@ private[streaming] trait BlockGeneratorListener {
    * thread, that is not synchronized with any other callbacks. Hence it is okay to do long
    * blocking operation in this callback.
    */
-  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_])
+  def onPushBlock(blockId: StreamBlockId, arrayBuffer: ArrayBuffer[_]): Unit
 
   /**
    * Called when an error has occurred in the BlockGenerator. Can be called form many places
    * so better to not do any long block operation in this callback.
    */
-  def onError(message: String, throwable: Throwable)
+  def onError(message: String, throwable: Throwable): Unit
 }
 
 /**
```


```diff
@@ -48,7 +48,7 @@ private[streaming] trait ReceivedBlockHandler {
   def storeBlock(blockId: StreamBlockId, receivedBlock: ReceivedBlock): ReceivedBlockStoreResult
 
   /** Cleanup old blocks older than the given threshold time */
-  def cleanupOldBlocks(threshTime: Long)
+  def cleanupOldBlocks(threshTime: Long): Unit
 }
```


```diff
@@ -99,13 +99,13 @@ abstract class Receiver[T](val storageLevel: StorageLevel) extends Serializable
    * (iii) `restart(...)` can be called to restart the receiver. This will call `onStop()`
    * immediately, and then `onStart()` after a delay.
    */
-  def onStart()
+  def onStart(): Unit
 
   /**
    * This method is called by the system when the receiver is stopped. All resources
    * (threads, buffers, etc.) set up in `onStart()` must be cleaned up in this method.
    */
-  def onStop()
+  def onStop(): Unit
 
   /** Override this to specify a preferred location (hostname). */
   def preferredLocation: Option[String] = None
```
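
`Receiver` is a public extension point, so here is a hedged sketch modeled on the custom-receiver pattern from the Spark documentation; the class name and the newline-delimited socket protocol are illustrative:

```scala
import java.io.{BufferedReader, InputStreamReader}
import java.net.Socket
import java.nio.charset.StandardCharsets

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Illustrative only: receives newline-delimited text from a socket.
class SocketLineReceiver(host: String, port: Int)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    // onStart() must not block, so receive on a separate thread.
    new Thread("Socket Line Receiver") {
      override def run(): Unit = receive()
    }.start()
  }

  override def onStop(): Unit = {
    // Nothing to clean up: the receiving thread exits once isStopped() returns true.
  }

  private def receive(): Unit = {
    try {
      val socket = new Socket(host, port)
      val reader = new BufferedReader(
        new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))
      var line = reader.readLine()
      while (!isStopped && line != null) {
        store(line)
        line = reader.readLine()
      }
      reader.close()
      socket.close()
      restart("Trying to connect again")
    } catch {
      case t: Throwable => restart("Error receiving data", t)
    }
  }
}
```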


```diff
@@ -70,28 +70,28 @@ private[streaming] abstract class ReceiverSupervisor(
   @volatile private[streaming] var receiverState = Initialized
 
   /** Push a single data item to backend data store. */
-  def pushSingle(data: Any)
+  def pushSingle(data: Any): Unit
 
   /** Store the bytes of received data as a data block into Spark's memory. */
   def pushBytes(
       bytes: ByteBuffer,
       optionalMetadata: Option[Any],
       optionalBlockId: Option[StreamBlockId]
-    )
+    ): Unit
 
   /** Store a iterator of received data as a data block into Spark's memory. */
   def pushIterator(
       iterator: Iterator[_],
       optionalMetadata: Option[Any],
       optionalBlockId: Option[StreamBlockId]
-    )
+    ): Unit
 
   /** Store an ArrayBuffer of received data as a data block into Spark's memory. */
   def pushArrayBuffer(
       arrayBuffer: ArrayBuffer[_],
       optionalMetadata: Option[Any],
       optionalBlockId: Option[StreamBlockId]
-    )
+    ): Unit
 
   /**
   * Create a custom [[BlockGenerator]] that the receiver implementation can directly control
@@ -103,7 +103,7 @@ private[streaming] abstract class ReceiverSupervisor(
   def createBlockGenerator(blockGeneratorListener: BlockGeneratorListener): BlockGenerator
 
   /** Report errors. */
-  def reportError(message: String, throwable: Throwable)
+  def reportError(message: String, throwable: Throwable): Unit
 
   /**
   * Called when supervisor is started.
```