Merge pull request #76 from ijuma/deprecations-scala-2.9

Fix code not to use APIs deprecated in Scala 2.9
Matei Zaharia 2011-08-02 15:47:42 -07:00
commit cfc6000ede
35 changed files with 68 additions and 100 deletions
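
Nearly every change below applies the same mechanical pattern: the @serializable annotation, deprecated in Scala 2.9, is replaced by mixing in Serializable (Scala 2.9's scala.Serializable trait, which extends java.io.Serializable). The remaining changes swap other 2.9 deprecations: Predef's currentThread for Thread.currentThread, the Math object for the math package, and ILoop.main for process. A minimal before/after sketch of the main pattern; the Counter class is illustrative and not part of this patch:

import java.io._

// Before (deprecated as of Scala 2.9):
//   @serializable class Counter(var count: Int)
//
// After, as applied throughout this commit: mix in Serializable,
// which in Scala 2.9 is a trait extending java.io.Serializable.
class Counter(var count: Int) extends Serializable

object Counter {
  def main(args: Array[String]): Unit = {
    // Round-trip through Java serialization to show the class still serializes.
    val buf = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buf)
    out.writeObject(new Counter(42))
    out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
    val copy = in.readObject().asInstanceOf[Counter]
    println(copy.count) // prints 42
  }
}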

View file

@@ -111,8 +111,7 @@ trait Aggregator[V, A] {
def mergeAggregators(a: A, b: A): A
}
-@serializable
-class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
+class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] with Serializable {
def createCombiner(msg: M): ArrayBuffer[M] =
ArrayBuffer(msg)
def mergeMsg(combiner: ArrayBuffer[M], msg: M): ArrayBuffer[M] =
@@ -121,8 +120,7 @@ class DefaultCombiner[M] extends Combiner[M, ArrayBuffer[M]] {
a ++= b
}
-@serializable
-class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
+class NullAggregator[V] extends Aggregator[V, Option[Nothing]] with Serializable {
def createAggregator(vert: V): Option[Nothing] = None
def mergeAggregators(a: Option[Nothing], b: Option[Nothing]): Option[Nothing] = None
}
@@ -130,8 +128,8 @@ class NullAggregator[V] extends Aggregator[V, Option[Nothing]] {
/**
* Represents a Bagel vertex.
*
-* Subclasses may store state along with each vertex and must be
-* annotated with @serializable.
+* Subclasses may store state along with each vertex and must
+* inherit from java.io.Serializable or scala.Serializable.
*/
trait Vertex {
def id: String
@@ -142,7 +140,7 @@ trait Vertex {
* Represents a Bagel message to a target vertex.
*
* Subclasses may contain a payload to deliver to the target vertex
-* and must be annotated with @serializable.
+* and must inherit from java.io.Serializable or scala.Serializable.
*/
trait Message {
def targetId: String
@@ -151,8 +149,8 @@ trait Message {
/**
* Represents a directed edge between two vertices.
*
-* Subclasses may store state along each edge and must be annotated
-* with @serializable.
+* Subclasses may store state along each edge and must inherit from
+* java.io.Serializable or scala.Serializable.
*/
trait Edge {
def targetId: String

View file

@@ -81,8 +81,7 @@ object ShortestPath {
}
}
-@serializable
-object MinCombiner extends Combiner[SPMessage, Int] {
+object MinCombiner extends Combiner[SPMessage, Int] with Serializable {
def createCombiner(msg: SPMessage): Int =
msg.value
def mergeMsg(combiner: Int, msg: SPMessage): Int =
@@ -91,6 +90,6 @@ object MinCombiner extends Combiner[SPMessage, Int] {
min(a, b)
}
-@serializable class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex
-@serializable class SPEdge(val targetId: String, val value: Int) extends Edge
-@serializable class SPMessage(val targetId: String, val value: Int) extends Message
+class SPVertex(val id: String, val value: Int, val outEdges: Seq[SPEdge], val active: Boolean) extends Vertex with Serializable
+class SPEdge(val targetId: String, val value: Int) extends Edge with Serializable
+class SPMessage(val targetId: String, val value: Int) extends Message with Serializable

View file

@@ -76,8 +76,7 @@ object WikipediaPageRank {
}
}
-@serializable
-object PRCombiner extends Combiner[PRMessage, Double] {
+object PRCombiner extends Combiner[PRMessage, Double] with Serializable {
def createCombiner(msg: PRMessage): Double =
msg.value
def mergeMsg(combiner: Double, msg: PRMessage): Double =
@@ -105,8 +104,7 @@ object PRCombiner extends Combiner[PRMessage, Double] {
}
}
-@serializable
-object PRNoCombiner extends DefaultCombiner[PRMessage] {
+object PRNoCombiner extends DefaultCombiner[PRMessage] with Serializable {
def compute(numVertices: Long, epsilon: Double)(self: PRVertex, messages: Option[ArrayBuffer[PRMessage]], superstep: Int): (PRVertex, Iterable[PRMessage]) =
PRCombiner.compute(numVertices, epsilon)(self, messages match {
case Some(msgs) => Some(msgs.map(_.value).sum)
@@ -114,7 +112,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}, superstep)
}
-@serializable class PRVertex() extends Vertex {
+class PRVertex() extends Vertex with Serializable {
var id: String = _
var value: Double = _
var outEdges: ArrayBuffer[PREdge] = _
@@ -129,7 +127,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PRMessage() extends Message {
+class PRMessage() extends Message with Serializable {
var targetId: String = _
var value: Double = _
@@ -140,7 +138,7 @@ object PRNoCombiner extends DefaultCombiner[PRMessage] {
}
}
-@serializable class PREdge() extends Edge {
+class PREdge() extends Edge with Serializable {
var targetId: String = _
def this(targetId: String) {

View file

@@ -12,8 +12,8 @@ import spark._
import spark.bagel.Bagel._
-@serializable class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex
-@serializable class TestMessage(val targetId: String) extends Message
+class TestVertex(val id: String, val active: Boolean, val age: Int) extends Vertex with Serializable
+class TestMessage(val targetId: String) extends Message with Serializable
class BagelSuite extends FunSuite with Assertions {
test("halting by voting") {

View file

@@ -4,8 +4,8 @@ import java.io._
import scala.collection.mutable.Map
-@serializable class Accumulator[T](
-@transient initialValue: T, param: AccumulatorParam[T])
+class Accumulator[T] (
+@transient initialValue: T, param: AccumulatorParam[T]) extends Serializable
{
val id = Accumulators.newId
@transient var value_ = initialValue // Current value on master
@@ -32,7 +32,7 @@ import scala.collection.mutable.Map
override def toString = value_.toString
}
-@serializable trait AccumulatorParam[T] {
+trait AccumulatorParam[T] extends Serializable {
def addInPlace(t1: T, t2: T): T
def zero(initialValue: T): T
}
@@ -52,20 +52,20 @@ private object Accumulators
if (original) {
originals(a.id) = a
} else {
-val accums = localAccums.getOrElseUpdate(currentThread, Map())
+val accums = localAccums.getOrElseUpdate(Thread.currentThread, Map())
accums(a.id) = a
}
}
// Clear the local (non-original) accumulators for the current thread
def clear: Unit = synchronized {
-localAccums.remove(currentThread)
+localAccums.remove(Thread.currentThread)
}
// Get the values of the local accumulators for the current thread (by ID)
def values: Map[Long, Any] = synchronized {
val ret = Map[Long, Any]()
-for ((id, accum) <- localAccums.getOrElse(currentThread, Map()))
+for ((id, accum) <- localAccums.getOrElse(Thread.currentThread, Map()))
ret(id) = accum.value
return ret
}
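
The other recurring fix in this file and below: Scala 2.9 deprecates Predef's bare currentThread, so the code now calls Thread.currentThread directly. A hedged sketch of the same per-thread-registry idiom, with simplified names that are not the actual Spark Accumulators API:

import scala.collection.mutable.Map

// Illustrative per-thread registry, mirroring the idiom above.
// Thread.currentThread replaces the deprecated Predef.currentThread.
object ThreadLocalRegistry {
  private val perThread = Map[Thread, Map[Long, Any]]()

  def put(id: Long, value: Any): Unit = synchronized {
    val entries = perThread.getOrElseUpdate(Thread.currentThread, Map())
    entries(id) = value
  }

  def values: Map[Long, Any] = synchronized {
    val ret = Map[Long, Any]()
    for ((id, v) <- perThread.getOrElse(Thread.currentThread, Map()))
      ret(id) = v
    ret
  }

  def clear(): Unit = synchronized {
    perThread.remove(Thread.currentThread)
  }
}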

View file

@@ -1,8 +1,7 @@
package spark
-@serializable
class Aggregator[K, V, C] (
val createCombiner: V => C,
val mergeValue: (C, V) => C,
val mergeCombiners: (C, C) => C
-)
+) extends Serializable

View file

@@ -1,14 +1,13 @@
package spark
-@serializable class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
-extends Split {
+class CartesianSplit(idx: Int, val s1: Split, val s2: Split)
+extends Split with Serializable {
override val index = idx
}
-@serializable
class CartesianRDD[T: ClassManifest, U:ClassManifest](
sc: SparkContext, rdd1: RDD[T], rdd2: RDD[U])
-extends RDD[Pair[T, U]](sc) {
+extends RDD[Pair[T, U]](sc) with Serializable {
val numSplitsInRdd2 = rdd2.splits.size
@transient val splits_ = {

View file

@@ -6,24 +6,21 @@ import java.io.ObjectInputStream
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
-@serializable
-sealed trait CoGroupSplitDep
+sealed trait CoGroupSplitDep extends Serializable
case class NarrowCoGroupSplitDep(rdd: RDD[_], split: Split) extends CoGroupSplitDep
case class ShuffleCoGroupSplitDep(shuffleId: Int) extends CoGroupSplitDep
-@serializable
class CoGroupSplit(idx: Int, val deps: Seq[CoGroupSplitDep])
-extends Split {
+extends Split with Serializable {
override val index = idx
override def hashCode(): Int = idx
}
-@serializable
class CoGroupAggregator extends Aggregator[Any, Any, ArrayBuffer[Any]] (
{ x => ArrayBuffer(x) },
{ (b, x) => b += x },
{ (b1, b2) => b1 ++ b2 }
-)
+) with Serializable
class CoGroupedRDD[K](rdds: Seq[RDD[(_, _)]], part: Partitioner)
extends RDD[(K, Seq[Seq[_]])](rdds.head.context) with Logging {

View file

@@ -1,7 +1,6 @@
package spark
-@serializable
-abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean)
+abstract class Dependency[T](val rdd: RDD[T], val isShuffle: Boolean) extends Serializable
abstract class NarrowDependency[T](rdd: RDD[T])
extends Dependency(rdd, false) {

View file

@@ -13,8 +13,8 @@ import org.apache.hadoop.mapred.Reporter
import org.apache.hadoop.util.ReflectionUtils
/** A Spark split class that wraps around a Hadoop InputSplit */
-@serializable class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
-extends Split {
+class HadoopSplit(rddId: Int, idx: Int, @transient s: InputSplit)
+extends Split with Serializable {
val inputSplit = new SerializableWritable[InputSplit](s)
override def hashCode(): Int = (41 * (41 + rddId) + idx).toInt

View file

@@ -20,7 +20,7 @@ import spark.Logging
* also contain an output key class, an output value class, a filename to write to, etc
* exactly like in a Hadoop job.
*/
-@serializable class HadoopWriter(@transient jobConf: JobConf) extends Logging {
+class HadoopWriter(@transient jobConf: JobConf) extends Logging with Serializable {
private val now = new Date()
private val conf = new SerializableWritable(jobConf)

View file

@@ -12,7 +12,7 @@ class JavaSerializationStream(out: OutputStream) extends SerializationStream {
class JavaDeserializationStream(in: InputStream) extends DeserializationStream {
val objIn = new ObjectInputStream(in) {
override def resolveClass(desc: ObjectStreamClass) =
-Class.forName(desc.getName, false, currentThread.getContextClassLoader)
+Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
}
def readObject[T](): T = objIn.readObject().asInstanceOf[T]

View file

@@ -33,7 +33,7 @@ private class LocalScheduler(threads: Int) extends DAGScheduler with Logging {
val bytes = Utils.serialize(tasks(i))
logInfo("Size of task " + i + " is " + bytes.size + " bytes")
val deserializedTask = Utils.deserialize[Task[_]](
-bytes, currentThread.getContextClassLoader)
+bytes, Thread.currentThread.getContextClassLoader)
val result: Any = deserializedTask.run(myAttemptId)
val accumUpdates = Accumulators.values
logInfo("Finished task " + i)

View file

@@ -30,8 +30,7 @@ import SparkContext._
/**
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
*/
-@serializable
-class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging {
+class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)]) extends Logging with Serializable {
def reduceByKeyToDriver(func: (V, V) => V): Map[K, V] = {
def mergeMaps(m1: HashMap[K, V], m2: HashMap[K, V]): HashMap[K, V] = {
for ((k, v) <- m2) {

View file

@@ -2,9 +2,9 @@ package spark
import java.util.concurrent.atomic.AtomicLong
-@serializable class ParallelCollectionSplit[T: ClassManifest](
+class ParallelCollectionSplit[T: ClassManifest](
val rddId: Long, val slice: Int, values: Seq[T])
-extends Split {
+extends Split with Serializable {
def iterator(): Iterator[T] = values.iterator
override def hashCode(): Int = (41 * (41 + rddId) + slice).toInt

View file

@@ -1,7 +1,6 @@
package spark
-@serializable
-abstract class Partitioner {
+abstract class Partitioner extends Serializable {
def numPartitions: Int
def getPartition(key: Any): Int
}

View file

@@ -44,8 +44,7 @@ import SparkContext._
* In addition, PairRDDFunctions contains extra methods available on RDDs of key-value pairs,
* and SequenceFileRDDFunctions contains extra methods for saving RDDs to Hadoop SequenceFiles.
*/
-@serializable
-abstract class RDD[T: ClassManifest](@transient sc: SparkContext) {
+abstract class RDD[T: ClassManifest](@transient sc: SparkContext) extends Serializable {
// Methods that must be implemented by subclasses
def splits: Array[Split]
def compute(split: Split): Iterator[T]

View file

@@ -2,7 +2,7 @@ package spark
import java.util.Random
-@serializable class SampledRDDSplit(val prev: Split, val seed: Int) extends Split {
+class SampledRDDSplit(val prev: Split, val seed: Int) extends Split with Serializable {
override val index = prev.index
}

View file

@@ -31,8 +31,7 @@ import SparkContext._
* through an implicit conversion. Note that this can't be part of PairRDDFunctions because
* we need more implicit parameters to convert our keys and values to Writable.
*/
-@serializable
-class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging {
+class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](self: RDD[(K,V)]) extends Logging with Serializable {
def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure))

View file

@@ -6,8 +6,7 @@ import org.apache.hadoop.io.ObjectWritable
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.JobConf
-@serializable
-class SerializableWritable[T <: Writable](@transient var t: T) {
+class SerializableWritable[T <: Writable](@transient var t: T) extends Serializable {
def value = t
override def toString = t.toString

View file

@@ -27,7 +27,7 @@ extends Job(jobId) with Logging
// Maximum times a task is allowed to fail before failing the job
val MAX_TASK_FAILURES = 4
-val callingThread = currentThread
+val callingThread = Thread.currentThread
val tasks = tasksSeq.toArray
val numTasks = tasks.length
val launched = new Array[Boolean](numTasks)

View file

@@ -257,7 +257,7 @@ extends Logging {
def defaultParallelism: Int = scheduler.defaultParallelism
// Default min number of splits for Hadoop RDDs when not given by user
-def defaultMinSplits: Int = Math.min(defaultParallelism, 2)
+def defaultMinSplits: Int = math.min(defaultParallelism, 2)
private var nextShuffleId = new AtomicInteger(0)
@@ -363,5 +363,4 @@ object SparkContext {
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
-@serializable
-class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) {}
+class WritableConverter[T](val writableClass: ClassManifest[T] => Class[_ <: Writable], val convert: Writable => T) extends Serializable
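
The non-serialization change in this file swaps the deprecated scala.Math object for the scala.math package object. A trivial, self-contained illustration; the demo object is not part of the patch:

// scala.Math was deprecated in 2.9; the scala.math package object replaces it.
object MinSplitsDemo {
  def defaultMinSplits(defaultParallelism: Int): Int =
    math.min(defaultParallelism, 2) // was: Math.min(defaultParallelism, 2)

  def main(args: Array[String]): Unit =
    println(defaultMinSplits(8)) // prints 2
}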

View file

@@ -3,7 +3,7 @@ package spark
/**
* A partition of an RDD.
*/
-@serializable trait Split {
+trait Split extends Serializable {
/**
* Get the split's index within its parent RDD
*/

View file

@@ -1,11 +1,8 @@
package spark
-@serializable
-class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) {
-}
+class TaskContext(val stageId: Int, val splitId: Int, val attemptId: Int) extends Serializable
-@serializable
-abstract class Task[T] {
+abstract class Task[T] extends Serializable {
def run (id: Int): T
def preferredLocations: Seq[String] = Nil
def generation: Option[Long] = None

View file

@@ -5,5 +5,4 @@ import scala.collection.mutable.Map
// Task result. Also contains updates to accumulator variables.
// TODO: Use of distributed cache to return result is a hack to get around
// what seems to be a bug with messages over 60KB in libprocess; fix it
-@serializable
-private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any])
+private class TaskResult[T](val value: T, val accumUpdates: Map[Long, Any]) extends Serializable

View file

@@ -2,17 +2,15 @@ package spark
import scala.collection.mutable.ArrayBuffer
-@serializable
class UnionSplit[T: ClassManifest](idx: Int, rdd: RDD[T], split: Split)
-extends Split {
+extends Split with Serializable {
def iterator() = rdd.iterator(split)
def preferredLocations() = rdd.preferredLocations(split)
override val index = idx
}
-@serializable
class UnionRDD[T: ClassManifest](sc: SparkContext, rdds: Seq[RDD[T]])
-extends RDD[T](sc) {
+extends RDD[T](sc) with Serializable {
@transient val splits_ : Array[Split] = {
val array = new Array[Split](rdds.map(_.splits.size).sum)
var pos = 0

View file

@@ -10,9 +10,8 @@ import scala.math
import spark._
-@serializable
class BitTorrentBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -7,8 +7,7 @@ import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import spark._
-@serializable
-trait Broadcast[T] {
+trait Broadcast[T] extends Serializable {
val uuid = UUID.randomUUID
def value: T
@@ -20,7 +19,7 @@ trait Broadcast[T] {
}
object Broadcast
-extends Logging {
+extends Logging with Serializable {
// Messages
val REGISTER_BROADCAST_TRACKER = 0
val UNREGISTER_BROADCAST_TRACKER = 1
@@ -183,7 +182,7 @@ extends Logging {
private def byteArrayToObject[OUT](bytes: Array[Byte]): OUT = {
val in = new ObjectInputStream (new ByteArrayInputStream (bytes)){
override def resolveClass(desc: ObjectStreamClass) =
-Class.forName(desc.getName, false, currentThread.getContextClassLoader)
+Class.forName(desc.getName, false, Thread.currentThread.getContextClassLoader)
}
val retVal = in.readObject.asInstanceOf[OUT]
in.close()
@@ -191,18 +190,15 @@ extends Logging {
}
}
-@serializable
-case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) { }
+case class BroadcastBlock (val blockID: Int, val byteArray: Array[Byte]) extends Serializable
-@serializable
case class VariableInfo (@transient val arrayOfBlocks : Array[BroadcastBlock],
val totalBlocks: Int,
-val totalBytes: Int) {
+val totalBytes: Int) extends Serializable {
@transient var hasBlocks = 0
}
-@serializable
-class SpeedTracker {
+class SpeedTracker extends Serializable {
// Mapping 'source' to '(totalTime, numBlocks)'
private var sourceToSpeedMap = Map[SourceInfo, (Long, Int)] ()

View file

@@ -9,9 +9,8 @@ import scala.math
import spark._
-@serializable
class ChainedBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -11,9 +11,8 @@ import org.apache.hadoop.fs.{FileSystem, Path, RawLocalFileSystem}
import spark._
-@serializable
-class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+class DfsBroadcast[T](@transient var value_ : T, isLocal: Boolean)
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -10,7 +10,6 @@ import spark._
* CHANGED: Keep track of the blockSize for THIS broadcast variable.
* Broadcast.BlockSize is expected to be updated across different broadcasts
*/
-@serializable
case class SourceInfo (val hostAddress: String,
val listenPort: Int,
val totalBlocks: Int = SourceInfo.UnusedParam,

View file

@@ -9,9 +9,8 @@ import scala.math
import spark._
-@serializable
class TreeBroadcast[T](@transient var value_ : T, isLocal: Boolean)
-extends Broadcast[T] with Logging {
+extends Broadcast[T] with Logging with Serializable {
def value = value_

View file

@@ -1,6 +1,6 @@
package spark.examples
-@serializable class Vector(val elements: Array[Double]) {
+class Vector(val elements: Array[Double]) extends Serializable {
def length = elements.length
def apply(index: Int) = elements(index)

View file

@@ -17,7 +17,7 @@ object SparkBuild extends Build {
organization := "org.spark-project",
version := "0.4-SNAPSHOT",
scalaVersion := "2.9.0-1",
-scalacOptions := Seq(/*"-deprecation",*/ "-unchecked"), // TODO Enable -deprecation and fix all warnings
+scalacOptions := Seq(/*"-deprecation",*/ "-unchecked"), // -deprecation is too noisy due to usage of old Hadoop API, enable it once that's no longer an issue
unmanagedJars in Compile <<= baseDirectory map { base => (base ** "*.jar").classpath },
retrieveManaged := true,
transitiveClassifiers in Scope.GlobalScope := Seq("sources"),

View file

@@ -25,7 +25,7 @@ class ReplSuite extends FunSuite {
val interp = new SparkILoop(in, new PrintWriter(out), master)
spark.repl.Main.interp = interp
val separator = System.getProperty("path.separator")
-interp.main(Array("-classpath", paths.mkString(separator)))
+interp.process(Array("-classpath", paths.mkString(separator)))
spark.repl.Main.interp = null
if (interp.sparkContext != null)
interp.sparkContext.stop()
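
The REPL test switches from ILoop.main, deprecated in Scala 2.9, to process, which takes the same argument array and returns a Boolean. A hedged sketch against the stock 2.9 scala.tools.nsc.interpreter.ILoop (SparkILoop is assumed to expose the same inherited method); the ReplDriver helper below is illustrative, not from the test:

import java.io.{BufferedReader, PrintWriter, StringReader, StringWriter}
import scala.tools.nsc.interpreter.ILoop

// Drive a REPL session programmatically and capture its output.
object ReplDriver {
  def run(input: String, classpath: String): String = {
    val in = new BufferedReader(new StringReader(input + "\n"))
    val out = new StringWriter()
    val interp = new ILoop(in, new PrintWriter(out))
    // process replaces the deprecated ILoop.main; it returns true on a clean exit.
    interp.process(Array("-classpath", classpath, "-usejavacp"))
    out.toString
  }
}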