Two changes:

- Updating countByX() types based on bug fix
- Porting new documentation to Java
Patrick Wendell 2013-01-14 10:03:55 -08:00
parent a292ed8d8a
commit d182a57cae
3 changed files with 271 additions and 7 deletions

JavaFlumeEventCount.java

@@ -38,9 +38,9 @@ public class JavaFlumeEventCount {
flumeStream.count();
-flumeStream.count().map(new Function<Integer, String>() {
+flumeStream.count().map(new Function<Long, String>() {
@Override
-public String call(Integer in) {
+public String call(Long in) {
return "Received " + in + " flume events.";
}
}).print();
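
For reference, a minimal sketch of a corrected call site, assuming the 0.7-era Java API shown in this diff; the CountSummary class, helper name, and generic element type T are illustrative, not part of the commit:

import spark.api.java.function.Function;
import spark.streaming.api.java.JavaDStream;

class CountSummary {
  // count() now returns JavaDStream<Long>, so the mapping Function
  // must be typed over Long rather than Integer.
  static <T> JavaDStream<String> summarize(JavaDStream<T> stream) {
    return stream.count().map(new Function<Long, String>() {
      @Override
      public String call(Long in) {
        return "Received " + in + " events.";
      }
    });
  }
}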

JavaDStreamLike.scala

@@ -1,7 +1,7 @@
package spark.streaming.api.java
import java.util.{List => JList}
-import java.lang.{Integer => JInt}
+import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
@@ -17,8 +17,8 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
def dstream: DStream[T]
-implicit def scalaIntToJavaInteger(in: DStream[Int]): JavaDStream[JInt] = {
-in.map(new JInt(_))
+implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
+in.map(new JLong(_))
}
/**
@@ -31,14 +31,14 @@ trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable
* Returns a new DStream in which each RDD has a single element generated by counting the
* number of elements in each RDD of this DStream.
*/
-def count(): JavaDStream[JInt] = dstream.count()
+def count(): JavaDStream[JLong] = dstream.count()
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
-def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JInt] = {
+def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
dstream.countByWindow(windowDuration, slideDuration)
}
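
A companion sketch from the Java side, under the same assumptions (class and helper names, plus the 30-second window sliding every 10 seconds, are illustrative; both durations must be multiples of the batch interval):

import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;

class WindowCountExample {
  // With the type fix, windowed counts also surface as java.lang.Long.
  static <T> JavaDStream<Long> countsPerWindow(JavaDStream<T> stream) {
    return stream.countByWindow(new Duration(30000), new Duration(10000));
  }
}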

JavaPairDStream.scala

@@ -85,21 +85,66 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
// Methods only for PairDStream's
// =======================================================================
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. Hash partitioning is
* used to generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. Hash partitioning is
* used to generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
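
To illustrate the Java view of these wrappers, a minimal sketch (class name, helper name, and key/value types are assumptions): the grouped values arrive as java.util.List thanks to the seqAsJavaList conversion above.

import java.util.List;
import spark.streaming.api.java.JavaPairDStream;

class GroupExample {
  // Groups values per key, using Spark's default number of partitions.
  static JavaPairDStream<String, List<Integer>> groupPerKey(
      JavaPairDStream<String, Integer> pairs) {
    return pairs.groupByKey();
  }
}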
/**
* Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are merged using the
* associative reduce function to generate the RDDs of the new DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
*/
def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
dstream.reduceByKey(func)
/**
* Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are merged using the
* associative reduce function to generate the RDDs of the new DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
/**
* Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are merged using the
* associative reduce function to generate the RDDs of the new DStream.
* [[spark.Partitioner]] is used to control the partitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
dstream.reduceByKey(func, partitioner)
}
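
A corresponding Java sketch for the reduceByKey family (class name, helper name, and types are illustrative); the reduce function must be associative since it is applied in parallel:

import spark.api.java.function.Function2;
import spark.streaming.api.java.JavaPairDStream;

class ReduceExample {
  // Sums the values per key within each RDD of the stream.
  static JavaPairDStream<String, Integer> sumPerKey(
      JavaPairDStream<String, Integer> pairs) {
    return pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer a, Integer b) {
        return a + b;
      }
    });
  }
}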
/**
* Generic function to combine elements of each key in DStream's RDDs using custom function.
* This is similar to the combineByKey for RDDs. Please refer to combineByKey in
* [[spark.PairRDDFunctions]] for more information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
@@ -110,25 +155,78 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
/**
* Creates a new DStream by counting the number of values of each key in each RDD
* of `this` DStream. Hash partitioning is used to generate the RDDs with
* `numPartitions` partitions.
*/
def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions))
}
/**
* Creates a new DStream by counting the number of values of each key in each RDD
* of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
* default number of partitions.
*/
def countByKey(): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey())
}
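
With the type fix, per-key counts also surface as java.lang.Long from Java; a minimal sketch (names and types assumed):

import spark.streaming.api.java.JavaPairDStream;

class CountByKeyExample {
  // Counts how many values each key has in each RDD.
  static JavaPairDStream<String, Long> occurrencesPerKey(
      JavaPairDStream<String, Integer> pairs) {
    return pairs.countByKey();
  }
}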
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* The new DStream generates RDDs with the same interval as this DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
}
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
}
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
.mapValues(seqAsJavaList _)
}
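
A windowed-grouping sketch from Java (class name, types, durations, and the partition count are illustrative; both durations must be multiples of the batch interval):

import java.util.List;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

class WindowGroupExample {
  // Values per key over a 60s window sliding every 20s,
  // with 4 partitions in each generated RDD.
  static JavaPairDStream<String, List<Integer>> groupPerKeyAndWindow(
      JavaPairDStream<String, Integer> pairs) {
    return pairs.groupByKeyAndWindow(new Duration(60000), new Duration(20000), 4);
  }
}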
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
@@ -138,11 +236,31 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
.mapValues(seqAsJavaList _)
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* The new DStream generates RDDs with the same interval as this DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
: JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
@@ -151,6 +269,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
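
A Java sketch of the windowed reduce (class name, helper name, types, and durations assumed):

import spark.api.java.function.Function2;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

class WindowReduceExample {
  // Per-key sums over a 60s window sliding every 10s.
  static JavaPairDStream<String, Integer> windowedSums(
      JavaPairDStream<String, Integer> pairs) {
    Function2<Integer, Integer, Integer> add =
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) {
            return a + b;
          }
        };
    return pairs.reduceByKeyAndWindow(add, new Duration(60000), new Duration(10000));
  }
}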
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
@@ -160,6 +290,17 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
@@ -169,6 +310,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
/**
* Creates a new DStream by reducing over a window in a smarter way.
* The reduced value over a new window is calculated incrementally by using the
* old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
* However, it is applicable only to "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
@@ -178,6 +337,24 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
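
The incremental variant sketched in Java under the same assumptions: addition is invertible, so counts for batches that slide out of the window are subtracted instead of recomputing the whole window from scratch.

import spark.api.java.function.Function2;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

class IncrementalWindowExample {
  // Running per-key sums over a sliding window, maintained
  // incrementally via an invertible reduce function.
  static JavaPairDStream<String, Integer> incrementalWindowSums(
      JavaPairDStream<String, Integer> pairs) {
    Function2<Integer, Integer, Integer> add =
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) { return a + b; }
        };
    Function2<Integer, Integer, Integer> subtract =
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer a, Integer b) { return a - b; }
        };
    return pairs.reduceByKeyAndWindow(
        add, subtract, new Duration(60000), new Duration(10000));
  }
}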
/**
* Creates a new DStream by reducing over a window in a smarter way.
* The reduced value over a new window is calculated incrementally by using the
* old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
* However, it is applicable only to "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
@@ -193,6 +370,23 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
numPartitions)
}
/**
* Creates a new DStream by reducing over a window in a smarter way.
* The reduced value over a new window is calculated incrementally by using the
* old window's reduced value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
* However, it is applicable only to "invertible reduce functions".
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
@@ -208,16 +402,38 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
partitioner)
}
/**
* Creates a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
}
/**
* Creates a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
: JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions))
}
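
A Java sketch of windowed per-key counting (names and durations assumed), which with the JLong fix above consistently yields java.lang.Long values:

import spark.streaming.Duration;
import spark.streaming.api.java.JavaPairDStream;

class WindowKeyCountExample {
  // Per-key counts over a 60s window sliding every 10s.
  static JavaPairDStream<String, Long> keyCountsPerWindow(
      JavaPairDStream<String, Integer> pairs) {
    return pairs.countByKeyAndWindow(new Duration(60000), new Duration(10000));
  }
}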
// TODO: Update State
def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
@@ -232,12 +448,26 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.flatMapValues(fn)
}
/**
* Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
* be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
* each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
* will contain a tuple with the list of values for that key in both RDDs.
* HashPartitioner is used to partition each generated RDD into default number of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList(t._2)))
}
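
From Java, the cogrouped values arrive as a scala.Tuple2 of java.util.Lists; a minimal sketch (class name, helper name, and types assumed):

import java.util.List;
import scala.Tuple2;
import spark.streaming.api.java.JavaPairDStream;

class CogroupExample {
  // For each key, collects the values from both streams side by side.
  static JavaPairDStream<String, Tuple2<List<Integer>, List<String>>> cogrouped(
      JavaPairDStream<String, Integer> left,
      JavaPairDStream<String, String> right) {
    return left.cogroup(right);
  }
}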
/**
* Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
* be generated by cogrouping RDDs from `this` and `other` DStreams. Therefore, for
* each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
* will contain a tuple with the list of values for that key in both RDDs.
* Partitioner is used to partition each generated RDD.
*/
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
@@ -246,12 +476,22 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
.mapValues(t => (seqAsJavaList(t._1), seqAsJavaList(t._2)))
}
/**
* Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
* be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream)
}
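
And the corresponding join sketch (same assumptions): each matching key yields a scala.Tuple2 holding the two values.

import scala.Tuple2;
import spark.streaming.api.java.JavaPairDStream;

class JoinExample {
  // Inner join of two pair streams on their keys.
  static JavaPairDStream<String, Tuple2<Integer, String>> joined(
      JavaPairDStream<String, Integer> left,
      JavaPairDStream<String, String> right) {
    return left.join(right);
  }
}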
/**
* Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and `other` DStreams. Uses the given
* Partitioner to partition each generated RDD.
*/
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
@@ -259,10 +499,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.join(other.dstream, partitioner)
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsHadoopFiles(prefix, suffix)
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
prefix: String,
suffix: String,
@@ -272,6 +520,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
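
A save sketch using the overload above (the Writable key/value types and the old-API TextOutputFormat are illustrative assumptions; K and V must match the class arguments):

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextOutputFormat;
import spark.streaming.api.java.JavaPairDStream;

class SaveExample {
  // Writes one Hadoop file per batch interval, named "counts-<TIME_IN_MS>.txt".
  static void persist(JavaPairDStream<Text, IntWritable> pairs) {
    pairs.saveAsHadoopFiles("counts", "txt", Text.class, IntWritable.class,
        TextOutputFormat.class);
  }
}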
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
prefix: String,
suffix: String,
@@ -282,10 +534,18 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
@@ -295,6 +555,10 @@ class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,