From 18a1faedf64ea712348367e8d8bc0c9db0e0591a Mon Sep 17 00:00:00 2001
From: Denny
Date: Tue, 2 Oct 2012 19:25:26 -0700
Subject: [PATCH] Stylistic changes and Public Accumulable and Broadcast

---
 core/src/main/scala/spark/Accumulators.scala | 10 +++----
 core/src/main/scala/spark/CartesianRDD.scala | 3 ++-
 core/src/main/scala/spark/CoGroupedRDD.scala | 3 ++-
 .../main/scala/spark/DoubleRDDFunctions.scala | 2 +-
 .../src/main/scala/spark/KryoSerializer.scala | 6 +++--
 core/src/main/scala/spark/NewHadoopRDD.scala | 3 ++-
 .../main/scala/spark/PairRDDFunctions.scala | 4 +--
 core/src/main/scala/spark/SampledRDD.scala | 3 ++-
 .../spark/SequenceFileRDDFunctions.scala | 2 +-
 core/src/main/scala/spark/TaskEndReason.scala | 10 +++++--
 .../scala/spark/broadcast/Broadcast.scala | 5 ++--
 .../scala/spark/deploy/DeployMessage.scala | 27 +++++++++++++------
 .../spark/deploy/LocalSparkCluster.scala | 3 ++-
 .../scala/spark/deploy/master/JobInfo.scala | 3 ++-
 .../spark/deploy/master/MasterWebUI.scala | 3 ++-
 .../spark/deploy/worker/WorkerWebUI.scala | 3 ++-
 .../main/scala/spark/network/Connection.scala | 3 ++-
 .../scala/spark/partial/BoundedDouble.scala | 3 ++-
 .../scala/spark/scheduler/DAGScheduler.scala | 3 ++-
 .../scala/spark/scheduler/TaskResult.scala | 3 ++-
 .../scheduler/cluster/SlaveResources.scala | 3 ++-
 .../cluster/StandaloneClusterMessage.scala | 20 +++++++++-----
 .../cluster/StandaloneSchedulerBackend.scala | 3 ++-
 .../spark/scheduler/cluster/TaskInfo.scala | 3 ++-
 .../spark/scheduler/cluster/WorkerOffer.scala | 3 ++-
 .../scala/spark/storage/BlockManager.scala | 6 +++--
 .../spark/storage/BlockManagerMaster.scala | 27 ++++++++++++-------
 .../spark/storage/BlockMessageArray.scala | 3 ++-
 .../scala/spark/util/SerializableBuffer.scala | 3 ++-
 .../main/scala/spark/util/StatCounter.scala | 3 ++-
 30 files changed, 117 insertions(+), 59 deletions(-)

diff --git a/core/src/main/scala/spark/Accumulators.scala b/core/src/main/scala/spark/Accumulators.scala
index cd8e43f556..0363434d7a 100644
--- a/core/src/main/scala/spark/Accumulators.scala
+++ b/core/src/main/scala/spark/Accumulators.scala
@@ -18,7 +18,7 @@ import scala.collection.generic.Growable
  * @tparam R the full accumulated data
  * @tparam T partial data that can be added in
  */
-private[spark] class Accumulable[R, T] (
+class Accumulable[R, T] (
     @transient initialValue: R,
     param: AccumulableParam[R, T])
   extends Serializable {
@@ -73,7 +73,7 @@ private[spark] class Accumulable[R, T] (
  * @tparam R the full accumulated data
  * @tparam T partial data that can be added in
  */
-private[spark] trait AccumulableParam[R, T] extends Serializable {
+trait AccumulableParam[R, T] extends Serializable {
   /**
    * Add additional data to the accumulator value.
   * @param r the current value of the accumulator
@@ -93,7 +93,7 @@ private[spark] trait AccumulableParam[R, T] extends Serializable {
   def zero(initialValue: R): R
 }

-private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
+class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
   extends AccumulableParam[R,T] {

   def addAccumulator(growable: R, elem: T) : R = {
@@ -124,7 +124,7 @@ private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableO
  * @param param helper object defining how to add elements of type `T`
  * @tparam T result type
  */
-private[spark] class Accumulator[T](
+class Accumulator[T](
     @transient initialValue: T,
     param: AccumulatorParam[T])
   extends Accumulable[T,T](initialValue, param)
@@ -133,7 +133,7 @@ private[spark] class Accumulator[T](
  * as the accumulated value
  * @tparam T type of value to accumulate
  */
-private[spark] trait AccumulatorParam[T] extends AccumulableParam[T, T] {
+trait AccumulatorParam[T] extends AccumulableParam[T, T] {
   def addAccumulator(t1: T, t2: T) : T = {
     addInPlace(t1, t2)
   }
diff --git a/core/src/main/scala/spark/CartesianRDD.scala b/core/src/main/scala/spark/CartesianRDD.scala
index 9f94bcb413..83db2d2934 100644
--- a/core/src/main/scala/spark/CartesianRDD.scala
+++ b/core/src/main/scala/spark/CartesianRDD.scala
@@ -1,6 +1,7 @@
 package spark

-private[spark] class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
+private[spark]
+class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
   override val index: Int = idx
 }

diff --git a/core/src/main/scala/spark/CoGroupedRDD.scala b/core/src/main/scala/spark/CoGroupedRDD.scala
index 2c270766f9..daba719b14 100644
--- a/core/src/main/scala/spark/CoGroupedRDD.scala
+++ b/core/src/main/scala/spark/CoGroupedRDD.scala
@@ -10,7 +10,8 @@ private[spark] sealed trait CoGroupSplitDep extends Serializable
 private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
 private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep

-private[spark] class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
+private[spark]
+class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
   override val index: Int = idx
   override def hashCode(): Int = idx
 }
diff --git a/core/src/main/scala/spark/DoubleRDDFunctions.scala b/core/src/main/scala/spark/DoubleRDDFunctions.scala
index 6252dc44f7..1fbf66b7de 100644
--- a/core/src/main/scala/spark/DoubleRDDFunctions.scala
+++ b/core/src/main/scala/spark/DoubleRDDFunctions.scala
@@ -10,7 +10,7 @@ import spark.util.StatCounter
 /**
  * Extra functions available on RDDs of Doubles through an implicit conversion.
  */
-private[spark] class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
+class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
   def sum(): Double = {
     self.reduce(_ + _)
   }
diff --git a/core/src/main/scala/spark/KryoSerializer.scala b/core/src/main/scala/spark/KryoSerializer.scala
index 1e55621b8e..b8aa3a86c5 100644
--- a/core/src/main/scala/spark/KryoSerializer.scala
+++ b/core/src/main/scala/spark/KryoSerializer.scala
@@ -68,7 +68,8 @@ private[spark] object ZigZag {
   }
 }

-private[spark] class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
+private[spark]
+class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
 extends SerializationStream {
   val channel = Channels.newChannel(out)

@@ -85,7 +86,8 @@ extends SerializationStream {
   def close() { out.close() }
 }

-private[spark] class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
+private[spark]
+class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
 extends DeserializationStream {
   def readObject[T](): T = {
     val len = ZigZag.readInt(in)
diff --git a/core/src/main/scala/spark/NewHadoopRDD.scala b/core/src/main/scala/spark/NewHadoopRDD.scala
index 9c5ad3511e..9072698357 100644
--- a/core/src/main/scala/spark/NewHadoopRDD.scala
+++ b/core/src/main/scala/spark/NewHadoopRDD.scala
@@ -13,7 +13,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
 import java.util.Date
 import java.text.SimpleDateFormat

-private[spark] class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
+private[spark]
+class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
   extends Split {

   val serializableHadoopSplit = new SerializableWritable(rawSplit)
diff --git a/core/src/main/scala/spark/PairRDDFunctions.scala b/core/src/main/scala/spark/PairRDDFunctions.scala
index 57fdb741df..80d62caf25 100644
--- a/core/src/main/scala/spark/PairRDDFunctions.scala
+++ b/core/src/main/scala/spark/PairRDDFunctions.scala
@@ -41,7 +41,7 @@ import spark.partial.PartialResult
 /**
  * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
  */
-private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
     self: RDD[(K, V)])
   extends Logging
   with Serializable {
@@ -430,7 +430,7 @@ private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
   def getValueClass() = implicitly[ClassManifest[V]].erasure
 }

-private[spark] class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
+class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
     self: RDD[(K, V)])
   extends Logging
   with Serializable {
diff --git a/core/src/main/scala/spark/SampledRDD.scala b/core/src/main/scala/spark/SampledRDD.scala
index 4b33148364..ac10aed477 100644
--- a/core/src/main/scala/spark/SampledRDD.scala
+++ b/core/src/main/scala/spark/SampledRDD.scala
@@ -4,7 +4,8 @@ import java.util.Random
 import cern.jet.random.Poisson
 import cern.jet.random.engine.DRand

-private[spark] class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
+private[spark]
+class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
   override val index: Int = prev.index
 }

diff --git a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
index 6224e957e5..ea7171d3a1 100644
--- a/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
+++ b/core/src/main/scala/spark/SequenceFileRDDFunctions.scala
@@ -30,7 +30,7 @@ import SparkContext._
  * through an implicit conversion. Note that this can't be part of PairRDDFunctions because
  * we need more implicit parameters to convert our keys and values to Writable.
  */
-private[spark] class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
     self: RDD[(K, V)])
   extends Logging
   with Serializable {
diff --git a/core/src/main/scala/spark/TaskEndReason.scala b/core/src/main/scala/spark/TaskEndReason.scala
index 3e5668892f..420c54bc9a 100644
--- a/core/src/main/scala/spark/TaskEndReason.scala
+++ b/core/src/main/scala/spark/TaskEndReason.scala
@@ -10,7 +10,13 @@ import spark.storage.BlockManagerId
 private[spark] sealed trait TaskEndReason

 private[spark] case object Success extends TaskEndReason
-private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
-private[spark] case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+
+private[spark]
+case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
+
+private[spark]
+case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
+
 private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
+
 private[spark] case class OtherFailure(message: String) extends TaskEndReason
diff --git a/core/src/main/scala/spark/broadcast/Broadcast.scala b/core/src/main/scala/spark/broadcast/Broadcast.scala
index 370978113f..6055bfd045 100644
--- a/core/src/main/scala/spark/broadcast/Broadcast.scala
+++ b/core/src/main/scala/spark/broadcast/Broadcast.scala
@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong

 import spark._

-private[spark] abstract class Broadcast[T](id: Long) extends Serializable {
+abstract class Broadcast[T](id: Long) extends Serializable {
   def value: T

   // We cannot have an abstract readObject here due to some weird issues with
@@ -14,7 +14,8 @@ private[spark] abstract class Broadcast[T](id: Long) extends Serializable {
   override def toString = "spark.Broadcast(" + id + ")"
 }

-private[spark] class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
+private[spark]
+class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {

   private var initialized = false
   private var broadcastFactory: BroadcastFactory = null
diff --git a/core/src/main/scala/spark/deploy/DeployMessage.scala b/core/src/main/scala/spark/deploy/DeployMessage.scala
index 7eaae2c618..d2b63d6e0d 100644
--- a/core/src/main/scala/spark/deploy/DeployMessage.scala
+++ b/core/src/main/scala/spark/deploy/DeployMessage.scala
@@ -11,10 +11,12 @@ private[spark] sealed trait DeployMessage extends Serializable

 // Worker to Master

-private[spark] case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
+private[spark]
+case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
   extends DeployMessage

-private[spark] case class ExecutorStateChanged(
+private[spark]
+case class ExecutorStateChanged(
     jobId: String,
     execId: Int,
     state: ExecutorState,
@@ -42,10 +44,17 @@ private[spark] case class RegisterJob(jobDescription: JobDescription) extends De

 // Master to Client

-private[spark] case class RegisteredJob(jobId: String) extends DeployMessage
-private[spark] case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
-private[spark] case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
-private[spark] case class JobKilled(message: String)
+private[spark]
+case class RegisteredJob(jobId: String) extends DeployMessage
+
+private[spark]
+case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
+
+private[spark]
+case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
+
+private[spark]
+case class JobKilled(message: String)

 // Internal message in Client

@@ -57,7 +66,8 @@ private[spark] case object RequestMasterState

 // Master to MasterWebUI

-private[spark] case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
+private[spark]
+case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
   completedJobs: List[JobInfo])

 // WorkerWebUI to Worker
@@ -65,6 +75,7 @@ private[spark] case object RequestWorkerState

 // Worker to WorkerWebUI

-private[spark] case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
+private[spark]
+case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
   finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
   coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)
\ No newline at end of file
diff --git a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
index e938981f6e..8b2a71add5 100644
--- a/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
+++ b/core/src/main/scala/spark/deploy/LocalSparkCluster.scala
@@ -9,7 +9,8 @@ import spark.{Logging, Utils}

 import scala.collection.mutable.ArrayBuffer

-private[spark] class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
+private[spark]
+class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {

   val localIpAddress = Utils.localIpAddress

diff --git a/core/src/main/scala/spark/deploy/master/JobInfo.scala b/core/src/main/scala/spark/deploy/master/JobInfo.scala
index e2364f1863..8795c09cc1 100644
--- a/core/src/main/scala/spark/deploy/master/JobInfo.scala
+++ b/core/src/main/scala/spark/deploy/master/JobInfo.scala
@@ -5,7 +5,8 @@ import java.util.Date
 import akka.actor.ActorRef
 import scala.collection.mutable

-private[spark] class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
+private[spark]
+class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
   var state = JobState.WAITING
   var executors = new mutable.HashMap[Int, ExecutorInfo]
   var coresGranted = 0
diff --git a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
index c083e7f5ea..700a41c770 100644
--- a/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
+++ b/core/src/main/scala/spark/deploy/master/MasterWebUI.scala
@@ -10,7 +10,8 @@ import cc.spray.directives._
 import cc.spray.typeconversion.TwirlSupport._
 import spark.deploy._

-private[spark] class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
+private[spark]
+class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/master/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"

diff --git a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
index 78a9adc86f..d06f4884ee 100644
--- a/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
+++ b/core/src/main/scala/spark/deploy/worker/WorkerWebUI.scala
@@ -9,7 +9,8 @@ import cc.spray.Directives
 import cc.spray.typeconversion.TwirlSupport._
 import spark.deploy.{WorkerState, RequestWorkerState}

-private[spark] class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
+private[spark]
+class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
   val RESOURCE_DIR = "spark/deploy/worker/webui"
   val STATIC_RESOURCE_DIR = "spark/deploy/static"

diff --git a/core/src/main/scala/spark/network/Connection.scala b/core/src/main/scala/spark/network/Connection.scala
index 3a03f6843a..80262ab7b4 100644
--- a/core/src/main/scala/spark/network/Connection.scala
+++ b/core/src/main/scala/spark/network/Connection.scala
@@ -11,7 +11,8 @@ import java.nio.channels.spi._
 import java.net._


-private[spark] abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
+private[spark]
+abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
   channel.configureBlocking(false)
   channel.socket.setTcpNoDelay(true)

diff --git a/core/src/main/scala/spark/partial/BoundedDouble.scala b/core/src/main/scala/spark/partial/BoundedDouble.scala
index ab5cc21aa0..8bedd75182 100644
--- a/core/src/main/scala/spark/partial/BoundedDouble.scala
+++ b/core/src/main/scala/spark/partial/BoundedDouble.scala
@@ -3,6 +3,7 @@ package spark.partial
 /**
  * A Double with error bars on it.
  */
-private[spark] class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
+private[spark]
+class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
   override def toString(): String = "[%.3f, %.3f]".format(low, high)
 }
diff --git a/core/src/main/scala/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
index 33388545c4..9b666ed181 100644
--- a/core/src/main/scala/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/spark/scheduler/DAGScheduler.scala
@@ -21,7 +21,8 @@ import spark.storage.BlockManagerId
  * schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
  * and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
  */
-private[spark] class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
+private[spark]
+class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
   taskSched.setListener(this)

   // Called by TaskScheduler to report task completions or failures.
diff --git a/core/src/main/scala/spark/scheduler/TaskResult.scala b/core/src/main/scala/spark/scheduler/TaskResult.scala
index c7b123ce7f..9a54d0e854 100644
--- a/core/src/main/scala/spark/scheduler/TaskResult.scala
+++ b/core/src/main/scala/spark/scheduler/TaskResult.scala
@@ -7,7 +7,8 @@ import scala.collection.mutable.Map
 // Task result. Also contains updates to accumulator variables.
 // TODO: Use of distributed cache to return result is a hack to get around
 // what seems to be a bug with messages over 60KB in libprocess; fix it
-private[spark] class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
+private[spark]
+class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
   def this() = this(null.asInstanceOf[T], null)

   override def writeExternal(out: ObjectOutput) {
diff --git a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
index 3c8f511a07..96ebaa4601 100644
--- a/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/SlaveResources.scala
@@ -1,3 +1,4 @@
 package spark.scheduler.cluster

-private[spark] class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
+private[spark]
+class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
index c4e8fea3dc..1386cd9d44 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneClusterMessage.scala
@@ -7,17 +7,25 @@ import spark.util.SerializableBuffer
 private[spark] sealed trait StandaloneClusterMessage extends Serializable

 // Master to slaves
-private[spark] case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
-private[spark] case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
-private[spark] case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
+private[spark]
+case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
+
+private[spark]
+case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
+
+private[spark]
+case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage

 // Slaves to master
-private[spark] case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
+private[spark]
+case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage

-private[spark] case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
+private[spark]
+case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
   extends StandaloneClusterMessage

-private[spark] object StatusUpdate {
+private[spark]
+object StatusUpdate {
   /** Alternate factory method that takes a ByteBuffer directly for the data field */
   def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
     StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))
diff --git a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8aebdedda2..d2cce0dc05 100644
--- a/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -16,7 +16,8 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient
  * Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
  * Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
  */
-private[spark] class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
+private[spark]
+class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
   extends SchedulerBackend with Logging {

   // Use an atomic variable to track total number of cores in the cluster for simplicity and speed
diff --git a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
index ae2c6b9836..ca84503780 100644
--- a/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/TaskInfo.scala
@@ -3,7 +3,8 @@ package spark.scheduler.cluster
 /**
  * Information about a running task attempt inside a TaskSet.
  */
-private[spark] class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
+private[spark]
+class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
   var finishTime: Long = 0
   var failed = false

diff --git a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
index 298b6d5529..6b919d68b2 100644
--- a/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
+++ b/core/src/main/scala/spark/scheduler/cluster/WorkerOffer.scala
@@ -3,5 +3,6 @@ package spark.scheduler.cluster
 /**
  * Represents free resources available on a worker node.
  */
-private[spark] class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
+private[spark]
+class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
 }
diff --git a/core/src/main/scala/spark/storage/BlockManager.scala b/core/src/main/scala/spark/storage/BlockManager.scala
index bb033871b6..7d8f9ff824 100644
--- a/core/src/main/scala/spark/storage/BlockManager.scala
+++ b/core/src/main/scala/spark/storage/BlockManager.scala
@@ -43,7 +43,8 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
 }


-private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null)
+private[spark]
+case class BlockException(blockId: String, message: String, ex: Exception = null)
   extends Exception(message)


@@ -56,7 +57,8 @@ private[spark] class BlockLocker(numLockers: Int) {
 }


-private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
+private[spark]
+class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
   extends Logging {

   case class BlockInfo(level: StorageLevel, tellMaster: Boolean)
diff --git a/core/src/main/scala/spark/storage/BlockManagerMaster.scala b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
index ef76b3f470..7bfa31ac3d 100644
--- a/core/src/main/scala/spark/storage/BlockManagerMaster.scala
+++ b/core/src/main/scala/spark/storage/BlockManagerMaster.scala
@@ -16,14 +16,17 @@ import akka.util.duration._

 import spark.{Logging, SparkException, Utils}

-private[spark] sealed trait ToBlockManagerMaster
+private[spark]
+sealed trait ToBlockManagerMaster

-private[spark] case class RegisterBlockManager(
+private[spark]
+case class RegisterBlockManager(
     blockManagerId: BlockManagerId,
     maxMemSize: Long)
   extends ToBlockManagerMaster

-private[spark] class HeartBeat(
+private[spark]
+class HeartBeat(
     var blockManagerId: BlockManagerId,
     var blockId: String,
     var storageLevel: StorageLevel,
@@ -53,7 +56,8 @@ private[spark] class HeartBeat(
   }
 }

-private[spark] object HeartBeat {
+private[spark]
+object HeartBeat {
   def apply(blockManagerId: BlockManagerId,
       blockId: String,
       storageLevel: StorageLevel,
@@ -68,15 +72,20 @@ private[spark] object HeartBeat {
   }
 }

-private[spark] case class GetLocations(blockId: String) extends ToBlockManagerMaster
+private[spark]
+case class GetLocations(blockId: String) extends ToBlockManagerMaster

-private[spark] case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
+private[spark]
+case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster

-private[spark] case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
+private[spark]
+case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster

-private[spark] case class RemoveHost(host: String) extends ToBlockManagerMaster
+private[spark]
+case class RemoveHost(host: String) extends ToBlockManagerMaster

-private[spark] case object StopBlockManagerMaster extends ToBlockManagerMaster
+private[spark]
+case object StopBlockManagerMaster extends ToBlockManagerMaster


 private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {
diff --git a/core/src/main/scala/spark/storage/BlockMessageArray.scala b/core/src/main/scala/spark/storage/BlockMessageArray.scala
index 8cf7565be7..a25decb123 100644
--- a/core/src/main/scala/spark/storage/BlockMessageArray.scala
+++ b/core/src/main/scala/spark/storage/BlockMessageArray.scala
@@ -8,7 +8,8 @@ import scala.collection.mutable.ArrayBuffer
 import spark._
 import spark.network._

-private[spark] class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
+private[spark]
+class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {

   def this(bm: BlockMessage) = this(Array(bm))

diff --git a/core/src/main/scala/spark/util/SerializableBuffer.scala b/core/src/main/scala/spark/util/SerializableBuffer.scala
index b6e153b00b..09d588fe1c 100644
--- a/core/src/main/scala/spark/util/SerializableBuffer.scala
+++ b/core/src/main/scala/spark/util/SerializableBuffer.scala
@@ -8,7 +8,8 @@ import java.nio.channels.Channels
  * A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
  * it easier to pass ByteBuffers in case class messages.
  */
-private[spark] class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
+private[spark]
+class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
   def value = buffer

   private def readObject(in: ObjectInputStream) {
diff --git a/core/src/main/scala/spark/util/StatCounter.scala b/core/src/main/scala/spark/util/StatCounter.scala
index 101895f08e..023ec09332 100644
--- a/core/src/main/scala/spark/util/StatCounter.scala
+++ b/core/src/main/scala/spark/util/StatCounter.scala
@@ -5,7 +5,8 @@ package spark.util
  * numerically robust way. Includes support for merging two StatCounters. Based on Welford and
  * Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
  */
-private[spark] class StatCounter(values: TraversableOnce[Double]) extends Serializable {
+private[spark]
+class StatCounter(values: TraversableOnce[Double]) extends Serializable {
   private var n: Long = 0     // Running count of our values
   private var mu: Double = 0  // Running mean of our values
   private var m2: Double = 0  // Running variance numerator (sum of (x - mean)^2)
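
With Accumulable, Accumulator, and AccumulatorParam now public, a driver program can name the
accumulator types directly in its own signatures. A minimal usage sketch, assuming a 0.6-era
SparkContext created with the two-argument constructor and the sc.accumulator factory (the names
below are illustrative, not part of the patch):

  import spark.SparkContext
  import spark.SparkContext._   // implicit AccumulatorParam instances, e.g. for Int

  val sc = new SparkContext("local", "AccumulatorSketch")   // assumed local master
  val errorCount: spark.Accumulator[Int] = sc.accumulator(0)
  sc.parallelize(1 to 100).foreach { i =>
    if (i % 10 == 0) errorCount += 1   // tasks only add; the value is read back on the driver
  }
  println("errors = " + errorCount.value)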
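
Broadcast[T] is public as well, so broadcast handles can appear in user-visible fields and method
signatures. A sketch under the same assumptions (the sc defined above):

  val lookup: spark.broadcast.Broadcast[Map[String, Int]] =
    sc.broadcast(Map("a" -> 1, "b" -> 2))
  val resolved = sc.parallelize(Seq("a", "b", "a"))
    .map(k => lookup.value.getOrElse(k, 0))   // read the broadcast value inside tasks
  println(resolved.collect().mkString(","))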
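
The RDD wrapper classes made public here (PairRDDFunctions, OrderedRDDFunctions,
DoubleRDDFunctions, SequenceFileRDDFunctions) are normally reached through the implicit
conversions in SparkContext rather than instantiated by hand; a sketch of that path, again
assuming the sc above:

  import spark.SparkContext._   // brings the RDD-to-functions implicit conversions into scope

  val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
  val counts = pairs.reduceByKey(_ + _)   // PairRDDFunctions
  val sorted = counts.sortByKey()         // OrderedRDDFunctions
  val mean = sc.parallelize(Seq(1.0, 2.0, 3.0)).mean()   // DoubleRDDFunctions, backed by StatCounter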