Merge pull request #369 from pwendell/streaming-java-api

Java API For Spark Streaming
Patrick Wendell 2013-01-17 22:39:30 -08:00
commit 11bbe23140
15 changed files with 2708 additions and 121 deletions

View file

@ -9,6 +9,7 @@ import spark.api.java.JavaPairRDD._
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import spark.partial.{PartialResult, BoundedDouble}
import spark.storage.StorageLevel
import com.google.common.base.Optional
trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
@ -298,4 +299,31 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable {
* Save this RDD as a SequenceFile of serialized objects.
*/
def saveAsObjectFile(path: String) = rdd.saveAsObjectFile(path)
/**
* Mark this RDD for checkpointing. The RDD will be saved to a file inside `checkpointDir`
* (set using setCheckpointDir()) and all references to its parent RDDs will be removed.
* This is used to truncate very long lineages. In the current implementation, Spark will save
* this RDD to a file (using saveAsObjectFile()) after the first job using this RDD is done.
* Hence, it is strongly recommended to use checkpoint() on RDDs when
* (i) checkpoint() is called before any job has been executed on this RDD, and
* (ii) this RDD has been persisted in memory; otherwise saving it to a file will
* require recomputation.
*/
def checkpoint() = rdd.checkpoint()
/**
* Return whether this RDD has been checkpointed or not
*/
def isCheckpointed(): Boolean = rdd.isCheckpointed()
/**
* Get the name of the file to which this RDD was checkpointed
*/
def getCheckpointFile(): Optional[String] = {
rdd.getCheckpointFile match {
case Some(file) => Optional.of(file)
case _ => Optional.absent()
}
}
}
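For reference, a minimal sketch of how this checkpoint API might be exercised from Java. It is not part of the patch; the checkpoint directory path, the local master URL, and the class name are made up for illustration, and it mirrors what the new JavaAPISuite tests below do with a temporary directory.

import java.util.Arrays;
import com.google.common.base.Optional;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;

public class CheckpointSketch {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local", "CheckpointSketch");
    sc.setCheckpointDir("/tmp/spark-checkpoints", true);  // hypothetical directory
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    rdd.cache();       // persist first so checkpointing does not recompute the RDD
    rdd.checkpoint();  // only marks the RDD; the save happens after the first job
    rdd.count();       // first job on this RDD triggers the actual checkpoint
    Optional<String> file = rdd.getCheckpointFile();
    if (file.isPresent()) {
      System.out.println("Checkpointed to " + file.get());
    }
  }
}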

View file

@ -342,6 +342,32 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWork
def clearFiles() {
sc.clearFiles()
}
/**
* Set the directory under which RDDs are going to be checkpointed. This method will
* create the directory and will throw an exception if the path already exists (to avoid
* accidentally overwriting existing files). The directory will be deleted on exit
* if indicated.
*/
def setCheckpointDir(dir: String, useExisting: Boolean) {
sc.setCheckpointDir(dir, useExisting)
}
/**
* Set the directory under which RDDs are going to be checkpointed. This method will
* create the directory and will throw an exception if the path already exists (to avoid
* accidentally overwriting existing files). The directory will be deleted on exit
* if indicated.
*/
def setCheckpointDir(dir: String) {
sc.setCheckpointDir(dir)
}
protected def checkpointFile[T](path: String): JavaRDD[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
new JavaRDD(sc.checkpointFile(path))
}
}
object JavaSparkContext {

View file

@ -625,4 +625,31 @@ public class JavaAPISuite implements Serializable {
});
Assert.assertEquals((Float) 25.0f, floatAccum.value());
}
@Test
public void checkpointAndComputation() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
Assert.assertEquals(true, rdd.isCheckpointed());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), rdd.collect());
}
@Test
public void checkpointAndRestore() {
File tempDir = Files.createTempDir();
JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
sc.setCheckpointDir(tempDir.getAbsolutePath(), true);
Assert.assertEquals(false, rdd.isCheckpointed());
rdd.checkpoint();
rdd.count(); // Forces the DAG to cause a checkpoint
Assert.assertEquals(true, rdd.isCheckpointed());
Assert.assertTrue(rdd.getCheckpointFile().isPresent());
JavaRDD<Integer> recovered = sc.checkpointFile(rdd.getCheckpointFile().get());
Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), recovered.collect());
}
}

View file

@ -0,0 +1,50 @@
package spark.streaming.examples;
import spark.api.java.function.Function;
import spark.streaming.*;
import spark.streaming.api.java.*;
import spark.streaming.dstream.SparkFlumeEvent;
/**
* Produces a count of events received from Flume.
*
* This should be used in conjunction with an AvroSink in Flume. It will start
* an Avro server at the requested host:port address and listen for requests.
* Your Flume AvroSink should be pointed to this address.
*
* Usage: JavaFlumeEventCount <master> <host> <port>
*
* <master> is a Spark master URL
* <host> is the host the Flume receiver will be started on - a receiver
* creates a server and listens for flume events.
* <port> is the port the Flume receiver will listen on.
*/
public class JavaFlumeEventCount {
public static void main(String[] args) {
if (args.length != 3) {
System.err.println("Usage: JavaFlumeEventCount <master> <host> <port>");
System.exit(1);
}
String master = args[0];
String host = args[1];
int port = Integer.parseInt(args[2]);
Duration batchInterval = new Duration(2000);
JavaStreamingContext sc = new JavaStreamingContext(master, "FlumeEventCount", batchInterval);
JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
flumeStream.count().map(new Function<Long, String>() {
@Override
public String call(Long in) {
return "Received " + in + " flume events.";
}
}).print();
sc.start();
}
}

View file

@ -0,0 +1,62 @@
package spark.streaming.examples;
import com.google.common.collect.Lists;
import scala.Tuple2;
import spark.api.java.function.FlatMapFunction;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
/**
* Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
* Usage: JavaNetworkWordCount <master> <hostname> <port>
* <master> is the Spark master URL. In local mode, <master> should be 'local[n]' with n > 1.
* <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
*
* To run this on your local machine, you need to first run a Netcat server
* `$ nc -lk 9999`
* and then run the example
* `$ ./run spark.streaming.examples.JavaNetworkWordCount local[2] localhost 9999`
*/
public class JavaNetworkWordCount {
public static void main(String[] args) {
if (args.length < 3) {
System.err.println("Usage: JavaNetworkWordCount <master> <hostname> <port>\n" +
"In local mode, <master> should be 'local[n]' with n > 1");
System.exit(1);
}
// Create the context with a 1 second batch size
JavaStreamingContext ssc = new JavaStreamingContext(
args[0], "NetworkWordCount", new Duration(1000));
// Create a NetworkInputDStream on target ip:port and count the
// words in the input stream of \n delimited text (e.g. generated by 'nc')
JavaDStream<String> lines = ssc.networkTextStream(args[1], Integer.parseInt(args[2]));
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(x.split(" "));
}
});
JavaPairDStream<String, Integer> wordCounts = words.map(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
wordCounts.print();
ssc.start();
}
}

View file

@ -0,0 +1,62 @@
package spark.streaming.examples;
import com.google.common.collect.Lists;
import scala.Tuple2;
import spark.api.java.JavaRDD;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
public class JavaQueueStream {
public static void main(String[] args) throws InterruptedException {
if (args.length < 1) {
System.err.println("Usage: JavaQueueStream <master>");
System.exit(1);
}
// Create the context
JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));
// Create the queue through which RDDs can be pushed to
// a QueueInputDStream
Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
// Create and push some RDDs into the queue
List<Integer> list = Lists.newArrayList();
for (int i = 0; i < 1000; i++) {
list.add(i);
}
for (int i = 0; i < 30; i++) {
rddQueue.add(ssc.sc().parallelize(list));
}
// Create the QueueInputDStream and use it to do some processing
JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
JavaPairDStream<Integer, Integer> mappedStream = inputStream.map(
new PairFunction<Integer, Integer, Integer>() {
@Override
public Tuple2<Integer, Integer> call(Integer i) throws Exception {
return new Tuple2<Integer, Integer>(i % 10, 1);
}
});
JavaPairDStream<Integer, Integer> reducedStream = mappedStream.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) throws Exception {
return i1 + i2;
}
});
reducedStream.print();
ssc.start();
}
}

View file

@ -22,7 +22,7 @@ object NetworkWordCount {
System.exit(1)
}
// Create the context and set the batch size
// Create the context with a 1 second batch size
val ssc = new StreamingContext(args(0), "NetworkWordCount", Seconds(1))
// Create a NetworkInputDStream on target ip:port and count the

View file

@ -98,10 +98,10 @@ abstract class DStream[T: ClassManifest] (
this
}
/** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): DStream[T] = persist(StorageLevel.MEMORY_ONLY_SER)
/** Persists RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): DStream[T] = persist()
/**
@ -119,7 +119,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* This method initializes the DStream by setting the "zero" time, based on which
* Initialize the DStream by setting the "zero" time, based on which
* the validity of future times is calculated. This method also recursively initializes
* its parent DStreams.
*/
@ -244,7 +244,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Retrieves a precomputed RDD of this DStream, or computes the RDD. This is an internal
* Retrieve a precomputed RDD of this DStream, or compute the RDD. This is an internal
* method that should not be called directly.
*/
protected[streaming] def getOrCompute(time: Time): Option[RDD[T]] = {
@ -283,7 +283,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Generates a SparkStreaming job for the given time. This is an internal method that
* Generate a SparkStreaming job for the given time. This is an internal method that
* should not be called directly. This default implementation creates a job
* that materializes the corresponding RDD. Subclasses of DStream may override this
* (eg. ForEachDStream).
@ -302,7 +302,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Dereferences RDDs that are older than rememberDuration.
* Dereference RDDs that are older than rememberDuration.
*/
protected[streaming] def forgetOldRDDs(time: Time) {
val keys = generatedRDDs.keys
@ -328,7 +328,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Refreshes the list of checkpointed RDDs that will be saved along with checkpoint of
* Refresh the list of checkpointed RDDs that will be saved along with checkpoint of
* this stream. This is an internal method that should not be called directly. This is
* a default implementation that saves only the file names of the checkpointed RDDs to
* checkpointData. Subclasses of DStream (especially those of InputDStream) may override
@ -373,7 +373,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Restores the RDDs in generatedRDDs from the checkpointData. This is an internal method
* Restore the RDDs in generatedRDDs from the checkpointData. This is an internal method
* that should not be called directly. This is a default implementation that recreates RDDs
* from the checkpoint file names stored in checkpointData. Subclasses of DStream that
* override the updateCheckpointData() method would also need to override this method.
@ -425,20 +425,20 @@ abstract class DStream[T: ClassManifest] (
// DStream operations
// =======================================================================
/** Returns a new DStream by applying a function to all elements of this DStream. */
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassManifest](mapFunc: T => U): DStream[U] = {
new MappedDStream(this, ssc.sc.clean(mapFunc))
}
/**
* Returns a new DStream by applying a function to all elements of this DStream,
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U: ClassManifest](flatMapFunc: T => Traversable[U]): DStream[U] = {
new FlatMappedDStream(this, ssc.sc.clean(flatMapFunc))
}
/** Returns a new DStream containing only the elements that satisfy a predicate. */
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(filterFunc: T => Boolean): DStream[T] = new FilteredDStream(this, filterFunc)
/**
@ -461,20 +461,20 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Returns a new DStream in which each RDD has a single element generated by reducing each RDD
* Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(reduceFunc: (T, T) => T): DStream[T] =
this.map(x => (null, x)).reduceByKey(reduceFunc, 1).map(_._2)
/**
* Returns a new DStream in which each RDD has a single element generated by counting each RDD
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): DStream[Long] = this.map(_ => 1L).reduce(_ + _)
/**
* Applies a function to each RDD in this DStream. This is an output operator, so
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: RDD[T] => Unit) {
@ -482,7 +482,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Applies a function to each RDD in this DStream. This is an output operator, so
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: (RDD[T], Time) => Unit) {
@ -492,7 +492,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Returns a new DStream in which each RDD is generated by applying a function
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: RDD[T] => RDD[U]): DStream[U] = {
@ -500,7 +500,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Returns a new DStream in which each RDD is generated by applying a function
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U: ClassManifest](transformFunc: (RDD[T], Time) => RDD[U]): DStream[U] = {
@ -508,7 +508,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Prints the first ten elements of each RDD generated in this DStream. This is an output
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and therefore materialized.
*/
def print() {
@ -545,7 +545,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Returns a new DStream which computed based on tumbling window on this DStream.
* Return a new DStream which is computed based on a tumbling window on this DStream.
* This is equivalent to window(batchDuration, batchDuration).
* @param batchDuration tumbling window duration; must be a multiple of this DStream's
* batching interval
@ -553,7 +553,7 @@ abstract class DStream[T: ClassManifest] (
def tumble(batchDuration: Duration): DStream[T] = window(batchDuration, batchDuration)
/**
* Returns a new DStream in which each RDD has a single element generated by reducing all
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowDuration and slideDuration are as defined
* in the window() operation. This is equivalent to
* window(windowDuration, slideDuration).reduce(reduceFunc)
@ -578,7 +578,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Returns a new DStream in which each RDD has a single element generated by counting the number
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
@ -587,20 +587,20 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Returns a new DStream by unifying data of another DStream with this DStream.
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same slideDuration as this DStream.
*/
def union(that: DStream[T]): DStream[T] = new UnionDStream[T](Array(this, that))
/**
* Returns all the RDDs defined by the Interval object (both end times included)
* Return all the RDDs defined by the Interval object (both end times included)
*/
protected[streaming] def slice(interval: Interval): Seq[RDD[T]] = {
slice(interval.beginTime, interval.endTime)
}
/**
* Returns all the RDDs between 'fromTime' to 'toTime' (both included)
* Return all the RDDs between 'fromTime' and 'toTime' (both included)
*/
def slice(fromTime: Time, toTime: Time): Seq[RDD[T]] = {
val rdds = new ArrayBuffer[RDD[T]]()
@ -616,7 +616,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Saves each RDD in this DStream as a Sequence file of serialized objects.
* Save each RDD in this DStream as a Sequence file of serialized objects.
* The file name at each batch interval is generated based on `prefix` and
* `suffix`: "prefix-TIME_IN_MS.suffix".
*/
@ -629,7 +629,7 @@ abstract class DStream[T: ClassManifest] (
}
/**
* Saves each RDD in this DStream as at text file, using string representation
* Save each RDD in this DStream as a text file, using string representation
* of elements. The file name at each batch interval is generated based on
* `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/

View file

@ -26,29 +26,23 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. Hash partitioning is
* used to generate the RDDs with Spark's default number of partitions.
* Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner())
}
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. Hash partitioning is
* used to generate the RDDs with `numPartitions` partitions.
* Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): DStream[(K, Seq[V])] = {
groupByKey(defaultPartitioner(numPartitions))
}
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
* Create a new DStream by applying `groupByKey` on each RDD. The supplied [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): DStream[(K, Seq[V])] = {
@ -60,30 +54,27 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs is merged using the
* associative reduce function to generate the RDDs of the new DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner())
}
/**
* Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs is merged using the
* associative reduce function to generate the RDDs of the new DStream.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
def reduceByKey(reduceFunc: (V, V) => V, numPartitions: Int): DStream[(K, V)] = {
reduceByKey(reduceFunc, defaultPartitioner(numPartitions))
}
/**
* Creates a new DStream by applying `reduceByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs is merged using the
* associative reduce function to generate the RDDs of the new DStream.
* [[spark.Partitioner]] is used to control the partitioning of each RDD.
* Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
* partitioning of each RDD.
*/
def reduceByKey(reduceFunc: (V, V) => V, partitioner: Partitioner): DStream[(K, V)] = {
val cleanedReduceFunc = ssc.sc.clean(reduceFunc)
@ -91,9 +82,9 @@ extends Serializable {
}
/**
* Generic function to combine elements of each key in DStream's RDDs using custom function.
* This is similar to the combineByKey for RDDs. Please refer to combineByKey in
* [[spark.PairRDDFunctions]] for more information.
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C: ClassManifest](
createCombiner: V => C,
@ -104,19 +95,18 @@ extends Serializable {
}
/**
* Creates a new DStream by counting the number of values of each key in each RDD
* of `this` DStream. Hash partitioning is used to generate the RDDs with Spark's
* `numPartitions` partitions.
* Create a new DStream by counting the number of values of each key in each RDD. Hash
* partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def countByKey(numPartitions: Int = self.ssc.sc.defaultParallelism): DStream[(K, Long)] = {
self.map(x => (x._1, 1L)).reduceByKey((x: Long, y: Long) => x + y, numPartitions)
}
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* The new DStream generates RDDs with the same interval as this DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
@ -125,9 +115,9 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* Create a new DStream by applying `groupByKey` over a sliding window. Similar to
* `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@ -139,8 +129,8 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@ -158,8 +148,8 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.groupByKey()` but applies it over a sliding window.
* Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
@ -176,10 +166,10 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* The new DStream generates RDDs with the same interval as this DStream.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
* generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
* the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@ -192,9 +182,9 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@ -211,9 +201,9 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@ -232,8 +222,8 @@ extends Serializable {
}
/**
* Creates a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* This is similar to `DStream.reduceByKey()` but applies it over a sliding window.
* Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
* `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@ -255,9 +245,8 @@ extends Serializable {
}
/**
* Creates a new DStream by reducing over a window in a smarter way.
* The reduced value of over a new window is calculated incrementally by using the
* old window's reduce value :
* Create a new DStream by reducing over a window using incremental computation.
* The reduced value of a new window is calculated using the old window's reduce value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
@ -283,9 +272,8 @@ extends Serializable {
}
/**
* Creates a new DStream by reducing over a window in a smarter way.
* The reduced value of over a new window is calculated incrementally by using the
* old window's reduce value :
* Create a new DStream by reducing over a window using incremental computation.
* The reduced value of a new window is calculated using the old window's reduce value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
@ -313,9 +301,8 @@ extends Serializable {
}
/**
* Creates a new DStream by reducing over a window in a smarter way.
* The reduced value of over a new window is calculated incrementally by using the
* old window's reduce value :
* Create a new DStream by reducing over a window using incremental computation.
* The reduced value of a new window is calculated using the old window's reduce value:
* 1. reduce the new values that entered the window (e.g., adding new counts)
* 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
* This is more efficient than reduceByKeyAndWindow without the "inverse reduce" function.
@ -344,7 +331,7 @@ extends Serializable {
}
/**
* Creates a new DStream by counting the number of values for each key over a window.
* Create a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
@ -369,10 +356,9 @@ extends Serializable {
}
/**
* Creates a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key from
* `this` DStream. Hash partitioning is used to generate the RDDs with Spark's default
* number of partitions.
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @tparam S State type
@ -384,9 +370,9 @@ extends Serializable {
}
/**
* Creates a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key from
* `this` DStream. Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param numPartitions Number of partitions of each RDD in the new DStream.
@ -400,9 +386,9 @@ extends Serializable {
}
/**
* Creates a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key from
* `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
@ -419,9 +405,9 @@ extends Serializable {
}
/**
* Creates a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key from
* `this` DStream. [[spark.Partitioner]] is used to control the partitioning of each RDD.
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* [[spark.Partitioner]] is used to control the partitioning of each RDD.
* @param updateFunc State update function. If `this` function returns None, then
* corresponding state key-value pair will be eliminated. Note that
* this function may generate a tuple with a different key
@ -451,22 +437,19 @@ extends Serializable {
}
/**
* Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
* be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
* each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
* will contains a tuple with the list of values for that key in both RDDs.
* HashPartitioner is used to partition each generated RDD into default number of partitions.
* Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
* or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
* key in both RDDs. HashPartitioner is used to partition each generated RDD into the default number
* of partitions.
*/
def cogroup[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (Seq[V], Seq[W]))] = {
cogroup(other, defaultPartitioner())
}
/**
* Cogroups `this` DStream with `other` DStream. Each RDD of the new DStream will
* be generated by cogrouping RDDs from`this`and `other` DStreams. Therefore, for
* each key k in corresponding RDDs of `this` or `other` DStreams, the generated RDD
* will contains a tuple with the list of values for that key in both RDDs.
* Partitioner is used to partition each generated RDD.
* Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
* or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
* key in both RDDs. Partitioner is used to partition each generated RDD.
*/
def cogroup[W: ClassManifest](
other: DStream[(K, W)],
@ -488,8 +471,7 @@ extends Serializable {
}
/**
* Joins `this` DStream with `other` DStream. Each RDD of the new DStream will
* be generated by joining RDDs from `this` and `other` DStreams. HashPartitioner is used
* Join `this` DStream with `other` DStream. HashPartitioner is used
* to partition each generated RDD into default number of partitions.
*/
def join[W: ClassManifest](other: DStream[(K, W)]): DStream[(K, (V, W))] = {
@ -497,7 +479,7 @@ extends Serializable {
}
/**
* Joins `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
* be generated by joining RDDs from `this` and other DStream. Uses the given
* Partitioner to partition each generated RDD.
*/
@ -513,7 +495,7 @@ extends Serializable {
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](
@ -524,7 +506,7 @@ extends Serializable {
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix"
*/
def saveAsHadoopFiles(
@ -543,8 +525,8 @@ extends Serializable {
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](
prefix: String,
@ -554,8 +536,8 @@ extends Serializable {
}
/**
* Saves each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is generated
* based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,

View file

@ -0,0 +1,91 @@
package spark.streaming.api.java
import spark.streaming.{Duration, Time, DStream}
import spark.api.java.function.{Function => JFunction}
import spark.api.java.JavaRDD
import spark.storage.StorageLevel
/**
* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
* sequence of RDDs (of the same type) representing a continuous stream of data (see [[spark.RDD]]
* for more details on RDDs). DStreams can either be created from live data (such as data from
* HDFS, Kafka or Flume) or they can be generated by transforming existing DStreams using operations
* such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each
* DStream periodically generates an RDD, either from live data or by transforming the RDD generated
* by a parent DStream.
*
* This class contains the basic operations available on all DStreams, such as `map`, `filter` and
* `window`. In addition, [[spark.streaming.api.java.JavaPairDStream]] contains operations available
* only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and `join`. These operations
* are automatically available on any DStream of the right type (e.g., DStream[(Int, Int)]) through
* implicit conversions when `spark.streaming.StreamingContext._` is imported.
*
* Internally, each DStream is characterized by a few basic properties:
* - A list of other DStreams that the DStream depends on
* - A time interval at which the DStream generates an RDD
* - A function that is used to generate an RDD after each time interval
*/
class JavaDStream[T](val dstream: DStream[T])(implicit val classManifest: ClassManifest[T])
extends JavaDStreamLike[T, JavaDStream[T]] {
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[T, java.lang.Boolean]): JavaDStream[T] =
dstream.filter((x => f(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaDStream[T] = dstream.cache()
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): JavaDStream[T] = dstream.cache()
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaDStream[T] = dstream.persist(storageLevel)
/** Generate an RDD for the given duration */
def compute(validTime: Time): JavaRDD[T] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaRDD(rdd)
case None => null
}
}
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
def window(windowDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
def window(windowDuration: Duration, slideDuration: Duration): JavaDStream[T] =
dstream.window(windowDuration, slideDuration)
/**
* Return a new DStream which is computed based on a tumbling window on this DStream.
* This is equivalent to window(batchDuration, batchDuration).
* @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
def tumble(batchDuration: Duration): JavaDStream[T] =
dstream.tumble(batchDuration)
/**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaDStream[T]): JavaDStream[T] =
dstream.union(that.dstream)
}
object JavaDStream {
implicit def fromDStream[T: ClassManifest](dstream: DStream[T]): JavaDStream[T] =
new JavaDStream[T](dstream)
}
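As a quick illustration of the windowing operations defined above (a sketch only, not part of this patch; the host, port, window sizes, and class name are arbitrary, and networkTextStream is used as the source just as in the examples earlier in this change):

import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class JavaWindowSketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "JavaWindowSketch", new Duration(1000));
    JavaDStream<String> lines = ssc.networkTextStream("localhost", 9999);
    // Every 10 seconds, count the lines received during the last 30 seconds.
    JavaDStream<String> windowed = lines.window(new Duration(30000), new Duration(10000));
    windowed.count().print();
    ssc.start();
  }
}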

View file

@ -0,0 +1,183 @@
package spark.streaming.api.java
import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
import spark.streaming._
import spark.api.java.JavaRDD
import spark.api.java.function.{Function2 => JFunction2, Function => JFunction, _}
import java.util
import spark.RDD
import JavaDStream._
trait JavaDStreamLike[T, This <: JavaDStreamLike[T, This]] extends Serializable {
implicit val classManifest: ClassManifest[T]
def dstream: DStream[T]
implicit def scalaIntToJavaLong(in: DStream[Long]): JavaDStream[JLong] = {
in.map(new JLong(_))
}
/**
* Print the first ten elements of each RDD generated in this DStream. This is an output
* operator, so this DStream will be registered as an output stream and therefore materialized.
*/
def print() = dstream.print()
/**
* Return a new DStream in which each RDD has a single element generated by counting each RDD
* of this DStream.
*/
def count(): JavaDStream[JLong] = dstream.count()
/**
* Return a new DStream in which each RDD has a single element generated by counting the number
* of elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).count()
*/
def countByWindow(windowDuration: Duration, slideDuration: Duration) : JavaDStream[JLong] = {
dstream.countByWindow(windowDuration, slideDuration)
}
/**
* Return a new DStream in which each RDD is generated by applying glom() to each RDD of
* this DStream. Applying glom() to an RDD coalesces all elements within each partition into
* an array.
*/
def glom(): JavaDStream[JList[T]] =
new JavaDStream(dstream.glom().map(x => new java.util.ArrayList[T](x.toSeq)))
/** Return the StreamingContext associated with this DStream */
def context(): StreamingContext = dstream.context()
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[R](f: JFunction[T, R]): JavaDStream[R] = {
new JavaDStream(dstream.map(f)(f.returnType()))(f.returnType())
}
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[K, V](f: PairFunction[T, K, V]): JavaPairDStream[K, V] = {
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
new JavaPairDStream(dstream.map(f)(cm))(f.keyType(), f.valueType())
}
/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[U](f: FlatMapFunction[T, U]): JavaDStream[U] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
new JavaDStream(dstream.flatMap(fn)(f.elementType()))(f.elementType())
}
/**
* Return a new DStream by applying a function to all elements of this DStream,
* and then flattening the results
*/
def flatMap[K, V](f: PairFlatMapFunction[T, K, V]): JavaPairDStream[K, V] = {
import scala.collection.JavaConverters._
def fn = (x: T) => f.apply(x).asScala
def cm = implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
new JavaPairDStream(dstream.flatMap(fn)(cm))(f.keyType(), f.valueType())
}
/**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaDStream[U] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaDStream(dstream.mapPartitions(fn)(f.elementType()))(f.elementType())
}
/**
* Return a new DStream in which each RDD is generated by applying mapPartitions() to each RDD
* of this DStream. Applying mapPartitions() to an RDD applies a function to each partition
* of the RDD.
*/
def mapPartitions[K, V](f: PairFlatMapFunction[java.util.Iterator[T], K, V])
: JavaPairDStream[K, V] = {
def fn = (x: Iterator[T]) => asScalaIterator(f.apply(asJavaIterator(x)).iterator())
new JavaPairDStream(dstream.mapPartitions(fn))(f.keyType(), f.valueType())
}
/**
* Return a new DStream in which each RDD has a single element generated by reducing each RDD
* of this DStream.
*/
def reduce(f: JFunction2[T, T, T]): JavaDStream[T] = dstream.reduce(f)
/**
* Return a new DStream in which each RDD has a single element generated by reducing all
* elements in a window over this DStream. windowDuration and slideDuration are as defined in the
* window() operation. This is equivalent to window(windowDuration, slideDuration).reduce(reduceFunc)
*/
def reduceByWindow(
reduceFunc: JFunction2[T, T, T],
invReduceFunc: JFunction2[T, T, T],
windowDuration: Duration,
slideDuration: Duration
): JavaDStream[T] = {
dstream.reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
/**
* Return all the RDDs between 'fromDuration' and 'toDuration' (both included)
*/
def slice(fromDuration: Duration, toDuration: Duration): JList[JavaRDD[T]] = {
new util.ArrayList(dstream.slice(fromDuration, toDuration).map(new JavaRDD(_)).toSeq)
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction[JavaRDD[T], Void]) {
dstream.foreach(rdd => foreachFunc.call(new JavaRDD(rdd)))
}
/**
* Apply a function to each RDD in this DStream. This is an output operator, so
* this DStream will be registered as an output stream and therefore materialized.
*/
def foreach(foreachFunc: JFunction2[JavaRDD[T], Time, Void]) {
dstream.foreach((rdd, time) => foreachFunc.call(new JavaRDD(rdd), time))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction[JavaRDD[T], JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
def scalaTransform (in: RDD[T]): RDD[U] =
transformFunc.call(new JavaRDD[T](in)).rdd
dstream.transform(scalaTransform(_))
}
/**
* Return a new DStream in which each RDD is generated by applying a function
* on each RDD of this DStream.
*/
def transform[U](transformFunc: JFunction2[JavaRDD[T], Time, JavaRDD[U]]): JavaDStream[U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
def scalaTransform (in: RDD[T], time: Time): RDD[U] =
transformFunc.call(new JavaRDD[T](in), time).rdd
dstream.transform(scalaTransform(_, _))
}
/**
* Enable periodic checkpointing of RDDs of this DStream
* @param interval Time interval after which generated RDD will be checkpointed
*/
def checkpoint(interval: Duration) = {
dstream.checkpoint(interval)
}
}
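To make the `transform` wrapper above concrete, here is a hedged sketch (not part of this patch; the stream source, the filtering predicate, and the class name are invented for the example) that applies an ordinary RDD operation to every batch:

import spark.api.java.JavaRDD;
import spark.api.java.function.Function;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class JavaTransformSketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "JavaTransformSketch", new Duration(1000));
    JavaDStream<String> lines = ssc.networkTextStream("localhost", 9999);
    // Drop empty lines from every generated RDD using a plain RDD operation.
    JavaDStream<String> nonEmpty = lines.transform(
        new Function<JavaRDD<String>, JavaRDD<String>>() {
          @Override
          public JavaRDD<String> call(JavaRDD<String> rdd) {
            return rdd.filter(new Function<String, Boolean>() {
              @Override
              public Boolean call(String s) {
                return !s.isEmpty();
              }
            });
          }
        });
    nonEmpty.print();
    ssc.start();
  }
}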

View file

@ -0,0 +1,638 @@
package spark.streaming.api.java
import java.util.{List => JList}
import java.lang.{Long => JLong}
import scala.collection.JavaConversions._
import spark.streaming._
import spark.streaming.StreamingContext._
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import spark.Partitioner
import org.apache.hadoop.mapred.{JobConf, OutputFormat}
import org.apache.hadoop.mapreduce.{OutputFormat => NewOutputFormat}
import org.apache.hadoop.conf.Configuration
import spark.api.java.JavaPairRDD
import spark.storage.StorageLevel
import com.google.common.base.Optional
class JavaPairDStream[K, V](val dstream: DStream[(K, V)])(
implicit val kManifest: ClassManifest[K],
implicit val vManifest: ClassManifest[V])
extends JavaDStreamLike[(K, V), JavaPairDStream[K, V]] {
// =======================================================================
// Methods common to all DStreams
// =======================================================================
/** Return a new DStream containing only the elements that satisfy a predicate. */
def filter(f: JFunction[(K, V), java.lang.Boolean]): JavaPairDStream[K, V] =
dstream.filter((x => f(x).booleanValue()))
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def cache(): JavaPairDStream[K, V] = dstream.cache()
/** Persist RDDs of this DStream with the default storage level (MEMORY_ONLY_SER) */
def persist(): JavaPairDStream[K, V] = dstream.cache()
/** Persist the RDDs of this DStream with the given storage level */
def persist(storageLevel: StorageLevel): JavaPairDStream[K, V] = dstream.persist(storageLevel)
/** Generate an RDD for the given duration */
def compute(validTime: Time): JavaPairRDD[K, V] = {
dstream.compute(validTime) match {
case Some(rdd) => new JavaPairRDD(rdd)
case None => null
}
}
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* The new DStream generates RDDs with the same interval as this DStream.
* @param windowDuration width of the window; must be a multiple of this DStream's interval.
* @return
*/
def window(windowDuration: Duration): JavaPairDStream[K, V] =
dstream.window(windowDuration)
/**
* Return a new DStream which is computed based on windowed batches of this DStream.
* @param windowDuration duration (i.e., width) of the window;
* must be a multiple of this DStream's interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's interval
*/
def window(windowDuration: Duration, slideDuration: Duration): JavaPairDStream[K, V] =
dstream.window(windowDuration, slideDuration)
/**
* Return a new DStream which is computed based on a tumbling window on this DStream.
* This is equivalent to window(batchDuration, batchDuration).
* @param batchDuration tumbling window duration; must be a multiple of this DStream's interval
*/
def tumble(batchDuration: Duration): JavaPairDStream[K, V] =
dstream.tumble(batchDuration)
/**
* Return a new DStream by unifying data of another DStream with this DStream.
* @param that Another DStream having the same interval (i.e., slideDuration) as this DStream.
*/
def union(that: JavaPairDStream[K, V]): JavaPairDStream[K, V] =
dstream.union(that.dstream)
// =======================================================================
// Methods only for PairDStreams
// =======================================================================
/**
* Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
*/
def groupByKey(): JavaPairDStream[K, JList[V]] =
dstream.groupByKey().mapValues(seqAsJavaList _)
/**
* Create a new DStream by applying `groupByKey` to each RDD. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
*/
def groupByKey(numPartitions: Int): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(numPartitions).mapValues(seqAsJavaList _)
/**
* Creates a new DStream by applying `groupByKey` on each RDD of `this` DStream.
* Therefore, the values for each key in `this` DStream's RDDs are grouped into a
* single sequence to generate the RDDs of the new DStream. [[spark.Partitioner]]
* is used to control the partitioning of each RDD.
*/
def groupByKey(partitioner: Partitioner): JavaPairDStream[K, JList[V]] =
dstream.groupByKey(partitioner).mapValues(seqAsJavaList _)
/**
* Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the associative reduce function. Hash partitioning is used to generate the RDDs
* with Spark's default number of partitions.
*/
def reduceByKey(func: JFunction2[V, V, V]): JavaPairDStream[K, V] =
dstream.reduceByKey(func)
/**
* Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. Hash partitioning is used to generate the RDDs
* with `numPartitions` partitions.
*/
def reduceByKey(func: JFunction2[V, V, V], numPartitions: Int): JavaPairDStream[K, V] =
dstream.reduceByKey(func, numPartitions)
/**
* Create a new DStream by applying `reduceByKey` to each RDD. The values for each key are
* merged using the supplied reduce function. [[spark.Partitioner]] is used to control the
* partitioning of each RDD.
*/
def reduceByKey(func: JFunction2[V, V, V], partitioner: Partitioner): JavaPairDStream[K, V] = {
dstream.reduceByKey(func, partitioner)
}
/**
* Combine elements of each key in DStream's RDDs using custom function. This is similar to the
* combineByKey for RDDs. Please refer to combineByKey in [[spark.PairRDDFunctions]] for more
* information.
*/
def combineByKey[C](createCombiner: JFunction[V, C],
mergeValue: JFunction2[C, V, C],
mergeCombiners: JFunction2[C, C, C],
partitioner: Partitioner
): JavaPairDStream[K, C] = {
implicit val cm: ClassManifest[C] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[C]]
dstream.combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)
}
/**
* Create a new DStream by counting the number of values of each key in each RDD. Hash
* partitioning is used to generate the RDDs with `numPartitions` partitions.
*/
def countByKey(numPartitions: Int): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey(numPartitions))
}
/**
* Create a new DStream by counting the number of values of each key in each RDD. Hash
* partitioning is used to generate the RDDs with the default number of partitions.
*/
def countByKey(): JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKey())
}
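As a small usage sketch for `countByKey` (illustrative only, not part of this patch; the source, the choice of key, and the class name are assumptions), pairing each input line with a dummy value and counting by key yields a per-batch count for every distinct line:

import scala.Tuple2;
import spark.api.java.function.PairFunction;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class JavaCountByKeySketch {
  public static void main(String[] args) {
    JavaStreamingContext ssc =
        new JavaStreamingContext("local[2]", "JavaCountByKeySketch", new Duration(1000));
    JavaDStream<String> lines = ssc.networkTextStream("localhost", 9999);
    // Key every line by its own content; the value is a placeholder.
    JavaPairDStream<String, Integer> keyed = lines.map(
        new PairFunction<String, String, Integer>() {
          @Override
          public Tuple2<String, Integer> call(String line) {
            return new Tuple2<String, Integer>(line, 1);
          }
        });
    // One (line, count) pair per distinct line in each batch.
    keyed.countByKey().print();
    ssc.start();
  }
}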
/**
* Creates a new DStream by applying `groupByKey` over a sliding window. This is similar to
* `DStream.groupByKey()` but applies it over a sliding window. The new DStream generates RDDs
* with the same interval as this DStream. Hash partitioning is used to generate the RDDs with
* Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration): JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration).mapValues(seqAsJavaList _)
}
/**
* Create a new DStream by applying `groupByKey` over a sliding window. Similar to
* `DStream.groupByKey()`, but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration).mapValues(seqAsJavaList _)
}
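  A minimal Java sketch of the windowed grouping, assuming a hypothetical JavaPairDStream<String, String> named events and that Duration is constructed from milliseconds.

  // Group each key's values over a 30-second window that slides every 10 seconds.
  JavaPairDStream<String, List<String>> windowedGroups =
    events.groupByKeyAndWindow(new Duration(30000), new Duration(10000));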
/**
* Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def groupByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
:JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, numPartitions)
.mapValues(seqAsJavaList _)
}
/**
* Create a new DStream by applying `groupByKey` over a sliding window on `this` DStream.
* Similar to `DStream.groupByKey()`, but applies it over a sliding window.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def groupByKeyAndWindow(
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
):JavaPairDStream[K, JList[V]] = {
dstream.groupByKeyAndWindow(windowDuration, slideDuration, partitioner)
.mapValues(seqAsJavaList _)
}
/**
* Create a new DStream by applying `reduceByKey` over a sliding window on `this` DStream.
* Similar to `DStream.reduceByKey()`, but applies it over a sliding window. The new DStream
* generates RDDs with the same interval as this DStream. Hash partitioning is used to generate
* the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
*/
def reduceByKeyAndWindow(reduceFunc: Function2[V, V, V], windowDuration: Duration)
:JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration)
}
/**
* Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
):JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)
}
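  A hedged Java sketch of the windowed reduce, assuming a hypothetical JavaPairDStream<String, Integer> named counts.

  // Per-key sums over a 30-second window that slides every 10 seconds.
  JavaPairDStream<String, Integer> windowedSums = counts.reduceByKeyAndWindow(
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }
    },
    new Duration(30000),   // window width
    new Duration(10000));  // slide interval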
/**
* Create a new DStream by applying `reduceByKey` over a sliding window. This is similar to
* `DStream.reduceByKey()` but applies it over a sliding window. Hash partitioning is used to
* generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, numPartitions)
}
/**
* Create a new DStream by applying `reduceByKey` over a sliding window. Similar to
* `DStream.reduceByKey()`, but applies it over a sliding window.
* @param reduceFunc associative reduce function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration, partitioner)
}
/**
   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of a new window is calculated using the old window's reduced value:
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
   * However, it is applicable only to "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)
}
/**
   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of a new window is calculated using the old window's reduced value:
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
   * However, it is applicable only to "invertible reduce functions".
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
numPartitions: Int
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
numPartitions)
}
/**
   * Create a new DStream by applying incremental `reduceByKey` over a sliding window.
   * The reduced value of a new window is calculated using the old window's reduced value:
   *  1. reduce the new values that entered the window (e.g., adding new counts)
   *  2. "inverse reduce" the old values that left the window (e.g., subtracting old counts)
   * This is more efficient than reduceByKeyAndWindow without an "inverse reduce" function.
   * However, it is applicable only to "invertible reduce functions".
* @param reduceFunc associative reduce function
* @param invReduceFunc inverse function
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
*/
def reduceByKeyAndWindow(
reduceFunc: Function2[V, V, V],
invReduceFunc: Function2[V, V, V],
windowDuration: Duration,
slideDuration: Duration,
partitioner: Partitioner
): JavaPairDStream[K, V] = {
dstream.reduceByKeyAndWindow(
reduceFunc,
invReduceFunc,
windowDuration,
slideDuration,
partitioner)
}
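  A hedged Java sketch of the incremental form: values entering the window are added, values leaving it are subtracted. The counts stream and the durations are illustrative assumptions.

  // Running per-key counts over a 60-second window sliding every 10 seconds.
  JavaPairDStream<String, Integer> runningCounts = counts.reduceByKeyAndWindow(
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a + b; }  // reduce new values
    },
    new Function2<Integer, Integer, Integer>() {
      public Integer call(Integer a, Integer b) { return a - b; }  // "inverse reduce" old values
    },
    new Duration(60000),
    new Duration(10000));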
/**
* Create a new DStream by counting the number of values for each key over a window.
   * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
*/
def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration)
: JavaPairDStream[K, JLong] = {
JavaPairDStream.scalaToJavaLong(dstream.countByKeyAndWindow(windowDuration, slideDuration))
}
/**
* Create a new DStream by counting the number of values for each key over a window.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
* @param windowDuration width of the window; must be a multiple of this DStream's
* batching interval
* @param slideDuration sliding interval of the window (i.e., the interval after which
* the new DStream will generate RDDs); must be a multiple of this
* DStream's batching interval
* @param numPartitions Number of partitions of each RDD in the new DStream.
*/
  def countByKeyAndWindow(windowDuration: Duration, slideDuration: Duration, numPartitions: Int)
    : JavaPairDStream[K, JLong] = {
    JavaPairDStream.scalaToJavaLong(
      dstream.countByKeyAndWindow(windowDuration, slideDuration, numPartitions))
  }
private def convertUpdateStateFunction[S](in: JFunction2[JList[V], Optional[S], Optional[S]]):
(Seq[V], Option[S]) => Option[S] = {
val scalaFunc: (Seq[V], Option[S]) => Option[S] = (values, state) => {
val list: JList[V] = values
      val javaState: Optional[S] = state match {
        case Some(s) => Optional.of(s)
        case _ => Optional.absent()
      }
      val result: Optional[S] = in.apply(list, javaState)
      if (result.isPresent) Some(result.get()) else None
}
scalaFunc
}
/**
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   * @param updateFunc State update function. If this function returns None, then the
   *                   corresponding state key-value pair will be eliminated.
* @tparam S State type
*/
def updateStateByKey[S](updateFunc: JFunction2[JList[V], Optional[S], Optional[S]])
: JavaPairDStream[K, S] = {
implicit val cm: ClassManifest[S] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[S]]
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc))
}
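  A hedged Java sketch of a running per-key total kept with updateStateByKey; the counts stream is hypothetical, and Optional is Guava's, as used by this API.

  JavaPairDStream<String, Integer> runningTotals = counts.updateStateByKey(
    new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
      public Optional<Integer> call(List<Integer> newValues, Optional<Integer> state) {
        int sum = state.or(0);        // start from the previous state, if any
        for (Integer v : newValues) {
          sum += v;
        }
        return Optional.of(sum);      // returning Optional.absent() would drop the key
      }
    });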
/**
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of each key.
* Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   * @param updateFunc State update function. If this function returns None, then the
   *                   corresponding state key-value pair will be eliminated.
* @param numPartitions Number of partitions of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
numPartitions: Int)
: JavaPairDStream[K, S] = {
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), numPartitions)
}
/**
* Create a new "state" DStream where the state for each key is updated by applying
* the given function on the previous state of the key and the new values of the key.
* [[spark.Partitioner]] is used to control the partitioning of each RDD.
   * @param updateFunc State update function. If this function returns None, then the
   *                   corresponding state key-value pair will be eliminated.
* @param partitioner Partitioner for controlling the partitioning of each RDD in the new DStream.
* @tparam S State type
*/
def updateStateByKey[S: ClassManifest](
updateFunc: JFunction2[JList[V], Optional[S], Optional[S]],
partitioner: Partitioner
): JavaPairDStream[K, S] = {
dstream.updateStateByKey(convertUpdateStateFunction(updateFunc), partitioner)
}
  /**
   * Return a new DStream by applying a map function to the value of each key-value pair in
   * `this` DStream without changing the key.
   */
  def mapValues[U](f: JFunction[V, U]): JavaPairDStream[K, U] = {
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
dstream.mapValues(f)
}
  /**
   * Return a new DStream by applying a flatmap function to the value of each key-value pair in
   * `this` DStream without changing the key.
   */
  def flatMapValues[U](f: JFunction[V, java.lang.Iterable[U]]): JavaPairDStream[K, U] = {
import scala.collection.JavaConverters._
def fn = (x: V) => f.apply(x).asScala
implicit val cm: ClassManifest[U] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[U]]
dstream.flatMapValues(fn)
}
/**
* Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
   * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
   * key in both RDDs. HashPartitioner is used to partition each generated RDD into the default
   * number of partitions.
*/
def cogroup[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
    dstream.cogroup(other.dstream).mapValues(t => (seqAsJavaList(t._1), seqAsJavaList(t._2)))
}
/**
* Cogroup `this` DStream with `other` DStream. For each key k in corresponding RDDs of `this`
   * or `other` DStreams, the generated RDD will contain a tuple with the list of values for that
   * key in both RDDs. The given Partitioner is used to partition each generated RDD.
*/
def cogroup[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (JList[V], JList[W])] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.cogroup(other.dstream, partitioner)
      .mapValues(t => (seqAsJavaList(t._1), seqAsJavaList(t._2)))
}
/**
* Join `this` DStream with `other` DStream. HashPartitioner is used
   * to partition each generated RDD into the default number of partitions.
*/
def join[W](other: JavaPairDStream[K, W]): JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream)
}
/**
* Join `this` DStream with `other` DStream, that is, each RDD of the new DStream will
   * be generated by joining RDDs from `this` and `other` DStreams. Uses the given
* Partitioner to partition each generated RDD.
*/
def join[W](other: JavaPairDStream[K, W], partitioner: Partitioner)
: JavaPairDStream[K, (V, W)] = {
implicit val cm: ClassManifest[W] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[W]]
dstream.join(other.dstream, partitioner)
}
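  A hedged Java sketch of joining two pair streams on their keys; the impressions and clicks streams are hypothetical, and the result values are scala.Tuple2 pairs.

  // Assumes JavaPairDStream<String, Integer> impressions and clicks.
  JavaPairDStream<String, Tuple2<Integer, Integer>> joined = impressions.join(clicks);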
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsHadoopFiles[F <: OutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsHadoopFiles(prefix, suffix)
}
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsHadoopFiles(
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]]) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
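  A hedged Java sketch of the Hadoop output call above, assuming a hypothetical JavaPairDStream<Text, IntWritable> named writableCounts, a hypothetical HDFS path, and the old-API org.apache.hadoop.mapred.TextOutputFormat.

  // Writes one set of files per batch, named "counts-<TIME_IN_MS>.txt" under the given path.
  writableCounts.saveAsHadoopFiles("hdfs://namenode:9000/streaming/counts", "txt",
    Text.class, IntWritable.class, TextOutputFormat.class);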
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsHadoopFiles(
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: OutputFormat[_, _]],
conf: JobConf) {
dstream.saveAsHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles[F <: NewOutputFormat[K, V]](prefix: String, suffix: String) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix)
}
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]]) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass)
}
/**
* Save each RDD in `this` DStream as a Hadoop file. The file name at each batch interval is
* generated based on `prefix` and `suffix`: "prefix-TIME_IN_MS.suffix".
*/
def saveAsNewAPIHadoopFiles(
prefix: String,
suffix: String,
keyClass: Class[_],
valueClass: Class[_],
outputFormatClass: Class[_ <: NewOutputFormat[_, _]],
conf: Configuration = new Configuration) {
dstream.saveAsNewAPIHadoopFiles(prefix, suffix, keyClass, valueClass, outputFormatClass, conf)
}
override val classManifest: ClassManifest[(K, V)] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[Tuple2[K, V]]]
}
object JavaPairDStream {
implicit def fromPairDStream[K: ClassManifest, V: ClassManifest](dstream: DStream[(K, V)])
:JavaPairDStream[K, V] =
new JavaPairDStream[K, V](dstream)
def fromJavaDStream[K, V](dstream: JavaDStream[(K, V)]): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
new JavaPairDStream[K, V](dstream.dstream)
}
def scalaToJavaLong[K: ClassManifest](dstream: JavaPairDStream[K, Long])
: JavaPairDStream[K, JLong] = {
StreamingContext.toPairDStreamFunctions(dstream.dstream).mapValues(new JLong(_))
}
}

View file

@ -0,0 +1,346 @@
package spark.streaming.api.java
import scala.collection.JavaConversions._
import java.lang.{Long => JLong, Integer => JInt}
import spark.streaming._
import dstream._
import spark.storage.StorageLevel
import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.InputStream
import java.util.{Map => JMap}
import spark.api.java.{JavaSparkContext, JavaRDD}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
 * information (such as the cluster URL and job name) used internally to create a SparkContext, it
 * provides methods to create DStreams from various input sources.
*/
class JavaStreamingContext(val ssc: StreamingContext) {
// TODOs:
// - Test to/from Hadoop functions
// - Support creating and registering InputStreams
/**
* Creates a StreamingContext.
* @param master Name of the Spark Master
* @param frameworkName Name to be used when registering with the scheduler
* @param batchDuration The time interval at which streaming data will be divided into batches
*/
def this(master: String, frameworkName: String, batchDuration: Duration) =
this(new StreamingContext(master, frameworkName, batchDuration))
/**
* Re-creates a StreamingContext from a checkpoint file.
* @param path Path either to the directory that was specified as the checkpoint directory, or
* to the checkpoint file 'graph' or 'graph.bk'.
*/
def this(path: String) = this (new StreamingContext(path))
/** The underlying SparkContext */
val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
/**
 * Create an input stream that pulls messages from a Kafka broker.
 * @param hostname ZooKeeper hostname.
 * @param port ZooKeeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
*/
def kafkaStream[T](
hostname: String,
port: Int,
groupId: String,
topics: JMap[String, JInt])
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](hostname, port, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*))
}
/**
 * Create an input stream that pulls messages from a Kafka broker.
 * @param hostname ZooKeeper hostname.
 * @param port ZooKeeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
 * By default the value is pulled from ZooKeeper.
*/
def kafkaStream[T](
hostname: String,
port: Int,
groupId: String,
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong])
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
hostname,
port,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*))
}
/**
 * Create an input stream that pulls messages from a Kafka broker.
 * @param hostname ZooKeeper hostname.
 * @param port ZooKeeper port.
* @param groupId The group id for this consumer.
* @param topics Map of (topic_name -> numPartitions) to consume. Each partition is consumed
* in its own thread.
* @param initialOffsets Optional initial offsets for each of the partitions to consume.
 * By default the value is pulled from ZooKeeper.
* @param storageLevel RDD storage level. Defaults to memory-only
*/
def kafkaStream[T](
hostname: String,
port: Int,
groupId: String,
topics: JMap[String, JInt],
initialOffsets: JMap[KafkaPartitionKey, JLong],
storageLevel: StorageLevel)
: JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.kafkaStream[T](
hostname,
port,
groupId,
Map(topics.mapValues(_.intValue()).toSeq: _*),
Map(initialOffsets.mapValues(_.longValue()).toSeq: _*),
storageLevel)
}
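A hedged Java sketch of consuming Kafka topics via ZooKeeper; the host, port, group id, and topic names are illustrative assumptions, and jssc is a JavaStreamingContext.

Map<String, Integer> topics = new HashMap<String, Integer>();
topics.put("pageviews", 1);  // one consumer thread for this topic
topics.put("clicks", 2);     // two consumer threads for this topic
JavaDStream<String> kafkaLines =
  jssc.<String>kafkaStream("zk-host", 2181, "my-consumer-group", topics);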
/**
 * Create an input stream from a network source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
 * lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* (default: StorageLevel.MEMORY_AND_DISK_SER_2)
*/
def networkTextStream(hostname: String, port: Int, storageLevel: StorageLevel)
: JavaDStream[String] = {
ssc.networkTextStream(hostname, port, storageLevel)
}
/**
 * Create an input stream from a network source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as UTF8-encoded, \n-delimited
 * lines.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
*/
def networkTextStream(hostname: String, port: Int): JavaDStream[String] = {
ssc.networkTextStream(hostname, port)
}
/**
 * Create an input stream from a network source hostname:port. Data is received using
 * a TCP socket and the received bytes are interpreted as objects using the given
 * converter.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param converter Function to convert the byte stream to objects
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects received (after converting bytes to objects)
*/
def networkStream[T](
hostname: String,
port: Int,
converter: JFunction[InputStream, java.lang.Iterable[T]],
storageLevel: StorageLevel)
: JavaDStream[T] = {
def fn = (x: InputStream) => converter.apply(x).toIterator
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
ssc.networkStream(hostname, port, fn, storageLevel)
}
/**
 * Creates an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them as text files (using key as LongWritable, value
 * as Text and input format as TextInputFormat). File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new files
*/
def textFileStream(directory: String): JavaDStream[String] = {
ssc.textFileStream(directory)
}
/**
 * Create an input stream from a network source hostname:port, where data is received
 * as serialized blocks (serialized using Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @param storageLevel Storage level to use for storing the received objects
* @tparam T Type of the objects in the received blocks
*/
def rawNetworkStream[T](
hostname: String,
port: Int,
storageLevel: StorageLevel): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port, storageLevel))
}
/**
 * Create an input stream from a network source hostname:port, where data is received
 * as serialized blocks (serialized using Spark's serializer) that can be directly
* pushed into the block manager without deserializing them. This is the most efficient
* way to receive data.
* @param hostname Hostname to connect to for receiving data
* @param port Port to connect to for receiving data
* @tparam T Type of the objects in the received blocks
*/
def rawNetworkStream[T](hostname: String, port: Int): JavaDStream[T] = {
implicit val cmt: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
JavaDStream.fromDStream(ssc.rawNetworkStream(hostname, port))
}
/**
 * Creates an input stream that monitors a Hadoop-compatible filesystem
 * for new files and reads them using the given key-value types and input format.
 * File names starting with . are ignored.
 * @param directory HDFS directory to monitor for new files
* @tparam K Key type for reading HDFS file
* @tparam V Value type for reading HDFS file
* @tparam F Input format for reading HDFS file
*/
def fileStream[K, V, F <: NewInputFormat[K, V]](directory: String): JavaPairDStream[K, V] = {
implicit val cmk: ClassManifest[K] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[K]]
implicit val cmv: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
implicit val cmf: ClassManifest[F] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[F]]
    ssc.fileStream[K, V, F](directory)
}
/**
 * Creates an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
* @param storageLevel Storage level to use for storing the received objects
*/
def flumeStream(hostname: String, port: Int, storageLevel: StorageLevel):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port, storageLevel)
}
/**
 * Creates an input stream from a Flume source.
* @param hostname Hostname of the slave machine to which the flume data will be sent
* @param port Port of the slave machine to which the flume data will be sent
*/
def flumeStream(hostname: String, port: Int):
JavaDStream[SparkFlumeEvent] = {
ssc.flumeStream(hostname, port)
}
/**
* Registers an output stream that will be computed every interval
*/
def registerOutputStream(outputStream: JavaDStreamLike[_, _]) {
ssc.registerOutputStream(outputStream.dstream)
}
/**
 * Creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
* @param queue Queue of RDDs
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue)
}
/**
 * Creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
* @tparam T Type of objects in the RDD
*/
def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime)
}
/**
 * Creates an input stream from a queue of RDDs. In each batch,
* it will process either one or all of the RDDs returned by the queue.
*
* NOTE: changes to the queue after the stream is created will not be recognized.
* @param queue Queue of RDDs
* @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
 * @param defaultRDD Default RDD returned by the DStream when the queue is empty
* @tparam T Type of objects in the RDD
*/
def queueStream[T](
queue: java.util.Queue[JavaRDD[T]],
oneAtATime: Boolean,
defaultRDD: JavaRDD[T]): JavaDStream[T] = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val sQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
sQueue.enqueue(queue.map(_.rdd).toSeq: _*)
ssc.queueStream(sQueue, oneAtATime, defaultRDD.rdd)
}
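A hedged Java sketch of feeding pre-built RDDs through a queue, one per batch interval; jssc and the data are illustrative assumptions.

Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
rddQueue.add(jssc.sc().parallelize(Arrays.asList(1, 2, 3)));
rddQueue.add(jssc.sc().parallelize(Arrays.asList(4, 5, 6)));
JavaDStream<Integer> queued = jssc.queueStream(rddQueue, true);  // one RDD per interval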
/**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. By default, the graph will be checkpointed every batch interval.
* @param directory HDFS-compatible directory where the checkpoint data will be reliably stored
* @param interval checkpoint interval
*/
def checkpoint(directory: String, interval: Duration = null) {
ssc.checkpoint(directory, interval)
}
/**
 * Sets each DStream in this context to remember the RDDs it generated in the last given duration.
 * DStreams remember RDDs only for a limited duration and release them for garbage
 * collection afterwards. This method allows the developer to specify how long to remember the
 * RDDs (if the developer wishes to query old data outside the DStream computation).
* @param duration Minimum duration that each DStream should remember its RDDs
*/
def remember(duration: Duration) {
ssc.remember(duration)
}
/**
* Starts the execution of the streams.
*/
def start() = ssc.start()
/**
 * Stops the execution of the streams.
*/
def stop() = ssc.stop()
}
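To tie the pieces together, a hedged end-to-end Java sketch. The host, port, and checkpoint directory are illustrative, and print() is assumed to be among the JavaDStreamLike output operations whose diff is suppressed above.

import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaStreamingContext;

public class JavaNetworkLines {
  public static void main(String[] args) {
    // One-second batches on a local master with two threads.
    JavaStreamingContext jssc =
      new JavaStreamingContext("local[2]", "JavaNetworkLines", new Duration(1000));
    jssc.checkpoint("/tmp/streaming-checkpoint", null);  // null keeps the default interval

    JavaDStream<String> lines = jssc.networkTextStream("localhost", 9999);
    lines.print();  // assumed output operation on JavaDStreamLike

    jssc.start();
  }
}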

File diff suppressed because it is too large

View file

@ -0,0 +1,65 @@
package spark.streaming
import collection.mutable.{SynchronizedBuffer, ArrayBuffer}
import java.util.{List => JList}
import spark.streaming.api.java.{JavaPairDStream, JavaDStreamLike, JavaDStream, JavaStreamingContext}
import spark.streaming._
import java.util.ArrayList
import collection.JavaConversions._
/** Exposes streaming test functionality in a Java-friendly way. */
trait JavaTestBase extends TestSuiteBase {
/**
* Create a [[spark.streaming.TestInputStream]] and attach it to the supplied context.
* The stream will be derived from the supplied lists of Java objects.
**/
def attachTestInputStream[T](
ssc: JavaStreamingContext,
data: JList[JList[T]],
numPartitions: Int) = {
val seqData = data.map(Seq(_:_*))
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val dstream = new TestInputStream[T](ssc.ssc, seqData, numPartitions)
ssc.ssc.registerInputStream(dstream)
new JavaDStream[T](dstream)
}
/**
   * Attach a provided stream to its associated StreamingContext as a
* [[spark.streaming.TestOutputStream]].
**/
def attachTestOutputStream[T, This <: spark.streaming.api.java.JavaDStreamLike[T,This]](
dstream: JavaDStreamLike[T, This]) = {
implicit val cm: ClassManifest[T] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
val ostream = new TestOutputStream(dstream.dstream,
new ArrayBuffer[Seq[T]] with SynchronizedBuffer[Seq[T]])
dstream.dstream.ssc.registerOutputStream(ostream)
}
/**
   * Process all registered streams for numBatches batches, failing if
   * numExpectedOutput RDDs are not generated. Generated RDDs are collected
* and returned, represented as a list for each batch interval.
*/
def runStreams[V](
ssc: JavaStreamingContext, numBatches: Int, numExpectedOutput: Int): JList[JList[V]] = {
implicit val cm: ClassManifest[V] =
implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[V]]
val res = runStreams[V](ssc.ssc, numBatches, numExpectedOutput)
val out = new ArrayList[JList[V]]()
    res.foreach(entry => out.add(new ArrayList[V](entry)))
out
}
}
object JavaTestUtils extends JavaTestBase {
}
object JavaCheckpointTestUtils extends JavaTestBase {
override def actuallyWait = true
}
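Finally, a hedged Java sketch of a streaming test written against the helpers above, in the style of the JavaAPISuite; the data, the ssc field, and the JUnit Assert import are assumptions.

List<List<Integer>> input = Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3, 4));
JavaDStream<Integer> stream = JavaTestUtils.attachTestInputStream(ssc, input, 1);
JavaTestUtils.attachTestOutputStream(stream);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(input, result);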