Fix issue #65: Change @serializable to extends Serializable in 2.9 branch

Note that we use scala.Serializable, introduced in Scala 2.9, instead of
java.io.Serializable. Also, case classes inherit from scala.Serializable by
default.
Ismael Juma, 2011-08-02 10:16:33 +01:00
commit 0fba22b3d2 (parent 2e57338896)
30 changed files with 58 additions and 90 deletions
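For readers skimming the diff, the pattern applied in every file below is sketched here in plain Scala: the Scala 2.8-era @serializable annotation is dropped and scala.Serializable is extended or mixed in instead. The Settings, Record, User, and Point names are illustrative only and do not appear in this commit.

// Before (2.8 style): serializability came from an annotation.
// @serializable
// class Settings(val name: String)

// After (2.9 style): extend scala.Serializable directly ...
class Settings(val name: String) extends Serializable

// ... or mix it in with `with` when another parent is already present.
trait Record { def id: String }
class User(val id: String) extends Record with Serializable

// Case classes need nothing extra: in 2.9 they inherit scala.Serializable by default.
case class Point(x: Double, y: Double)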

View file

@@ -111,8 +111,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
-@serializable
-class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
+class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable {
def createCombiner(msg: M): ArrayBuffer[M] =
ArrayBuffer(msg)
def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] =
@@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
a ++= b
}
-@serializable
-class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
+class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable {
def createAggregator(vert: V): Option[Nothing] = None
def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None
}
@@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
/**
* Represents a Bagel vertex.
*
-* Subclasses may store state along with each vertex and must be
-* annotated with @serializable.
+* Subclasses may store state along with each vertex and must
+* inherit from java.io.Serializable or scala.Serializable.
*/
trait Vertex {
def id: String
@@ -142,7 +140,7 @@ trait Vertex {
* Represents a Bagel message to a target vertex.
*
* Subclasses may contain a payload to deliver to the target vertex
-* and must be annotated with @serializable.
+* and must inherit from java.io.Serializable or scala.Serializable.
*/
trait Message {
def targetId: String
@@ -151,8 +149,8 @@ trait Message {
/**
* Represents a directed edge between two vertices.
*
-* Subclasses may store state along each edge and must be annotated
-* with @serializable.
+* Subclasses may store state along each edge and must inherit from
+* java.io.Serializable or scala.Serializable.
*/
trait Edge {
def targetId: String
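The updated doc comments above require Vertex, Message, and Edge subclasses to inherit from java.io.Serializable or scala.Serializable rather than carry @serializable. A minimal sketch of user-defined Bagel types under that rule; the Rank* names are hypothetical and simply mirror the SPVertex, SPEdge, and SPMessage shapes in the next file.

// Hypothetical user types after this change: plain classes mix Serializable
// in explicitly alongside the Bagel trait they implement ...
class RankVertex(val id: String, val value: Double, val outEdges: Seq[RankEdge],
                 val active: Boolean) extends Vertex with Serializable
class RankEdge(val targetId: String, val value: Double) extends Edge with Serializable

// ... while a case class needs nothing extra, since it already inherits
// scala.Serializable by default in 2.9.
case class RankMessage(targetId: String, value: Double) extends Message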

View file

@@ -81,8 +81,7 @@ object ShortestPath {
}
}
-@serializable
-object MinCombiner extends Combiner[SPMessage, Int] {
+object MinCombiner extends Combiner[SPMessage, Int] with Serializable {
def createCombiner(msg: SPMessage): Int =
msg.value
def mergeMsg(combiner: Int, msg: SPMessage): Int =
@@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] {
min(a, b)
}
-@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
-@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
-@serializable class SPMessage(val targetId: String, val value: Int) extends Message
+class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable
+class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable
+class SPMessage(val targetId: String, val value: Int) extends Message with Serializable

View file

@@ -76,8 +76,7 @@ object WikipediaPageRank {
}
}
-@serializable
-object PRCombiner extends Combiner[PRMessage, Double] {
+object PRCombiner extends Combiner[PRMessage, Double] with Serializable {
def createCombiner(msg: PRMessage): Double =
msg.value
def mergeMsg(combiner: Double, msg: PRMessage): Double =
@@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] {
}
}
-@serializable
-object PRNoCombiner extends DefaultCombiner[PRMessage] {
+object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable {
def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) =
PRCombiner.compute(numVertices, epsilon)(self, messages match {
case Some(msgs) => Some(msgs.map(_.value).sum)
@@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}, superstep)
}
-@serializable class PRVertex() extends Vertex {
+class PRVertex() extends Vertex with Serializable {
var id: String = _
var value: Double = _
var outEdges: ArrayBuffer[PREdge] = _
@@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PRMessage() extends Message {
+class PRMessage() extends Message with Serializable {
var targetId: String = _
var value: Double = _
@@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PREdge() extends Edge {
+class PREdge() extends Edge with Serializable {
var targetId: String = _
def this(targetId: String) {

View file

@@ -12,8 +12,8 @@ import spark._
import spark.bagel.Bagel._
-@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
-@serializable class TestMessage(val targetId: String) extends Message
+class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable
+class TestMessage(val targetId: String) extends Message with Serializable
class BagelSuite extends FunSuite with Assertions {
test("halting by voting") {

View file

@@ -4,8 +4,8 @@ import java.io._
import scala.collection.mutable.Map
-@serializable class Accumulator[T](
-@transient initialValue: T, param: AccumulatorParam[T])
+class Accumulator[T] (
+@transient initialValue: T, param: AccumulatorParam[T]) extends Serializable
{
val id = Accumulators.newId
@transient var value_ = initialValue // Current value on master
@@ -32,7 +32,7 @@ import scala.collection.mutable.Map
override def toString = value_.toString
}
-@serializable trait AccumulatorParam[T] {
+trait AccumulatorParam[T] extends Serializable {
def addInPlace(t1: T, t2: T): T
def zero(initialValue: T): T
}
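One side effect of the hunk above: since AccumulatorParam itself now extends Serializable, a user-supplied parameter object needs no annotation of its own to be shipped along with the accumulators that reference it. A small hedged sketch; DoubleAccumulatorParam is illustrative and not part of this commit.

// Illustrative AccumulatorParam for Double values. The trait already extends
// Serializable, so this object needs no extra marking to travel with a closure.
object DoubleAccumulatorParam extends AccumulatorParam[Double] {
  def addInPlace(t1: Double, t2: Double): Double = t1 + t2
  def zero(initialValue: Double): Double = 0.0
}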

View file

@@ -1,8 +1,7 @@
package spark
-@serializable
class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C
-)
+) extends Serializable

View file

@@ -1,14 +1,13 @@
package spark
-@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
-extends Split {
+class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
+extends Split with Serializable {
override val index = idx
}
-@serializable
class CartesianRDD[T: ClassManifest, U:ClassManifest](
sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
-extends RDD[Pair[T, U]](sc) {
+extends RDD[Pair[T, U]](sc) with Serializable {
val numSplitsInRdd2 = rdd2.splits.size
@transient val splits_ = {

View file

@@ -6,24 +6,21 @@ import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-@serializable
-sealed trait CoGroupSplitDep
+sealed trait CoGroupSplitDep extends Serializable
case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-@serializable
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
-extends Split {
+extends Split with Serializable {
override val index = idx
override def hashCode(): Int = idx
}
-@serializable
class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
{ (b1, b2) => b1 ++ b2 }
-)
+) with Serializable
class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {

View file

@@ -1,7 +1,6 @@
package spark
-@serializable
-abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
+abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable
abstract class NarrowDependency[T](rdd: RDD[T])
extends Dependency(rdd, false) {

View file

@@ -13,8 +13,8 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
/** A Spark split class that wraps around a Hadoop InputSplit */
-@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
-extends Split {
+class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
+extends Split with Serializable {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt

View file

@@ -20,7 +20,7 @@ import spark.Logging
* also contain an output key class, an output value class, a filename to write to, etc
* exactly like in a Hadoop job.
*/
-@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging {
+class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)

View file

@@ -30,8 +30,7 @@ import SparkContext._
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/
-@serializable
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging with Serializable {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
for ((k, v) <- m2) {

View file

@@ -2,9 +2,9 @@ package spark
import java.util.concurrent.atomic.AtomicLong
-@serializable class ParallelCollectionSplit[T: ClassManifest](
+class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long, val slice: Int, values: Seq[T])
-extends Split {
+extends Split with Serializable {
def iterator(): Iterator[T] = values.iterator
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt

View file

@@ -1,7 +1,6 @@
package spark
-@serializable
-abstract class Partitioner {
+abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

View file

@@ -44,8 +44,7 @@ import SparkContext._
* In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs,
* and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
*/
-@serializable
-abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
+abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable {
// Methods that must be implemented by subclasses
def splits: Array[Split]
def compute(split: Split): Iterator[T]

View file

@@ -2,7 +2,7 @@ package spark
import java.util.Random
-@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split {
+class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index = prev.index
}

View file

@@ -31,8 +31,7 @@ import SparkContext._
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*/
-@serializable
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging with Serializable {
def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))

View file

@@ -6,8 +6,7 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.JobConf
-@serializable
-class SerializableWritable[T <: Writable](@transient var t: T) {
+class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

View file

@@ -363,5 +363,4 @@ object SparkContext {
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
-@serializable
-class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {}
+class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable

View file

@@ -3,7 +3,7 @@ package spark
/**
* A partition of an RDD.
*/
-@serializable trait Split {
+trait Split extends Serializable {
/**
* Get the split's index within its parent RDD
*/

View file

@@ -1,11 +1,8 @@
package spark
-@serializable
-class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) {
-}
+class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable
-@serializable
-abstract class Task[T] {
+abstract class Task[T] extends Serializable {
def run (id: Int): T
def preferredLocations: Seq[String] = Nil
def generation: Option[Long] = None

View file

@@ -5,5 +5,4 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
-@serializable
-private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any])
+private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) extends Serializable

View file

@@ -2,17 +2,15 @@ package spark
import scala.collection.mutable.ArrayBuffer
-@serializable
class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split)
-extends Split {
+extends Split with Serializable {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
override val index = idx
}
-@serializable
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
-extends RDD[T](sc) {
+extends RDD[T](sc) with Serializable {
@transient val splits_ : Array[Split] = {
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0

View file

@@ -10,9 +10,8 @@ import scala.math
import spark._
-@serializable
class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -7,8 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import spark._
-@serializable
-trait Broadcast[T] {
+trait Broadcast[T] extends Serializable {
val uuid = UUID.randomUUID
def value: T
@@ -20,7 +19,7 @@ trait Broadcast[T] {
}
object Broadcast
-extends Logging {
+extends Logging with Serializable {
// Messages
val REGISTER_BROADCAST_TRACKER = 0
val UNREGISTER_BROADCAST_TRACKER = 1
@@ -191,18 +190,15 @@ extends Logging {
}
}
-@serializable
-case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { }
+case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable
-@serializable
case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
val totalBlocks: Int,
-val totalBytes: Int) {
+val totalBytes: Int) extends Serializable {
@transient var hasBlocks = 0
}
-@serializable
-class SpeedTracker {
+class SpeedTracker extends Serializable {
// Mapping 'source' to '(totalTime, numBlocks)'
private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] ()

View file

@@ -9,9 +9,8 @@ import scala.math
import spark._
-@serializable
class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -11,9 +11,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark._
-@serializable
class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -10,7 +10,6 @@ import spark._
* CHANGED: Keep track of the blockSize for THIS broadcast variable.
* Broadcast.BlockSize is expected to be updated across different broadcasts
*/
-@serializable
case class SourceInfo (val hostAddress: String,
val listenPort: Int,
val totalBlocks: Int = SourceInfo.UnusedParam,

View file

@@ -9,9 +9,8 @@ import scala.math
import spark._
-@serializable
class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -1,6 +1,6 @@
package spark.examples
-@serializable class Vector(val elements: Array[Double]) {
+class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
def apply(index: Int) = elements(index)