Adding queueStream and some slight refactoring

This commit is contained in:
Patrick Wendell 2013-01-17 21:07:09 -08:00
parent 6fba7683c2
commit e0165bf714
4 changed files with 226 additions and 83 deletions

View file

@ -0,0 +1,62 @@
package spark.streaming.examples;
import com.google.common.collect.Lists;
import scala.Tuple2;
import spark.api.java.JavaRDD;
import spark.api.java.function.Function2;
import spark.api.java.function.PairFunction;
import spark.streaming.Duration;
import spark.streaming.api.java.JavaDStream;
import spark.streaming.api.java.JavaPairDStream;
import spark.streaming.api.java.JavaStreamingContext;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
/**
 * Example that feeds a fixed set of RDDs into a streaming computation through
 * a queue-backed input stream and counts the elements per key (value modulo 10).
 */
public class JavaQueueStream {
  /**
   * Builds the streaming job and starts it.
   *
   * @param args args[0] is the master passed to the JavaStreamingContext
   */
  public static void main(String[] args) throws InterruptedException {
    if (args.length < 1) {
      System.err.println("Usage: JavaQueueStream <master>");
      System.exit(1);
    }

    // Streaming context configured with Duration(1000)
    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "QueueStream", new Duration(1000));

    // Payload shared by every queued RDD: the integers 0..999
    List<Integer> numbers = Lists.newArrayList();
    for (int value = 0; value < 1000; value++) {
      numbers.add(value);
    }

    // Queue handed to the queue-backed input stream; filled with 30 RDDs
    // before the stream is created
    Queue<JavaRDD<Integer>> rddQueue = new LinkedList<JavaRDD<Integer>>();
    while (rddQueue.size() < 30) {
      rddQueue.add(ssc.sc().parallelize(numbers));
    }

    // Maps each element to the pair (element % 10, 1)
    PairFunction<Integer, Integer, Integer> keyByRemainder =
        new PairFunction<Integer, Integer, Integer>() {
          @Override
          public Tuple2<Integer, Integer> call(Integer value) throws Exception {
            return new Tuple2<Integer, Integer>(value % 10, 1);
          }
        };

    // Adds up the counts that share a key
    Function2<Integer, Integer, Integer> sumCounts =
        new Function2<Integer, Integer, Integer>() {
          @Override
          public Integer call(Integer left, Integer right) throws Exception {
            return left + right;
          }
        };

    // Create the queue input stream and wire the processing steps onto it
    JavaDStream<Integer> inputStream = ssc.queueStream(rddQueue);
    JavaPairDStream<Integer, Integer> keyedStream = inputStream.map(keyByRemainder);
    JavaPairDStream<Integer, Integer> countsPerKey = keyedStream.reduceByKey(sumCounts);
    countsPerKey.print();

    ssc.start();
  }
}

View file

@ -162,8 +162,7 @@ object SparkBuild extends Build {
def streamingSettings = sharedSettings ++ Seq(
name := "spark-streaming",
libraryDependencies ++= Seq(
"com.github.sgroschupf" % "zkclient" % "0.1",
"junit" % "junit" % "4.8.1")
"com.github.sgroschupf" % "zkclient" % "0.1")
) ++ assemblySettings ++ extraAssemblySettings
def extraAssemblySettings() = Seq(test in assembly := {}) ++ Seq(

View file

@ -10,6 +10,7 @@ import spark.api.java.function.{Function => JFunction, Function2 => JFunction2}
import org.apache.hadoop.mapreduce.{InputFormat => NewInputFormat}
import java.io.InputStream
import java.util.{Map => JMap}
import spark.api.java.{JavaSparkContext, JavaRDD}
/**
* A StreamingContext is the main entry point for Spark Streaming functionality. Besides the basic
@ -39,6 +40,9 @@ class JavaStreamingContext(val ssc: StreamingContext) {
*/
def this(path: String) = this (new StreamingContext(path))
/** The underlying SparkContext (`ssc.sc`), exposed to Java callers wrapped in a JavaSparkContext. */
val sc: JavaSparkContext = new JavaSparkContext(ssc.sc)
/**
* Create an input stream that pulls messages from a Kafka Broker.
* @param hostname Zookeeper hostname.
@ -254,6 +258,60 @@ class JavaStreamingContext(val ssc: StreamingContext) {
ssc.registerOutputStream(outputStream.dstream)
}
/**
 * Creates an input stream from a queue of RDDs by delegating to the
 * single-argument `queueStream` of the wrapped StreamingContext.
 *
 * NOTE: the RDDs are copied out of `queue` when this method is called, so
 * changes to the queue after the stream is created will not be recognized.
 * @param queue Queue of RDDs
 * @tparam T Type of objects in the RDD
 */
def queueStream[T](queue: java.util.Queue[JavaRDD[T]]): JavaDStream[T] = {
  // Java callers cannot supply a manifest, so an AnyRef manifest is cast to T.
  implicit val cm: ClassManifest[T] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
  // Unwrap every JavaRDD and collect the underlying RDDs in a Scala queue.
  val unwrapped = queue.map(_.rdd)
  val scalaQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
  scalaQueue ++= unwrapped
  ssc.queueStream(scalaQueue)
}
/**
 * Creates an input stream from a queue of RDDs. In each batch,
 * it will process either one or all of the RDDs returned by the queue.
 *
 * NOTE: the RDDs are copied out of `queue` when this method is called, so
 * changes to the queue after the stream is created will not be recognized.
 * @param queue Queue of RDDs
 * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
 * @tparam T Type of objects in the RDD
 */
def queueStream[T](queue: java.util.Queue[JavaRDD[T]], oneAtATime: Boolean): JavaDStream[T] = {
  // Java callers cannot supply a manifest, so an AnyRef manifest is cast to T.
  implicit val cm: ClassManifest[T] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
  // Unwrap every JavaRDD and collect the underlying RDDs in a Scala queue.
  val unwrapped = queue.map(_.rdd)
  val scalaQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
  scalaQueue ++= unwrapped
  ssc.queueStream(scalaQueue, oneAtATime)
}
/**
 * Creates an input stream from a queue of RDDs. In each batch,
 * it will process either one or all of the RDDs returned by the queue.
 *
 * NOTE: the RDDs are copied out of `queue` when this method is called, so
 * changes to the queue after the stream is created will not be recognized.
 * @param queue Queue of RDDs
 * @param oneAtATime Whether only one RDD should be consumed from the queue in every interval
 * @param defaultRDD Default RDD returned by the DStream when the queue is empty
 * @tparam T Type of objects in the RDD
 */
def queueStream[T](
    queue: java.util.Queue[JavaRDD[T]],
    oneAtATime: Boolean,
    defaultRDD: JavaRDD[T]): JavaDStream[T] = {
  // Java callers cannot supply a manifest, so an AnyRef manifest is cast to T.
  implicit val cm: ClassManifest[T] =
    implicitly[ClassManifest[AnyRef]].asInstanceOf[ClassManifest[T]]
  // Unwrap every JavaRDD and collect the underlying RDDs in a Scala queue.
  val unwrapped = queue.map(_.rdd)
  val scalaQueue = new scala.collection.mutable.Queue[spark.RDD[T]]
  scalaQueue ++= unwrapped
  ssc.queueStream(scalaQueue, oneAtATime, defaultRDD.rdd)
}
/**
* Sets the context to periodically checkpoint the DStream operations for master
* fault-tolerance. By default, the graph will be checkpointed every batch interval.

View file

@ -12,6 +12,7 @@ import org.junit.Test;
import scala.Tuple2;
import spark.HashPartitioner;
import spark.api.java.JavaRDD;
import spark.api.java.JavaSparkContext;
import spark.api.java.function.*;
import spark.storage.StorageLevel;
import spark.streaming.api.java.JavaDStream;
@ -28,17 +29,17 @@ import java.util.*;
// serialized, as an alternative to converting these anonymous classes to static inner classes;
// see http://stackoverflow.com/questions/758570/.
public class JavaAPISuite implements Serializable {
private transient JavaStreamingContext sc;
private transient JavaStreamingContext ssc;
@Before
public void setUp() {
sc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
}
@After
public void tearDown() {
sc.stop();
sc = null;
ssc.stop();
ssc = null;
// To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
System.clearProperty("spark.master.port");
}
@ -55,10 +56,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(3L),
Arrays.asList(1L));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream count = stream.count();
JavaTestUtils.attachTestOutputStream(count);
List<List<Long>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Long>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@ -72,7 +73,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(5,5),
Arrays.asList(9,4));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
@ -80,7 +81,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 2, 2);
assertOrderInvariantEquals(expected, result);
}
@ -98,10 +99,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,4,5,6),
Arrays.asList(7,8,9));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream windowed = stream.window(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
assertOrderInvariantEquals(expected, result);
}
@ -122,10 +123,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12,13,14,15,16,17,18),
Arrays.asList(13,14,15,16,17,18));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream windowed = stream.window(new Duration(4000), new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 8, 4);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 8, 4);
assertOrderInvariantEquals(expected, result);
}
@ -145,10 +146,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(7,8,9,10,11,12),
Arrays.asList(13,14,15,16,17,18));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream windowed = stream.tumble(new Duration(2000));
JavaTestUtils.attachTestOutputStream(windowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 6, 3);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 6, 3);
assertOrderInvariantEquals(expected, result);
}
@ -163,7 +164,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("giants"),
Arrays.asList("yankees"));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream filtered = stream.filter(new Function<String, Boolean>() {
@Override
public Boolean call(String s) throws Exception {
@ -171,7 +172,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(filtered);
List<List<String>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 2, 2);
assertOrderInvariantEquals(expected, result);
}
@ -186,10 +187,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(Arrays.asList("giants", "dodgers")),
Arrays.asList(Arrays.asList("yankees", "red socks")));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream glommed = stream.glom();
JavaTestUtils.attachTestOutputStream(glommed);
List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -204,7 +205,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("GIANTSDODGERS"),
Arrays.asList("YANKEESRED SOCKS"));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream mapped = stream.mapPartitions(new FlatMapFunction<Iterator<String>, String>() {
@Override
public Iterable<String> call(Iterator<String> in) {
@ -216,7 +217,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<List<String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<List<String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -247,10 +248,10 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(15),
Arrays.asList(24));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream reduced = stream.reduce(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -268,15 +269,38 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(39),
Arrays.asList(24));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream reducedWindowed = stream.reduceByWindow(new IntegerSum(),
new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reducedWindowed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 4, 4);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 4, 4);
Assert.assertEquals(expected, result);
}
@Test
public void testQueueStream() {
  // The expectation encodes one queued RDD per batch, in queue order.
  List<List<Integer>> expected = Arrays.asList(
      Arrays.asList(1,2,3),
      Arrays.asList(4,5,6),
      Arrays.asList(7,8,9));

  // Reuse the context exposed by the streaming context instead of building
  // a second, unused JavaSparkContext wrapper.
  JavaSparkContext jsc = ssc.sc();
  JavaRDD<Integer> rdd1 = jsc.parallelize(Arrays.asList(1,2,3));
  JavaRDD<Integer> rdd2 = jsc.parallelize(Arrays.asList(4,5,6));
  JavaRDD<Integer> rdd3 = jsc.parallelize(Arrays.asList(7,8,9));

  LinkedList<JavaRDD<Integer>> rdds = Lists.newLinkedList();
  rdds.add(rdd1);
  rdds.add(rdd2);
  rdds.add(rdd3);

  JavaDStream<Integer> stream = ssc.queueStream(rdds);
  JavaTestUtils.attachTestOutputStream(stream);
  List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
  Assert.assertEquals(expected, result);
}
@Test
public void testTransform() {
List<List<Integer>> inputData = Arrays.asList(
@ -289,7 +313,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(6,7,8),
Arrays.asList(9,10,11));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream transformed = stream.transform(new Function<JavaRDD<Integer>, JavaRDD<Integer>>() {
@Override
public JavaRDD<Integer> call(JavaRDD<Integer> in) throws Exception {
@ -301,7 +325,7 @@ public class JavaAPISuite implements Serializable {
});
}});
JavaTestUtils.attachTestOutputStream(transformed);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@ -318,7 +342,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList("b", "o", "o", "d","o","d","g","e","r","s"),
Arrays.asList("a","t","h","l","e","t","i","c","s"));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream flatMapped = stream.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
@ -326,7 +350,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<String>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<String>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@ -365,7 +389,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<Integer, String>(9, "c"),
new Tuple2<Integer, String>(9, "s")));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream flatMapped = stream.flatMap(new PairFlatMapFunction<String, Integer, String>() {
@Override
public Iterable<Tuple2<Integer, String>> call(String in) throws Exception {
@ -377,7 +401,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Tuple2<Integer, String>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -399,12 +423,12 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(2,2,5,5),
Arrays.asList(3,3,6,6));
JavaDStream stream1 = JavaTestUtils.attachTestInputStream(sc, inputData1, 2);
JavaDStream stream2 = JavaTestUtils.attachTestInputStream(sc, inputData2, 2);
JavaDStream stream1 = JavaTestUtils.attachTestInputStream(ssc, inputData1, 2);
JavaDStream stream2 = JavaTestUtils.attachTestInputStream(ssc, inputData2, 2);
JavaDStream unioned = stream1.union(stream2);
JavaTestUtils.attachTestOutputStream(unioned);
List<List<Integer>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Integer>> result = JavaTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result);
}
@ -436,7 +460,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("giants", 6)),
Arrays.asList(new Tuple2<String, Integer>("yankees", 7)));
JavaDStream stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = stream.map(
new PairFunction<String, String, Integer>() {
@Override
@ -453,7 +477,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(filtered);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -492,12 +516,12 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, List<String>> grouped = pairStream.groupByKey();
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -515,13 +539,13 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduced = pairStream.reduceByKey(new IntegerSum());
JavaTestUtils.attachTestOutputStream(reduced);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -539,7 +563,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> combined = pairStream.<Integer>combineByKey(
@ -551,7 +575,7 @@ public class JavaAPISuite implements Serializable {
}, new IntegerSum(), new IntegerSum(), new HashPartitioner(2));
JavaTestUtils.attachTestOutputStream(combined);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -569,12 +593,12 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Long>("new york", 2L)));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Long> counted = pairStream.countByKey();
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -592,13 +616,13 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, List<String>>("california", Arrays.asList("sharks", "ducks")),
new Tuple2<String, List<String>>("new york", Arrays.asList("rangers", "islanders"))));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, List<String>> groupWindowed =
pairStream.groupByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(groupWindowed);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Tuple2<String, List<String>>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -615,13 +639,13 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -638,7 +662,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("california", 14),
new Tuple2<String, Integer>("new york", 9)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> updated = pairStream.updateStateByKey(
@ -656,7 +680,7 @@ public class JavaAPISuite implements Serializable {
}
});
JavaTestUtils.attachTestOutputStream(updated);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -673,13 +697,13 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(new Tuple2<String, Integer>("california", 10),
new Tuple2<String, Integer>("new york", 4)));
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream<Tuple2<String, Integer>> stream = JavaTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaPairDStream<String, Integer> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Integer> reduceWindowed =
pairStream.reduceByKeyAndWindow(new IntegerSum(), new IntegerDifference(), new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(reduceWindowed);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Tuple2<String, Integer>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -700,13 +724,13 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, Long>("new york", 2L)));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, Long> counted =
pairStream.countByKeyAndWindow(new Duration(2000), new Duration(1000));
JavaTestUtils.attachTestOutputStream(counted);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 3, 3);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 3, 3);
Assert.assertEquals(expected, result);
}
@ -726,7 +750,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, String>("new york", "ISLANDERS")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
JavaPairDStream<String, String> mapped = pairStream.mapValues(new Function<String, String>() {
@ -737,7 +761,7 @@ public class JavaAPISuite implements Serializable {
});
JavaTestUtils.attachTestOutputStream(mapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -765,7 +789,7 @@ public class JavaAPISuite implements Serializable {
new Tuple2<String, String>("new york", "islanders2")));
JavaDStream<Tuple2<String, String>> stream = JavaTestUtils.attachTestInputStream(
sc, inputData, 1);
ssc, inputData, 1);
JavaPairDStream<String, String> pairStream = JavaPairDStream.fromJavaDStream(stream);
@ -781,7 +805,7 @@ public class JavaAPISuite implements Serializable {
});
JavaTestUtils.attachTestOutputStream(flatMapped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -815,16 +839,16 @@ public class JavaAPISuite implements Serializable {
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream1, 1);
ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream2, 1);
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<List<String>, List<String>>> grouped = pairStream1.cogroup(pairStream2);
JavaTestUtils.attachTestOutputStream(grouped);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, String>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -858,16 +882,16 @@ public class JavaAPISuite implements Serializable {
JavaDStream<Tuple2<String, String>> stream1 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream1, 1);
ssc, stringStringKVStream1, 1);
JavaPairDStream<String, String> pairStream1 = JavaPairDStream.fromJavaDStream(stream1);
JavaDStream<Tuple2<String, String>> stream2 = JavaTestUtils.attachTestInputStream(
sc, stringStringKVStream2, 1);
ssc, stringStringKVStream2, 1);
JavaPairDStream<String, String> pairStream2 = JavaPairDStream.fromJavaDStream(stream2);
JavaPairDStream<String, Tuple2<String, String>> joined = pairStream1.join(pairStream2);
JavaTestUtils.attachTestOutputStream(joined);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(sc, 2, 2);
List<List<Tuple2<String, Long>>> result = JavaTestUtils.runStreams(ssc, 2, 2);
Assert.assertEquals(expected, result);
}
@ -887,9 +911,9 @@ public class JavaAPISuite implements Serializable {
File tempDir = Files.createTempDir();
sc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
ssc.checkpoint(tempDir.getAbsolutePath(), new Duration(1000));
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
@ -897,15 +921,15 @@ public class JavaAPISuite implements Serializable {
}
});
JavaCheckpointTestUtils.attachTestOutputStream(letterCount);
List<List<Integer>> initialResult = JavaTestUtils.runStreams(sc, 1, 1);
List<List<Integer>> initialResult = JavaTestUtils.runStreams(ssc, 1, 1);
assertOrderInvariantEquals(expectedInitial, initialResult);
Thread.sleep(1000);
sc.stop();
sc = new JavaStreamingContext(tempDir.getAbsolutePath());
sc.start();
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(sc, 2, 2);
ssc.stop();
ssc = new JavaStreamingContext(tempDir.getAbsolutePath());
ssc.start();
List<List<Integer>> finalResult = JavaCheckpointTestUtils.runStreams(ssc, 2, 2);
assertOrderInvariantEquals(expectedFinal, finalResult);
}
@ -922,7 +946,7 @@ public class JavaAPISuite implements Serializable {
Arrays.asList(1,4),
Arrays.asList(8,7));
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(sc, inputData, 1);
JavaDStream stream = JavaCheckpointTestUtils.attachTestInputStream(ssc, inputData, 1);
JavaDStream letterCount = stream.map(new Function<String, Integer>() {
@Override
public Integer call(String s) throws Exception {
@ -933,7 +957,7 @@ public class JavaAPISuite implements Serializable {
letterCount.checkpoint(new Duration(1000));
List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(sc, 3, 3);
List<List<Integer>> result1 = JavaCheckpointTestUtils.runStreams(ssc, 3, 3);
assertOrderInvariantEquals(expected, result1);
}
*/
@ -945,15 +969,15 @@ public class JavaAPISuite implements Serializable {
public void testKafkaStream() {
HashMap<String, Integer> topics = Maps.newHashMap();
HashMap<KafkaPartitionKey, Long> offsets = Maps.newHashMap();
JavaDStream test1 = sc.kafkaStream("localhost", 12345, "group", topics);
JavaDStream test2 = sc.kafkaStream("localhost", 12345, "group", topics, offsets);
JavaDStream test3 = sc.kafkaStream("localhost", 12345, "group", topics, offsets,
JavaDStream test1 = ssc.kafkaStream("localhost", 12345, "group", topics);
JavaDStream test2 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets);
JavaDStream test3 = ssc.kafkaStream("localhost", 12345, "group", topics, offsets,
StorageLevel.MEMORY_AND_DISK());
}
@Test
public void testNetworkTextStream() {
JavaDStream test = sc.networkTextStream("localhost", 12345);
JavaDStream test = ssc.networkTextStream("localhost", 12345);
}
@Test
@ -973,7 +997,7 @@ public class JavaAPISuite implements Serializable {
}
}
JavaDStream test = sc.networkStream(
JavaDStream test = ssc.networkStream(
"localhost",
12345,
new Converter(),
@ -982,22 +1006,22 @@ public class JavaAPISuite implements Serializable {
@Test
public void testTextFileStream() {
JavaDStream test = sc.textFileStream("/tmp/foo");
JavaDStream test = ssc.textFileStream("/tmp/foo");
}
@Test
public void testRawNetworkStream() {
JavaDStream test = sc.rawNetworkStream("localhost", 12345);
JavaDStream test = ssc.rawNetworkStream("localhost", 12345);
}
@Test
public void testFlumeStream() {
JavaDStream test = sc.flumeStream("localhost", 12345);
JavaDStream test = ssc.flumeStream("localhost", 12345);
}
@Test
public void testFileStream() {
JavaPairDStream<String, String> foo =
sc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
ssc.<String, String, SequenceFileInputFormat>fileStream("/tmp/foo");
}
}