Stylistic changes and Public Accumulable and Broadcast

Denny 2012-10-02 19:25:26 -07:00
parent b7a913e1fa
commit 18a1faedf6
30 changed files with 117 additions and 59 deletions

View file

@@ -18,7 +18,7 @@ import scala.collection.generic.Growable
* @tparam R the full accumulated data
* @tparam T partial data that can be added in
*/
private[spark] class Accumulable[R, T] (
class Accumulable[R, T] (
@transient initialValue: R,
param: AccumulableParam[R, T])
extends Serializable {
@@ -73,7 +73,7 @@ private[spark] class Accumulable[R, T] (
* @tparam R the full accumulated data
* @tparam T partial data that can be added in
*/
private[spark] trait AccumulableParam[R, T] extends Serializable {
trait AccumulableParam[R, T] extends Serializable {
/**
* Add additional data to the accumulator value.
* @param r the current value of the accumulator
@@ -93,7 +93,7 @@ private[spark] trait AccumulableParam[R, T] extends Serializable {
def zero(initialValue: R): R
}
private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
class GrowableAccumulableParam[R <% Growable[T] with TraversableOnce[T] with Serializable, T]
extends AccumulableParam[R,T] {
def addAccumulator(growable: R, elem: T) : R = {
@@ -124,7 +124,7 @@ private[spark] class GrowableAccumulableParam[R <% Growable[T] with TraversableO
* @param param helper object defining how to add elements of type `T`
* @tparam T result type
*/
private[spark] class Accumulator[T](
class Accumulator[T](
@transient initialValue: T,
param: AccumulatorParam[T]) extends Accumulable[T,T](initialValue, param)
@@ -133,7 +133,7 @@ private[spark] class Accumulator[T](
* as the accumulated value
* @tparam T type of value to accumulate
*/
private[spark] trait AccumulatorParam[T] extends AccumulableParam[T, T] {
trait AccumulatorParam[T] extends AccumulableParam[T, T] {
def addAccumulator(t1: T, t2: T) : T = {
addInPlace(t1, t2)
}
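
Example (not part of the diff): with AccumulatorParam now public, user code can supply its own merge logic. A minimal sketch, assuming an already-constructed SparkContext named sc and its accumulator(initialValue)(param) factory method; MaxAccumulatorParam is a hypothetical user-defined param:

    // Hypothetical user-defined param: tracks the maximum value seen across tasks.
    object MaxAccumulatorParam extends spark.AccumulatorParam[Int] {
      def addInPlace(t1: Int, t2: Int): Int = math.max(t1, t2)   // how partial results combine
      def zero(initialValue: Int): Int = Int.MinValue            // identity for fresh per-task copies
    }

    val maxSeen = sc.accumulator(Int.MinValue)(MaxAccumulatorParam)
    sc.parallelize(1 to 100).foreach(i => maxSeen += i)
    // maxSeen.value is only meaningful when read back on the driver; here it should be 100.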

View file

@@ -1,6 +1,7 @@
package spark
private[spark] class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
private[spark]
class CartesianSplit(idx: Int, val s1: Split, val s2: Split) extends Split with Serializable {
override val index: Int = idx
}

View file

@@ -10,7 +10,8 @@ private[spark] sealed trait CoGroupSplitDep extends Serializable
private[spark] case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
private[spark] case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
private[spark] class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
private[spark]
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep]) extends Split with Serializable {
override val index: Int = idx
override def hashCode(): Int = idx
}

View file

@@ -10,7 +10,7 @@ import spark.util.StatCounter
/**
* Extra functions available on RDDs of Doubles through an implicit conversion.
*/
private[spark] class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
class DoubleRDDFunctions(self: RDD[Double]) extends Logging with Serializable {
def sum(): Double = {
self.reduce(_ + _)
}
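
Usage sketch (assumptions: a live SparkContext named sc, and import spark.SparkContext._ to pull in the implicit conversion that wraps an RDD[Double] in DoubleRDDFunctions):

    import spark.SparkContext._

    val xs = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
    val total = xs.sum()       // resolves through the implicit DoubleRDDFunctions wrapper
    val stats = xs.stats()     // spark.util.StatCounter with count, mean and variance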

View file

@@ -68,7 +68,7 @@ private[spark] object ZigZag {
}
}
private[spark] class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
private[spark]
class KryoSerializationStream(kryo: Kryo, threadBuffer: ByteBuffer, out: OutputStream)
extends SerializationStream {
val channel = Channels.newChannel(out)
@@ -85,7 +86,8 @@ extends SerializationStream {
def close() { out.close() }
}
private[spark] class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
private[spark]
class KryoDeserializationStream(objectBuffer: ObjectBuffer, in: InputStream)
extends DeserializationStream {
def readObject[T](): T = {
val len = ZigZag.readInt(in)
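
For reference, the length prefix that KryoDeserializationStream reads via ZigZag.readInt is a zig-zag / variable-length integer encoding. The sketch below illustrates that general technique only; it is not Spark's actual ZigZag object:

    object VarIntSketch {
      // Map signed ints onto small unsigned ints so short lengths stay one byte.
      def zigZagEncode(n: Int): Int = (n << 1) ^ (n >> 31)
      def zigZagDecode(z: Int): Int = (z >>> 1) ^ -(z & 1)

      // Emit 7 bits per byte; the high bit marks "more bytes follow".
      def writeVarInt(out: java.io.OutputStream, value: Int) {
        var v = value
        while ((v & ~0x7F) != 0) {
          out.write((v & 0x7F) | 0x80)
          v = v >>> 7
        }
        out.write(v)
      }
    }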

View file

@@ -13,7 +13,8 @@ import org.apache.hadoop.mapreduce.TaskAttemptID
import java.util.Date
import java.text.SimpleDateFormat
private[spark] class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
private[spark]
class NewHadoopSplit(rddId: Int, val index: Int, @transient rawSplit: InputSplit with Writable)
extends Split {
val serializableHadoopSplit = new SerializableWritable(rawSplit)

View file

@@ -41,7 +41,7 @@ import spark.partial.PartialResult
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/
private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
@@ -430,7 +430,7 @@ private[spark] class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
def getValueClass() = implicitly[ClassManifest[V]].erasure
}
private[spark] class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
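
Usage sketch (assumes a SparkContext named sc and import spark.SparkContext._, so an RDD[(K, V)] picks up PairRDDFunctions implicitly, and OrderedRDDFunctions when the key has an Ordered view):

    import spark.SparkContext._

    val pairs  = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
    val counts = pairs.reduceByKey(_ + _)   // RDD[(String, Int)]: ("a", 3), ("b", 1)
    val sorted = counts.sortByKey()         // from OrderedRDDFunctions via the K <% Ordered[K] view bound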

View file

@@ -4,7 +4,8 @@ import java.util.Random
import cern.jet.random.Poisson
import cern.jet.random.engine.DRand
private[spark] class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
private[spark]
class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index: Int = prev.index
}

View file

@@ -30,7 +30,7 @@ import SparkContext._
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*/
private[spark] class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
self: RDD[(K, V)])
extends Logging
with Serializable {
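
Usage sketch (assumes sc and import spark.SparkContext._, which supplies both this wrapper and the implicit views from the key/value types to Writable; the output path is just an example):

    import spark.SparkContext._

    val data = sc.parallelize(Seq((1, "one"), (2, "two")))
    data.saveAsSequenceFile("/tmp/pairs-seqfile")   // Int and String are viewed as IntWritable and Text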

View file

@@ -10,7 +10,13 @@ import spark.storage.BlockManagerId
private[spark] sealed trait TaskEndReason
private[spark] case object Success extends TaskEndReason
private[spark] case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
private[spark] case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
private[spark]
case object Resubmitted extends TaskEndReason // Task was finished earlier but we've now lost it
private[spark]
case class FetchFailed(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int) extends TaskEndReason
private[spark] case class ExceptionFailure(exception: Throwable) extends TaskEndReason
private[spark] case class OtherFailure(message: String) extends TaskEndReason

View file

@@ -5,7 +5,7 @@ import java.util.concurrent.atomic.AtomicLong
import spark._
private[spark] abstract class Broadcast[T](id: Long) extends Serializable {
abstract class Broadcast[T](id: Long) extends Serializable {
def value: T
// We cannot have an abstract readObject here due to some weird issues with
@@ -14,7 +14,8 @@ private[spark] abstract class Broadcast[T](id: Long) extends Serializable {
override def toString = "spark.Broadcast(" + id + ")"
}
private[spark] class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
private[spark]
class BroadcastManager(val isMaster_ : Boolean) extends Logging with Serializable {
private var initialized = false
private var broadcastFactory: BroadcastFactory = null
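
Driver-side usage sketch for the now-public Broadcast (assumes an already-constructed SparkContext named sc):

    // The broadcast value is shipped to workers once instead of being captured in every task closure.
    val lookup   = sc.broadcast(Map("a" -> 1, "b" -> 2))
    val resolved = sc.parallelize(Seq("a", "b", "a")).map(k => lookup.value.getOrElse(k, 0))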

View file

@@ -11,10 +11,12 @@ private[spark] sealed trait DeployMessage extends Serializable
// Worker to Master
private[spark] case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
private[spark]
case class RegisterWorker(id: String, host: String, port: Int, cores: Int, memory: Int, webUiPort: Int)
extends DeployMessage
private[spark] case class ExecutorStateChanged(
private[spark]
case class ExecutorStateChanged(
jobId: String,
execId: Int,
state: ExecutorState,
@@ -42,10 +44,17 @@ private[spark] case class RegisterJob(jobDescription: JobDescription) extends De
// Master to Client
private[spark] case class RegisteredJob(jobId: String) extends DeployMessage
private[spark] case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
private[spark] case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
private[spark] case class JobKilled(message: String)
private[spark]
case class RegisteredJob(jobId: String) extends DeployMessage
private[spark]
case class ExecutorAdded(id: Int, workerId: String, host: String, cores: Int, memory: Int)
private[spark]
case class ExecutorUpdated(id: Int, state: ExecutorState, message: Option[String])
private[spark]
case class JobKilled(message: String)
// Internal message in Client
@@ -57,7 +66,8 @@ private[spark] case object RequestMasterState
// Master to MasterWebUI
private[spark] case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
private[spark]
case class MasterState(uri : String, workers: List[WorkerInfo], activeJobs: List[JobInfo],
completedJobs: List[JobInfo])
// WorkerWebUI to Worker
@@ -65,6 +75,7 @@ private[spark] case object RequestWorkerState
// Worker to WorkerWebUI
private[spark] case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
private[spark]
case class WorkerState(uri: String, workerId: String, executors: List[ExecutorRunner],
finishedExecutors: List[ExecutorRunner], masterUrl: String, cores: Int, memory: Int,
coresUsed: Int, memoryUsed: Int, masterWebUiUrl: String)

View file

@@ -9,7 +9,8 @@ import spark.{Logging, Utils}
import scala.collection.mutable.ArrayBuffer
private[spark] class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
private[spark]
class LocalSparkCluster(numSlaves: Int, coresPerSlave: Int, memoryPerSlave: Int) extends Logging {
val localIpAddress = Utils.localIpAddress

View file

@@ -5,7 +5,8 @@ import java.util.Date
import akka.actor.ActorRef
import scala.collection.mutable
private[spark] class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
private[spark]
class JobInfo(val id: String, val desc: JobDescription, val submitDate: Date, val actor: ActorRef) {
var state = JobState.WAITING
var executors = new mutable.HashMap[Int, ExecutorInfo]
var coresGranted = 0

View file

@@ -10,7 +10,8 @@ import cc.spray.directives._
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy._
private[spark] class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
private[spark]
class MasterWebUI(val actorSystem: ActorSystem, master: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/master/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"

View file

@@ -9,7 +9,8 @@ import cc.spray.Directives
import cc.spray.typeconversion.TwirlSupport._
import spark.deploy.{WorkerState, RequestWorkerState}
private[spark] class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
private[spark]
class WorkerWebUI(val actorSystem: ActorSystem, worker: ActorRef) extends Directives {
val RESOURCE_DIR = "spark/deploy/worker/webui"
val STATIC_RESOURCE_DIR = "spark/deploy/static"

View file

@@ -11,7 +11,8 @@ import java.nio.channels.spi._
import java.net._
private[spark] abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
private[spark]
abstract class Connection(val channel: SocketChannel, val selector: Selector) extends Logging {
channel.configureBlocking(false)
channel.socket.setTcpNoDelay(true)

View file

@@ -3,6 +3,7 @@ package spark.partial
/**
* A Double with error bars on it.
*/
private[spark] class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
private[spark]
class BoundedDouble(val mean: Double, val confidence: Double, val low: Double, val high: Double) {
override def toString(): String = "[%.3f, %.3f]".format(low, high)
}
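
Context sketch: BoundedDouble is the element type handed back, inside a spark.partial.PartialResult, by the approximate actions. Assuming a SparkContext named sc (method and field names here reflect the era's API as I understand it), usage looks roughly like:

    val approx = sc.parallelize(1 to 1000000).countApprox(500, 0.95)  // wait at most 500 ms
    val bound  = approx.initialValue     // a BoundedDouble: current estimate with confidence bounds
    println(bound)                       // renders as "[low, high]" via the toString above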

View file

@@ -21,7 +21,8 @@ import spark.storage.BlockManagerId
* schedule to run the job. Subclasses only need to implement the code to send a task to the cluster
* and to report fetch failures (the submitTasks method, and code to add CompletionEvents).
*/
private[spark] class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
private[spark]
class DAGScheduler(taskSched: TaskScheduler) extends TaskSchedulerListener with Logging {
taskSched.setListener(this)
// Called by TaskScheduler to report task completions or failures.

View file

@@ -7,7 +7,8 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
private[spark] class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
private[spark]
class TaskResult[T](var value: T, var accumUpdates: Map[Long, Any]) extends Externalizable {
def this() = this(null.asInstanceOf[T], null)
override def writeExternal(out: ObjectOutput) {
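
General pattern sketch (not this class's actual body): an Externalizable type needs a public no-arg constructor plus symmetric writeExternal/readExternal methods, roughly along these lines:

    import java.io.{Externalizable, ObjectInput, ObjectOutput}

    class ResultSketch(var value: AnyRef, var updates: Map[Long, Any]) extends Externalizable {
      def this() = this(null, null)                       // required for deserialization
      override def writeExternal(out: ObjectOutput) {
        out.writeObject(value)
        out.writeInt(updates.size)
        for ((k, v) <- updates) { out.writeLong(k); out.writeObject(v.asInstanceOf[AnyRef]) }
      }
      override def readExternal(in: ObjectInput) {
        value = in.readObject()
        val n = in.readInt()
        updates = (0 until n).map(_ => (in.readLong(), in.readObject(): Any)).toMap
      }
    }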

View file

@@ -1,3 +1,4 @@
package spark.scheduler.cluster
private[spark] class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}
private[spark]
class SlaveResources(val slaveId: String, val hostname: String, val coresFree: Int) {}

View file

@@ -7,17 +7,25 @@ import spark.util.SerializableBuffer
private[spark] sealed trait StandaloneClusterMessage extends Serializable
// Master to slaves
private[spark] case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark] case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
private[spark] case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
private[spark]
case class LaunchTask(task: TaskDescription) extends StandaloneClusterMessage
private[spark]
case class RegisteredSlave(sparkProperties: Seq[(String, String)]) extends StandaloneClusterMessage
private[spark]
case class RegisterSlaveFailed(message: String) extends StandaloneClusterMessage
// Slaves to master
private[spark] case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
private[spark]
case class RegisterSlave(slaveId: String, host: String, cores: Int) extends StandaloneClusterMessage
private[spark] case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
private[spark]
case class StatusUpdate(slaveId: String, taskId: Long, state: TaskState, data: SerializableBuffer)
extends StandaloneClusterMessage
private[spark] object StatusUpdate {
private[spark]
object StatusUpdate {
/** Alternate factory method that takes a ByteBuffer directly for the data field */
def apply(slaveId: String, taskId: Long, state: TaskState, data: ByteBuffer): StatusUpdate = {
StatusUpdate(slaveId, taskId, state, new SerializableBuffer(data))

View file

@@ -16,7 +16,8 @@ import akka.remote.{RemoteClientShutdown, RemoteClientDisconnected, RemoteClient
* Akka. These may be executed in a variety of ways, such as Mesos tasks for the coarse-grained
* Mesos mode or standalone processes for Spark's standalone deploy mode (spark.deploy.*).
*/
private[spark] class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
private[spark]
class StandaloneSchedulerBackend(scheduler: ClusterScheduler, actorSystem: ActorSystem)
extends SchedulerBackend with Logging {
// Use an atomic variable to track total number of cores in the cluster for simplicity and speed

View file

@@ -3,7 +3,8 @@ package spark.scheduler.cluster
/**
* Information about a running task attempt inside a TaskSet.
*/
private[spark] class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
private[spark]
class TaskInfo(val taskId: Long, val index: Int, val launchTime: Long, val host: String) {
var finishTime: Long = 0
var failed = false

View file

@@ -3,5 +3,6 @@ package spark.scheduler.cluster
/**
* Represents free resources available on a worker node.
*/
private[spark] class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
private[spark]
class WorkerOffer(val slaveId: String, val hostname: String, val cores: Int) {
}

View file

@@ -43,7 +43,8 @@ private[spark] class BlockManagerId(var ip: String, var port: Int) extends Exter
}
private[spark] case class BlockException(blockId: String, message: String, ex: Exception = null)
private[spark]
case class BlockException(blockId: String, message: String, ex: Exception = null)
extends Exception(message)
@@ -56,7 +57,8 @@ private[spark] class BlockLocker(numLockers: Int) {
}
private[spark] class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
private[spark]
class BlockManager(val master: BlockManagerMaster, val serializer: Serializer, maxMemory: Long)
extends Logging {
case class BlockInfo(level: StorageLevel, tellMaster: Boolean)

View file

@@ -16,14 +16,17 @@ import akka.util.duration._
import spark.{Logging, SparkException, Utils}
private[spark] sealed trait ToBlockManagerMaster
private[spark]
sealed trait ToBlockManagerMaster
private[spark] case class RegisterBlockManager(
private[spark]
case class RegisterBlockManager(
blockManagerId: BlockManagerId,
maxMemSize: Long)
extends ToBlockManagerMaster
private[spark] class HeartBeat(
private[spark]
class HeartBeat(
var blockManagerId: BlockManagerId,
var blockId: String,
var storageLevel: StorageLevel,
@@ -53,7 +56,8 @@ private[spark] class HeartBeat(
}
}
private[spark] object HeartBeat {
private[spark]
object HeartBeat {
def apply(blockManagerId: BlockManagerId,
blockId: String,
storageLevel: StorageLevel,
@@ -68,15 +72,20 @@ private[spark] object HeartBeat {
}
}
private[spark] case class GetLocations(blockId: String) extends ToBlockManagerMaster
private[spark]
case class GetLocations(blockId: String) extends ToBlockManagerMaster
private[spark] case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
private[spark]
case class GetLocationsMultipleBlockIds(blockIds: Array[String]) extends ToBlockManagerMaster
private[spark] case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
private[spark]
case class GetPeers(blockManagerId: BlockManagerId, size: Int) extends ToBlockManagerMaster
private[spark] case class RemoveHost(host: String) extends ToBlockManagerMaster
private[spark]
case class RemoveHost(host: String) extends ToBlockManagerMaster
private[spark] case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark]
case object StopBlockManagerMaster extends ToBlockManagerMaster
private[spark] class BlockManagerMasterActor(val isLocal: Boolean) extends Actor with Logging {

View file

@@ -8,7 +8,8 @@ import scala.collection.mutable.ArrayBuffer
import spark._
import spark.network._
private[spark] class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
private[spark]
class BlockMessageArray(var blockMessages: Seq[BlockMessage]) extends Seq[BlockMessage] with Logging {
def this(bm: BlockMessage) = this(Array(bm))

View file

@@ -8,7 +8,8 @@ import java.nio.channels.Channels
* A wrapper around a java.nio.ByteBuffer that is serializable through Java serialization, to make
* it easier to pass ByteBuffers in case class messages.
*/
private[spark] class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
private[spark]
class SerializableBuffer(@transient var buffer: ByteBuffer) extends Serializable {
def value = buffer
private def readObject(in: ObjectInputStream) {
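
General pattern sketch (not the exact class body): custom writeObject/readObject hooks copy the buffer's bytes through Java serialization, roughly like this:

    import java.io.{ObjectInputStream, ObjectOutputStream}
    import java.nio.ByteBuffer

    class ByteBufferHolder(@transient var buffer: ByteBuffer) extends Serializable {
      private def writeObject(out: ObjectOutputStream) {
        val bytes = new Array[Byte](buffer.remaining())
        buffer.duplicate().get(bytes)   // duplicate() so the original position is left untouched
        out.writeInt(bytes.length)
        out.write(bytes)
      }
      private def readObject(in: ObjectInputStream) {
        val bytes = new Array[Byte](in.readInt())
        in.readFully(bytes)
        buffer = ByteBuffer.wrap(bytes)
      }
    }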

View file

@@ -5,7 +5,8 @@ package spark.util
* numerically robust way. Includes support for merging two StatCounters. Based on Welford and
* Chan's algorithms described at http://en.wikipedia.org/wiki/Algorithms_for_calculating_variance.
*/
private[spark] class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private[spark]
class StatCounter(values: TraversableOnce[Double]) extends Serializable {
private var n: Long = 0 // Running count of our values
private var mu: Double = 0 // Running mean of our values
private var m2: Double = 0 // Running variance numerator (sum of (x - mean)^2)
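
The per-value update the comment refers to (Welford's method over the n / mu / m2 fields above) looks roughly like the self-contained sketch below; this is not the actual StatCounter body, and merging two counters uses Chan's parallel formula instead:

    class RunningStats {
      private var n: Long = 0
      private var mu: Double = 0.0    // running mean
      private var m2: Double = 0.0    // running sum of squared deviations from the mean
      def merge(x: Double): this.type = {
        val delta = x - mu
        n += 1
        mu += delta / n
        m2 += delta * (x - mu)        // second factor uses the updated mean, per Welford
        this
      }
      def mean: Double = mu
      def variance: Double = if (n == 0) Double.NaN else m2 / n
    }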