Fixed Warning: ClassManifest -> ClassTag

This commit is contained in:
Prashant Sharma 2013-04-29 16:39:13 +05:30
parent 4b4a36ea7d
commit 8f3ac240cb
73 changed files with 531 additions and 438 deletions

View file

@ -7,6 +7,7 @@ import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.{ ClassTag, classTag}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
@ -29,7 +30,7 @@ import spark.Partitioner._
* Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
* Import `spark.SparkContext._` at the top of your program to use these functions.
*/
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
class PairRDDFunctions[K: ClassTag, V: ClassTag](
self: RDD[(K, V)])
extends Logging
with HadoopMapReduceUtil
@ -394,7 +395,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
val cg = new CoGroupedRDD[K](
Seq(self.asInstanceOf[RDD[(K, _)]], other.asInstanceOf[RDD[(K, _)]]),
partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
prfs.mapValues {
case Seq(vs, ws) =>
(vs.asInstanceOf[Seq[V]], ws.asInstanceOf[Seq[W]])
@ -415,7 +416,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
other1.asInstanceOf[RDD[(K, _)]],
other2.asInstanceOf[RDD[(K, _)]]),
partitioner)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classManifest[K], Manifests.seqSeqManifest)
val prfs = new PairRDDFunctions[K, Seq[Seq[_]]](cg)(classTag[K], ClassTags.seqSeqClassTag)
prfs.mapValues {
case Seq(vs, w1s, w2s) =>
(vs.asInstanceOf[Seq[V]], w1s.asInstanceOf[Seq[W1]], w2s.asInstanceOf[Seq[W2]])
@ -469,19 +470,19 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
/**
* Return an RDD with the pairs from `this` whose keys are not in `other`.
*
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
def subtractByKey[W: ClassManifest](other: RDD[(K, W)]): RDD[(K, V)] =
def subtractByKey[W: ClassTag](other: RDD[(K, W)]): RDD[(K, V)] =
subtractByKey(other, self.partitioner.getOrElse(new HashPartitioner(self.partitions.size)))
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W: ClassManifest](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
def subtractByKey[W: ClassTag](other: RDD[(K, W)], numPartitions: Int): RDD[(K, V)] =
subtractByKey(other, new HashPartitioner(numPartitions))
/** Return an RDD with the pairs from `this` whose keys are not in `other`. */
def subtractByKey[W: ClassManifest](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
def subtractByKey[W: ClassTag](other: RDD[(K, W)], p: Partitioner): RDD[(K, V)] =
new SubtractedRDD[K, V, W](self, other, p)
/**
@ -510,7 +511,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Output the RDD to any Hadoop-supported file system, using a Hadoop `OutputFormat` class
* supporting the key and value types K and V in this RDD.
*/
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
def saveAsHadoopFile[F <: OutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
saveAsHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@ -518,7 +519,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Output the RDD to any Hadoop-supported file system, using a new Hadoop API `OutputFormat`
* (mapreduce.OutputFormat) object supporting the key and value types K and V in this RDD.
*/
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassManifest[F]) {
def saveAsNewAPIHadoopFile[F <: NewOutputFormat[K, V]](path: String)(implicit fm: ClassTag[F]) {
saveAsNewAPIHadoopFile(path, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@ -644,15 +645,15 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* Return an RDD with the keys of each tuple.
*/
def keys: RDD[K] = self.map(_._1)
/**
* Return an RDD with the values of each tuple.
*/
def values: RDD[V] = self.map(_._2)
private[spark] def getKeyClass() = implicitly[ClassManifest[K]].erasure
private[spark] def getKeyClass() = implicitly[ClassTag[K]].erasure
private[spark] def getValueClass() = implicitly[ClassManifest[V]].erasure
private[spark] def getValueClass() = implicitly[ClassTag[V]].erasure
}
/**
@ -660,7 +661,7 @@ class PairRDDFunctions[K: ClassManifest, V: ClassManifest](
* an implicit conversion. Import `spark.SparkContext._` at the top of your program to use these
* functions. They will work with any key type that has a `scala.math.Ordered` implementation.
*/
class OrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
class OrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
self: RDD[(K, V)])
extends Logging
with Serializable {
@ -704,6 +705,6 @@ class FlatMappedValuesRDD[K, V, U](prev: RDD[(K, V)], f: V => TraversableOnce[U]
}
}
private[spark] object Manifests {
val seqSeqManifest = classManifest[Seq[Seq[_]]]
private[spark] object ClassTags {
val seqSeqClassTag = classTag[Seq[Seq[_]]]
}

View file

@ -1,5 +1,7 @@
package spark
import scala.reflect.ClassTag
/**
* An object that defines how the elements in a key-value pair RDD are partitioned by key.
* Maps each key to a partition ID, from 0 to `numPartitions - 1`.
@ -60,7 +62,7 @@ class HashPartitioner(partitions: Int) extends Partitioner {
}
}
}
override def equals(other: Any): Boolean = other match {
case h: HashPartitioner =>
h.numPartitions == numPartitions
@ -73,10 +75,10 @@ class HashPartitioner(partitions: Int) extends Partitioner {
* A [[spark.Partitioner]] that partitions sortable records by range into roughly equal ranges.
* Determines the ranges by sampling the RDD passed in.
*/
class RangePartitioner[K <% Ordered[K]: ClassManifest, V](
class RangePartitioner[K <% Ordered[K]: ClassTag, V](
partitions: Int,
@transient rdd: RDD[(K,V)],
private val ascending: Boolean = true)
private val ascending: Boolean = true)
extends Partitioner {
// An array of upper bounds for the first (partitions - 1) partitions

View file

@ -8,6 +8,7 @@ import scala.collection.Map
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.reflect.{classTag, ClassTag}
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable
@ -65,7 +66,7 @@ import SparkContext._
* [[http://www.cs.berkeley.edu/~matei/papers/2012/nsdi_spark.pdf Spark paper]] for more details
* on RDD internals.
*/
abstract class RDD[T: ClassManifest](
abstract class RDD[T: ClassTag](
@transient private var sc: SparkContext,
@transient private var deps: Seq[Dependency[_]]
) extends Serializable with Logging {
@ -213,13 +214,13 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[U: ClassManifest](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
def map[U: ClassTag](f: T => U): RDD[U] = new MappedRDD(this, sc.clean(f))
/**
* Return a new RDD by first applying a function to all elements of this
* RDD, and then flattening the results.
*/
def flatMap[U: ClassManifest](f: T => TraversableOnce[U]): RDD[U] =
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] =
new FlatMappedRDD(this, sc.clean(f))
/**
@ -307,25 +308,25 @@ abstract class RDD[T: ClassManifest](
* Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of
* elements (a, b) where a is in `this` and b is in `other`.
*/
def cartesian[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new CartesianRDD(sc, this, other)
/**
* Return an RDD of grouped items.
*/
def groupBy[K: ClassManifest](f: T => K): RDD[(K, Seq[T])] =
def groupBy[K: ClassTag](f: T => K): RDD[(K, Seq[T])] =
groupBy[K](f, defaultPartitioner(this))
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K: ClassManifest](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
def groupBy[K: ClassTag](f: T => K, numPartitions: Int): RDD[(K, Seq[T])] =
groupBy(f, new HashPartitioner(numPartitions))
/**
* Return an RDD of grouped items.
*/
def groupBy[K: ClassManifest](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
def groupBy[K: ClassTag](f: T => K, p: Partitioner): RDD[(K, Seq[T])] = {
val cleanF = sc.clean(f)
this.map(t => (cleanF(t), t)).groupByKey(p)
}
@ -349,7 +350,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return a new RDD by applying a function to each partition of this RDD.
*/
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] =
new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)
@ -357,7 +358,7 @@ abstract class RDD[T: ClassManifest](
* Return a new RDD by applying a function to each partition of this RDD, while tracking the index
* of the original partition.
*/
def mapPartitionsWithIndex[U: ClassManifest](
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] =
new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
@ -367,7 +368,7 @@ abstract class RDD[T: ClassManifest](
* of the original partition.
*/
@deprecated("use mapPartitionsWithIndex", "0.7.0")
def mapPartitionsWithSplit[U: ClassManifest](
def mapPartitionsWithSplit[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U] =
new MapPartitionsWithIndexRDD(this, sc.clean(f), preservesPartitioning)
@ -377,7 +378,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def mapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
def mapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)
(f:(T, A) => U): RDD[U] = {
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
val a = constructA(index)
@ -391,7 +392,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def flatMapWith[A: ClassManifest, U: ClassManifest](constructA: Int => A, preservesPartitioning: Boolean = false)
def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)
(f:(T, A) => Seq[U]): RDD[U] = {
def iterF(index: Int, iter: Iterator[T]): Iterator[U] = {
val a = constructA(index)
@ -405,7 +406,7 @@ abstract class RDD[T: ClassManifest](
* This additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def foreachWith[A: ClassManifest](constructA: Int => A)
def foreachWith[A: ClassTag](constructA: Int => A)
(f:(T, A) => Unit) {
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
val a = constructA(index)
@ -419,7 +420,7 @@ abstract class RDD[T: ClassManifest](
* additional parameter is produced by constructA, which is called in each
* partition with the index of that partition.
*/
def filterWith[A: ClassManifest](constructA: Int => A)
def filterWith[A: ClassTag](constructA: Int => A)
(p:(T, A) => Boolean): RDD[T] = {
def iterF(index: Int, iter: Iterator[T]): Iterator[T] = {
val a = constructA(index)
@ -434,7 +435,7 @@ abstract class RDD[T: ClassManifest](
* partitions* and the *same number of elements in each partition* (e.g. one was made through
* a map on the other).
*/
def zip[U: ClassManifest](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = new ZippedRDD(sc, this, other)
// Actions (launch a job to return a value to the user program)
@ -470,7 +471,7 @@ abstract class RDD[T: ClassManifest](
/**
* Return an RDD that contains all matching values by applying `f`.
*/
def collect[U: ClassManifest](f: PartialFunction[T, U]): RDD[U] = {
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = {
filter(f.isDefinedAt).map(f)
}
@ -560,7 +561,7 @@ abstract class RDD[T: ClassManifest](
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*/
def aggregate[U: ClassManifest](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U = {
// Clone the zero value since we will also be serializing it as part of tasks
var jobResult = Utils.clone(zeroValue, sc.env.closureSerializer.newInstance())
val cleanSeqOp = sc.clean(seqOp)
@ -607,7 +608,7 @@ abstract class RDD[T: ClassManifest](
* combine step happens locally on the master, equivalent to running a single reduce task.
*/
def countByValue(): Map[T, Long] = {
if (elementClassManifest.erasure.isArray) {
if (elementClassTag.erasure.isArray) {
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
@ -638,7 +639,7 @@ abstract class RDD[T: ClassManifest](
timeout: Long,
confidence: Double = 0.95
): PartialResult[Map[T, BoundedDouble]] = {
if (elementClassManifest.erasure.isArray) {
if (elementClassTag.erasure.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
@ -751,12 +752,12 @@ abstract class RDD[T: ClassManifest](
/** Record user function generating this RDD. */
private[spark] val origin = Utils.getSparkCallSite
private[spark] def elementClassManifest: ClassManifest[T] = classManifest[T]
private[spark] def elementClassTag: ClassTag[T] = classTag[T]
private[spark] var checkpointData: Option[RDDCheckpointData[T]] = None
/** Returns the first parent RDD */
protected[spark] def firstParent[U: ClassManifest] = {
protected[spark] def firstParent[U: ClassTag] = {
dependencies.head.rdd.asInstanceOf[RDD[U]]
}

View file

@ -1,7 +1,11 @@
package spark
import scala.reflect.ClassTag
import org.apache.hadoop.fs.Path
import rdd.{CheckpointRDD, CoalescedRDD}
import scheduler.{ResultTask, ShuffleMapTask}
/**
@ -19,7 +23,7 @@ private[spark] object CheckpointState extends Enumeration {
* manages the post-checkpoint state by providing the updated partitions, iterator and preferred locations
* of the checkpointed RDD.
*/
private[spark] class RDDCheckpointData[T: ClassManifest](rdd: RDD[T])
private[spark] class RDDCheckpointData[T: ClassTag](rdd: RDD[T])
extends Logging with Serializable {
import CheckpointState._

View file

@ -11,6 +11,7 @@ import java.util.Date
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.Map
import scala.collection.mutable.HashMap
import scala.reflect.{ classTag, ClassTag}
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
@ -32,15 +33,15 @@ import spark.SparkContext._
*
* Users should import `spark.SparkContext._` at the top of their program to use these functions.
*/
class SequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable : ClassManifest](
class SequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable : ClassTag](
self: RDD[(K, V)])
extends Logging
with Serializable {
private def getWritableClass[T <% Writable: ClassManifest](): Class[_ <: Writable] = {
private def getWritableClass[T <% Writable: ClassTag](): Class[_ <: Writable] = {
val c = {
if (classOf[Writable].isAssignableFrom(classManifest[T].erasure)) {
classManifest[T].erasure
if (classOf[Writable].isAssignableFrom(classTag[T].erasure)) {
classTag[T].erasure
} else {
// We get the type of the Writable class by looking at the apply method which converts
// from T to Writable. Since we have two apply methods we filter out the one which

View file

@ -8,6 +8,7 @@ import scala.collection.Map
import scala.collection.generic.Growable
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.{ ClassTag, classTag}
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
@ -210,19 +211,19 @@ class SparkContext(
// Methods for creating RDDs
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
/** Distribute a local Scala collection to form an RDD. */
def makeRDD[T: ClassManifest](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}
/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
def makeRDD[T: ClassManifest](seq: Seq[(T, Seq[String])]): RDD[T] = {
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
new ParallelCollectionRDD[T](this, seq.map(_._1), seq.size, indexToPrefs)
}
@ -265,7 +266,7 @@ class SparkContext(
}
/**
* Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
* Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
@ -273,7 +274,7 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String, minSplits: Int)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F])
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F])
: RDD[(K, V)] = {
hadoopFile(path,
fm.erasure.asInstanceOf[Class[F]],
@ -283,7 +284,7 @@ class SparkContext(
}
/**
* Smarter version of hadoopFile() that uses class manifests to figure out the classes of keys,
* Smarter version of hadoopFile() that uses class tags to figure out the classes of keys,
* values and the InputFormat so that users don't need to pass them directly. Instead, callers
* can just write, for example,
* {{{
@ -291,12 +292,12 @@ class SparkContext(
* }}}
*/
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] =
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinSplits)
/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](path: String)
(implicit km: ClassManifest[K], vm: ClassManifest[V], fm: ClassManifest[F]): RDD[(K, V)] = {
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
newAPIHadoopFile(
path,
fm.erasure.asInstanceOf[Class[F]],
@ -359,11 +360,11 @@ class SparkContext(
* IntWritable). The most natural thing would've been to have implicit objects for the
* converters, but then we couldn't have an object for every subclass of Writable (you can't
* have a parameterized singleton object). We use functions instead to create a new converter
* for the appropriate type. In addition, we pass the converter a ClassManifest of its type to
* for the appropriate type. In addition, we pass the converter a ClassTag of its type to
* allow it to figure out the Writable class to use in the subclass case.
*/
def sequenceFile[K, V](path: String, minSplits: Int = defaultMinSplits)
(implicit km: ClassManifest[K], vm: ClassManifest[V],
(implicit km: ClassTag[K], vm: ClassTag[V],
kcf: () => WritableConverter[K], vcf: () => WritableConverter[V])
: RDD[(K, V)] = {
val kc = kcf()
@ -382,7 +383,7 @@ class SparkContext(
* slow if you use the default serializer (Java serialization), though the nice thing about it is
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T: ClassManifest](
def objectFile[T: ClassTag](
path: String,
minSplits: Int = defaultMinSplits
): RDD[T] = {
@ -391,17 +392,17 @@ class SparkContext(
}
protected[spark] def checkpointFile[T: ClassManifest](
protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
new CheckpointRDD[T](this, path)
}
/** Build the union of a list of RDDs. */
def union[T: ClassManifest](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = new UnionRDD(this, rdds)
/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassManifest](first: RDD[T], rest: RDD[T]*): RDD[T] =
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
new UnionRDD(this, Seq(first) ++ rest)
// Methods for creating shared variables
@ -569,7 +570,7 @@ class SparkContext(
* flag specifies whether the scheduler can run the computation on the driver rather than
* shipping it out to the cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@ -589,7 +590,7 @@ class SparkContext(
* allowLocal flag specifies whether the scheduler can run the computation on the driver rather
* than shipping it out to the cluster, for short actions like first().
*/
def runJob[T, U: ClassManifest](
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@ -604,7 +605,7 @@ class SparkContext(
* Run a job on a given set of partitions of an RDD, but take a function of type
* `Iterator[T] => U` instead of `(TaskContext, Iterator[T]) => U`.
*/
def runJob[T, U: ClassManifest](
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: Iterator[T] => U,
partitions: Seq[Int],
@ -616,21 +617,21 @@ class SparkContext(
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassManifest](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
def runJob[T, U: ClassTag](rdd: RDD[T], func: (TaskContext, Iterator[T]) => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a job on all partitions in an RDD and return the results in an array.
*/
def runJob[T, U: ClassManifest](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
def runJob[T, U: ClassTag](rdd: RDD[T], func: Iterator[T] => U): Array[U] = {
runJob(rdd, func, 0 until rdd.partitions.size, false)
}
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*/
def runJob[T, U: ClassManifest](
def runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: (TaskContext, Iterator[T]) => U,
resultHandler: (Int, U) => Unit)
@ -641,7 +642,7 @@ class SparkContext(
/**
* Run a job on all partitions in an RDD and pass the results to a handler function.
*/
def runJob[T, U: ClassManifest](
def runJob[T, U: ClassTag](
rdd: RDD[T],
processPartition: Iterator[T] => U,
resultHandler: (Int, U) => Unit)
@ -745,14 +746,14 @@ object SparkContext {
// TODO: Add AccumulatorParams for other types, e.g. lists and strings
implicit def rddToPairRDDFunctions[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]) =
implicit def rddToPairRDDFunctions[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]) =
new PairRDDFunctions(rdd)
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassManifest, V <% Writable: ClassManifest](
implicit def rddToSequenceFileRDDFunctions[K <% Writable: ClassTag, V <% Writable: ClassTag](
rdd: RDD[(K, V)]) =
new SequenceFileRDDFunctions(rdd)
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassManifest, V: ClassManifest](
implicit def rddToOrderedRDDFunctions[K <% Ordered[K]: ClassTag, V: ClassTag](
rdd: RDD[(K, V)]) =
new OrderedRDDFunctions(rdd)
@ -777,16 +778,16 @@ object SparkContext {
implicit def stringToText(s: String) = new Text(s)
private implicit def arrayToArrayWritable[T <% Writable: ClassManifest](arr: Traversable[T]): ArrayWritable = {
private implicit def arrayToArrayWritable[T <% Writable: ClassTag](arr: Traversable[T]): ArrayWritable = {
def anyToWritable[U <% Writable](u: U): Writable = u
new ArrayWritable(classManifest[T].erasure.asInstanceOf[Class[Writable]],
new ArrayWritable(classTag[T].erasure.asInstanceOf[Class[Writable]],
arr.map(x => anyToWritable(x)).toArray)
}
// Helper objects for converting common types to Writable
private def simpleWritableConverter[T, W <: Writable: ClassManifest](convert: W => T) = {
val wClass = classManifest[W].erasure.asInstanceOf[Class[W]]
private def simpleWritableConverter[T, W <: Writable: ClassTag](convert: W => T) = {
val wClass = classTag[W].erasure.asInstanceOf[Class[W]]
new WritableConverter[T](_ => wClass, x => convert(x.asInstanceOf[W]))
}
@ -834,11 +835,11 @@ object SparkContext {
/**
* A class encapsulating how to convert some type T to Writable. It stores both the Writable class
* corresponding to T (e.g. IntWritable for Int) and a function for doing the conversion.
* The getter for the writable class takes a ClassManifest[T] in case this is a generic object
* The getter for the writable class takes a ClassTag[T] in case this is a generic object
* that doesn't know the type of T when it is created. This sounds strange but is necessary to
* support converting subclasses of Writable to themselves (writableWritableConverter).
*/
private[spark] class WritableConverter[T](
val writableClass: ClassManifest[T] => Class[_ <: Writable],
val writableClass: ClassTag[T] => Class[_ <: Writable],
val convert: Writable => T)
extends Serializable

View file

@ -4,14 +4,19 @@ import java.io._
import java.net._
import java.util.{Locale, Random, UUID}
import java.util.concurrent.{Executors, ThreadFactory, ThreadPoolExecutor}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, FileSystem, FileUtil}
import scala.collection.mutable.ArrayBuffer
import scala.collection.JavaConversions._
import scala.io.Source
import scala.reflect.ClassTag
import scala.Some
import com.google.common.io.Files
import com.google.common.util.concurrent.ThreadFactoryBuilder
import scala.Some
import spark.serializer.SerializerInstance
/**
@ -207,7 +212,7 @@ private object Utils extends Logging {
* result in a new collection. Unlike scala.util.Random.shuffle, this method
* uses a local random number generator, avoiding inter-thread contention.
*/
def randomize[T: ClassManifest](seq: TraversableOnce[T]): Seq[T] = {
def randomize[T: ClassTag](seq: TraversableOnce[T]): Seq[T] = {
randomizeInPlace(seq.toArray)
}

View file

@ -1,5 +1,6 @@
package spark.api.java
import scala.reflect.ClassTag
import spark.RDD
import spark.SparkContext.doubleRDDToDoubleRDDFunctions
import spark.api.java.function.{Function => JFunction}
@ -11,7 +12,7 @@ import spark.Partitioner
class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, JavaDoubleRDD] {
override val classManifest: ClassManifest[Double] = implicitly[ClassManifest[Double]]
override val classTag: ClassTag[Double] = implicitly[ClassTag[Double]]
override val rdd: RDD[Double] = srdd.map(x => Double.valueOf(x))
@ -25,7 +26,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaDoubleRDD = fromRDD(srdd.cache())
/**
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@ -65,7 +66,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
@ -123,7 +124,7 @@ class JavaDoubleRDD(val srdd: RDD[scala.Double]) extends JavaRDDLike[Double, Jav
/** Return the approximate sum of the elements in this RDD. */
def sumApprox(timeout: Long, confidence: Double): PartialResult[BoundedDouble] =
srdd.sumApprox(timeout, confidence)
/** Return the approximate sum of the elements in this RDD. */
def sumApprox(timeout: Long): PartialResult[BoundedDouble] = srdd.sumApprox(timeout)
}

View file

@ -5,6 +5,7 @@ import java.util.Comparator
import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.hadoop.mapred.JobConf
import org.apache.hadoop.mapred.OutputFormat
@ -23,13 +24,13 @@ import spark.Partitioner._
import spark.RDD
import spark.SparkContext.rddToPairRDDFunctions
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kClassTag: ClassTag[K],
implicit val vClassTag: ClassTag[V]) extends JavaRDDLike[(K, V), JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
override val classTag: ClassTag[(K, V)] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
import JavaPairRDD._
@ -38,7 +39,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.cache())
/**
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@ -94,14 +95,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
override def first(): (K, V) = rdd.first()
// Pair RDD functions
/**
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
* "combined type" C * Note that V and C can be different -- for example, one might group an
* RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* Generic function to combine the elements for each key using a custom set of aggregation
* functions. Turns a JavaPairRDD[(K, V)] into a result of type JavaPairRDD[(K, C)], for a
* "combined type" C * Note that V and C can be different -- for example, one might group an
* RDD of type (Int, Int) into an RDD of type (Int, List[Int]). Users provide three
* functions:
*
*
* - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
* - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
* - `mergeCombiners`, to combine two C's into a single one.
@ -113,8 +114,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner): JavaPairRDD[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
implicit val cm: ClassTag[C] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
fromRDD(rdd.combineByKey(
createCombiner,
mergeValue,
@ -151,14 +152,14 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/** Count the number of elements for each key, and return the result to the master as a Map. */
def countByKey(): java.util.Map[K, Long] = mapAsJavaMap(rdd.countByKey())
/**
/**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
def countByKeyApprox(timeout: Long): PartialResult[java.util.Map[K, BoundedDouble]] =
rdd.countByKeyApprox(timeout).map(mapAsJavaMap)
/**
/**
* (Experimental) Approximate version of countByKey that can return a partial result if it does
* not finish within a timeout.
*/
@ -214,7 +215,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
@ -271,15 +272,15 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
: JavaPairRDD[K, (Option[V], W)] =
fromRDD(rdd.rightOuterJoin(other, partitioner))
/**
/**
* Simplified version of combineByKey that hash-partitions the resulting RDD using the existing
* partitioner/parallelism level.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C]): JavaPairRDD[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
implicit val cm: ClassTag[C] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
fromRDD(combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(rdd)))
}
@ -362,8 +363,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
* this also retains the original RDD's partitioning.
*/
def mapValues[U](f: JFunction[V, U]): JavaPairRDD[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
fromRDD(rdd.mapValues(f))
}
@ -374,8 +375,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairRDD[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
fromRDD(rdd.flatMapValues(fn))
}
@ -541,22 +542,22 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])(implicit val kManifest: ClassManif
}
object JavaPairRDD {
def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassManifest[K],
vcm: ClassManifest[T]): RDD[(K, JList[T])] =
def groupByResultToJava[K, T](rdd: RDD[(K, Seq[T])])(implicit kcm: ClassTag[K],
vcm: ClassTag[T]): RDD[(K, JList[T])] =
rddToPairRDDFunctions(rdd).mapValues(seqAsJavaList _)
def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassManifest[K],
vcm: ClassManifest[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V],
def cogroupResultToJava[W, K, V](rdd: RDD[(K, (Seq[V], Seq[W]))])(implicit kcm: ClassTag[K],
vcm: ClassTag[V]): RDD[(K, (JList[V], JList[W]))] = rddToPairRDDFunctions(rdd).mapValues((x: (Seq[V],
Seq[W])) => (seqAsJavaList(x._1), seqAsJavaList(x._2)))
def cogroupResult2ToJava[W1, W2, K, V](rdd: RDD[(K, (Seq[V], Seq[W1],
Seq[W2]))])(implicit kcm: ClassManifest[K]) : RDD[(K, (JList[V], JList[W1],
Seq[W2]))])(implicit kcm: ClassTag[K]) : RDD[(K, (JList[V], JList[W1],
JList[W2]))] = rddToPairRDDFunctions(rdd).mapValues(
(x: (Seq[V], Seq[W1], Seq[W2])) => (seqAsJavaList(x._1),
seqAsJavaList(x._2),
seqAsJavaList(x._3)))
def fromRDD[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] =
new JavaPairRDD[K, V](rdd)
implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd

View file

@ -1,10 +1,11 @@
package spark.api.java
import scala.reflect.ClassTag
import spark._
import spark.api.java.function.{Function => JFunction}
import spark.storage.StorageLevel
class JavaRDD[T](val rdd: RDD[T])(implicit val classManifest: ClassManifest[T]) extends
class JavaRDD[T](val rdd: RDD[T])(implicit val classTag: ClassTag[T]) extends
JavaRDDLike[T, JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
@ -14,7 +15,7 @@ JavaRDDLike[T, JavaRDD[T]] {
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): JavaRDD[T] = wrapRDD(rdd.cache())
/**
/**
* Set this RDD's storage level to persist its values across operations after the first time
* it is computed. Can only be called once on each RDD.
*/
@ -31,7 +32,7 @@ JavaRDDLike[T, JavaRDD[T]] {
* Return a new RDD containing the distinct elements in this RDD.
*/
def distinct(numPartitions: Int): JavaRDD[T] = wrapRDD(rdd.distinct(numPartitions))
/**
* Return a new RDD containing only the elements that satisfy a predicate.
*/
@ -54,7 +55,7 @@ JavaRDDLike[T, JavaRDD[T]] {
*/
def sample(withReplacement: Boolean, fraction: Double, seed: Int): JavaRDD[T] =
wrapRDD(rdd.sample(withReplacement, fraction, seed))
/**
* Return the union of this RDD and another one. Any identical elements will appear multiple
* times (use `.distinct()` to eliminate them).
@ -63,7 +64,7 @@ JavaRDDLike[T, JavaRDD[T]] {
/**
* Return an RDD with the elements from `this` that are not in `other`.
*
*
* Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
* RDD will be <= us.
*/
@ -85,8 +86,7 @@ JavaRDDLike[T, JavaRDD[T]] {
object JavaRDD {
implicit def fromRDD[T: ClassManifest](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def fromRDD[T: ClassTag](rdd: RDD[T]): JavaRDD[T] = new JavaRDD[T](rdd)
implicit def toRDD[T](rdd: JavaRDD[T]): RDD[T] = rdd.rdd
}

View file

@ -3,6 +3,7 @@ package spark.api.java
import java.util.{List => JList}
import scala.Tuple2
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import spark.{SparkContext, Partition, RDD, TaskContext}
import spark.api.java.JavaPairRDD._
@ -15,7 +16,7 @@ import com.google.common.base.Optional
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def wrapRDD(rdd: RDD[T]): This
implicit val classManifest: ClassManifest[T]
implicit val classTag: ClassTag[T]
def rdd: RDD[T]
@ -57,7 +58,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Return a new RDD by applying a function to all elements of this RDD.
*/
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairRDD(rdd.map(f)(cm))(f.keyType(), f.valueType())
}
@ -88,7 +89,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
JavaPairRDD.fromRDD(rdd.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@ -128,18 +129,18 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* elements (a, b) where a is in `this` and b is in `other`.
*/
def cartesian[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] =
JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classManifest))(classManifest,
other.classManifest)
JavaPairRDD.fromRDD(rdd.cartesian(other.rdd)(other.classTag))(classTag,
other.classTag)
/**
* Return an RDD of grouped elements. Each group consists of a key and a sequence of elements
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K]): JavaPairRDD[K, JList[T]] = {
implicit val kcm: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val vcm: ClassManifest[JList[T]] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
implicit val kcm: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val vcm: ClassTag[JList[T]] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f)(f.returnType)))(kcm, vcm)
}
@ -148,10 +149,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* mapping to that key.
*/
def groupBy[K](f: JFunction[T, K], numPartitions: Int): JavaPairRDD[K, JList[T]] = {
implicit val kcm: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val vcm: ClassManifest[JList[T]] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[JList[T]]]
implicit val kcm: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val vcm: ClassTag[JList[T]] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[JList[T]]]
JavaPairRDD.fromRDD(groupByResultToJava(rdd.groupBy(f, numPartitions)(f.returnType)))(kcm, vcm)
}
@ -179,7 +180,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* a map on the other).
*/
def zip[U](other: JavaRDDLike[U, _]): JavaPairRDD[T, U] = {
JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classManifest))(classManifest, other.classManifest)
JavaPairRDD.fromRDD(rdd.zip(other.rdd)(other.classTag))(classTag, other.classTag)
}
// Actions (launch a job to return a value to the user program)
@ -304,7 +305,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Creates tuples of the elements in this RDD by applying `f`.
*/
def keyBy[K](f: JFunction[T, K]): JavaPairRDD[K, T] = {
implicit val kcm: ClassManifest[K] = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val kcm: ClassTag[K] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
JavaPairRDD.fromRDD(rdd.keyBy(f))
}

View file

@ -4,6 +4,7 @@ import java.util.{Map => JMap}
import scala.collection.JavaConversions
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.InputFormat
@ -63,8 +64,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)
}
@ -75,10 +76,10 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
/** Distribute a local Scala collection to form an RDD. */
def parallelizePairs[K, V](list: java.util.List[Tuple2[K, V]], numSlices: Int)
: JavaPairRDD[K, V] = {
implicit val kcm: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val vcm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val kcm: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val vcm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices))
}
@ -113,16 +114,16 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
implicit val vcm = ClassManifest.fromClass(valueClass)
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass, minSplits))
}
/**Get an RDD for a Hadoop SequenceFile. */
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]):
JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
implicit val vcm = ClassManifest.fromClass(valueClass)
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.sequenceFile(path, keyClass, valueClass))
}
@ -134,8 +135,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String, minSplits: Int): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.objectFile(path, minSplits)(cm)
}
@ -147,8 +148,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
* that there's very little effort required to save arbitrary objects.
*/
def objectFile[T](path: String): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
sc.objectFile(path)(cm)
}
@ -164,8 +165,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
implicit val vcm = ClassManifest.fromClass(valueClass)
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass, minSplits))
}
@ -180,8 +181,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
implicit val vcm = ClassManifest.fromClass(valueClass)
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopRDD(conf, inputFormatClass, keyClass, valueClass))
}
@ -193,8 +194,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
valueClass: Class[V],
minSplits: Int
): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
implicit val vcm = ClassManifest.fromClass(valueClass)
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path, inputFormatClass, keyClass, valueClass, minSplits))
}
@ -205,8 +206,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
keyClass: Class[K],
valueClass: Class[V]
): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(keyClass)
implicit val vcm = ClassManifest.fromClass(valueClass)
implicit val kcm: ClassTag[K] = ClassTag(keyClass)
implicit val vcm: ClassTag[V] = ClassTag(valueClass)
new JavaPairRDD(sc.hadoopFile(path,
inputFormatClass, keyClass, valueClass))
}
@ -221,8 +222,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
kClass: Class[K],
vClass: Class[V],
conf: Configuration): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(kClass)
implicit val vcm = ClassManifest.fromClass(vClass)
implicit val kcm: ClassTag[K] = ClassTag(kClass)
implicit val vcm: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopFile(path, fClass, kClass, vClass, conf))
}
@ -235,15 +236,15 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
fClass: Class[F],
kClass: Class[K],
vClass: Class[V]): JavaPairRDD[K, V] = {
implicit val kcm = ClassManifest.fromClass(kClass)
implicit val vcm = ClassManifest.fromClass(vClass)
implicit val kcm: ClassTag[K] = ClassTag(kClass)
implicit val vcm: ClassTag[V] = ClassTag(vClass)
new JavaPairRDD(sc.newAPIHadoopRDD(conf, fClass, kClass, vClass))
}
/** Build the union of two or more RDDs. */
override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = {
val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[T] = first.classManifest
implicit val cm: ClassTag[T] = first.classTag
sc.union(rdds)(cm)
}
@ -251,9 +252,9 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]])
: JavaPairRDD[K, V] = {
val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd)
implicit val cm: ClassManifest[(K, V)] = first.classManifest
implicit val kcm: ClassManifest[K] = first.kManifest
implicit val vcm: ClassManifest[V] = first.vManifest
implicit val cm: ClassTag[(K, V)] = first.classTag
implicit val kcm: ClassTag[K] = first.kClassTag
implicit val vcm: ClassTag[V] = first.vClassTag
new JavaPairRDD(sc.union(rdds)(cm))(kcm, vcm)
}
@ -386,8 +387,8 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
}
protected def checkpointFile[T](path: String): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
new JavaRDD(sc.checkpointFile(path))
}
}

View file

@ -1,5 +1,7 @@
package spark.api.java.function
import scala.reflect.ClassTag
/**
* A function that returns zero or more output records from each input record.
*/
@ -7,5 +9,5 @@ abstract class FlatMapFunction[T, R] extends Function[T, java.lang.Iterable[R]]
@throws(classOf[Exception])
def call(x: T) : java.lang.Iterable[R]
def elementType() : ClassManifest[R] = ClassManifest.Any.asInstanceOf[ClassManifest[R]]
def elementType() : ClassTag[R] = ClassTag.Any.asInstanceOf[ClassTag[R]]
}

View file

@ -6,6 +6,7 @@ import java.util.{List => JList, ArrayList => JArrayList, Collections}
import scala.collection.JavaConversions._
import scala.io.Source
import scala.reflect.ClassTag
import spark.api.java.{JavaSparkContext, JavaPairRDD, JavaRDD}
import spark.broadcast.Broadcast
@ -13,7 +14,7 @@ import spark._
import spark.rdd.PipedRDD
private[spark] class PythonRDD[T: ClassManifest](
private[spark] class PythonRDD[T: ClassTag](
parent: RDD[T],
command: Seq[String],
envVars: java.util.Map[String, String],
@ -251,7 +252,7 @@ private[spark] object PythonRDD {
}
def takePartition[T](rdd: RDD[T], partition: Int): Iterator[T] = {
implicit val cm : ClassManifest[T] = rdd.elementClassManifest
implicit val cm : ClassTag[T] = rdd.elementClassTag
rdd.context.runJob(rdd, ((x: Iterator[T]) => x.toArray), Seq(partition), true).head.iterator
}
}
@ -277,7 +278,7 @@ private class BytesToString extends spark.api.java.function.Function[Array[Byte]
*/
class PythonAccumulatorParam(@transient serverHost: String, serverPort: Int)
extends AccumulatorParam[JList[Array[Byte]]] {
override def zero(value: JList[Array[Byte]]): JList[Array[Byte]] = new JArrayList
override def addInPlace(val1: JList[Array[Byte]], val2: JList[Array[Byte]])

View file

@ -1,6 +1,7 @@
package spark.rdd
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
import spark.{RDD, SparkContext, SparkEnv, Partition, TaskContext}
private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Partition {
@ -8,7 +9,7 @@ private[spark] class BlockRDDPartition(val blockId: String, idx: Int) extends Pa
}
private[spark]
class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[String])
class BlockRDD[T: ClassTag](sc: SparkContext, @transient blockIds: Array[String])
extends RDD[T](sc, Nil) {
@transient lazy val locations_ = {
@ -37,4 +38,3 @@ class BlockRDD[T: ClassManifest](sc: SparkContext, @transient blockIds: Array[St
locations_(split.asInstanceOf[BlockRDDPartition].blockId)
}

View file

@ -1,6 +1,9 @@
package spark.rdd
import java.io.{ObjectOutputStream, IOException}
import scala.reflect.ClassTag
import spark._
@ -26,7 +29,7 @@ class CartesianPartition(
}
private[spark]
class CartesianRDD[T: ClassManifest, U:ClassManifest](
class CartesianRDD[T: ClassTag, U:ClassTag](
sc: SparkContext,
var rdd1 : RDD[T],
var rdd2 : RDD[U])

View file

@ -1,5 +1,6 @@
package spark.rdd
import scala.reflect.ClassTag
import spark._
import org.apache.hadoop.mapred.{FileInputFormat, SequenceFileInputFormat, JobConf, Reporter}
import org.apache.hadoop.conf.Configuration
@ -15,7 +16,7 @@ private[spark] class CheckpointRDDPartition(val index: Int) extends Partition {}
* This RDD represents a RDD checkpoint file (similar to HadoopRDD).
*/
private[spark]
class CheckpointRDD[T: ClassManifest](sc: SparkContext, val checkpointPath: String)
class CheckpointRDD[T: ClassTag](sc: SparkContext, val checkpointPath: String)
extends RDD[T](sc, Nil) {
@transient val fs = new Path(checkpointPath).getFileSystem(sc.hadoopConfiguration)

View file

@ -1,5 +1,6 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{Dependency, OneToOneDependency, NarrowDependency, RDD, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
@ -26,7 +27,7 @@ private[spark] case class CoalescedRDDPartition(
* This transformation is useful when an RDD with many partitions gets filtered into a smaller one,
* or to avoid having a large number of small tasks when processing a directory with many files.
*/
class CoalescedRDD[T: ClassManifest](
class CoalescedRDD[T: ClassTag](
@transient var prev: RDD[T],
maxPartitions: Int)
extends RDD[T](prev.context, Nil) { // Nil since we implement getDependencies

View file

@ -1,8 +1,9 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{OneToOneDependency, RDD, Partition, TaskContext}
private[spark] class FilteredRDD[T: ClassManifest](
private[spark] class FilteredRDD[T: ClassTag](
prev: RDD[T],
f: T => Boolean)
extends RDD[T](prev) {

View file

@ -1,10 +1,11 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{RDD, Partition, TaskContext}
private[spark]
class FlatMappedRDD[U: ClassManifest, T: ClassManifest](
class FlatMappedRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: T => TraversableOnce[U])
extends RDD[U](prev) {

View file

@ -1,8 +1,9 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{RDD, Partition, TaskContext}
private[spark] class GlommedRDD[T: ClassManifest](prev: RDD[T])
private[spark] class GlommedRDD[T: ClassTag](prev: RDD[T])
extends RDD[Array[T]](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions

View file

@ -1,10 +1,11 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{RDD, Partition, TaskContext}
private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
class MapPartitionsRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false)

View file

@ -1,5 +1,6 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{RDD, Partition, TaskContext}
@ -9,7 +10,7 @@ import spark.{RDD, Partition, TaskContext}
* information such as the number of tuples in a partition.
*/
private[spark]
class MapPartitionsWithIndexRDD[U: ClassManifest, T: ClassManifest](
class MapPartitionsWithIndexRDD[U: ClassTag, T: ClassTag](
prev: RDD[T],
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean

View file

@ -1,9 +1,10 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{RDD, Partition, TaskContext}
private[spark]
class MappedRDD[U: ClassManifest, T: ClassManifest](prev: RDD[T], f: T => U)
class MappedRDD[U: ClassTag, T: ClassTag](prev: RDD[T], f: T => U)
extends RDD[U](prev) {
override def getPartitions: Array[Partition] = firstParent[T].partitions

View file

@ -3,9 +3,11 @@ package spark.rdd
import scala.collection.immutable.NumericRange
import scala.collection.mutable.ArrayBuffer
import scala.collection.Map
import scala.reflect.ClassTag
import spark.{RDD, TaskContext, SparkContext, Partition}
private[spark] class ParallelCollectionPartition[T: ClassManifest](
private[spark] class ParallelCollectionPartition[T: ClassTag](
val rddId: Long,
val slice: Int,
values: Seq[T])
@ -23,7 +25,7 @@ private[spark] class ParallelCollectionPartition[T: ClassManifest](
override val index: Int = slice
}
private[spark] class ParallelCollectionRDD[T: ClassManifest](
private[spark] class ParallelCollectionRDD[T: ClassTag](
@transient sc: SparkContext,
@transient data: Seq[T],
numSlices: Int,
@ -53,7 +55,7 @@ private object ParallelCollectionRDD {
* collections specially, encoding the slices as other Ranges to minimize memory cost. This makes
* it efficient to run Spark over RDDs representing large sets of numbers.
*/
def slice[T: ClassManifest](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
if (numSlices < 1) {
throw new IllegalArgumentException("Positive number of slices required")
}

View file

@ -1,5 +1,6 @@
package spark.rdd
import scala.reflect.ClassTag
import spark.{NarrowDependency, RDD, SparkEnv, Partition, TaskContext}
@ -29,7 +30,7 @@ class PruneDependency[T](rdd: RDD[T], @transient partitionFilterFunc: Int => Boo
* and the execution DAG has a filter on the key, we can avoid launching tasks
* on partitions that don't have the range covering the key.
*/
class PartitionPruningRDD[T: ClassManifest](
class PartitionPruningRDD[T: ClassTag](
@transient prev: RDD[T],
@transient partitionFilterFunc: Int => Boolean)
extends RDD[T](prev.context, List(new PruneDependency(prev, partitionFilterFunc))) {
@ -49,6 +50,6 @@ object PartitionPruningRDD {
* when its type T is not known at compile time.
*/
def create[T](rdd: RDD[T], partitionFilterFunc: Int => Boolean) = {
new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassManifest)
new PartitionPruningRDD[T](rdd, partitionFilterFunc)(rdd.elementClassTag)
}
}

View file

@ -7,6 +7,7 @@ import scala.collection.Map
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.io.Source
import scala.reflect.ClassTag
import spark.{RDD, SparkEnv, Partition, TaskContext}
@ -15,7 +16,7 @@ import spark.{RDD, SparkEnv, Partition, TaskContext}
* An RDD that pipes the contents of each parent partition through an external command
* (printing them one per line) and returns the output as a collection of strings.
*/
class PipedRDD[T: ClassManifest](
class PipedRDD[T: ClassTag](
prev: RDD[T],
command: Seq[String],
envVars: Map[String, String])

View file

@ -1,5 +1,6 @@
package spark.rdd
import scala.reflect.ClassTag
import java.util.Random
import cern.jet.random.Poisson
@ -12,9 +13,9 @@ class SampledRDDPartition(val prev: Partition, val seed: Int) extends Partition
override val index: Int = prev.index
}
class SampledRDD[T: ClassManifest](
class SampledRDD[T: ClassTag](
prev: RDD[T],
withReplacement: Boolean,
withReplacement: Boolean,
frac: Double,
seed: Int)
extends RDD[T](prev) {

View file

@ -1,8 +1,11 @@
package spark.rdd
import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import spark.RDD
import spark.Partitioner
import spark.Dependency
@ -28,7 +31,7 @@ import spark.OneToOneDependency
* you can use `rdd1`'s partitioner/partition size and not worry about running
* out of memory because of the size of `rdd2`.
*/
private[spark] class SubtractedRDD[K: ClassManifest, V: ClassManifest, W: ClassManifest](
private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
@transient var rdd1: RDD[(K, V)],
@transient var rdd2: RDD[(K, W)],
part: Partitioner) extends RDD[(K, V)](rdd1.context, Nil) {

View file

@ -1,10 +1,11 @@
package spark.rdd
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import spark.{Dependency, RangeDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], splitIndex: Int)
private[spark] class UnionPartition[T: ClassTag](idx: Int, rdd: RDD[T], splitIndex: Int)
extends Partition {
var split: Partition = rdd.partitions(splitIndex)
@ -23,7 +24,7 @@ private[spark] class UnionPartition[T: ClassManifest](idx: Int, rdd: RDD[T], spl
}
}
class UnionRDD[T: ClassManifest](
class UnionRDD[T: ClassTag](
sc: SparkContext,
@transient var rdds: Seq[RDD[T]])
extends RDD[T](sc, Nil) { // Nil since we implement getDependencies

View file

@ -1,10 +1,12 @@
package spark.rdd
import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
import java.io.{ObjectOutputStream, IOException}
import scala.reflect.ClassTag
private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
import spark.{OneToOneDependency, RDD, SparkContext, Partition, TaskContext}
private[spark] class ZippedPartition[T: ClassTag, U: ClassTag](
idx: Int,
@transient rdd1: RDD[T],
@transient rdd2: RDD[U]
@ -25,7 +27,7 @@ private[spark] class ZippedPartition[T: ClassManifest, U: ClassManifest](
}
}
class ZippedRDD[T: ClassManifest, U: ClassManifest](
class ZippedRDD[T: ClassTag, U: ClassTag](
sc: SparkContext,
var rdd1: RDD[T],
var rdd2: RDD[U])

View file

@ -6,6 +6,7 @@ import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit
import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map}
import scala.reflect.ClassTag
import spark._
import spark.executor.TaskMetrics
@ -215,7 +216,7 @@ class DAGScheduler(
* The job is assumed to have at least one partition; zero partition jobs should be handled
* without a JobSubmitted event.
*/
private[scheduler] def prepareJob[T, U: ClassManifest](
private[scheduler] def prepareJob[T, U: ClassTag](
finalRdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@ -231,7 +232,7 @@ class DAGScheduler(
return (toSubmit, waiter)
}
def runJob[T, U: ClassManifest](
def runJob[T, U: ClassTag](
finalRdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
@ -326,7 +327,7 @@ class DAGScheduler(
submitStage(stage)
}
}
/**
* Check for waiting or failed stages which are now eligible for resubmission.
* Ordinarily run on every iteration of the event loop.
@ -712,7 +713,7 @@ class DAGScheduler(
sizeBefore = shuffleToMapStage.size
shuffleToMapStage.clearOldValues(cleanupTime)
logInfo("shuffleToMapStage " + sizeBefore + " --> " + shuffleToMapStage.size)
sizeBefore = pendingTasks.size
pendingTasks.clearOldValues(cleanupTime)
logInfo("pendingTasks " + sizeBefore + " --> " + pendingTasks.size)

View file

@ -1,5 +1,6 @@
package spark
import scala.reflect.ClassTag
import org.scalatest.FunSuite
import java.io.File
import spark.rdd._
@ -179,7 +180,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* not, but this is not done by default as usually the partitions do not refer to any RDD and
* therefore never store the lineage.
*/
def testCheckpointing[U: ClassManifest](
def testCheckpointing[U: ClassTag](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean = true,
testRDDPartitionSize: Boolean = false
@ -248,7 +249,7 @@ class CheckpointSuite extends FunSuite with LocalSparkContext with Logging {
* RDDs partitions. So even if the parent RDD is checkpointed and its partitions changed,
* this RDD will remember the partitions and therefore potentially the whole lineage.
*/
def testParentCheckpointing[U: ClassManifest](
def testParentCheckpointing[U: ClassTag](
op: (RDD[Int]) => RDD[U],
testRDDSize: Boolean,
testRDDPartitionSize: Boolean

View file

@ -2,6 +2,7 @@ package spark.streaming.examples
import scala.collection.mutable.LinkedList
import scala.util.Random
import scala.reflect.ClassTag
import akka.actor.Actor
import akka.actor.ActorRef
@ -65,7 +66,7 @@ class FeederActor extends Actor {
*
* @see [[spark.streaming.examples.FeederActor]]
*/
class SampleActorReceiver[T: ClassManifest](urlOfPublisher: String)
class SampleActorReceiver[T: ClassTag](urlOfPublisher: String)
extends Actor with Receiver {
lazy private val remotePublisher = context.actorFor(urlOfPublisher)

View file

@ -9,6 +9,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException, ObjectOutputStream}
@ -36,7 +37,7 @@ import org.apache.hadoop.conf.Configuration
* - A function that is used to generate an RDD after each time interval
*/
abstract class DStream[T: ClassManifest] (
abstract class DStream[T: ClassTag] (
@transient protected[streaming] var ssc: StreamingContext
) extends Serializable with Logging {
@ -62,7 +63,7 @@ abstract class DStream[T: ClassManifest] (
// RDDs generated, marked as protected[streaming] so that testsuites can access it
@transient
protected[streaming] var generatedRDDs = new HashMap[Time, RDD[T]] ()
// Time zero for the DStream
protected[streaming] var zeroTime: Time = null
@ -254,16 +255,16 @@ abstract class DStream[T: ClassManifest] (
/**
* Retrieve a precomputed RDD of this DStream, or computes the RDD. This is an internal
* method that should not be called directly.
*/
*/
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
// If this DStream was not initialized (i.e., zeroTime not set), then do it
// If RDD was already generated, then retrieve it from HashMap
generatedRDDs.get(time) match {
// If an RDD was already generated and is being reused, then
// If an RDD was already generated and is being reused, then
// probably all RDDs in this DStream will be reused and hence should be cached
case Some(oldRDD) => Some(oldRDD)
// if RDD was not generated, and if the time is valid
// (based on sliding time of this DStream), then generate the RDD
case None => {
@ -280,7 +281,7 @@ abstract class DStream[T: ClassManifest] (
}
generatedRDDs.put(time, newRDD)
Some(newRDD)
case None =>
case None =>
None
}
} else {
@ -324,7 +325,7 @@ abstract class DStream[T: ClassManifest] (
dependencies.foreach(_.clearOldMetadata(time))
}
/* Adds metadata to the Stream while it is running.
/* Adds metadata to the Stream while it is running.
* This methd should be overwritten by sublcasses of InputDStream.
*/
protected[streaming] def addMetadata(metadata: Any) {
@ -396,7 +397,7 @@ abstract class DStream[T: ClassManifest] (
// =======================================================================
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
def map[U: ClassTag](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
}
@ -404,7 +405,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
def flatMap[U: ClassTag](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
}
@ -423,7 +424,7 @@ abstract class DStream[T: ClassManifest] (
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
def mapPartitions[U: ClassManifest](
def mapPartitions[U: ClassTag](
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean = false
): DStream[U] = {
@ -474,7 +475,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
def transform[U: ClassTag](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
transform((r: RDD[T], t: Time) => transformFunc(r))
}
@ -482,7 +483,7 @@ abstract class DStream[T: ClassManifest] (
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
def transform[U: ClassTag](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
new TransformedDStream(this, context.sparkContext.clean(transformFunc))
}

View file

@ -3,13 +3,15 @@ package spark.streaming
import org.apache.hadoop.fs.Path
import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.conf.Configuration
import collection.mutable.HashMap
import spark.Logging
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag
private[streaming]
class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
class DStreamCheckpointData[T: ClassTag] (dstream: DStream[T])
extends Serializable with Logging {
protected val data = new HashMap[Time, AnyRef]()
@ -90,4 +92,3 @@ class DStreamCheckpointData[T: ClassManifest] (dstream: DStream[T])
"[\n" + checkpointFiles.size + " checkpoint files \n" + checkpointFiles.mkString("\n") + "\n]"
}
}

View file

@ -5,18 +5,19 @@ import spark.streaming.dstream.{ReducedWindowedDStream, StateDStream}
import spark.streaming.dstream.{CoGroupedDStream, ShuffledDStream}
import spark.streaming.dstream.{MapValuedDStream, FlatMapValuedDStream}
import spark.{Manifests, RDD, Partitioner, HashPartitioner}
import spark.{ClassTags, RDD, Partitioner, HashPartitioner}
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.reflect.{ClassTag, classTag}
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.mapred.OutputFormat
import org.apache.hadoop.conf.Configuration
class PairDStreamFunctions[K: ClassManifest, V: ClassManifest](self: DStream[(K,V)])
class PairDStreamFunctions[K: ClassTag, V: ClassTag](self: DStream[(K,V)])
extends Serializable {
private[streaming] def ssc = self.ssc
@ -86,7 +87,7 @@ extends Serializable {
* combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C: ClassManifest](
def combineByKey[C: ClassTag](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiner: (C, C) => C,
@ -186,7 +187,7 @@ extends Serializable {
* DStream's batching interval
*/
def reduceByKeyAndWindow(
reduceFunc: (V, V) => V,
reduceFunc: (V, V) => V,
windowDuration: Duration,
slideDuration: Duration
): DStream[(K, V)] = {
@ -317,7 +318,7 @@ extends Serializable {
* corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S]
): DStream[(K, S)] = {
updateStateByKey(updateFunc, defaultPartitioner())
@ -332,7 +333,7 @@ extends Serializable {
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
numPartitions: Int
): DStream[(K, S)] = {
@ -348,7 +349,7 @@ extends Serializable {
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
def updateStateByKey[S: ClassTag](
updateFunc: (Seq[V], Option[S]) => Option[S],
partitioner: Partitioner
): DStream[(K, S)] = {
@ -371,7 +372,7 @@ extends Serializable {
* @param rememberPartitioner Whether to remember the paritioner object in the generated RDDs.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
def updateStateByKey[S: ClassTag](
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,
rememberPartitioner: Boolean
@ -380,11 +381,11 @@ extends Serializable {
}
def mapValues[U: ClassManifest](mapValuesFunc: V => U): DStream[(K, U)] = {
def mapValues[U: ClassTag](mapValuesFunc: V => U): DStream[(K, U)] = {
new MapValuedDStream[K, V, U](self, mapValuesFunc)
}
def flatMapValues[U: ClassManifest](
def flatMapValues[U: ClassTag](
flatMapValuesFunc: V => TraversableOnce[U]
): DStream[(K, U)] = {
new FlatMapValuedDStream[K, V, U](self, flatMapValuesFunc)
@ -396,7 +397,7 @@ extends Serializable {
* key in both RDDs. HashPartitioner is used to partition each generated RDD into default number
* of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
def cogroup[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
@ -405,7 +406,7 @@ extends Serializable {
* or `other` DStreams, the generated RDD will contains a tuple with the list of values for that
* key in both RDDs. Partitioner is used to partition each generated RDD.
*/
def cogroup[W: ClassManifest](
def cogroup[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (Seq[V], Seq[W]))] = {
@ -415,8 +416,8 @@ extends Serializable {
partitioner
)
val pdfs = new PairDStreamFunctions[K, Seq[Seq[_]]](cgd)(
classManifest[K],
Manifests.seqSeqManifest
classTag[K],
ClassTags.seqSeqClassTag
)
pdfs.mapValues {
case Seq(vs, ws) =>
@ -428,7 +429,7 @@ extends Serializable {
* Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
join[W](other, defaultPartitioner())
}
@ -437,7 +438,7 @@ extends Serializable {
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
*/
def join[W: ClassManifest](
def join[W: ClassTag](
other: DStream[(K, W)],
partitioner: Partitioner
): DStream[(K, (V, W))] = {
@ -455,7 +456,7 @@ extends Serializable {
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
prefix: String,
suffix: String
)(implicit fm: ClassManifest[F]) {
)(implicit fm: ClassTag[F]) {
saveAsHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@ -485,7 +486,7 @@ extends Serializable {
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
suffix: String
)(implicit fm: ClassManifest[F]) {
)(implicit fm: ClassTag[F]) {
saveAsNewAPIHadoopFiles(prefix, suffix, getKeyClass, getValueClass, fm.erasure.asInstanceOf[Class[F]])
}
@ -508,9 +509,7 @@ extends Serializable {
self.foreach(saveFunc)
}
private def getKeyClass() = implicitly[ClassManifest[K]].erasure
private def getKeyClass() = implicitly[ClassTag[K]].erasure
private def getValueClass() = implicitly[ClassManifest[V]].erasure
private def getValueClass() = implicitly[ClassTag[V]].erasure
}

View file

@ -16,6 +16,7 @@ import spark.streaming.receivers.ActorReceiver
import scala.collection.mutable.Queue
import scala.collection.Map
import scala.reflect.ClassTag
import java.io.InputStream
import java.util.concurrent.atomic.AtomicInteger
@ -166,7 +167,7 @@ class StreamingContext private (
* Create an input stream with any arbitrary user implemented network receiver.
* @param receiver Custom implementation of NetworkReceiver
*/
def networkStream[T: ClassManifest](
def networkStream[T: ClassTag](
receiver: NetworkReceiver[T]): DStream[T] = {
val inputStream = new PluggableInputDStream[T](this,
receiver)
@ -185,7 +186,7 @@ class StreamingContext private (
* to ensure the type safety, i.e parametrized type of data received and actorStream
* should be same.
*/
def actorStream[T: ClassManifest](
def actorStream[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel = StorageLevel.MEMORY_ONLY_SER_2,
@ -203,7 +204,7 @@ class StreamingContext private (
* and sub sequence refer to its payload.
* @param storageLevel RDD storage level. Defaults to memory-only.
*/
def zeroMQStream[T: ClassManifest](
def zeroMQStream[T: ClassTag](
publisherUrl:String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] Iterator[T],
@ -225,7 +226,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def kafkaStream[T: ClassManifest](
def kafkaStream[T: ClassTag](
zkQuorum: String,
groupId: String,
topics: Map[String, Int],
@ -264,7 +265,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def socketStream[T: ClassManifest](
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) => Iterator[T],
@ -286,7 +287,7 @@ class StreamingContext private (
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): DStream[SparkFlumeEvent] = {
val inputStream = new FlumeInputDStream(this, hostname, port, storageLevel)
val inputStream = new FlumeInputDStream[SparkFlumeEvent](this, hostname, port, storageLevel)
registerInputStream(inputStream)
inputStream
}
@ -301,7 +302,7 @@ class StreamingContext private (
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
def rawSocketStream[T: ClassManifest](
def rawSocketStream[T: ClassTag](
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
@ -321,9 +322,9 @@ class StreamingContext private (
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory)
registerInputStream(inputStream)
@ -341,9 +342,9 @@ class StreamingContext private (
* @tparam F Input format for reading HDFS file
*/
def fileStream[
K: ClassManifest,
V: ClassManifest,
F <: NewInputFormat[K, V]: ClassManifest
K: ClassTag,
V: ClassTag,
F <: NewInputFormat[K, V]: ClassTag
] (directory: String, filter: Path => Boolean, newFilesOnly: Boolean): DStream[(K, V)] = {
val inputStream = new FileInputDStream[K, V, F](this, directory, filter, newFilesOnly)
registerInputStream(inputStream)
@ -385,7 +386,7 @@ class StreamingContext private (
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean = true
): DStream[T] = {
@ -400,7 +401,7 @@ class StreamingContext private (
* @param defaultRDD Default RDD is returned by the DStream when the queue is empty. Set as null if no RDD should be returned when empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T: ClassManifest](
def queueStream[T: ClassTag](
queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
@ -413,7 +414,7 @@ class StreamingContext private (
/**
* Create a unified DStream from multiple DStreams of the same type and same interval
*/
def union[T: ClassManifest](streams: Seq[DStream[T]]): DStream[T] = {
def union[T: ClassTag](streams: Seq[DStream[T]]): DStream[T] = {
new UnionDStream[T](streams.toArray)
}
@ -490,7 +491,7 @@ class StreamingContext private (
object StreamingContext {
implicit def toPairDStreamFunctions[K: ClassManifest, V: ClassManifest](stream: DStream[(K,V)]) = {
implicit def toPairDStreamFunctions[K: ClassTag, V: ClassTag](stream: DStream[(K,V)]) = {
new PairDStreamFunctions[K, V](stream)
}

View file

@ -6,6 +6,8 @@ import spark.api.java.JavaRDD
import spark.storage.StorageLevel
import spark.RDD
import scala.reflect.ClassTag
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
@ -24,7 +26,7 @@ import spark.RDD
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
*/
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
class JavaDStream[T](val dstream: DStream[T])(implicit val classTag: ClassTag[T])
extends JavaDStreamLike[T, JavaDStream[T], JavaRDD[T]] {
override def wrapRDD(rdd: RDD[T]): JavaRDD[T] = JavaRDD.fromRDD(rdd)
@ -80,6 +82,6 @@ class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassM
}
object JavaDStream {
implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
implicit def fromDStream[T: ClassTag](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
}
}

View file

@ -4,6 +4,7 @@ import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import spark.streaming._
import spark.api.java.{JavaPairRDD, JavaRDDLike, JavaRDD}
@ -14,7 +15,7 @@ import JavaDStream._
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T, R]]
extends Serializable {
implicit val classManifest: ClassManifest[T]
implicit val classTag: ClassTag[T]
def dstream: DStream[T]
@ -116,7 +117,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K2, V2](f: PairFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
@ -137,7 +138,7 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def flatMap[K2, V2](f: PairFlatMapFunction[T, K2, V2]): JavaPairDStream[K2, V2] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K2, V2]]]
def cm = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K2, V2]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
@ -240,8 +241,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction[R, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@ -252,8 +253,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction2[R, Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@ -265,10 +266,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transform[K2, V2](transformFunc: JFunction[R, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
implicit val cmk: ClassManifest[K2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
implicit val cmv: ClassManifest[V2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
implicit val cmk: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (in: RDD[T]): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in)).rdd
dstream.transform(scalaTransform(_))
@ -280,10 +281,10 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
*/
def transform[K2, V2](transformFunc: JFunction2[R, Time, JavaPairRDD[K2, V2]]):
JavaPairDStream[K2, V2] = {
implicit val cmk: ClassManifest[K2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K2]]
implicit val cmv: ClassManifest[V2] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V2]]
implicit val cmk: ClassTag[K2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K2]]
implicit val cmv: ClassTag[V2] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V2]]
def scalaTransform (in: RDD[T], time: Time): RDD[(K2, V2)] =
transformFunc.call(wrapRDD(in), time).rdd
dstream.transform(scalaTransform(_, _))
@ -296,4 +297,4 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This, R], R <: JavaRDDLike[T
def checkpoint(interval: Duration) = {
dstream.checkpoint(interval)
}
}
}

View file

@ -4,6 +4,7 @@ import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import spark.streaming._
import spark.streaming.StreamingContext._
@ -18,8 +19,8 @@ import com.google.common.base.Optional
import spark.RDD
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifiest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
implicit val kTag: ClassTag[K],
implicit val vTag: ClassTag[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V], JavaPairRDD[K, V]] {
override def wrapRDD(rdd: RDD[(K, V)]): JavaPairRDD[K, V] = JavaPairRDD.fromRDD(rdd)
@ -138,8 +139,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
implicit val cm: ClassTag[C] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[C]]
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
@ -407,8 +408,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
implicit val cm: ClassManifest[S] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
implicit val cm: ClassTag[S] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
@ -421,7 +422,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
def updateStateByKey[S: ClassTag](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
@ -437,7 +438,7 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
def updateStateByKey[S: ClassTag](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
@ -445,16 +446,16 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
}
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.mapValues(f)
}
def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
implicit val cm: ClassTag[U] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[U]]
dstream.flatMapValues(fn)
}
@ -465,8 +466,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@ -477,8 +478,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.cogroup(other.dstream, partitioner)
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList((t._2))))
}
@ -488,8 +489,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
* to partition each generated RDD into default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream)
}
@ -500,8 +501,8 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
*/
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
implicit val cm: ClassTag[W] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[W]]
dstream.join(other.dstream, partitioner)
}
@ -575,24 +576,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
override val classTag: ClassTag[(K, V)] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[Tuple2[K, V]]]
}
object JavaPairDStream {
implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
implicit def fromPairDStream[K: ClassTag, V: ClassTag](dstream: DStream[(K, V)])
:JavaPairDStream[K, V] =
new JavaPairDStream[K, V](dstream)
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
new JavaPairDStream[K, V](dstream.dstream)
}
def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
def scalaToJavaLong[K: ClassTag](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}

View file

@ -17,6 +17,7 @@ import akka.actor.SupervisorStrategy
import akka.zeromq.Subscribe
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.lang.{Long => JLong, Integer => JInt}
import java.io.InputStream
@ -126,8 +127,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
groupId: String,
topics: JMap[String, JInt])
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.kafkaStream[T](zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
@ -146,8 +147,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong])
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.kafkaStream[T](
zkQuorum,
groupId,
@ -172,8 +173,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
initialOffsets: JMap[KafkaPartitionKey, JLong],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.kafkaStream[T](
zkQuorum,
groupId,
@ -224,8 +225,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel)
: JavaDStream[T] = {
def fn = (x: InputStream) => converter.apply(x).toIterator
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.socketStream(hostname, port, fn, storageLevel)
}
@ -253,8 +254,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port, storageLevel))
}
@ -268,8 +269,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of the objects in the received blocks
*/
def rawSocketStream[T](hostname: String, port: Int): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cmt: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
JavaDStream.fromDStream(ssc.rawSocketStream(hostname, port))
}
@ -283,12 +284,12 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmf: ClassManifest[F] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
implicit val cmk: ClassTag[K] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[K]]
implicit val cmv: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
implicit val cmf: ClassTag[F] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[F]]
ssc.fileStream[K, V, F](directory);
}
@ -372,8 +373,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel, supervisorStrategy)
}
@ -393,8 +394,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
name: String,
storageLevel: StorageLevel
): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name, storageLevel)
}
@ -412,8 +413,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
props: Props,
name: String
): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.actorStream[T](props, name)
}
@ -434,8 +435,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
storageLevel: StorageLevel,
supervisorStrategy: SupervisorStrategy
): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
ssc.zeroMQStream[T](publisherUrl, subscribe, bytesToObjects, storageLevel, supervisorStrategy)
}
@ -455,8 +456,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]],
storageLevel: StorageLevel
): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn, storageLevel)
}
@ -475,8 +476,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
subscribe: Subscribe,
bytesToObjects: JFunction[Array[Array[Byte]], java.lang.Iterable[T]]
): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
def fn(x: Seq[Seq[Byte]]) = bytesToObjects.apply(x.map(_.toArray).toArray).toIterator
ssc.zeroMQStream[T](publisherUrl, subscribe, fn)
}
@ -497,8 +498,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue)
@ -514,8 +515,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime)
@ -535,8 +536,8 @@ class JavaStreamingContext(val ssc: StreamingContext) {
queue: java.util.Queue[JavaRDD[T]],
oneAtATime: Boolean,
defaultRDD: JavaRDD[T]): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)

View file

@ -4,8 +4,10 @@ import spark.{RDD, Partitioner}
import spark.rdd.CoGroupedRDD
import spark.streaming.{Time, DStream, Duration}
import scala.reflect.ClassTag
private[streaming]
class CoGroupedDStream[K : ClassManifest](
class CoGroupedDStream[K : ClassTag](
parents: Seq[DStream[(K, _)]],
partitioner: Partitioner
) extends DStream[(K, Seq[Seq[_]])](parents.head.ssc) {

View file

@ -3,10 +3,12 @@ package spark.streaming.dstream
import spark.RDD
import spark.streaming.{Time, StreamingContext}
import scala.reflect.ClassTag
/**
* An input stream that always returns the same RDD on each timestep. Useful for testing.
*/
class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T])
class ConstantInputDStream[T: ClassTag](ssc_ : StreamingContext, rdd: RDD[T])
extends InputDStream[T](ssc_) {
override def start() {}
@ -16,4 +18,4 @@ class ConstantInputDStream[T: ClassManifest](ssc_ : StreamingContext, rdd: RDD[T
override def compute(validTime: Time): Option[RDD[T]] = {
Some(rdd)
}
}
}

View file

@ -9,14 +9,16 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import scala.collection.mutable.{HashSet, HashMap}
import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
private[streaming]
class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K,V] : ClassManifest](
class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
@transient ssc_ : StreamingContext,
directory: String,
filter: Path => Boolean = FileInputDStream.defaultFilter,
newFilesOnly: Boolean = true)
newFilesOnly: Boolean = true)
extends InputDStream[(K, V)](ssc_) {
protected[streaming] override val checkpointData = new FileInputDStreamCheckpointData
@ -37,7 +39,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
}
logDebug("LastModTime initialized to " + lastModTime + ", new files only = " + newFilesOnly)
}
override def stop() { }
/**
@ -83,7 +85,7 @@ class FileInputDStream[K: ClassManifest, V: ClassManifest, F <: NewInputFormat[K
latestModTimeFiles += path.toString
logDebug("Accepted " + path)
return true
}
}
}
}
logDebug("Finding new files at time " + validTime + " for last mod time = " + lastModTime)
@ -178,5 +180,3 @@ private[streaming]
object FileInputDStream {
def defaultFilter(path: Path): Boolean = !path.getName().startsWith(".")
}

View file

@ -3,8 +3,10 @@ package spark.streaming.dstream
import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import scala.reflect.ClassTag
private[streaming]
class FilteredDStream[T: ClassManifest](
class FilteredDStream[T: ClassTag](
parent: DStream[T],
filterFunc: T => Boolean
) extends DStream[T](parent.ssc) {
@ -17,5 +19,3 @@ class FilteredDStream[T: ClassManifest](
parent.getOrCompute(validTime).map(_.filter(filterFunc))
}
}

View file

@ -4,8 +4,10 @@ import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import spark.SparkContext._
import scala.reflect.ClassTag
private[streaming]
class FlatMapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
class FlatMapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
flatMapValueFunc: V => TraversableOnce[U]
) extends DStream[(K, U)](parent.ssc) {

View file

@ -3,8 +3,10 @@ package spark.streaming.dstream
import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import scala.reflect.ClassTag
private[streaming]
class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T => Traversable[U]
) extends DStream[U](parent.ssc) {
@ -17,4 +19,3 @@ class FlatMappedDStream[T: ClassManifest, U: ClassManifest](
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
}
}

View file

@ -12,13 +12,14 @@ import org.apache.avro.ipc.specific.SpecificResponder
import org.apache.avro.ipc.NettyServer
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
import java.net.InetSocketAddress
import java.io.{ObjectInput, ObjectOutput, Externalizable}
import java.nio.ByteBuffer
private[streaming]
class FlumeInputDStream[T: ClassManifest](
class FlumeInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,

View file

@ -2,9 +2,10 @@ package spark.streaming.dstream
import spark.RDD
import spark.streaming.{Duration, DStream, Job, Time}
import scala.reflect.ClassTag
private[streaming]
class ForEachDStream[T: ClassManifest] (
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) => Unit
) extends DStream[Unit](parent.ssc) {

View file

@ -3,8 +3,10 @@ package spark.streaming.dstream
import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import scala.reflect.ClassTag
private[streaming]
class GlommedDStream[T: ClassManifest](parent: DStream[T])
class GlommedDStream[T: ClassTag](parent: DStream[T])
extends DStream[Array[T]](parent.ssc) {
override def dependencies = List(parent)

View file

@ -2,6 +2,8 @@ package spark.streaming.dstream
import spark.streaming.{Time, Duration, StreamingContext, DStream}
import scala.reflect.ClassTag
/**
* This is the abstract base class for all input streams. This class provides to methods
* start() and stop() which called by the scheduler to start and stop receiving data/
@ -13,7 +15,7 @@ import spark.streaming.{Time, Duration, StreamingContext, DStream}
* that requires running a receiver on the worker nodes, use NetworkInputDStream
* as the parent class.
*/
abstract class InputDStream[T: ClassManifest] (@transient ssc_ : StreamingContext)
abstract class InputDStream[T: ClassTag] (@transient ssc_ : StreamingContext)
extends DStream[T](ssc_) {
var lastValidTime: Time = null

View file

@ -16,14 +16,14 @@ import kafka.utils.ZkUtils._
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import scala.reflect.ClassTag
// Key for a specific Kafka Partition: (broker, topic, group, part)
case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, partId: Int)
/**
* Input stream that pulls messages from a Kafka Broker.
*
*
* @param zkQuorum Zookeper quorum (hostname:port,hostname:port,..).
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
@ -33,7 +33,7 @@ case class KafkaPartitionKey(brokerId: Int, topic: String, groupId: String, part
* @param storageLevel RDD storage level.
*/
private[streaming]
class KafkaInputDStream[T: ClassManifest](
class KafkaInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
zkQuorum: String,
groupId: String,
@ -51,7 +51,7 @@ class KafkaInputDStream[T: ClassManifest](
private[streaming]
class KafkaReceiver(zkQuorum: String, groupId: String,
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
topics: Map[String, Int], initialOffsets: Map[KafkaPartitionKey, Long],
storageLevel: StorageLevel) extends NetworkReceiver[Any] {
// Timeout for establishing a connection to Zookeper in ms.

View file

@ -3,8 +3,10 @@ package spark.streaming.dstream
import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import scala.reflect.ClassTag
private[streaming]
class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
class MapPartitionedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
mapPartFunc: Iterator[T] => Iterator[U],
preservePartitioning: Boolean
@ -18,4 +20,3 @@ class MapPartitionedDStream[T: ClassManifest, U: ClassManifest](
parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
}
}

View file

@ -4,8 +4,10 @@ import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import spark.SparkContext._
import scala.reflect.ClassTag
private[streaming]
class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
class MapValuedDStream[K: ClassTag, V: ClassTag, U: ClassTag](
parent: DStream[(K, V)],
mapValueFunc: V => U
) extends DStream[(K, U)](parent.ssc) {
@ -18,4 +20,3 @@ class MapValuedDStream[K: ClassManifest, V: ClassManifest, U: ClassManifest](
parent.getOrCompute(validTime).map(_.mapValues[U](mapValueFunc))
}
}

View file

@ -3,8 +3,10 @@ package spark.streaming.dstream
import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import scala.reflect.ClassTag
private[streaming]
class MappedDStream[T: ClassManifest, U: ClassManifest] (
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T => U
) extends DStream[U](parent.ssc) {
@ -17,4 +19,3 @@ class MappedDStream[T: ClassManifest, U: ClassManifest] (
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
}
}

View file

@ -8,6 +8,7 @@ import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import scala.concurrent.duration._
import scala.reflect.ClassTag
import java.nio.ByteBuffer
@ -28,7 +29,7 @@ import java.util.concurrent.ArrayBlockingQueue
* @param ssc_ Streaming context that will execute this input stream
* @tparam T Class type of the object of this stream
*/
abstract class NetworkInputDStream[T: ClassManifest](@transient ssc_ : StreamingContext)
abstract class NetworkInputDStream[T: ClassTag](@transient ssc_ : StreamingContext)
extends InputDStream[T](ssc_) {
// This is an unique identifier that is used to match the network receiver with the
@ -70,7 +71,7 @@ private[streaming] case class ReportError(msg: String) extends NetworkReceiverMe
* Abstract class of a receiver that can be run on worker nodes to receive external data. See
* [[spark.streaming.dstream.NetworkInputDStream]] for an explanation.
*/
abstract class NetworkReceiver[T: ClassManifest]() extends Serializable with Logging {
abstract class NetworkReceiver[T: ClassTag]() extends Serializable with Logging {
initLogging()

View file

@ -2,8 +2,10 @@ package spark.streaming.dstream
import spark.streaming.StreamingContext
import scala.reflect.ClassTag
private[streaming]
class PluggableInputDStream[T: ClassManifest](
class PluggableInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
receiver: NetworkReceiver[T]) extends NetworkInputDStream[T](ssc_) {

View file

@ -7,18 +7,20 @@ import scala.collection.mutable.Queue
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Time, StreamingContext}
import scala.reflect.ClassTag
private[streaming]
class QueueInputDStream[T: ClassManifest](
class QueueInputDStream[T: ClassTag](
@transient ssc: StreamingContext,
val queue: Queue[RDD[T]],
oneAtATime: Boolean,
defaultRDD: RDD[T]
) extends InputDStream[T](ssc) {
override def start() { }
override def stop() { }
override def compute(validTime: Time): Option[RDD[T]] = {
val buffer = new ArrayBuffer[RDD[T]]()
if (oneAtATime && queue.size > 0) {
@ -38,5 +40,5 @@ class QueueInputDStream[T: ClassManifest](
None
}
}
}

View file

@ -4,6 +4,8 @@ import spark.Logging
import spark.storage.StorageLevel
import spark.streaming.StreamingContext
import scala.reflect.ClassTag
import java.net.InetSocketAddress
import java.nio.ByteBuffer
import java.nio.channels.{ReadableByteChannel, SocketChannel}
@ -18,7 +20,7 @@ import java.util.concurrent.ArrayBlockingQueue
* in the format that the system is configured with.
*/
private[streaming]
class RawInputDStream[T: ClassManifest](
class RawInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,

View file

@ -8,11 +8,13 @@ import spark.Partitioner
import spark.SparkContext._
import spark.storage.StorageLevel
import scala.collection.mutable.ArrayBuffer
import spark.streaming.{Duration, Interval, Time, DStream}
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
private[streaming]
class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
class ReducedWindowedDStream[K: ClassTag, V: ClassTag](
parent: DStream[(K, V)],
reduceFunc: (V, V) => V,
invReduceFunc: (V, V) => V,
@ -32,7 +34,7 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
"must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")"
)
// Reduce each batch of data using reduceByKey which will be further reduced by window
// Reduce each batch of data using reduceByKey which will be further reduced by window
// by ReducedWindowedDStream
val reducedStream = parent.reduceByKey(reduceFunc, partitioner)
@ -153,5 +155,3 @@ class ReducedWindowedDStream[K: ClassManifest, V: ClassManifest](
}
}
}

View file

@ -4,8 +4,10 @@ import spark.{RDD, Partitioner}
import spark.SparkContext._
import spark.streaming.{Duration, DStream, Time}
import scala.reflect.ClassTag
private[streaming]
class ShuffledDStream[K: ClassManifest, V: ClassManifest, C: ClassManifest](
class ShuffledDStream[K: ClassTag, V: ClassTag, C: ClassTag](
parent: DStream[(K,V)],
createCombiner: V => C,
mergeValue: (C, V) => C,

View file

@ -4,11 +4,13 @@ import spark.streaming.StreamingContext
import spark.storage.StorageLevel
import spark.util.NextIterator
import scala.reflect.ClassTag
import java.io._
import java.net.Socket
private[streaming]
class SocketInputDStream[T: ClassManifest](
class SocketInputDStream[T: ClassTag](
@transient ssc_ : StreamingContext,
host: String,
port: Int,
@ -22,7 +24,7 @@ class SocketInputDStream[T: ClassManifest](
}
private[streaming]
class SocketReceiver[T: ClassManifest](
class SocketReceiver[T: ClassTag](
host: String,
port: Int,
bytesToObjects: InputStream => Iterator[T],

View file

@ -6,8 +6,10 @@ import spark.SparkContext._
import spark.storage.StorageLevel
import spark.streaming.{Duration, Time, DStream}
import scala.reflect.ClassTag
private[streaming]
class StateDStream[K: ClassManifest, V: ClassManifest, S: ClassManifest](
class StateDStream[K: ClassTag, V: ClassTag, S: ClassTag](
parent: DStream[(K, V)],
updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)],
partitioner: Partitioner,

View file

@ -3,8 +3,10 @@ package spark.streaming.dstream
import spark.RDD
import spark.streaming.{Duration, DStream, Time}
import scala.reflect.ClassTag
private[streaming]
class TransformedDStream[T: ClassManifest, U: ClassManifest] (
class TransformedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
transformFunc: (RDD[T], Time) => RDD[U]
) extends DStream[U](parent.ssc) {

View file

@ -2,11 +2,13 @@ package spark.streaming.dstream
import spark.streaming.{Duration, DStream, Time}
import spark.RDD
import collection.mutable.ArrayBuffer
import spark.rdd.UnionRDD
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
private[streaming]
class UnionDStream[T: ClassManifest](parents: Array[DStream[T]])
class UnionDStream[T: ClassTag](parents: Array[DStream[T]])
extends DStream[T](parents.head.ssc) {
if (parents.length == 0) {

View file

@ -5,8 +5,10 @@ import spark.rdd.UnionRDD
import spark.storage.StorageLevel
import spark.streaming.{Duration, Interval, Time, DStream}
import scala.reflect.ClassTag
private[streaming]
class WindowedDStream[T: ClassManifest](
class WindowedDStream[T: ClassTag](
parent: DStream[T],
_windowDuration: Duration,
_slideDuration: Duration)
@ -35,6 +37,3 @@ class WindowedDStream[T: ClassManifest](
Some(new UnionRDD(ssc.sc, parent.slice(currentWindow)))
}
}

View file

@ -4,14 +4,16 @@ import akka.actor.{ Actor, PoisonPill, Props, SupervisorStrategy }
import akka.actor.{ actorRef2Scala, ActorRef }
import akka.actor.{ PossiblyHarmful, OneForOneStrategy }
import akka.actor.SupervisorStrategy._
import scala.concurrent.duration._
import scala.reflect.ClassTag
import spark.storage.StorageLevel
import spark.streaming.dstream.NetworkReceiver
import java.util.concurrent.atomic.AtomicInteger
/** A helper with set of defaults for supervisor strategy **/
/** A helper with set of defaults for supervisor strategy */
object ReceiverSupervisorStrategy {
val defaultStrategy = OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange =
@ -43,11 +45,11 @@ object ReceiverSupervisorStrategy {
*
*/
trait Receiver { self: Actor
def pushBlock[T: ClassManifest](iter: Iterator[T]) {
def pushBlock[T: ClassTag](iter: Iterator[T]) {
context.parent ! Data(iter)
}
def pushBlock[T: ClassManifest](data: T) {
def pushBlock[T: ClassTag](data: T) {
context.parent ! Data(data)
}
@ -61,8 +63,8 @@ case class Statistics(numberOfMsgs: Int,
numberOfHiccups: Int,
otherInfo: String)
/** Case class to receive data sent by child actors **/
private[streaming] case class Data[T: ClassManifest](data: T)
/** Case class to receive data sent by child actors */
private[streaming] case class Data[T: ClassTag](data: T)
/**
* Provides Actors as receivers for receiving stream.
@ -85,7 +87,7 @@ private[streaming] case class Data[T: ClassManifest](data: T)
*
*
*/
private[streaming] class ActorReceiver[T: ClassManifest](
private[streaming] class ActorReceiver[T: ClassTag](
props: Props,
name: String,
storageLevel: StorageLevel,

View file

@ -5,10 +5,12 @@ import akka.zeromq._
import spark.Logging
import scala.reflect.ClassTag
/**
* A receiver to subscribe to ZeroMQ stream.
*/
private[streaming] class ZeroMQReceiver[T: ClassManifest](publisherUrl: String,
private[streaming] class ZeroMQReceiver[T: ClassTag](publisherUrl: String,
subscribe: Subscribe,
bytesToObjects: Seq[Seq[Byte]] Iterator[T])
extends Actor with Receiver with Logging {

View file

@ -7,6 +7,7 @@ import StreamingContext._
import scala.util.Random
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
import java.io.{File, ObjectInputStream, IOException}
import java.util.UUID
@ -102,7 +103,7 @@ object MasterFailureTest extends Logging {
* Tests stream operation with multiple master failures, and verifies whether the
* final set of output values is as expected or not.
*/
def testOperation[T: ClassManifest](
def testOperation[T: ClassTag](
directory: String,
batchDuration: Duration,
input: Seq[String],
@ -140,7 +141,7 @@ object MasterFailureTest extends Logging {
* and batch duration. Returns the streaming context and the directory to which
* files should be written for testing.
*/
private def setupStreams[T: ClassManifest](
private def setupStreams[T: ClassTag](
directory: String,
batchDuration: Duration,
operation: DStream[String] => DStream[T]
@ -173,7 +174,7 @@ object MasterFailureTest extends Logging {
* Repeatedly starts and kills the streaming context until timed out or
* the last expected output is generated. Finally, return
*/
private def runStreams[T: ClassManifest](
private def runStreams[T: ClassTag](
ssc_ : StreamingContext,
lastExpectedOutput: T,
maxTimeToRun: Long
@ -254,7 +255,7 @@ object MasterFailureTest extends Logging {
* duplicate batch outputs of values from the `output`. As a result, the
* expected output should not have consecutive batches with the same values as output.
*/
private def verifyOutput[T: ClassManifest](output: Seq[T], expectedOutput: Seq[T]) {
private def verifyOutput[T: ClassTag](output: Seq[T], expectedOutput: Seq[T]) {
// Verify whether expected outputs do not consecutive batches with same output
for (i <- 0 until expectedOutput.size - 1) {
assert(expectedOutput(i) != expectedOutput(i+1),
@ -285,7 +286,7 @@ object MasterFailureTest extends Logging {
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
private[streaming]
class TestOutputStream[T: ClassManifest](
class TestOutputStream[T: ClassTag](
parent: DStream[T],
val output: ArrayBuffer[Seq[T]] = new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]]
) extends ForEachDStream[T](
@ -359,22 +360,22 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
val hadoopFile = new Path(testDir, (i+1).toString)
FileUtils.writeStringToFile(localFile, input(i).toString + "\n")
var tries = 0
var done = false
var done = false
while (!done && tries < maxTries) {
tries += 1
try {
fs.copyFromLocalFile(new Path(localFile.toString), hadoopFile)
done = true
} catch {
case ioe: IOException => {
fs = testDir.getFileSystem(new Configuration())
done = true
} catch {
case ioe: IOException => {
fs = testDir.getFileSystem(new Configuration())
logWarning("Attempt " + tries + " at generating file " + hadoopFile + " failed.", ioe)
}
}
}
if (!done)
}
}
if (!done)
logError("Could not generate file " + hadoopFile)
else
else
logInfo("Generated file " + hadoopFile + " at " + System.currentTimeMillis)
Thread.sleep(interval)
localFile.delete()
@ -388,5 +389,3 @@ class FileGeneratingThread(input: Seq[String], testDir: Path, interval: Long)
}
}
}

View file

@ -1,6 +1,8 @@
package spark.streaming
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import scala.reflect.ClassTag
import java.util.{List => JList}
import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import spark.streaming._
@ -13,15 +15,15 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
* The stream will be derived from the supplied lists of Java objects.
**/
*/
def attachTestInputStream[T](
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
@ -30,12 +32,12 @@ trait JavaTestBase extends TestSuiteBase {
/**
* Attach a provided stream to it's associated StreamingContext as a
* [[spark.streaming.TestOutputStream]].
**/
*/
def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T, This, R],
R <: spark.api.java.JavaRDDLike[T, R]](
dstream: JavaDStreamLike[T, This, R]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
implicit val cm: ClassTag[T] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]]
val ostream = new TestOutputStream(dstream.dstream,
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
dstream.dstream.ssc.registerOutputStream(ostream)
@ -48,8 +50,8 @@ trait JavaTestBase extends TestSuiteBase {
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cm: ClassTag[V] =
implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
res.map(entry => out.append(new ArrayList[V](entry)))
@ -64,4 +66,4 @@ object JavaTestUtils extends JavaTestBase {
object JavaCheckpointTestUtils extends JavaTestBase {
override def actuallyWait = true
}
}

View file

@ -3,6 +3,7 @@ package spark.streaming
import java.io.File
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
import org.apache.commons.io.FileUtils
import org.scalatest.BeforeAndAfter
@ -297,7 +298,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* NOTE: This takes into consideration that the last batch processed before
* master failure will be re-processed after restart/recovery.
*/
def testCheckpointedOperation[U: ClassManifest, V: ClassManifest](
def testCheckpointedOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@ -340,7 +341,7 @@ class CheckpointSuite extends TestSuiteBase with BeforeAndAfter {
* Advances the manual clock on the streaming scheduler by given number of batches.
* It also waits for the expected amount of time for each batch.
*/
def advanceTimeWithRealDelay[V: ClassManifest](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
def advanceTimeWithRealDelay[V: ClassTag](ssc: StreamingContext, numBatches: Long): Seq[Seq[V]] = {
val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
logInfo("Manual clock before advancing = " + clock.time)
for (i <- 1 to numBatches.toInt) {

View file

@ -5,8 +5,9 @@ import spark.streaming.util.ManualClock
import spark.{RDD, Logging}
import collection.mutable.ArrayBuffer
import collection.mutable.SynchronizedBuffer
import scala.collection.mutable.ArrayBuffer
import scala.collection.mutable.SynchronizedBuffer
import scala.reflect.ClassTag
import java.io.{ObjectInputStream, IOException}
@ -17,7 +18,7 @@ import org.scalatest.{BeforeAndAfter, FunSuite}
* replayable, reliable message queue like Kafka. It requires a sequence as input, and
* returns the i_th element at the i_th batch unde manual clock.
*/
class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
class TestInputStream[T: ClassTag](ssc_ : StreamingContext, input: Seq[Seq[T]], numPartitions: Int)
extends InputDStream[T](ssc_) {
def start() {}
@ -43,7 +44,7 @@ class TestInputStream[T: ClassManifest](ssc_ : StreamingContext, input: Seq[Seq[
* This is a output stream just for the testsuites. All the output is collected into a
* ArrayBuffer. This buffer is wiped clean on being restored from checkpoint.
*/
class TestOutputStream[T: ClassManifest](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
class TestOutputStream[T: ClassTag](parent: DStream[T], val output: ArrayBuffer[Seq[T]])
extends ForEachDStream[T](parent, (rdd: RDD[T], t: Time) => {
val collected = rdd.collect()
output += collected
@ -88,7 +89,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the DStream operation using the two sequences
* of input collections.
*/
def setupStreams[U: ClassManifest, V: ClassManifest](
def setupStreams[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V]
): StreamingContext = {
@ -112,7 +113,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Set up required DStreams to test the binary operation using the sequence
* of input collections.
*/
def setupStreams[U: ClassManifest, V: ClassManifest, W: ClassManifest](
def setupStreams[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W]
@ -140,7 +141,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* returns the collected output. It will wait until `numExpectedOutput` number of
* output data has been collected or timeout (set by `maxWaitTimeMillis`) is reached.
*/
def runStreams[V: ClassManifest](
def runStreams[V: ClassTag](
ssc: StreamingContext,
numBatches: Int,
numExpectedOutput: Int
@ -196,7 +197,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* is same as the expected output values, by comparing the output
* collections either as lists (order matters) or sets (order does not matter)
*/
def verifyOutput[V: ClassManifest](
def verifyOutput[V: ClassTag](
output: Seq[Seq[V]],
expectedOutput: Seq[Seq[V]],
useSet: Boolean
@ -226,7 +227,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test unary DStream operation with a list of inputs, with number of
* batches to run same as the number of expected output values
*/
def testOperation[U: ClassManifest, V: ClassManifest](
def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@ -244,7 +245,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
def testOperation[U: ClassManifest, V: ClassManifest](
def testOperation[U: ClassTag, V: ClassTag](
input: Seq[Seq[U]],
operation: DStream[U] => DStream[V],
expectedOutput: Seq[Seq[V]],
@ -261,7 +262,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* Test binary DStream operation with two lists of inputs, with number of
* batches to run same as the number of expected output values
*/
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],
@ -281,7 +282,7 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
* @param useSet Compare the output values with the expected output values
* as sets (order matters) or as lists (order does not matter)
*/
def testOperation[U: ClassManifest, V: ClassManifest, W: ClassManifest](
def testOperation[U: ClassTag, V: ClassTag, W: ClassTag](
input1: Seq[Seq[U]],
input2: Seq[Seq[V]],
operation: (DStream[U], DStream[V]) => DStream[W],